1use crate::{
2 ProjectItem as _, ProjectPath,
3 lsp_store::OpenLspBufferHandle,
4 search::SearchQuery,
5 worktree_store::{WorktreeStore, WorktreeStoreEvent},
6};
7use anyhow::{Context as _, Result, anyhow};
8use client::Client;
9use collections::{HashMap, HashSet, hash_map};
10use fs::Fs;
11use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared};
12use gpui::{
13 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
14};
15use language::{
16 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
17 proto::{
18 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
19 split_operations,
20 },
21};
22use rpc::{
23 AnyProtoClient, ErrorExt as _, TypedEnvelope,
24 proto::{self, ToProto},
25};
26use smol::channel::Receiver;
27use std::{io, path::Path, pin::pin, sync::Arc, time::Instant};
28use text::BufferId;
29use util::{ResultExt as _, TryFutureExt, debug_panic, maybe};
30use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};
31
/// A set of open buffers.
pub struct BufferStore {
    // Local- vs. remote-project specific behavior and state.
    state: BufferStoreState,
    // In-flight open requests, keyed by path and shared so that concurrent
    // opens of the same path await a single load.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    // Store of the worktrees these buffers' files belong to.
    worktree_store: Entity<WorktreeStore>,
    // All registered buffers, keyed by id (held weakly once complete).
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Index from a buffer's project path to its id.
    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
    // Client (and project id) to forward updates to while this project is shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Per-peer record of buffers that have been shared with that peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
    // Buffers excluded from search candidate enumeration.
    non_searchable_buffers: HashSet<BufferId>,
}
44
/// A buffer shared with a remote peer: a strong handle to the buffer plus an
/// optional handle that keeps its LSP registration alive (set via
/// `register_shared_lsp_handle`).
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
50
/// Whether this store manages buffers for a local project or mirrors a
/// project hosted by an upstream peer.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
55
/// State for a buffer store that mirrors buffers owned by an upstream peer.
struct RemoteBufferStore {
    // Buffers received from peers that are retained here (collab only) to
    // avoid races with the peer dropping them.
    shared_with_me: HashSet<Entity<Buffer>>,
    // Client used to issue requests against the upstream project.
    upstream_client: AnyProtoClient,
    // The upstream project's id, included in every request.
    project_id: u64,
    // Buffers whose initial state has arrived but whose operation chunks
    // are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Oneshot senders resolved once the corresponding buffer finishes loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
    worktree_store: Entity<WorktreeStore>,
}
65
/// State for a buffer store whose buffers live on the local filesystem.
struct LocalBufferStore {
    // Index from worktree entry id to the buffer opened for that entry.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription (for watching new worktrees) alive.
    _subscription: Subscription,
}
71
/// The store's record for a buffer id: either the (weakly held) buffer
/// itself, or operations that arrived before the buffer was opened and must
/// be applied once it is.
enum OpenBuffer {
    Complete { buffer: WeakEntity<Buffer> },
    Operations(Vec<Operation>),
}
76
/// Events emitted by [`BufferStore`] for observers (e.g. the project).
pub enum BufferStoreEvent {
    /// A buffer was registered in the store.
    BufferAdded(Entity<Buffer>),
    /// A buffer was opened (or re-opened) at the given project path.
    BufferOpened {
        buffer: Entity<Buffer>,
        project_path: ProjectPath,
    },
    /// A peer closed its copy of a shared buffer.
    SharedBufferClosed(proto::PeerId, BufferId),
    /// A buffer entity was released.
    BufferDropped(BufferId),
    /// A buffer's file identity changed (rename / save-as); `old_file` is the
    /// file it previously pointed at.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
90
/// A group of related edits spanning multiple buffers: one transaction per
/// affected buffer.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
93
// Allow `BufferStore` to emit `BufferStoreEvent`s through gpui.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
95
impl RemoteBufferStore {
    /// Waits until the buffer with `id` has been fully replicated from the
    /// upstream peer, returning it. Resolves immediately if the buffer is
    /// already open in the store.
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        // Register the listener before checking the store, so a buffer that
        // finishes loading in between cannot be missed.
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(async move |this, cx| {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise block on the oneshot, which is fired by
            // `handle_create_buffer_for_peer` (or fails if the sender is
            // dropped, e.g. on disconnect).
            cx.background_spawn(async move { rx.await? }).await
        })
    }

