use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
};
use rpc::{
    proto::{self, AnyProtoClient, EnvelopedMessage},
    ErrorExt as _, TypedEnvelope,
};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

33/// A set of open buffers.
34pub struct BufferStore {
35 remote_id: Option<u64>,
36 #[allow(unused)]
37 worktree_store: Model<WorktreeStore>,
38 opened_buffers: HashMap<BufferId, OpenBuffer>,
39 local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
40 local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
41 #[allow(clippy::type_complexity)]
42 loading_buffers_by_path: HashMap<
43 ProjectPath,
44 postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
45 >,
46 loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
47 remote_buffer_listeners:
48 HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
49 shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
50}
51
52enum OpenBuffer {
53 Strong(Model<Buffer>),
54 Weak(WeakModel<Buffer>),
55 Operations(Vec<Operation>),
56}
57
58pub enum BufferStoreEvent {
59 BufferAdded(Model<Buffer>),
60 BufferDropped(BufferId),
61 BufferChangedFilePath {
62 buffer: Model<Buffer>,
63 old_file: Option<Arc<dyn language::File>>,
64 },
65 MessageToReplicas(Box<proto::Envelope>),
66}
67
68impl EventEmitter<BufferStoreEvent> for BufferStore {}
69
impl BufferStore {
    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or the
    /// remote id is cleared via `set_remote_id`. Otherwise, buffers are
    /// stored as weak handles.
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
            WorktreeStoreEvent::WorktreeAdded(worktree) => {
                this.subscribe_to_worktree(worktree, cx);
            }
            _ => {}
        })
        .detach();

        Self {
            remote_id,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
        }
    }

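    /// Opens the buffer at the given project path, deduplicating concurrent
    /// requests: if the path is already loading, the returned task waits on
    /// the in-flight load and yields the same buffer.
    ///
    /// A minimal sketch of a caller (the surrounding model, its
    /// `buffer_store` field, and the `project_path` value are hypothetical):
    ///
    /// ```ignore
    /// let open = self.buffer_store.update(cx, |store, cx| {
    ///     store.open_buffer(project_path.clone(), cx)
    /// });
    /// cx.spawn(|_, _| async move {
    ///     let buffer = open.await?;
    ///     anyhow::Ok(())
    /// })
    /// .detach();
    /// ```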
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateDiffBase {
                                project_id,
                                buffer_id,
                                diff_base,
                            }
                            .into_envelope(0, None, None),
                        )))
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

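    /// Loads a buffer from the local worktree. A model slot is reserved up
    /// front so the buffer's id can be derived from its future entity id
    /// before the file contents have finished loading. If the file doesn't
    /// exist, an empty buffer is created whose `File` records the path it
    /// will be saved to.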
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.clone().to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

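    /// Saves the buffer to the file it was loaded from, routing to the local
    /// or remote implementation depending on which kind of worktree the
    /// buffer's file belongs to. Fails if the buffer has no backing file.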
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(project_id) = this.remote_id {
                    if has_changed_file {
                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                            proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            }
                            .into_envelope(0, None, None),
                        )));
                    }
                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
                        proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        }
                        .into_envelope(0, None, None),
                    )));
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

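    /// Computes a git blame for the given buffer. Locally, the blame runs on
    /// a background thread against the repository containing the buffer's
    /// file; for remote worktrees, the request is forwarded to the host over
    /// RPC.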
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

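    /// Registers a buffer with the store. Shared (remote-id) stores hold
    /// strong handles so buffers stay alive for guests; otherwise only weak
    /// handles are kept, and a release observer emits `BufferDropped` when a
    /// buffer goes away. Any operations that arrived before the buffer was
    /// opened are applied here.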
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

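    /// Returns the buffer with the given id once it has been fully replicated
    /// from the host. If it isn't open yet, a oneshot listener is registered
    /// and resolved by `add_buffer` when the buffer arrives (or dropped by
    /// `disconnected_from_host`, failing the waiting future).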
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

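    /// Streams the buffers that may match the given search query. Already
    /// open buffers are matched directly (unnamed buffers count against the
    /// limit up front); the worktree store scans the remaining candidate
    /// paths, and the corresponding buffers are opened in batches.
    ///
    /// A sketch of draining the channel from a caller (the `query` and `fs`
    /// values are hypothetical):
    ///
    /// ```ignore
    /// let candidates = self.buffer_store.update(cx, |store, cx| {
    ///     store.find_search_candidates(&query, 128, fs.clone(), cx)
    /// });
    /// cx.spawn(|_, _| async move {
    ///     while let Ok(buffer) = candidates.recv().await {
    ///         // Search the buffer's snapshot here.
    ///     }
    /// })
    /// .detach();
    /// ```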
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            _ => {}
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
                    proto::UpdateBufferFile {
                        project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    }
                    .into_envelope(0, None, None),
                )))
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

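    /// Handles an `UpdateBuffer` message from a collaborator. Operations are
    /// applied to the buffer if it is open; if it hasn't been opened yet,
    /// they are queued as `OpenBuffer::Operations` and applied later by
    /// `add_buffer`.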
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(_) => {}
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer_id);

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

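    /// Handles one message of the two-phase buffer replication protocol: a
    /// `State` variant carries the buffer's initial snapshot (and optional
    /// file), after which `Chunk` variants stream batches of operations.
    /// Once the last chunk arrives, the buffer is registered via
    /// `add_buffer`.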
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                if shared.remove(&buffer_id) {
                    if shared.is_empty() {
                        this.shared_buffers.remove(&peer_id);
                    }
                    return;
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

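    /// Replicates a buffer to a peer, unless it has already been shared with
    /// them. The buffer's full state is sent first, followed by its history
    /// as a sequence of operation chunks (see `split_operations`), with
    /// `is_last` marking the final chunk.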
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        project_id: u64,
        client: AnyProtoClient,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer_id)
        {
            return Task::ready(Ok(()));
        }

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
        &self.shared_buffers
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}