1use crate::{
2 ProjectPath,
3 lsp_store::OpenLspBufferHandle,
4 worktree_store::{WorktreeStore, WorktreeStoreEvent},
5};
6use anyhow::{Context as _, Result, anyhow};
7use client::Client;
8use collections::{HashMap, HashSet, hash_map};
9use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
10use gpui::{
11 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
12};
13use language::{
14 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
15 proto::{
16 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
17 split_operations,
18 },
19};
20use rpc::{
21 AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
22 proto::{self, PeerId},
23};
24
25use settings::Settings;
26use std::{io, sync::Arc, time::Instant};
27use text::{BufferId, ReplicaId};
28use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
29use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
30
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific behavior (see `BufferStoreState`).
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    // In-flight buffer loads keyed by path; the task is `Shared` so concurrent
    // `open_buffer` calls for the same path await a single load.
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // Every buffer ever registered, held weakly (or as queued operations).
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Reverse index from a project path to the buffer currently open at it.
    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
    // Present while shared with a downstream peer: (client, project id).
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers we have shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
    // Buffers excluded from project-wide search (see `is_searchable`).
    non_searchable_buffers: HashSet<BufferId>,
    project_search: RemoteProjectSearchState,
}
44
/// Book-keeping for project-wide searches involving a remote peer.
#[derive(Default)]
struct RemoteProjectSearchState {
    // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request.
    chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
    // Monotonously-increasing handle to hand out to remote host in order to identify the project search result chunk.
    next_id: u64,
    // Used by the side running the actual search for match candidates to potentially cancel the search prematurely.
    searches_in_progress: HashMap<(PeerId, u64), Task<Result<()>>>,
}
54
/// A buffer shared with a remote peer, together with the LSP handle (if any)
/// keeping language-server state alive for it.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
60
/// Whether this store owns its buffers locally or mirrors a remote host.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
65
/// State for a buffer store that mirrors buffers owned by a remote host.
struct RemoteBufferStore {
    // Buffers pushed to us by peers, retained to avoid races
    // (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state arrived but whose chunks are still streaming.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels to notify once a given remote buffer finishes (or fails) loading.
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
    worktree_store: Entity<WorktreeStore>,
}
75
/// State for a buffer store whose buffers live on the local filesystem.
struct LocalBufferStore {
    // Maps worktree entry ids to the buffers opened from them, so disk events
    // can be routed to the right buffer.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
81
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    /// A fully-loaded buffer, held weakly so the store doesn't keep it alive.
    Complete { buffer: WeakEntity<Buffer> },
    /// Operations that arrived before the buffer itself was registered.
    Operations(Vec<Operation>),
}
86
/// Events emitted by [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was registered with the store.
    BufferAdded(Entity<Buffer>),
    /// A buffer was opened (or re-opened) at the given project path.
    BufferOpened {
        buffer: Entity<Buffer>,
        project_path: ProjectPath,
    },
    /// A remote peer closed its handle to a buffer we shared with it.
    SharedBufferClosed(proto::PeerId, BufferId),
    /// All strong handles to the buffer were dropped.
    BufferDropped(BufferId),
    /// A buffer's underlying file changed (e.g. save-as or on-disk rename).
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
100
/// The per-buffer transactions produced by one logical project-wide operation.
#[derive(Default, Debug, Clone)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
103
104impl PartialEq for ProjectTransaction {
105 fn eq(&self, other: &Self) -> bool {
106 self.0.len() == other.0.len()
107 && self.0.iter().all(|(buffer, transaction)| {
108 other.0.get(buffer).is_some_and(|t| t.id == transaction.id)
109 })
110 }
111}
112
113impl EventEmitter<BufferStoreEvent> for BufferStore {}
114
115impl RemoteBufferStore {
    /// Returns a task resolving once the buffer with `id` has been fully
    /// received from the remote host (or immediately, if it is already open).
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        // Register the listener first so a buffer arriving concurrently can't be missed.
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(async move |this, cx| {
            // Fast path: the buffer may already be open.
            if let Some(buffer) = this
                .read_with(cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            // Otherwise wait for `handle_create_buffer_for_peer` to notify us.
            cx.background_spawn(async move { rx.await? }).await
        })
    }
136
    /// Sends a `SaveBuffer` request to the host, optionally saving under a new
    /// path, then records the resulting version and mtime on the local buffer.
    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        // Snapshot id/version before going async.
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(async move |_, cx| {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            });

            Ok(())
        })
    }