    /// Sends a `SaveBuffer` request upstream (optionally with a new path)
    /// and, on success, marks the local replica as saved at the version and
    /// mtime the host reports.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(async move |_, cx| {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

    /// Handles an incoming `CreateBufferForPeer` message. Buffers arrive as
    /// an initial `State` message followed by one or more operation `Chunk`s;
    /// the buffer entity is returned once its final chunk has been applied.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope.payload.variant.context("missing variant")? {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against a local worktree,
                    // if one was included in the message.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .with_context(|| {
                                format!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its operation chunks finish
                        // streaming in.
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Propagate the failure to everyone waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .with_context(|| {
                        format!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the whole load: discard the partial
                    // buffer and fail all waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Wake everyone blocked in `wait_for_remote_buffer`.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }

    /// Ids of buffers whose initial state has arrived but whose operation
    /// chunks have not all been applied yet.
    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

    /// Reconstructs a [`ProjectTransaction`] from its wire representation,
    /// waiting for each referenced buffer (and the edits its transaction
    /// mentions) to replicate. When `push_to_history` is true, the
    /// transactions are also recorded in each buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(async move |this, cx| {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Make sure the edits each transaction refers to have arrived
                // before touching history.
                buffer
                    .update(cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                        buffer.finalize_last_transaction();
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    /// Opens the buffer at `path` in `worktree` by asking the upstream peer
    /// to open it, then waiting for the replicated buffer to arrive here.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(async move |this, cx| {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    /// Creates a new, empty buffer on the upstream peer and waits for it to
    /// be replicated here.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(async move |this, cx| {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
                .await
        })
    }

    /// Asks the upstream peer to reload the given buffers from disk,
    /// returning the resulting project transaction.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(async move |this, cx| {
            let response = request.await?.transaction.context("missing transaction")?;
            this.update(cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}
349
impl LocalBufferStore {
    /// Writes `buffer`'s contents to `path` within `worktree`, then notifies
    /// downstream collaborators and marks the buffer as saved.
    ///
    /// `has_changed_file` indicates the buffer is being saved under a new
    /// file identity (e.g. save-as); it is forced on for files that have
    /// never existed on disk.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        let file = buffer.file().cloned();
        // A brand-new file always gets a fresh file identity after saving.
        if file
            .as_ref()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(async move |this, cx| {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Tell downstream collaborators about the save (and the new file
            // identity, when it changed) before updating the local buffer.
            this.update(cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    /// Watches a local worktree for entry changes so that open buffers track
    /// on-disk renames, deletions, and modifications.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local()
                && let worktree::Event::UpdatedEntries(changes) = event
            {
                Self::local_worktree_entries_changed(this, &worktree, changes, cx);
            }
        })
        .detach();
    }

    /// Applies a batch of worktree entry changes to any affected open buffers.
    fn local_worktree_entries_changed(
        this: &mut BufferStore,
        worktree_handle: &Entity<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut Context<BufferStore>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            Self::local_worktree_entry_changed(
                this,
                *entry_id,
                path,
                worktree_handle,
                &snapshot,
                cx,
            );
        }
    }

