1use crate::{
2 ProjectPath,
3 lsp_store::OpenLspBufferHandle,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5};
6use anyhow::{Context as _, Result, anyhow};
7use client::Client;
8use collections::{HashMap, HashSet, hash_map};
9use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
10use gpui::{
11 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
12};
13use language::{
14 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
15 proto::{
16 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
17 split_operations,
18 },
19};
20use rpc::{
21 AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
22 proto::{self, PeerId},
23};
24
25use settings::Settings;
26use std::{io, sync::Arc, time::Instant};
27use text::{BufferId, ReplicaId};
28use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
29use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
30
31/// A set of open buffers.
32pub struct BufferStore {
33 state: BufferStoreState,
34 #[allow(clippy::type_complexity)]
35 loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
36 worktree_store: Entity<WorktreeStore>,
37 opened_buffers: HashMap<BufferId, OpenBuffer>,
38 path_to_buffer_id: HashMap<ProjectPath, BufferId>,
39 downstream_client: Option<(AnyProtoClient, u64)>,
40 shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
41 non_searchable_buffers: HashSet<BufferId>,
42 project_search: RemoteProjectSearchState,
43}
44
45#[derive(Default)]
46struct RemoteProjectSearchState {
47 // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request.
48 chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
49 // Monotonously-increasing handle to hand out to remote host in order to identify the project search result chunk.
50 next_id: u64,
51 // Used by the side running the actual search for match candidates to potentially cancel the search prematurely.
52 searches_in_progress: HashMap<(PeerId, u64), Task<Result<()>>>,
53}
54
55#[derive(Hash, Eq, PartialEq, Clone)]
56struct SharedBuffer {
57 buffer: Entity<Buffer>,
58 lsp_handle: Option<OpenLspBufferHandle>,
59}
60
61enum BufferStoreState {
62 Local(LocalBufferStore),
63 Remote(RemoteBufferStore),
64}
65
66struct RemoteBufferStore {
67 shared_with_me: HashSet<Entity<Buffer>>,
68 upstream_client: AnyProtoClient,
69 project_id: u64,
70 loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
71 remote_buffer_listeners:
72 HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
73 worktree_store: Entity<WorktreeStore>,
74}
75
76struct LocalBufferStore {
77 local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
78 worktree_store: Entity<WorktreeStore>,
79 _subscription: Subscription,
80}
81
82enum OpenBuffer {
83 Complete { buffer: WeakEntity<Buffer> },
84 Operations(Vec<Operation>),
85}
86
87pub enum BufferStoreEvent {
88 BufferAdded(Entity<Buffer>),
89 // TODO(jk): this event seems unused
90 BufferOpened {
91 buffer: Entity<Buffer>,
92 project_path: ProjectPath,
93 },
94 SharedBufferClosed(proto::PeerId, BufferId),
95 BufferDropped(BufferId),
96 BufferChangedFilePath {
97 buffer: Entity<Buffer>,
98 old_file: Option<Arc<dyn language::File>>,
99 },
100}
101
102#[derive(Default, Debug, Clone)]
103pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
104
105impl PartialEq for ProjectTransaction {
106 fn eq(&self, other: &Self) -> bool {
107 self.0.len() == other.0.len()
108 && self.0.iter().all(|(buffer, transaction)| {
109 other.0.get(buffer).is_some_and(|t| t.id == transaction.id)
110 })
111 }
112}
113
114impl EventEmitter<BufferStoreEvent> for BufferStore {}
115
116impl RemoteBufferStore {
117 pub fn wait_for_remote_buffer(
118 &mut self,
119 id: BufferId,
120 cx: &mut Context<BufferStore>,
121 ) -> Task<Result<Entity<Buffer>>> {
122 let (tx, rx) = oneshot::channel();
123 self.remote_buffer_listeners.entry(id).or_default().push(tx);
124
125 cx.spawn(async move |this, cx| {
126 if let Some(buffer) = this
127 .read_with(cx, |buffer_store, _| buffer_store.get(id))
128 .ok()
129 .flatten()
130 {
131 return Ok(buffer);
132 }
133
134 cx.background_spawn(async move { rx.await? }).await
135 })
136 }
137
138 fn save_remote_buffer(
139 &self,
140 buffer_handle: Entity<Buffer>,
141 new_path: Option<proto::ProjectPath>,
142 cx: &Context<BufferStore>,
143 ) -> Task<Result<()>> {
144 let buffer = buffer_handle.read(cx);
145 let buffer_id = buffer.remote_id().into();
146 let version = buffer.version();
147 let rpc = self.upstream_client.clone();
148 let project_id = self.project_id;
149 cx.spawn(async move |_, cx| {
150 let response = rpc
151 .request(proto::SaveBuffer {
152 project_id,
153 buffer_id,
154 new_path,
155 version: serialize_version(&version),
156 })
157 .await?;
158 let version = deserialize_version(&response.version);
159 let mtime = response.mtime.map(|mtime| mtime.into());
160
161 buffer_handle.update(cx, |buffer, cx| {
162 buffer.did_save(version.clone(), mtime, cx);
163 });
164
165 Ok(())
166 })
167 }
168
169 pub fn handle_create_buffer_for_peer(
170 &mut self,
171 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
172 replica_id: ReplicaId,
173 capability: Capability,
174 cx: &mut Context<BufferStore>,
175 ) -> Result<Option<Entity<Buffer>>> {
176 match envelope.payload.variant.context("missing variant")? {
177 proto::create_buffer_for_peer::Variant::State(mut state) => {
178 let buffer_id = BufferId::new(state.id)?;
179
180 let buffer_result = maybe!({
181 let mut buffer_file = None;
182 if let Some(file) = state.file.take() {
183 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
184 let worktree = self
185 .worktree_store
186 .read(cx)
187 .worktree_for_id(worktree_id, cx)
188 .with_context(|| {
189 format!("no worktree found for id {}", file.worktree_id)
190 })?;
191 buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
192 as Arc<dyn language::File>);
193 }
194 Buffer::from_proto(replica_id, capability, state, buffer_file)
195 });
196
197 match buffer_result {
198 Ok(buffer) => {
199 let buffer = cx.new(|_| buffer);
200 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
201 }
202 Err(error) => {
203 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
204 for listener in listeners {
205 listener.send(Err(anyhow!(error.cloned()))).ok();
206 }
207 }
208 }
209 }
210 }
211 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
212 let buffer_id = BufferId::new(chunk.buffer_id)?;
213 let buffer = self
214 .loading_remote_buffers_by_id
215 .get(&buffer_id)
216 .cloned()
217 .with_context(|| {
218 format!(
219 "received chunk for buffer {} without initial state",
220 chunk.buffer_id
221 )
222 })?;
223
224 let result = maybe!({
225 let operations = chunk
226 .operations
227 .into_iter()
228 .map(language::proto::deserialize_operation)
229 .collect::<Result<Vec<_>>>()?;
230 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
231 anyhow::Ok(())
232 });
233
234 if let Err(error) = result {
235 self.loading_remote_buffers_by_id.remove(&buffer_id);
236 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
237 for listener in listeners {
238 listener.send(Err(error.cloned())).ok();
239 }
240 }
241 } else if chunk.is_last {
242 self.loading_remote_buffers_by_id.remove(&buffer_id);
243 if self.upstream_client.is_via_collab() {
244 // retain buffers sent by peers to avoid races.
245 self.shared_with_me.insert(buffer.clone());
246 }
247
248 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
249 for sender in senders {
250 sender.send(Ok(buffer.clone())).ok();
251 }
252 }
253 return Ok(Some(buffer));
254 }
255 }
256 }
257 Ok(None)
258 }
259
260 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
261 self.loading_remote_buffers_by_id
262 .keys()
263 .copied()
264 .collect::<Vec<_>>()
265 }
266
267 pub fn deserialize_project_transaction(
268 &self,
269 message: proto::ProjectTransaction,
270 push_to_history: bool,
271 cx: &mut Context<BufferStore>,
272 ) -> Task<Result<ProjectTransaction>> {
273 cx.spawn(async move |this, cx| {
274 let mut project_transaction = ProjectTransaction::default();
275 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
276 {
277 let buffer_id = BufferId::new(buffer_id)?;
278 let buffer = this
279 .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
280 .await?;
281 let transaction = language::proto::deserialize_transaction(transaction)?;
282 project_transaction.0.insert(buffer, transaction);
283 }
284
285 for (buffer, transaction) in &project_transaction.0 {
286 buffer
287 .update(cx, |buffer, _| {
288 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
289 })
290 .await?;
291
292 if push_to_history {
293 buffer.update(cx, |buffer, _| {
294 buffer.push_transaction(transaction.clone(), Instant::now());
295 buffer.finalize_last_transaction();
296 });
297 }
298 }
299
300 Ok(project_transaction)
301 })
302 }
303
304 fn open_buffer(
305 &self,
306 path: Arc<RelPath>,
307 worktree: Entity<Worktree>,
308 cx: &mut Context<BufferStore>,
309 ) -> Task<Result<Entity<Buffer>>> {
310 let worktree_id = worktree.read(cx).id().to_proto();
311 let project_id = self.project_id;
312 let client = self.upstream_client.clone();
313 cx.spawn(async move |this, cx| {
314 let response = client
315 .request(proto::OpenBufferByPath {
316 project_id,
317 worktree_id,
318 path: path.to_proto(),
319 })
320 .await?;
321 let buffer_id = BufferId::new(response.buffer_id)?;
322
323 let buffer = this
324 .update(cx, {
325 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
326 })?
327 .await?;
328
329 Ok(buffer)
330 })
331 }
332
333 fn create_buffer(
334 &self,
335 language: Option<Arc<Language>>,
336 project_searchable: bool,
337 cx: &mut Context<BufferStore>,
338 ) -> Task<Result<Entity<Buffer>>> {
339 let create = self.upstream_client.request(proto::OpenNewBuffer {
340 project_id: self.project_id,
341 });
342 cx.spawn(async move |this, cx| {
343 let response = create.await?;
344 let buffer_id = BufferId::new(response.buffer_id)?;
345
346 let buffer = this
347 .update(cx, |this, cx| {
348 if !project_searchable {
349 this.non_searchable_buffers.insert(buffer_id);
350 }
351 this.wait_for_remote_buffer(buffer_id, cx)
352 })?
353 .await?;
354 if let Some(language) = language {
355 buffer.update(cx, |buffer, cx| {
356 buffer.set_language(Some(language), cx);
357 });
358 }
359 Ok(buffer)
360 })
361 }
362
363 fn reload_buffers(
364 &self,
365 buffers: HashSet<Entity<Buffer>>,
366 push_to_history: bool,
367 cx: &mut Context<BufferStore>,
368 ) -> Task<Result<ProjectTransaction>> {
369 let request = self.upstream_client.request(proto::ReloadBuffers {
370 project_id: self.project_id,
371 buffer_ids: buffers
372 .iter()
373 .map(|buffer| buffer.read(cx).remote_id().to_proto())
374 .collect(),
375 });
376
377 cx.spawn(async move |this, cx| {
378 let response = request.await?.transaction.context("missing transaction")?;
379 this.update(cx, |this, cx| {
380 this.deserialize_project_transaction(response, push_to_history, cx)
381 })?
382 .await
383 })
384 }
385}
386
387impl LocalBufferStore {
388 fn save_local_buffer(
389 &self,
390 buffer_handle: Entity<Buffer>,
391 worktree: Entity<Worktree>,
392 path: Arc<RelPath>,
393 mut has_changed_file: bool,
394 cx: &mut Context<BufferStore>,
395 ) -> Task<Result<()>> {
396 let buffer = buffer_handle.read(cx);
397
398 let text = buffer.as_rope().clone();
399 let line_ending = buffer.line_ending();
400 let encoding = buffer.encoding();
401 let has_bom = buffer.has_bom();
402 let version = buffer.version();
403 let buffer_id = buffer.remote_id();
404 let file = buffer.file().cloned();
405 if file
406 .as_ref()
407 .is_some_and(|file| file.disk_state() == DiskState::New)
408 {
409 has_changed_file = true;
410 }
411
412 let save = worktree.update(cx, |worktree, cx| {
413 worktree.write_file(path, text, line_ending, encoding, has_bom, cx)
414 });
415
416 cx.spawn(async move |this, cx| {
417 let new_file = save.await?;
418 let mtime = new_file.disk_state().mtime();
419 this.update(cx, |this, cx| {
420 if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
421 if has_changed_file {
422 downstream_client
423 .send(proto::UpdateBufferFile {
424 project_id,
425 buffer_id: buffer_id.to_proto(),
426 file: Some(language::File::to_proto(&*new_file, cx)),
427 })
428 .log_err();
429 }
430 downstream_client
431 .send(proto::BufferSaved {
432 project_id,
433 buffer_id: buffer_id.to_proto(),
434 version: serialize_version(&version),
435 mtime: mtime.map(|time| time.into()),
436 })
437 .log_err();
438 }
439 })?;
440 buffer_handle.update(cx, |buffer, cx| {
441 if has_changed_file {
442 buffer.file_updated(new_file, cx);
443 }
444 buffer.did_save(version.clone(), mtime, cx);
445 });
446 Ok(())
447 })
448 }
449
450 fn subscribe_to_worktree(
451 &mut self,
452 worktree: &Entity<Worktree>,
453 cx: &mut Context<BufferStore>,
454 ) {
455 cx.subscribe(worktree, |this, worktree, event, cx| {
456 if worktree.read(cx).is_local()
457 && let worktree::Event::UpdatedEntries(changes) = event
458 {
459 Self::local_worktree_entries_changed(this, &worktree, changes, cx);
460 }
461 })
462 .detach();
463 }
464
465 fn local_worktree_entries_changed(
466 this: &mut BufferStore,
467 worktree_handle: &Entity<Worktree>,
468 changes: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
469 cx: &mut Context<BufferStore>,
470 ) {
471 let snapshot = worktree_handle.read(cx).snapshot();
472 for (path, entry_id, _) in changes {
473 Self::local_worktree_entry_changed(
474 this,
475 *entry_id,
476 path,
477 worktree_handle,
478 &snapshot,
479 cx,
480 );
481 }
482 }
483
484 fn local_worktree_entry_changed(
485 this: &mut BufferStore,
486 entry_id: ProjectEntryId,
487 path: &Arc<RelPath>,
488 worktree: &Entity<worktree::Worktree>,
489 snapshot: &worktree::Snapshot,
490 cx: &mut Context<BufferStore>,
491 ) -> Option<()> {
492 let project_path = ProjectPath {
493 worktree_id: snapshot.id(),
494 path: path.clone(),
495 };
496
497 let buffer_id = this
498 .as_local_mut()
499 .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
500 .copied()
501 .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;
502
503 let buffer = if let Some(buffer) = this.get(buffer_id) {
504 Some(buffer)
505 } else {
506 this.opened_buffers.remove(&buffer_id);
507 this.non_searchable_buffers.remove(&buffer_id);
508 None
509 };
510
511 let buffer = if let Some(buffer) = buffer {
512 buffer
513 } else {
514 this.path_to_buffer_id.remove(&project_path);
515 let this = this.as_local_mut()?;
516 this.local_buffer_ids_by_entry_id.remove(&entry_id);
517 return None;
518 };
519
520 let events = buffer.update(cx, |buffer, cx| {
521 let file = buffer.file()?;
522 let old_file = File::from_dyn(Some(file))?;
523 if old_file.worktree != *worktree {
524 return None;
525 }
526
527 let snapshot_entry = old_file
528 .entry_id
529 .and_then(|entry_id| snapshot.entry_for_id(entry_id))
530 .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
531
532 let new_file = if let Some(entry) = snapshot_entry {
533 File {
534 disk_state: match entry.mtime {
535 Some(mtime) => DiskState::Present { mtime },
536 None => old_file.disk_state,
537 },
538 is_local: true,
539 entry_id: Some(entry.id),
540 path: entry.path.clone(),
541 worktree: worktree.clone(),
542 is_private: entry.is_private,
543 }
544 } else {
545 File {
546 disk_state: DiskState::Deleted,
547 is_local: true,
548 entry_id: old_file.entry_id,
549 path: old_file.path.clone(),
550 worktree: worktree.clone(),
551 is_private: old_file.is_private,
552 }
553 };
554
555 if new_file == *old_file {
556 return None;
557 }
558
559 let mut events = Vec::new();
560 if new_file.path != old_file.path {
561 this.path_to_buffer_id.remove(&ProjectPath {
562 path: old_file.path.clone(),
563 worktree_id: old_file.worktree_id(cx),
564 });
565 this.path_to_buffer_id.insert(
566 ProjectPath {
567 worktree_id: new_file.worktree_id(cx),
568 path: new_file.path.clone(),
569 },
570 buffer_id,
571 );
572 events.push(BufferStoreEvent::BufferChangedFilePath {
573 buffer: cx.entity(),
574 old_file: buffer.file().cloned(),
575 });
576 }
577 let local = this.as_local_mut()?;
578 if new_file.entry_id != old_file.entry_id {
579 if let Some(entry_id) = old_file.entry_id {
580 local.local_buffer_ids_by_entry_id.remove(&entry_id);
581 }
582 if let Some(entry_id) = new_file.entry_id {
583 local
584 .local_buffer_ids_by_entry_id
585 .insert(entry_id, buffer_id);
586 }
587 }
588
589 if let Some((client, project_id)) = &this.downstream_client {
590 client
591 .send(proto::UpdateBufferFile {
592 project_id: *project_id,
593 buffer_id: buffer_id.to_proto(),
594 file: Some(new_file.to_proto(cx)),
595 })
596 .ok();
597 }
598
599 buffer.file_updated(Arc::new(new_file), cx);
600 Some(events)
601 })?;
602
603 for event in events {
604 cx.emit(event);
605 }
606
607 None
608 }
609
610 fn save_buffer(
611 &self,
612 buffer: Entity<Buffer>,
613 cx: &mut Context<BufferStore>,
614 ) -> Task<Result<()>> {
615 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
616 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
617 };
618 let worktree = file.worktree.clone();
619 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
620 }
621
622 fn save_buffer_as(
623 &self,
624 buffer: Entity<Buffer>,
625 path: ProjectPath,
626 cx: &mut Context<BufferStore>,
627 ) -> Task<Result<()>> {
628 let Some(worktree) = self
629 .worktree_store
630 .read(cx)
631 .worktree_for_id(path.worktree_id, cx)
632 else {
633 return Task::ready(Err(anyhow!("no such worktree")));
634 };
635 self.save_local_buffer(buffer, worktree, path.path, true, cx)
636 }
637
638 #[ztracing::instrument(skip_all)]
639 fn open_buffer(
640 &self,
641 path: Arc<RelPath>,
642 worktree: Entity<Worktree>,
643 cx: &mut Context<BufferStore>,
644 ) -> Task<Result<Entity<Buffer>>> {
645 let load_file = worktree.update(cx, |worktree, cx| worktree.load_file(path.as_ref(), cx));
646 cx.spawn(async move |this, cx| {
647 let path = path.clone();
648 let buffer = match load_file.await {
649 Ok(loaded) => {
650 let reservation = cx.reserve_entity::<Buffer>();
651 let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
652 let text_buffer = cx
653 .background_spawn(async move {
654 text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
655 })
656 .await;
657 cx.insert_entity(reservation, |_| {
658 let mut buffer =
659 Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite);
660 buffer.set_encoding(loaded.encoding);
661 buffer.set_has_bom(loaded.has_bom);
662 buffer
663 })
664 }
665 Err(error) if is_not_found_error(&error) => cx.new(|cx| {
666 let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
667 let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
668 Buffer::build(
669 text_buffer,
670 Some(Arc::new(File {
671 worktree,
672 path,
673 disk_state: DiskState::New,
674 entry_id: None,
675 is_local: true,
676 is_private: false,
677 })),
678 Capability::ReadWrite,
679 )
680 }),
681 Err(e) => return Err(e),
682 };
683 this.update(cx, |this, cx| {
684 this.add_buffer(buffer.clone(), cx)?;
685 let buffer_id = buffer.read(cx).remote_id();
686 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
687 let project_path = ProjectPath {
688 worktree_id: file.worktree_id(cx),
689 path: file.path.clone(),
690 };
691 let entry_id = file.entry_id;
692
693 // Check if the file should be read-only based on settings
694 let settings = WorktreeSettings::get(Some((&project_path).into()), cx);
695 let is_read_only = if project_path.path.is_empty() {
696 settings.is_std_path_read_only(&file.full_path(cx))
697 } else {
698 settings.is_path_read_only(&project_path.path)
699 };
700 if is_read_only {
701 buffer.update(cx, |buffer, cx| {
702 buffer.set_capability(Capability::Read, cx);
703 });
704 }
705
706 this.path_to_buffer_id.insert(project_path, buffer_id);
707 let this = this.as_local_mut().unwrap();
708 if let Some(entry_id) = entry_id {
709 this.local_buffer_ids_by_entry_id
710 .insert(entry_id, buffer_id);
711 }
712 }
713
714 anyhow::Ok(())
715 })??;
716
717 Ok(buffer)
718 })
719 }
720
721 fn create_buffer(
722 &self,
723 language: Option<Arc<Language>>,
724 project_searchable: bool,
725 cx: &mut Context<BufferStore>,
726 ) -> Task<Result<Entity<Buffer>>> {
727 cx.spawn(async move |buffer_store, cx| {
728 let buffer = cx.new(|cx| {
729 Buffer::local("", cx)
730 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
731 });
732 buffer_store.update(cx, |buffer_store, cx| {
733 buffer_store.add_buffer(buffer.clone(), cx).log_err();
734 if !project_searchable {
735 buffer_store
736 .non_searchable_buffers
737 .insert(buffer.read(cx).remote_id());
738 }
739 })?;
740 Ok(buffer)
741 })
742 }
743
744 fn reload_buffers(
745 &self,
746 buffers: HashSet<Entity<Buffer>>,
747 push_to_history: bool,
748 cx: &mut Context<BufferStore>,
749 ) -> Task<Result<ProjectTransaction>> {
750 cx.spawn(async move |_, cx| {
751 let mut project_transaction = ProjectTransaction::default();
752 for buffer in buffers {
753 let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx)).await?;
754 buffer.update(cx, |buffer, cx| {
755 if let Some(transaction) = transaction {
756 if !push_to_history {
757 buffer.forget_transaction(transaction.id);
758 }
759 project_transaction.0.insert(cx.entity(), transaction);
760 }
761 });
762 }
763
764 Ok(project_transaction)
765 })
766 }
767}
768
769impl BufferStore {
770 pub fn init(client: &AnyProtoClient) {
771 client.add_entity_message_handler(Self::handle_buffer_reloaded);
772 client.add_entity_message_handler(Self::handle_buffer_saved);
773 client.add_entity_message_handler(Self::handle_update_buffer_file);
774 client.add_entity_request_handler(Self::handle_save_buffer);
775 client.add_entity_request_handler(Self::handle_reload_buffers);
776 }
777
778 /// Creates a buffer store, optionally retaining its buffers.
779 pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
780 Self {
781 state: BufferStoreState::Local(LocalBufferStore {
782 local_buffer_ids_by_entry_id: Default::default(),
783 worktree_store: worktree_store.clone(),
784 _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
785 if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
786 let this = this.as_local_mut().unwrap();
787 this.subscribe_to_worktree(worktree, cx);
788 }
789 }),
790 }),
791 downstream_client: None,
792 opened_buffers: Default::default(),
793 path_to_buffer_id: Default::default(),
794 shared_buffers: Default::default(),
795 loading_buffers: Default::default(),
796 non_searchable_buffers: Default::default(),
797 worktree_store,
798 project_search: Default::default(),
799 }
800 }
801
802 pub fn remote(
803 worktree_store: Entity<WorktreeStore>,
804 upstream_client: AnyProtoClient,
805 remote_id: u64,
806 _cx: &mut Context<Self>,
807 ) -> Self {
808 Self {
809 state: BufferStoreState::Remote(RemoteBufferStore {
810 shared_with_me: Default::default(),
811 loading_remote_buffers_by_id: Default::default(),
812 remote_buffer_listeners: Default::default(),
813 project_id: remote_id,
814 upstream_client,
815 worktree_store: worktree_store.clone(),
816 }),
817 downstream_client: None,
818 opened_buffers: Default::default(),
819 path_to_buffer_id: Default::default(),
820 loading_buffers: Default::default(),
821 shared_buffers: Default::default(),
822 non_searchable_buffers: Default::default(),
823 worktree_store,
824 project_search: Default::default(),
825 }
826 }
827
828 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
829 match &mut self.state {
830 BufferStoreState::Local(state) => Some(state),
831 _ => None,
832 }
833 }
834
835 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
836 match &mut self.state {
837 BufferStoreState::Remote(state) => Some(state),
838 _ => None,
839 }
840 }
841
842 fn as_remote(&self) -> Option<&RemoteBufferStore> {
843 match &self.state {
844 BufferStoreState::Remote(state) => Some(state),
845 _ => None,
846 }
847 }
848
849 #[ztracing::instrument(skip_all)]
850 pub fn open_buffer(
851 &mut self,
852 project_path: ProjectPath,
853 cx: &mut Context<Self>,
854 ) -> Task<Result<Entity<Buffer>>> {
855 if let Some(buffer) = self.get_by_path(&project_path) {
856 cx.emit(BufferStoreEvent::BufferOpened {
857 buffer: buffer.clone(),
858 project_path,
859 });
860
861 return Task::ready(Ok(buffer));
862 }
863
864 let task = match self.loading_buffers.entry(project_path.clone()) {
865 hash_map::Entry::Occupied(e) => e.get().clone(),
866 hash_map::Entry::Vacant(entry) => {
867 let path = project_path.path.clone();
868 let Some(worktree) = self
869 .worktree_store
870 .read(cx)
871 .worktree_for_id(project_path.worktree_id, cx)
872 else {
873 return Task::ready(Err(anyhow!("no such worktree")));
874 };
875 let load_buffer = match &self.state {
876 BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
877 BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
878 };
879
880 entry
881 .insert(
882 // todo(lw): hot foreground spawn
883 cx.spawn(async move |this, cx| {
884 let load_result = load_buffer.await;
885 this.update(cx, |this, cx| {
886 // Record the fact that the buffer is no longer loading.
887 this.loading_buffers.remove(&project_path);
888
889 let buffer = load_result.map_err(Arc::new)?;
890 cx.emit(BufferStoreEvent::BufferOpened {
891 buffer: buffer.clone(),
892 project_path,
893 });
894
895 Ok(buffer)
896 })?
897 })
898 .shared(),
899 )
900 .clone()
901 }
902 };
903
904 cx.background_spawn(async move {
905 task.await.map_err(|e| {
906 if e.error_code() != ErrorCode::Internal {
907 anyhow!(e.error_code())
908 } else {
909 anyhow!("{e}")
910 }
911 })
912 })
913 }
914
915 pub fn create_buffer(
916 &mut self,
917 language: Option<Arc<Language>>,
918 project_searchable: bool,
919 cx: &mut Context<Self>,
920 ) -> Task<Result<Entity<Buffer>>> {
921 match &self.state {
922 BufferStoreState::Local(this) => this.create_buffer(language, project_searchable, cx),
923 BufferStoreState::Remote(this) => this.create_buffer(language, project_searchable, cx),
924 }
925 }
926
927 pub fn save_buffer(
928 &mut self,
929 buffer: Entity<Buffer>,
930 cx: &mut Context<Self>,
931 ) -> Task<Result<()>> {
932 match &mut self.state {
933 BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
934 BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
935 }
936 }
937
938 pub fn save_buffer_as(
939 &mut self,
940 buffer: Entity<Buffer>,
941 path: ProjectPath,
942 cx: &mut Context<Self>,
943 ) -> Task<Result<()>> {
944 let old_file = buffer.read(cx).file().cloned();
945 let task = match &self.state {
946 BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
947 BufferStoreState::Remote(this) => {
948 this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
949 }
950 };
951 cx.spawn(async move |this, cx| {
952 task.await?;
953 this.update(cx, |this, cx| {
954 old_file.clone().and_then(|file| {
955 this.path_to_buffer_id.remove(&ProjectPath {
956 worktree_id: file.worktree_id(cx),
957 path: file.path().clone(),
958 })
959 });
960
961 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
962 })
963 })
964 }
965
966 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
967 let buffer = buffer_entity.read(cx);
968 let remote_id = buffer.remote_id();
969 let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
970 path: file.path.clone(),
971 worktree_id: file.worktree_id(cx),
972 });
973 let is_remote = buffer.replica_id().is_remote();
974 let open_buffer = OpenBuffer::Complete {
975 buffer: buffer_entity.downgrade(),
976 };
977
978 let handle = cx.entity().downgrade();
979 buffer_entity.update(cx, move |_, cx| {
980 cx.on_release(move |buffer, cx| {
981 handle
982 .update(cx, |_, cx| {
983 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
984 })
985 .ok();
986 })
987 .detach()
988 });
989 let _expect_path_to_exist;
990 match self.opened_buffers.entry(remote_id) {
991 hash_map::Entry::Vacant(entry) => {
992 entry.insert(open_buffer);
993 _expect_path_to_exist = false;
994 }
995 hash_map::Entry::Occupied(mut entry) => {
996 if let OpenBuffer::Operations(operations) = entry.get_mut() {
997 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
998 } else if entry.get().upgrade().is_some() {
999 if is_remote {
1000 return Ok(());
1001 } else {
1002 debug_panic!("buffer {remote_id} was already registered");
1003 anyhow::bail!("buffer {remote_id} was already registered");
1004 }
1005 }
1006 entry.insert(open_buffer);
1007 _expect_path_to_exist = true;
1008 }
1009 }
1010
1011 if let Some(path) = path {
1012 self.path_to_buffer_id.insert(path, remote_id);
1013 }
1014
1015 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1016 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1017 Ok(())
1018 }
1019
1020 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1021 self.opened_buffers
1022 .values()
1023 .filter_map(|buffer| buffer.upgrade())
1024 }
1025
1026 pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
1027 !self.non_searchable_buffers.contains(&id)
1028 }
1029
1030 pub fn loading_buffers(
1031 &self,
1032 ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1033 self.loading_buffers.iter().map(|(path, task)| {
1034 let task = task.clone();
1035 (path, async move {
1036 task.await.map_err(|e| {
1037 if e.error_code() != ErrorCode::Internal {
1038 anyhow!(e.error_code())
1039 } else {
1040 anyhow!("{e}")
1041 }
1042 })
1043 })
1044 })
1045 }
1046
1047 pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
1048 self.path_to_buffer_id.get(project_path)
1049 }
1050
1051 pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
1052 self.path_to_buffer_id
1053 .get(path)
1054 .and_then(|buffer_id| self.get(*buffer_id))
1055 }
1056
1057 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1058 self.opened_buffers.get(&buffer_id)?.upgrade()
1059 }
1060
1061 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1062 self.get(buffer_id)
1063 .with_context(|| format!("unknown buffer id {buffer_id}"))
1064 }
1065
1066 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1067 self.get(buffer_id).or_else(|| {
1068 self.as_remote()
1069 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1070 })
1071 }
1072
1073 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1074 let buffers = self
1075 .buffers()
1076 .map(|buffer| {
1077 let buffer = buffer.read(cx);
1078 proto::BufferVersion {
1079 id: buffer.remote_id().into(),
1080 version: language::proto::serialize_version(&buffer.version),
1081 }
1082 })
1083 .collect();
1084 let incomplete_buffer_ids = self
1085 .as_remote()
1086 .map(|remote| remote.incomplete_buffer_ids())
1087 .unwrap_or_default();
1088 (buffers, incomplete_buffer_ids)
1089 }
1090
1091 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1092 for open_buffer in self.opened_buffers.values_mut() {
1093 if let Some(buffer) = open_buffer.upgrade() {
1094 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1095 }
1096 }
1097
1098 for buffer in self.buffers() {
1099 buffer.update(cx, |buffer, cx| {
1100 buffer.set_capability(Capability::ReadOnly, cx)
1101 });
1102 }
1103
1104 if let Some(remote) = self.as_remote_mut() {
1105 // Wake up all futures currently waiting on a buffer to get opened,
1106 // to give them a chance to fail now that we've disconnected.
1107 remote.remote_buffer_listeners.clear()
1108 }
1109 }
1110
1111 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1112 self.downstream_client = Some((downstream_client, remote_id));
1113 }
1114
1115 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1116 self.downstream_client.take();
1117 self.forget_shared_buffers();
1118 }
1119
1120 pub fn discard_incomplete(&mut self) {
1121 self.opened_buffers
1122 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1123 }
1124
1125 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1126 let file = File::from_dyn(buffer.read(cx).file())?;
1127
1128 let remote_id = buffer.read(cx).remote_id();
1129 if let Some(entry_id) = file.entry_id {
1130 if let Some(local) = self.as_local_mut() {
1131 match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1132 Some(_) => {
1133 return None;
1134 }
1135 None => {
1136 local
1137 .local_buffer_ids_by_entry_id
1138 .insert(entry_id, remote_id);
1139 }
1140 }
1141 }
1142 self.path_to_buffer_id.insert(
1143 ProjectPath {
1144 worktree_id: file.worktree_id(cx),
1145 path: file.path.clone(),
1146 },
1147 remote_id,
1148 );
1149 };
1150
1151 Some(())
1152 }
1153
1154 fn on_buffer_event(
1155 &mut self,
1156 buffer: Entity<Buffer>,
1157 event: &BufferEvent,
1158 cx: &mut Context<Self>,
1159 ) {
1160 match event {
1161 BufferEvent::FileHandleChanged => {
1162 self.buffer_changed_file(buffer, cx);
1163 }
1164 BufferEvent::Reloaded => {
1165 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1166 return;
1167 };
1168 let buffer = buffer.read(cx);
1169 downstream_client
1170 .send(proto::BufferReloaded {
1171 project_id: *project_id,
1172 buffer_id: buffer.remote_id().to_proto(),
1173 version: serialize_version(&buffer.version()),
1174 mtime: buffer.saved_mtime().map(|t| t.into()),
1175 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1176 })
1177 .log_err();
1178 }
1179 BufferEvent::LanguageChanged(_) => {}
1180 _ => {}
1181 }
1182 }
1183
1184 pub async fn handle_update_buffer(
1185 this: Entity<Self>,
1186 envelope: TypedEnvelope<proto::UpdateBuffer>,
1187 mut cx: AsyncApp,
1188 ) -> Result<proto::Ack> {
1189 let payload = envelope.payload;
1190 let buffer_id = BufferId::new(payload.buffer_id)?;
1191 let ops = payload
1192 .operations
1193 .into_iter()
1194 .map(language::proto::deserialize_operation)
1195 .collect::<Result<Vec<_>, _>>()?;
1196 this.update(&mut cx, |this, cx| {
1197 match this.opened_buffers.entry(buffer_id) {
1198 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1199 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1200 OpenBuffer::Complete { buffer, .. } => {
1201 if let Some(buffer) = buffer.upgrade() {
1202 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1203 }
1204 }
1205 },
1206 hash_map::Entry::Vacant(e) => {
1207 e.insert(OpenBuffer::Operations(ops));
1208 }
1209 }
1210 Ok(proto::Ack {})
1211 })
1212 }
1213
1214 pub fn register_shared_lsp_handle(
1215 &mut self,
1216 peer_id: proto::PeerId,
1217 buffer_id: BufferId,
1218 handle: OpenLspBufferHandle,
1219 ) {
1220 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1221 && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1222 {
1223 buffer.lsp_handle = Some(handle);
1224 return;
1225 }
1226 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1227 }
1228
1229 pub fn handle_synchronize_buffers(
1230 &mut self,
1231 envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1232 cx: &mut Context<Self>,
1233 client: Arc<Client>,
1234 ) -> Result<proto::SynchronizeBuffersResponse> {
1235 let project_id = envelope.payload.project_id;
1236 let mut response = proto::SynchronizeBuffersResponse {
1237 buffers: Default::default(),
1238 };
1239 let Some(guest_id) = envelope.original_sender_id else {
1240 anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1241 };
1242
1243 self.shared_buffers.entry(guest_id).or_default().clear();
1244 for buffer in envelope.payload.buffers {
1245 let buffer_id = BufferId::new(buffer.id)?;
1246 let remote_version = language::proto::deserialize_version(&buffer.version);
1247 if let Some(buffer) = self.get(buffer_id) {
1248 self.shared_buffers
1249 .entry(guest_id)
1250 .or_default()
1251 .entry(buffer_id)
1252 .or_insert_with(|| SharedBuffer {
1253 buffer: buffer.clone(),
1254 lsp_handle: None,
1255 });
1256
1257 let buffer = buffer.read(cx);
1258 response.buffers.push(proto::BufferVersion {
1259 id: buffer_id.into(),
1260 version: language::proto::serialize_version(&buffer.version),
1261 });
1262
1263 let operations = buffer.serialize_ops(Some(remote_version), cx);
1264 let client = client.clone();
1265 if let Some(file) = buffer.file() {
1266 client
1267 .send(proto::UpdateBufferFile {
1268 project_id,
1269 buffer_id: buffer_id.into(),
1270 file: Some(file.to_proto(cx)),
1271 })
1272 .log_err();
1273 }
1274
1275 // TODO(max): do something
1276 // client
1277 // .send(proto::UpdateStagedText {
1278 // project_id,
1279 // buffer_id: buffer_id.into(),
1280 // diff_base: buffer.diff_base().map(ToString::to_string),
1281 // })
1282 // .log_err();
1283
1284 client
1285 .send(proto::BufferReloaded {
1286 project_id,
1287 buffer_id: buffer_id.into(),
1288 version: language::proto::serialize_version(buffer.saved_version()),
1289 mtime: buffer.saved_mtime().map(|time| time.into()),
1290 line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1291 as i32,
1292 })
1293 .log_err();
1294
1295 cx.background_spawn(
1296 async move {
1297 let operations = operations.await;
1298 for chunk in split_operations(operations) {
1299 client
1300 .request(proto::UpdateBuffer {
1301 project_id,
1302 buffer_id: buffer_id.into(),
1303 operations: chunk,
1304 })
1305 .await?;
1306 }
1307 anyhow::Ok(())
1308 }
1309 .log_err(),
1310 )
1311 .detach();
1312 }
1313 }
1314 Ok(response)
1315 }
1316
1317 pub fn handle_create_buffer_for_peer(
1318 &mut self,
1319 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1320 replica_id: ReplicaId,
1321 capability: Capability,
1322 cx: &mut Context<Self>,
1323 ) -> Result<()> {
1324 let remote = self
1325 .as_remote_mut()
1326 .context("buffer store is not a remote")?;
1327
1328 if let Some(buffer) =
1329 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1330 {
1331 self.add_buffer(buffer, cx)?;
1332 }
1333
1334 Ok(())
1335 }
1336
1337 pub async fn handle_update_buffer_file(
1338 this: Entity<Self>,
1339 envelope: TypedEnvelope<proto::UpdateBufferFile>,
1340 mut cx: AsyncApp,
1341 ) -> Result<()> {
1342 let buffer_id = envelope.payload.buffer_id;
1343 let buffer_id = BufferId::new(buffer_id)?;
1344
1345 this.update(&mut cx, |this, cx| {
1346 let payload = envelope.payload.clone();
1347 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1348 let file = payload.file.context("invalid file")?;
1349 let worktree = this
1350 .worktree_store
1351 .read(cx)
1352 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1353 .context("no such worktree")?;
1354 let file = File::from_proto(file, worktree, cx)?;
1355 let old_file = buffer.update(cx, |buffer, cx| {
1356 let old_file = buffer.file().cloned();
1357 let new_path = file.path.clone();
1358
1359 buffer.file_updated(Arc::new(file), cx);
1360 if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
1361 Some(old_file)
1362 } else {
1363 None
1364 }
1365 });
1366 if let Some(old_file) = old_file {
1367 cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1368 }
1369 }
1370 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1371 downstream_client
1372 .send(proto::UpdateBufferFile {
1373 project_id: *project_id,
1374 buffer_id: buffer_id.into(),
1375 file: envelope.payload.file,
1376 })
1377 .log_err();
1378 }
1379 Ok(())
1380 })
1381 }
1382
1383 pub async fn handle_save_buffer(
1384 this: Entity<Self>,
1385 envelope: TypedEnvelope<proto::SaveBuffer>,
1386 mut cx: AsyncApp,
1387 ) -> Result<proto::BufferSaved> {
1388 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1389 let (buffer, project_id) = this.read_with(&cx, |this, _| {
1390 anyhow::Ok((
1391 this.get_existing(buffer_id)?,
1392 this.downstream_client
1393 .as_ref()
1394 .map(|(_, project_id)| *project_id)
1395 .context("project is not shared")?,
1396 ))
1397 })?;
1398 buffer
1399 .update(&mut cx, |buffer, _| {
1400 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1401 })
1402 .await?;
1403 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
1404
1405 if let Some(new_path) = envelope.payload.new_path
1406 && let Some(new_path) = ProjectPath::from_proto(new_path)
1407 {
1408 this.update(&mut cx, |this, cx| {
1409 this.save_buffer_as(buffer.clone(), new_path, cx)
1410 })
1411 .await?;
1412 } else {
1413 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))
1414 .await?;
1415 }
1416
1417 Ok(buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
1418 project_id,
1419 buffer_id: buffer_id.into(),
1420 version: serialize_version(buffer.saved_version()),
1421 mtime: buffer.saved_mtime().map(|time| time.into()),
1422 }))
1423 }
1424
1425 pub async fn handle_close_buffer(
1426 this: Entity<Self>,
1427 envelope: TypedEnvelope<proto::CloseBuffer>,
1428 mut cx: AsyncApp,
1429 ) -> Result<()> {
1430 let peer_id = envelope.sender_id;
1431 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1432 this.update(&mut cx, |this, cx| {
1433 if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1434 && shared.remove(&buffer_id).is_some()
1435 {
1436 cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1437 if shared.is_empty() {
1438 this.shared_buffers.remove(&peer_id);
1439 }
1440 return;
1441 }
1442 debug_panic!(
1443 "peer_id {} closed buffer_id {} which was either not open or already closed",
1444 peer_id,
1445 buffer_id
1446 )
1447 });
1448 Ok(())
1449 }
1450
1451 pub async fn handle_buffer_saved(
1452 this: Entity<Self>,
1453 envelope: TypedEnvelope<proto::BufferSaved>,
1454 mut cx: AsyncApp,
1455 ) -> Result<()> {
1456 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1457 let version = deserialize_version(&envelope.payload.version);
1458 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1459 this.update(&mut cx, move |this, cx| {
1460 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1461 buffer.update(cx, |buffer, cx| {
1462 buffer.did_save(version, mtime, cx);
1463 });
1464 }
1465
1466 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1467 downstream_client
1468 .send(proto::BufferSaved {
1469 project_id: *project_id,
1470 buffer_id: buffer_id.into(),
1471 mtime: envelope.payload.mtime,
1472 version: envelope.payload.version,
1473 })
1474 .log_err();
1475 }
1476 });
1477 Ok(())
1478 }
1479
1480 pub async fn handle_buffer_reloaded(
1481 this: Entity<Self>,
1482 envelope: TypedEnvelope<proto::BufferReloaded>,
1483 mut cx: AsyncApp,
1484 ) -> Result<()> {
1485 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1486 let version = deserialize_version(&envelope.payload.version);
1487 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1488 let line_ending = deserialize_line_ending(
1489 proto::LineEnding::from_i32(envelope.payload.line_ending)
1490 .context("missing line ending")?,
1491 );
1492 this.update(&mut cx, |this, cx| {
1493 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1494 buffer.update(cx, |buffer, cx| {
1495 buffer.did_reload(version, line_ending, mtime, cx);
1496 });
1497 }
1498
1499 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1500 downstream_client
1501 .send(proto::BufferReloaded {
1502 project_id: *project_id,
1503 buffer_id: buffer_id.into(),
1504 mtime: envelope.payload.mtime,
1505 version: envelope.payload.version,
1506 line_ending: envelope.payload.line_ending,
1507 })
1508 .log_err();
1509 }
1510 });
1511 Ok(())
1512 }
1513
1514 pub fn reload_buffers(
1515 &self,
1516 buffers: HashSet<Entity<Buffer>>,
1517 push_to_history: bool,
1518 cx: &mut Context<Self>,
1519 ) -> Task<Result<ProjectTransaction>> {
1520 if buffers.is_empty() {
1521 return Task::ready(Ok(ProjectTransaction::default()));
1522 }
1523 match &self.state {
1524 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1525 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1526 }
1527 }
1528
1529 async fn handle_reload_buffers(
1530 this: Entity<Self>,
1531 envelope: TypedEnvelope<proto::ReloadBuffers>,
1532 mut cx: AsyncApp,
1533 ) -> Result<proto::ReloadBuffersResponse> {
1534 let sender_id = envelope.original_sender_id().unwrap_or_default();
1535 let reload = this.update(&mut cx, |this, cx| {
1536 let mut buffers = HashSet::default();
1537 for buffer_id in &envelope.payload.buffer_ids {
1538 let buffer_id = BufferId::new(*buffer_id)?;
1539 buffers.insert(this.get_existing(buffer_id)?);
1540 }
1541 anyhow::Ok(this.reload_buffers(buffers, false, cx))
1542 })?;
1543
1544 let project_transaction = reload.await?;
1545 let project_transaction = this.update(&mut cx, |this, cx| {
1546 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1547 });
1548 Ok(proto::ReloadBuffersResponse {
1549 transaction: Some(project_transaction),
1550 })
1551 }
1552
1553 pub fn create_buffer_for_peer(
1554 &mut self,
1555 buffer: &Entity<Buffer>,
1556 peer_id: proto::PeerId,
1557 cx: &mut Context<Self>,
1558 ) -> Task<Result<()>> {
1559 let buffer_id = buffer.read(cx).remote_id();
1560 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
1561 if shared_buffers.contains_key(&buffer_id) {
1562 return Task::ready(Ok(()));
1563 }
1564 shared_buffers.insert(
1565 buffer_id,
1566 SharedBuffer {
1567 buffer: buffer.clone(),
1568 lsp_handle: None,
1569 },
1570 );
1571
1572 let Some((client, project_id)) = self.downstream_client.clone() else {
1573 return Task::ready(Ok(()));
1574 };
1575
1576 cx.spawn(async move |this, cx| {
1577 let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
1578 return anyhow::Ok(());
1579 };
1580
1581 let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx));
1582 let operations = operations.await;
1583 let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx));
1584
1585 let initial_state = proto::CreateBufferForPeer {
1586 project_id,
1587 peer_id: Some(peer_id),
1588 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1589 };
1590
1591 if client.send(initial_state).log_err().is_some() {
1592 let client = client.clone();
1593 cx.background_spawn(async move {
1594 let mut chunks = split_operations(operations).peekable();
1595 while let Some(chunk) = chunks.next() {
1596 let is_last = chunks.peek().is_none();
1597 client.send(proto::CreateBufferForPeer {
1598 project_id,
1599 peer_id: Some(peer_id),
1600 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1601 proto::BufferChunk {
1602 buffer_id: buffer_id.into(),
1603 operations: chunk,
1604 is_last,
1605 },
1606 )),
1607 })?;
1608 }
1609 anyhow::Ok(())
1610 })
1611 .await
1612 .log_err();
1613 }
1614 Ok(())
1615 })
1616 }
1617
1618 pub fn forget_shared_buffers(&mut self) {
1619 self.shared_buffers.clear();
1620 }
1621
1622 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1623 self.shared_buffers.remove(peer_id);
1624 }
1625
1626 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1627 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1628 self.shared_buffers.insert(new_peer_id, buffers);
1629 }
1630 }
1631
1632 pub fn has_shared_buffers(&self) -> bool {
1633 !self.shared_buffers.is_empty()
1634 }
1635
1636 pub fn create_local_buffer(
1637 &mut self,
1638 text: &str,
1639 language: Option<Arc<Language>>,
1640 project_searchable: bool,
1641 cx: &mut Context<Self>,
1642 ) -> Entity<Buffer> {
1643 let buffer = cx.new(|cx| {
1644 Buffer::local(text, cx)
1645 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1646 });
1647
1648 self.add_buffer(buffer.clone(), cx).log_err();
1649 let buffer_id = buffer.read(cx).remote_id();
1650 if !project_searchable {
1651 self.non_searchable_buffers.insert(buffer_id);
1652 }
1653
1654 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1655 self.path_to_buffer_id.insert(
1656 ProjectPath {
1657 worktree_id: file.worktree_id(cx),
1658 path: file.path.clone(),
1659 },
1660 buffer_id,
1661 );
1662 let this = self
1663 .as_local_mut()
1664 .expect("local-only method called in a non-local context");
1665 if let Some(entry_id) = file.entry_id {
1666 this.local_buffer_ids_by_entry_id
1667 .insert(entry_id, buffer_id);
1668 }
1669 }
1670 buffer
1671 }
1672
1673 pub fn deserialize_project_transaction(
1674 &mut self,
1675 message: proto::ProjectTransaction,
1676 push_to_history: bool,
1677 cx: &mut Context<Self>,
1678 ) -> Task<Result<ProjectTransaction>> {
1679 if let Some(this) = self.as_remote_mut() {
1680 this.deserialize_project_transaction(message, push_to_history, cx)
1681 } else {
1682 debug_panic!("not a remote buffer store");
1683 Task::ready(Err(anyhow!("not a remote buffer store")))
1684 }
1685 }
1686
1687 pub fn wait_for_remote_buffer(
1688 &mut self,
1689 id: BufferId,
1690 cx: &mut Context<BufferStore>,
1691 ) -> Task<Result<Entity<Buffer>>> {
1692 if let Some(this) = self.as_remote_mut() {
1693 this.wait_for_remote_buffer(id, cx)
1694 } else {
1695 debug_panic!("not a remote buffer store");
1696 Task::ready(Err(anyhow!("not a remote buffer store")))
1697 }
1698 }
1699
1700 pub fn serialize_project_transaction_for_peer(
1701 &mut self,
1702 project_transaction: ProjectTransaction,
1703 peer_id: proto::PeerId,
1704 cx: &mut Context<Self>,
1705 ) -> proto::ProjectTransaction {
1706 let mut serialized_transaction = proto::ProjectTransaction {
1707 buffer_ids: Default::default(),
1708 transactions: Default::default(),
1709 };
1710 for (buffer, transaction) in project_transaction.0 {
1711 self.create_buffer_for_peer(&buffer, peer_id, cx)
1712 .detach_and_log_err(cx);
1713 serialized_transaction
1714 .buffer_ids
1715 .push(buffer.read(cx).remote_id().into());
1716 serialized_transaction
1717 .transactions
1718 .push(language::proto::serialize_transaction(&transaction));
1719 }
1720 serialized_transaction
1721 }
1722
1723 pub(crate) fn register_project_search_result_handle(
1724 &mut self,
1725 ) -> (u64, smol::channel::Receiver<BufferId>) {
1726 let (tx, rx) = smol::channel::unbounded();
1727 let handle = util::post_inc(&mut self.project_search.next_id);
1728 let _old_entry = self.project_search.chunks.insert(handle, tx);
1729 debug_assert!(_old_entry.is_none());
1730 (handle, rx)
1731 }
1732
1733 pub fn register_ongoing_project_search(
1734 &mut self,
1735 id: (PeerId, u64),
1736 search: Task<anyhow::Result<()>>,
1737 ) {
1738 let _old = self.project_search.searches_in_progress.insert(id, search);
1739 debug_assert!(_old.is_none());
1740 }
1741
1742 pub async fn handle_find_search_candidates_cancel(
1743 this: Entity<Self>,
1744 envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
1745 mut cx: AsyncApp,
1746 ) -> Result<()> {
1747 let id = (
1748 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1749 envelope.payload.handle,
1750 );
1751 let _ = this.update(&mut cx, |this, _| {
1752 this.project_search.searches_in_progress.remove(&id)
1753 });
1754 Ok(())
1755 }
1756
1757 pub(crate) async fn handle_find_search_candidates_chunk(
1758 this: Entity<Self>,
1759 envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
1760 mut cx: AsyncApp,
1761 ) -> Result<proto::Ack> {
1762 use proto::find_search_candidates_chunk::Variant;
1763 let handle = envelope.payload.handle;
1764
1765 let buffer_ids = match envelope
1766 .payload
1767 .variant
1768 .context("Expected non-null variant")?
1769 {
1770 Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
1771 .buffer_ids
1772 .into_iter()
1773 .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
1774 .collect::<Vec<_>>(),
1775 Variant::Done(_) => {
1776 this.update(&mut cx, |this, _| {
1777 this.project_search.chunks.remove(&handle)
1778 });
1779 return Ok(proto::Ack {});
1780 }
1781 };
1782 let Some(sender) = this.read_with(&mut cx, |this, _| {
1783 this.project_search.chunks.get(&handle).cloned()
1784 }) else {
1785 return Ok(proto::Ack {});
1786 };
1787
1788 for buffer_id in buffer_ids {
1789 let Ok(_) = sender.send(buffer_id).await else {
1790 this.update(&mut cx, |this, _| {
1791 this.project_search.chunks.remove(&handle)
1792 });
1793 return Ok(proto::Ack {});
1794 };
1795 }
1796 Ok(proto::Ack {})
1797 }
1798}
1799
1800impl OpenBuffer {
1801 fn upgrade(&self) -> Option<Entity<Buffer>> {
1802 match self {
1803 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1804 OpenBuffer::Operations(_) => None,
1805 }
1806 }
1807}
1808
1809fn is_not_found_error(error: &anyhow::Error) -> bool {
1810 error
1811 .root_cause()
1812 .downcast_ref::<io::Error>()
1813 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1814}