167
    /// Handles a `CreateBufferForPeer` message from the remote host.
    ///
    /// A buffer arrives as an initial `State` payload followed by one or more
    /// `Chunk` payloads of serialized operations. Returns `Ok(Some(buffer))`
    /// once the final chunk has been applied, `Ok(None)` while more data is
    /// still expected (or after a per-buffer failure was reported to waiters).
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: ReplicaId,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope.payload.variant.context("missing variant")? {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against our local worktrees, if present.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .with_context(|| {
                                format!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its chunks finish arriving.
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Fail any tasks currently waiting on this buffer.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .with_context(|| {
                        format!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    // A bad chunk poisons the load: drop the partial buffer
                    // and report the failure to waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    // Wake everyone blocked in `wait_for_remote_buffer`.
                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }
258
259 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
260 self.loading_remote_buffers_by_id
261 .keys()
262 .copied()
263 .collect::<Vec<_>>()
264 }
265
    /// Reconstructs a [`ProjectTransaction`] from its protobuf form.
    ///
    /// Waits for each referenced buffer — and each transaction's edits — to
    /// arrive from the remote host. When `push_to_history` is true, the
    /// transactions are also recorded in each buffer's undo history.
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(async move |this, cx| {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                // Ensure every edit the transaction references has been applied
                // locally before the transaction is exposed to callers.
                buffer
                    .update(cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })
                    .await?;

                if push_to_history {
                    buffer.update(cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                        buffer.finalize_last_transaction();
                    });
                }
            }

            Ok(project_transaction)
        })
    }
302
    /// Opens the buffer at `path` by asking the upstream host, then waits for
    /// its contents to be replicated to this client.
    fn open_buffer(
        &self,
        path: Arc<RelPath>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(async move |this, cx| {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            // The host answers with just an id; the buffer itself arrives
            // through `handle_create_buffer_for_peer`.
            let buffer = this
                .update(cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }
331
    /// Asks the host to create a new untitled buffer, waits for it to be
    /// replicated here, then applies `language` locally if one was given.
    fn create_buffer(
        &self,
        language: Option<Arc<Language>>,
        project_searchable: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(async move |this, cx| {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(cx, |this, cx| {
                    // Mark searchability before the buffer arrives so searches
                    // never observe an inconsistent state.
                    if !project_searchable {
                        this.non_searchable_buffers.insert(buffer_id);
                    }
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;
            if let Some(language) = language {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_language(Some(language), cx);
                });
            }
            Ok(buffer)
        })
    }
361
    /// Asks the host to reload the given buffers from disk and applies the
    /// resulting project transaction locally.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(async move |this, cx| {
            let response = request.await?.transaction.context("missing transaction")?;
            this.update(cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
384}
385
386impl LocalBufferStore {
    /// Writes `buffer_handle`'s contents to `path` within `worktree`.
    ///
    /// After the write completes, notifies any downstream client of the file
    /// change (when applicable) and of the save, then updates the buffer's own
    /// saved state.
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<RelPath>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        // Snapshot everything we need before going async.
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let encoding = buffer.encoding();
        let has_bom = buffer.has_bom();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        let file = buffer.file().cloned();
        // A buffer that has never existed on disk always counts as a file change.
        if file
            .as_ref()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path, text, line_ending, encoding, has_bom, cx)
        });

        cx.spawn(async move |this, cx| {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(cx, |this, cx| {
                // Propagate the save to a downstream peer, if we're shared.
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            });
            Ok(())
        })
    }
448
    /// Watches a worktree for entry updates so open buffers can track renames,
    /// deletions, and on-disk metadata changes.
    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            // Only local worktrees report filesystem-backed entry changes here.
            if worktree.read(cx).is_local()
                && let worktree::Event::UpdatedEntries(changes) = event
            {
                Self::local_worktree_entries_changed(this, &worktree, changes, cx);
            }
        })
        .detach();
    }
463
464 fn local_worktree_entries_changed(
465 this: &mut BufferStore,
466 worktree_handle: &Entity<Worktree>,
467 changes: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
468 cx: &mut Context<BufferStore>,
469 ) {
470 let snapshot = worktree_handle.read(cx).snapshot();
471 for (path, entry_id, _) in changes {
472 Self::local_worktree_entry_changed(
473 this,
474 *entry_id,
475 path,
476 worktree_handle,
477 &snapshot,
478 cx,
479 );
480 }
481 }
482
    /// Reconciles a single changed worktree entry with the buffer (if any)
    /// open for it, updating the path/entry indices and the buffer's `File`.
    ///
    /// Always returns `None`; the `Option` return type exists only to enable
    /// early exits via `?`.
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<RelPath>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        // Find the affected buffer by entry id first, falling back to path.
        let buffer_id = this
            .as_local_mut()
            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
            .copied()
            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            // The buffer entity has been dropped; clean up our bookkeeping.
            this.opened_buffers.remove(&buffer_id);
            this.non_searchable_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            this.path_to_buffer_id.remove(&project_path);
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Locate the entry by its id, falling back to its previous path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // No matching entry: the file was deleted from disk.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // The file moved: re-key the path index.
                this.path_to_buffer_id.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                this.path_to_buffer_id.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }
            let local = this.as_local_mut()?;
            if new_file.entry_id != old_file.entry_id {
                // The entry id changed (e.g. delete + recreate): re-key the entry index.
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // Keep a downstream peer's view of the file in sync.
            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        // Emit outside the buffer update so subscribers see consistent state.
        for event in events {
            cx.emit(event);
        }

        None
    }