    /// Reconciles one open buffer with a changed worktree entry: updates the
    /// buffer's file metadata (path, mtime, deletion) and the store's
    /// path/entry indices. The `Option` return is only used for early exit.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer by entry id first, falling back to path.
        let buffer_id = this
            .as_local_mut()
            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
            .copied()
            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity was dropped; discard its stale record.
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            // Clean up the indices that pointed at the dropped buffer.
            this.path_to_buffer_id.remove(&project_path);
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            // Ignore changes originating from a different worktree.
            if old_file.worktree != *worktree {
                return None;
            }

            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            // Compute the file's new state: present (possibly renamed) when
            // the entry still exists in the snapshot, otherwise deleted.
            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // The buffer moved on disk: re-key the path index.
                this.path_to_buffer_id.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                this.path_to_buffer_id.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }
            let local = this.as_local_mut()?;
            if new_file.entry_id != old_file.entry_id {
                // The entry id changed (e.g. delete + recreate): re-key the
                // entry index too.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Keep downstream collaborators in sync with the file metadata.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    /// Saves `buffer` back to the file it was loaded from.
    fn save_buffer(
        &self,
        buffer: Entity<Buffer>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        let worktree = file.worktree.clone();
        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
    }

    /// Saves `buffer` under a new project path (save-as).
    fn save_buffer_as(
        &self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        self.save_local_buffer(buffer, worktree, path.path, true, cx)
    }

    /// Opens the buffer at `path` in `worktree`. A missing file is not an
    /// error: it produces an empty buffer whose file is `DiskState::New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the entity up front so the buffer id can be derived
            // from the entity id before the file finishes loading.
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(async move |_, cx| {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(async move |this, cx| {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // File-not-found becomes a new, empty in-memory buffer.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "");
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            // Register the buffer and index it by path and entry id.
            this.update(cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    this.path_to_buffer_id.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );
                    let this = this.as_local_mut().unwrap();
                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }

    /// Creates a new, empty, untitled buffer with the plain-text language.
    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        cx.spawn(async move |buffer_store, cx| {
            let buffer =
                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    /// Reloads each buffer from disk, collecting the resulting transactions.
    /// When `push_to_history` is false, each reload transaction is forgotten
    /// so it cannot be undone from the buffer's history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(async move |_, cx| {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx))?.await?;
                buffer.update(cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}
699
700impl BufferStore {
    /// Registers this store's RPC message/request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
    }
708
    /// Creates a buffer store backed by the local filesystem, subscribing to
    /// the worktree store so each newly-added local worktree is watched for
    /// on-disk entry changes.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        // A local store only ever has local state here.
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            path_to_buffer_id: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            non_searchable_buffers: Default::default(),
            worktree_store,
        }
    }
731
    /// Creates a buffer store that mirrors a project hosted by an upstream
    /// peer, identified by `remote_id`, over `upstream_client`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            path_to_buffer_id: Default::default(),
            loading_buffers: Default::default(),
            shared_buffers: Default::default(),
            non_searchable_buffers: Default::default(),
            worktree_store,
        }
    }
756
757 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
758 match &mut self.state {
759 BufferStoreState::Local(state) => Some(state),
760 _ => None,
761 }
762 }
763
764 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
765 match &mut self.state {
766 BufferStoreState::Remote(state) => Some(state),
767 _ => None,
768 }
769 }
770
771 fn as_remote(&self) -> Option<&RemoteBufferStore> {
772 match &self.state {
773 BufferStoreState::Remote(state) => Some(state),
774 _ => None,
775 }
776 }
777
    /// Opens the buffer at `project_path`, returning an already-open buffer
    /// immediately and de-duplicating concurrent loads of the same path.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Already open: just re-announce and return it.
        if let Some(buffer) = self.get_by_path(&project_path) {
            cx.emit(BufferStoreEvent::BufferOpened {
                buffer: buffer.clone(),
                project_path,
            });

            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                // Delegate the actual load to the local or remote backend.
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(async move |this, cx| {
                            let load_result = load_buffer.await;
                            this.update(cx, |this, cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);

                                let buffer = load_result.map_err(Arc::new)?;
                                cx.emit(BufferStoreEvent::BufferOpened {
                                    buffer: buffer.clone(),
                                    project_path,
                                });

                                Ok(buffer)
                            })?
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        // Unwrap the shared task's Arc'd error into a fresh anyhow error.
        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
833
    /// Creates a new, empty buffer — locally, or via the upstream peer.
    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }
840
    /// Saves `buffer` to its current file — locally, or via the upstream peer.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
        }
    }
