1use crate::{
2 search::SearchQuery,
3 worktree_store::{WorktreeStore, WorktreeStoreEvent},
4 Item, NoRepositoryError, ProjectPath,
5};
6use anyhow::{anyhow, Context as _, Result};
7use client::Client;
8use collections::{hash_map, HashMap, HashSet};
9use fs::Fs;
10use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
11use git::blame::Blame;
12use gpui::{
13 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
14 Task, WeakModel,
15};
16use http_client::Url;
17use language::{
18 proto::{
19 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
20 split_operations,
21 },
22 Buffer, BufferEvent, Capability, File as _, Language, Operation,
23};
24use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
25use smol::channel::Receiver;
26use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
27use text::BufferId;
28use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
29use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
30
/// Backend abstraction for [`BufferStore`]: one implementation operates on
/// local worktrees directly, the other proxies every request to an upstream
/// peer over RPC. Methods take a `ModelContext<BufferStore>` because the
/// implementations are models owned by the `BufferStore` itself.
trait BufferStoreImpl {
    /// Opens (or loads) the buffer at `path` within `worktree`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>>;

    /// Persists `buffer` to its current file.
    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    /// Persists `buffer` at `path`, updating the buffer's associated file.
    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    /// Creates a new, empty, untitled buffer.
    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;

