1use crate::{
2 search::SearchQuery,
3 worktree_store::{WorktreeStore, WorktreeStoreEvent},
4 Item, ProjectPath,
5};
6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
7use anyhow::{anyhow, Context as _, Result};
8use client::Client;
9use collections::{hash_map, HashMap, HashSet};
10use fs::Fs;
11use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
12use git::blame::Blame;
13use gpui::{
14 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
15 Task, WeakModel,
16};
17use http_client::Url;
18use language::{
19 proto::{
20 deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
21 split_operations,
22 },
23 Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
24};
25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
26use smol::channel::Receiver;
27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
28use text::BufferId;
29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
31
/// Abstraction over the two buffer-store backends (`LocalBufferStore` and
/// `RemoteBufferStore`). A `BufferStore` owns one of these boxed and
/// forwards buffer loading, saving, creation, and reloading to it.
trait BufferStoreImpl {
    /// Opens the buffer at `path` within `worktree`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>>;

    /// Persists `buffer` to the file it is currently associated with.
    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    /// Persists `buffer` under `path`, re-associating the buffer with the
    /// new file ("save as").
    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>>;

    /// Creates a new, empty, untitled buffer.
    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;

    /// Reloads `buffers` from their backing storage, collecting the
    /// resulting edits into a single `ProjectTransaction`.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>>;