851
    /// Saves `buffer` under a new project path, emitting
    /// [`BufferStoreEvent::BufferChangedFilePath`] once the save completes.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file before the save replaces it.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(async move |this, cx| {
            task.await?;
            this.update(cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
872
873 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
874 let buffer = buffer_entity.read(cx);
875 let remote_id = buffer.remote_id();
876 let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
877 path: file.path.clone(),
878 worktree_id: file.worktree_id(cx),
879 });
880 let is_remote = buffer.replica_id() != 0;
881 let open_buffer = OpenBuffer::Complete {
882 buffer: buffer_entity.downgrade(),
883 };
884
885 let handle = cx.entity().downgrade();
886 buffer_entity.update(cx, move |_, cx| {
887 cx.on_release(move |buffer, cx| {
888 handle
889 .update(cx, |_, cx| {
890 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
891 })
892 .ok();
893 })
894 .detach()
895 });
896 let _expect_path_to_exist;
897 match self.opened_buffers.entry(remote_id) {
898 hash_map::Entry::Vacant(entry) => {
899 entry.insert(open_buffer);
900 _expect_path_to_exist = false;
901 }
902 hash_map::Entry::Occupied(mut entry) => {
903 if let OpenBuffer::Operations(operations) = entry.get_mut() {
904 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
905 } else if entry.get().upgrade().is_some() {
906 if is_remote {
907 return Ok(());
908 } else {
909 debug_panic!("buffer {remote_id} was already registered");
910 anyhow::bail!("buffer {remote_id} was already registered");
911 }
912 }
913 entry.insert(open_buffer);
914 _expect_path_to_exist = true;
915 }
916 }
917
918 if let Some(path) = path {
919 self.path_to_buffer_id.insert(path, remote_id);
920 }
921
922 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
923 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
924 Ok(())
925 }
926
    /// Iterates over all registered buffers that are still alive.
    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }
932
    /// Iterates over paths whose buffers are still loading, yielding a
    /// future for each in-flight load.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
        self.loading_buffers.iter().map(|(path, task)| {
            let task = task.clone();
            // Re-wrap the shared `Arc<anyhow::Error>` as a fresh anyhow error.
            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
        })
    }
941
    /// Returns the id of the buffer open at `project_path`, if any.
    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
        self.path_to_buffer_id.get(project_path)
    }
945
946 pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
947 self.path_to_buffer_id
948 .get(path)
949 .and_then(|buffer_id| self.get(*buffer_id))
950 }
951
952 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
953 self.opened_buffers.get(&buffer_id)?.upgrade()
954 }
955
956 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
957 self.get(buffer_id)
958 .with_context(|| format!("unknown buffer id {buffer_id}"))
959 }
960
961 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
962 self.get(buffer_id).or_else(|| {
963 self.as_remote()
964 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
965 })
966 }
967
968 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
969 let buffers = self
970 .buffers()
971 .map(|buffer| {
972 let buffer = buffer.read(cx);
973 proto::BufferVersion {
974 id: buffer.remote_id().into(),
975 version: language::proto::serialize_version(&buffer.version),
976 }
977 })
978 .collect();
979 let incomplete_buffer_ids = self
980 .as_remote()
981 .map(|remote| remote.incomplete_buffer_ids())
982 .unwrap_or_default();
983 (buffers, incomplete_buffer_ids)
984 }
985
    /// Handles losing the connection to the remote host: stops pending waits
    /// on each buffer, downgrades every buffer to read-only, and drops all
    /// remote-buffer listeners.
    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }
1005
    /// Starts forwarding buffer updates to `downstream_client` for the shared
    /// project identified by `remote_id`.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1009
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// were shared with which peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1014
1015 pub fn discard_incomplete(&mut self) {
1016 self.opened_buffers
1017 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1018 }
1019
    /// Re-indexes a buffer after its file handle changed, recording its entry
    /// id and project path. Returns `None` when the buffer has no local file
    /// or is already indexed under its entry id.
    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            if let Some(local) = self.as_local_mut() {
                match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                    Some(_) => {
                        // Already tracked under this entry id; nothing to do.
                        return None;
                    }
                    None => {
                        local
                            .local_buffer_ids_by_entry_id
                            .insert(entry_id, remote_id);
                    }
                }
            }
            self.path_to_buffer_id.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                remote_id,
            );
        };

        Some(())
    }