608
609 fn save_buffer(
610 &self,
611 buffer: Entity<Buffer>,
612 cx: &mut Context<BufferStore>,
613 ) -> Task<Result<()>> {
614 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
615 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
616 };
617 let worktree = file.worktree.clone();
618 self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
619 }
620
621 fn save_buffer_as(
622 &self,
623 buffer: Entity<Buffer>,
624 path: ProjectPath,
625 cx: &mut Context<BufferStore>,
626 ) -> Task<Result<()>> {
627 let Some(worktree) = self
628 .worktree_store
629 .read(cx)
630 .worktree_for_id(path.worktree_id, cx)
631 else {
632 return Task::ready(Err(anyhow!("no such worktree")));
633 };
634 self.save_local_buffer(buffer, worktree, path.path, true, cx)
635 }
636
    /// Loads the file at `path` from disk into a new buffer.
    ///
    /// A missing file is not an error: it yields an empty buffer whose disk
    /// state is `DiskState::New`. Buffers under read-only paths (per worktree
    /// settings) are opened with `Capability::Read`.
    fn open_buffer(
        &self,
        path: Arc<RelPath>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_file = worktree.update(cx, |worktree, cx| worktree.load_file(path.as_ref(), cx));
        cx.spawn(async move |this, cx| {
            let path = path.clone();
            let buffer = match load_file.await {
                Ok(loaded) => {
                    // Reserve the entity id up front so the text buffer can be
                    // built on a background thread with its final id.
                    let reservation = cx.reserve_entity::<Buffer>();
                    let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                    let text_buffer = cx
                        .background_spawn(async move {
                            text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
                        })
                        .await;
                    cx.insert_entity(reservation, |_| {
                        let mut buffer =
                            Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite);
                        buffer.set_encoding(loaded.encoding);
                        buffer.set_has_bom(loaded.has_bom);
                        buffer
                    })
                }
                // File doesn't exist yet: open an empty, unsaved buffer at that path.
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => return Err(e),
            };
            this.update(cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let project_path = ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    };
                    let entry_id = file.entry_id;

                    // Check if the file should be read-only based on settings
                    let settings = WorktreeSettings::get(Some((&project_path).into()), cx);
                    let is_read_only = if project_path.path.is_empty() {
                        settings.is_std_path_read_only(&file.full_path(cx))
                    } else {
                        settings.is_path_read_only(&project_path.path)
                    };
                    if is_read_only {
                        buffer.update(cx, |buffer, cx| {
                            buffer.set_capability(Capability::Read, cx);
                        });
                    }

                    this.path_to_buffer_id.insert(project_path, buffer_id);
                    let this = this.as_local_mut().unwrap();
                    if let Some(entry_id) = entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }
718
    /// Creates a new untitled local buffer, defaulting to plain text when no
    /// language is given.
    fn create_buffer(
        &self,
        language: Option<Arc<Language>>,
        project_searchable: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        cx.spawn(async move |buffer_store, cx| {
            let buffer = cx.new(|cx| {
                Buffer::local("", cx)
                    .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
            });
            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
                if !project_searchable {
                    buffer_store
                        .non_searchable_buffers
                        .insert(buffer.read(cx).remote_id());
                }
            })?;
            Ok(buffer)
        })
    }
741
    /// Reloads each buffer from disk. Unless `push_to_history` is set, the
    /// reload transactions are forgotten so they cannot be undone.
    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(async move |_, cx| {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx)).await?;
                buffer.update(cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                });
            }

            Ok(project_transaction)
        })
    }
765}
766
767impl BufferStore {
    /// Registers the RPC message and request handlers this store responds to.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
    }
775
    /// Creates a buffer store, optionally retaining its buffers.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                // Subscribe to each worktree as it is added so on-disk entry
                // changes reach open buffers.
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            path_to_buffer_id: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            non_searchable_buffers: Default::default(),
            worktree_store,
            project_search: Default::default(),
        }
    }
799
    /// Creates a buffer store that mirrors buffers owned by a remote host,
    /// reachable through `upstream_client` under project `remote_id`.
    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            path_to_buffer_id: Default::default(),
            loading_buffers: Default::default(),
            shared_buffers: Default::default(),
            non_searchable_buffers: Default::default(),
            worktree_store,
            project_search: Default::default(),
        }
    }