    /// Downcast to the remote backend, if this is one.
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
    /// Downcast to the local backend, if this is one.
    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
}
65
/// Buffer-store backend used when the project lives on a remote host:
/// buffers are opened and saved via RPC through `upstream_client`.
struct RemoteBufferStore {
    /// Buffers sent to us by collab peers, retained here to avoid races
    /// (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Model<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    /// Buffers whose initial state has arrived but whose operation chunks
    /// are still streaming in.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// One-shot channels to resolve once a given remote buffer finishes
    /// loading (or fails to).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    worktree_store: Model<WorktreeStore>,
    buffer_store: WeakModel<BufferStore>,
}
76
/// Buffer-store backend used when the project's worktrees are on the local
/// filesystem.
struct LocalBufferStore {
    /// Index of open buffers by project path, kept in sync with worktree
    /// entry events.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Index of open buffers by worktree entry id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    buffer_store: WeakModel<BufferStore>,
    worktree_store: Model<WorktreeStore>,
    /// Keeps the worktree-store subscription alive for this store's lifetime.
    _subscription: Subscription,
}
84
/// A set of open buffers.
pub struct BufferStore {
    /// The local or remote backend that actually loads and saves buffers.
    state: Box<dyn BufferStoreImpl>,
    /// In-flight buffer loads, so concurrent opens of the same path share a
    /// single load task (see `open_buffer`).
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    worktree_store: Model<WorktreeStore>,
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// When this project is shared, the client and project id to which
    /// buffer updates are forwarded.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // NOTE(review): presumably buffers already sent to each downstream
    // peer; populated outside this chunk — confirm against the rest of the
    // file.
    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
}
98
/// An entry in `BufferStore::opened_buffers`: either a live (weakly held)
/// buffer, or operations retained for a buffer that isn't available yet.
enum OpenBuffer {
    Buffer(WeakModel<Buffer>),
    Operations(Vec<Operation>),
}
103
/// Events emitted by the `BufferStore` for observers.
pub enum BufferStoreEvent {
    BufferAdded(Model<Buffer>),
    BufferDropped(BufferId),
    /// A buffer became associated with a different file (rename, save-as,
    /// deletion/recreation); `old_file` is the file it had before.
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
112
/// The set of buffer edits produced by one logical project-wide operation
/// (e.g. a reload), keyed by the buffer each transaction applies to.
#[derive(Default, Debug)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
117
118impl RemoteBufferStore {
119 pub fn wait_for_remote_buffer(
120 &mut self,
121 id: BufferId,
122 cx: &mut AppContext,
123 ) -> Task<Result<Model<Buffer>>> {
124 let buffer_store = self.buffer_store.clone();
125 let (tx, rx) = oneshot::channel();
126 self.remote_buffer_listeners.entry(id).or_default().push(tx);
127
128 cx.spawn(|cx| async move {
129 if let Some(buffer) = buffer_store
130 .read_with(&cx, |buffer_store, _| buffer_store.get(id))
131 .ok()
132 .flatten()
133 {
134 return Ok(buffer);
135 }
136
137 cx.background_executor()
138 .spawn(async move { rx.await? })
139 .await
140 })
141 }
142
143 fn save_remote_buffer(
144 &self,
145 buffer_handle: Model<Buffer>,
146 new_path: Option<proto::ProjectPath>,
147 cx: &ModelContext<Self>,
148 ) -> Task<Result<()>> {
149 let buffer = buffer_handle.read(cx);
150 let buffer_id = buffer.remote_id().into();
151 let version = buffer.version();
152 let rpc = self.upstream_client.clone();
153 let project_id = self.project_id;
154 cx.spawn(move |_, mut cx| async move {
155 let response = rpc
156 .request(proto::SaveBuffer {
157 project_id,
158 buffer_id,
159 new_path,
160 version: serialize_version(&version),
161 })
162 .await?;
163 let version = deserialize_version(&response.version);
164 let mtime = response.mtime.map(|mtime| mtime.into());
165
166 buffer_handle.update(&mut cx, |buffer, cx| {
167 buffer.did_save(version.clone(), mtime, cx);
168 })?;
169
170 Ok(())
171 })
172 }
173
174 pub fn handle_create_buffer_for_peer(
175 &mut self,
176 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
177 replica_id: u16,
178 capability: Capability,
179 cx: &mut ModelContext<Self>,
180 ) -> Result<Option<Model<Buffer>>> {
181 match envelope
182 .payload
183 .variant
184 .ok_or_else(|| anyhow!("missing variant"))?
185 {
186 proto::create_buffer_for_peer::Variant::State(mut state) => {
187 let buffer_id = BufferId::new(state.id)?;
188
189 let buffer_result = maybe!({
190 let mut buffer_file = None;
191 if let Some(file) = state.file.take() {
192 let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
193 let worktree = self
194 .worktree_store
195 .read(cx)
196 .worktree_for_id(worktree_id, cx)
197 .ok_or_else(|| {
198 anyhow!("no worktree found for id {}", file.worktree_id)
199 })?;
200 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
201 as Arc<dyn language::File>);
202 }
203 Buffer::from_proto(replica_id, capability, state, buffer_file)
204 });
205
206 match buffer_result {
207 Ok(buffer) => {
208 let buffer = cx.new_model(|_| buffer);
209 self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
210 }
211 Err(error) => {
212 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
213 for listener in listeners {
214 listener.send(Err(anyhow!(error.cloned()))).ok();
215 }
216 }
217 }
218 }
219 }
220 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
221 let buffer_id = BufferId::new(chunk.buffer_id)?;
222 let buffer = self
223 .loading_remote_buffers_by_id
224 .get(&buffer_id)
225 .cloned()
226 .ok_or_else(|| {
227 anyhow!(
228 "received chunk for buffer {} without initial state",
229 chunk.buffer_id
230 )
231 })?;
232
233 let result = maybe!({
234 let operations = chunk
235 .operations
236 .into_iter()
237 .map(language::proto::deserialize_operation)
238 .collect::<Result<Vec<_>>>()?;
239 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
240 anyhow::Ok(())
241 });
242
243 if let Err(error) = result {
244 self.loading_remote_buffers_by_id.remove(&buffer_id);
245 if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
246 for listener in listeners {
247 listener.send(Err(error.cloned())).ok();
248 }
249 }
250 } else if chunk.is_last {
251 self.loading_remote_buffers_by_id.remove(&buffer_id);
252 if self.upstream_client.is_via_collab() {
253 // retain buffers sent by peers to avoid races.
254 self.shared_with_me.insert(buffer.clone());
255 }
256
257 if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
258 for sender in senders {
259 sender.send(Ok(buffer.clone())).ok();
260 }
261 }
262 return Ok(Some(buffer));
263 }
264 }
265 }
266 return Ok(None);
267 }
268
269 pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
270 self.loading_remote_buffers_by_id
271 .keys()
272 .copied()
273 .collect::<Vec<_>>()
274 }
275
276 pub fn deserialize_project_transaction(
277 &self,
278 message: proto::ProjectTransaction,
279 push_to_history: bool,
280 cx: &mut ModelContext<Self>,
281 ) -> Task<Result<ProjectTransaction>> {
282 cx.spawn(|this, mut cx| async move {
283 let mut project_transaction = ProjectTransaction::default();
284 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
285 {
286 let buffer_id = BufferId::new(buffer_id)?;
287 let buffer = this
288 .update(&mut cx, |this, cx| {
289 this.wait_for_remote_buffer(buffer_id, cx)
290 })?
291 .await?;
292 let transaction = language::proto::deserialize_transaction(transaction)?;
293 project_transaction.0.insert(buffer, transaction);
294 }
295
296 for (buffer, transaction) in &project_transaction.0 {
297 buffer
298 .update(&mut cx, |buffer, _| {
299 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
300 })?
301 .await?;
302
303 if push_to_history {
304 buffer.update(&mut cx, |buffer, _| {
305 buffer.push_transaction(transaction.clone(), Instant::now());
306 })?;
307 }
308 }
309
310 Ok(project_transaction)
311 })
312 }
313}
314
315impl BufferStoreImpl for Model<RemoteBufferStore> {
316 fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
317 Some(self.clone())
318 }
319
320 fn as_local(&self) -> Option<Model<LocalBufferStore>> {
321 None
322 }
323
324 fn save_buffer(
325 &self,
326 buffer: Model<Buffer>,
327 cx: &mut ModelContext<BufferStore>,
328 ) -> Task<Result<()>> {
329 self.update(cx, |this, cx| {
330 this.save_remote_buffer(buffer.clone(), None, cx)
331 })
332 }
333 fn save_buffer_as(
334 &self,
335 buffer: Model<Buffer>,
336 path: ProjectPath,
337 cx: &mut ModelContext<BufferStore>,
338 ) -> Task<Result<()>> {
339 self.update(cx, |this, cx| {
340 this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
341 })
342 }
343
344 fn open_buffer(
345 &self,
346 path: Arc<Path>,
347 worktree: Model<Worktree>,
348 cx: &mut ModelContext<BufferStore>,
349 ) -> Task<Result<Model<Buffer>>> {
350 self.update(cx, |this, cx| {
351 let worktree_id = worktree.read(cx).id().to_proto();
352 let project_id = this.project_id;
353 let client = this.upstream_client.clone();
354 let path_string = path.clone().to_string_lossy().to_string();
355 cx.spawn(move |this, mut cx| async move {
356 let response = client
357 .request(proto::OpenBufferByPath {
358 project_id,
359 worktree_id,
360 path: path_string,
361 })
362 .await?;
363 let buffer_id = BufferId::new(response.buffer_id)?;
364
365 let buffer = this
366 .update(&mut cx, {
367 |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
368 })?
369 .await?;
370
371 Ok(buffer)
372 })
373 })
374 }
375
376 fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
377 self.update(cx, |this, cx| {
378 let create = this.upstream_client.request(proto::OpenNewBuffer {
379 project_id: this.project_id,
380 });
381 cx.spawn(|this, mut cx| async move {
382 let response = create.await?;
383 let buffer_id = BufferId::new(response.buffer_id)?;
384
385 this.update(&mut cx, |this, cx| {
386 this.wait_for_remote_buffer(buffer_id, cx)
387 })?
388 .await
389 })
390 })
391 }
392
393 fn reload_buffers(
394 &self,
395 buffers: HashSet<Model<Buffer>>,
396 push_to_history: bool,
397 cx: &mut ModelContext<BufferStore>,
398 ) -> Task<Result<ProjectTransaction>> {
399 self.update(cx, |this, cx| {
400 let request = this.upstream_client.request(proto::ReloadBuffers {
401 project_id: this.project_id,
402 buffer_ids: buffers
403 .iter()
404 .map(|buffer| buffer.read(cx).remote_id().to_proto())
405 .collect(),
406 });
407
408 cx.spawn(|this, mut cx| async move {
409 let response = request
410 .await?
411 .transaction
412 .ok_or_else(|| anyhow!("missing transaction"))?;
413 this.update(&mut cx, |this, cx| {
414 this.deserialize_project_transaction(response, push_to_history, cx)
415 })?
416 .await
417 })
418 })
419 }
420}
421
impl LocalBufferStore {
    /// Writes `buffer`'s contents to `path` inside `worktree`, broadcasts
    /// the save downstream (if the project is shared), and records the new
    /// version/mtime on the buffer. `has_changed_file` indicates the buffer
    /// is being associated with a new file ("save as"); it is also forced
    /// on when the buffer was never on disk before.
    fn save_local_buffer(
        &self,
        buffer_handle: Model<Buffer>,
        worktree: Model<Worktree>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer = buffer_handle.read(cx);

        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        // A buffer that never existed on disk gains a file on first save.
        if buffer
            .file()
            .is_some_and(|file| file.disk_state() == DiskState::New)
        {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.disk_state().mtime();
            // Notify downstream collaborators before updating the buffer
            // itself.
            this.update(&mut cx, |this, cx| {
                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }

    /// Watches `worktree` for entry and git-repository changes so open
    /// buffers can be kept in sync with the filesystem.
    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    /// Applies a batch of worktree entry changes to the open buffers.
    fn local_worktree_entries_changed(
        &mut self,
        worktree_handle: &Model<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        for (path, entry_id, _) in changes {
            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
        }
    }

    /// Recomputes the git diff base for every open (or currently loading)
    /// buffer whose containing repository changed, and forwards the new
    /// diff bases downstream when the project is shared.
    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());
        let Some(buffer_store) = self.buffer_store.upgrade() else {
            return;
        };

        // Identify the loading buffers whose containing repository that has changed.
        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
            let future_buffers = buffer_store
                .loading_buffers()
                .filter_map(|(project_path, receiver)| {
                    if project_path.worktree_id != worktree_handle.read(cx).id() {
                        return None;
                    }
                    let path = &project_path.path;
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
                    let path = path.clone();
                    Some(async move {
                        BufferStore::wait_for_loading_buffer(receiver)
                            .await
                            .ok()
                            .map(|buffer| (buffer, path))
                    })
                })
                .collect::<FuturesUnordered<_>>();

            // Identify the current buffers whose containing repository has changed.
            let current_buffers = buffer_store
                .buffers()
                .filter_map(|buffer| {
                    let file = File::from_dyn(buffer.read(cx).file())?;
                    if file.worktree != worktree_handle {
                        return None;
                    }
                    changed_repos
                        .iter()
                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                    Some((buffer, file.path.clone()))
                })
                .collect::<Vec<_>>();
            (future_buffers, current_buffers)
        });

        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    // Load each buffer's index text off the main thread.
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    if let Some((client, project_id)) = &this.downstream_client(cx) {
                        client
                            .send(proto::UpdateDiffBase {
                                project_id: *project_id,
                                buffer_id,
                                diff_base,
                            })
                            .log_err();
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }

    /// Reconciles one worktree entry change with the corresponding open
    /// buffer, if any: updates the buffer's `File` (rename, deletion, mtime
    /// change), maintains the path/entry-id indices, emits
    /// `BufferChangedFilePath` when the path changed, and forwards the new
    /// file downstream when shared. The `Option` return is used only for
    /// early exit via `?`; it always ends up `None`.
    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        // Prefer the entry-id index; fall back to the path index.
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = self
            .buffer_store
            .update(cx, |buffer_store, _| {
                if let Some(buffer) = buffer_store.get(buffer_id) {
                    Some(buffer)
                } else {
                    // The buffer was dropped; clean up its stale entry.
                    buffer_store.opened_buffers.remove(&buffer_id);
                    None
                }
            })
            .ok()
            .flatten();
        let buffer = if let Some(buffer) = buffer {
            buffer
        } else {
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            if old_file.worktree != *worktree {
                return None;
            }

            // Resolve the entry by id first, falling back to its old path.
            let snapshot_entry = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));

            let new_file = if let Some(entry) = snapshot_entry {
                File {
                    disk_state: match entry.mtime {
                        Some(mtime) => DiskState::Present { mtime },
                        None => old_file.disk_state,
                    },
                    is_local: true,
                    entry_id: Some(entry.id),
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_private: entry.is_private,
                }
            } else {
                // The entry no longer exists: the file was deleted.
                File {
                    disk_state: DiskState::Deleted,
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    worktree: worktree.clone(),
                    is_private: old_file.is_private,
                }
            };

            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // The file was renamed: re-index the buffer by its new path.
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            if let Some((client, project_id)) = &self.downstream_client(cx) {
                client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.to_proto(),
                        file: Some(new_file.to_proto(cx)),
                    })
                    .ok();
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;
        // Emit the collected events from the owning `BufferStore`, which is
        // the model observers are subscribed to.
        self.buffer_store
            .update(cx, |_buffer_store, cx| {
                for event in events {
                    cx.emit(event);
                }
            })
            .log_err()?;

        None
    }

    /// The downstream client/project to which updates are forwarded, if the
    /// owning `BufferStore` is currently shared.
    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
        self.buffer_store
            .upgrade()?
            .read(cx)
            .downstream_client
            .clone()
    }

    /// Records a buffer in the path/entry-id indices after it gained a file
    /// (e.g. after an untitled buffer was saved). Returns `None` if the
    /// buffer has no local file or was already indexed by its entry id.
    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
                Some(_) => {
                    return None;
                }
                None => {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        };
        self.local_buffer_ids_by_path.insert(
            ProjectPath {
                worktree_id: file.worktree_id(cx),
                path: file.path.clone(),
            },
            remote_id,
        );

        Some(())
    }
}
783
impl BufferStoreImpl for Model<LocalBufferStore> {
    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
        None
    }

    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
        Some(self.clone())
    }

    /// Saves the buffer to the file it is already associated with.
    fn save_buffer(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
            };
            let worktree = file.worktree.clone();
            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
        })
    }

    /// Saves the buffer under a new project path ("save as").
    fn save_buffer_as(
        &self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<()>> {
        self.update(cx, |this, cx| {
            let Some(worktree) = this
                .worktree_store
                .read(cx)
                .worktree_for_id(path.worktree_id, cx)
            else {
                return Task::ready(Err(anyhow!("no such worktree")));
            };
            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
        })
    }

    /// Loads the file at `path` from disk into a new buffer, registers it
    /// with the owning `BufferStore`, and indexes it by path and entry id.
    /// A nonexistent path yields an empty in-memory buffer whose file is
    /// marked `DiskState::New`.
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                // Reserve the model slot up front so the buffer id (derived
                // from the entity id) is fixed before the file finishes
                // loading.
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    // Opening a path that doesn't exist creates a new,
                    // not-yet-saved file.
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                disk_state: DiskState::New,
                                entry_id: None,
                                is_local: true,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }

    /// Creates an empty, untitled, plain-text buffer and registers it with
    /// the owning `BufferStore`.
    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
        cx.spawn(|buffer_store, mut cx| async move {
            let buffer = cx.new_model(|cx| {
                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
            })?;
            buffer_store.update(&mut cx, |buffer_store, cx| {
                buffer_store.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }

    /// Reloads each buffer from disk, collecting the resulting transactions
    /// into one `ProjectTransaction`. When `push_to_history` is false, the
    /// reload edits are removed from each buffer's undo history.
    fn reload_buffers(
        &self,
        buffers: HashSet<Model<Buffer>>,
        push_to_history: bool,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<ProjectTransaction>> {
        cx.spawn(move |_, mut cx| async move {
            let mut project_transaction = ProjectTransaction::default();
            for buffer in buffers {
                let transaction = buffer
                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
                    .await?;
                buffer.update(&mut cx, |buffer, cx| {
                    if let Some(transaction) = transaction {
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        project_transaction.0.insert(cx.handle(), transaction);
                    }
                })?;
            }

            Ok(project_transaction)
        })
    }
}
942
943impl BufferStore {
    /// Registers the buffer-related RPC message and request handlers on the
    /// given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }
954
    /// Creates a buffer store backed by the local filesystem, subscribing
    /// to `worktree_store` so that newly-added worktrees are watched for
    /// file and git-repository changes.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }
984
    /// Creates a buffer store for a remote project, loading and saving
    /// buffers over RPC through `upstream_client`.
    pub fn remote(
        worktree_store: Model<WorktreeStore>,
        upstream_client: AnyProtoClient,
        remote_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|_| RemoteBufferStore {
                shared_with_me: Default::default(),
                loading_remote_buffers_by_id: Default::default(),
                remote_buffer_listeners: Default::default(),
                project_id: remote_id,
                upstream_client,
                worktree_store: worktree_store.clone(),
                buffer_store: this,
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            shared_buffers: Default::default(),
            worktree_store,
        }
    }
1009
    /// Opens the buffer at `project_path`, returning the existing buffer if
    /// it is already open and coalescing concurrent opens of the same path
    /// onto a single load task.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    // Publish the result (success or shared error) to every
                    // watcher of this path.
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
1064
    /// Creates a new, empty, untitled buffer via the underlying backend.
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }
1068
    /// Saves `buffer` to its current file via the underlying backend.
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }
1076
    /// Saves `buffer` under `path` via the underlying backend, emitting
    /// `BufferChangedFilePath` once the save completes.
    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        // Capture the old file before the save re-associates the buffer.
        let old_file = buffer.read(cx).file().cloned();
        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
1092
    /// Computes git blame information for `buffer`, optionally at a specific
    /// historical `version` of its text.
    ///
    /// For local worktrees the blame runs against the worktree's repository on
    /// a background thread; for remote worktrees the request is forwarded to
    /// the host over RPC. Resolves to `Ok(None)` when the file is not inside a
    /// git repository.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        // Blame only makes sense for buffers backed by a real file.
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Resolve the repository, repo-relative path, and buffer text
                // up front so the background task owns everything it needs.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        // Not under a git repository: blame is simply absent.
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    // Blame either the requested historical version or the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                // Forward the request to the host, which owns the repository.
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1154
    /// Builds a permalink URL to the given line `selection` of `buffer` on its
    /// git hosting provider (e.g. GitHub).
    ///
    /// For local worktrees this reads the repository's `origin` remote and
    /// HEAD SHA and constructs the URL via the hosting-provider registry; for
    /// remote worktrees the request is forwarded to the host over RPC.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        // A permalink requires a file-backed buffer.
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    // Only the "origin" remote is consulted for permalinks.
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    // Permalinks are pinned to the current HEAD commit.
                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                // Forward the request to the host, which has git access.
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1226
1227 fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
1228 let remote_id = buffer.read(cx).remote_id();
1229 let is_remote = buffer.read(cx).replica_id() != 0;
1230 let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
1231
1232 let handle = cx.handle().downgrade();
1233 buffer.update(cx, move |_, cx| {
1234 cx.on_release(move |buffer, cx| {
1235 handle
1236 .update(cx, |_, cx| {
1237 cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1238 })
1239 .ok();
1240 })
1241 .detach()
1242 });
1243
1244 match self.opened_buffers.entry(remote_id) {
1245 hash_map::Entry::Vacant(entry) => {
1246 entry.insert(open_buffer);
1247 }
1248 hash_map::Entry::Occupied(mut entry) => {
1249 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1250 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1251 } else if entry.get().upgrade().is_some() {
1252 if is_remote {
1253 return Ok(());
1254 } else {
1255 debug_panic!("buffer {} was already registered", remote_id);
1256 Err(anyhow!("buffer {} was already registered", remote_id))?;
1257 }
1258 }
1259 entry.insert(open_buffer);
1260 }
1261 }
1262
1263 cx.subscribe(&buffer, Self::on_buffer_event).detach();
1264 cx.emit(BufferStoreEvent::BufferAdded(buffer));
1265 Ok(())
1266 }
1267
1268 pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1269 self.opened_buffers
1270 .values()
1271 .filter_map(|buffer| buffer.upgrade())
1272 }
1273
    /// Iterates over buffers that are currently being opened, yielding each
    /// pending path together with a watch receiver that resolves to the
    /// opened buffer (or the error that occurred while opening it).
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }
1286
1287 pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1288 self.buffers().find_map(|buffer| {
1289 let file = File::from_dyn(buffer.read(cx).file())?;
1290 if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1291 Some(buffer)
1292 } else {
1293 None
1294 }
1295 })
1296 }
1297
    /// Returns the live buffer with the given id, or `None` if it was never
    /// opened or has since been dropped.
    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
        self.opened_buffers
            .get(&buffer_id)
            .and_then(|buffer| buffer.upgrade())
    }
