use crate::{
    search::SearchQuery,
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
    Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use gpui::{
    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use http_client::Url;
use language::{
    proto::{
        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
        split_operations,
    },
    Buffer, BufferEvent, Capability, File as _, Language, Operation,
};
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{
    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
    WorktreeId,
};

/// A set of open buffers.
pub struct BufferStore {
    state: BufferStoreState,
    downstream_client: Option<(AnyProtoClient, u64)>,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}

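/// An entry in `opened_buffers`: either a weak handle to a live buffer, or
/// operations that arrived before the buffer itself and are replayed once it
/// is added.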
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}

pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}

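/// Distinguishes a store that owns local buffers from one that mirrors
/// buffers owned by an upstream (remote) project.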
enum BufferStoreState {
    Remote {
        shared_with_me: HashSet<Model<Buffer>>,
        upstream_client: AnyProtoClient,
        project_id: u64,
    },
    Local {},
}

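/// A set of buffer transactions produced by one logical project-wide
/// operation, keyed by the buffer they apply to.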
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}

impl BufferStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
    }

    /// Creates a buffer store for a local project.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| {
            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                this.subscribe_to_worktree(worktree, cx);
            }
        })
        .detach();

        Self {
            state: BufferStoreState::Local {},
            downstream_client: None,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
        }
    }

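    /// Creates a buffer store that mirrors buffers owned by the remote
    /// project identified by `remote_id`, reachable via `upstream_client`.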
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        cx.subscribe(&worktree_store, |this, _, event, cx| {
            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                this.subscribe_to_worktree(worktree, cx);
            }
        })
        .detach();

        Self {
            state: BufferStoreState::Remote {
                shared_with_me: Default::default(),
                upstream_client,
                project_id: remote_id,
            },
            downstream_client: None,
            worktree_store,
            opened_buffers: Default::default(),
            remote_buffer_listeners: Default::default(),
            loading_remote_buffers_by_id: Default::default(),
            local_buffer_ids_by_path: Default::default(),
            local_buffer_ids_by_entry_id: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
        }
    }

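    /// Opens the buffer at `project_path`, reusing an already-open buffer or
    /// joining an in-flight load of the same path when possible.
    ///
    /// A minimal usage sketch; the `buffer_store`, `project_path`, and `cx`
    /// bindings here are hypothetical, not part of this crate:
    ///
    /// ```ignore
    /// let open = buffer_store.update(cx, |store, cx| {
    ///     store.open_buffer(project_path.clone(), cx)
    /// });
    /// let buffer = open.await?;
    /// ```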
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }

    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    fn open_remote_buffer_internal(
        &self,
        path: &Arc<Path>,
        worktree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let worktree_id = worktree.id().to_proto();
        let project_id = worktree.project_id();
        let client = worktree.client();
        let path_string = path.to_string_lossy().to_string();
        cx.spawn(move |this, mut cx| async move {
            let response = client
                .request(proto::OpenBufferByPath {
                    project_id,
                    worktree_id,
                    path: path_string,
                })
                .await?;
            let buffer_id = BufferId::new(response.buffer_id)?;
            this.update(&mut cx, |this, cx| {
                this.wait_for_remote_buffer(buffer_id, cx)
            })?
            .await
        })
    }

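    /// Creates a new, empty buffer: on the remote host when `remote_client`
    /// is provided, locally otherwise.
    ///
    /// A usage sketch with hypothetical bindings:
    ///
    /// ```ignore
    /// let create = buffer_store.update(cx, |store, cx| store.create_buffer(None, cx));
    /// let buffer = create.await?;
    /// ```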
    pub fn create_buffer(
        &mut self,
        remote_client: Option<(AnyProtoClient, u64)>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        if let Some((remote_client, project_id)) = remote_client {
            let create = remote_client.request(proto::OpenNewBuffer { project_id });
            cx.spawn(|this, mut cx| async move {
                let response = create.await?;
                let buffer_id = BufferId::new(response.buffer_id)?;

                this.update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await
            })
        } else {
            Task::ready(Ok(self.create_local_buffer("", None, cx)))
        }
    }

    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });
        self.add_buffer(buffer.clone(), cx).log_err();
        buffer
    }

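    /// Saves the buffer to its current file, dispatching to the local or
    /// remote implementation based on the buffer's worktree.
    ///
    /// A usage sketch with hypothetical bindings:
    ///
    /// ```ignore
    /// let save = buffer_store.update(cx, |store, cx| store.save_buffer(buffer.clone(), cx));
    /// save.await?;
    /// ```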
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }

    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }

    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                    let project_id = *project_id;
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    fn save_remote_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        new_path: Option<proto::ProjectPath>,
        tree: &RemoteWorktree,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id().into();
        let version = buffer.version();
        let rpc = tree.client();
        let project_id = tree.project_id();
        cx.spawn(move |_, mut cx| async move {
            let response = rpc
                .request(proto::SaveBuffer {
                    project_id,
                    buffer_id,
                    new_path,
                    version: serialize_version(&version),
                })
                .await?;
            let version = deserialize_version(&response.version);
            let mtime = response.mtime.map(|mtime| mtime.into());

            buffer_handle.update(&mut cx, |buffer, cx| {
                buffer.did_save(version.clone(), mtime, cx);
            })?;

            Ok(())
        })
    }

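    /// Computes git blame for the buffer, at `version` if one is given and at
    /// the current contents otherwise. Local worktrees run blame on a
    /// background thread; remote worktrees forward the request upstream.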
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }

    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }

    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
        self.opened_buffers
            .values()
            .filter_map(|buffer| buffer.upgrade())
    }

    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }

    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
        self.buffers().find_map(|buffer| {
            let file = File::from_dyn(buffer.read(cx).file())?;
            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
                Some(buffer)
            } else {
                None
            }
        })
    }

    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }

    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }

    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.get(buffer_id)
            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
    }

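    /// Returns the buffer with the given remote id, waiting for it to arrive
    /// from the host if it is not open yet. Listeners registered here are
    /// resolved by `add_buffer` or cleared on disconnect.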
    pub fn wait_for_remote_buffer(
        &mut self,
        id: BufferId,
        cx: &mut AppContext,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer = self.get(id);
        if let Some(buffer) = buffer {
            return Task::ready(Ok(buffer));
        }
        let (tx, rx) = oneshot::channel();
        self.remote_buffer_listeners.entry(id).or_default().push(tx);
        cx.background_executor().spawn(async move { rx.await? })
    }

    pub fn buffer_version_info(
        &self,
        cx: &AppContext,
    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
        let buffers = self
            .buffers()
            .map(|buffer| {
                let buffer = buffer.read(cx);
                proto::BufferVersion {
                    id: buffer.remote_id().into(),
                    version: language::proto::serialize_version(&buffer.version),
                }
            })
            .collect();
        let incomplete_buffer_ids = self
            .loading_remote_buffers_by_id
            .keys()
            .copied()
            .collect::<Vec<_>>();
        (buffers, incomplete_buffer_ids)
    }

    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.drop_unnecessary_buffers(cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }

    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }

    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }

    fn drop_unnecessary_buffers(&mut self, cx: &mut AppContext) {
        for open_buffer in self.opened_buffers.values_mut() {
            if let Some(buffer) = open_buffer.upgrade() {
                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
            }
        }
    }

    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }

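    /// Streams buffers that may match `query`, opening them as needed.
    /// Already-open unnamed buffers are sent first; the remaining candidates
    /// come from the worktree store and are opened in batches.
    ///
    /// A usage sketch with hypothetical bindings:
    ///
    /// ```ignore
    /// let candidates = buffer_store.update(cx, |store, cx| {
    ///     store.find_search_candidates(&query, 64, fs.clone(), cx)
    /// });
    /// while let Ok(buffer) = candidates.recv().await {
    ///     // Search the buffer's snapshot here.
    /// }
    /// ```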
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }

    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            BufferEvent::Reloaded => {
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }

    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &self.downstream_client {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }

    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }

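    /// Applies buffer operations received from a peer. Operations that arrive
    /// before their buffer finishes opening are queued as
    /// `OpenBuffer::Operations` and replayed when the buffer is added.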
    pub async fn handle_update_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload.clone();
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Buffer(buffer) => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })?
    }

    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }

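    /// Handles the two-phase buffer transfer sent by `create_buffer_for_peer`:
    /// an initial `State` message constructs the buffer, then `Chunk` messages
    /// stream its operations until one arrives with `is_last` set.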
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
                    anyhow::Ok(())
                });

                if let Err(error) = result {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    // Retain buffers sent by peers to avoid races.
                    match &mut self.state {
                        BufferStoreState::Remote {
                            ref mut shared_with_me,
                            upstream_client,
                            ..
                        } => {
                            if upstream_client.is_via_collab() {
                                shared_with_me.insert(buffer.clone());
                            }
                        }
                        _ => {}
                    }
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }

    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            Ok(())
        })?
    }

    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.set_diff_base(envelope.payload.diff_base, cx)
                });
            }
            Ok(())
        })?
    }

    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }

    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }

    pub async fn handle_buffer_saved(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }
        })
    }

    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }

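    /// Waits on a load-in-progress watch channel until a result is published,
    /// returning the buffer or the shared load error.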
    pub async fn wait_for_loading_buffer(
        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
        loop {
            if let Some(result) = receiver.borrow().as_ref() {
                match result {
                    Ok(buffer) => return Ok(buffer.to_owned()),
                    Err(e) => return Err(e.to_owned()),
                }
            }
            receiver.next().await;
        }
    }

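    /// Reloads the dirty buffers among `buffers` from disk, forwarding remote
    /// buffers to the upstream project and reloading local buffers directly.
    ///
    /// A usage sketch with hypothetical bindings:
    ///
    /// ```ignore
    /// let reload = buffer_store.update(cx, |store, cx| {
    ///     store.reload_buffers(HashSet::from_iter([buffer.clone()]), true, cx)
    /// });
    /// let transaction = reload.await?;
    /// ```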
    pub fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ProjectTransaction>> {
        let mut local_buffers = Vec::new();
        let mut remote_buffers = Vec::new();
        for buffer_handle in buffers {
            let buffer = buffer_handle.read(cx);
            if buffer.is_dirty() {
                if let Some(file) = File::from_dyn(buffer.file()) {
                    if file.is_local() {
                        local_buffers.push(buffer_handle);
                    } else {
                        remote_buffers.push(buffer_handle);
                    }
                }
            }
        }

        let client = self.upstream_client();

        cx.spawn(move |this, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            if let Some((client, project_id)) = client {
                let response = client
                    .request(proto::ReloadBuffers {
                        project_id,
                        buffer_ids: remote_buffers
                            .iter()
                            .filter_map(|buffer| {
                                buffer
                                    .update(&mut cx, |buffer, _| buffer.remote_id().into())
                                    .ok()
                            })
                            .collect(),
                    })
                    .await?
                    .transaction
                    .ok_or_else(|| anyhow!("missing transaction"))?;
                BufferStore::deserialize_project_transaction(
                    this,
                    response,
                    push_to_history,
                    cx.clone(),
                )
                .await?;
            }

            for buffer in local_buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }

    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }

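    /// Shares the buffer with a peer if it hasn't been shared with them yet,
    /// sending its state followed by chunks of its operations.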
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }

    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
        match &self.state {
            BufferStoreState::Remote {
                upstream_client,
                project_id,
                ..
            } => Some((upstream_client.clone(), *project_id)),
            BufferStoreState::Local { .. } => None,
        }
    }

    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }

    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }

    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }

    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }

    pub fn serialize_project_transaction_for_peer(
        &mut self,
        project_transaction: ProjectTransaction,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> proto::ProjectTransaction {
        let mut serialized_transaction = proto::ProjectTransaction {
            buffer_ids: Default::default(),
            transactions: Default::default(),
        };
        for (buffer, transaction) in project_transaction.0 {
            self.create_buffer_for_peer(&buffer, peer_id, cx)
                .detach_and_log_err(cx);
            serialized_transaction
                .buffer_ids
                .push(buffer.read(cx).remote_id().into());
            serialized_transaction
                .transactions
                .push(language::proto::serialize_transaction(&transaction));
        }
        serialized_transaction
    }

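    /// Reconstructs a `ProjectTransaction` from its wire form, waiting for
    /// each referenced buffer and the edits it mentions to arrive first.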
    pub async fn deserialize_project_transaction(
        this: WeakModel<Self>,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        mut cx: AsyncAppContext,
    ) -> Result<ProjectTransaction> {
        let mut project_transaction = ProjectTransaction::default();
        for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) {
            let buffer_id = BufferId::new(buffer_id)?;
            let buffer = this
                .update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;
            let transaction = language::proto::deserialize_transaction(transaction)?;
            project_transaction.0.insert(buffer, transaction);
        }

        for (buffer, transaction) in &project_transaction.0 {
            buffer
                .update(&mut cx, |buffer, _| {
                    buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                })?
                .await?;

            if push_to_history {
                buffer.update(&mut cx, |buffer, _| {
                    buffer.push_transaction(transaction.clone(), Instant::now());
                })?;
            }
        }

        Ok(project_transaction)
    }
}

impl OpenBuffer {
    fn upgrade(&self) -> Option<Model<Buffer>> {
        match self {
            OpenBuffer::Buffer(handle) => handle.upgrade(),
            OpenBuffer::Operations(_) => None,
        }
    }
}

fn is_not_found_error(error: &anyhow::Error) -> bool {
    error
        .root_cause()
        .downcast_ref::<io::Error>()
        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
}

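/// Converts a blame result to its protobuf form; `deserialize_blame_buffer_response`
/// below is the inverse, skipping entries whose object ids fail to parse.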
fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer.clone(),
            committer_mail: entry.committer_mail.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    let permalinks = blame
        .permalinks
        .into_iter()
        .map(|(oid, url)| proto::CommitPermalink {
            oid: oid.as_bytes().into(),
            permalink: url.to_string(),
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        entries,
        messages,
        permalinks,
        remote_url: blame.remote_url,
    }
}

fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    }
}