825
826 fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
827 match &mut self.state {
828 BufferStoreState::Local(state) => Some(state),
829 _ => None,
830 }
831 }
832
833 fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
834 match &mut self.state {
835 BufferStoreState::Remote(state) => Some(state),
836 _ => None,
837 }
838 }
839
840 fn as_remote(&self) -> Option<&RemoteBufferStore> {
841 match &self.state {
842 BufferStoreState::Remote(state) => Some(state),
843 _ => None,
844 }
845 }
846
    /// Opens (or returns the already-open) buffer at `project_path`.
    ///
    /// Concurrent calls for the same path share one load task. Errors are
    /// re-wrapped so that structured RPC error codes survive being shared
    /// behind an `Arc<anyhow::Error>`.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path) {
            cx.emit(BufferStoreEvent::BufferOpened {
                buffer: buffer.clone(),
                project_path,
            });

            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // Another caller is already loading this path; share its task.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        // todo(lw): hot foreground spawn
                        cx.spawn(async move |this, cx| {
                            let load_result = load_buffer.await;
                            this.update(cx, |this, cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);

                                let buffer = load_result.map_err(Arc::new)?;
                                cx.emit(BufferStoreEvent::BufferOpened {
                                    buffer: buffer.clone(),
                                    project_path,
                                });

                                Ok(buffer)
                            })?
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move {
            task.await.map_err(|e| {
                // Preserve structured RPC error codes; fall back to the message.
                if e.error_code() != ErrorCode::Internal {
                    anyhow!(e.error_code())
                } else {
                    anyhow!("{e}")
                }
            })
        })
    }
911
    /// Creates a new, empty buffer, delegating to the local or remote
    /// implementation depending on this store's state.
    pub fn create_buffer(
        &mut self,
        language: Option<Arc<Language>>,
        project_searchable: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(language, project_searchable, cx),
            BufferStoreState::Remote(this) => this.create_buffer(language, project_searchable, cx),
        }
    }
923
    /// Saves `buffer` back to its existing file, delegating to the local or
    /// remote implementation.
    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
        }
    }
934
    /// Saves `buffer` under a new project path, then drops the stale path
    /// index entry and emits `BufferChangedFilePath`.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(async move |this, cx| {
            task.await?;
            this.update(cx, |this, cx| {
                // Remove the old path → buffer-id mapping, if the buffer had a file.
                old_file.clone().and_then(|file| {
                    this.path_to_buffer_id.remove(&ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path().clone(),
                    })
                });

                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
962
963 fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
964 let buffer = buffer_entity.read(cx);
965 let remote_id = buffer.remote_id();
966 let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
967 path: file.path.clone(),
968 worktree_id: file.worktree_id(cx),
969 });
970 let is_remote = buffer.replica_id().is_remote();
971 let open_buffer = OpenBuffer::Complete {
972 buffer: buffer_entity.downgrade(),
973 };
974
975 let handle = cx.entity().downgrade();
976 buffer_entity.update(cx, move |_, cx| {
977 cx.on_release(move |buffer, cx| {
978 handle
979 .update(cx, |_, cx| {
980 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
981 })
982 .ok();
983 })
984 .detach()
985 });
986 let _expect_path_to_exist;
987 match self.opened_buffers.entry(remote_id) {
988 hash_map::Entry::Vacant(entry) => {
989 entry.insert(open_buffer);
990 _expect_path_to_exist = false;
991 }
992 hash_map::Entry::Occupied(mut entry) => {
993 if let OpenBuffer::Operations(operations) = entry.get_mut() {
994 buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
995 } else if entry.get().upgrade().is_some() {
996 if is_remote {
997 return Ok(());
998 } else {
999 debug_panic!("buffer {remote_id} was already registered");
1000 anyhow::bail!("buffer {remote_id} was already registered");
1001 }
1002 }
1003 entry.insert(open_buffer);
1004 _expect_path_to_exist = true;
1005 }
1006 }
1007
1008 if let Some(path) = path {
1009 self.path_to_buffer_id.insert(path, remote_id);
1010 }
1011
1012 cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1013 cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1014 Ok(())
1015 }
1016
1017 pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1018 self.opened_buffers
1019 .values()
1020 .filter_map(|buffer| buffer.upgrade())
1021 }
1022
1023 pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
1024 !self.non_searchable_buffers.contains(&id)
1025 }
1026
    /// Iterates over in-flight buffer loads as `(path, future)` pairs.
    ///
    /// The futures mirror the error flattening done in `open_buffer`,
    /// preserving structured RPC error codes.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
        self.loading_buffers.iter().map(|(path, task)| {
            let task = task.clone();
            (path, async move {
                task.await.map_err(|e| {
                    if e.error_code() != ErrorCode::Internal {
                        anyhow!(e.error_code())
                    } else {
                        anyhow!("{e}")
                    }
                })
            })
        })
    }
1043
    /// Id of the buffer currently open at `project_path`, if any.
    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
        self.path_to_buffer_id.get(project_path)
    }