1048
    /// Streams buffers that may contain matches for `query`: first any
    /// searchable unnamed buffers, then buffers opened (in bounded batches)
    /// for the candidate paths reported by the worktree store's scan.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if self.non_searchable_buffers.contains(&buffer.remote_id()) {
                // Excluded from search entirely.
                continue;
            } else if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers are sent directly and count against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many buffer opens are kicked off per batch.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(async move |this, cx| {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    // Stop early when the receiver has been dropped.
                    if let Some(buffer) = buffer_task.await.log_err()
                        && tx.send(buffer).await.is_err()
                    {
                        return anyhow::Ok(());
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1105
    /// Reacts to events emitted by a registered buffer: re-indexes it when
    /// its file handle changes, and forwards reloads to the downstream
    /// collaborators when the project is shared.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            BufferEvent::Reloaded => {
                // Only relevant while the project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            // NOTE(review): this arm is shadowed by the catch-all below and
            // currently does nothing.
            BufferEvent::LanguageChanged => {}
            _ => {}
        }
    }
1135
1136 pub async fn handle_update_buffer(
1137 this: Entity<Self>,
1138 envelope: TypedEnvelope<proto::UpdateBuffer>,
1139 mut cx: AsyncApp,
1140 ) -> Result<proto::Ack> {
1141 let payload = envelope.payload;
1142 let buffer_id = BufferId::new(payload.buffer_id)?;
1143 let ops = payload
1144 .operations
1145 .into_iter()
1146 .map(language::proto::deserialize_operation)
1147 .collect::<Result<Vec<_>, _>>()?;
1148 this.update(&mut cx, |this, cx| {
1149 match this.opened_buffers.entry(buffer_id) {
1150 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1151 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1152 OpenBuffer::Complete { buffer, .. } => {
1153 if let Some(buffer) = buffer.upgrade() {
1154 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1155 }
1156 }
1157 },
1158 hash_map::Entry::Vacant(e) => {
1159 e.insert(OpenBuffer::Operations(ops));
1160 }
1161 }
1162 Ok(proto::Ack {})
1163 })?
1164 }
1165
1166 pub fn register_shared_lsp_handle(
1167 &mut self,
1168 peer_id: proto::PeerId,
1169 buffer_id: BufferId,
1170 handle: OpenLspBufferHandle,
1171 ) {
1172 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1173 && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1174 {
1175 buffer.lsp_handle = Some(handle);
1176 return;
1177 }
1178 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1179 }
1180
    /// Handles a guest's `SynchronizeBuffers` request (typically after a
    /// reconnect): rebuilds the record of which buffers the guest has open,
    /// replies with this side's version of each buffer, and pushes the state
    /// the guest may be missing — file metadata, saved version/mtime, and
    /// any operations past the guest's reported version.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start the guest's shared-buffer set from scratch; it is rebuilt
        // below from the buffers the guest reports as open.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            // Buffers we no longer have open are silently skipped and won't
            // appear in the response.
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Only the operations the guest hasn't seen (past its
                // reported version) need to be resent.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                // Bring the guest's notion of the saved state (version,
                // mtime, line ending) up to date.
                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations to the guest in chunks, off
                // the main thread.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
1268
1269 pub fn handle_create_buffer_for_peer(
1270 &mut self,
1271 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1272 replica_id: u16,
1273 capability: Capability,
1274 cx: &mut Context<Self>,
1275 ) -> Result<()> {
1276 let remote = self
1277 .as_remote_mut()
1278 .context("buffer store is not a remote")?;
1279
1280 if let Some(buffer) =
1281 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1282 {
1283 self.add_buffer(buffer, cx)?;
1284 }
1285
1286 Ok(())
1287 }
1288
    /// Handles an upstream `UpdateBufferFile` message: swaps the buffer's
    /// file metadata for the one in the message, emits a path-change event
    /// when the path differs, and re-forwards the message downstream.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload so the original `file` can still be forwarded
            // downstream at the bottom of this closure.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.context("invalid file")?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .context("no such worktree")?;
                let file = File::from_proto(file, worktree, cx)?;
                // Yields the previous file only when the path actually
                // changed, so we know whether to emit the event below.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();

                    buffer.file_updated(Arc::new(file), cx);
                    if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay to the next hop if this instance is itself shared.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1334
    /// Handles a guest's `SaveBuffer` request: waits until this replica has
    /// caught up to the guest's reported buffer version, saves the buffer
    /// (optionally under a new path), and replies with the saved version and
    /// mtime. Errors if the project is not shared or the buffer is unknown.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.read_with(&cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until this replica has at least the state the guest
        // asked us to save.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the guest supplied a new project path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1375
