use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
    Buffer, BufferEvent, Capability, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    downstream_client: Option<AnyProtoClient>,
    remote_id: Option<u64>,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
}

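/// An entry in [`BufferStore::opened_buffers`]. Buffers are held strongly
/// while the store has a remote id (i.e. while the project is shared or
/// remote) and weakly otherwise; `Operations` accumulates remote operations
/// that arrive before the buffer itself does.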
enum OpenBuffer {
    Strong(Model<Buffer>),
    Weak(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
    }

    /// Creates a buffer store, optionally retaining its buffers.
    ///
    /// If `remote_id` is `Some`, then buffers are owned by the buffer store
    /// and won't be released unless they are explicitly removed, or until the
    /// remote id is cleared (when the store is unshared or disconnected from
    /// the host). Otherwise, buffers are stored as weak handles.
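    ///
    /// A minimal construction sketch (hypothetical caller; assumes a gpui
    /// `ModelContext` and an existing `WorktreeStore` model are in scope):
    ///
    /// ```ignore
    /// let buffer_store = cx.new_model(|cx| {
    ///     BufferStore::new(worktree_store.clone(), None, cx)
    /// });
    /// ```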
    pub fn new(
        worktree_store: Model<WorktreeStore>,
        remote_id: Option<u64>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| {
            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                this.subscribe_to_worktree(worktree, cx);
            }
        })
        .detach();

        Self {
            remote_id,
            downstream_client: None,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
        }
    }

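    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// an in-flight load for the same path when one exists.
    ///
    /// A usage sketch (hypothetical caller; assumes `buffer_store` is a
    /// `Model<BufferStore>` and `path` is a `ProjectPath`):
    ///
    /// ```ignore
    /// // Both tasks resolve to the same buffer model; the second call
    /// // awaits the load started by the first instead of loading again.
    /// let first = buffer_store.update(cx, |store, cx| store.open_buffer(path.clone(), cx));
    /// let second = buffer_store.update(cx, |store, cx| store.open_buffer(path, cx));
    /// ```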
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases to all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some(project_id) = this.remote_id {
                        if let Some(client) = &this.downstream_client {
                            client
                                .send(proto::UpdateDiffBase {
                                    project_id,
                                    buffer_id,
                                    diff_base,
                                })
                                .log_err();
                        }
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

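    /// Saves `buffer` back to the file it was loaded from, writing locally or
    /// forwarding a `SaveBuffer` request to the remote host as appropriate.
    ///
    /// A usage sketch (hypothetical caller, from an async context with an
    /// `AsyncAppContext`, as in `handle_save_buffer` below):
    ///
    /// ```ignore
    /// buffer_store
    ///     .update(cx, |store, cx| store.save_buffer(buffer.clone(), cx))?
    ///     .await?;
    /// // Or save under a different project path ("save as"):
    /// buffer_store
    ///     .update(cx, |store, cx| store.save_buffer_as(buffer, new_path, cx))?
    ///     .await?;
    /// ```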
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(downstream_client) = this.downstream_client.as_ref() {
                    let project_id = this.remote_id.unwrap_or(0);
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

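    /// Computes git blame for `buffer`. For a local worktree the repository is
    /// read on a background thread; for a remote worktree the request is
    /// forwarded to the host over RPC.
    ///
    /// A usage sketch (hypothetical caller):
    ///
    /// ```ignore
    /// // Blame the buffer's current contents rather than a pinned version.
    /// let blame = buffer_store.read(cx).blame_buffer(&buffer, None, cx);
    /// ```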
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

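    /// Returns the open buffer backed by the file at `path`, if any.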
    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

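    /// Returns the buffer with the given remote id, registering a listener and
    /// waiting for `handle_create_buffer_for_peer` to deliver it if it hasn't
    /// arrived yet. Pending listeners are dropped (failing their waiters) when
    /// the host disconnects; see `disconnected_from_host`.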
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.downstream_client.take();
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        cx: &mut AppContext,
    ) {
        self.downstream_client = Some(downstream_client);
        self.set_remote_id(Some(remote_id), cx);
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.remote_id.take();
    }

    fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
        self.remote_id = remote_id;
        for open_buffer in self.opened_buffers.values_mut() {
            if remote_id.is_some() {
                if let OpenBuffer::Weak(buffer) = open_buffer {
                    if let Some(buffer) = buffer.upgrade() {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
            } else {
                if let Some(buffer) = open_buffer.upgrade() {
                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
                }
                if let OpenBuffer::Strong(buffer) = open_buffer {
                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
                }
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

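    /// Streams buffers that may contain matches for `query`. Buffers without a
    /// backing file are yielded immediately; candidate paths reported by the
    /// worktree store are then opened in batches of
    /// `MAX_CONCURRENT_BUFFER_OPENS`.
    ///
    /// A hypothetical consumer:
    ///
    /// ```ignore
    /// let candidates = buffer_store.update(cx, |store, cx| {
    ///     store.find_search_candidates(&query, 128, fs.clone(), cx)
    /// });
    /// while let Ok(buffer) = candidates.recv().await {
    ///     // Search this buffer's snapshot for matches...
    /// }
    /// ```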
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        if event == &BufferEvent::FileHandleChanged {
            self.buffer_changed_file(buffer, cx);
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some(project_id) = self.remote_id {
                if let Some(client) = &self.downstream_client {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            file: Some(new_file.to_proto(cx)),
                        })
                        .ok();
                }
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Strong(buffer) => {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    }
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Weak(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer_id);

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                if shared.remove(&buffer_id) {
                    if shared.is_empty() {
                        this.shared_buffers.remove(&peer_id);
                    }
                    return;
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

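    /// Waits on a `postage::watch` receiver until a load result is published.
    /// The channel holds `None` while the load is still in flight, so the loop
    /// below keeps polling `receiver.next()` until a `Some` value appears.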
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

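    /// Replicates `buffer` to `peer_id` if it hasn't been shared with that
    /// peer yet: the buffer's serialized state is sent first, followed by its
    /// operation history split into protocol-sized chunks (see
    /// `split_operations`), with the final chunk marked `is_last`.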
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer_id)
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone().zip(self.remote_id) else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
        &self.shared_buffers
    }

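    /// Serializes `project_transaction` for `peer_id`, kicking off replication
    /// of any buffers the peer doesn't have yet. The guest reconstructs the
    /// transaction with `deserialize_project_transaction`, which pairs each
    /// buffer id with its transaction by position.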
    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }

    pub async fn deserialize_project_transaction(
        this: WeakModel<Self>,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        mut cx: AsyncAppContext,
    ) -> Result<ProjectTransaction> {
        let mut project_transaction = ProjectTransaction::default();
        for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) {
            let buffer_id = BufferId::new(buffer_id)?;
            let buffer = this
                .update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;
            let transaction = language::proto::deserialize_transaction(transaction)?;
            project_transaction.0.insert(buffer, transaction);
        }

        for (buffer, transaction) in &project_transaction.0 {
            buffer
                .update(&mut cx, |buffer, _| {
                    buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                })?
                .await?;

            if push_to_history {
                buffer.update(&mut cx, |buffer, _| {
                    buffer.push_transaction(transaction.clone(), Instant::now());
                })?;
            }
        }

        Ok(project_transaction)
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Strong(handle) => Some(handle.clone()),
            OpenBuffer::Weak(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}