1303
    /// Like [`Self::get`], but converts a missing buffer into an error for
    /// callers that require the buffer to exist.
    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
        self.get(buffer_id)
            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
    }
1308
1309 pub fn get_possibly_incomplete(
1310 &self,
1311 buffer_id: BufferId,
1312 cx: &AppContext,
1313 ) -> Option<Model<Buffer>> {
1314 self.get(buffer_id).or_else(|| {
1315 self.state.as_remote().and_then(|remote| {
1316 remote
1317 .read(cx)
1318 .loading_remote_buffers_by_id
1319 .get(&buffer_id)
1320 .cloned()
1321 })
1322 })
1323 }
1324
1325 pub fn buffer_version_info(
1326 &self,
1327 cx: &AppContext,
1328 ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1329 let buffers = self
1330 .buffers()
1331 .map(|buffer| {
1332 let buffer = buffer.read(cx);
1333 proto::BufferVersion {
1334 id: buffer.remote_id().into(),
1335 version: language::proto::serialize_version(&buffer.version),
1336 }
1337 })
1338 .collect();
1339 let incomplete_buffer_ids = self
1340 .state
1341 .as_remote()
1342 .map(|remote| remote.read(cx).incomplete_buffer_ids())
1343 .unwrap_or_default();
1344 (buffers, incomplete_buffer_ids)
1345 }
1346
1347 pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1348 for open_buffer in self.opened_buffers.values_mut() {
1349 if let Some(buffer) = open_buffer.upgrade() {
1350 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1351 }
1352 }
1353
1354 for buffer in self.buffers() {
1355 buffer.update(cx, |buffer, cx| {
1356 buffer.set_capability(Capability::ReadOnly, cx)
1357 });
1358 }
1359
1360 if let Some(remote) = self.state.as_remote() {
1361 remote.update(cx, |remote, _| {
1362 // Wake up all futures currently waiting on a buffer to get opened,
1363 // to give them a chance to fail now that we've disconnected.
1364 remote.remote_buffer_listeners.clear()
1365 })
1366 }
1367 }
1368
    /// Marks this buffer store as shared with a downstream collaborator,
    /// recording the client and project id used to forward updates to them.
    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        _cx: &mut AppContext,
    ) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1377
    /// Stops sharing: drops the downstream client and forgets which buffers
    /// had been sent to peers.
    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1382
