use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient},
    ErrorExt as _, TypedEnvelope,
};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    downstream_client: Option<AnyProtoClient>,
    remote_id: Option<u64>,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
}

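/// The state of a buffer tracked by the store. A buffer is held strongly
/// while the project is shared, weakly otherwise, and as a bag of pending
/// operations when edits arrive before the buffer itself has been opened.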
enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

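/// A set of transactions spanning multiple buffers, keyed by the buffer in
/// which each transaction occurred.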
#[derive(Default)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
    }

    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or the
    /// remote id is cleared again via `set_remote_id`. Otherwise, buffers
    /// are stored as weak handles.
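    ///
    /// A minimal usage sketch (illustrative only: `worktree_store` is assumed
    /// to be an existing `Model<WorktreeStore>` in scope):
    ///
    /// ```ignore
    /// let buffer_store = cx.new_model(|cx| {
    ///     // Passing `None` for the remote id, so buffers are held weakly.
    ///     BufferStore::new(worktree_store.clone(), None, cx)
    /// });
    /// ```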
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| {
            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                this.subscribe_to_worktree(worktree, cx);
            }
        })
        .detach();

        Self {
            remote_id,
            downstream_client: None,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
        }
    }

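    /// Opens the buffer at the given project path, loading it if necessary.
    ///
    /// Concurrent calls for the same path are deduplicated: if a load for
    /// that path is already in flight, the returned task waits on it and
    /// resolves to the same buffer.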
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

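    /// Reloads the diff base of every open (or currently loading) buffer
    /// whose containing git repository is among the changed repositories,
    /// and forwards the new diff bases to any downstream client.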
    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        if let Some(client) = &this.downstream_client {
                            client
                                .send(proto::UpdateDiffBase {
                                    project_id,
                                    buffer_id,
                                    diff_base,
                                })
                                .log_err();
                        }
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

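    /// Loads a buffer from a local worktree. If the file doesn't exist yet,
    /// this produces an empty in-memory buffer associated with that path
    /// rather than failing.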
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.clone().to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

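    /// Saves the buffer to the file it was loaded from, dispatching to the
    /// local or remote code path based on the worktree backing the file.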
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(downstream_client) = this.downstream_client.as_ref() {
                    let project_id = this.remote_id.unwrap_or(0);
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

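    /// Computes the git blame for the given buffer, either by running blame
    /// against the local repository on a background thread or by forwarding
    /// the request to the remote project host.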
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

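    /// Returns the buffer with the given id once it arrives from the remote
    /// host, registering a listener if the buffer isn't available yet.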
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.downstream_client.take();
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        cx: &mut AppContext,
    ) {
        self.downstream_client = Some(downstream_client);
        self.set_remote_id(Some(remote_id), cx);
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.remote_id.take();
    }

    fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

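    /// Streams the buffers that may contain matches for the given search
    /// query. Already-open buffers without a file entry are yielded first and
    /// count against `limit`; the remaining candidate paths are opened in
    /// chunks of `MAX_CONCURRENT_BUFFER_OPENS` to bound concurrent file loads.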
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        if event == &BufferEvent::FileHandleChanged {
            self.buffer_changed_file(buffer, cx);
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                if let Some(client) = &self.downstream_client {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            file: Some(new_file.to_proto(cx)),
                        })
                        .ok();
                }
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

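    /// Handles an incoming `UpdateBuffer` message by applying the contained
    /// operations to the corresponding buffer. Operations that arrive before
    /// the buffer has been opened are queued up and applied once it is.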
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(_) => {}
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer_id);

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                if shared.remove(&buffer_id) {
                    if shared.is_empty() {
                        this.shared_buffers.remove(&peer_id);
                    }
                    return;
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

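    /// Replicates a buffer to the given peer, if it hasn't been sent already.
    /// The buffer's state is sent first, followed by its operations in
    /// chunks, so that large histories don't exceed the message size limit.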
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer_id)
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone().zip(self.remote_id) else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
        &self.shared_buffers
    }

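    /// Serializes a project transaction so it can be sent to the given peer,
    /// replicating any buffers the peer hasn't seen yet as a side effect.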
    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }

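    /// Reconstructs a project transaction from its serialized form, waiting
    /// for the referenced buffers and edits to arrive from the remote host.
    /// If `push_to_history` is true, each transaction is also pushed onto its
    /// buffer's undo history.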
    pub async fn deserialize_project_transaction(
        this: WeakModel<Self>,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        mut cx: AsyncAppContext,
    ) -> Result<ProjectTransaction> {
        let mut project_transaction = ProjectTransaction::default();
        for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) {
            let buffer_id = BufferId::new(buffer_id)?;
            let buffer = this
                .update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;
            let transaction = language::proto::deserialize_transaction(transaction)?;
            project_transaction.0.insert(buffer, transaction);
        }

        for (buffer, transaction) in &project_transaction.0 {
            buffer
                .update(&mut cx, |buffer, _| {
                    buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                })?
                .await?;

            if push_to_history {
                buffer.update(&mut cx, |buffer, _| {
                    buffer.push_transaction(transaction.clone(), Instant::now());
                })?;
            }
        }

        Ok(project_transaction)
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}