1376 pub async fn handle_close_buffer(
1377 this: Entity<Self>,
1378 envelope: TypedEnvelope<proto::CloseBuffer>,
1379 mut cx: AsyncApp,
1380 ) -> Result<()> {
1381 let peer_id = envelope.sender_id;
1382 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1383 this.update(&mut cx, |this, cx| {
1384 if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1385 && shared.remove(&buffer_id).is_some()
1386 {
1387 cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1388 if shared.is_empty() {
1389 this.shared_buffers.remove(&peer_id);
1390 }
1391 return;
1392 }
1393 debug_panic!(
1394 "peer_id {} closed buffer_id {} which was either not open or already closed",
1395 peer_id,
1396 buffer_id
1397 )
1398 })
1399 }
1400
1401 pub async fn handle_buffer_saved(
1402 this: Entity<Self>,
1403 envelope: TypedEnvelope<proto::BufferSaved>,
1404 mut cx: AsyncApp,
1405 ) -> Result<()> {
1406 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1407 let version = deserialize_version(&envelope.payload.version);
1408 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1409 this.update(&mut cx, move |this, cx| {
1410 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1411 buffer.update(cx, |buffer, cx| {
1412 buffer.did_save(version, mtime, cx);
1413 });
1414 }
1415
1416 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1417 downstream_client
1418 .send(proto::BufferSaved {
1419 project_id: *project_id,
1420 buffer_id: buffer_id.into(),
1421 mtime: envelope.payload.mtime,
1422 version: envelope.payload.version,
1423 })
1424 .log_err();
1425 }
1426 })
1427 }
1428
1429 pub async fn handle_buffer_reloaded(
1430 this: Entity<Self>,
1431 envelope: TypedEnvelope<proto::BufferReloaded>,
1432 mut cx: AsyncApp,
1433 ) -> Result<()> {
1434 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1435 let version = deserialize_version(&envelope.payload.version);
1436 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1437 let line_ending = deserialize_line_ending(
1438 proto::LineEnding::from_i32(envelope.payload.line_ending)
1439 .context("missing line ending")?,
1440 );
1441 this.update(&mut cx, |this, cx| {
1442 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1443 buffer.update(cx, |buffer, cx| {
1444 buffer.did_reload(version, line_ending, mtime, cx);
1445 });
1446 }
1447
1448 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1449 downstream_client
1450 .send(proto::BufferReloaded {
1451 project_id: *project_id,
1452 buffer_id: buffer_id.into(),
1453 mtime: envelope.payload.mtime,
1454 version: envelope.payload.version,
1455 line_ending: envelope.payload.line_ending,
1456 })
1457 .log_err();
1458 }
1459 })
1460 }
1461
1462 pub fn reload_buffers(
1463 &self,
1464 buffers: HashSet<Entity<Buffer>>,
1465 push_to_history: bool,
1466 cx: &mut Context<Self>,
1467 ) -> Task<Result<ProjectTransaction>> {
1468 if buffers.is_empty() {
1469 return Task::ready(Ok(ProjectTransaction::default()));
1470 }
1471 match &self.state {
1472 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1473 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1474 }
1475 }
1476
    /// Handles a guest's request to reload a set of buffers from disk,
    /// replying with the resulting transaction serialized for that peer.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            anyhow::Ok(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        // Serializing for the peer also ensures every affected buffer has
        // been replicated to it before the transaction arrives.
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
1500
    /// Replicates `buffer` to `peer_id`: sends the buffer's full state,
    /// then streams its serialized operation history in chunks. No-op if the
    /// buffer was already shared with that peer.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already replicated to this peer; nothing to do.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                lsp_handle: None,
            },
        );

        // NOTE: the buffer is recorded as shared above even when there is no
        // downstream client to send it to.
        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(async move |this, cx| {
            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
                // Buffer was dropped before we got a chance to send it.
                return anyhow::Ok(());
            };

            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the operation history if the initial state message
            // went through.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        // The final chunk is flagged so the receiver knows the
                        // buffer is complete.
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
1565
    /// Drops all records of buffers shared with remote peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
1569
    /// Drops all records of buffers shared with the given peer.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
1573
1574 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1575 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1576 self.shared_buffers.insert(new_peer_id, buffers);
1577 }
1578 }
1579
    /// Whether any buffers are currently shared with any peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