1383 pub fn discard_incomplete(&mut self) {
1384 self.opened_buffers
1385 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1386 }
1387
    /// Streams buffers that are candidates for a project search.
    ///
    /// Already-open buffers without a file ("unnamed" buffers) are sent first
    /// and count against `limit`; the worktree store then supplies matching
    /// project paths, which are opened (at most `MAX_CONCURRENT_BUFFER_OPENS`
    /// at a time) and forwarded through the returned channel. The search
    /// stops early when the receiving end is dropped.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // File-backed buffers are deduplicated against the worktree
                // scan via their entry id.
                open_buffers.insert(entry_id);
            } else {
                // Unnamed buffers can only be searched via their in-memory
                // contents; reserve part of the limit for them.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Cap how many buffers are opened concurrently to bound memory/IO.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        if tx.send(buffer).await.is_err() {
                            // Receiver dropped: the search was cancelled.
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1441
    /// Reacts to events emitted by an individual buffer: file-handle changes
    /// update local bookkeeping, and reloads are forwarded to the downstream
    /// collaborator (when the project is shared).
    fn on_buffer_event(
        &mut self,
        buffer: Model<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                // Only local buffer stores track files directly.
                if let Some(local) = self.state.as_local() {
                    local.update(cx, |local, cx| {
                        local.buffer_changed_file(buffer, cx);
                    })
                }
            }
            BufferEvent::Reloaded => {
                // Nothing to forward unless the project is shared downstream.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            _ => {}
        }
    }