    /// Reloads `buffers` from disk, optionally pushing the resulting
    /// transactions onto each buffer's undo history.
    fn reload_buffers(
        &self,
        buffers: Vec<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>>;

    /// Downcast to the remote implementation, if this is one.
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
    /// Downcast to the local implementation, if this is one.
    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
}
64
/// Buffer-store backend that proxies buffer operations to an upstream peer
/// over RPC.
struct RemoteBufferStore {
    /// Buffers sent to us by peers, retained here to avoid races when the
    /// upstream connection is via collab (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    /// Client used to issue requests to the upstream project host.
    upstream_client: AnyProtoClient,
    /// Id of the remote project on the upstream peer.
    project_id: u64,
    /// Buffers whose initial state has arrived but whose operation chunks are
    /// still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Callers waiting for a given remote buffer to finish arriving.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
    /// Back-pointer to the owning store; weak to avoid a reference cycle.
    buffer_store: WeakModel<BufferStore>,
}
75
/// Buffer-store backend that operates directly on local worktrees.
struct LocalBufferStore {
    /// Maps a buffer's project path to its id, for path-based lookups and for
    /// reconciling file renames observed from the worktree.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Maps a worktree entry id to the buffer opened for it.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    /// Back-pointer to the owning store; weak to avoid a reference cycle.
    buffer_store: WeakModel<BufferStore>,
    worktree_store: Model<WorktreeStore>,
    /// Keeps the worktree-store subscription alive for the lifetime of this
    /// backend (used to subscribe to newly added worktrees).
    _subscription: Subscription,
}
83
/// A set of open buffers, keyed by id, with support for loading, saving, and
/// sharing them with remote peers. Delegates backend-specific work (local
/// filesystem vs. remote RPC) to its boxed [`BufferStoreImpl`].
pub struct BufferStore {
    state: Box<dyn BufferStoreImpl>,
    #[allow(clippy::type_complexity)]
    /// In-flight loads, keyed by path; watchers receive the result so that
    /// concurrent opens of the same path share one load.
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Client (and project id) to forward buffer updates to, when this
    /// project is being shared downstream.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers that have been sent to each downstream peer.
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}
97
/// The store's record of a single buffer slot: either a live (weakly held)
/// buffer, or operations that arrived before the buffer itself and are queued
/// until it exists.
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}
102
/// Events emitted by the [`BufferStore`] for interested subscribers.
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Model<Buffer>),
    /// A buffer was released; carries its id since the model is gone.
    BufferDropped(BufferId),
    /// A buffer's backing file path changed (e.g. save-as or rename).
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        /// The file the buffer had before the change, if any.
        old_file: Option<Arc<dyn language::File>>,
    },
}
111
/// An edit transaction that spans multiple buffers, e.g. the result of a
/// multi-buffer reload or a project-wide operation.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
116
impl RemoteBufferStore {
    /// Returns a task that resolves with the buffer with the given id,
    /// waiting for it to arrive from the upstream peer if it hasn't been
    /// fully received yet.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = self.buffer_store.clone();
        let (tx, rx) = oneshot::channel();
        // Register the listener *before* checking whether the buffer already
        // exists, so a buffer that completes concurrently can't be missed.
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|cx| async move {
            if let Some(buffer) = buffer_store
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for `handle_create_buffer_for_peer` to notify us.
            cx.background_executor()
                .spawn(async move { rx.await? })
                .await
        })
    }

    /// Asks the upstream peer to save the buffer (optionally under a new
    /// path), then applies the returned version and mtime to the local
    /// buffer via `did_save`.
    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles a `CreateBufferForPeer` message from the upstream peer. The
    /// buffer arrives as an initial `State` message followed by zero or more
    /// `Chunk` messages of operations; the buffer is returned (and waiting
    /// listeners are notified) once the final chunk has been applied. On any
    /// failure the pending listeners are notified with the error instead.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<Option<Model<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        // Park the buffer until its operation chunks arrive.
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Fail every caller waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // Abandon the partially-received buffer and fail waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Wake every caller waiting on this buffer.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        return Ok(None);
    }

    /// Ids of buffers whose initial state has arrived but which are still
    /// waiting for operation chunks.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer (and the edits each transaction
    /// depends on) to arrive before returning.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Don't surface a transaction until the buffer has received
                // all of the edits it refers to.
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }
}
313
314impl BufferStoreImpl for Model<RemoteBufferStore> {
315 fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
316 Some(self.clone())
317 }
318
319 fn as_local(&self) -> Option<Model<LocalBufferStore>> {
320 None
321 }
322
323 fn save_buffer(
324 &self,
325 buffer: Model<Buffer>,
326 cx: &mut ModelContext<BufferStore>,
327 ) -> Task<Result<()>> {
328 self.update(cx, |this, cx| {
329 this.save_remote_buffer(buffer.clone(), None, cx)
330 })
331 }
332 fn save_buffer_as(
333 &self,
334 buffer: Model<Buffer>,
335 path: ProjectPath,
336 cx: &mut ModelContext<BufferStore>,
337 ) -> Task<Result<()>> {
338 self.update(cx, |this, cx| {
339 this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
340 })
341 }
342
343 fn open_buffer(
344 &self,
345 path: Arc<Path>,
346 worktree: Model<Worktree>,
347 cx: &mut ModelContext<BufferStore>,
348 ) -> Task<Result<Model<Buffer>>> {
349 self.update(cx, |this, cx| {
350 let worktree_id = worktree.read(cx).id().to_proto();
351 let project_id = this.project_id;
352 let client = this.upstream_client.clone();
353 let path_string = path.clone().to_string_lossy().to_string();
354 cx.spawn(move |this, mut cx| async move {
355 let response = client
356 .request(proto::OpenBufferByPath {
357 project_id,
358 worktree_id,
359 path: path_string,
360 })
361 .await?;
362 let buffer_id = BufferId::new(response.buffer_id)?;
363
364 let buffer = this
365 .update(&mut cx, {
366 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
367 })?
368 .await?;
369
370 Ok(buffer)
371 })
372 })
373 }
374
375 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
376 self.update(cx, |this, cx| {
377 let create = this.upstream_client.request(proto::OpenNewBuffer {
378 project_id: this.project_id,
379 });
380 cx.spawn(|this, mut cx| async move {
381 let response = create.await?;
382 let buffer_id = BufferId::new(response.buffer_id)?;
383
384 this.update(&mut cx, |this, cx| {
385 this.wait_for_remote_buffer(buffer_id, cx)
386 })?
387 .await
388 })
389 })
390 }
391
392 fn reload_buffers(
393 &self,
394 buffers: Vec<Model<Buffer>>,
395 push_to_history: bool,
396 cx: &mut ModelContext<BufferStore>,
397 ) -> Task<Result<ProjectTransaction>> {
398 self.update(cx, |this, cx| {
399 let request = this.upstream_client.request(proto::ReloadBuffers {
400 project_id: this.project_id,
401 buffer_ids: buffers
402 .iter()
403 .map(|buffer| buffer.read(cx).remote_id().to_proto())
404 .collect(),
405 });
406
407 cx.spawn(|this, mut cx| async move {
408 let response = request
409 .await?
410 .transaction
411 .ok_or_else(|| anyhow!("missing transaction"))?;
412 this.update(&mut cx, |this, cx| {
413 this.deserialize_project_transaction(response, push_to_history, cx)
414 })?
415 .await
416 })
417 })
418 }
419}
420
impl LocalBufferStore {
    /// Writes the buffer's current contents to `path` in `worktree`, then
    /// updates the buffer (and any downstream peers) with the resulting file
    /// metadata. `has_changed_file` indicates the buffer's file identity
    /// changed (e.g. save-as); it is forced on for never-created files.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything we need before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            // First save of an untitled/deleted file also changes its file.
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                // Forward the save (and the file change, if any) downstream;
                // failures to send are logged, not fatal.
                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    /// Subscribes to a worktree's events so local entry and git-repository
    /// changes can be reflected in open buffers.
    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    /// Applies a batch of worktree entry changes to the buffers they affect.
    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    /// Recomputes the git diff base for every buffer (open or currently
    /// loading) whose containing repository changed, and forwards the new
    /// diff bases downstream.
    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());
        let Some(buffer_store) = self.buffer_store.upgrade() else {
            return;
        };

        // Identify the loading buffers whose containing repository has changed.
        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
            let future_buffers = buffer_store
                .loading_buffers()
                .filter_map(|(project_path, receiver)| {
                    if project_path.worktree_id != worktree_handle.read(cx).id() {
                        return None;
                    }
                    let path = &project_path.path;
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
                    let path = path.clone();
                    Some(async move {
                        BufferStore::wait_for_loading_buffer(receiver)
                            .await
                            .ok()
                            .map(|buffer| (buffer, path))
                    })
                })
                .collect::<FuturesUnordered<_>>();

            // Identify the current buffers whose containing repository has changed.
            let current_buffers = buffer_store
                .buffers()
                .filter_map(|buffer| {
                    let file = File::from_dyn(buffer.read(cx).file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    Some((buffer, file.path.clone()))
                })
                .collect::<Vec<_>>();
            (future_buffers, current_buffers)
        });

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client(cx) {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    /// Reconciles a single changed worktree entry with the buffer opened for
    /// it, if any: updates the buffer's `File` (rename, mtime, deletion),
    /// keeps the id maps in sync, notifies downstream peers, and emits
    /// `BufferChangedFilePath` when the path changed. Returns `None` always;
    /// `Option` is used only for early-exit via `?`.
    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        // Look up the buffer by entry id first, falling back to its path.
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = self
            .buffer_store
            .update(cx, |buffer_store, _| {
                if let Some(buffer) = buffer_store.get(buffer_id) {
                    Some(buffer)
                } else {
                    // The buffer was dropped; clear its stale slot.
                    buffer_store.opened_buffers.remove(&buffer_id);
                    None
                }
            })
            .ok()
            .flatten();
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // No live buffer: drop the stale index entries and bail.
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Build the buffer's new `File`: prefer the entry with the same
            // id (handles renames), then the entry at the old path, and
            // otherwise mark the file as deleted.
            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index under the new path.
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index as well.
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Best-effort notification of downstream peers.
            if let Some((client, project_id)) = &self.downstream_client(cx) {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;
        // Emit the collected events from the owning buffer store.
        self.buffer_store
            .update(cx, |_buffer_store, cx| {
                for event in events {
                    cx.emit(event);
                }
            })
            .log_err()?;

        None
    }

    /// The downstream client (and project id) of the owning buffer store, if
    /// the store is still alive and the project is shared.
    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
        self.buffer_store
            .upgrade()?
            .read(cx)
            .downstream_client
            .clone()
    }

    /// Registers a buffer's (possibly new) file in the id maps. Returns
    /// `None` without touching the path map when the entry id is already
    /// registered.
    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
}
786
impl BufferStoreImpl for Model<LocalBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        None
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        Some(self.clone())
    }

    /// Saves the buffer to the file it is already associated with.
    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
            };
            let worktree = file.worktree.clone();
            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
        })
    }

    /// Saves the buffer at a new project path, changing its file identity.
    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(worktree) = this
                .worktree_store
                .read(cx)
                .worktree_for_id(path.worktree_id, cx)
            else {
                return Task::ready(Err(anyhow!("no such worktree")));
            };
            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
        })
    }

    /// Loads the file at `path` into a new buffer. A missing file yields an
    /// empty buffer associated with a not-yet-created `File` rather than an
    /// error. The buffer is registered with the owning store and in the
    /// local id maps before being returned.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                // Reserve the model up front so the buffer id (derived from
                // the entity id) is known before the file finishes loading.
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    // Nonexistent file: open an empty buffer whose `File`
                    // records the intended path but no entry/mtime yet.
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                mtime: None,
                                entry_id: None,
                                is_local: true,
                                is_deleted: false,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        // Index the buffer by path and (if present) entry id.
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }

    /// Creates an empty, untitled, plain-text buffer and registers it.
    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        let handle = self.clone();
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
                let buffer_id = buffer.read(cx).remote_id();
                handle.update(cx, |this, cx| {
                    // An untitled buffer normally has no file; index it only
                    // if one is somehow present.
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }
                });
            })?;
            Ok(buffer)
        })
    }

    /// Reloads each buffer from disk sequentially, collecting the resulting
    /// transactions. When `push_to_history` is false, each transaction is
    /// forgotten from the buffer's undo history but still reported.
    fn reload_buffers(
        &self,
        buffers: Vec<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}
