buffer_store.rs

  1use crate::ProjectPath;
  2use anyhow::{anyhow, Context as _, Result};
  3use collections::{hash_map, HashMap};
  4use futures::{channel::oneshot, StreamExt as _};
  5use gpui::{
  6    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  7};
  8use language::{
  9    proto::{deserialize_version, serialize_version, split_operations},
 10    Buffer, Capability, Language, Operation,
 11};
 12use rpc::{
 13    proto::{self, AnyProtoClient, PeerId},
 14    ErrorExt as _, TypedEnvelope,
 15};
 16use std::{io, path::Path, sync::Arc};
 17use text::BufferId;
 18use util::{debug_panic, maybe, ResultExt as _};
 19use worktree::{File, ProjectEntryId, RemoteWorktree, Worktree};
 20
 21/// A set of open buffers.
 22pub struct BufferStore {
 23    retain_buffers: bool,
 24    opened_buffers: HashMap<BufferId, OpenBuffer>,
 25    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
 26    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
 27    #[allow(clippy::type_complexity)]
 28    loading_buffers_by_path: HashMap<
 29        ProjectPath,
 30        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 31    >,
 32    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
 33    remote_buffer_listeners:
 34        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
 35}
 36
 37enum OpenBuffer {
 38    Strong(Model<Buffer>),
 39    Weak(WeakModel<Buffer>),
 40    Operations(Vec<Operation>),
 41}
 42
 43pub enum BufferStoreEvent {
 44    BufferAdded(Model<Buffer>),
 45    BufferChangedFilePath {
 46        buffer: Model<Buffer>,
 47        old_file: Option<Arc<File>>,
 48    },
 49    BufferSaved {
 50        buffer: Model<Buffer>,
 51        has_changed_file: bool,
 52        saved_version: clock::Global,
 53    },
 54}
 55
 56impl EventEmitter<BufferStoreEvent> for BufferStore {}
 57
 58impl BufferStore {
 59    /// Creates a buffer store, optionally retaining its buffers.
 60    ///
 61    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
 62    /// and won't be released unless they are explicitly removed, or `retain_buffers`
 63    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
 64    /// weak handles.
 65    pub fn new(retain_buffers: bool) -> Self {
 66        Self {
 67            retain_buffers,
 68            opened_buffers: Default::default(),
 69            remote_buffer_listeners: Default::default(),
 70            loading_remote_buffers_by_id: Default::default(),
 71            local_buffer_ids_by_path: Default::default(),
 72            local_buffer_ids_by_entry_id: Default::default(),
 73            loading_buffers_by_path: Default::default(),
 74        }
 75    }
 76
 77    pub fn open_buffer(
 78        &mut self,
 79        project_path: ProjectPath,
 80        worktree: Model<Worktree>,
 81        cx: &mut ModelContext<Self>,
 82    ) -> Task<Result<Model<Buffer>>> {
 83        let existing_buffer = self.get_by_path(&project_path, cx);
 84        if let Some(existing_buffer) = existing_buffer {
 85            return Task::ready(Ok(existing_buffer));
 86        }
 87
 88        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 89            // If the given path is already being loaded, then wait for that existing
 90            // task to complete and return the same buffer.
 91            hash_map::Entry::Occupied(e) => e.get().clone(),
 92
 93            // Otherwise, record the fact that this path is now being loaded.
 94            hash_map::Entry::Vacant(entry) => {
 95                let (mut tx, rx) = postage::watch::channel();
 96                entry.insert(rx.clone());
 97
 98                let project_path = project_path.clone();
 99                let load_buffer = match worktree.read(cx) {
100                    Worktree::Local(_) => {
101                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
102                    }
103                    Worktree::Remote(tree) => {
104                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
105                    }
106                };
107
108                cx.spawn(move |this, mut cx| async move {
109                    let load_result = load_buffer.await;
110                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
111                        // Record the fact that the buffer is no longer loading.
112                        this.loading_buffers_by_path.remove(&project_path);
113                        let buffer = load_result.map_err(Arc::new)?;
114                        Ok(buffer)
115                    })?);
116                    anyhow::Ok(())
117                })
118                .detach();
119                rx
120            }
121        };
122
123        cx.background_executor().spawn(async move {
124            Self::wait_for_loading_buffer(loading_watch)
125                .await
126                .map_err(|e| e.cloned())
127        })
128    }
129
130    fn open_local_buffer_internal(
131        &mut self,
132        path: Arc<Path>,
133        worktree: Model<Worktree>,
134        cx: &mut ModelContext<Self>,
135    ) -> Task<Result<Model<Buffer>>> {
136        let load_buffer = worktree.update(cx, |worktree, cx| {
137            let load_file = worktree.load_file(path.as_ref(), cx);
138            let reservation = cx.reserve_model();
139            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
140            cx.spawn(move |_, mut cx| async move {
141                let loaded = load_file.await?;
142                let text_buffer = cx
143                    .background_executor()
144                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
145                    .await;
146                cx.insert_model(reservation, |_| {
147                    Buffer::build(
148                        text_buffer,
149                        loaded.diff_base,
150                        Some(loaded.file),
151                        Capability::ReadWrite,
152                    )
153                })
154            })
155        });
156
157        cx.spawn(move |this, mut cx| async move {
158            let buffer = match load_buffer.await {
159                Ok(buffer) => Ok(buffer),
160                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
161                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
162                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
163                    Buffer::build(
164                        text_buffer,
165                        None,
166                        Some(Arc::new(File {
167                            worktree,
168                            path,
169                            mtime: None,
170                            entry_id: None,
171                            is_local: true,
172                            is_deleted: false,
173                            is_private: false,
174                        })),
175                        Capability::ReadWrite,
176                    )
177                }),
178                Err(e) => Err(e),
179            }?;
180            this.update(&mut cx, |this, cx| {
181                this.add_buffer(buffer.clone(), cx).log_err();
182            })?;
183            Ok(buffer)
184        })
185    }
186
187    fn open_remote_buffer_internal(
188        &self,
189        path: &Arc<Path>,
190        worktree: &RemoteWorktree,
191        cx: &ModelContext<Self>,
192    ) -> Task<Result<Model<Buffer>>> {
193        let worktree_id = worktree.id().to_proto();
194        let project_id = worktree.project_id();
195        let client = worktree.client();
196        let path_string = path.clone().to_string_lossy().to_string();
197        cx.spawn(move |this, mut cx| async move {
198            let response = client
199                .request(proto::OpenBufferByPath {
200                    project_id,
201                    worktree_id,
202                    path: path_string,
203                })
204                .await?;
205            let buffer_id = BufferId::new(response.buffer_id)?;
206            this.update(&mut cx, |this, cx| {
207                this.wait_for_remote_buffer(buffer_id, cx)
208            })?
209            .await
210        })
211    }
212
213    pub fn create_buffer(
214        &mut self,
215        remote_client: Option<(AnyProtoClient, u64)>,
216        cx: &mut ModelContext<Self>,
217    ) -> Task<Result<Model<Buffer>>> {
218        if let Some((remote_client, project_id)) = remote_client {
219            let create = remote_client.request(proto::OpenNewBuffer { project_id });
220            cx.spawn(|this, mut cx| async move {
221                let response = create.await?;
222                let buffer_id = BufferId::new(response.buffer_id)?;
223
224                this.update(&mut cx, |this, cx| {
225                    this.wait_for_remote_buffer(buffer_id, cx)
226                })?
227                .await
228            })
229        } else {
230            Task::ready(Ok(self.create_local_buffer("", None, cx)))
231        }
232    }
233
234    pub fn create_local_buffer(
235        &mut self,
236        text: &str,
237        language: Option<Arc<Language>>,
238        cx: &mut ModelContext<Self>,
239    ) -> Model<Buffer> {
240        let buffer = cx.new_model(|cx| {
241            Buffer::local(text, cx)
242                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
243        });
244        self.add_buffer(buffer.clone(), cx).log_err();
245        buffer
246    }
247
248    pub fn save_buffer(
249        &mut self,
250        buffer: Model<Buffer>,
251        cx: &mut ModelContext<Self>,
252    ) -> Task<Result<()>> {
253        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
254            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
255        };
256        match file.worktree.read(cx) {
257            Worktree::Local(_) => {
258                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
259            }
260            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
261        }
262    }
263
264    pub fn save_buffer_as(
265        &mut self,
266        buffer: Model<Buffer>,
267        path: ProjectPath,
268        worktree: Model<Worktree>,
269        cx: &mut ModelContext<Self>,
270    ) -> Task<Result<()>> {
271        let old_file = File::from_dyn(buffer.read(cx).file())
272            .cloned()
273            .map(Arc::new);
274
275        let task = match worktree.read(cx) {
276            Worktree::Local(_) => {
277                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
278            }
279            Worktree::Remote(tree) => {
280                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
281            }
282        };
283        cx.spawn(|this, mut cx| async move {
284            task.await?;
285            this.update(&mut cx, |_, cx| {
286                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
287            })
288        })
289    }
290
291    fn save_local_buffer(
292        &self,
293        worktree: Model<Worktree>,
294        buffer_handle: Model<Buffer>,
295        path: Arc<Path>,
296        mut has_changed_file: bool,
297        cx: &mut ModelContext<Self>,
298    ) -> Task<Result<()>> {
299        let buffer = buffer_handle.read(cx);
300        let text = buffer.as_rope().clone();
301        let line_ending = buffer.line_ending();
302        let version = buffer.version();
303        if buffer.file().is_some_and(|file| !file.is_created()) {
304            has_changed_file = true;
305        }
306
307        let save = worktree.update(cx, |worktree, cx| {
308            worktree.write_file(path.as_ref(), text, line_ending, cx)
309        });
310
311        cx.spawn(move |this, mut cx| async move {
312            let new_file = save.await?;
313            let mtime = new_file.mtime;
314            buffer_handle.update(&mut cx, |buffer, cx| {
315                if has_changed_file {
316                    buffer.file_updated(new_file, cx);
317                }
318                buffer.did_save(version.clone(), mtime, cx);
319            })?;
320            this.update(&mut cx, |_, cx| {
321                cx.emit(BufferStoreEvent::BufferSaved {
322                    buffer: buffer_handle,
323                    has_changed_file,
324                    saved_version: version,
325                })
326            })?;
327            Ok(())
328        })
329    }
330
331    fn save_remote_buffer(
332        &self,
333        buffer_handle: Model<Buffer>,
334        new_path: Option<proto::ProjectPath>,
335        tree: &RemoteWorktree,
336        cx: &ModelContext<Self>,
337    ) -> Task<Result<()>> {
338        let buffer = buffer_handle.read(cx);
339        let buffer_id = buffer.remote_id().into();
340        let version = buffer.version();
341        let rpc = tree.client();
342        let project_id = tree.project_id();
343        cx.spawn(move |_, mut cx| async move {
344            let response = rpc
345                .request(proto::SaveBuffer {
346                    project_id,
347                    buffer_id,
348                    new_path,
349                    version: serialize_version(&version),
350                })
351                .await?;
352            let version = deserialize_version(&response.version);
353            let mtime = response.mtime.map(|mtime| mtime.into());
354
355            buffer_handle.update(&mut cx, |buffer, cx| {
356                buffer.did_save(version.clone(), mtime, cx);
357            })?;
358
359            Ok(())
360        })
361    }
362
363    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
364        let remote_id = buffer.read(cx).remote_id();
365        let is_remote = buffer.read(cx).replica_id() != 0;
366        let open_buffer = if self.retain_buffers {
367            OpenBuffer::Strong(buffer.clone())
368        } else {
369            OpenBuffer::Weak(buffer.downgrade())
370        };
371
372        match self.opened_buffers.entry(remote_id) {
373            hash_map::Entry::Vacant(entry) => {
374                entry.insert(open_buffer);
375            }
376            hash_map::Entry::Occupied(mut entry) => {
377                if let OpenBuffer::Operations(operations) = entry.get_mut() {
378                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
379                } else if entry.get().upgrade().is_some() {
380                    if is_remote {
381                        return Ok(());
382                    } else {
383                        debug_panic!("buffer {} was already registered", remote_id);
384                        Err(anyhow!("buffer {} was already registered", remote_id))?;
385                    }
386                }
387                entry.insert(open_buffer);
388            }
389        }
390
391        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
392            for sender in senders {
393                sender.send(Ok(buffer.clone())).ok();
394            }
395        }
396
397        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
398            if file.is_local {
399                self.local_buffer_ids_by_path.insert(
400                    ProjectPath {
401                        worktree_id: file.worktree_id(cx),
402                        path: file.path.clone(),
403                    },
404                    remote_id,
405                );
406
407                if let Some(entry_id) = file.entry_id {
408                    self.local_buffer_ids_by_entry_id
409                        .insert(entry_id, remote_id);
410                }
411            }
412        }
413
414        cx.emit(BufferStoreEvent::BufferAdded(buffer));
415        Ok(())
416    }
417
418    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
419        self.opened_buffers
420            .values()
421            .filter_map(|buffer| buffer.upgrade())
422    }
423
424    pub fn loading_buffers(
425        &self,
426    ) -> impl Iterator<
427        Item = (
428            &ProjectPath,
429            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
430        ),
431    > {
432        self.loading_buffers_by_path
433            .iter()
434            .map(|(path, rx)| (path, rx.clone()))
435    }
436
437    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
438        self.buffers().find_map(|buffer| {
439            let file = File::from_dyn(buffer.read(cx).file())?;
440            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
441                Some(buffer)
442            } else {
443                None
444            }
445        })
446    }
447
448    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
449        self.opened_buffers
450            .get(&buffer_id)
451            .and_then(|buffer| buffer.upgrade())
452    }
453
454    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
455        self.get(buffer_id)
456            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
457    }
458
459    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
460        self.get(buffer_id)
461            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
462    }
463
464    fn get_or_remove_by_path(
465        &mut self,
466        entry_id: ProjectEntryId,
467        project_path: &ProjectPath,
468    ) -> Option<(BufferId, Model<Buffer>)> {
469        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
470            Some(&buffer_id) => buffer_id,
471            None => match self.local_buffer_ids_by_path.get(project_path) {
472                Some(&buffer_id) => buffer_id,
473                None => {
474                    return None;
475                }
476            },
477        };
478        let buffer = if let Some(buffer) = self.get(buffer_id) {
479            buffer
480        } else {
481            self.opened_buffers.remove(&buffer_id);
482            self.local_buffer_ids_by_path.remove(project_path);
483            self.local_buffer_ids_by_entry_id.remove(&entry_id);
484            return None;
485        };
486        Some((buffer_id, buffer))
487    }
488
489    pub fn wait_for_remote_buffer(
490        &mut self,
491        id: BufferId,
492        cx: &mut AppContext,
493    ) -> Task<Result<Model<Buffer>>> {
494        let buffer = self.get(id);
495        if let Some(buffer) = buffer {
496            return Task::ready(Ok(buffer));
497        }
498        let (tx, rx) = oneshot::channel();
499        self.remote_buffer_listeners.entry(id).or_default().push(tx);
500        cx.background_executor().spawn(async move { rx.await? })
501    }
502
503    pub fn buffer_version_info(
504        &self,
505        cx: &AppContext,
506    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
507        let buffers = self
508            .buffers()
509            .map(|buffer| {
510                let buffer = buffer.read(cx);
511                proto::BufferVersion {
512                    id: buffer.remote_id().into(),
513                    version: language::proto::serialize_version(&buffer.version),
514                }
515            })
516            .collect();
517        let incomplete_buffer_ids = self
518            .loading_remote_buffers_by_id
519            .keys()
520            .copied()
521            .collect::<Vec<_>>();
522        (buffers, incomplete_buffer_ids)
523    }
524
525    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
526        self.set_retain_buffers(false, cx);
527
528        for buffer in self.buffers() {
529            buffer.update(cx, |buffer, cx| {
530                buffer.set_capability(Capability::ReadOnly, cx)
531            });
532        }
533
534        // Wake up all futures currently waiting on a buffer to get opened,
535        // to give them a chance to fail now that we've disconnected.
536        self.remote_buffer_listeners.clear();
537    }
538
539    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
540        self.retain_buffers = retain_buffers;
541        for open_buffer in self.opened_buffers.values_mut() {
542            if retain_buffers {
543                if let OpenBuffer::Weak(buffer) = open_buffer {
544                    if let Some(buffer) = buffer.upgrade() {
545                        *open_buffer = OpenBuffer::Strong(buffer);
546                    }
547                }
548            } else {
549                if let Some(buffer) = open_buffer.upgrade() {
550                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
551                }
552                if let OpenBuffer::Strong(buffer) = open_buffer {
553                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
554                }
555            }
556        }
557    }
558
559    pub fn discard_incomplete(&mut self) {
560        self.opened_buffers
561            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
562    }
563
564    pub fn file_changed(
565        &mut self,
566        path: Arc<Path>,
567        entry_id: ProjectEntryId,
568        worktree_handle: &Model<worktree::Worktree>,
569        snapshot: &worktree::Snapshot,
570        cx: &mut ModelContext<Self>,
571    ) -> Option<(Model<Buffer>, Arc<File>, Arc<File>)> {
572        let (buffer_id, buffer) = self.get_or_remove_by_path(
573            entry_id,
574            &ProjectPath {
575                worktree_id: snapshot.id(),
576                path,
577            },
578        )?;
579
580        let result = buffer.update(cx, |buffer, cx| {
581            let old_file = File::from_dyn(buffer.file())?;
582            if old_file.worktree != *worktree_handle {
583                return None;
584            }
585
586            let new_file = if let Some(entry) = old_file
587                .entry_id
588                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
589            {
590                File {
591                    is_local: true,
592                    entry_id: Some(entry.id),
593                    mtime: entry.mtime,
594                    path: entry.path.clone(),
595                    worktree: worktree_handle.clone(),
596                    is_deleted: false,
597                    is_private: entry.is_private,
598                }
599            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
600                File {
601                    is_local: true,
602                    entry_id: Some(entry.id),
603                    mtime: entry.mtime,
604                    path: entry.path.clone(),
605                    worktree: worktree_handle.clone(),
606                    is_deleted: false,
607                    is_private: entry.is_private,
608                }
609            } else {
610                File {
611                    is_local: true,
612                    entry_id: old_file.entry_id,
613                    path: old_file.path.clone(),
614                    mtime: old_file.mtime,
615                    worktree: worktree_handle.clone(),
616                    is_deleted: true,
617                    is_private: old_file.is_private,
618                }
619            };
620
621            if new_file == *old_file {
622                return None;
623            }
624
625            let old_file = Arc::new(old_file.clone());
626            let new_file = Arc::new(new_file);
627            buffer.file_updated(new_file.clone(), cx);
628            Some((cx.handle(), old_file, new_file))
629        });
630
631        if let Some((buffer, old_file, new_file)) = &result {
632            if new_file.path != old_file.path {
633                self.local_buffer_ids_by_path.remove(&ProjectPath {
634                    path: old_file.path.clone(),
635                    worktree_id: old_file.worktree_id(cx),
636                });
637                self.local_buffer_ids_by_path.insert(
638                    ProjectPath {
639                        worktree_id: new_file.worktree_id(cx),
640                        path: new_file.path.clone(),
641                    },
642                    buffer_id,
643                );
644                cx.emit(BufferStoreEvent::BufferChangedFilePath {
645                    buffer: buffer.clone(),
646                    old_file: Some(old_file.clone()),
647                });
648            }
649
650            if new_file.entry_id != old_file.entry_id {
651                if let Some(entry_id) = old_file.entry_id {
652                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
653                }
654                if let Some(entry_id) = new_file.entry_id {
655                    self.local_buffer_ids_by_entry_id
656                        .insert(entry_id, buffer_id);
657                }
658            }
659        }
660
661        result
662    }
663
664    pub fn buffer_changed_file(
665        &mut self,
666        buffer: Model<Buffer>,
667        cx: &mut AppContext,
668    ) -> Option<()> {
669        let file = File::from_dyn(buffer.read(cx).file())?;
670
671        let remote_id = buffer.read(cx).remote_id();
672        if let Some(entry_id) = file.entry_id {
673            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
674                Some(_) => {
675                    return None;
676                }
677                None => {
678                    self.local_buffer_ids_by_entry_id
679                        .insert(entry_id, remote_id);
680                }
681            }
682        };
683        self.local_buffer_ids_by_path.insert(
684            ProjectPath {
685                worktree_id: file.worktree_id(cx),
686                path: file.path.clone(),
687            },
688            remote_id,
689        );
690
691        Some(())
692    }
693
694    pub async fn create_buffer_for_peer(
695        this: Model<Self>,
696        peer_id: PeerId,
697        buffer_id: BufferId,
698        project_id: u64,
699        client: AnyProtoClient,
700        cx: &mut AsyncAppContext,
701    ) -> Result<()> {
702        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
703            return Ok(());
704        };
705
706        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
707        let operations = operations.await;
708        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
709
710        let initial_state = proto::CreateBufferForPeer {
711            project_id,
712            peer_id: Some(peer_id),
713            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
714        };
715
716        if client.send(initial_state).log_err().is_some() {
717            let client = client.clone();
718            cx.background_executor()
719                .spawn(async move {
720                    let mut chunks = split_operations(operations).peekable();
721                    while let Some(chunk) = chunks.next() {
722                        let is_last = chunks.peek().is_none();
723                        client.send(proto::CreateBufferForPeer {
724                            project_id,
725                            peer_id: Some(peer_id),
726                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
727                                proto::BufferChunk {
728                                    buffer_id: buffer_id.into(),
729                                    operations: chunk,
730                                    is_last,
731                                },
732                            )),
733                        })?;
734                    }
735                    anyhow::Ok(())
736                })
737                .await
738                .log_err();
739        }
740        Ok(())
741    }
742
743    pub fn handle_update_buffer(
744        &mut self,
745        envelope: TypedEnvelope<proto::UpdateBuffer>,
746        is_remote: bool,
747        cx: &mut AppContext,
748    ) -> Result<proto::Ack> {
749        let payload = envelope.payload.clone();
750        let buffer_id = BufferId::new(payload.buffer_id)?;
751        let ops = payload
752            .operations
753            .into_iter()
754            .map(language::proto::deserialize_operation)
755            .collect::<Result<Vec<_>, _>>()?;
756        match self.opened_buffers.entry(buffer_id) {
757            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
758                OpenBuffer::Strong(buffer) => {
759                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
760                }
761                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
762                OpenBuffer::Weak(_) => {}
763            },
764            hash_map::Entry::Vacant(e) => {
765                if !is_remote {
766                    debug_panic!(
767                        "received buffer update from {:?}",
768                        envelope.original_sender_id
769                    );
770                    return Err(anyhow!("received buffer update for non-remote project"));
771                }
772                e.insert(OpenBuffer::Operations(ops));
773            }
774        }
775        Ok(proto::Ack {})
776    }
777
778    pub fn handle_create_buffer_for_peer(
779        &mut self,
780        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
781        mut worktrees: impl Iterator<Item = Model<Worktree>>,
782        replica_id: u16,
783        capability: Capability,
784        cx: &mut ModelContext<Self>,
785    ) -> Result<()> {
786        match envelope
787            .payload
788            .variant
789            .ok_or_else(|| anyhow!("missing variant"))?
790        {
791            proto::create_buffer_for_peer::Variant::State(mut state) => {
792                let buffer_id = BufferId::new(state.id)?;
793
794                let buffer_result = maybe!({
795                    let mut buffer_file = None;
796                    if let Some(file) = state.file.take() {
797                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
798                        let worktree = worktrees
799                            .find(|worktree| worktree.read(cx).id() == worktree_id)
800                            .ok_or_else(|| {
801                                anyhow!("no worktree found for id {}", file.worktree_id)
802                            })?;
803                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
804                            as Arc<dyn language::File>);
805                    }
806                    Buffer::from_proto(replica_id, capability, state, buffer_file)
807                });
808
809                match buffer_result {
810                    Ok(buffer) => {
811                        let buffer = cx.new_model(|_| buffer);
812                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
813                    }
814                    Err(error) => {
815                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
816                            for listener in listeners {
817                                listener.send(Err(anyhow!(error.cloned()))).ok();
818                            }
819                        }
820                    }
821                }
822            }
823            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
824                let buffer_id = BufferId::new(chunk.buffer_id)?;
825                let buffer = self
826                    .loading_remote_buffers_by_id
827                    .get(&buffer_id)
828                    .cloned()
829                    .ok_or_else(|| {
830                        anyhow!(
831                            "received chunk for buffer {} without initial state",
832                            chunk.buffer_id
833                        )
834                    })?;
835
836                let result = maybe!({
837                    let operations = chunk
838                        .operations
839                        .into_iter()
840                        .map(language::proto::deserialize_operation)
841                        .collect::<Result<Vec<_>>>()?;
842                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
843                });
844
845                if let Err(error) = result {
846                    self.loading_remote_buffers_by_id.remove(&buffer_id);
847                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
848                        for listener in listeners {
849                            listener.send(Err(error.cloned())).ok();
850                        }
851                    }
852                } else if chunk.is_last {
853                    self.loading_remote_buffers_by_id.remove(&buffer_id);
854                    self.add_buffer(buffer, cx)?;
855                }
856            }
857        }
858
859        Ok(())
860    }
861
862    pub async fn handle_save_buffer(
863        this: Model<Self>,
864        project_id: u64,
865        worktree: Option<Model<Worktree>>,
866        envelope: TypedEnvelope<proto::SaveBuffer>,
867        mut cx: AsyncAppContext,
868    ) -> Result<proto::BufferSaved> {
869        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
870        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
871        buffer
872            .update(&mut cx, |buffer, _| {
873                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
874            })?
875            .await?;
876        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
877
878        if let Some(new_path) = envelope.payload.new_path {
879            let worktree = worktree.context("no such worktree")?;
880            let new_path = ProjectPath::from_proto(new_path);
881            this.update(&mut cx, |this, cx| {
882                this.save_buffer_as(buffer.clone(), new_path, worktree, cx)
883            })?
884            .await?;
885        } else {
886            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
887                .await?;
888        }
889
890        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
891            project_id,
892            buffer_id: buffer_id.into(),
893            version: serialize_version(buffer.saved_version()),
894            mtime: buffer.saved_mtime().map(|time| time.into()),
895        })
896    }
897
898    pub async fn wait_for_loading_buffer(
899        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
900    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
901        loop {
902            if let Some(result) = receiver.borrow().as_ref() {
903                match result {
904                    Ok(buffer) => return Ok(buffer.to_owned()),
905                    Err(e) => return Err(e.to_owned()),
906                }
907            }
908            receiver.next().await;
909        }
910    }
911}
912
913impl OpenBuffer {
914    fn upgrade(&self) -> Option<Model<Buffer>> {
915        match self {
916            OpenBuffer::Strong(handle) => Some(handle.clone()),
917            OpenBuffer::Weak(handle) => handle.upgrade(),
918            OpenBuffer::Operations(_) => None,
919        }
920    }
921}
922
923fn is_not_found_error(error: &anyhow::Error) -> bool {
924    error
925        .root_cause()
926        .downcast_ref::<io::Error>()
927        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
928}