1474
1475 pub async fn handle_update_buffer(
1476 this: Model<Self>,
1477 envelope: TypedEnvelope<proto::UpdateBuffer>,
1478 mut cx: AsyncAppContext,
1479 ) -> Result<proto::Ack> {
1480 let payload = envelope.payload.clone();
1481 let buffer_id = BufferId::new(payload.buffer_id)?;
1482 let ops = payload
1483 .operations
1484 .into_iter()
1485 .map(language::proto::deserialize_operation)
1486 .collect::<Result<Vec<_>, _>>()?;
1487 this.update(&mut cx, |this, cx| {
1488 match this.opened_buffers.entry(buffer_id) {
1489 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1490 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1491 OpenBuffer::Buffer(buffer) => {
1492 if let Some(buffer) = buffer.upgrade() {
1493 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1494 }
1495 }
1496 },
1497 hash_map::Entry::Vacant(e) => {
1498 e.insert(OpenBuffer::Operations(ops));
1499 }
1500 }
1501 Ok(proto::Ack {})
1502 })?
1503 }
1504
    /// Handles a `SynchronizeBuffers` RPC from a rejoining guest.
    ///
    /// For each buffer the guest reports, this re-registers it as shared with
    /// that guest, replies with the host's current version, pushes the file,
    /// diff base, and saved-state metadata, and streams any operations the
    /// guest is missing (relative to the version it reported) in the
    /// background.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Reset this guest's shared-buffer set; it will be rebuilt from the
        // buffers the guest reports below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest is missing.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in chunks off the main thread.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1588
1589 pub fn handle_create_buffer_for_peer(
1590 &mut self,
1591 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1592 replica_id: u16,
1593 capability: Capability,
1594 cx: &mut ModelContext<Self>,
1595 ) -> Result<()> {
1596 let Some(remote) = self.state.as_remote() else {
1597 return Err(anyhow!("buffer store is not a remote"));
1598 };
1599
1600 if let Some(buffer) = remote.update(cx, |remote, cx| {
1601 remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
1602 })? {
1603 self.add_buffer(buffer, cx)?;
1604 }
1605
1606 Ok(())
1607 }
1608
    /// Handles an `UpdateBufferFile` RPC: swaps in the buffer's new file,
    /// emits `BufferChangedFilePath` when the path actually changed, and
    /// re-forwards the message to any downstream collaborator.
    pub async fn handle_update_buffer_file(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            // Clone the payload: the original `file` field is still needed
            // below when forwarding downstream.
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .ok_or_else(|| anyhow!("no such worktree"))?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();
                    buffer.file_updated(Arc::new(file), cx);
                    // Only report a path change when the path truly differs
                    // (or when there was no previous file at all).
                    if old_file
                        .as_ref()
                        .map_or(true, |old| *old.path() != new_path)
                    {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay the update to the next collaborator in the chain.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1656
    /// Handles an `UpdateDiffBase` RPC: updates the buffer's git diff base
    /// and re-forwards the message to any downstream collaborator.
    pub async fn handle_update_diff_base(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::UpdateDiffBase>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let buffer_id = envelope.payload.buffer_id;
            let buffer_id = BufferId::new(buffer_id)?;
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    // Clone because the payload is moved when forwarding below.
                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
                });
            }
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateDiffBase {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: envelope.payload.diff_base,
                    })
                    .log_err();
            }
            Ok(())
        })?
    }
1682
    /// Handles a `SaveBuffer` RPC from a guest: waits until the buffer has
    /// caught up to the guest's reported version, saves it (optionally under
    /// a new path), and replies with the resulting saved version and mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })??;
        // Don't save until all operations the guest has seen are applied
        // locally, so the saved contents match the guest's view.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the guest requested a rename along with the save.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1723
    /// Handles a `CloseBuffer` RPC: removes the buffer from the closing
    /// peer's shared-buffer set, dropping the per-peer set entirely once it
    /// becomes empty. Closing a buffer that was never shared with the peer is
    /// a protocol violation and triggers a debug panic.
    pub async fn handle_close_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, _| {
            if let Some(buffer) = this.get(buffer_id) {
                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
                    if shared.remove(&buffer) {
                        if shared.is_empty() {
                            // Last shared buffer for this peer: drop the set.
                            this.shared_buffers.remove(&peer_id);
                        }
                        return;
                    }
                }
            };
            // Reaching here means the buffer wasn't in the peer's shared set.
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        })
    }