1047
1048 pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
1049 self.path_to_buffer_id
1050 .get(path)
1051 .and_then(|buffer_id| self.get(*buffer_id))
1052 }
1053
1054 pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1055 self.opened_buffers.get(&buffer_id)?.upgrade()
1056 }
1057
1058 pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1059 self.get(buffer_id)
1060 .with_context(|| format!("unknown buffer id {buffer_id}"))
1061 }
1062
1063 pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1064 self.get(buffer_id).or_else(|| {
1065 self.as_remote()
1066 .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1067 })
1068 }
1069
1070 pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1071 let buffers = self
1072 .buffers()
1073 .map(|buffer| {
1074 let buffer = buffer.read(cx);
1075 proto::BufferVersion {
1076 id: buffer.remote_id().into(),
1077 version: language::proto::serialize_version(&buffer.version),
1078 }
1079 })
1080 .collect();
1081 let incomplete_buffer_ids = self
1082 .as_remote()
1083 .map(|remote| remote.incomplete_buffer_ids())
1084 .unwrap_or_default();
1085 (buffers, incomplete_buffer_ids)
1086 }
1087
1088 pub fn disconnected_from_host(&mut self, cx: &mut App) {
1089 for open_buffer in self.opened_buffers.values_mut() {
1090 if let Some(buffer) = open_buffer.upgrade() {
1091 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1092 }
1093 }
1094
1095 for buffer in self.buffers() {
1096 buffer.update(cx, |buffer, cx| {
1097 buffer.set_capability(Capability::ReadOnly, cx)
1098 });
1099 }
1100
1101 if let Some(remote) = self.as_remote_mut() {
1102 // Wake up all futures currently waiting on a buffer to get opened,
1103 // to give them a chance to fail now that we've disconnected.
1104 remote.remote_buffer_listeners.clear()
1105 }
1106 }
1107
1108 pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1109 self.downstream_client = Some((downstream_client, remote_id));
1110 }
1111
1112 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1113 self.downstream_client.take();
1114 self.forget_shared_buffers();
1115 }
1116
1117 pub fn discard_incomplete(&mut self) {
1118 self.opened_buffers
1119 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1120 }
1121
1122 fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1123 let file = File::from_dyn(buffer.read(cx).file())?;
1124
1125 let remote_id = buffer.read(cx).remote_id();
1126 if let Some(entry_id) = file.entry_id {
1127 if let Some(local) = self.as_local_mut() {
1128 match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1129 Some(_) => {
1130 return None;
1131 }
1132 None => {
1133 local
1134 .local_buffer_ids_by_entry_id
1135 .insert(entry_id, remote_id);
1136 }
1137 }
1138 }
1139 self.path_to_buffer_id.insert(
1140 ProjectPath {
1141 worktree_id: file.worktree_id(cx),
1142 path: file.path.clone(),
1143 },
1144 remote_id,
1145 );
1146 };
1147
1148 Some(())
1149 }
1150
1151 fn on_buffer_event(
1152 &mut self,
1153 buffer: Entity<Buffer>,
1154 event: &BufferEvent,
1155 cx: &mut Context<Self>,
1156 ) {
1157 match event {
1158 BufferEvent::FileHandleChanged => {
1159 self.buffer_changed_file(buffer, cx);
1160 }
1161 BufferEvent::Reloaded => {
1162 let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1163 return;
1164 };
1165 let buffer = buffer.read(cx);
1166 downstream_client
1167 .send(proto::BufferReloaded {
1168 project_id: *project_id,
1169 buffer_id: buffer.remote_id().to_proto(),
1170 version: serialize_version(&buffer.version()),
1171 mtime: buffer.saved_mtime().map(|t| t.into()),
1172 line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1173 })
1174 .log_err();
1175 }
1176 BufferEvent::LanguageChanged(_) => {}
1177 _ => {}
1178 }
1179 }
1180
1181 pub async fn handle_update_buffer(
1182 this: Entity<Self>,
1183 envelope: TypedEnvelope<proto::UpdateBuffer>,
1184 mut cx: AsyncApp,
1185 ) -> Result<proto::Ack> {
1186 let payload = envelope.payload;
1187 let buffer_id = BufferId::new(payload.buffer_id)?;
1188 let ops = payload
1189 .operations
1190 .into_iter()
1191 .map(language::proto::deserialize_operation)
1192 .collect::<Result<Vec<_>, _>>()?;
1193 this.update(&mut cx, |this, cx| {
1194 match this.opened_buffers.entry(buffer_id) {
1195 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1196 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1197 OpenBuffer::Complete { buffer, .. } => {
1198 if let Some(buffer) = buffer.upgrade() {
1199 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1200 }
1201 }
1202 },
1203 hash_map::Entry::Vacant(e) => {
1204 e.insert(OpenBuffer::Operations(ops));
1205 }
1206 }
1207 Ok(proto::Ack {})
1208 })
1209 }
1210
1211 pub fn register_shared_lsp_handle(
1212 &mut self,
1213 peer_id: proto::PeerId,
1214 buffer_id: BufferId,
1215 handle: OpenLspBufferHandle,
1216 ) {
1217 if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1218 && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1219 {
1220 buffer.lsp_handle = Some(handle);
1221 return;
1222 }
1223 debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1224 }
1225
    /// Handles a guest's request to synchronize buffer state after
    /// (re)connecting.
    ///
    /// For each buffer the guest reports having open, this re-registers it as
    /// shared with that guest, answers with our current version, and pushes
    /// the guest any state it may have missed: file metadata, the last-saved
    /// state, and (asynchronously, in chunks) operations newer than the
    /// guest's reported version.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Rebuild the shared-buffer set for this guest from scratch, based on
        // what the guest says it has open.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        lsp_handle: None,
                    });

                // Report our version so the guest knows what it will converge to.
                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Operations the guest is missing relative to its reported
                // version; awaited and sent on a background task below.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                // Resend the last-saved state so the guest's dirty/conflict
                // tracking agrees with ours.
                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in size-limited chunks off the
                // main thread; errors are logged, not propagated.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