964
965impl BufferStore {
    /// Registers the RPC message and request handlers that a `BufferStore`
    /// model responds to on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
    }
975
    /// Creates a buffer store backed by the local filesystem, operating on
    /// the worktrees in `worktree_store`.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                // Subscribe to newly added worktrees so the local backend can
                // track their entry and git-repository changes.
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }
1005
    /// Creates a buffer store that proxies all buffer operations to the
    /// remote project `remote_id` via `upstream_client`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|_| RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
                buffer_store: this,
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1030
    /// Opens the buffer at `project_path`, returning an already-open buffer
    /// if one exists, joining an in-flight load of the same path if there is
    /// one, and otherwise starting a new load via the backend.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    // Publish the result (success or error) to all watchers.
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
1085
    /// Creates a new, empty, untitled buffer via the backend.
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }
1089
    /// Saves `buffer` to its current file via the backend.
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }
1097
1098 pub fn save_buffer_as(
1099 &mut self,
1100 buffer: Model<Buffer>,
1101 path: ProjectPath,
1102 cx: &mut ModelContext<Self>,
1103 ) -> Task<Result<()>> {
1104 let old_file = buffer.read(cx).file().cloned();
1105 let task = self.state.save_buffer_as(buffer.clone(), path, cx);
1106 cx.spawn(|this, mut cx| async move {
1107 task.await?;
1108 this.update(&mut cx, |_, cx| {
1109 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1110 })
1111 })
1112 }
1113
    /// Computes a git blame for `buffer`, at `version` if given, otherwise at
    /// its current contents. On a local worktree the blame runs on the
    /// background executor against the containing repository; on a remote
    /// worktree the request is forwarded to the project host.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Resolve repo, repo-relative path, and buffer text up front;
                // any failure is carried into the background task.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1172
    /// Registers a buffer with the store: tracks it in `opened_buffers`,
    /// subscribes to its events, and emits `BufferAdded`.
    ///
    /// Returns an error (after a debug panic) if a local buffer with the same
    /// remote id is already registered.
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // Replica id 0 is the authoritative copy; anything else is remote.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        // Emit BufferDropped when the buffer model is released, so downstream
        // state (e.g. shared-buffer bookkeeping) can be cleaned up.
        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer itself; apply the
                    // buffered ops now that the buffer exists.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        // The host may re-send a buffer we already have.
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1213
1214 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1215 self.opened_buffers
1216 .values()
1217 .filter_map(|buffer| buffer.upgrade())
1218 }
1219
    /// Returns an iterator over buffers that are currently being loaded, as
    /// `(path, receiver)` pairs. Each receiver yields the loaded buffer (or a
    /// shared error) once the load completes.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }
1232
1233 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1234 self.buffers().find_map(|buffer| {
1235 let file = File::from_dyn(buffer.read(cx).file())?;
1236 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1237 Some(buffer)
1238 } else {
1239 None
1240 }
1241 })
1242 }
1243
1244 pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1245 self.opened_buffers
1246 .get(&buffer_id)
1247 .and_then(|buffer| buffer.upgrade())
1248 }
1249
1250 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1251 self.get(buffer_id)
1252 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1253 }
1254
    /// Looks up a buffer by id, also considering remote buffers that are still
    /// streaming in and have not finished loading yet.
    pub fn get_possibly_incomplete(
        &self,
        buffer_id: BufferId,
        cx: &AppContext,
    ) -> Option<Model<Buffer>> {
        self.get(buffer_id).or_else(|| {
            // Only a remote store can have partially-loaded buffers.
            self.state.as_remote().and_then(|remote| {
                remote
                    .read(cx)
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
            })
        })
    }
1270
    /// Collects the version vector of every open buffer, plus the ids of
    /// remote buffers that are still incomplete. Used when synchronizing
    /// buffer state with the host after (re)connecting.
    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        // Local stores never have incomplete buffers.
        let incomplete_buffer_ids = self
            .state
            .as_remote()
            .map(|remote| remote.read(cx).incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }
1292
    /// Transitions all buffers into a disconnected state after losing the
    /// connection to the host: pending waits are abandoned, buffers become
    /// read-only, and listeners for still-loading remote buffers are dropped.
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        // Unblock anything awaiting operations/versions that will never arrive.
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        // Edits can no longer be synced to the host, so forbid them.
        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.state.as_remote() {
            remote.update(cx, |remote, _| {
                // Wake up all futures currently waiting on a buffer to get opened,
                // to give them a chance to fail now that we've disconnected.
                remote.remote_buffer_listeners.clear()
            })
        }
    }
1314
    /// Marks this store as shared with a downstream client, recording the
    /// client handle and remote project id used to forward buffer updates.
    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1323
    /// Stops sharing: drops the downstream client and forgets all per-peer
    /// shared-buffer bookkeeping.
    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1328
1329 pub fn discard_incomplete(&mut self) {
1330 self.opened_buffers
1331 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1332 }
1333
    /// Streams buffers that may contain matches for `query`, starting with
    /// already-open unnamed buffers and then opening candidate files reported
    /// by the worktree store, up to `limit` results.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Tracked so the worktree search skips files we already have open.
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are always candidates and count against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound the number of buffers being opened simultaneously.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // The receiver was dropped; stop opening buffers.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1387
    /// Reacts to events from individual buffers: file-handle changes are
    /// forwarded to the local state, and reloads are relayed to the
    /// downstream client (if this store is shared).
    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.state.as_local() {
                    local.update(cx, |local, cx| {
                        local.buffer_changed_file(buffer, cx);
                    })
                }
            }
            BufferEvent::Reloaded => {
                // Only relevant when shared with a downstream client.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1420
1421 pub async fn handle_update_buffer(
1422 this: Model<Self>,
1423 envelope: TypedEnvelope<proto::UpdateBuffer>,
1424 mut cx: AsyncAppContext,
1425 ) -> Result<proto::Ack> {
1426 let payload = envelope.payload.clone();
1427 let buffer_id = BufferId::new(payload.buffer_id)?;
1428 let ops = payload
1429 .operations
1430 .into_iter()
1431 .map(language::proto::deserialize_operation)
1432 .collect::<Result<Vec<_>, _>>()?;
1433 this.update(&mut cx, |this, cx| {
1434 match this.opened_buffers.entry(buffer_id) {
1435 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1436 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1437 OpenBuffer::Buffer(buffer) => {
1438 if let Some(buffer) = buffer.upgrade() {
1439 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1440 }
1441 }
1442 },
1443 hash_map::Entry::Vacant(e) => {
1444 e.insert(OpenBuffer::Operations(ops));
1445 }
1446 }
1447 Ok(proto::Ack {})
1448 })?
1449 }
1450
    /// Handles a guest's `SynchronizeBuffers` request after it reconnects:
    /// re-registers the guest's shared buffers, reports each buffer's current
    /// version, re-sends file/diff-base/saved state, and streams any
    /// operations the guest is missing relative to its reported version.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers the guest reports below are
        // considered shared with it after this point.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Operations can be large; chunk and send them off-thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1534
    /// Handles a `CreateBufferForPeer` message from the host. Only valid on a
    /// remote store; the buffer is registered once all of its chunks have
    /// arrived (the remote state returns `None` until then).
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let Some(remote) = self.state.as_remote() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) = remote.update(cx, |remote, cx| {
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
        })? {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }
1554
    /// Handles an `UpdateBufferFile` message: updates the buffer's file
    /// handle, emits `BufferChangedFilePath` if the path changed, and relays
    /// the message to any downstream client.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Cloned because the original payload is forwarded downstream below.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    // Only report a path change when the path actually differs
                    // (or the buffer previously had no file at all).
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay to the downstream client even if we don't know the buffer.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1602
    /// Handles an `UpdateDiffBase` message: updates the buffer's diff base
    /// (the git index content it is diffed against) and relays the message to
    /// any downstream client.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    // Cloned because the payload is forwarded downstream below.
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1628
    /// Handles a `SaveBuffer` request from a guest: waits until the buffer has
    /// caught up to the guest's version, saves it (optionally under a new
    /// path), and replies with the saved version and mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until we've applied all operations the guest has seen,
        // otherwise we could persist stale content.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save As" flow requested by the guest.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1669
    /// Handles a `CloseBuffer` message: removes the buffer from the sending
    /// peer's shared set, dropping the peer's entry entirely once it becomes
    /// empty. Debug-panics if the buffer wasn't shared with that peer.
    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            // Reaching here means our bookkeeping and the peer's disagree.
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }
1695
    /// Handles a `BufferSaved` notification from the host: records the saved
    /// version/mtime on the buffer and relays the message to any downstream
    /// client.
    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        // Cloned because the raw payload fields are forwarded downstream below.
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }
1723
    /// Handles a `BufferReloaded` notification from the host: applies the
    /// reloaded version, line ending, and mtime to the buffer, and relays the
    /// message to any downstream client.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        // Cloned because the raw payload fields are forwarded downstream below.
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1756
    /// Handles a `BlameBuffer` request from a guest: waits for the buffer to
    /// reach the guest's version, runs the blame at that version, and returns
    /// the serialized result.
    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Ensure the blame reflects the content the guest is looking at.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
1777
    /// Waits on a loading-buffer watch channel until it resolves to either a
    /// loaded buffer or a (shared) load error.
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            // Check the current value first; the channel may already hold the result.
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            // Still `None`: wait for the next update and re-check.
            receiver.next().await;
        }
    }
1791
    /// Reloads the given buffers from disk, skipping any that have no unsaved
    /// changes. Returns the resulting transaction (optionally pushed to each
    /// buffer's undo history).
    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        // Clean buffers already match disk; only dirty ones need reloading.
        let buffers: Vec<Model<Buffer>> = buffers
            .into_iter()
            .filter(|buffer| buffer.read(cx).is_dirty())
            .collect();
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }
        self.state.reload_buffers(buffers, push_to_history, cx)
    }
1807
    /// Handles a `ReloadBuffers` request from a guest: reloads the requested
    /// buffers and replies with the resulting transaction, serialized for the
    /// requesting peer.
    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `push_to_history` is false: the guest manages its own undo stack.
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
1831
    /// Sends the given buffer to a peer, if it hasn't been sent already: first
    /// a `State` message describing the buffer, then its operations streamed
    /// in chunks, with the final chunk flagged `is_last`.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        // `insert` returning false means the peer already has this buffer.
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        // Nothing to send if this store isn't shared with a downstream client.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operations if the initial state was sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Peek ahead so the last chunk can be marked `is_last`.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
1894
    /// Clears all per-peer shared-buffer bookkeeping (e.g. when unsharing the
    /// project).
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
1898
    /// Clears shared-buffer bookkeeping for a single peer (e.g. when that
    /// peer leaves).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
1902
1903 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1904 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1905 self.shared_buffers.insert(new_peer_id, buffers);
1906 }
1907 }
1908
    /// Returns the map of buffers currently shared with each peer.
    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }
1912
    /// Creates and registers a local buffer with the given text and language
    /// (defaulting to plain text). Local-only: panics if called on a remote
    /// store.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let local = self
            .state
            .as_local()
            .expect("local-only method called in a non-local context");
        local.update(cx, |this, cx| {
            // If the buffer was created with a backing file, index it by path
            // and entry id so later lookups can find it.
            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                this.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    buffer_id,
                );

                if let Some(entry_id) = file.entry_id {
                    this.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }
        });
        buffer
    }
1949
1950 pub fn deserialize_project_transaction(
1951 &mut self,
1952 message: proto::ProjectTransaction,
1953 push_to_history: bool,
1954 cx: &mut ModelContext<Self>,
1955 ) -> Task<Result<ProjectTransaction>> {
1956 if let Some(remote) = self.state.as_remote() {
1957 remote.update(cx, |remote, cx| {
1958 remote.deserialize_project_transaction(message, push_to_history, cx)
1959 })
1960 } else {
1961 debug_panic!("not a remote buffer store");
1962 Task::ready(Err(anyhow!("not a remote buffer store")))
1963 }
1964 }
1965
1966 pub fn wait_for_remote_buffer(
1967 &self,
1968 id: BufferId,
1969 cx: &mut AppContext,
1970 ) -> Task<Result<Model<Buffer>>> {
1971 if let Some(remote) = self.state.as_remote() {
1972 remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
1973 } else {
1974 debug_panic!("not a remote buffer store");
1975 Task::ready(Err(anyhow!("not a remote buffer store")))
1976 }
1977 }
1978
    /// Serializes a project transaction for transmission to a peer, first
    /// ensuring every involved buffer has been (or is being) sent to that
    /// peer so the ids in the transaction are resolvable on the other side.
    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            // No-op if the peer already has this buffer.
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }
2001}
2002
2003impl OpenBuffer {
2004 fn upgrade(&self) -> Option<Model<Buffer>> {
2005 match self {
2006 OpenBuffer::Buffer(handle) => handle.upgrade(),
2007 OpenBuffer::Operations(_) => None,
2008 }
2009 }
2010}
2011
2012fn is_not_found_error(error: &anyhow::Error) -> bool {
2013 error
2014 .root_cause()
2015 .downcast_ref::<io::Error>()
2016 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2017}
2018
2019fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
2020 let entries = blame
2021 .entries
2022 .into_iter()
2023 .map(|entry| proto::BlameEntry {
2024 sha: entry.sha.as_bytes().into(),
2025 start_line: entry.range.start,
2026 end_line: entry.range.end,
2027 original_line_number: entry.original_line_number,
2028 author: entry.author.clone(),
2029 author_mail: entry.author_mail.clone(),
2030 author_time: entry.author_time,
2031 author_tz: entry.author_tz.clone(),
2032 committer: entry.committer.clone(),
2033 committer_mail: entry.committer_mail.clone(),
2034 committer_time: entry.committer_time,
2035 committer_tz: entry.committer_tz.clone(),
2036 summary: entry.summary.clone(),
2037 previous: entry.previous.clone(),
2038 filename: entry.filename.clone(),
2039 })
2040 .collect::<Vec<_>>();
2041
2042 let messages = blame
2043 .messages
2044 .into_iter()
2045 .map(|(oid, message)| proto::CommitMessage {
2046 oid: oid.as_bytes().into(),
2047 message,
2048 })
2049 .collect::<Vec<_>>();
2050
2051 let permalinks = blame
2052 .permalinks
2053 .into_iter()
2054 .map(|(oid, url)| proto::CommitPermalink {
2055 oid: oid.as_bytes().into(),
2056 permalink: url.to_string(),
2057 })
2058 .collect::<Vec<_>>();
2059
2060 proto::BlameBufferResponse {
2061 entries,
2062 messages,
2063 permalinks,
2064 remote_url: blame.remote_url,
2065 }
2066}
2067
/// Converts a protobuf blame response back into a [`git::blame::Blame`].
///
/// Entries, messages, and permalinks whose object ids (or permalink URLs)
/// fail to parse are silently dropped rather than failing the whole response.
fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                // `?` inside `filter_map` skips entries with malformed shas.
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}