1749
1750 pub async fn handle_buffer_saved(
1751 this: Model<Self>,
1752 envelope: TypedEnvelope<proto::BufferSaved>,
1753 mut cx: AsyncAppContext,
1754 ) -> Result<()> {
1755 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1756 let version = deserialize_version(&envelope.payload.version);
1757 let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1758 this.update(&mut cx, move |this, cx| {
1759 if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1760 buffer.update(cx, |buffer, cx| {
1761 buffer.did_save(version, mtime, cx);
1762 });
1763 }
1764
1765 if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1766 downstream_client
1767 .send(proto::BufferSaved {
1768 project_id: *project_id,
1769 buffer_id: buffer_id.into(),
1770 mtime: envelope.payload.mtime,
1771 version: envelope.payload.version,
1772 })
1773 .log_err();
1774 }
1775 })
1776 }
1777
    /// Handles a `BufferReloaded` RPC from the host: applies the reloaded
    /// version, line ending, and mtime to the local buffer, then re-forwards
    /// the notification to any downstream collaborator.
    pub async fn handle_buffer_reloaded(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        // Reject messages with an unrecognized line-ending enum value.
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .ok_or_else(|| anyhow!("missing line ending"))?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            // Relay the reload to the next collaborator in the chain.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        })
    }
1810
    /// Handles a `BlameBuffer` RPC: waits until the local buffer has caught
    /// up to the requested version, runs blame for that version, and returns
    /// the serialized result.
    pub async fn handle_blame_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::BlameBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BlameBufferResponse> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
        // Ensure the blame is computed against the exact text the requester
        // is looking at.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(version.clone())
            })?
            .await?;
        let blame = this
            .update(&mut cx, |this, cx| {
                this.blame_buffer(&buffer, Some(version), cx)
            })?
            .await?;
        Ok(serialize_blame_buffer_response(blame))
    }
1831
1832 pub async fn handle_get_permalink_to_line(
1833 this: Model<Self>,
1834 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1835 mut cx: AsyncAppContext,
1836 ) -> Result<proto::GetPermalinkToLineResponse> {
1837 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1838 // let version = deserialize_version(&envelope.payload.version);
1839 let selection = {
1840 let proto_selection = envelope
1841 .payload
1842 .selection
1843 .context("no selection to get permalink for defined")?;
1844 proto_selection.start as u32..proto_selection.end as u32
1845 };
1846 let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1847 let permalink = this
1848 .update(&mut cx, |this, cx| {
1849 this.get_permalink_to_line(&buffer, selection, cx)
1850 })?
1851 .await?;
1852 Ok(proto::GetPermalinkToLineResponse {
1853 permalink: permalink.to_string(),
1854 })
1855 }
1856
1857 pub async fn wait_for_loading_buffer(
1858 mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1859 ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1860 loop {
1861 if let Some(result) = receiver.borrow().as_ref() {
1862 match result {
1863 Ok(buffer) => return Ok(buffer.to_owned()),
1864 Err(e) => return Err(e.to_owned()),
1865 }
1866 }
1867 receiver.next().await;
1868 }
1869 }
1870
1871 pub fn reload_buffers(
1872 &self,
1873 buffers: HashSet<Model<Buffer>>,
1874 push_to_history: bool,
1875 cx: &mut ModelContext<Self>,
1876 ) -> Task<Result<ProjectTransaction>> {
1877 if buffers.is_empty() {
1878 return Task::ready(Ok(ProjectTransaction::default()));
1879 }
1880
1881 self.state.reload_buffers(buffers, push_to_history, cx)
1882 }
1883
    /// Handles a `ReloadBuffers` RPC from a guest: reloads the requested
    /// buffers (without pushing to undo history) and replies with the
    /// resulting project transaction, serialized for the requesting peer.
    async fn handle_reload_buffers(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
        })??;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        })?;
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
1907
    /// Streams a buffer's full state to `peer_id` so the peer can open it.
    ///
    /// The buffer is first recorded in the peer's shared set; if it was
    /// already shared (or the project isn't shared downstream), this is a
    /// no-op. Otherwise the buffer state is sent as an initial
    /// `CreateBufferForPeer::State` message followed by chunked operation
    /// messages, with `is_last` marking the final chunk.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        // `insert` returning false means the peer already has this buffer.
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer.clone())
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operations if the initial state was delivered.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Peek ahead so the last chunk can be flagged.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
1970
    /// Clears all record of which buffers have been shared with which peers.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
1974
    /// Clears the shared-buffer record for a single peer (e.g. when that
    /// peer leaves the project).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
1978
    /// Re-keys a peer's shared-buffer set after the peer reconnects under a
    /// new id, preserving which buffers were shared with them.
    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
            self.shared_buffers.insert(new_peer_id, buffers);
        }
    }
1984
    /// Returns the map of which buffers have been shared with which peers.
    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
        &self.shared_buffers
    }