1313
1314 pub fn handle_create_buffer_for_peer(
1315 &mut self,
1316 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1317 replica_id: ReplicaId,
1318 capability: Capability,
1319 cx: &mut Context<Self>,
1320 ) -> Result<()> {
1321 let remote = self
1322 .as_remote_mut()
1323 .context("buffer store is not a remote")?;
1324
1325 if let Some(buffer) =
1326 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1327 {
1328 self.add_buffer(buffer, cx)?;
1329 }
1330
1331 Ok(())
1332 }
1333
    /// Handles notification that a buffer's underlying file changed on the
    /// authoritative side (e.g. a rename or metadata change).
    ///
    /// Updates the local buffer's file, emits `BufferChangedFilePath` when the
    /// path actually changed, and relays the message to this store's own
    /// downstream client, if any.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.context("invalid file")?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .context("no such worktree")?;
                let file = File::from_proto(file, worktree, cx)?;
                // Capture the previous file only when the path changed, so we
                // know whether to emit a path-change event below.
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();

                    buffer.file_updated(Arc::new(file), cx);
                    if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Forward the update downstream even if we don't have the buffer
            // (fully) open ourselves.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })
    }
1379
    /// Handles a guest's request to save a buffer, optionally under a new path
    /// (save-as).
    ///
    /// Waits until this replica has caught up to the version the guest saw
    /// before saving, then replies with the resulting saved version and mtime.
    /// Errors if the buffer is unknown or the project isn't shared.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.read_with(&cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })?;
        // Don't save until we've received all the edits the requester had
        // applied when it asked for the save.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })
            .await?;
        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());

        // A provided `new_path` turns this into a save-as; otherwise save in
        // place.
        if let Some(new_path) = envelope.payload.new_path
            && let Some(new_path) = ProjectPath::from_proto(new_path)
        {
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))
                .await?;
        }

        Ok(buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        }))
    }
