use crate::{
    lsp_store::OpenLspBufferHandle,
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    ProjectItem as _, ProjectPath,
};
use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
use anyhow::{anyhow, bail, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
use git::{blame::Blame, repository::RepoPath};
use gpui::{
    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
};
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
};
use rpc::{
    proto::{self, ToProto},
    AnyProtoClient, ErrorExt as _, TypedEnvelope,
};
use serde::Deserialize;
use smol::channel::Receiver;
use std::{
    io,
    ops::Range,
    path::{Path, PathBuf},
    pin::pin,
    sync::Arc,
    time::Instant,
};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};

/// A set of open buffers.
pub struct BufferStore {
    state: BufferStoreState,
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}

#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    lsp_handle: Option<OpenLspBufferHandle>,
}

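/// Whether this store owns its buffers locally, or mirrors buffers hosted by
/// an upstream remote project.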
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}

struct RemoteBufferStore {
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
    worktree_store: Entity<WorktreeStore>,
}

struct LocalBufferStore {
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    _subscription: Subscription,
}

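/// An entry in [`BufferStore::opened_buffers`]. Operations that arrive before
/// the buffer itself are accumulated in the `Operations` variant and drained
/// into the buffer once it is registered.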
enum OpenBuffer {
    Complete { buffer: WeakEntity<Buffer> },
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Entity<Buffer>),
    BufferOpened {
        buffer: Entity<Buffer>,
        project_path: ProjectPath,
    },
    SharedBufferClosed(proto::PeerId, BufferId),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

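/// The edits applied across multiple buffers by a single project-wide
/// operation, keyed by the buffer they belong to.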
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl RemoteBufferStore {
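    /// Returns a task that resolves with the buffer once it has been received
    /// from the host, registering a listener that fires when the buffer's
    /// final chunk arrives.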
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);

        cx.spawn(|this, cx| async move {
            if let Some(buffer) = this
                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
                .ok()
                .flatten()
            {
                return Ok(buffer);
            }

            cx.background_spawn(async move { rx.await? }).await
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        new_path: Option<proto::ProjectPath>,
        cx: &Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = self.upstream_client.clone();
        let project_id = self.project_id;
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

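    /// Handles one message of the buffer-transfer protocol: a `State` variant
    /// carrying the buffer's initial snapshot, or a `Chunk` variant carrying a
    /// batch of its operations. Returns the buffer once the chunk marked
    /// `is_last` has been applied.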
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<BufferStore>,
    ) -> Result<Option<Entity<Buffer>>> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if self.upstream_client.is_via_collab() {
                        // Retain buffers sent by peers to avoid races.
                        self.shared_with_me.insert(buffer.clone());
                    }

                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for sender in senders {
                            sender.send(Ok(buffer.clone())).ok();
                        }
                    }
                    return Ok(Some(buffer));
                }
            }
        }
        Ok(None)
    }

    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
        self.loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>()
    }

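    /// Resolves every buffer referenced by the serialized transaction, waits
    /// for its edits to arrive, and optionally pushes the transactions onto
    /// the buffers' undo histories.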
    pub fn deserialize_project_transaction(
        &self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(|this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
            {
                let buffer_id = BufferId::new(buffer_id)?;
                let buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_remote_buffer(buffer_id, cx)
                    })?
                    .await?;
                let transaction = language::proto::deserialize_transaction(transaction)?;
                project_transaction.0.insert(buffer, transaction);
            }

            for (buffer, transaction) in &project_transaction.0 {
                buffer
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })?
                    .await?;

                if push_to_history {
                    buffer.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    })?;
                }
            }

            Ok(project_transaction)
        })
    }

    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let worktree_id = worktree.read(cx).id().to_proto();
        let project_id = self.project_id;
        let client = self.upstream_client.clone();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path.to_proto(),
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            let buffer = this
                .update(&mut cx, {
                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;

            Ok(buffer)
        })
    }

    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        let create = self.upstream_client.request(proto::OpenNewBuffer {
            project_id: self.project_id,
        });
        cx.spawn(|this, mut cx| async move {
            let response = create.await?;
            let buffer_id = BufferId::new(response.buffer_id)?;

            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        let request = self.upstream_client.request(proto::ReloadBuffers {
            project_id: self.project_id,
            buffer_ids: buffers
                .iter()
                .map(|buffer| buffer.read(cx).remote_id().to_proto())
                .collect(),
        });

        cx.spawn(|this, mut cx| async move {
            let response = request
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })?
            .await
        })
    }
}

impl LocalBufferStore {
    fn save_local_buffer(
        &self,
        buffer_handle: Entity<Buffer>,
        worktree: Entity<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn subscribe_to_worktree(
        &mut self,
        worktree: &Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                if let worktree::Event::UpdatedEntries(changes) = event {
                    Self::local_worktree_entries_changed(this, &worktree, changes, cx);
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        this: &mut BufferStore,
        worktree_handle: &Entity<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut Context<BufferStore>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            Self::local_worktree_entry_changed(
                this,
                *entry_id,
                path,
                worktree_handle,
                &snapshot,
                cx,
            );
        }
    }

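    /// Reconciles a single buffer with a change reported by its worktree:
    /// recomputes the buffer's `File` (path, entry id, mtime, or deleted
    /// state), updates the path and entry-id indices, and notifies any
    /// downstream client of the new file metadata.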
    fn local_worktree_entry_changed(
        this: &mut BufferStore,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Entity<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut Context<BufferStore>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };

        let buffer_id = {
            let local = this.as_local_mut()?;
            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(&buffer_id) => buffer_id,
                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
            }
        };

        let buffer = if let Some(buffer) = this.get(buffer_id) {
            Some(buffer)
        } else {
            this.opened_buffers.remove(&buffer_id);
            None
        };

        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            let this = this.as_local_mut()?;
            this.local_buffer_ids_by_path.remove(&project_path);
            this.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let local = this.as_local_mut()?;
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                local.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                local.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.entity(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    local
                        .local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &this.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

    fn save_buffer(
        &self,
        buffer: Entity<Buffer>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        let worktree = file.worktree.clone();
        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
    }

    fn save_buffer_as(
        &self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
    }

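    /// Loads a buffer from disk, reserving its entity id up front so it can
    /// double as the buffer id. If the file doesn't exist, an empty buffer is
    /// created with `DiskState::New`.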
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Entity<Worktree>,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_entity();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_entity(reservation, |_| {
                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            disk_state: DiskState::New,
                            entry_id: None,
                            is_local: true,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx)?;
                let buffer_id = buffer.read(cx).remote_id();
                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                    let this = this.as_local_mut().unwrap();
                    this.local_buffer_ids_by_path.insert(
                        ProjectPath {
                            worktree_id: file.worktree_id(cx),
                            path: file.path.clone(),
                        },
                        buffer_id,
                    );

                    if let Some(entry_id) = file.entry_id {
                        this.local_buffer_ids_by_entry_id
                            .insert(entry_id, buffer_id);
                    }
                }

                anyhow::Ok(())
            })??;

            Ok(buffer)
        })
    }

    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer =
                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.entity(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
    }

    /// Creates a buffer store that manages buffers backed by local worktrees.
    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
        Self {
            state: BufferStoreState::Local(LocalBufferStore {
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                worktree_store: worktree_store.clone(),
                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                        let this = this.as_local_mut().unwrap();
                        this.subscribe_to_worktree(worktree, cx);
                    }
                }),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers: Default::default(),
            worktree_store,
        }
    }

    pub fn remote(
        worktree_store: Entity<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        _cx: &mut Context<Self>,
    ) -> Self {
        Self {
            state: BufferStoreState::Remote(RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
            }),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }

    fn as_local(&self) -> Option<&LocalBufferStore> {
        match &self.state {
            BufferStoreState::Local(state) => Some(state),
            _ => None,
        }
    }

    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
        match &mut self.state {
            BufferStoreState::Local(state) => Some(state),
            _ => None,
        }
    }

    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
        match &mut self.state {
            BufferStoreState::Remote(state) => Some(state),
            _ => None,
        }
    }

    fn as_remote(&self) -> Option<&RemoteBufferStore> {
        match &self.state {
            BufferStoreState::Remote(state) => Some(state),
            _ => None,
        }
    }

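    /// Opens the buffer at the given path, returning it immediately if it is
    /// already open. Concurrent requests for the same path share one load via
    /// the `loading_buffers` map.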
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(buffer) = self.get_by_path(&project_path, cx) {
            cx.emit(BufferStoreEvent::BufferOpened {
                buffer: buffer.clone(),
                project_path,
            });

            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(move |this, mut cx| async move {
                            let load_result = load_buffer.await;
                            this.update(&mut cx, |this, cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);

                                let buffer = load_result.map_err(Arc::new)?;
                                cx.emit(BufferStoreEvent::BufferOpened {
                                    buffer: buffer.clone(),
                                    project_path,
                                });

                                Ok(buffer)
                            })?
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }

    pub(crate) fn worktree_for_buffer(
        &self,
        buffer: &Entity<Buffer>,
        cx: &App,
    ) -> Option<(Entity<Worktree>, Arc<Path>)> {
        let file = buffer.read(cx).file()?;
        let worktree_id = file.worktree_id(cx);
        let path = file.path().clone();
        let worktree = self
            .worktree_store
            .read(cx)
            .worktree_for_id(worktree_id, cx)?;
        Some((worktree, path))
    }

    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
        match &self.state {
            BufferStoreState::Local(this) => this.create_buffer(cx),
            BufferStoreState::Remote(this) => this.create_buffer(cx),
        }
    }

    pub fn save_buffer(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        match &mut self.state {
            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

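    /// Computes git blame for the buffer, either by running blame against the
    /// local repository or by forwarding the request to the remote host.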
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let local_repo = match worktree.local_repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = local_repo
                        .relativize(&file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.spawn(|cx| async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(relative_path.clone(), content, cx)
                        .await
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

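    /// Builds a permalink URL to the given line range on the buffer's git
    /// hosting provider. Falls back to Cargo registry metadata for Rust files
    /// outside a repository, and to the host for remote worktrees.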
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if buffer
                        .language()
                        .is_none_or(|lang| lang.name() != "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                let remote = repo_entry
                    .branch()
                    .and_then(|b| b.upstream.as_ref())
                    .and_then(|b| b.remote_name())
                    .unwrap_or("origin")
                    .to_string();

                cx.spawn(|cx| async move {
                    let origin_url = repo
                        .remote_url(&remote)
                        .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }

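    /// Registers a buffer with the store: installs a release hook that emits
    /// `BufferDropped`, drains any operations that arrived before the buffer
    /// did, and rejects duplicate registration of local buffers.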
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let remote_id = buffer.remote_id();
        let is_remote = buffer.replica_id() != 0;
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
        };

        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
        self.loading_buffers.iter().map(|(path, task)| {
            let task = task.clone();
            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
        })
    }

    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
        self.as_local()
            .and_then(|state| state.local_buffer_ids_by_path.get(project_path))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
        self.opened_buffers.get(&buffer_id)?.upgrade()
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
        self.get(buffer_id).or_else(|| {
            self.as_remote()
                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
        })
    }

    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .as_remote()
            .map(|remote| remote.incomplete_buffer_ids())
            .unwrap_or_default();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        if let Some(remote) = self.as_remote_mut() {
            // Wake up all futures currently waiting on a buffer to get opened,
            // to give them a chance to fail now that we've disconnected.
            remote.remote_buffer_listeners.clear()
        }
    }

    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

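    /// Streams buffers that may contain matches for the given query. Unnamed
    /// open buffers are sent first and count against the limit; remaining
    /// candidate paths come from the worktree scan and are opened in batches
    /// of `MAX_CONCURRENT_BUFFER_OPENS`.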
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Receiver<Entity<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            let mut project_paths_rx = pin!(project_paths_rx);
            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                if let Some(local) = self.as_local_mut() {
                    local.buffer_changed_file(buffer, cx);
                }
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            BufferEvent::LanguageChanged => {}
            _ => {}
        }
    }

    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn register_shared_lsp_handle(
        &mut self,
        peer_id: proto::PeerId,
        buffer_id: BufferId,
        handle: OpenLspBufferHandle,
    ) {
        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
            if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
                buffer.lsp_handle = Some(handle);
                return;
            }
        }
        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
    }

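    /// Responds to a guest's request to synchronize buffer state (e.g. after a
    /// reconnect): re-records which buffers the guest holds, reports the
    /// host's versions, and streams any operations the guest is missing.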
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let Some(remote) = self.as_remote_mut() else {
            return Err(anyhow!("buffer store is not a remote"));
        };

        if let Some(buffer) =
            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
        {
            self.add_buffer(buffer, cx)?;
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, cx| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                if shared.remove(&buffer_id).is_some() {
                    cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
                    if shared.is_empty() {
                        this.shared_buffers.remove(&peer_id);
                    }
                    return;
                }
            }
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn handle_get_permalink_to_line(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetPermalinkToLineResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // let version = deserialize_version(&envelope.payload.version);
        let selection = {
            let proto_selection = envelope
                .payload
                .selection
                .context("no selection defined to get permalink for")?;
            proto_selection.start as u32..proto_selection.end as u32
        };
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        let permalink = this
            .update(&mut cx, |this, cx| {
                this.get_permalink_to_line(&buffer, selection, cx)
            })?
            .await?;
        Ok(proto::GetPermalinkToLineResponse {
            permalink: permalink.to_string(),
        })
    }

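    /// Reloads the given buffers from disk, collecting the resulting
    /// transactions. Delegates to the local store or forwards the request
    /// upstream, depending on how this store was created.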
    pub fn reload_buffers(
        &self,
        buffers: HashSet<Entity<Buffer>>,
        push_to_history: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if buffers.is_empty() {
            return Task::ready(Ok(ProjectTransaction::default()));
        }
        match &self.state {
            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
        }
    }

    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

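    /// Shares a buffer with a peer if it isn't shared with them already:
    /// sends the buffer's initial state, then streams its operations as
    /// chunked `CreateBufferForPeer` messages, marking the final chunk with
    /// `is_last`.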
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut Context<Self>,
    ) -> Entity<Buffer> {
        let buffer = cx.new(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        let this = self
            .as_local_mut()
            .expect("local-only method called in a non-local context");
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            this.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                buffer_id,
            );

            if let Some(entry_id) = file.entry_id {
                this.local_buffer_ids_by_entry_id
                    .insert(entry_id, buffer_id);
            }
        }
        buffer
    }

    pub fn deserialize_project_transaction(
        &mut self,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        if let Some(this) = self.as_remote_mut() {
            this.deserialize_project_transaction(message, push_to_history, cx)
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut Context<BufferStore>,
    ) -> Task<Result<Entity<Buffer>>> {
        if let Some(this) = self.as_remote_mut() {
            this.wait_for_remote_buffer(id, cx)
        } else {
            debug_panic!("not a remote buffer store");
            Task::ready(Err(anyhow!("not a remote buffer store")))
        }
    }

    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Entity<Buffer>> {
        match self {
            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer_name.clone(),
            committer_mail: entry.committer_email.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            remote_url: blame.remote_url,
        }),
    }
}

fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer_name: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_email: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        messages,
        remote_url: response.remote_url,
    })
}

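/// Builds a permalink for a file in the Cargo registry source cache by
/// walking up to the crate root to read `.cargo_vcs_info.json` (the commit
/// the crate was packaged from) and `Cargo.toml` (the upstream repository
/// URL).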
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}