1988
    /// Creates and registers a new local buffer containing `text`, using the
    /// given language (or plain text when `None`).
    ///
    /// Panics if called on a non-local buffer store; callers are expected to
    /// only use this on the host side.
    pub fn create_local_buffer(
        &mut self,
        text: &str,
        language: Option<Arc<Language>>,
        cx: &mut ModelContext<Self>,
    ) -> Model<Buffer> {
        let buffer = cx.new_model(|cx| {
            Buffer::local(text, cx)
                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
        });

        self.add_buffer(buffer.clone(), cx).log_err();
        let buffer_id = buffer.read(cx).remote_id();

        // This method is only meaningful on the host; a remote buffer store
        // reaching here is a programming error.
        let local = self
            .state
            .as_local()
            .expect("local-only method called in a non-local context");
        local.update(cx, |this, cx| {
            // Index the buffer by path and entry id so file-system events can
            // be routed back to it.
            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                this.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    buffer_id,
                );

                if let Some(entry_id) = file.entry_id {
                    this.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }
        });
        buffer
    }
2025
2026 pub fn deserialize_project_transaction(
2027 &mut self,
2028 message: proto::ProjectTransaction,
2029 push_to_history: bool,
2030 cx: &mut ModelContext<Self>,
2031 ) -> Task<Result<ProjectTransaction>> {
2032 if let Some(remote) = self.state.as_remote() {
2033 remote.update(cx, |remote, cx| {
2034 remote.deserialize_project_transaction(message, push_to_history, cx)
2035 })
2036 } else {
2037 debug_panic!("not a remote buffer store");
2038 Task::ready(Err(anyhow!("not a remote buffer store")))
2039 }
2040 }
2041
2042 pub fn wait_for_remote_buffer(
2043 &self,
2044 id: BufferId,
2045 cx: &mut AppContext,
2046 ) -> Task<Result<Model<Buffer>>> {
2047 if let Some(remote) = self.state.as_remote() {
2048 remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
2049 } else {
2050 debug_panic!("not a remote buffer store");
2051 Task::ready(Err(anyhow!("not a remote buffer store")))
2052 }
2053 }
2054
2055 pub fn serialize_project_transaction_for_peer(
2056 &mut self,
2057 project_transaction: ProjectTransaction,
2058 peer_id: proto::PeerId,
2059 cx: &mut ModelContext<Self>,
2060 ) -> proto::ProjectTransaction {
2061 let mut serialized_transaction = proto::ProjectTransaction {
2062 buffer_ids: Default::default(),
2063 transactions: Default::default(),
2064 };
2065 for (buffer, transaction) in project_transaction.0 {
2066 self.create_buffer_for_peer(&buffer, peer_id, cx)
2067 .detach_and_log_err(cx);
2068 serialized_transaction
2069 .buffer_ids
2070 .push(buffer.read(cx).remote_id().into());
2071 serialized_transaction
2072 .transactions
2073 .push(language::proto::serialize_transaction(&transaction));
2074 }
2075 serialized_transaction
2076 }
2077}
2078
2079impl OpenBuffer {
2080 fn upgrade(&self) -> Option<Model<Buffer>> {
2081 match self {
2082 OpenBuffer::Buffer(handle) => handle.upgrade(),
2083 OpenBuffer::Operations(_) => None,
2084 }
2085 }
2086}
2087
2088fn is_not_found_error(error: &anyhow::Error) -> bool {
2089 error
2090 .root_cause()
2091 .downcast_ref::<io::Error>()
2092 .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2093}
2094
2095fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2096 let Some(blame) = blame else {
2097 return proto::BlameBufferResponse {
2098 blame_response: None,
2099 };
2100 };
2101
2102 let entries = blame
2103 .entries
2104 .into_iter()
2105 .map(|entry| proto::BlameEntry {
2106 sha: entry.sha.as_bytes().into(),
2107 start_line: entry.range.start,
2108 end_line: entry.range.end,
2109 original_line_number: entry.original_line_number,
2110 author: entry.author.clone(),
2111 author_mail: entry.author_mail.clone(),
2112 author_time: entry.author_time,
2113 author_tz: entry.author_tz.clone(),
2114 committer: entry.committer.clone(),
2115 committer_mail: entry.committer_mail.clone(),
2116 committer_time: entry.committer_time,
2117 committer_tz: entry.committer_tz.clone(),
2118 summary: entry.summary.clone(),
2119 previous: entry.previous.clone(),
2120 filename: entry.filename.clone(),
2121 })
2122 .collect::<Vec<_>>();
2123
2124 let messages = blame
2125 .messages
2126 .into_iter()
2127 .map(|(oid, message)| proto::CommitMessage {
2128 oid: oid.as_bytes().into(),
2129 message,
2130 })
2131 .collect::<Vec<_>>();
2132
2133 let permalinks = blame
2134 .permalinks
2135 .into_iter()
2136 .map(|(oid, url)| proto::CommitPermalink {
2137 oid: oid.as_bytes().into(),
2138 permalink: url.to_string(),
2139 })
2140 .collect::<Vec<_>>();
2141
2142 proto::BlameBufferResponse {
2143 blame_response: Some(proto::blame_buffer_response::BlameResponse {
2144 entries,
2145 messages,
2146 permalinks,
2147 remote_url: blame.remote_url,
2148 }),
2149 }
2150}
2151
/// Converts a protobuf blame response back into a [`git::blame::Blame`].
///
/// Returns `None` when the response carries no blame at all. Individual
/// entries, messages, or permalinks with malformed object ids or URLs are
/// silently dropped rather than failing the whole response.
fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                // A bad SHA drops just this entry, not the whole blame.
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_mail: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    let permalinks = response
        .permalinks
        .into_iter()
        .filter_map(|permalink| {
            Some((
                git::Oid::from_bytes(&permalink.oid).ok()?,
                Url::from_str(&permalink.permalink).ok()?,
            ))
        })
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        permalinks,
        messages,
        remote_url: response.remote_url,
    })
}