1583
    /// Creates a new local buffer containing `text` (plain text when no
    /// `language` is given) and registers it with this store.
    ///
    /// Panics if called on a non-local store while the buffer has a file.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        // NOTE(review): a buffer created from bare text would not normally
        // have a file yet; this indexes it by path/entry in case one is
        // attached — confirm when this branch can actually run.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            self.path_to_buffer_id.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );
            let this = self
                .as_local_mut()
                .expect("local-only method called in a non-local context");
            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }
1616
1617 pub fn deserialize_project_transaction(
1618 &mut self,
1619 message: proto::ProjectTransaction,
1620 push_to_history: bool,
1621 cx: &mut Context<Self>,
1622 ) -> Task<Result<ProjectTransaction>> {
1623 if let Some(this) = self.as_remote_mut() {
1624 this.deserialize_project_transaction(message, push_to_history, cx)
1625 } else {
1626 debug_panic!("not a remote buffer store");
1627 Task::ready(Err(anyhow!("not a remote buffer store")))
1628 }
1629 }
1630
1631 pub fn wait_for_remote_buffer(
1632 &mut self,
1633 id: BufferId,
1634 cx: &mut Context<BufferStore>,
1635 ) -> Task<Result<Entity<Buffer>>> {
1636 if let Some(this) = self.as_remote_mut() {
1637 this.wait_for_remote_buffer(id, cx)
1638 } else {
1639 debug_panic!("not a remote buffer store");
1640 Task::ready(Err(anyhow!("not a remote buffer store")))
1641 }
1642 }
1643
1644 pub fn serialize_project_transaction_for_peer(
1645 &mut self,
1646 project_transaction: ProjectTransaction,
1647 peer_id: proto::PeerId,
1648 cx: &mut Context<Self>,
1649 ) -> proto::ProjectTransaction {
1650 let mut serialized_transaction = proto::ProjectTransaction {
1651 buffer_ids: Default::default(),
1652 transactions: Default::default(),
1653 };
1654 for (buffer, transaction) in project_transaction.0 {
1655 self.create_buffer_for_peer(&buffer, peer_id, cx)
1656 .detach_and_log_err(cx);
1657 serialized_transaction
1658 .buffer_ids
1659 .push(buffer.read(cx).remote_id().into());
1660 serialized_transaction
1661 .transactions
1662 .push(language::proto::serialize_transaction(&transaction));
1663 }
1664 serialized_transaction
1665 }
1666
    /// Excludes the given buffer from future `find_search_candidates` scans.
    pub(crate) fn mark_buffer_as_non_searchable(&mut self, buffer_id: BufferId) {
        self.non_searchable_buffers.insert(buffer_id);
    }
1670}
1671
1672impl OpenBuffer {
1673 fn upgrade(&self) -> Option<Entity<Buffer>> {
1674 match self {
1675 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1676 OpenBuffer::Operations(_) => None,
1677 }
1678 }
1679}
1680
1681fn is_not_found_error(error: &anyhow::Error) -> bool {
1682 error
1683 .root_cause()
1684 .downcast_ref::<io::Error>()
1685 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1686}