1421
1422 pub async fn handle_close_buffer(
1423 this: Entity<Self>,
1424 envelope: TypedEnvelope<proto::CloseBuffer>,
1425 mut cx: AsyncApp,
1426 ) -> Result<()> {
1427 let peer_id = envelope.sender_id;
1428 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1429 this.update(&mut cx, |this, cx| {
1430 if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1431 && shared.remove(&buffer_id).is_some()
1432 {
1433 cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1434 if shared.is_empty() {
1435 this.shared_buffers.remove(&peer_id);
1436 }
1437 return;
1438 }
1439 debug_panic!(
1440 "peer_id {} closed buffer_id {} which was either not open or already closed",
1441 peer_id,
1442 buffer_id
1443 )
1444 });
1445 Ok(())
1446 }
1447
1448 pub async fn handle_buffer_saved(
1449 this: Entity<Self>,
1450 envelope: TypedEnvelope<proto::BufferSaved>,
1451 mut cx: AsyncApp,
1452 ) -> Result<()> {
1453 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1454 let version = deserialize_version(&envelope.payload.version);
1455 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1456 this.update(&mut cx, move |this, cx| {
1457 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1458 buffer.update(cx, |buffer, cx| {
1459 buffer.did_save(version, mtime, cx);
1460 });
1461 }
1462
1463 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1464 downstream_client
1465 .send(proto::BufferSaved {
1466 project_id: *project_id,
1467 buffer_id: buffer_id.into(),
1468 mtime: envelope.payload.mtime,
1469 version: envelope.payload.version,
1470 })
1471 .log_err();
1472 }
1473 });
1474 Ok(())
1475 }
1476
1477 pub async fn handle_buffer_reloaded(
1478 this: Entity<Self>,
1479 envelope: TypedEnvelope<proto::BufferReloaded>,
1480 mut cx: AsyncApp,
1481 ) -> Result<()> {
1482 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1483 let version = deserialize_version(&envelope.payload.version);
1484 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1485 let line_ending = deserialize_line_ending(
1486 proto::LineEnding::from_i32(envelope.payload.line_ending)
1487 .context("missing line ending")?,
1488 );
1489 this.update(&mut cx, |this, cx| {
1490 if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1491 buffer.update(cx, |buffer, cx| {
1492 buffer.did_reload(version, line_ending, mtime, cx);
1493 });
1494 }
1495
1496 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1497 downstream_client
1498 .send(proto::BufferReloaded {
1499 project_id: *project_id,
1500 buffer_id: buffer_id.into(),
1501 mtime: envelope.payload.mtime,
1502 version: envelope.payload.version,
1503 line_ending: envelope.payload.line_ending,
1504 })
1505 .log_err();
1506 }
1507 });
1508 Ok(())
1509 }
1510
1511 pub fn reload_buffers(
1512 &self,
1513 buffers: HashSet<Entity<Buffer>>,
1514 push_to_history: bool,
1515 cx: &mut Context<Self>,
1516 ) -> Task<Result<ProjectTransaction>> {
1517 if buffers.is_empty() {
1518 return Task::ready(Ok(ProjectTransaction::default()));
1519 }
1520 match &self.state {
1521 BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1522 BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1523 }
1524 }
1525
1526 async fn handle_reload_buffers(
1527 this: Entity<Self>,
1528 envelope: TypedEnvelope<proto::ReloadBuffers>,
1529 mut cx: AsyncApp,
1530 ) -> Result<proto::ReloadBuffersResponse> {
1531 let sender_id = envelope.original_sender_id().unwrap_or_default();
1532 let reload = this.update(&mut cx, |this, cx| {
1533 let mut buffers = HashSet::default();
1534 for buffer_id in &envelope.payload.buffer_ids {
1535 let buffer_id = BufferId::new(*buffer_id)?;
1536 buffers.insert(this.get_existing(buffer_id)?);
1537 }
1538 anyhow::Ok(this.reload_buffers(buffers, false, cx))
1539 })?;
1540
1541 let project_transaction = reload.await?;
1542 let project_transaction = this.update(&mut cx, |this, cx| {
1543 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1544 });
1545 Ok(proto::ReloadBuffersResponse {
1546 transaction: Some(project_transaction),
1547 })
1548 }
1549
    /// Ensures `peer_id` has a replica of `buffer`, sending its state if it
    /// hasn't been shared with that peer yet.
    ///
    /// No-op when the buffer is already shared with the peer or when there is
    /// no downstream client. Otherwise registers the share synchronously, then
    /// asynchronously sends the buffer's state followed by its operation
    /// history in chunks; the final chunk is flagged `is_last` so the peer
    /// knows the buffer is complete.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the share immediately so concurrent calls don't double-send.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(async move |this, cx| {
            // The buffer may have been closed before this task ran.
            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx));
            let operations = operations.await;
            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx));

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream the history if the initial state was sent successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        // Peek ahead to mark the final chunk.
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
1614
1615 pub fn forget_shared_buffers(&mut self) {
1616 self.shared_buffers.clear();
1617 }
1618
1619 pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1620 self.shared_buffers.remove(peer_id);
1621 }
1622
1623 pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1624 if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1625 self.shared_buffers.insert(new_peer_id, buffers);
1626 }
1627 }
1628
1629 pub fn has_shared_buffers(&self) -> bool {
1630 !self.shared_buffers.is_empty()
1631 }
1632
1633 pub fn create_local_buffer(
1634 &mut self,
1635 text: &str,
1636 language: Option<Arc<Language>>,
1637 project_searchable: bool,
1638 cx: &mut Context<Self>,
1639 ) -> Entity<Buffer> {
1640 let buffer = cx.new(|cx| {
1641 Buffer::local(text, cx)
1642 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1643 });
1644
1645 self.add_buffer(buffer.clone(), cx).log_err();
1646 let buffer_id = buffer.read(cx).remote_id();
1647 if !project_searchable {
1648 self.non_searchable_buffers.insert(buffer_id);
1649 }
1650
1651 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1652 self.path_to_buffer_id.insert(
1653 ProjectPath {
1654 worktree_id: file.worktree_id(cx),
1655 path: file.path.clone(),
1656 },
1657 buffer_id,
1658 );
1659 let this = self
1660 .as_local_mut()
1661 .expect("local-only method called in a non-local context");
1662 if let Some(entry_id) = file.entry_id {
1663 this.local_buffer_ids_by_entry_id
1664 .insert(entry_id, buffer_id);
1665 }
1666 }
1667 buffer
1668 }
1669
1670 pub fn deserialize_project_transaction(
1671 &mut self,
1672 message: proto::ProjectTransaction,
1673 push_to_history: bool,
1674 cx: &mut Context<Self>,
1675 ) -> Task<Result<ProjectTransaction>> {
1676 if let Some(this) = self.as_remote_mut() {
1677 this.deserialize_project_transaction(message, push_to_history, cx)
1678 } else {
1679 debug_panic!("not a remote buffer store");
1680 Task::ready(Err(anyhow!("not a remote buffer store")))
1681 }
1682 }
1683
1684 pub fn wait_for_remote_buffer(
1685 &mut self,
1686 id: BufferId,
1687 cx: &mut Context<BufferStore>,
1688 ) -> Task<Result<Entity<Buffer>>> {
1689 if let Some(this) = self.as_remote_mut() {
1690 this.wait_for_remote_buffer(id, cx)
1691 } else {
1692 debug_panic!("not a remote buffer store");
1693 Task::ready(Err(anyhow!("not a remote buffer store")))
1694 }
1695 }
1696
1697 pub fn serialize_project_transaction_for_peer(
1698 &mut self,
1699 project_transaction: ProjectTransaction,
1700 peer_id: proto::PeerId,
1701 cx: &mut Context<Self>,
1702 ) -> proto::ProjectTransaction {
1703 let mut serialized_transaction = proto::ProjectTransaction {
1704 buffer_ids: Default::default(),
1705 transactions: Default::default(),
1706 };
1707 for (buffer, transaction) in project_transaction.0 {
1708 self.create_buffer_for_peer(&buffer, peer_id, cx)
1709 .detach_and_log_err(cx);
1710 serialized_transaction
1711 .buffer_ids
1712 .push(buffer.read(cx).remote_id().into());
1713 serialized_transaction
1714 .transactions
1715 .push(language::proto::serialize_transaction(&transaction));
1716 }
1717 serialized_transaction
1718 }
1719
1720 pub(crate) fn register_project_search_result_handle(
1721 &mut self,
1722 ) -> (u64, smol::channel::Receiver<BufferId>) {
1723 let (tx, rx) = smol::channel::unbounded();
1724 let handle = util::post_inc(&mut self.project_search.next_id);
1725 let _old_entry = self.project_search.chunks.insert(handle, tx);
1726 debug_assert!(_old_entry.is_none());
1727 (handle, rx)
1728 }
1729
1730 pub fn register_ongoing_project_search(
1731 &mut self,
1732 id: (PeerId, u64),
1733 search: Task<anyhow::Result<()>>,
1734 ) {
1735 let _old = self.project_search.searches_in_progress.insert(id, search);
1736 debug_assert!(_old.is_none());
1737 }
1738
1739 pub async fn handle_find_search_candidates_cancel(
1740 this: Entity<Self>,
1741 envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
1742 mut cx: AsyncApp,
1743 ) -> Result<()> {
1744 let id = (
1745 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1746 envelope.payload.handle,
1747 );
1748 let _ = this.update(&mut cx, |this, _| {
1749 this.project_search.searches_in_progress.remove(&id)
1750 });
1751 Ok(())
1752 }
1753
    /// Handles one chunk of project-search results streamed from the remote
    /// host, forwarding the matched buffer ids to the local consumer.
    ///
    /// A `Done` variant (or a consumer that has dropped its receiver) tears
    /// down the per-handle channel registered in
    /// [`Self::register_project_search_result_handle`].
    pub(crate) async fn handle_find_search_candidates_chunk(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        use proto::find_search_candidates_chunk::Variant;
        let handle = envelope.payload.handle;

        let buffer_ids = match envelope
            .payload
            .variant
            .context("Expected non-null variant")?
        {
            // Invalid buffer ids are silently skipped rather than failing the
            // whole chunk.
            Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
                .buffer_ids
                .into_iter()
                .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
                .collect::<Vec<_>>(),
            Variant::Done(_) => {
                // The stream is complete: drop the channel for this handle.
                this.update(&mut cx, |this, _| {
                    this.project_search.chunks.remove(&handle)
                });
                return Ok(proto::Ack {});
            }
        };
        // The consumer may already have unregistered this handle (e.g. the
        // search was cancelled); in that case just acknowledge and drop.
        let Some(sender) = this.read_with(&mut cx, |this, _| {
            this.project_search.chunks.get(&handle).cloned()
        }) else {
            return Ok(proto::Ack {});
        };

        for buffer_id in buffer_ids {
            // A send failure means the receiver was dropped: clean up and stop.
            let Ok(_) = sender.send(buffer_id).await else {
                this.update(&mut cx, |this, _| {
                    this.project_search.chunks.remove(&handle)
                });
                return Ok(proto::Ack {});
            };
        }
        Ok(proto::Ack {})
    }
1795}
1796
1797impl OpenBuffer {
1798 fn upgrade(&self) -> Option<Entity<Buffer>> {
1799 match self {
1800 OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1801 OpenBuffer::Operations(_) => None,
1802 }
1803 }
1804}
1805
1806fn is_not_found_error(error: &anyhow::Error) -> bool {
1807 error
1808 .root_cause()
1809 .downcast_ref::<io::Error>()
1810 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1811}