1mod ignore;
2mod lsp_command;
3pub mod project_settings;
4pub mod search;
5pub mod terminals;
6pub mod worktree;
7
8#[cfg(test)]
9mod project_tests;
10
11use anyhow::{anyhow, Context, Result};
12use client::{proto, Client, TypedEnvelope, UserStore};
13use clock::ReplicaId;
14use collections::{hash_map, BTreeMap, HashMap, HashSet};
15use copilot::Copilot;
16use futures::{
17 channel::{
18 mpsc::{self, UnboundedReceiver},
19 oneshot,
20 },
21 future::{try_join_all, Shared},
22 stream::FuturesUnordered,
23 AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
24};
25use globset::{Glob, GlobSet, GlobSetBuilder};
26use gpui::{
27 AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, ModelContext,
28 ModelHandle, Task, WeakModelHandle,
29};
30use language::{
31 language_settings::{language_settings, FormatOnSave, Formatter},
32 point_to_lsp,
33 proto::{
34 deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
35 serialize_anchor, serialize_version,
36 },
37 range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
38 Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
39 Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt, Operation, Patch,
40 PendingLanguageServer, PointUtf16, RopeFingerprint, TextBufferSnapshot, ToOffset, ToPointUtf16,
41 Transaction, Unclipped,
42};
43use log::error;
44use lsp::{
45 DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions,
46 DocumentHighlightKind, LanguageServer, LanguageServerId,
47};
48use lsp_command::*;
49use postage::watch;
50use project_settings::ProjectSettings;
51use rand::prelude::*;
52use search::SearchQuery;
53use serde::Serialize;
54use settings::SettingsStore;
55use sha2::{Digest, Sha256};
56use similar::{ChangeTag, TextDiff};
57use std::{
58 cell::RefCell,
59 cmp::{self, Ordering},
60 convert::TryInto,
61 hash::Hash,
62 mem,
63 num::NonZeroU32,
64 ops::Range,
65 path::{Component, Path, PathBuf},
66 rc::Rc,
67 str,
68 sync::{
69 atomic::{AtomicUsize, Ordering::SeqCst},
70 Arc,
71 },
72 time::{Duration, Instant, SystemTime},
73};
74use terminals::Terminals;
75use util::{
76 debug_panic, defer, merge_json_value_into, paths::LOCAL_SETTINGS_RELATIVE_PATH, post_inc,
77 ResultExt, TryFutureExt as _,
78};
79
80pub use fs::*;
81pub use worktree::*;
82
83pub trait Item {
84 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
85 fn project_path(&self, cx: &AppContext) -> Option<ProjectPath>;
86}
87
88// Language server state is stored across 3 collections:
89// language_servers =>
90// a mapping from unique server id to LanguageServerState which can either be a task for a
91// server in the process of starting, or a running server with adapter and language server arcs
92// language_server_ids => a mapping from worktreeId and server name to the unique server id
93// language_server_statuses => a mapping from unique server id to the current server status
94//
95// Multiple worktrees can map to the same language server for example when you jump to the definition
96// of a file in the standard library. So language_server_ids is used to look up which server is active
97// for a given worktree and language server name
98//
99// When starting a language server, first the id map is checked to make sure a server isn't already available
// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and
101// the Starting variant of LanguageServerState is stored in the language_servers map.
102pub struct Project {
103 worktrees: Vec<WorktreeHandle>,
104 active_entry: Option<ProjectEntryId>,
105 buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
106 languages: Arc<LanguageRegistry>,
107 language_servers: HashMap<LanguageServerId, LanguageServerState>,
108 language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
109 language_server_statuses: BTreeMap<LanguageServerId, LanguageServerStatus>,
110 last_workspace_edits_by_language_server: HashMap<LanguageServerId, ProjectTransaction>,
111 client: Arc<client::Client>,
112 next_entry_id: Arc<AtomicUsize>,
113 join_project_response_message_id: u32,
114 next_diagnostic_group_id: usize,
115 user_store: ModelHandle<UserStore>,
116 fs: Arc<dyn Fs>,
117 client_state: Option<ProjectClientState>,
118 collaborators: HashMap<proto::PeerId, Collaborator>,
119 client_subscriptions: Vec<client::Subscription>,
120 _subscriptions: Vec<gpui::Subscription>,
121 next_buffer_id: u64,
122 opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
123 shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
124 #[allow(clippy::type_complexity)]
125 loading_buffers_by_path: HashMap<
126 ProjectPath,
127 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
128 >,
129 #[allow(clippy::type_complexity)]
130 loading_local_worktrees:
131 HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
132 opened_buffers: HashMap<u64, OpenBuffer>,
133 local_buffer_ids_by_path: HashMap<ProjectPath, u64>,
134 local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, u64>,
135 /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
136 /// Used for re-issuing buffer requests when peers temporarily disconnect
137 incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
138 buffer_snapshots: HashMap<u64, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
139 buffers_being_formatted: HashSet<u64>,
140 buffers_needing_diff: HashSet<WeakModelHandle<Buffer>>,
141 git_diff_debouncer: DelayedDebounced,
142 nonce: u128,
143 _maintain_buffer_languages: Task<()>,
144 _maintain_workspace_config: Task<()>,
145 terminals: Terminals,
146 copilot_enabled: bool,
147}
148
149struct DelayedDebounced {
150 task: Option<Task<()>>,
151 cancel_channel: Option<oneshot::Sender<()>>,
152}
153
154impl DelayedDebounced {
155 fn new() -> DelayedDebounced {
156 DelayedDebounced {
157 task: None,
158 cancel_channel: None,
159 }
160 }
161
162 fn fire_new<F>(&mut self, delay: Duration, cx: &mut ModelContext<Project>, func: F)
163 where
164 F: 'static + FnOnce(&mut Project, &mut ModelContext<Project>) -> Task<()>,
165 {
166 if let Some(channel) = self.cancel_channel.take() {
167 _ = channel.send(());
168 }
169
170 let (sender, mut receiver) = oneshot::channel::<()>();
171 self.cancel_channel = Some(sender);
172
173 let previous_task = self.task.take();
174 self.task = Some(cx.spawn(|workspace, mut cx| async move {
175 let mut timer = cx.background().timer(delay).fuse();
176 if let Some(previous_task) = previous_task {
177 previous_task.await;
178 }
179
180 futures::select_biased! {
181 _ = receiver => return,
182 _ = timer => {}
183 }
184
185 workspace
186 .update(&mut cx, |workspace, cx| (func)(workspace, cx))
187 .await;
188 }));
189 }
190}
191
192struct LspBufferSnapshot {
193 version: i32,
194 snapshot: TextBufferSnapshot,
195}
196
197/// Message ordered with respect to buffer operations
198enum BufferOrderedMessage {
199 Operation {
200 buffer_id: u64,
201 operation: proto::Operation,
202 },
203 LanguageServerUpdate {
204 language_server_id: LanguageServerId,
205 message: proto::update_language_server::Variant,
206 },
207 Resync,
208}
209
210enum LocalProjectUpdate {
211 WorktreesChanged,
212 CreateBufferForPeer {
213 peer_id: proto::PeerId,
214 buffer_id: u64,
215 },
216}
217
218enum OpenBuffer {
219 Strong(ModelHandle<Buffer>),
220 Weak(WeakModelHandle<Buffer>),
221 Operations(Vec<Operation>),
222}
223
224enum WorktreeHandle {
225 Strong(ModelHandle<Worktree>),
226 Weak(WeakModelHandle<Worktree>),
227}
228
229enum ProjectClientState {
230 Local {
231 remote_id: u64,
232 updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
233 _send_updates: Task<()>,
234 },
235 Remote {
236 sharing_has_stopped: bool,
237 remote_id: u64,
238 replica_id: ReplicaId,
239 },
240}
241
242#[derive(Clone, Debug)]
243pub struct Collaborator {
244 pub peer_id: proto::PeerId,
245 pub replica_id: ReplicaId,
246}
247
248#[derive(Clone, Debug, PartialEq, Eq)]
249pub enum Event {
250 LanguageServerAdded(LanguageServerId),
251 LanguageServerRemoved(LanguageServerId),
252 ActiveEntryChanged(Option<ProjectEntryId>),
253 WorktreeAdded,
254 WorktreeRemoved(WorktreeId),
255 DiskBasedDiagnosticsStarted {
256 language_server_id: LanguageServerId,
257 },
258 DiskBasedDiagnosticsFinished {
259 language_server_id: LanguageServerId,
260 },
261 DiagnosticsUpdated {
262 path: ProjectPath,
263 language_server_id: LanguageServerId,
264 },
265 RemoteIdChanged(Option<u64>),
266 DisconnectedFromHost,
267 Closed,
268 DeletedEntry(ProjectEntryId),
269 CollaboratorUpdated {
270 old_peer_id: proto::PeerId,
271 new_peer_id: proto::PeerId,
272 },
273 CollaboratorLeft(proto::PeerId),
274}
275
276pub enum LanguageServerState {
277 Starting(Task<Option<Arc<LanguageServer>>>),
278 Running {
279 language: Arc<Language>,
280 adapter: Arc<CachedLspAdapter>,
281 server: Arc<LanguageServer>,
282 watched_paths: HashMap<WorktreeId, GlobSet>,
283 simulate_disk_based_diagnostics_completion: Option<Task<()>>,
284 },
285}
286
287#[derive(Serialize)]
288pub struct LanguageServerStatus {
289 pub name: String,
290 pub pending_work: BTreeMap<String, LanguageServerProgress>,
291 pub has_pending_diagnostic_updates: bool,
292 progress_tokens: HashSet<String>,
293}
294
295#[derive(Clone, Debug, Serialize)]
296pub struct LanguageServerProgress {
297 pub message: Option<String>,
298 pub percentage: Option<usize>,
299 #[serde(skip_serializing)]
300 pub last_update_at: Instant,
301}
302
303#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
304pub struct ProjectPath {
305 pub worktree_id: WorktreeId,
306 pub path: Arc<Path>,
307}
308
309#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
310pub struct DiagnosticSummary {
311 pub error_count: usize,
312 pub warning_count: usize,
313}
314
315#[derive(Debug, Clone)]
316pub struct Location {
317 pub buffer: ModelHandle<Buffer>,
318 pub range: Range<language::Anchor>,
319}
320
321#[derive(Debug, Clone)]
322pub struct LocationLink {
323 pub origin: Option<Location>,
324 pub target: Location,
325}
326
327#[derive(Debug)]
328pub struct DocumentHighlight {
329 pub range: Range<language::Anchor>,
330 pub kind: DocumentHighlightKind,
331}
332
333#[derive(Clone, Debug)]
334pub struct Symbol {
335 pub language_server_name: LanguageServerName,
336 pub source_worktree_id: WorktreeId,
337 pub path: ProjectPath,
338 pub label: CodeLabel,
339 pub name: String,
340 pub kind: lsp::SymbolKind,
341 pub range: Range<Unclipped<PointUtf16>>,
342 pub signature: [u8; 32],
343}
344
345#[derive(Clone, Debug, PartialEq)]
346pub struct HoverBlock {
347 pub text: String,
348 pub kind: HoverBlockKind,
349}
350
/// The kind of content held by a [`HoverBlock`].
#[derive(Debug, Clone, PartialEq)]
pub enum HoverBlockKind {
    /// Plain, unformatted text.
    PlainText,
    /// Markdown text.
    Markdown,
    /// Source code in the named language.
    Code { language: String },
}
357
358#[derive(Debug)]
359pub struct Hover {
360 pub contents: Vec<HoverBlock>,
361 pub range: Option<Range<language::Anchor>>,
362}
363
364#[derive(Default)]
365pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
366
367impl DiagnosticSummary {
368 fn new<'a, T: 'a>(diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>) -> Self {
369 let mut this = Self {
370 error_count: 0,
371 warning_count: 0,
372 };
373
374 for entry in diagnostics {
375 if entry.diagnostic.is_primary {
376 match entry.diagnostic.severity {
377 DiagnosticSeverity::ERROR => this.error_count += 1,
378 DiagnosticSeverity::WARNING => this.warning_count += 1,
379 _ => {}
380 }
381 }
382 }
383
384 this
385 }
386
387 pub fn is_empty(&self) -> bool {
388 self.error_count == 0 && self.warning_count == 0
389 }
390
391 pub fn to_proto(
392 &self,
393 language_server_id: LanguageServerId,
394 path: &Path,
395 ) -> proto::DiagnosticSummary {
396 proto::DiagnosticSummary {
397 path: path.to_string_lossy().to_string(),
398 language_server_id: language_server_id.0 as u64,
399 error_count: self.error_count as u32,
400 warning_count: self.warning_count as u32,
401 }
402 }
403}
404
/// Identifier of an entry within a project, wrapping a `usize`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectEntryId(usize);

impl ProjectEntryId {
    /// The largest representable id.
    pub const MAX: Self = Self(usize::MAX);

    /// Allocates a fresh id by atomically incrementing `counter` and using
    /// its previous value.
    pub fn new(counter: &AtomicUsize) -> Self {
        let id = counter.fetch_add(1, SeqCst);
        Self(id)
    }

    /// Builds an id from its protobuf (`u64`) representation.
    pub fn from_proto(id: u64) -> Self {
        Self(id as usize)
    }

    /// Returns the protobuf (`u64`) representation of this id.
    pub fn to_proto(&self) -> u64 {
        let Self(id) = *self;
        id as u64
    }

    /// Returns the raw `usize` value of this id.
    pub fn to_usize(&self) -> usize {
        let Self(id) = *self;
        id
    }
}
427
/// What caused a format request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FormatTrigger {
    /// Formatting triggered by a save.
    Save,
    /// Formatting requested manually.
    Manual,
}

impl FormatTrigger {
    /// Decodes a trigger from its protobuf integer value.
    ///
    /// `1` decodes to [`FormatTrigger::Manual`]; `0` and every unrecognized
    /// value decode to [`FormatTrigger::Save`].
    fn from_proto(value: i32) -> FormatTrigger {
        if value == 1 {
            FormatTrigger::Manual
        } else {
            FormatTrigger::Save
        }
    }
}
443
444impl Project {
445 pub fn init_settings(cx: &mut AppContext) {
446 settings::register::<ProjectSettings>(cx);
447 }
448
449 pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
450 Self::init_settings(cx);
451
452 client.add_model_message_handler(Self::handle_add_collaborator);
453 client.add_model_message_handler(Self::handle_update_project_collaborator);
454 client.add_model_message_handler(Self::handle_remove_collaborator);
455 client.add_model_message_handler(Self::handle_buffer_reloaded);
456 client.add_model_message_handler(Self::handle_buffer_saved);
457 client.add_model_message_handler(Self::handle_start_language_server);
458 client.add_model_message_handler(Self::handle_update_language_server);
459 client.add_model_message_handler(Self::handle_update_project);
460 client.add_model_message_handler(Self::handle_unshare_project);
461 client.add_model_message_handler(Self::handle_create_buffer_for_peer);
462 client.add_model_message_handler(Self::handle_update_buffer_file);
463 client.add_model_request_handler(Self::handle_update_buffer);
464 client.add_model_message_handler(Self::handle_update_diagnostic_summary);
465 client.add_model_message_handler(Self::handle_update_worktree);
466 client.add_model_message_handler(Self::handle_update_worktree_settings);
467 client.add_model_request_handler(Self::handle_create_project_entry);
468 client.add_model_request_handler(Self::handle_rename_project_entry);
469 client.add_model_request_handler(Self::handle_copy_project_entry);
470 client.add_model_request_handler(Self::handle_delete_project_entry);
471 client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
472 client.add_model_request_handler(Self::handle_apply_code_action);
473 client.add_model_request_handler(Self::handle_on_type_formatting);
474 client.add_model_request_handler(Self::handle_reload_buffers);
475 client.add_model_request_handler(Self::handle_synchronize_buffers);
476 client.add_model_request_handler(Self::handle_format_buffers);
477 client.add_model_request_handler(Self::handle_lsp_command::<GetCodeActions>);
478 client.add_model_request_handler(Self::handle_lsp_command::<GetCompletions>);
479 client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
480 client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
481 client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
482 client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
483 client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
484 client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
485 client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
486 client.add_model_request_handler(Self::handle_search_project);
487 client.add_model_request_handler(Self::handle_get_project_symbols);
488 client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
489 client.add_model_request_handler(Self::handle_open_buffer_by_id);
490 client.add_model_request_handler(Self::handle_open_buffer_by_path);
491 client.add_model_request_handler(Self::handle_save_buffer);
492 client.add_model_message_handler(Self::handle_update_diff_base);
493 }
494
495 pub fn local(
496 client: Arc<Client>,
497 user_store: ModelHandle<UserStore>,
498 languages: Arc<LanguageRegistry>,
499 fs: Arc<dyn Fs>,
500 cx: &mut AppContext,
501 ) -> ModelHandle<Self> {
502 cx.add_model(|cx: &mut ModelContext<Self>| {
503 let (tx, rx) = mpsc::unbounded();
504 cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
505 .detach();
506 Self {
507 worktrees: Default::default(),
508 buffer_ordered_messages_tx: tx,
509 collaborators: Default::default(),
510 next_buffer_id: 0,
511 opened_buffers: Default::default(),
512 shared_buffers: Default::default(),
513 incomplete_remote_buffers: Default::default(),
514 loading_buffers_by_path: Default::default(),
515 loading_local_worktrees: Default::default(),
516 local_buffer_ids_by_path: Default::default(),
517 local_buffer_ids_by_entry_id: Default::default(),
518 buffer_snapshots: Default::default(),
519 join_project_response_message_id: 0,
520 client_state: None,
521 opened_buffer: watch::channel(),
522 client_subscriptions: Vec::new(),
523 _subscriptions: vec![
524 cx.observe_global::<SettingsStore, _>(Self::on_settings_changed)
525 ],
526 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
527 _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
528 active_entry: None,
529 languages,
530 client,
531 user_store,
532 fs,
533 next_entry_id: Default::default(),
534 next_diagnostic_group_id: Default::default(),
535 language_servers: Default::default(),
536 language_server_ids: Default::default(),
537 language_server_statuses: Default::default(),
538 last_workspace_edits_by_language_server: Default::default(),
539 buffers_being_formatted: Default::default(),
540 buffers_needing_diff: Default::default(),
541 git_diff_debouncer: DelayedDebounced::new(),
542 nonce: StdRng::from_entropy().gen(),
543 terminals: Terminals {
544 local_handles: Vec::new(),
545 },
546 copilot_enabled: Copilot::global(cx).is_some(),
547 }
548 })
549 }
550
551 pub async fn remote(
552 remote_id: u64,
553 client: Arc<Client>,
554 user_store: ModelHandle<UserStore>,
555 languages: Arc<LanguageRegistry>,
556 fs: Arc<dyn Fs>,
557 mut cx: AsyncAppContext,
558 ) -> Result<ModelHandle<Self>> {
559 client.authenticate_and_connect(true, &cx).await?;
560
561 let subscription = client.subscribe_to_entity(remote_id)?;
562 let response = client
563 .request_envelope(proto::JoinProject {
564 project_id: remote_id,
565 })
566 .await?;
567 let this = cx.add_model(|cx| {
568 let replica_id = response.payload.replica_id as ReplicaId;
569
570 let mut worktrees = Vec::new();
571 for worktree in response.payload.worktrees {
572 let worktree = cx.update(|cx| {
573 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
574 });
575 worktrees.push(worktree);
576 }
577
578 let (tx, rx) = mpsc::unbounded();
579 cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
580 .detach();
581 let mut this = Self {
582 worktrees: Vec::new(),
583 buffer_ordered_messages_tx: tx,
584 loading_buffers_by_path: Default::default(),
585 next_buffer_id: 0,
586 opened_buffer: watch::channel(),
587 shared_buffers: Default::default(),
588 incomplete_remote_buffers: Default::default(),
589 loading_local_worktrees: Default::default(),
590 local_buffer_ids_by_path: Default::default(),
591 local_buffer_ids_by_entry_id: Default::default(),
592 active_entry: None,
593 collaborators: Default::default(),
594 join_project_response_message_id: response.message_id,
595 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
596 _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
597 languages,
598 user_store: user_store.clone(),
599 fs,
600 next_entry_id: Default::default(),
601 next_diagnostic_group_id: Default::default(),
602 client_subscriptions: Default::default(),
603 _subscriptions: Default::default(),
604 client: client.clone(),
605 client_state: Some(ProjectClientState::Remote {
606 sharing_has_stopped: false,
607 remote_id,
608 replica_id,
609 }),
610 language_servers: Default::default(),
611 language_server_ids: Default::default(),
612 language_server_statuses: response
613 .payload
614 .language_servers
615 .into_iter()
616 .map(|server| {
617 (
618 LanguageServerId(server.id as usize),
619 LanguageServerStatus {
620 name: server.name,
621 pending_work: Default::default(),
622 has_pending_diagnostic_updates: false,
623 progress_tokens: Default::default(),
624 },
625 )
626 })
627 .collect(),
628 last_workspace_edits_by_language_server: Default::default(),
629 opened_buffers: Default::default(),
630 buffers_being_formatted: Default::default(),
631 buffers_needing_diff: Default::default(),
632 git_diff_debouncer: DelayedDebounced::new(),
633 buffer_snapshots: Default::default(),
634 nonce: StdRng::from_entropy().gen(),
635 terminals: Terminals {
636 local_handles: Vec::new(),
637 },
638 copilot_enabled: Copilot::global(cx).is_some(),
639 };
640 for worktree in worktrees {
641 let _ = this.add_worktree(&worktree, cx);
642 }
643 this
644 });
645 let subscription = subscription.set_model(&this, &mut cx);
646
647 let user_ids = response
648 .payload
649 .collaborators
650 .iter()
651 .map(|peer| peer.user_id)
652 .collect();
653 user_store
654 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
655 .await?;
656
657 this.update(&mut cx, |this, cx| {
658 this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
659 this.client_subscriptions.push(subscription);
660 anyhow::Ok(())
661 })?;
662
663 Ok(this)
664 }
665
666 #[cfg(any(test, feature = "test-support"))]
667 pub async fn test(
668 fs: Arc<dyn Fs>,
669 root_paths: impl IntoIterator<Item = &Path>,
670 cx: &mut gpui::TestAppContext,
671 ) -> ModelHandle<Project> {
672 let mut languages = LanguageRegistry::test();
673 languages.set_executor(cx.background());
674 let http_client = util::http::FakeHttpClient::with_404_response();
675 let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
676 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
677 let project =
678 cx.update(|cx| Project::local(client, user_store, Arc::new(languages), fs, cx));
679 for path in root_paths {
680 let (tree, _) = project
681 .update(cx, |project, cx| {
682 project.find_or_create_local_worktree(path, true, cx)
683 })
684 .await
685 .unwrap();
686 tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
687 .await;
688 }
689 project
690 }
691
692 fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
693 let mut language_servers_to_start = Vec::new();
694 for buffer in self.opened_buffers.values() {
695 if let Some(buffer) = buffer.upgrade(cx) {
696 let buffer = buffer.read(cx);
697 if let Some((file, language)) = buffer.file().zip(buffer.language()) {
698 let settings = language_settings(Some(language), Some(file), cx);
699 if settings.enable_language_server {
700 if let Some(file) = File::from_dyn(Some(file)) {
701 language_servers_to_start
702 .push((file.worktree.clone(), language.clone()));
703 }
704 }
705 }
706 }
707 }
708
709 let mut language_servers_to_stop = Vec::new();
710 let languages = self.languages.to_vec();
711 for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
712 let language = languages.iter().find(|l| {
713 l.lsp_adapters()
714 .iter()
715 .any(|adapter| &adapter.name == started_lsp_name)
716 });
717 if let Some(language) = language {
718 let worktree = self.worktree_for_id(*worktree_id, cx);
719 let file = worktree.and_then(|tree| {
720 tree.update(cx, |tree, cx| tree.root_file(cx).map(|f| f as _))
721 });
722 if !language_settings(Some(language), file.as_ref(), cx).enable_language_server {
723 language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
724 }
725 }
726 }
727
728 // Stop all newly-disabled language servers.
729 for (worktree_id, adapter_name) in language_servers_to_stop {
730 self.stop_language_server(worktree_id, adapter_name, cx)
731 .detach();
732 }
733
734 // Start all the newly-enabled language servers.
735 for (worktree, language) in language_servers_to_start {
736 let worktree_path = worktree.read(cx).abs_path();
737 self.start_language_servers(&worktree, worktree_path, language, cx);
738 }
739
740 if !self.copilot_enabled && Copilot::global(cx).is_some() {
741 self.copilot_enabled = true;
742 for buffer in self.opened_buffers.values() {
743 if let Some(buffer) = buffer.upgrade(cx) {
744 self.register_buffer_with_copilot(&buffer, cx);
745 }
746 }
747 }
748
749 cx.notify();
750 }
751
752 pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
753 self.opened_buffers
754 .get(&remote_id)
755 .and_then(|buffer| buffer.upgrade(cx))
756 }
757
758 pub fn languages(&self) -> &Arc<LanguageRegistry> {
759 &self.languages
760 }
761
762 pub fn client(&self) -> Arc<Client> {
763 self.client.clone()
764 }
765
766 pub fn user_store(&self) -> ModelHandle<UserStore> {
767 self.user_store.clone()
768 }
769
770 #[cfg(any(test, feature = "test-support"))]
771 pub fn opened_buffers(&self, cx: &AppContext) -> Vec<ModelHandle<Buffer>> {
772 self.opened_buffers
773 .values()
774 .filter_map(|b| b.upgrade(cx))
775 .collect()
776 }
777
778 #[cfg(any(test, feature = "test-support"))]
779 pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
780 let path = path.into();
781 if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
782 self.opened_buffers.iter().any(|(_, buffer)| {
783 if let Some(buffer) = buffer.upgrade(cx) {
784 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
785 if file.worktree == worktree && file.path() == &path.path {
786 return true;
787 }
788 }
789 }
790 false
791 })
792 } else {
793 false
794 }
795 }
796
797 pub fn fs(&self) -> &Arc<dyn Fs> {
798 &self.fs
799 }
800
801 pub fn remote_id(&self) -> Option<u64> {
802 match self.client_state.as_ref()? {
803 ProjectClientState::Local { remote_id, .. }
804 | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
805 }
806 }
807
808 pub fn replica_id(&self) -> ReplicaId {
809 match &self.client_state {
810 Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
811 _ => 0,
812 }
813 }
814
815 fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
816 if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
817 updates_tx
818 .unbounded_send(LocalProjectUpdate::WorktreesChanged)
819 .ok();
820 }
821 cx.notify();
822 }
823
824 pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
825 &self.collaborators
826 }
827
828 /// Collect all worktrees, including ones that don't appear in the project panel
829 pub fn worktrees<'a>(
830 &'a self,
831 cx: &'a AppContext,
832 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
833 self.worktrees
834 .iter()
835 .filter_map(move |worktree| worktree.upgrade(cx))
836 }
837
838 /// Collect all user-visible worktrees, the ones that appear in the project panel
839 pub fn visible_worktrees<'a>(
840 &'a self,
841 cx: &'a AppContext,
842 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
843 self.worktrees.iter().filter_map(|worktree| {
844 worktree.upgrade(cx).and_then(|worktree| {
845 if worktree.read(cx).is_visible() {
846 Some(worktree)
847 } else {
848 None
849 }
850 })
851 })
852 }
853
854 pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
855 self.visible_worktrees(cx)
856 .map(|tree| tree.read(cx).root_name())
857 }
858
859 pub fn worktree_for_id(
860 &self,
861 id: WorktreeId,
862 cx: &AppContext,
863 ) -> Option<ModelHandle<Worktree>> {
864 self.worktrees(cx)
865 .find(|worktree| worktree.read(cx).id() == id)
866 }
867
868 pub fn worktree_for_entry(
869 &self,
870 entry_id: ProjectEntryId,
871 cx: &AppContext,
872 ) -> Option<ModelHandle<Worktree>> {
873 self.worktrees(cx)
874 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
875 }
876
877 pub fn worktree_id_for_entry(
878 &self,
879 entry_id: ProjectEntryId,
880 cx: &AppContext,
881 ) -> Option<WorktreeId> {
882 self.worktree_for_entry(entry_id, cx)
883 .map(|worktree| worktree.read(cx).id())
884 }
885
886 pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
887 paths.iter().all(|path| self.contains_path(path, cx))
888 }
889
890 pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
891 for worktree in self.worktrees(cx) {
892 let worktree = worktree.read(cx).as_local();
893 if worktree.map_or(false, |w| w.contains_abs_path(path)) {
894 return true;
895 }
896 }
897 false
898 }
899
900 pub fn create_entry(
901 &mut self,
902 project_path: impl Into<ProjectPath>,
903 is_directory: bool,
904 cx: &mut ModelContext<Self>,
905 ) -> Option<Task<Result<Entry>>> {
906 let project_path = project_path.into();
907 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
908 if self.is_local() {
909 Some(worktree.update(cx, |worktree, cx| {
910 worktree
911 .as_local_mut()
912 .unwrap()
913 .create_entry(project_path.path, is_directory, cx)
914 }))
915 } else {
916 let client = self.client.clone();
917 let project_id = self.remote_id().unwrap();
918 Some(cx.spawn_weak(|_, mut cx| async move {
919 let response = client
920 .request(proto::CreateProjectEntry {
921 worktree_id: project_path.worktree_id.to_proto(),
922 project_id,
923 path: project_path.path.to_string_lossy().into(),
924 is_directory,
925 })
926 .await?;
927 let entry = response
928 .entry
929 .ok_or_else(|| anyhow!("missing entry in response"))?;
930 worktree
931 .update(&mut cx, |worktree, cx| {
932 worktree.as_remote_mut().unwrap().insert_entry(
933 entry,
934 response.worktree_scan_id as usize,
935 cx,
936 )
937 })
938 .await
939 }))
940 }
941 }
942
943 pub fn copy_entry(
944 &mut self,
945 entry_id: ProjectEntryId,
946 new_path: impl Into<Arc<Path>>,
947 cx: &mut ModelContext<Self>,
948 ) -> Option<Task<Result<Entry>>> {
949 let worktree = self.worktree_for_entry(entry_id, cx)?;
950 let new_path = new_path.into();
951 if self.is_local() {
952 worktree.update(cx, |worktree, cx| {
953 worktree
954 .as_local_mut()
955 .unwrap()
956 .copy_entry(entry_id, new_path, cx)
957 })
958 } else {
959 let client = self.client.clone();
960 let project_id = self.remote_id().unwrap();
961
962 Some(cx.spawn_weak(|_, mut cx| async move {
963 let response = client
964 .request(proto::CopyProjectEntry {
965 project_id,
966 entry_id: entry_id.to_proto(),
967 new_path: new_path.to_string_lossy().into(),
968 })
969 .await?;
970 let entry = response
971 .entry
972 .ok_or_else(|| anyhow!("missing entry in response"))?;
973 worktree
974 .update(&mut cx, |worktree, cx| {
975 worktree.as_remote_mut().unwrap().insert_entry(
976 entry,
977 response.worktree_scan_id as usize,
978 cx,
979 )
980 })
981 .await
982 }))
983 }
984 }
985
986 pub fn rename_entry(
987 &mut self,
988 entry_id: ProjectEntryId,
989 new_path: impl Into<Arc<Path>>,
990 cx: &mut ModelContext<Self>,
991 ) -> Option<Task<Result<Entry>>> {
992 let worktree = self.worktree_for_entry(entry_id, cx)?;
993 let new_path = new_path.into();
994 if self.is_local() {
995 worktree.update(cx, |worktree, cx| {
996 worktree
997 .as_local_mut()
998 .unwrap()
999 .rename_entry(entry_id, new_path, cx)
1000 })
1001 } else {
1002 let client = self.client.clone();
1003 let project_id = self.remote_id().unwrap();
1004
1005 Some(cx.spawn_weak(|_, mut cx| async move {
1006 let response = client
1007 .request(proto::RenameProjectEntry {
1008 project_id,
1009 entry_id: entry_id.to_proto(),
1010 new_path: new_path.to_string_lossy().into(),
1011 })
1012 .await?;
1013 let entry = response
1014 .entry
1015 .ok_or_else(|| anyhow!("missing entry in response"))?;
1016 worktree
1017 .update(&mut cx, |worktree, cx| {
1018 worktree.as_remote_mut().unwrap().insert_entry(
1019 entry,
1020 response.worktree_scan_id as usize,
1021 cx,
1022 )
1023 })
1024 .await
1025 }))
1026 }
1027 }
1028
1029 pub fn delete_entry(
1030 &mut self,
1031 entry_id: ProjectEntryId,
1032 cx: &mut ModelContext<Self>,
1033 ) -> Option<Task<Result<()>>> {
1034 let worktree = self.worktree_for_entry(entry_id, cx)?;
1035
1036 cx.emit(Event::DeletedEntry(entry_id));
1037
1038 if self.is_local() {
1039 worktree.update(cx, |worktree, cx| {
1040 worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
1041 })
1042 } else {
1043 let client = self.client.clone();
1044 let project_id = self.remote_id().unwrap();
1045 Some(cx.spawn_weak(|_, mut cx| async move {
1046 let response = client
1047 .request(proto::DeleteProjectEntry {
1048 project_id,
1049 entry_id: entry_id.to_proto(),
1050 })
1051 .await?;
1052 worktree
1053 .update(&mut cx, move |worktree, cx| {
1054 worktree.as_remote_mut().unwrap().delete_entry(
1055 entry_id,
1056 response.worktree_scan_id as usize,
1057 cx,
1058 )
1059 })
1060 .await
1061 }))
1062 }
1063 }
1064
/// Transitions this project into the shared state under `project_id`.
///
/// Fails with "project was already shared" if any client state is already set, or
/// if subscribing to the remote entity fails. On success this:
/// - upgrades weakly-held open buffers and worktrees to strong handles,
/// - announces every known language server and every worktree-local settings file
///   to the server,
/// - installs `ProjectClientState::Local` together with a background loop that
///   processes `LocalProjectUpdate` messages, and
/// - emits `Event::RemoteIdChanged(Some(project_id))`.
pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Result<()> {
    if self.client_state.is_some() {
        return Err(anyhow!("project was already shared"));
    }
    self.client_subscriptions.push(
        self.client
            .subscribe_to_entity(project_id)?
            .set_model(&cx.handle(), &mut cx.to_async()),
    );

    // Hold every still-alive buffer strongly for as long as the project is shared.
    for open_buffer in self.opened_buffers.values_mut() {
        match open_buffer {
            OpenBuffer::Strong(_) => {}
            OpenBuffer::Weak(buffer) => {
                if let Some(buffer) = buffer.upgrade(cx) {
                    *open_buffer = OpenBuffer::Strong(buffer);
                }
            }
            // The code treats a queued-operations placeholder as impossible here.
            OpenBuffer::Operations(_) => unreachable!(),
        }
    }

    // Likewise, hold every still-alive worktree strongly.
    for worktree_handle in self.worktrees.iter_mut() {
        match worktree_handle {
            WorktreeHandle::Strong(_) => {}
            WorktreeHandle::Weak(worktree) => {
                if let Some(worktree) = worktree.upgrade(cx) {
                    *worktree_handle = WorktreeHandle::Strong(worktree);
                }
            }
        }
    }

    // Announce the language servers we already know about; send failures are only logged.
    for (server_id, status) in &self.language_server_statuses {
        self.client
            .send(proto::StartLanguageServer {
                project_id,
                server: Some(proto::LanguageServer {
                    id: server_id.0 as u64,
                    name: status.name.clone(),
                }),
            })
            .log_err();
    }

    // Send each worktree's local settings files; send failures are only logged.
    let store = cx.global::<SettingsStore>();
    for worktree in self.worktrees(cx) {
        let worktree_id = worktree.read(cx).id().to_proto();
        for (path, content) in store.local_settings(worktree.id()) {
            self.client
                .send(proto::UpdateWorktreeSettings {
                    project_id,
                    worktree_id,
                    path: path.to_string_lossy().into(),
                    content: Some(content),
                })
                .log_err();
        }
    }

    let (updates_tx, mut updates_rx) = mpsc::unbounded();
    let client = self.client.clone();
    self.client_state = Some(ProjectClientState::Local {
        remote_id: project_id,
        updates_tx,
        // This task is stored inside the client state; it processes updates one at a
        // time and stops when the channel closes or the project handle can no longer
        // be upgraded.
        _send_updates: cx.spawn_weak(move |this, mut cx| async move {
            while let Some(update) = updates_rx.next().await {
                let Some(this) = this.upgrade(&cx) else { break };

                match update {
                    LocalProjectUpdate::WorktreesChanged => {
                        // Snapshot the worktree list before awaiting the request.
                        let worktrees = this
                            .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
                        let update_project = this
                            .read_with(&cx, |this, cx| {
                                this.client.request(proto::UpdateProject {
                                    project_id,
                                    worktrees: this.worktree_metadata_protos(cx),
                                })
                            })
                            .await;
                        // Only share the individual worktrees once the project update succeeded.
                        if update_project.is_ok() {
                            for worktree in worktrees {
                                worktree.update(&mut cx, |worktree, cx| {
                                    let worktree = worktree.as_local_mut().unwrap();
                                    worktree.share(project_id, cx).detach_and_log_err(cx)
                                });
                            }
                        }
                    }
                    LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
                        // Yields the buffer only the first time it is shared with this
                        // peer, and only if it is currently held strongly.
                        let buffer = this.update(&mut cx, |this, _| {
                            let buffer = this.opened_buffers.get(&buffer_id).unwrap();
                            let shared_buffers =
                                this.shared_buffers.entry(peer_id).or_default();
                            if shared_buffers.insert(buffer_id) {
                                if let OpenBuffer::Strong(buffer) = buffer {
                                    Some(buffer.clone())
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        });

                        let Some(buffer) = buffer else { continue };
                        let operations =
                            buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
                        let operations = operations.await;
                        let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());

                        // First send the buffer state, then stream its operations in chunks.
                        let initial_state = proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
                        };
                        if client.send(initial_state).log_err().is_some() {
                            let client = client.clone();
                            cx.background()
                                .spawn(async move {
                                    let mut chunks = split_operations(operations).peekable();
                                    while let Some(chunk) = chunks.next() {
                                        // Peek ahead so the final chunk can be flagged.
                                        let is_last = chunks.peek().is_none();
                                        client.send(proto::CreateBufferForPeer {
                                            project_id,
                                            peer_id: Some(peer_id),
                                            variant: Some(
                                                proto::create_buffer_for_peer::Variant::Chunk(
                                                    proto::BufferChunk {
                                                        buffer_id,
                                                        operations: chunk,
                                                        is_last,
                                                    },
                                                ),
                                            ),
                                        })?;
                                    }
                                    anyhow::Ok(())
                                })
                                .await
                                .log_err();
                        }
                    }
                }
            }
        }),
    });

    self.metadata_changed(cx);
    cx.emit(Event::RemoteIdChanged(Some(project_id)));
    cx.notify();
    Ok(())
}
1219
1220 pub fn reshared(
1221 &mut self,
1222 message: proto::ResharedProject,
1223 cx: &mut ModelContext<Self>,
1224 ) -> Result<()> {
1225 self.shared_buffers.clear();
1226 self.set_collaborators_from_proto(message.collaborators, cx)?;
1227 self.metadata_changed(cx);
1228 Ok(())
1229 }
1230
1231 pub fn rejoined(
1232 &mut self,
1233 message: proto::RejoinedProject,
1234 message_id: u32,
1235 cx: &mut ModelContext<Self>,
1236 ) -> Result<()> {
1237 cx.update_global::<SettingsStore, _, _>(|store, cx| {
1238 for worktree in &self.worktrees {
1239 store
1240 .clear_local_settings(worktree.handle_id(), cx)
1241 .log_err();
1242 }
1243 });
1244
1245 self.join_project_response_message_id = message_id;
1246 self.set_worktrees_from_proto(message.worktrees, cx)?;
1247 self.set_collaborators_from_proto(message.collaborators, cx)?;
1248 self.language_server_statuses = message
1249 .language_servers
1250 .into_iter()
1251 .map(|server| {
1252 (
1253 LanguageServerId(server.id as usize),
1254 LanguageServerStatus {
1255 name: server.name,
1256 pending_work: Default::default(),
1257 has_pending_diagnostic_updates: false,
1258 progress_tokens: Default::default(),
1259 },
1260 )
1261 })
1262 .collect();
1263 self.buffer_ordered_messages_tx
1264 .unbounded_send(BufferOrderedMessage::Resync)
1265 .unwrap();
1266 cx.notify();
1267 Ok(())
1268 }
1269
1270 pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1271 self.unshare_internal(cx)?;
1272 self.metadata_changed(cx);
1273 cx.notify();
1274 Ok(())
1275 }
1276
1277 fn unshare_internal(&mut self, cx: &mut AppContext) -> Result<()> {
1278 if self.is_remote() {
1279 return Err(anyhow!("attempted to unshare a remote project"));
1280 }
1281
1282 if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1283 self.collaborators.clear();
1284 self.shared_buffers.clear();
1285 self.client_subscriptions.clear();
1286
1287 for worktree_handle in self.worktrees.iter_mut() {
1288 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1289 let is_visible = worktree.update(cx, |worktree, _| {
1290 worktree.as_local_mut().unwrap().unshare();
1291 worktree.is_visible()
1292 });
1293 if !is_visible {
1294 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1295 }
1296 }
1297 }
1298
1299 for open_buffer in self.opened_buffers.values_mut() {
1300 // Wake up any tasks waiting for peers' edits to this buffer.
1301 if let Some(buffer) = open_buffer.upgrade(cx) {
1302 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1303 }
1304
1305 if let OpenBuffer::Strong(buffer) = open_buffer {
1306 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1307 }
1308 }
1309
1310 self.client.send(proto::UnshareProject {
1311 project_id: remote_id,
1312 })?;
1313
1314 Ok(())
1315 } else {
1316 Err(anyhow!("attempted to unshare an unshared project"))
1317 }
1318 }
1319
1320 pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
1321 self.disconnected_from_host_internal(cx);
1322 cx.emit(Event::DisconnectedFromHost);
1323 cx.notify();
1324 }
1325
1326 fn disconnected_from_host_internal(&mut self, cx: &mut AppContext) {
1327 if let Some(ProjectClientState::Remote {
1328 sharing_has_stopped,
1329 ..
1330 }) = &mut self.client_state
1331 {
1332 *sharing_has_stopped = true;
1333
1334 self.collaborators.clear();
1335
1336 for worktree in &self.worktrees {
1337 if let Some(worktree) = worktree.upgrade(cx) {
1338 worktree.update(cx, |worktree, _| {
1339 if let Some(worktree) = worktree.as_remote_mut() {
1340 worktree.disconnected_from_host();
1341 }
1342 });
1343 }
1344 }
1345
1346 for open_buffer in self.opened_buffers.values_mut() {
1347 // Wake up any tasks waiting for peers' edits to this buffer.
1348 if let Some(buffer) = open_buffer.upgrade(cx) {
1349 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1350 }
1351
1352 if let OpenBuffer::Strong(buffer) = open_buffer {
1353 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1354 }
1355 }
1356
1357 // Wake up all futures currently waiting on a buffer to get opened,
1358 // to give them a chance to fail now that we've disconnected.
1359 *self.opened_buffer.0.borrow_mut() = ();
1360 }
1361 }
1362
1363 pub fn close(&mut self, cx: &mut ModelContext<Self>) {
1364 cx.emit(Event::Closed);
1365 }
1366
1367 pub fn is_read_only(&self) -> bool {
1368 match &self.client_state {
1369 Some(ProjectClientState::Remote {
1370 sharing_has_stopped,
1371 ..
1372 }) => *sharing_has_stopped,
1373 _ => false,
1374 }
1375 }
1376
1377 pub fn is_local(&self) -> bool {
1378 match &self.client_state {
1379 Some(ProjectClientState::Remote { .. }) => false,
1380 _ => true,
1381 }
1382 }
1383
1384 pub fn is_remote(&self) -> bool {
1385 !self.is_local()
1386 }
1387
1388 pub fn create_buffer(
1389 &mut self,
1390 text: &str,
1391 language: Option<Arc<Language>>,
1392 cx: &mut ModelContext<Self>,
1393 ) -> Result<ModelHandle<Buffer>> {
1394 if self.is_remote() {
1395 return Err(anyhow!("creating buffers as a guest is not supported yet"));
1396 }
1397
1398 let buffer = cx.add_model(|cx| {
1399 Buffer::new(self.replica_id(), text, cx)
1400 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1401 });
1402 self.register_buffer(&buffer, cx)?;
1403 Ok(buffer)
1404 }
1405
1406 pub fn open_path(
1407 &mut self,
1408 path: impl Into<ProjectPath>,
1409 cx: &mut ModelContext<Self>,
1410 ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1411 let task = self.open_buffer(path, cx);
1412 cx.spawn_weak(|_, cx| async move {
1413 let buffer = task.await?;
1414 let project_entry_id = buffer
1415 .read_with(&cx, |buffer, cx| {
1416 File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1417 })
1418 .ok_or_else(|| anyhow!("no project entry"))?;
1419
1420 let buffer: &AnyModelHandle = &buffer;
1421 Ok((project_entry_id, buffer.clone()))
1422 })
1423 }
1424
1425 pub fn open_local_buffer(
1426 &mut self,
1427 abs_path: impl AsRef<Path>,
1428 cx: &mut ModelContext<Self>,
1429 ) -> Task<Result<ModelHandle<Buffer>>> {
1430 if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1431 self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1432 } else {
1433 Task::ready(Err(anyhow!("no such path")))
1434 }
1435 }
1436
1437 pub fn open_buffer(
1438 &mut self,
1439 path: impl Into<ProjectPath>,
1440 cx: &mut ModelContext<Self>,
1441 ) -> Task<Result<ModelHandle<Buffer>>> {
1442 let project_path = path.into();
1443 let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
1444 worktree
1445 } else {
1446 return Task::ready(Err(anyhow!("no such worktree")));
1447 };
1448
1449 // If there is already a buffer for the given path, then return it.
1450 let existing_buffer = self.get_open_buffer(&project_path, cx);
1451 if let Some(existing_buffer) = existing_buffer {
1452 return Task::ready(Ok(existing_buffer));
1453 }
1454
1455 let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
1456 // If the given path is already being loaded, then wait for that existing
1457 // task to complete and return the same buffer.
1458 hash_map::Entry::Occupied(e) => e.get().clone(),
1459
1460 // Otherwise, record the fact that this path is now being loaded.
1461 hash_map::Entry::Vacant(entry) => {
1462 let (mut tx, rx) = postage::watch::channel();
1463 entry.insert(rx.clone());
1464
1465 let load_buffer = if worktree.read(cx).is_local() {
1466 self.open_local_buffer_internal(&project_path.path, &worktree, cx)
1467 } else {
1468 self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
1469 };
1470
1471 cx.spawn(move |this, mut cx| async move {
1472 let load_result = load_buffer.await;
1473 *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
1474 // Record the fact that the buffer is no longer loading.
1475 this.loading_buffers_by_path.remove(&project_path);
1476 let buffer = load_result.map_err(Arc::new)?;
1477 Ok(buffer)
1478 }));
1479 })
1480 .detach();
1481 rx
1482 }
1483 };
1484
1485 cx.foreground().spawn(async move {
1486 wait_for_loading_buffer(loading_watch)
1487 .await
1488 .map_err(|error| anyhow!("{}", error))
1489 })
1490 }
1491
1492 fn open_local_buffer_internal(
1493 &mut self,
1494 path: &Arc<Path>,
1495 worktree: &ModelHandle<Worktree>,
1496 cx: &mut ModelContext<Self>,
1497 ) -> Task<Result<ModelHandle<Buffer>>> {
1498 let buffer_id = post_inc(&mut self.next_buffer_id);
1499 let load_buffer = worktree.update(cx, |worktree, cx| {
1500 let worktree = worktree.as_local_mut().unwrap();
1501 worktree.load_buffer(buffer_id, path, cx)
1502 });
1503 cx.spawn(|this, mut cx| async move {
1504 let buffer = load_buffer.await?;
1505 this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1506 Ok(buffer)
1507 })
1508 }
1509
1510 fn open_remote_buffer_internal(
1511 &mut self,
1512 path: &Arc<Path>,
1513 worktree: &ModelHandle<Worktree>,
1514 cx: &mut ModelContext<Self>,
1515 ) -> Task<Result<ModelHandle<Buffer>>> {
1516 let rpc = self.client.clone();
1517 let project_id = self.remote_id().unwrap();
1518 let remote_worktree_id = worktree.read(cx).id();
1519 let path = path.clone();
1520 let path_string = path.to_string_lossy().to_string();
1521 cx.spawn(|this, mut cx| async move {
1522 let response = rpc
1523 .request(proto::OpenBufferByPath {
1524 project_id,
1525 worktree_id: remote_worktree_id.to_proto(),
1526 path: path_string,
1527 })
1528 .await?;
1529 this.update(&mut cx, |this, cx| {
1530 this.wait_for_remote_buffer(response.buffer_id, cx)
1531 })
1532 .await
1533 })
1534 }
1535
1536 /// LanguageServerName is owned, because it is inserted into a map
1537 fn open_local_buffer_via_lsp(
1538 &mut self,
1539 abs_path: lsp::Url,
1540 language_server_id: LanguageServerId,
1541 language_server_name: LanguageServerName,
1542 cx: &mut ModelContext<Self>,
1543 ) -> Task<Result<ModelHandle<Buffer>>> {
1544 cx.spawn(|this, mut cx| async move {
1545 let abs_path = abs_path
1546 .to_file_path()
1547 .map_err(|_| anyhow!("can't convert URI to path"))?;
1548 let (worktree, relative_path) = if let Some(result) =
1549 this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1550 {
1551 result
1552 } else {
1553 let worktree = this
1554 .update(&mut cx, |this, cx| {
1555 this.create_local_worktree(&abs_path, false, cx)
1556 })
1557 .await?;
1558 this.update(&mut cx, |this, cx| {
1559 this.language_server_ids.insert(
1560 (worktree.read(cx).id(), language_server_name),
1561 language_server_id,
1562 );
1563 });
1564 (worktree, PathBuf::new())
1565 };
1566
1567 let project_path = ProjectPath {
1568 worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1569 path: relative_path.into(),
1570 };
1571 this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1572 .await
1573 })
1574 }
1575
1576 pub fn open_buffer_by_id(
1577 &mut self,
1578 id: u64,
1579 cx: &mut ModelContext<Self>,
1580 ) -> Task<Result<ModelHandle<Buffer>>> {
1581 if let Some(buffer) = self.buffer_for_id(id, cx) {
1582 Task::ready(Ok(buffer))
1583 } else if self.is_local() {
1584 Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1585 } else if let Some(project_id) = self.remote_id() {
1586 let request = self
1587 .client
1588 .request(proto::OpenBufferById { project_id, id });
1589 cx.spawn(|this, mut cx| async move {
1590 let buffer_id = request.await?.buffer_id;
1591 this.update(&mut cx, |this, cx| {
1592 this.wait_for_remote_buffer(buffer_id, cx)
1593 })
1594 .await
1595 })
1596 } else {
1597 Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1598 }
1599 }
1600
1601 pub fn save_buffers(
1602 &self,
1603 buffers: HashSet<ModelHandle<Buffer>>,
1604 cx: &mut ModelContext<Self>,
1605 ) -> Task<Result<()>> {
1606 cx.spawn(|this, mut cx| async move {
1607 let save_tasks = buffers
1608 .into_iter()
1609 .map(|buffer| this.update(&mut cx, |this, cx| this.save_buffer(buffer, cx)));
1610 try_join_all(save_tasks).await?;
1611 Ok(())
1612 })
1613 }
1614
1615 pub fn save_buffer(
1616 &self,
1617 buffer: ModelHandle<Buffer>,
1618 cx: &mut ModelContext<Self>,
1619 ) -> Task<Result<(clock::Global, RopeFingerprint, SystemTime)>> {
1620 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1621 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1622 };
1623 let worktree = file.worktree.clone();
1624 let path = file.path.clone();
1625 worktree.update(cx, |worktree, cx| match worktree {
1626 Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx),
1627 Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx),
1628 })
1629 }
1630
1631 pub fn save_buffer_as(
1632 &mut self,
1633 buffer: ModelHandle<Buffer>,
1634 abs_path: PathBuf,
1635 cx: &mut ModelContext<Self>,
1636 ) -> Task<Result<()>> {
1637 let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1638 let old_file = File::from_dyn(buffer.read(cx).file())
1639 .filter(|f| f.is_local())
1640 .cloned();
1641 cx.spawn(|this, mut cx| async move {
1642 if let Some(old_file) = &old_file {
1643 this.update(&mut cx, |this, cx| {
1644 this.unregister_buffer_from_language_servers(&buffer, old_file, cx);
1645 });
1646 }
1647 let (worktree, path) = worktree_task.await?;
1648 worktree
1649 .update(&mut cx, |worktree, cx| match worktree {
1650 Worktree::Local(worktree) => {
1651 worktree.save_buffer(buffer.clone(), path.into(), true, cx)
1652 }
1653 Worktree::Remote(_) => panic!("cannot remote buffers as new files"),
1654 })
1655 .await?;
1656 this.update(&mut cx, |this, cx| {
1657 this.detect_language_for_buffer(&buffer, cx);
1658 this.register_buffer_with_language_servers(&buffer, cx);
1659 });
1660 Ok(())
1661 })
1662 }
1663
1664 pub fn get_open_buffer(
1665 &mut self,
1666 path: &ProjectPath,
1667 cx: &mut ModelContext<Self>,
1668 ) -> Option<ModelHandle<Buffer>> {
1669 let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1670 self.opened_buffers.values().find_map(|buffer| {
1671 let buffer = buffer.upgrade(cx)?;
1672 let file = File::from_dyn(buffer.read(cx).file())?;
1673 if file.worktree == worktree && file.path() == &path.path {
1674 Some(buffer)
1675 } else {
1676 None
1677 }
1678 })
1679 }
1680
/// Registers `buffer` with this project.
///
/// In order, this requests a diff recalculation, installs the language registry on
/// the buffer, stores the buffer in `opened_buffers` under its remote id, subscribes
/// to its events, indexes local files by path and entry id, detects its language,
/// registers it with language servers and Copilot, and arranges for
/// `DidCloseTextDocument` to be sent when the buffer is released.
///
/// Returns an error if applying queued operations fails, or if a local project
/// already has a live buffer registered under the same id. On a remote project
/// that second case returns `Ok(())` early without re-registering.
fn register_buffer(
    &mut self,
    buffer: &ModelHandle<Buffer>,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    self.request_buffer_diff_recalculation(buffer, cx);
    buffer.update(cx, |buffer, _| {
        buffer.set_language_registry(self.languages.clone())
    });

    let remote_id = buffer.read(cx).remote_id();
    let is_remote = self.is_remote();
    // Remote and shared projects keep buffers alive; otherwise only a weak handle is stored.
    let open_buffer = if is_remote || self.is_shared() {
        OpenBuffer::Strong(buffer.clone())
    } else {
        OpenBuffer::Weak(buffer.downgrade())
    };

    match self.opened_buffers.entry(remote_id) {
        hash_map::Entry::Vacant(entry) => {
            entry.insert(open_buffer);
        }
        hash_map::Entry::Occupied(mut entry) => {
            if let OpenBuffer::Operations(operations) = entry.get_mut() {
                // Operations were queued under this id before the buffer existed;
                // apply them now.
                buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
            } else if entry.get().upgrade(cx).is_some() {
                // A live buffer is already registered under this id.
                if is_remote {
                    return Ok(());
                } else {
                    debug_panic!("buffer {} was already registered", remote_id);
                    Err(anyhow!("buffer {} was already registered", remote_id))?;
                }
            }
            // Reached for a drained operations queue or a dead weak handle: replace the entry.
            entry.insert(open_buffer);
        }
    }
    cx.subscribe(buffer, |this, buffer, event, cx| {
        this.on_buffer_event(buffer, event, cx);
    })
    .detach();

    // Index local files by project path and by entry id.
    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
        if file.is_local {
            self.local_buffer_ids_by_path.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                remote_id,
            );

            self.local_buffer_ids_by_entry_id
                .insert(file.entry_id, remote_id);
        }
    }

    self.detect_language_for_buffer(buffer, cx);
    self.register_buffer_with_language_servers(buffer, cx);
    self.register_buffer_with_copilot(buffer, cx);
    // When the buffer is released, tell its language servers that the document was closed.
    cx.observe_release(buffer, |this, buffer, cx| {
        if let Some(file) = File::from_dyn(buffer.file()) {
            if file.is_local() {
                let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
                for server in this.language_servers_for_buffer(buffer, cx) {
                    server
                        .1
                        .notify::<lsp::notification::DidCloseTextDocument>(
                            lsp::DidCloseTextDocumentParams {
                                text_document: lsp::TextDocumentIdentifier::new(uri.clone()),
                            },
                        )
                        .log_err();
                }
            }
        }
    })
    .detach();

    // Signal the `opened_buffer` watch — presumably to wake tasks waiting for a
    // buffer to be opened; confirm against the watch's consumers.
    *self.opened_buffer.0.borrow_mut() = ();
    Ok(())
}
1762
/// Announces a local buffer to the running language servers of its language.
///
/// Does nothing for buffers without a project `File` or whose file is not local.
/// Otherwise it first replays the diagnostics the local worktree has stored for
/// the file's path. Then, for every LSP adapter of the buffer's language that has
/// a running server in the file's worktree, it sends `DidOpenTextDocument`, sets
/// the buffer's completion triggers, and records the initial snapshot as version 0.
///
/// # Panics
///
/// Panics if the file's absolute path cannot be converted to a file URL.
fn register_buffer_with_language_servers(
    &mut self,
    buffer_handle: &ModelHandle<Buffer>,
    cx: &mut ModelContext<Self>,
) {
    let buffer = buffer_handle.read(cx);
    let buffer_id = buffer.remote_id();

    if let Some(file) = File::from_dyn(buffer.file()) {
        if !file.is_local() {
            return;
        }

        let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
        let initial_snapshot = buffer.text_snapshot();
        let language = buffer.language().cloned();
        let worktree_id = file.worktree_id(cx);

        // Replay diagnostics the worktree already holds for this path.
        if let Some(local_worktree) = file.worktree.read(cx).as_local() {
            for (server_id, diagnostics) in local_worktree.diagnostics_for_path(file.path()) {
                self.update_buffer_diagnostics(buffer_handle, server_id, None, diagnostics, cx)
                    .log_err();
            }
        }

        if let Some(language) = language {
            for adapter in language.lsp_adapters() {
                let language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
                // Only servers in the `Running` state are notified; others are skipped.
                let server = self
                    .language_server_ids
                    .get(&(worktree_id, adapter.name.clone()))
                    .and_then(|id| self.language_servers.get(id))
                    .and_then(|server_state| {
                        if let LanguageServerState::Running { server, .. } = server_state {
                            Some(server.clone())
                        } else {
                            None
                        }
                    });
                let server = match server {
                    Some(server) => server,
                    None => continue,
                };

                // The document is opened at version 0 with the buffer's current text.
                server
                    .notify::<lsp::notification::DidOpenTextDocument>(
                        lsp::DidOpenTextDocumentParams {
                            text_document: lsp::TextDocumentItem::new(
                                uri.clone(),
                                language_id.unwrap_or_default(),
                                0,
                                initial_snapshot.text(),
                            ),
                        },
                    )
                    .log_err();

                // Use the trigger characters the server advertises, or none at all.
                buffer_handle.update(cx, |buffer, cx| {
                    buffer.set_completion_triggers(
                        server
                            .capabilities()
                            .completion_provider
                            .as_ref()
                            .and_then(|provider| provider.trigger_characters.clone())
                            .unwrap_or_default(),
                        cx,
                    );
                });

                // Start (or restart) this server's snapshot history for the buffer.
                let snapshot = LspBufferSnapshot {
                    version: 0,
                    snapshot: initial_snapshot.clone(),
                };
                self.buffer_snapshots
                    .entry(buffer_id)
                    .or_default()
                    .insert(server.server_id(), vec![snapshot]);
            }
        }
    }
}
1844
1845 fn unregister_buffer_from_language_servers(
1846 &mut self,
1847 buffer: &ModelHandle<Buffer>,
1848 old_file: &File,
1849 cx: &mut ModelContext<Self>,
1850 ) {
1851 let old_path = match old_file.as_local() {
1852 Some(local) => local.abs_path(cx),
1853 None => return,
1854 };
1855
1856 buffer.update(cx, |buffer, cx| {
1857 let worktree_id = old_file.worktree_id(cx);
1858 let ids = &self.language_server_ids;
1859
1860 let language = buffer.language().cloned();
1861 let adapters = language.iter().flat_map(|language| language.lsp_adapters());
1862 for &server_id in adapters.flat_map(|a| ids.get(&(worktree_id, a.name.clone()))) {
1863 buffer.update_diagnostics(server_id, Default::default(), cx);
1864 }
1865
1866 self.buffer_snapshots.remove(&buffer.remote_id());
1867 let file_url = lsp::Url::from_file_path(old_path).unwrap();
1868 for (_, language_server) in self.language_servers_for_buffer(buffer, cx) {
1869 language_server
1870 .notify::<lsp::notification::DidCloseTextDocument>(
1871 lsp::DidCloseTextDocumentParams {
1872 text_document: lsp::TextDocumentIdentifier::new(file_url.clone()),
1873 },
1874 )
1875 .log_err();
1876 }
1877 });
1878 }
1879
1880 fn register_buffer_with_copilot(
1881 &self,
1882 buffer_handle: &ModelHandle<Buffer>,
1883 cx: &mut ModelContext<Self>,
1884 ) {
1885 if let Some(copilot) = Copilot::global(cx) {
1886 copilot.update(cx, |copilot, cx| copilot.register_buffer(buffer_handle, cx));
1887 }
1888 }
1889
/// Drains `rx` and forwards buffer-related messages to the server.
///
/// Messages are taken in ready chunks of at most `MAX_BATCH_SIZE`. Buffer
/// operations are accumulated per buffer id and sent as `UpdateBuffer` requests.
/// Pending operations are flushed before every language-server update and at the
/// end of every chunk.
///
/// Once an `UpdateBuffer` request fails on a non-local project, further operations
/// are dropped until a `Resync` message leads to a successful
/// `synchronize_remote_buffers`.
///
/// Always returns `None`: either the project handle can no longer be upgraded, or
/// the channel has closed.
async fn send_buffer_ordered_messages(
    this: WeakModelHandle<Self>,
    rx: UnboundedReceiver<BufferOrderedMessage>,
    mut cx: AsyncAppContext,
) -> Option<()> {
    const MAX_BATCH_SIZE: usize = 128;

    let mut operations_by_buffer_id = HashMap::default();
    /// Sends the accumulated operations, one `UpdateBuffer` request per buffer.
    ///
    /// Buffers are skipped while the project has no remote id. On the first failed
    /// request of a non-local project, `needs_resync_with_host` is set and the
    /// remaining operations are discarded (the drain is dropped).
    async fn flush_operations(
        this: &ModelHandle<Project>,
        operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
        needs_resync_with_host: &mut bool,
        is_local: bool,
        cx: &AsyncAppContext,
    ) {
        for (buffer_id, operations) in operations_by_buffer_id.drain() {
            let request = this.read_with(cx, |this, _| {
                let project_id = this.remote_id()?;
                Some(this.client.request(proto::UpdateBuffer {
                    buffer_id,
                    project_id,
                    operations,
                }))
            });
            if let Some(request) = request {
                if request.await.is_err() && !is_local {
                    *needs_resync_with_host = true;
                    break;
                }
            }
        }
    }

    let mut needs_resync_with_host = false;
    let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);

    while let Some(changes) = changes.next().await {
        let this = this.upgrade(&mut cx)?;
        let is_local = this.read_with(&cx, |this, _| this.is_local());

        for change in changes {
            match change {
                BufferOrderedMessage::Operation {
                    buffer_id,
                    operation,
                } => {
                    // While out of sync with the host, operations are dropped; the
                    // resync is expected to bring the buffers back in line.
                    if needs_resync_with_host {
                        continue;
                    }

                    operations_by_buffer_id
                        .entry(buffer_id)
                        .or_insert(Vec::new())
                        .push(operation);
                }

                BufferOrderedMessage::Resync => {
                    // Anything still queued is discarded before resynchronizing.
                    operations_by_buffer_id.clear();
                    if this
                        .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
                        .await
                        .is_ok()
                    {
                        needs_resync_with_host = false;
                    }
                }

                BufferOrderedMessage::LanguageServerUpdate {
                    language_server_id,
                    message,
                } => {
                    // Flush first so that buffer operations reach the server before
                    // this language-server update.
                    flush_operations(
                        &this,
                        &mut operations_by_buffer_id,
                        &mut needs_resync_with_host,
                        is_local,
                        &cx,
                    )
                    .await;

                    this.read_with(&cx, |this, _| {
                        if let Some(project_id) = this.remote_id() {
                            this.client
                                .send(proto::UpdateLanguageServer {
                                    project_id,
                                    language_server_id: language_server_id.0 as u64,
                                    variant: Some(message),
                                })
                                .log_err();
                        }
                    });
                }
            }
        }

        // Send whatever is still pending at the end of the chunk.
        flush_operations(
            &this,
            &mut operations_by_buffer_id,
            &mut needs_resync_with_host,
            is_local,
            &cx,
        )
        .await;
    }

    None
}
1997
1998 fn on_buffer_event(
1999 &mut self,
2000 buffer: ModelHandle<Buffer>,
2001 event: &BufferEvent,
2002 cx: &mut ModelContext<Self>,
2003 ) -> Option<()> {
2004 if matches!(
2005 event,
2006 BufferEvent::Edited { .. } | BufferEvent::Reloaded | BufferEvent::DiffBaseChanged
2007 ) {
2008 self.request_buffer_diff_recalculation(&buffer, cx);
2009 }
2010
2011 match event {
2012 BufferEvent::Operation(operation) => {
2013 self.buffer_ordered_messages_tx
2014 .unbounded_send(BufferOrderedMessage::Operation {
2015 buffer_id: buffer.read(cx).remote_id(),
2016 operation: language::proto::serialize_operation(operation),
2017 })
2018 .ok();
2019 }
2020
2021 BufferEvent::Edited { .. } => {
2022 let buffer = buffer.read(cx);
2023 let file = File::from_dyn(buffer.file())?;
2024 let abs_path = file.as_local()?.abs_path(cx);
2025 let uri = lsp::Url::from_file_path(abs_path).unwrap();
2026 let next_snapshot = buffer.text_snapshot();
2027
2028 let language_servers: Vec<_> = self
2029 .language_servers_for_buffer(buffer, cx)
2030 .map(|i| i.1.clone())
2031 .collect();
2032
2033 for language_server in language_servers {
2034 let language_server = language_server.clone();
2035
2036 let buffer_snapshots = self
2037 .buffer_snapshots
2038 .get_mut(&buffer.remote_id())
2039 .and_then(|m| m.get_mut(&language_server.server_id()))?;
2040 let previous_snapshot = buffer_snapshots.last()?;
2041 let next_version = previous_snapshot.version + 1;
2042
2043 let content_changes = buffer
2044 .edits_since::<(PointUtf16, usize)>(previous_snapshot.snapshot.version())
2045 .map(|edit| {
2046 let edit_start = edit.new.start.0;
2047 let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
2048 let new_text = next_snapshot
2049 .text_for_range(edit.new.start.1..edit.new.end.1)
2050 .collect();
2051 lsp::TextDocumentContentChangeEvent {
2052 range: Some(lsp::Range::new(
2053 point_to_lsp(edit_start),
2054 point_to_lsp(edit_end),
2055 )),
2056 range_length: None,
2057 text: new_text,
2058 }
2059 })
2060 .collect();
2061
2062 buffer_snapshots.push(LspBufferSnapshot {
2063 version: next_version,
2064 snapshot: next_snapshot.clone(),
2065 });
2066
2067 language_server
2068 .notify::<lsp::notification::DidChangeTextDocument>(
2069 lsp::DidChangeTextDocumentParams {
2070 text_document: lsp::VersionedTextDocumentIdentifier::new(
2071 uri.clone(),
2072 next_version,
2073 ),
2074 content_changes,
2075 },
2076 )
2077 .log_err();
2078 }
2079 }
2080
2081 BufferEvent::Saved => {
2082 let file = File::from_dyn(buffer.read(cx).file())?;
2083 let worktree_id = file.worktree_id(cx);
2084 let abs_path = file.as_local()?.abs_path(cx);
2085 let text_document = lsp::TextDocumentIdentifier {
2086 uri: lsp::Url::from_file_path(abs_path).unwrap(),
2087 };
2088
2089 for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
2090 server
2091 .notify::<lsp::notification::DidSaveTextDocument>(
2092 lsp::DidSaveTextDocumentParams {
2093 text_document: text_document.clone(),
2094 text: None,
2095 },
2096 )
2097 .log_err();
2098 }
2099
2100 let language_server_ids = self.language_server_ids_for_buffer(buffer.read(cx), cx);
2101 for language_server_id in language_server_ids {
2102 if let Some(LanguageServerState::Running {
2103 adapter,
2104 simulate_disk_based_diagnostics_completion,
2105 ..
2106 }) = self.language_servers.get_mut(&language_server_id)
2107 {
2108 // After saving a buffer using a language server that doesn't provide
2109 // a disk-based progress token, kick off a timer that will reset every
2110 // time the buffer is saved. If the timer eventually fires, simulate
2111 // disk-based diagnostics being finished so that other pieces of UI
2112 // (e.g., project diagnostics view, diagnostic status bar) can update.
2113 // We don't emit an event right away because the language server might take
2114 // some time to publish diagnostics.
2115 if adapter.disk_based_diagnostics_progress_token.is_none() {
2116 const DISK_BASED_DIAGNOSTICS_DEBOUNCE: Duration =
2117 Duration::from_secs(1);
2118
2119 let task = cx.spawn_weak(|this, mut cx| async move {
2120 cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
2121 if let Some(this) = this.upgrade(&cx) {
2122 this.update(&mut cx, |this, cx| {
2123 this.disk_based_diagnostics_finished(
2124 language_server_id,
2125 cx,
2126 );
2127 this.buffer_ordered_messages_tx
2128 .unbounded_send(
2129 BufferOrderedMessage::LanguageServerUpdate {
2130 language_server_id,
2131 message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
2132 },
2133 )
2134 .ok();
2135 });
2136 }
2137 });
2138 *simulate_disk_based_diagnostics_completion = Some(task);
2139 }
2140 }
2141 }
2142 }
2143
2144 _ => {}
2145 }
2146
2147 None
2148 }
2149
2150 fn request_buffer_diff_recalculation(
2151 &mut self,
2152 buffer: &ModelHandle<Buffer>,
2153 cx: &mut ModelContext<Self>,
2154 ) {
2155 self.buffers_needing_diff.insert(buffer.downgrade());
2156 let first_insertion = self.buffers_needing_diff.len() == 1;
2157
2158 let settings = settings::get::<ProjectSettings>(cx);
2159 let delay = if let Some(delay) = settings.git.gutter_debounce {
2160 delay
2161 } else {
2162 if first_insertion {
2163 let this = cx.weak_handle();
2164 cx.defer(move |cx| {
2165 if let Some(this) = this.upgrade(cx) {
2166 this.update(cx, |this, cx| {
2167 this.recalculate_buffer_diffs(cx).detach();
2168 });
2169 }
2170 });
2171 }
2172 return;
2173 };
2174
2175 const MIN_DELAY: u64 = 50;
2176 let delay = delay.max(MIN_DELAY);
2177 let duration = Duration::from_millis(delay);
2178
2179 self.git_diff_debouncer
2180 .fire_new(duration, cx, move |this, cx| {
2181 this.recalculate_buffer_diffs(cx)
2182 });
2183 }
2184
2185 fn recalculate_buffer_diffs(&mut self, cx: &mut ModelContext<Self>) -> Task<()> {
2186 cx.spawn(|this, mut cx| async move {
2187 let buffers: Vec<_> = this.update(&mut cx, |this, _| {
2188 this.buffers_needing_diff.drain().collect()
2189 });
2190
2191 let tasks: Vec<_> = this.update(&mut cx, |_, cx| {
2192 buffers
2193 .iter()
2194 .filter_map(|buffer| {
2195 let buffer = buffer.upgrade(cx)?;
2196 buffer.update(cx, |buffer, cx| buffer.git_diff_recalc(cx))
2197 })
2198 .collect()
2199 });
2200
2201 futures::future::join_all(tasks).await;
2202
2203 this.update(&mut cx, |this, cx| {
2204 if !this.buffers_needing_diff.is_empty() {
2205 this.recalculate_buffer_diffs(cx).detach();
2206 } else {
2207 // TODO: Would a `ModelContext<Project>.notify()` suffice here?
2208 for buffer in buffers {
2209 if let Some(buffer) = buffer.upgrade(cx) {
2210 buffer.update(cx, |_, cx| cx.notify());
2211 }
2212 }
2213 }
2214 });
2215 })
2216 }
2217
2218 fn language_servers_for_worktree(
2219 &self,
2220 worktree_id: WorktreeId,
2221 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
2222 self.language_server_ids
2223 .iter()
2224 .filter_map(move |((language_server_worktree_id, _), id)| {
2225 if *language_server_worktree_id == worktree_id {
2226 if let Some(LanguageServerState::Running {
2227 adapter,
2228 language,
2229 server,
2230 ..
2231 }) = self.language_servers.get(id)
2232 {
2233 return Some((adapter, language, server));
2234 }
2235 }
2236 None
2237 })
2238 }
2239
2240 fn maintain_buffer_languages(
2241 languages: &LanguageRegistry,
2242 cx: &mut ModelContext<Project>,
2243 ) -> Task<()> {
2244 let mut subscription = languages.subscribe();
2245 cx.spawn_weak(|project, mut cx| async move {
2246 while let Some(()) = subscription.next().await {
2247 if let Some(project) = project.upgrade(&cx) {
2248 project.update(&mut cx, |project, cx| {
2249 let mut plain_text_buffers = Vec::new();
2250 let mut buffers_with_unknown_injections = Vec::new();
2251 for buffer in project.opened_buffers.values() {
2252 if let Some(handle) = buffer.upgrade(cx) {
2253 let buffer = &handle.read(cx);
2254 if buffer.language().is_none()
2255 || buffer.language() == Some(&*language::PLAIN_TEXT)
2256 {
2257 plain_text_buffers.push(handle);
2258 } else if buffer.contains_unknown_injections() {
2259 buffers_with_unknown_injections.push(handle);
2260 }
2261 }
2262 }
2263
2264 for buffer in plain_text_buffers {
2265 project.detect_language_for_buffer(&buffer, cx);
2266 project.register_buffer_with_language_servers(&buffer, cx);
2267 }
2268
2269 for buffer in buffers_with_unknown_injections {
2270 buffer.update(cx, |buffer, cx| buffer.reparse(cx));
2271 }
2272 });
2273 }
2274 }
2275 })
2276 }
2277
2278 fn maintain_workspace_config(
2279 languages: Arc<LanguageRegistry>,
2280 cx: &mut ModelContext<Project>,
2281 ) -> Task<()> {
2282 let (mut settings_changed_tx, mut settings_changed_rx) = watch::channel();
2283 let _ = postage::stream::Stream::try_recv(&mut settings_changed_rx);
2284
2285 let settings_observation = cx.observe_global::<SettingsStore, _>(move |_, _| {
2286 *settings_changed_tx.borrow_mut() = ();
2287 });
2288 cx.spawn_weak(|this, mut cx| async move {
2289 while let Some(_) = settings_changed_rx.next().await {
2290 let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
2291 if let Some(this) = this.upgrade(&cx) {
2292 this.read_with(&cx, |this, _| {
2293 for server_state in this.language_servers.values() {
2294 if let LanguageServerState::Running { server, .. } = server_state {
2295 server
2296 .notify::<lsp::notification::DidChangeConfiguration>(
2297 lsp::DidChangeConfigurationParams {
2298 settings: workspace_config.clone(),
2299 },
2300 )
2301 .ok();
2302 }
2303 }
2304 })
2305 } else {
2306 break;
2307 }
2308 }
2309
2310 drop(settings_observation);
2311 })
2312 }
2313
2314 fn detect_language_for_buffer(
2315 &mut self,
2316 buffer_handle: &ModelHandle<Buffer>,
2317 cx: &mut ModelContext<Self>,
2318 ) -> Option<()> {
2319 // If the buffer has a language, set it and start the language server if we haven't already.
2320 let buffer = buffer_handle.read(cx);
2321 let full_path = buffer.file()?.full_path(cx);
2322 let content = buffer.as_rope();
2323 let new_language = self
2324 .languages
2325 .language_for_file(&full_path, Some(content))
2326 .now_or_never()?
2327 .ok()?;
2328 self.set_language_for_buffer(buffer_handle, new_language, cx);
2329 None
2330 }
2331
2332 pub fn set_language_for_buffer(
2333 &mut self,
2334 buffer: &ModelHandle<Buffer>,
2335 new_language: Arc<Language>,
2336 cx: &mut ModelContext<Self>,
2337 ) {
2338 buffer.update(cx, |buffer, cx| {
2339 if buffer.language().map_or(true, |old_language| {
2340 !Arc::ptr_eq(old_language, &new_language)
2341 }) {
2342 buffer.set_language(Some(new_language.clone()), cx);
2343 }
2344 });
2345
2346 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2347 let worktree = file.worktree.clone();
2348 if let Some(tree) = worktree.read(cx).as_local() {
2349 self.start_language_servers(&worktree, tree.abs_path().clone(), new_language, cx);
2350 }
2351 }
2352 }
2353
2354 fn start_language_servers(
2355 &mut self,
2356 worktree: &ModelHandle<Worktree>,
2357 worktree_path: Arc<Path>,
2358 language: Arc<Language>,
2359 cx: &mut ModelContext<Self>,
2360 ) {
2361 if !language_settings(
2362 Some(&language),
2363 worktree
2364 .update(cx, |tree, cx| tree.root_file(cx))
2365 .map(|f| f as _)
2366 .as_ref(),
2367 cx,
2368 )
2369 .enable_language_server
2370 {
2371 return;
2372 }
2373
2374 let worktree_id = worktree.read(cx).id();
2375 for adapter in language.lsp_adapters() {
2376 let key = (worktree_id, adapter.name.clone());
2377 if self.language_server_ids.contains_key(&key) {
2378 continue;
2379 }
2380
2381 let pending_server = match self.languages.start_language_server(
2382 language.clone(),
2383 adapter.clone(),
2384 worktree_path.clone(),
2385 self.client.http_client(),
2386 cx,
2387 ) {
2388 Some(pending_server) => pending_server,
2389 None => continue,
2390 };
2391
2392 let lsp = settings::get::<ProjectSettings>(cx)
2393 .lsp
2394 .get(&adapter.name.0);
2395 let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
2396
2397 let mut initialization_options = adapter.initialization_options.clone();
2398 match (&mut initialization_options, override_options) {
2399 (Some(initialization_options), Some(override_options)) => {
2400 merge_json_value_into(override_options, initialization_options);
2401 }
2402 (None, override_options) => initialization_options = override_options,
2403 _ => {}
2404 }
2405
2406 let server_id = pending_server.server_id;
2407 let state = self.setup_pending_language_server(
2408 initialization_options,
2409 pending_server,
2410 adapter.clone(),
2411 language.clone(),
2412 key.clone(),
2413 cx,
2414 );
2415 self.language_servers.insert(server_id, state);
2416 self.language_server_ids.insert(key.clone(), server_id);
2417 }
2418 }
2419
/// Builds the `LanguageServerState::Starting` state for a pending language server.
///
/// The returned state wraps a weakly-spawned task that:
/// 1. awaits the workspace configuration and the pending server, then initializes the
///    server with `initialization_options`;
/// 2. installs handlers for `PublishDiagnostics`, `WorkspaceConfiguration`,
///    `WorkDoneProgressCreate`, `RegisterCapability`, `ApplyWorkspaceEdit` and `Progress`;
/// 3. sends the workspace configuration through `DidChangeConfiguration`;
/// 4. stores the server as `LanguageServerState::Running`, records its status, announces
///    it to the client when the project has a remote id, and sends `DidOpenTextDocument`
///    for every open buffer in the worktree `key.0` whose language lists adapter `key.1`.
///
/// The task resolves to `None` when the server fails to start or initialize, when the
/// project can no longer be upgraded, when `language_server_ids` maps `key` to a different
/// server id, when a matching buffer's file is not local, or when sending a
/// `DidOpenTextDocument` notification fails. Otherwise it resolves to the server handle.
fn setup_pending_language_server(
    &mut self,
    initialization_options: Option<serde_json::Value>,
    pending_server: PendingLanguageServer,
    adapter: Arc<CachedLspAdapter>,
    language: Arc<Language>,
    key: (WorktreeId, LanguageServerName),
    cx: &mut ModelContext<Project>,
) -> LanguageServerState {
    let server_id = pending_server.server_id;
    let languages = self.languages.clone();

    LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
        let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
        let language_server = pending_server.task.await.log_err()?;
        let language_server = language_server
            .initialize(initialization_options)
            .await
            .log_err()?;
        let this = this.upgrade(&cx)?;

        // Let the adapter post-process published diagnostics, then store them on the project.
        language_server
            .on_notification::<lsp::notification::PublishDiagnostics, _>({
                let this = this.downgrade();
                let adapter = adapter.clone();
                move |mut params, cx| {
                    let this = this;
                    let adapter = adapter.clone();
                    cx.spawn(|mut cx| async move {
                        adapter.process_diagnostics(&mut params).await;
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |this, cx| {
                                this.update_diagnostics(
                                    server_id,
                                    params,
                                    &adapter.disk_based_diagnostic_sources,
                                    cx,
                                )
                                .log_err();
                            });
                        }
                    })
                    .detach();
                }
            })
            .detach();

        // Answer configuration requests: one value per requested item, taken from the named
        // section of the workspace configuration (or `Null` if that section is absent), or
        // the whole configuration when the item names no section.
        language_server
            .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
                let languages = languages.clone();
                move |params, mut cx| {
                    let languages = languages.clone();
                    async move {
                        let workspace_config =
                            cx.update(|cx| languages.workspace_configuration(cx)).await;
                        Ok(params
                            .items
                            .into_iter()
                            .map(|item| {
                                if let Some(section) = &item.section {
                                    workspace_config
                                        .get(section)
                                        .cloned()
                                        .unwrap_or(serde_json::Value::Null)
                                } else {
                                    workspace_config.clone()
                                }
                            })
                            .collect())
                    }
                }
            })
            .detach();

        // Even though we don't have handling for these requests, respond to them to
        // avoid stalling any language server like `gopls` which waits for a response
        // to these requests when initializing.
        language_server
            .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
                let this = this.downgrade();
                move |params, mut cx| async move {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, _| {
                            if let Some(status) =
                                this.language_server_statuses.get_mut(&server_id)
                            {
                                // Only string tokens are recorded; numeric tokens are ignored.
                                if let lsp::NumberOrString::String(token) = params.token {
                                    status.progress_tokens.insert(token);
                                }
                            }
                        });
                    }
                    Ok(())
                }
            })
            .detach();
        // Of the capability registrations, only `workspace/didChangeWatchedFiles` is acted on.
        language_server
            .on_request::<lsp::request::RegisterCapability, _, _>({
                let this = this.downgrade();
                move |params, mut cx| async move {
                    let this = this
                        .upgrade(&cx)
                        .ok_or_else(|| anyhow!("project dropped"))?;
                    for reg in params.registrations {
                        if reg.method == "workspace/didChangeWatchedFiles" {
                            if let Some(options) = reg.register_options {
                                let options = serde_json::from_value(options)?;
                                this.update(&mut cx, |this, cx| {
                                    this.on_lsp_did_change_watched_files(
                                        server_id, options, cx,
                                    );
                                });
                            }
                        }
                    }
                    Ok(())
                }
            })
            .detach();

        // Workspace edits requested by the server are delegated to `on_lsp_workspace_edit`.
        language_server
            .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
                let this = this.downgrade();
                let adapter = adapter.clone();
                let language_server = language_server.clone();
                move |params, cx| {
                    Self::on_lsp_workspace_edit(
                        this,
                        params,
                        server_id,
                        adapter.clone(),
                        language_server.clone(),
                        cx,
                    )
                }
            })
            .detach();

        let disk_based_diagnostics_progress_token =
            adapter.disk_based_diagnostics_progress_token.clone();

        // Progress notifications are forwarded to `on_lsp_progress` together with the
        // adapter's disk-based diagnostics progress token.
        language_server
            .on_notification::<lsp::notification::Progress, _>({
                let this = this.downgrade();
                move |params, mut cx| {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| {
                            this.on_lsp_progress(
                                params,
                                server_id,
                                disk_based_diagnostics_progress_token.clone(),
                                cx,
                            );
                        });
                    }
                }
            })
            .detach();

        // Send the workspace configuration fetched at the start of this task; a failure to
        // send is ignored.
        language_server
            .notify::<lsp::notification::DidChangeConfiguration>(
                lsp::DidChangeConfigurationParams {
                    settings: workspace_config,
                },
            )
            .ok();

        this.update(&mut cx, |this, cx| {
            // If the language server for this key doesn't match the server id, don't store the
            // server. Which will cause it to be dropped, killing the process
            if this
                .language_server_ids
                .get(&key)
                .map(|id| id != &server_id)
                .unwrap_or(false)
            {
                return None;
            }

            // Update language_servers collection with Running variant of LanguageServerState
            // indicating that the server is up and running and ready
            this.language_servers.insert(
                server_id,
                LanguageServerState::Running {
                    adapter: adapter.clone(),
                    language: language.clone(),
                    watched_paths: Default::default(),
                    server: language_server.clone(),
                    simulate_disk_based_diagnostics_completion: None,
                },
            );
            this.language_server_statuses.insert(
                server_id,
                LanguageServerStatus {
                    name: language_server.name().to_string(),
                    pending_work: Default::default(),
                    has_pending_diagnostic_updates: false,
                    progress_tokens: Default::default(),
                },
            );

            // When the project has a remote id, announce the new server through the client.
            if let Some(project_id) = this.remote_id() {
                this.client
                    .send(proto::StartLanguageServer {
                        project_id,
                        server: Some(proto::LanguageServer {
                            id: server_id.0 as u64,
                            name: language_server.name().to_string(),
                        }),
                    })
                    .log_err();
            }

            // Tell the language server about every open buffer in the worktree that matches the language.
            for buffer in this.opened_buffers.values() {
                if let Some(buffer_handle) = buffer.upgrade(cx) {
                    let buffer = buffer_handle.read(cx);
                    let file = match File::from_dyn(buffer.file()) {
                        Some(file) => file,
                        None => continue,
                    };
                    let language = match buffer.language() {
                        Some(language) => language,
                        None => continue,
                    };

                    // Skip buffers from other worktrees or whose language does not use this adapter.
                    if file.worktree.read(cx).id() != key.0
                        || !language.lsp_adapters().iter().any(|a| a.name == key.1)
                    {
                        continue;
                    }

                    // NOTE: `?` here exits the whole closure with `None` (not just this
                    // iteration) when the file is not local.
                    let file = file.as_local()?;
                    // Reuse the snapshots already recorded for this buffer and server, or seed
                    // them with version 0 of the buffer's current text.
                    let versions = this
                        .buffer_snapshots
                        .entry(buffer.remote_id())
                        .or_default()
                        .entry(server_id)
                        .or_insert_with(|| {
                            vec![LspBufferSnapshot {
                                version: 0,
                                snapshot: buffer.text_snapshot(),
                            }]
                        });

                    let snapshot = versions.last().unwrap();
                    let version = snapshot.version;
                    let initial_snapshot = &snapshot.snapshot;
                    let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
                    // NOTE: a failed notification is logged and, via `?`, also ends the
                    // closure with `None`.
                    language_server
                        .notify::<lsp::notification::DidOpenTextDocument>(
                            lsp::DidOpenTextDocumentParams {
                                text_document: lsp::TextDocumentItem::new(
                                    uri,
                                    adapter
                                        .language_ids
                                        .get(language.name().as_ref())
                                        .cloned()
                                        .unwrap_or_default(),
                                    version,
                                    initial_snapshot.text(),
                                ),
                            },
                        )
                        .log_err()?;
                    // Use the server's completion trigger characters for this buffer (empty
                    // when the server advertises none).
                    buffer_handle.update(cx, |buffer, cx| {
                        buffer.set_completion_triggers(
                            language_server
                                .capabilities()
                                .completion_provider
                                .as_ref()
                                .and_then(|provider| provider.trigger_characters.clone())
                                .unwrap_or_default(),
                            cx,
                        )
                    });
                }
            }

            cx.notify();
            Some(language_server)
        })
    }))
}
2704
2705 // Returns a list of all of the worktrees which no longer have a language server and the root path
2706 // for the stopped server
2707 fn stop_language_server(
2708 &mut self,
2709 worktree_id: WorktreeId,
2710 adapter_name: LanguageServerName,
2711 cx: &mut ModelContext<Self>,
2712 ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
2713 let key = (worktree_id, adapter_name);
2714 if let Some(server_id) = self.language_server_ids.remove(&key) {
2715 // Remove other entries for this language server as well
2716 let mut orphaned_worktrees = vec![worktree_id];
2717 let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
2718 for other_key in other_keys {
2719 if self.language_server_ids.get(&other_key) == Some(&server_id) {
2720 self.language_server_ids.remove(&other_key);
2721 orphaned_worktrees.push(other_key.0);
2722 }
2723 }
2724
2725 for buffer in self.opened_buffers.values() {
2726 if let Some(buffer) = buffer.upgrade(cx) {
2727 buffer.update(cx, |buffer, cx| {
2728 buffer.update_diagnostics(server_id, Default::default(), cx);
2729 });
2730 }
2731 }
2732 for worktree in &self.worktrees {
2733 if let Some(worktree) = worktree.upgrade(cx) {
2734 worktree.update(cx, |worktree, cx| {
2735 if let Some(worktree) = worktree.as_local_mut() {
2736 worktree.clear_diagnostics_for_language_server(server_id, cx);
2737 }
2738 });
2739 }
2740 }
2741
2742 self.language_server_statuses.remove(&server_id);
2743 cx.notify();
2744
2745 let server_state = self.language_servers.remove(&server_id);
2746 cx.spawn_weak(|this, mut cx| async move {
2747 let mut root_path = None;
2748
2749 let server = match server_state {
2750 Some(LanguageServerState::Starting(started_language_server)) => {
2751 started_language_server.await
2752 }
2753 Some(LanguageServerState::Running { server, .. }) => Some(server),
2754 None => None,
2755 };
2756
2757 if let Some(server) = server {
2758 root_path = Some(server.root_path().clone());
2759 if let Some(shutdown) = server.shutdown() {
2760 shutdown.await;
2761 }
2762 }
2763
2764 if let Some(this) = this.upgrade(&cx) {
2765 this.update(&mut cx, |this, cx| {
2766 this.language_server_statuses.remove(&server_id);
2767 cx.notify();
2768 });
2769 }
2770
2771 (root_path, orphaned_worktrees)
2772 })
2773 } else {
2774 Task::ready((None, Vec::new()))
2775 }
2776 }
2777
2778 pub fn restart_language_servers_for_buffers(
2779 &mut self,
2780 buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
2781 cx: &mut ModelContext<Self>,
2782 ) -> Option<()> {
2783 let language_server_lookup_info: HashSet<(ModelHandle<Worktree>, Arc<Language>)> = buffers
2784 .into_iter()
2785 .filter_map(|buffer| {
2786 let buffer = buffer.read(cx);
2787 let file = File::from_dyn(buffer.file())?;
2788 let full_path = file.full_path(cx);
2789 let language = self
2790 .languages
2791 .language_for_file(&full_path, Some(buffer.as_rope()))
2792 .now_or_never()?
2793 .ok()?;
2794 Some((file.worktree.clone(), language))
2795 })
2796 .collect();
2797 for (worktree, language) in language_server_lookup_info {
2798 self.restart_language_servers(worktree, language, cx);
2799 }
2800
2801 None
2802 }
2803
2804 // TODO This will break in the case where the adapter's root paths and worktrees are not equal
2805 fn restart_language_servers(
2806 &mut self,
2807 worktree: ModelHandle<Worktree>,
2808 language: Arc<Language>,
2809 cx: &mut ModelContext<Self>,
2810 ) {
2811 let worktree_id = worktree.read(cx).id();
2812 let fallback_path = worktree.read(cx).abs_path();
2813
2814 let mut stops = Vec::new();
2815 for adapter in language.lsp_adapters() {
2816 stops.push(self.stop_language_server(worktree_id, adapter.name.clone(), cx));
2817 }
2818
2819 if stops.is_empty() {
2820 return;
2821 }
2822 let mut stops = stops.into_iter();
2823
2824 cx.spawn_weak(|this, mut cx| async move {
2825 let (original_root_path, mut orphaned_worktrees) = stops.next().unwrap().await;
2826 for stop in stops {
2827 let (_, worktrees) = stop.await;
2828 orphaned_worktrees.extend_from_slice(&worktrees);
2829 }
2830
2831 let this = match this.upgrade(&cx) {
2832 Some(this) => this,
2833 None => return,
2834 };
2835
2836 this.update(&mut cx, |this, cx| {
2837 // Attempt to restart using original server path. Fallback to passed in
2838 // path if we could not retrieve the root path
2839 let root_path = original_root_path
2840 .map(|path_buf| Arc::from(path_buf.as_path()))
2841 .unwrap_or(fallback_path);
2842
2843 this.start_language_servers(&worktree, root_path, language.clone(), cx);
2844
2845 // Lookup new server ids and set them for each of the orphaned worktrees
2846 for adapter in language.lsp_adapters() {
2847 if let Some(new_server_id) = this
2848 .language_server_ids
2849 .get(&(worktree_id, adapter.name.clone()))
2850 .cloned()
2851 {
2852 for &orphaned_worktree in &orphaned_worktrees {
2853 this.language_server_ids
2854 .insert((orphaned_worktree, adapter.name.clone()), new_server_id);
2855 }
2856 }
2857 }
2858 });
2859 })
2860 .detach();
2861 }
2862
2863 fn on_lsp_progress(
2864 &mut self,
2865 progress: lsp::ProgressParams,
2866 language_server_id: LanguageServerId,
2867 disk_based_diagnostics_progress_token: Option<String>,
2868 cx: &mut ModelContext<Self>,
2869 ) {
2870 let token = match progress.token {
2871 lsp::NumberOrString::String(token) => token,
2872 lsp::NumberOrString::Number(token) => {
2873 log::info!("skipping numeric progress token {}", token);
2874 return;
2875 }
2876 };
2877 let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
2878 let language_server_status =
2879 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2880 status
2881 } else {
2882 return;
2883 };
2884
2885 if !language_server_status.progress_tokens.contains(&token) {
2886 return;
2887 }
2888
2889 let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
2890 .as_ref()
2891 .map_or(false, |disk_based_token| {
2892 token.starts_with(disk_based_token)
2893 });
2894
2895 match progress {
2896 lsp::WorkDoneProgress::Begin(report) => {
2897 if is_disk_based_diagnostics_progress {
2898 language_server_status.has_pending_diagnostic_updates = true;
2899 self.disk_based_diagnostics_started(language_server_id, cx);
2900 self.buffer_ordered_messages_tx
2901 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2902 language_server_id,
2903 message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
2904 })
2905 .ok();
2906 } else {
2907 self.on_lsp_work_start(
2908 language_server_id,
2909 token.clone(),
2910 LanguageServerProgress {
2911 message: report.message.clone(),
2912 percentage: report.percentage.map(|p| p as usize),
2913 last_update_at: Instant::now(),
2914 },
2915 cx,
2916 );
2917 self.buffer_ordered_messages_tx
2918 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2919 language_server_id,
2920 message: proto::update_language_server::Variant::WorkStart(
2921 proto::LspWorkStart {
2922 token,
2923 message: report.message,
2924 percentage: report.percentage.map(|p| p as u32),
2925 },
2926 ),
2927 })
2928 .ok();
2929 }
2930 }
2931 lsp::WorkDoneProgress::Report(report) => {
2932 if !is_disk_based_diagnostics_progress {
2933 self.on_lsp_work_progress(
2934 language_server_id,
2935 token.clone(),
2936 LanguageServerProgress {
2937 message: report.message.clone(),
2938 percentage: report.percentage.map(|p| p as usize),
2939 last_update_at: Instant::now(),
2940 },
2941 cx,
2942 );
2943 self.buffer_ordered_messages_tx
2944 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2945 language_server_id,
2946 message: proto::update_language_server::Variant::WorkProgress(
2947 proto::LspWorkProgress {
2948 token,
2949 message: report.message,
2950 percentage: report.percentage.map(|p| p as u32),
2951 },
2952 ),
2953 })
2954 .ok();
2955 }
2956 }
2957 lsp::WorkDoneProgress::End(_) => {
2958 language_server_status.progress_tokens.remove(&token);
2959
2960 if is_disk_based_diagnostics_progress {
2961 language_server_status.has_pending_diagnostic_updates = false;
2962 self.disk_based_diagnostics_finished(language_server_id, cx);
2963 self.buffer_ordered_messages_tx
2964 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2965 language_server_id,
2966 message:
2967 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
2968 Default::default(),
2969 ),
2970 })
2971 .ok();
2972 } else {
2973 self.on_lsp_work_end(language_server_id, token.clone(), cx);
2974 self.buffer_ordered_messages_tx
2975 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2976 language_server_id,
2977 message: proto::update_language_server::Variant::WorkEnd(
2978 proto::LspWorkEnd { token },
2979 ),
2980 })
2981 .ok();
2982 }
2983 }
2984 }
2985 }
2986
2987 fn on_lsp_work_start(
2988 &mut self,
2989 language_server_id: LanguageServerId,
2990 token: String,
2991 progress: LanguageServerProgress,
2992 cx: &mut ModelContext<Self>,
2993 ) {
2994 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2995 status.pending_work.insert(token, progress);
2996 cx.notify();
2997 }
2998 }
2999
3000 fn on_lsp_work_progress(
3001 &mut self,
3002 language_server_id: LanguageServerId,
3003 token: String,
3004 progress: LanguageServerProgress,
3005 cx: &mut ModelContext<Self>,
3006 ) {
3007 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3008 let entry = status
3009 .pending_work
3010 .entry(token)
3011 .or_insert(LanguageServerProgress {
3012 message: Default::default(),
3013 percentage: Default::default(),
3014 last_update_at: progress.last_update_at,
3015 });
3016 if progress.message.is_some() {
3017 entry.message = progress.message;
3018 }
3019 if progress.percentage.is_some() {
3020 entry.percentage = progress.percentage;
3021 }
3022 entry.last_update_at = progress.last_update_at;
3023 cx.notify();
3024 }
3025 }
3026
3027 fn on_lsp_work_end(
3028 &mut self,
3029 language_server_id: LanguageServerId,
3030 token: String,
3031 cx: &mut ModelContext<Self>,
3032 ) {
3033 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3034 status.pending_work.remove(&token);
3035 cx.notify();
3036 }
3037 }
3038
3039 fn on_lsp_did_change_watched_files(
3040 &mut self,
3041 language_server_id: LanguageServerId,
3042 params: DidChangeWatchedFilesRegistrationOptions,
3043 cx: &mut ModelContext<Self>,
3044 ) {
3045 if let Some(LanguageServerState::Running { watched_paths, .. }) =
3046 self.language_servers.get_mut(&language_server_id)
3047 {
3048 let mut builders = HashMap::default();
3049 for watcher in params.watchers {
3050 for worktree in &self.worktrees {
3051 if let Some(worktree) = worktree.upgrade(cx) {
3052 let worktree = worktree.read(cx);
3053 if let Some(abs_path) = worktree.abs_path().to_str() {
3054 if let Some(suffix) = match &watcher.glob_pattern {
3055 lsp::GlobPattern::String(s) => s,
3056 lsp::GlobPattern::Relative(rp) => &rp.pattern,
3057 }
3058 .strip_prefix(abs_path)
3059 .and_then(|s| s.strip_prefix(std::path::MAIN_SEPARATOR))
3060 {
3061 if let Some(glob) = Glob::new(suffix).log_err() {
3062 builders
3063 .entry(worktree.id())
3064 .or_insert_with(|| GlobSetBuilder::new())
3065 .add(glob);
3066 }
3067 break;
3068 }
3069 }
3070 }
3071 }
3072 }
3073
3074 watched_paths.clear();
3075 for (worktree_id, builder) in builders {
3076 if let Ok(globset) = builder.build() {
3077 watched_paths.insert(worktree_id, globset);
3078 }
3079 }
3080
3081 cx.notify();
3082 }
3083 }
3084
3085 async fn on_lsp_workspace_edit(
3086 this: WeakModelHandle<Self>,
3087 params: lsp::ApplyWorkspaceEditParams,
3088 server_id: LanguageServerId,
3089 adapter: Arc<CachedLspAdapter>,
3090 language_server: Arc<LanguageServer>,
3091 mut cx: AsyncAppContext,
3092 ) -> Result<lsp::ApplyWorkspaceEditResponse> {
3093 let this = this
3094 .upgrade(&cx)
3095 .ok_or_else(|| anyhow!("project project closed"))?;
3096 let transaction = Self::deserialize_workspace_edit(
3097 this.clone(),
3098 params.edit,
3099 true,
3100 adapter.clone(),
3101 language_server.clone(),
3102 &mut cx,
3103 )
3104 .await
3105 .log_err();
3106 this.update(&mut cx, |this, _| {
3107 if let Some(transaction) = transaction {
3108 this.last_workspace_edits_by_language_server
3109 .insert(server_id, transaction);
3110 }
3111 });
3112 Ok(lsp::ApplyWorkspaceEditResponse {
3113 applied: true,
3114 failed_change: None,
3115 failure_reason: None,
3116 })
3117 }
3118
3119 pub fn language_server_statuses(
3120 &self,
3121 ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
3122 self.language_server_statuses.values()
3123 }
3124
3125 pub fn update_diagnostics(
3126 &mut self,
3127 language_server_id: LanguageServerId,
3128 mut params: lsp::PublishDiagnosticsParams,
3129 disk_based_sources: &[String],
3130 cx: &mut ModelContext<Self>,
3131 ) -> Result<()> {
3132 let abs_path = params
3133 .uri
3134 .to_file_path()
3135 .map_err(|_| anyhow!("URI is not a file"))?;
3136 let mut diagnostics = Vec::default();
3137 let mut primary_diagnostic_group_ids = HashMap::default();
3138 let mut sources_by_group_id = HashMap::default();
3139 let mut supporting_diagnostics = HashMap::default();
3140
3141 // Ensure that primary diagnostics are always the most severe
3142 params.diagnostics.sort_by_key(|item| item.severity);
3143
3144 for diagnostic in ¶ms.diagnostics {
3145 let source = diagnostic.source.as_ref();
3146 let code = diagnostic.code.as_ref().map(|code| match code {
3147 lsp::NumberOrString::Number(code) => code.to_string(),
3148 lsp::NumberOrString::String(code) => code.clone(),
3149 });
3150 let range = range_from_lsp(diagnostic.range);
3151 let is_supporting = diagnostic
3152 .related_information
3153 .as_ref()
3154 .map_or(false, |infos| {
3155 infos.iter().any(|info| {
3156 primary_diagnostic_group_ids.contains_key(&(
3157 source,
3158 code.clone(),
3159 range_from_lsp(info.location.range),
3160 ))
3161 })
3162 });
3163
3164 let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
3165 tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
3166 });
3167
3168 if is_supporting {
3169 supporting_diagnostics.insert(
3170 (source, code.clone(), range),
3171 (diagnostic.severity, is_unnecessary),
3172 );
3173 } else {
3174 let group_id = post_inc(&mut self.next_diagnostic_group_id);
3175 let is_disk_based =
3176 source.map_or(false, |source| disk_based_sources.contains(source));
3177
3178 sources_by_group_id.insert(group_id, source);
3179 primary_diagnostic_group_ids
3180 .insert((source, code.clone(), range.clone()), group_id);
3181
3182 diagnostics.push(DiagnosticEntry {
3183 range,
3184 diagnostic: Diagnostic {
3185 source: diagnostic.source.clone(),
3186 code: code.clone(),
3187 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
3188 message: diagnostic.message.clone(),
3189 group_id,
3190 is_primary: true,
3191 is_valid: true,
3192 is_disk_based,
3193 is_unnecessary,
3194 },
3195 });
3196 if let Some(infos) = &diagnostic.related_information {
3197 for info in infos {
3198 if info.location.uri == params.uri && !info.message.is_empty() {
3199 let range = range_from_lsp(info.location.range);
3200 diagnostics.push(DiagnosticEntry {
3201 range,
3202 diagnostic: Diagnostic {
3203 source: diagnostic.source.clone(),
3204 code: code.clone(),
3205 severity: DiagnosticSeverity::INFORMATION,
3206 message: info.message.clone(),
3207 group_id,
3208 is_primary: false,
3209 is_valid: true,
3210 is_disk_based,
3211 is_unnecessary: false,
3212 },
3213 });
3214 }
3215 }
3216 }
3217 }
3218 }
3219
3220 for entry in &mut diagnostics {
3221 let diagnostic = &mut entry.diagnostic;
3222 if !diagnostic.is_primary {
3223 let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
3224 if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
3225 source,
3226 diagnostic.code.clone(),
3227 entry.range.clone(),
3228 )) {
3229 if let Some(severity) = severity {
3230 diagnostic.severity = severity;
3231 }
3232 diagnostic.is_unnecessary = is_unnecessary;
3233 }
3234 }
3235 }
3236
3237 self.update_diagnostic_entries(
3238 language_server_id,
3239 abs_path,
3240 params.version,
3241 diagnostics,
3242 cx,
3243 )?;
3244 Ok(())
3245 }
3246
3247 pub fn update_diagnostic_entries(
3248 &mut self,
3249 server_id: LanguageServerId,
3250 abs_path: PathBuf,
3251 version: Option<i32>,
3252 diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3253 cx: &mut ModelContext<Project>,
3254 ) -> Result<(), anyhow::Error> {
3255 let (worktree, relative_path) = self
3256 .find_local_worktree(&abs_path, cx)
3257 .ok_or_else(|| anyhow!("no worktree found for diagnostics path {abs_path:?}"))?;
3258
3259 let project_path = ProjectPath {
3260 worktree_id: worktree.read(cx).id(),
3261 path: relative_path.into(),
3262 };
3263
3264 if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
3265 self.update_buffer_diagnostics(&buffer, server_id, version, diagnostics.clone(), cx)?;
3266 }
3267
3268 let updated = worktree.update(cx, |worktree, cx| {
3269 worktree
3270 .as_local_mut()
3271 .ok_or_else(|| anyhow!("not a local worktree"))?
3272 .update_diagnostics(server_id, project_path.path.clone(), diagnostics, cx)
3273 })?;
3274 if updated {
3275 cx.emit(Event::DiagnosticsUpdated {
3276 language_server_id: server_id,
3277 path: project_path,
3278 });
3279 }
3280 Ok(())
3281 }
3282
3283 fn update_buffer_diagnostics(
3284 &mut self,
3285 buffer: &ModelHandle<Buffer>,
3286 server_id: LanguageServerId,
3287 version: Option<i32>,
3288 mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3289 cx: &mut ModelContext<Self>,
3290 ) -> Result<()> {
3291 fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
3292 Ordering::Equal
3293 .then_with(|| b.is_primary.cmp(&a.is_primary))
3294 .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
3295 .then_with(|| a.severity.cmp(&b.severity))
3296 .then_with(|| a.message.cmp(&b.message))
3297 }
3298
3299 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx)?;
3300
3301 diagnostics.sort_unstable_by(|a, b| {
3302 Ordering::Equal
3303 .then_with(|| a.range.start.cmp(&b.range.start))
3304 .then_with(|| b.range.end.cmp(&a.range.end))
3305 .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
3306 });
3307
3308 let mut sanitized_diagnostics = Vec::new();
3309 let edits_since_save = Patch::new(
3310 snapshot
3311 .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
3312 .collect(),
3313 );
3314 for entry in diagnostics {
3315 let start;
3316 let end;
3317 if entry.diagnostic.is_disk_based {
3318 // Some diagnostics are based on files on disk instead of buffers'
3319 // current contents. Adjust these diagnostics' ranges to reflect
3320 // any unsaved edits.
3321 start = edits_since_save.old_to_new(entry.range.start);
3322 end = edits_since_save.old_to_new(entry.range.end);
3323 } else {
3324 start = entry.range.start;
3325 end = entry.range.end;
3326 }
3327
3328 let mut range = snapshot.clip_point_utf16(start, Bias::Left)
3329 ..snapshot.clip_point_utf16(end, Bias::Right);
3330
3331 // Expand empty ranges by one codepoint
3332 if range.start == range.end {
3333 // This will be go to the next boundary when being clipped
3334 range.end.column += 1;
3335 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
3336 if range.start == range.end && range.end.column > 0 {
3337 range.start.column -= 1;
3338 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
3339 }
3340 }
3341
3342 sanitized_diagnostics.push(DiagnosticEntry {
3343 range,
3344 diagnostic: entry.diagnostic,
3345 });
3346 }
3347 drop(edits_since_save);
3348
3349 let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
3350 buffer.update(cx, |buffer, cx| {
3351 buffer.update_diagnostics(server_id, set, cx)
3352 });
3353 Ok(())
3354 }
3355
3356 pub fn reload_buffers(
3357 &self,
3358 buffers: HashSet<ModelHandle<Buffer>>,
3359 push_to_history: bool,
3360 cx: &mut ModelContext<Self>,
3361 ) -> Task<Result<ProjectTransaction>> {
3362 let mut local_buffers = Vec::new();
3363 let mut remote_buffers = None;
3364 for buffer_handle in buffers {
3365 let buffer = buffer_handle.read(cx);
3366 if buffer.is_dirty() {
3367 if let Some(file) = File::from_dyn(buffer.file()) {
3368 if file.is_local() {
3369 local_buffers.push(buffer_handle);
3370 } else {
3371 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
3372 }
3373 }
3374 }
3375 }
3376
3377 let remote_buffers = self.remote_id().zip(remote_buffers);
3378 let client = self.client.clone();
3379
3380 cx.spawn(|this, mut cx| async move {
3381 let mut project_transaction = ProjectTransaction::default();
3382
3383 if let Some((project_id, remote_buffers)) = remote_buffers {
3384 let response = client
3385 .request(proto::ReloadBuffers {
3386 project_id,
3387 buffer_ids: remote_buffers
3388 .iter()
3389 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3390 .collect(),
3391 })
3392 .await?
3393 .transaction
3394 .ok_or_else(|| anyhow!("missing transaction"))?;
3395 project_transaction = this
3396 .update(&mut cx, |this, cx| {
3397 this.deserialize_project_transaction(response, push_to_history, cx)
3398 })
3399 .await?;
3400 }
3401
3402 for buffer in local_buffers {
3403 let transaction = buffer
3404 .update(&mut cx, |buffer, cx| buffer.reload(cx))
3405 .await?;
3406 buffer.update(&mut cx, |buffer, cx| {
3407 if let Some(transaction) = transaction {
3408 if !push_to_history {
3409 buffer.forget_transaction(transaction.id);
3410 }
3411 project_transaction.0.insert(cx.handle(), transaction);
3412 }
3413 });
3414 }
3415
3416 Ok(project_transaction)
3417 })
3418 }
3419
3420 pub fn format(
3421 &self,
3422 buffers: HashSet<ModelHandle<Buffer>>,
3423 push_to_history: bool,
3424 trigger: FormatTrigger,
3425 cx: &mut ModelContext<Project>,
3426 ) -> Task<Result<ProjectTransaction>> {
3427 if self.is_local() {
3428 let mut buffers_with_paths_and_servers = buffers
3429 .into_iter()
3430 .filter_map(|buffer_handle| {
3431 let buffer = buffer_handle.read(cx);
3432 let file = File::from_dyn(buffer.file())?;
3433 let buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
3434 let server = self
3435 .primary_language_servers_for_buffer(buffer, cx)
3436 .map(|s| s.1.clone());
3437 Some((buffer_handle, buffer_abs_path, server))
3438 })
3439 .collect::<Vec<_>>();
3440
3441 cx.spawn(|this, mut cx| async move {
3442 // Do not allow multiple concurrent formatting requests for the
3443 // same buffer.
3444 this.update(&mut cx, |this, cx| {
3445 buffers_with_paths_and_servers.retain(|(buffer, _, _)| {
3446 this.buffers_being_formatted
3447 .insert(buffer.read(cx).remote_id())
3448 });
3449 });
3450
3451 let _cleanup = defer({
3452 let this = this.clone();
3453 let mut cx = cx.clone();
3454 let buffers = &buffers_with_paths_and_servers;
3455 move || {
3456 this.update(&mut cx, |this, cx| {
3457 for (buffer, _, _) in buffers {
3458 this.buffers_being_formatted
3459 .remove(&buffer.read(cx).remote_id());
3460 }
3461 });
3462 }
3463 });
3464
3465 let mut project_transaction = ProjectTransaction::default();
3466 for (buffer, buffer_abs_path, language_server) in &buffers_with_paths_and_servers {
3467 let settings = buffer.read_with(&cx, |buffer, cx| {
3468 language_settings(buffer.language(), buffer.file(), cx).clone()
3469 });
3470
3471 let remove_trailing_whitespace = settings.remove_trailing_whitespace_on_save;
3472 let ensure_final_newline = settings.ensure_final_newline_on_save;
3473 let format_on_save = settings.format_on_save.clone();
3474 let formatter = settings.formatter.clone();
3475 let tab_size = settings.tab_size;
3476
3477 // First, format buffer's whitespace according to the settings.
3478 let trailing_whitespace_diff = if remove_trailing_whitespace {
3479 Some(
3480 buffer
3481 .read_with(&cx, |b, cx| b.remove_trailing_whitespace(cx))
3482 .await,
3483 )
3484 } else {
3485 None
3486 };
3487 let whitespace_transaction_id = buffer.update(&mut cx, |buffer, cx| {
3488 buffer.finalize_last_transaction();
3489 buffer.start_transaction();
3490 if let Some(diff) = trailing_whitespace_diff {
3491 buffer.apply_diff(diff, cx);
3492 }
3493 if ensure_final_newline {
3494 buffer.ensure_final_newline(cx);
3495 }
3496 buffer.end_transaction(cx)
3497 });
3498
3499 // Currently, formatting operations are represented differently depending on
3500 // whether they come from a language server or an external command.
3501 enum FormatOperation {
3502 Lsp(Vec<(Range<Anchor>, String)>),
3503 External(Diff),
3504 }
3505
3506 // Apply language-specific formatting using either a language server
3507 // or external command.
3508 let mut format_operation = None;
3509 match (formatter, format_on_save) {
3510 (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => {}
3511
3512 (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
3513 | (_, FormatOnSave::LanguageServer) => {
3514 if let Some((language_server, buffer_abs_path)) =
3515 language_server.as_ref().zip(buffer_abs_path.as_ref())
3516 {
3517 format_operation = Some(FormatOperation::Lsp(
3518 Self::format_via_lsp(
3519 &this,
3520 &buffer,
3521 buffer_abs_path,
3522 &language_server,
3523 tab_size,
3524 &mut cx,
3525 )
3526 .await
3527 .context("failed to format via language server")?,
3528 ));
3529 }
3530 }
3531
3532 (
3533 Formatter::External { command, arguments },
3534 FormatOnSave::On | FormatOnSave::Off,
3535 )
3536 | (_, FormatOnSave::External { command, arguments }) => {
3537 if let Some(buffer_abs_path) = buffer_abs_path {
3538 format_operation = Self::format_via_external_command(
3539 &buffer,
3540 &buffer_abs_path,
3541 &command,
3542 &arguments,
3543 &mut cx,
3544 )
3545 .await
3546 .context(format!(
3547 "failed to format via external command {:?}",
3548 command
3549 ))?
3550 .map(FormatOperation::External);
3551 }
3552 }
3553 };
3554
3555 buffer.update(&mut cx, |b, cx| {
3556 // If the buffer had its whitespace formatted and was edited while the language-specific
3557 // formatting was being computed, avoid applying the language-specific formatting, because
3558 // it can't be grouped with the whitespace formatting in the undo history.
3559 if let Some(transaction_id) = whitespace_transaction_id {
3560 if b.peek_undo_stack()
3561 .map_or(true, |e| e.transaction_id() != transaction_id)
3562 {
3563 format_operation.take();
3564 }
3565 }
3566
3567 // Apply any language-specific formatting, and group the two formatting operations
3568 // in the buffer's undo history.
3569 if let Some(operation) = format_operation {
3570 match operation {
3571 FormatOperation::Lsp(edits) => {
3572 b.edit(edits, None, cx);
3573 }
3574 FormatOperation::External(diff) => {
3575 b.apply_diff(diff, cx);
3576 }
3577 }
3578
3579 if let Some(transaction_id) = whitespace_transaction_id {
3580 b.group_until_transaction(transaction_id);
3581 }
3582 }
3583
3584 if let Some(transaction) = b.finalize_last_transaction().cloned() {
3585 if !push_to_history {
3586 b.forget_transaction(transaction.id);
3587 }
3588 project_transaction.0.insert(buffer.clone(), transaction);
3589 }
3590 });
3591 }
3592
3593 Ok(project_transaction)
3594 })
3595 } else {
3596 let remote_id = self.remote_id();
3597 let client = self.client.clone();
3598 cx.spawn(|this, mut cx| async move {
3599 let mut project_transaction = ProjectTransaction::default();
3600 if let Some(project_id) = remote_id {
3601 let response = client
3602 .request(proto::FormatBuffers {
3603 project_id,
3604 trigger: trigger as i32,
3605 buffer_ids: buffers
3606 .iter()
3607 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3608 .collect(),
3609 })
3610 .await?
3611 .transaction
3612 .ok_or_else(|| anyhow!("missing transaction"))?;
3613 project_transaction = this
3614 .update(&mut cx, |this, cx| {
3615 this.deserialize_project_transaction(response, push_to_history, cx)
3616 })
3617 .await?;
3618 }
3619 Ok(project_transaction)
3620 })
3621 }
3622 }
3623
3624 async fn format_via_lsp(
3625 this: &ModelHandle<Self>,
3626 buffer: &ModelHandle<Buffer>,
3627 abs_path: &Path,
3628 language_server: &Arc<LanguageServer>,
3629 tab_size: NonZeroU32,
3630 cx: &mut AsyncAppContext,
3631 ) -> Result<Vec<(Range<Anchor>, String)>> {
3632 let text_document =
3633 lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(abs_path).unwrap());
3634 let capabilities = &language_server.capabilities();
3635 let lsp_edits = if capabilities
3636 .document_formatting_provider
3637 .as_ref()
3638 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3639 {
3640 language_server
3641 .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
3642 text_document,
3643 options: lsp_command::lsp_formatting_options(tab_size.get()),
3644 work_done_progress_params: Default::default(),
3645 })
3646 .await?
3647 } else if capabilities
3648 .document_range_formatting_provider
3649 .as_ref()
3650 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3651 {
3652 let buffer_start = lsp::Position::new(0, 0);
3653 let buffer_end =
3654 buffer.read_with(cx, |buffer, _| point_to_lsp(buffer.max_point_utf16()));
3655 language_server
3656 .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
3657 text_document,
3658 range: lsp::Range::new(buffer_start, buffer_end),
3659 options: lsp_command::lsp_formatting_options(tab_size.get()),
3660 work_done_progress_params: Default::default(),
3661 })
3662 .await?
3663 } else {
3664 None
3665 };
3666
3667 if let Some(lsp_edits) = lsp_edits {
3668 this.update(cx, |this, cx| {
3669 this.edits_from_lsp(buffer, lsp_edits, language_server.server_id(), None, cx)
3670 })
3671 .await
3672 } else {
3673 Ok(Default::default())
3674 }
3675 }
3676
3677 async fn format_via_external_command(
3678 buffer: &ModelHandle<Buffer>,
3679 buffer_abs_path: &Path,
3680 command: &str,
3681 arguments: &[String],
3682 cx: &mut AsyncAppContext,
3683 ) -> Result<Option<Diff>> {
3684 let working_dir_path = buffer.read_with(cx, |buffer, cx| {
3685 let file = File::from_dyn(buffer.file())?;
3686 let worktree = file.worktree.read(cx).as_local()?;
3687 let mut worktree_path = worktree.abs_path().to_path_buf();
3688 if worktree.root_entry()?.is_file() {
3689 worktree_path.pop();
3690 }
3691 Some(worktree_path)
3692 });
3693
3694 if let Some(working_dir_path) = working_dir_path {
3695 let mut child =
3696 smol::process::Command::new(command)
3697 .args(arguments.iter().map(|arg| {
3698 arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
3699 }))
3700 .current_dir(&working_dir_path)
3701 .stdin(smol::process::Stdio::piped())
3702 .stdout(smol::process::Stdio::piped())
3703 .stderr(smol::process::Stdio::piped())
3704 .spawn()?;
3705 let stdin = child
3706 .stdin
3707 .as_mut()
3708 .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
3709 let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
3710 for chunk in text.chunks() {
3711 stdin.write_all(chunk.as_bytes()).await?;
3712 }
3713 stdin.flush().await?;
3714
3715 let output = child.output().await?;
3716 if !output.status.success() {
3717 return Err(anyhow!(
3718 "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
3719 output.status.code(),
3720 String::from_utf8_lossy(&output.stdout),
3721 String::from_utf8_lossy(&output.stderr),
3722 ));
3723 }
3724
3725 let stdout = String::from_utf8(output.stdout)?;
3726 Ok(Some(
3727 buffer
3728 .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
3729 .await,
3730 ))
3731 } else {
3732 Ok(None)
3733 }
3734 }
3735
3736 pub fn definition<T: ToPointUtf16>(
3737 &self,
3738 buffer: &ModelHandle<Buffer>,
3739 position: T,
3740 cx: &mut ModelContext<Self>,
3741 ) -> Task<Result<Vec<LocationLink>>> {
3742 let position = position.to_point_utf16(buffer.read(cx));
3743 self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
3744 }
3745
3746 pub fn type_definition<T: ToPointUtf16>(
3747 &self,
3748 buffer: &ModelHandle<Buffer>,
3749 position: T,
3750 cx: &mut ModelContext<Self>,
3751 ) -> Task<Result<Vec<LocationLink>>> {
3752 let position = position.to_point_utf16(buffer.read(cx));
3753 self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
3754 }
3755
3756 pub fn references<T: ToPointUtf16>(
3757 &self,
3758 buffer: &ModelHandle<Buffer>,
3759 position: T,
3760 cx: &mut ModelContext<Self>,
3761 ) -> Task<Result<Vec<Location>>> {
3762 let position = position.to_point_utf16(buffer.read(cx));
3763 self.request_lsp(buffer.clone(), GetReferences { position }, cx)
3764 }
3765
3766 pub fn document_highlights<T: ToPointUtf16>(
3767 &self,
3768 buffer: &ModelHandle<Buffer>,
3769 position: T,
3770 cx: &mut ModelContext<Self>,
3771 ) -> Task<Result<Vec<DocumentHighlight>>> {
3772 let position = position.to_point_utf16(buffer.read(cx));
3773 self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
3774 }
3775
3776 pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
3777 if self.is_local() {
3778 let mut requests = Vec::new();
3779 for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
3780 let worktree_id = *worktree_id;
3781 if let Some(worktree) = self
3782 .worktree_for_id(worktree_id, cx)
3783 .and_then(|worktree| worktree.read(cx).as_local())
3784 {
3785 if let Some(LanguageServerState::Running {
3786 adapter,
3787 language,
3788 server,
3789 ..
3790 }) = self.language_servers.get(server_id)
3791 {
3792 let adapter = adapter.clone();
3793 let language = language.clone();
3794 let worktree_abs_path = worktree.abs_path().clone();
3795 requests.push(
3796 server
3797 .request::<lsp::request::WorkspaceSymbolRequest>(
3798 lsp::WorkspaceSymbolParams {
3799 query: query.to_string(),
3800 ..Default::default()
3801 },
3802 )
3803 .log_err()
3804 .map(move |response| {
3805 let lsp_symbols = response.flatten().map(|symbol_response| match symbol_response {
3806 lsp::WorkspaceSymbolResponse::Flat(flat_responses) => {
3807 flat_responses.into_iter().map(|lsp_symbol| {
3808 (lsp_symbol.name, lsp_symbol.kind, lsp_symbol.location)
3809 }).collect::<Vec<_>>()
3810 }
3811 lsp::WorkspaceSymbolResponse::Nested(nested_responses) => {
3812 nested_responses.into_iter().filter_map(|lsp_symbol| {
3813 let location = match lsp_symbol.location {
3814 lsp::OneOf::Left(location) => location,
3815 lsp::OneOf::Right(_) => {
3816 error!("Unexpected: client capabilities forbid symbol resolutions in workspace.symbol.resolveSupport");
3817 return None
3818 }
3819 };
3820 Some((lsp_symbol.name, lsp_symbol.kind, location))
3821 }).collect::<Vec<_>>()
3822 }
3823 }).unwrap_or_default();
3824
3825 (
3826 adapter,
3827 language,
3828 worktree_id,
3829 worktree_abs_path,
3830 lsp_symbols,
3831 )
3832 }),
3833 );
3834 }
3835 }
3836 }
3837
3838 cx.spawn_weak(|this, cx| async move {
3839 let responses = futures::future::join_all(requests).await;
3840 let this = if let Some(this) = this.upgrade(&cx) {
3841 this
3842 } else {
3843 return Ok(Default::default());
3844 };
3845 let symbols = this.read_with(&cx, |this, cx| {
3846 let mut symbols = Vec::new();
3847 for (
3848 adapter,
3849 adapter_language,
3850 source_worktree_id,
3851 worktree_abs_path,
3852 lsp_symbols,
3853 ) in responses
3854 {
3855 symbols.extend(lsp_symbols.into_iter().filter_map(
3856 |(symbol_name, symbol_kind, symbol_location)| {
3857 let abs_path = symbol_location.uri.to_file_path().ok()?;
3858 let mut worktree_id = source_worktree_id;
3859 let path;
3860 if let Some((worktree, rel_path)) =
3861 this.find_local_worktree(&abs_path, cx)
3862 {
3863 worktree_id = worktree.read(cx).id();
3864 path = rel_path;
3865 } else {
3866 path = relativize_path(&worktree_abs_path, &abs_path);
3867 }
3868
3869 let project_path = ProjectPath {
3870 worktree_id,
3871 path: path.into(),
3872 };
3873 let signature = this.symbol_signature(&project_path);
3874 let adapter_language = adapter_language.clone();
3875 let language = this
3876 .languages
3877 .language_for_file(&project_path.path, None)
3878 .unwrap_or_else(move |_| adapter_language);
3879 let language_server_name = adapter.name.clone();
3880 Some(async move {
3881 let language = language.await;
3882 let label =
3883 language.label_for_symbol(&symbol_name, symbol_kind).await;
3884
3885 Symbol {
3886 language_server_name,
3887 source_worktree_id,
3888 path: project_path,
3889 label: label.unwrap_or_else(|| {
3890 CodeLabel::plain(symbol_name.clone(), None)
3891 }),
3892 kind: symbol_kind,
3893 name: symbol_name,
3894 range: range_from_lsp(symbol_location.range),
3895 signature,
3896 }
3897 })
3898 },
3899 ));
3900 }
3901 symbols
3902 });
3903 Ok(futures::future::join_all(symbols).await)
3904 })
3905 } else if let Some(project_id) = self.remote_id() {
3906 let request = self.client.request(proto::GetProjectSymbols {
3907 project_id,
3908 query: query.to_string(),
3909 });
3910 cx.spawn_weak(|this, cx| async move {
3911 let response = request.await?;
3912 let mut symbols = Vec::new();
3913 if let Some(this) = this.upgrade(&cx) {
3914 let new_symbols = this.read_with(&cx, |this, _| {
3915 response
3916 .symbols
3917 .into_iter()
3918 .map(|symbol| this.deserialize_symbol(symbol))
3919 .collect::<Vec<_>>()
3920 });
3921 symbols = futures::future::join_all(new_symbols)
3922 .await
3923 .into_iter()
3924 .filter_map(|symbol| symbol.log_err())
3925 .collect::<Vec<_>>();
3926 }
3927 Ok(symbols)
3928 })
3929 } else {
3930 Task::ready(Ok(Default::default()))
3931 }
3932 }
3933
3934 pub fn open_buffer_for_symbol(
3935 &mut self,
3936 symbol: &Symbol,
3937 cx: &mut ModelContext<Self>,
3938 ) -> Task<Result<ModelHandle<Buffer>>> {
3939 if self.is_local() {
3940 let language_server_id = if let Some(id) = self.language_server_ids.get(&(
3941 symbol.source_worktree_id,
3942 symbol.language_server_name.clone(),
3943 )) {
3944 *id
3945 } else {
3946 return Task::ready(Err(anyhow!(
3947 "language server for worktree and language not found"
3948 )));
3949 };
3950
3951 let worktree_abs_path = if let Some(worktree_abs_path) = self
3952 .worktree_for_id(symbol.path.worktree_id, cx)
3953 .and_then(|worktree| worktree.read(cx).as_local())
3954 .map(|local_worktree| local_worktree.abs_path())
3955 {
3956 worktree_abs_path
3957 } else {
3958 return Task::ready(Err(anyhow!("worktree not found for symbol")));
3959 };
3960 let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
3961 let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
3962 uri
3963 } else {
3964 return Task::ready(Err(anyhow!("invalid symbol path")));
3965 };
3966
3967 self.open_local_buffer_via_lsp(
3968 symbol_uri,
3969 language_server_id,
3970 symbol.language_server_name.clone(),
3971 cx,
3972 )
3973 } else if let Some(project_id) = self.remote_id() {
3974 let request = self.client.request(proto::OpenBufferForSymbol {
3975 project_id,
3976 symbol: Some(serialize_symbol(symbol)),
3977 });
3978 cx.spawn(|this, mut cx| async move {
3979 let response = request.await?;
3980 this.update(&mut cx, |this, cx| {
3981 this.wait_for_remote_buffer(response.buffer_id, cx)
3982 })
3983 .await
3984 })
3985 } else {
3986 Task::ready(Err(anyhow!("project does not have a remote id")))
3987 }
3988 }
3989
3990 pub fn hover<T: ToPointUtf16>(
3991 &self,
3992 buffer: &ModelHandle<Buffer>,
3993 position: T,
3994 cx: &mut ModelContext<Self>,
3995 ) -> Task<Result<Option<Hover>>> {
3996 let position = position.to_point_utf16(buffer.read(cx));
3997 self.request_lsp(buffer.clone(), GetHover { position }, cx)
3998 }
3999
4000 pub fn completions<T: ToPointUtf16>(
4001 &self,
4002 buffer: &ModelHandle<Buffer>,
4003 position: T,
4004 cx: &mut ModelContext<Self>,
4005 ) -> Task<Result<Vec<Completion>>> {
4006 let position = position.to_point_utf16(buffer.read(cx));
4007 self.request_lsp(buffer.clone(), GetCompletions { position }, cx)
4008 }
4009
4010 pub fn apply_additional_edits_for_completion(
4011 &self,
4012 buffer_handle: ModelHandle<Buffer>,
4013 completion: Completion,
4014 push_to_history: bool,
4015 cx: &mut ModelContext<Self>,
4016 ) -> Task<Result<Option<Transaction>>> {
4017 let buffer = buffer_handle.read(cx);
4018 let buffer_id = buffer.remote_id();
4019
4020 if self.is_local() {
4021 let lang_server = match self.primary_language_servers_for_buffer(buffer, cx) {
4022 Some((_, server)) => server.clone(),
4023 _ => return Task::ready(Ok(Default::default())),
4024 };
4025
4026 cx.spawn(|this, mut cx| async move {
4027 let resolved_completion = lang_server
4028 .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
4029 .await?;
4030
4031 if let Some(edits) = resolved_completion.additional_text_edits {
4032 let edits = this
4033 .update(&mut cx, |this, cx| {
4034 this.edits_from_lsp(
4035 &buffer_handle,
4036 edits,
4037 lang_server.server_id(),
4038 None,
4039 cx,
4040 )
4041 })
4042 .await?;
4043
4044 buffer_handle.update(&mut cx, |buffer, cx| {
4045 buffer.finalize_last_transaction();
4046 buffer.start_transaction();
4047
4048 for (range, text) in edits {
4049 let primary = &completion.old_range;
4050 let start_within = primary.start.cmp(&range.start, buffer).is_le()
4051 && primary.end.cmp(&range.start, buffer).is_ge();
4052 let end_within = range.start.cmp(&primary.end, buffer).is_le()
4053 && range.end.cmp(&primary.end, buffer).is_ge();
4054
4055 //Skip addtional edits which overlap with the primary completion edit
4056 //https://github.com/zed-industries/zed/pull/1871
4057 if !start_within && !end_within {
4058 buffer.edit([(range, text)], None, cx);
4059 }
4060 }
4061
4062 let transaction = if buffer.end_transaction(cx).is_some() {
4063 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4064 if !push_to_history {
4065 buffer.forget_transaction(transaction.id);
4066 }
4067 Some(transaction)
4068 } else {
4069 None
4070 };
4071 Ok(transaction)
4072 })
4073 } else {
4074 Ok(None)
4075 }
4076 })
4077 } else if let Some(project_id) = self.remote_id() {
4078 let client = self.client.clone();
4079 cx.spawn(|_, mut cx| async move {
4080 let response = client
4081 .request(proto::ApplyCompletionAdditionalEdits {
4082 project_id,
4083 buffer_id,
4084 completion: Some(language::proto::serialize_completion(&completion)),
4085 })
4086 .await?;
4087
4088 if let Some(transaction) = response.transaction {
4089 let transaction = language::proto::deserialize_transaction(transaction)?;
4090 buffer_handle
4091 .update(&mut cx, |buffer, _| {
4092 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
4093 })
4094 .await?;
4095 if push_to_history {
4096 buffer_handle.update(&mut cx, |buffer, _| {
4097 buffer.push_transaction(transaction.clone(), Instant::now());
4098 });
4099 }
4100 Ok(Some(transaction))
4101 } else {
4102 Ok(None)
4103 }
4104 })
4105 } else {
4106 Task::ready(Err(anyhow!("project does not have a remote id")))
4107 }
4108 }
4109
4110 pub fn code_actions<T: Clone + ToOffset>(
4111 &self,
4112 buffer_handle: &ModelHandle<Buffer>,
4113 range: Range<T>,
4114 cx: &mut ModelContext<Self>,
4115 ) -> Task<Result<Vec<CodeAction>>> {
4116 let buffer = buffer_handle.read(cx);
4117 let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
4118 self.request_lsp(buffer_handle.clone(), GetCodeActions { range }, cx)
4119 }
4120
4121 pub fn apply_code_action(
4122 &self,
4123 buffer_handle: ModelHandle<Buffer>,
4124 mut action: CodeAction,
4125 push_to_history: bool,
4126 cx: &mut ModelContext<Self>,
4127 ) -> Task<Result<ProjectTransaction>> {
4128 if self.is_local() {
4129 let buffer = buffer_handle.read(cx);
4130 let (lsp_adapter, lang_server) = if let Some((adapter, server)) =
4131 self.language_server_for_buffer(buffer, action.server_id, cx)
4132 {
4133 (adapter.clone(), server.clone())
4134 } else {
4135 return Task::ready(Ok(Default::default()));
4136 };
4137 let range = action.range.to_point_utf16(buffer);
4138
4139 cx.spawn(|this, mut cx| async move {
4140 if let Some(lsp_range) = action
4141 .lsp_action
4142 .data
4143 .as_mut()
4144 .and_then(|d| d.get_mut("codeActionParams"))
4145 .and_then(|d| d.get_mut("range"))
4146 {
4147 *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
4148 action.lsp_action = lang_server
4149 .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
4150 .await?;
4151 } else {
4152 let actions = this
4153 .update(&mut cx, |this, cx| {
4154 this.code_actions(&buffer_handle, action.range, cx)
4155 })
4156 .await?;
4157 action.lsp_action = actions
4158 .into_iter()
4159 .find(|a| a.lsp_action.title == action.lsp_action.title)
4160 .ok_or_else(|| anyhow!("code action is outdated"))?
4161 .lsp_action;
4162 }
4163
4164 if let Some(edit) = action.lsp_action.edit {
4165 if edit.changes.is_some() || edit.document_changes.is_some() {
4166 return Self::deserialize_workspace_edit(
4167 this,
4168 edit,
4169 push_to_history,
4170 lsp_adapter.clone(),
4171 lang_server.clone(),
4172 &mut cx,
4173 )
4174 .await;
4175 }
4176 }
4177
4178 if let Some(command) = action.lsp_action.command {
4179 this.update(&mut cx, |this, _| {
4180 this.last_workspace_edits_by_language_server
4181 .remove(&lang_server.server_id());
4182 });
4183 lang_server
4184 .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
4185 command: command.command,
4186 arguments: command.arguments.unwrap_or_default(),
4187 ..Default::default()
4188 })
4189 .await?;
4190 return Ok(this.update(&mut cx, |this, _| {
4191 this.last_workspace_edits_by_language_server
4192 .remove(&lang_server.server_id())
4193 .unwrap_or_default()
4194 }));
4195 }
4196
4197 Ok(ProjectTransaction::default())
4198 })
4199 } else if let Some(project_id) = self.remote_id() {
4200 let client = self.client.clone();
4201 let request = proto::ApplyCodeAction {
4202 project_id,
4203 buffer_id: buffer_handle.read(cx).remote_id(),
4204 action: Some(language::proto::serialize_code_action(&action)),
4205 };
4206 cx.spawn(|this, mut cx| async move {
4207 let response = client
4208 .request(request)
4209 .await?
4210 .transaction
4211 .ok_or_else(|| anyhow!("missing transaction"))?;
4212 this.update(&mut cx, |this, cx| {
4213 this.deserialize_project_transaction(response, push_to_history, cx)
4214 })
4215 .await
4216 })
4217 } else {
4218 Task::ready(Err(anyhow!("project does not have a remote id")))
4219 }
4220 }
4221
4222 fn apply_on_type_formatting(
4223 &self,
4224 buffer: ModelHandle<Buffer>,
4225 position: Anchor,
4226 trigger: String,
4227 cx: &mut ModelContext<Self>,
4228 ) -> Task<Result<Option<Transaction>>> {
4229 if self.is_local() {
4230 cx.spawn(|this, mut cx| async move {
4231 // Do not allow multiple concurrent formatting requests for the
4232 // same buffer.
4233 this.update(&mut cx, |this, cx| {
4234 this.buffers_being_formatted
4235 .insert(buffer.read(cx).remote_id())
4236 });
4237
4238 let _cleanup = defer({
4239 let this = this.clone();
4240 let mut cx = cx.clone();
4241 let closure_buffer = buffer.clone();
4242 move || {
4243 this.update(&mut cx, |this, cx| {
4244 this.buffers_being_formatted
4245 .remove(&closure_buffer.read(cx).remote_id());
4246 });
4247 }
4248 });
4249
4250 buffer
4251 .update(&mut cx, |buffer, _| {
4252 buffer.wait_for_edits(Some(position.timestamp))
4253 })
4254 .await?;
4255 this.update(&mut cx, |this, cx| {
4256 let position = position.to_point_utf16(buffer.read(cx));
4257 this.on_type_format(buffer, position, trigger, false, cx)
4258 })
4259 .await
4260 })
4261 } else if let Some(project_id) = self.remote_id() {
4262 let client = self.client.clone();
4263 let request = proto::OnTypeFormatting {
4264 project_id,
4265 buffer_id: buffer.read(cx).remote_id(),
4266 position: Some(serialize_anchor(&position)),
4267 trigger,
4268 version: serialize_version(&buffer.read(cx).version()),
4269 };
4270 cx.spawn(|_, _| async move {
4271 client
4272 .request(request)
4273 .await?
4274 .transaction
4275 .map(language::proto::deserialize_transaction)
4276 .transpose()
4277 })
4278 } else {
4279 Task::ready(Err(anyhow!("project does not have a remote id")))
4280 }
4281 }
4282
4283 async fn deserialize_edits(
4284 this: ModelHandle<Self>,
4285 buffer_to_edit: ModelHandle<Buffer>,
4286 edits: Vec<lsp::TextEdit>,
4287 push_to_history: bool,
4288 _: Arc<CachedLspAdapter>,
4289 language_server: Arc<LanguageServer>,
4290 cx: &mut AsyncAppContext,
4291 ) -> Result<Option<Transaction>> {
4292 let edits = this
4293 .update(cx, |this, cx| {
4294 this.edits_from_lsp(
4295 &buffer_to_edit,
4296 edits,
4297 language_server.server_id(),
4298 None,
4299 cx,
4300 )
4301 })
4302 .await?;
4303
4304 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4305 buffer.finalize_last_transaction();
4306 buffer.start_transaction();
4307 for (range, text) in edits {
4308 buffer.edit([(range, text)], None, cx);
4309 }
4310
4311 if buffer.end_transaction(cx).is_some() {
4312 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4313 if !push_to_history {
4314 buffer.forget_transaction(transaction.id);
4315 }
4316 Some(transaction)
4317 } else {
4318 None
4319 }
4320 });
4321
4322 Ok(transaction)
4323 }
4324
4325 async fn deserialize_workspace_edit(
4326 this: ModelHandle<Self>,
4327 edit: lsp::WorkspaceEdit,
4328 push_to_history: bool,
4329 lsp_adapter: Arc<CachedLspAdapter>,
4330 language_server: Arc<LanguageServer>,
4331 cx: &mut AsyncAppContext,
4332 ) -> Result<ProjectTransaction> {
4333 let fs = this.read_with(cx, |this, _| this.fs.clone());
4334 let mut operations = Vec::new();
4335 if let Some(document_changes) = edit.document_changes {
4336 match document_changes {
4337 lsp::DocumentChanges::Edits(edits) => {
4338 operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
4339 }
4340 lsp::DocumentChanges::Operations(ops) => operations = ops,
4341 }
4342 } else if let Some(changes) = edit.changes {
4343 operations.extend(changes.into_iter().map(|(uri, edits)| {
4344 lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
4345 text_document: lsp::OptionalVersionedTextDocumentIdentifier {
4346 uri,
4347 version: None,
4348 },
4349 edits: edits.into_iter().map(lsp::OneOf::Left).collect(),
4350 })
4351 }));
4352 }
4353
4354 let mut project_transaction = ProjectTransaction::default();
4355 for operation in operations {
4356 match operation {
4357 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
4358 let abs_path = op
4359 .uri
4360 .to_file_path()
4361 .map_err(|_| anyhow!("can't convert URI to path"))?;
4362
4363 if let Some(parent_path) = abs_path.parent() {
4364 fs.create_dir(parent_path).await?;
4365 }
4366 if abs_path.ends_with("/") {
4367 fs.create_dir(&abs_path).await?;
4368 } else {
4369 fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
4370 .await?;
4371 }
4372 }
4373
4374 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
4375 let source_abs_path = op
4376 .old_uri
4377 .to_file_path()
4378 .map_err(|_| anyhow!("can't convert URI to path"))?;
4379 let target_abs_path = op
4380 .new_uri
4381 .to_file_path()
4382 .map_err(|_| anyhow!("can't convert URI to path"))?;
4383 fs.rename(
4384 &source_abs_path,
4385 &target_abs_path,
4386 op.options.map(Into::into).unwrap_or_default(),
4387 )
4388 .await?;
4389 }
4390
4391 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
4392 let abs_path = op
4393 .uri
4394 .to_file_path()
4395 .map_err(|_| anyhow!("can't convert URI to path"))?;
4396 let options = op.options.map(Into::into).unwrap_or_default();
4397 if abs_path.ends_with("/") {
4398 fs.remove_dir(&abs_path, options).await?;
4399 } else {
4400 fs.remove_file(&abs_path, options).await?;
4401 }
4402 }
4403
4404 lsp::DocumentChangeOperation::Edit(op) => {
4405 let buffer_to_edit = this
4406 .update(cx, |this, cx| {
4407 this.open_local_buffer_via_lsp(
4408 op.text_document.uri,
4409 language_server.server_id(),
4410 lsp_adapter.name.clone(),
4411 cx,
4412 )
4413 })
4414 .await?;
4415
4416 let edits = this
4417 .update(cx, |this, cx| {
4418 let edits = op.edits.into_iter().map(|edit| match edit {
4419 lsp::OneOf::Left(edit) => edit,
4420 lsp::OneOf::Right(edit) => edit.text_edit,
4421 });
4422 this.edits_from_lsp(
4423 &buffer_to_edit,
4424 edits,
4425 language_server.server_id(),
4426 op.text_document.version,
4427 cx,
4428 )
4429 })
4430 .await?;
4431
4432 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4433 buffer.finalize_last_transaction();
4434 buffer.start_transaction();
4435 for (range, text) in edits {
4436 buffer.edit([(range, text)], None, cx);
4437 }
4438 let transaction = if buffer.end_transaction(cx).is_some() {
4439 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4440 if !push_to_history {
4441 buffer.forget_transaction(transaction.id);
4442 }
4443 Some(transaction)
4444 } else {
4445 None
4446 };
4447
4448 transaction
4449 });
4450 if let Some(transaction) = transaction {
4451 project_transaction.0.insert(buffer_to_edit, transaction);
4452 }
4453 }
4454 }
4455 }
4456
4457 Ok(project_transaction)
4458 }
4459
4460 pub fn prepare_rename<T: ToPointUtf16>(
4461 &self,
4462 buffer: ModelHandle<Buffer>,
4463 position: T,
4464 cx: &mut ModelContext<Self>,
4465 ) -> Task<Result<Option<Range<Anchor>>>> {
4466 let position = position.to_point_utf16(buffer.read(cx));
4467 self.request_lsp(buffer, PrepareRename { position }, cx)
4468 }
4469
4470 pub fn perform_rename<T: ToPointUtf16>(
4471 &self,
4472 buffer: ModelHandle<Buffer>,
4473 position: T,
4474 new_name: String,
4475 push_to_history: bool,
4476 cx: &mut ModelContext<Self>,
4477 ) -> Task<Result<ProjectTransaction>> {
4478 let position = position.to_point_utf16(buffer.read(cx));
4479 self.request_lsp(
4480 buffer,
4481 PerformRename {
4482 position,
4483 new_name,
4484 push_to_history,
4485 },
4486 cx,
4487 )
4488 }
4489
4490 pub fn on_type_format<T: ToPointUtf16>(
4491 &self,
4492 buffer: ModelHandle<Buffer>,
4493 position: T,
4494 trigger: String,
4495 push_to_history: bool,
4496 cx: &mut ModelContext<Self>,
4497 ) -> Task<Result<Option<Transaction>>> {
4498 let (position, tab_size) = buffer.read_with(cx, |buffer, cx| {
4499 let position = position.to_point_utf16(buffer);
4500 (
4501 position,
4502 language_settings(buffer.language_at(position).as_ref(), buffer.file(), cx)
4503 .tab_size,
4504 )
4505 });
4506 self.request_lsp(
4507 buffer.clone(),
4508 OnTypeFormatting {
4509 position,
4510 trigger,
4511 options: lsp_command::lsp_formatting_options(tab_size.get()).into(),
4512 push_to_history,
4513 },
4514 cx,
4515 )
4516 }
4517
    /// Searches the project for `query` and returns, per buffer, the anchor
    /// ranges of every match.
    ///
    /// On a local project this runs as a three-stage pipeline connected by
    /// bounded channels:
    ///
    /// 1. background workers scan the files of the visible local worktrees
    ///    and send the paths of candidate files;
    /// 2. a foreground task sends snapshots of the already-open buffers, then
    ///    opens each candidate path as a buffer and sends its snapshot;
    /// 3. background workers search each snapshot and collect the matches.
    ///
    /// On a remote project the query is sent to the host over RPC and the
    /// returned locations are resolved against remote buffers. If the project
    /// is neither local nor has a remote id, an empty result is returned.
    #[allow(clippy::type_complexity)]
    pub fn search(
        &self,
        query: SearchQuery,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
        if self.is_local() {
            // Only local worktrees contribute snapshots; others are filtered out.
            let snapshots = self
                .visible_worktrees(cx)
                .filter_map(|tree| {
                    let tree = tree.read(cx).as_local()?;
                    Some(tree.snapshot())
                })
                .collect::<Vec<_>>();

            let background = cx.background().clone();
            let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
            // Returning early also keeps `workers` non-zero for the division below.
            if path_count == 0 {
                return Task::ready(Ok(Default::default()));
            }
            let workers = background.num_cpus().min(path_count);
            let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
            // Stage 1: scan files on disk and emit the paths of candidate files.
            cx.background()
                .spawn({
                    let fs = self.fs.clone();
                    let background = cx.background().clone();
                    let query = query.clone();
                    async move {
                        let fs = &fs;
                        let query = &query;
                        let matching_paths_tx = &matching_paths_tx;
                        // Ceiling division, so the workers together cover every path.
                        let paths_per_worker = (path_count + workers - 1) / workers;
                        let snapshots = &snapshots;
                        background
                            .scoped(|scope| {
                                for worker_ix in 0..workers {
                                    // Each worker owns the index range
                                    // [worker_start_ix, worker_end_ix) in the file
                                    // sequence formed by concatenating all snapshots.
                                    let worker_start_ix = worker_ix * paths_per_worker;
                                    let worker_end_ix = worker_start_ix + paths_per_worker;
                                    scope.spawn(async move {
                                        let mut snapshot_start_ix = 0;
                                        // Reused across files to build absolute paths.
                                        let mut abs_path = PathBuf::new();
                                        for snapshot in snapshots {
                                            let snapshot_end_ix =
                                                snapshot_start_ix + snapshot.visible_file_count();
                                            if worker_end_ix <= snapshot_start_ix {
                                                // This and all later snapshots lie past the worker's range.
                                                break;
                                            } else if worker_start_ix > snapshot_end_ix {
                                                // This snapshot lies entirely before the worker's range.
                                                snapshot_start_ix = snapshot_end_ix;
                                                continue;
                                            } else {
                                                // Translate the worker's range into indices
                                                // relative to this snapshot.
                                                let start_in_snapshot = worker_start_ix
                                                    .saturating_sub(snapshot_start_ix);
                                                let end_in_snapshot =
                                                    cmp::min(worker_end_ix, snapshot_end_ix)
                                                        - snapshot_start_ix;

                                                for entry in snapshot
                                                    .files(false, start_in_snapshot)
                                                    .take(end_in_snapshot - start_in_snapshot)
                                                {
                                                    // Stop scanning once the receiver is gone.
                                                    if matching_paths_tx.is_closed() {
                                                        break;
                                                    }
                                                    // NOTE(review): `detect` presumably reports whether the
                                                    // file's contents contain a match — confirm. Files that
                                                    // fail to open, or for which `detect` errors, count as
                                                    // non-matching.
                                                    let matches = if query
                                                        .file_matches(Some(&entry.path))
                                                    {
                                                        abs_path.clear();
                                                        abs_path.push(&snapshot.abs_path());
                                                        abs_path.push(&entry.path);
                                                        if let Some(file) =
                                                            fs.open_sync(&abs_path).await.log_err()
                                                        {
                                                            query.detect(file).unwrap_or(false)
                                                        } else {
                                                            false
                                                        }
                                                    } else {
                                                        false
                                                    };

                                                    if matches {
                                                        let project_path =
                                                            (snapshot.id(), entry.path.clone());
                                                        if matching_paths_tx
                                                            .send(project_path)
                                                            .await
                                                            .is_err()
                                                        {
                                                            break;
                                                        }
                                                    }
                                                }

                                                snapshot_start_ix = snapshot_end_ix;
                                            }
                                        }
                                    });
                                }
                            })
                            .await;
                    }
                })
                .detach();

            // Stage 2: turn open buffers and candidate paths into buffer snapshots.
            let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
            let open_buffers = self
                .opened_buffers
                .values()
                .filter_map(|b| b.upgrade(cx))
                .collect::<HashSet<_>>();
            cx.spawn(|this, cx| async move {
                // Buffers that are already open are sent first.
                for buffer in &open_buffers {
                    let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                    buffers_tx.send((buffer.clone(), snapshot)).await?;
                }

                // Shared with the per-path tasks below so each buffer is sent at most once.
                let open_buffers = Rc::new(RefCell::new(open_buffers));
                while let Some(project_path) = matching_paths_rx.next().await {
                    if buffers_tx.is_closed() {
                        break;
                    }

                    let this = this.clone();
                    let open_buffers = open_buffers.clone();
                    let buffers_tx = buffers_tx.clone();
                    cx.spawn(|mut cx| async move {
                        if let Some(buffer) = this
                            .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
                            .await
                            .log_err()
                        {
                            // `insert` returns false for a buffer that was already recorded.
                            if open_buffers.borrow_mut().insert(buffer.clone()) {
                                let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                                buffers_tx.send((buffer, snapshot)).await?;
                            }
                        }

                        Ok::<_, anyhow::Error>(())
                    })
                    .detach();
                }

                Ok::<_, anyhow::Error>(())
            })
            .detach_and_log_err(cx);

            // Stage 3: search the snapshots, one result map per worker.
            let background = cx.background().clone();
            cx.background().spawn(async move {
                let query = &query;
                let mut matched_buffers = Vec::new();
                for _ in 0..workers {
                    matched_buffers.push(HashMap::default());
                }
                background
                    .scoped(|scope| {
                        for worker_matched_buffers in matched_buffers.iter_mut() {
                            let mut buffers_rx = buffers_rx.clone();
                            scope.spawn(async move {
                                while let Some((buffer, snapshot)) = buffers_rx.next().await {
                                    let buffer_matches = if query.file_matches(
                                        snapshot.file().map(|file| file.path().as_ref()),
                                    ) {
                                        query
                                            .search(snapshot.as_rope())
                                            .await
                                            .iter()
                                            .map(|range| {
                                                snapshot.anchor_before(range.start)
                                                    ..snapshot.anchor_after(range.end)
                                            })
                                            .collect()
                                    } else {
                                        Vec::new()
                                    };
                                    // Buffers without matches are left out of the result.
                                    if !buffer_matches.is_empty() {
                                        worker_matched_buffers
                                            .insert(buffer.clone(), buffer_matches);
                                    }
                                }
                            });
                        }
                    })
                    .await;
                // Merge the per-worker maps into the final result.
                Ok(matched_buffers.into_iter().flatten().collect())
            })
        } else if let Some(project_id) = self.remote_id() {
            let request = self.client.request(query.to_proto(project_id));
            cx.spawn(|this, mut cx| async move {
                let response = request.await?;
                let mut result = HashMap::default();
                for location in response.locations {
                    let target_buffer = this
                        .update(&mut cx, |this, cx| {
                            this.wait_for_remote_buffer(location.buffer_id, cx)
                        })
                        .await?;
                    let start = location
                        .start
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target start"))?;
                    let end = location
                        .end
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target end"))?;
                    result
                        .entry(target_buffer)
                        .or_insert(Vec::new())
                        .push(start..end)
                }
                Ok(result)
            })
        } else {
            Task::ready(Ok(Default::default()))
        }
    }
4733
4734 // TODO: Wire this up to allow selecting a server?
4735 fn request_lsp<R: LspCommand>(
4736 &self,
4737 buffer_handle: ModelHandle<Buffer>,
4738 request: R,
4739 cx: &mut ModelContext<Self>,
4740 ) -> Task<Result<R::Response>>
4741 where
4742 <R::LspRequest as lsp::request::Request>::Result: Send,
4743 {
4744 let buffer = buffer_handle.read(cx);
4745 if self.is_local() {
4746 let file = File::from_dyn(buffer.file()).and_then(File::as_local);
4747 if let Some((file, language_server)) = file.zip(
4748 self.primary_language_servers_for_buffer(buffer, cx)
4749 .map(|(_, server)| server.clone()),
4750 ) {
4751 let lsp_params = request.to_lsp(&file.abs_path(cx), buffer, &language_server, cx);
4752 return cx.spawn(|this, cx| async move {
4753 if !request.check_capabilities(language_server.capabilities()) {
4754 return Ok(Default::default());
4755 }
4756
4757 let response = language_server
4758 .request::<R::LspRequest>(lsp_params)
4759 .await
4760 .context("lsp request failed")?;
4761 request
4762 .response_from_lsp(
4763 response,
4764 this,
4765 buffer_handle,
4766 language_server.server_id(),
4767 cx,
4768 )
4769 .await
4770 });
4771 }
4772 } else if let Some(project_id) = self.remote_id() {
4773 let rpc = self.client.clone();
4774 let message = request.to_proto(project_id, buffer);
4775 return cx.spawn_weak(|this, cx| async move {
4776 // Ensure the project is still alive by the time the task
4777 // is scheduled.
4778 this.upgrade(&cx)
4779 .ok_or_else(|| anyhow!("project dropped"))?;
4780
4781 let response = rpc.request(message).await?;
4782
4783 let this = this
4784 .upgrade(&cx)
4785 .ok_or_else(|| anyhow!("project dropped"))?;
4786 if this.read_with(&cx, |this, _| this.is_read_only()) {
4787 Err(anyhow!("disconnected before completing request"))
4788 } else {
4789 request
4790 .response_from_proto(response, this, buffer_handle, cx)
4791 .await
4792 }
4793 });
4794 }
4795 Task::ready(Ok(Default::default()))
4796 }
4797
4798 pub fn find_or_create_local_worktree(
4799 &mut self,
4800 abs_path: impl AsRef<Path>,
4801 visible: bool,
4802 cx: &mut ModelContext<Self>,
4803 ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
4804 let abs_path = abs_path.as_ref();
4805 if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
4806 Task::ready(Ok((tree, relative_path)))
4807 } else {
4808 let worktree = self.create_local_worktree(abs_path, visible, cx);
4809 cx.foreground()
4810 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
4811 }
4812 }
4813
4814 pub fn find_local_worktree(
4815 &self,
4816 abs_path: &Path,
4817 cx: &AppContext,
4818 ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
4819 for tree in &self.worktrees {
4820 if let Some(tree) = tree.upgrade(cx) {
4821 if let Some(relative_path) = tree
4822 .read(cx)
4823 .as_local()
4824 .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
4825 {
4826 return Some((tree.clone(), relative_path.into()));
4827 }
4828 }
4829 }
4830 None
4831 }
4832
4833 pub fn is_shared(&self) -> bool {
4834 match &self.client_state {
4835 Some(ProjectClientState::Local { .. }) => true,
4836 _ => false,
4837 }
4838 }
4839
4840 fn create_local_worktree(
4841 &mut self,
4842 abs_path: impl AsRef<Path>,
4843 visible: bool,
4844 cx: &mut ModelContext<Self>,
4845 ) -> Task<Result<ModelHandle<Worktree>>> {
4846 let fs = self.fs.clone();
4847 let client = self.client.clone();
4848 let next_entry_id = self.next_entry_id.clone();
4849 let path: Arc<Path> = abs_path.as_ref().into();
4850 let task = self
4851 .loading_local_worktrees
4852 .entry(path.clone())
4853 .or_insert_with(|| {
4854 cx.spawn(|project, mut cx| {
4855 async move {
4856 let worktree = Worktree::local(
4857 client.clone(),
4858 path.clone(),
4859 visible,
4860 fs,
4861 next_entry_id,
4862 &mut cx,
4863 )
4864 .await;
4865
4866 project.update(&mut cx, |project, _| {
4867 project.loading_local_worktrees.remove(&path);
4868 });
4869
4870 let worktree = worktree?;
4871 project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
4872 Ok(worktree)
4873 }
4874 .map_err(Arc::new)
4875 })
4876 .shared()
4877 })
4878 .clone();
4879 cx.foreground().spawn(async move {
4880 match task.await {
4881 Ok(worktree) => Ok(worktree),
4882 Err(err) => Err(anyhow!("{}", err)),
4883 }
4884 })
4885 }
4886
4887 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
4888 self.worktrees.retain(|worktree| {
4889 if let Some(worktree) = worktree.upgrade(cx) {
4890 let id = worktree.read(cx).id();
4891 if id == id_to_remove {
4892 cx.emit(Event::WorktreeRemoved(id));
4893 false
4894 } else {
4895 true
4896 }
4897 } else {
4898 false
4899 }
4900 });
4901 self.metadata_changed(cx);
4902 }
4903
4904 fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
4905 cx.observe(worktree, |_, _, cx| cx.notify()).detach();
4906 if worktree.read(cx).is_local() {
4907 cx.subscribe(worktree, |this, worktree, event, cx| match event {
4908 worktree::Event::UpdatedEntries(changes) => {
4909 this.update_local_worktree_buffers(&worktree, changes, cx);
4910 this.update_local_worktree_language_servers(&worktree, changes, cx);
4911 this.update_local_worktree_settings(&worktree, changes, cx);
4912 }
4913 worktree::Event::UpdatedGitRepositories(updated_repos) => {
4914 this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
4915 }
4916 })
4917 .detach();
4918 }
4919
4920 let push_strong_handle = {
4921 let worktree = worktree.read(cx);
4922 self.is_shared() || worktree.is_visible() || worktree.is_remote()
4923 };
4924 if push_strong_handle {
4925 self.worktrees
4926 .push(WorktreeHandle::Strong(worktree.clone()));
4927 } else {
4928 self.worktrees
4929 .push(WorktreeHandle::Weak(worktree.downgrade()));
4930 }
4931
4932 let handle_id = worktree.id();
4933 cx.observe_release(worktree, move |this, worktree, cx| {
4934 let _ = this.remove_worktree(worktree.id(), cx);
4935 cx.update_global::<SettingsStore, _, _>(|store, cx| {
4936 store.clear_local_settings(handle_id, cx).log_err()
4937 });
4938 })
4939 .detach();
4940
4941 cx.emit(Event::WorktreeAdded);
4942 self.metadata_changed(cx);
4943 }
4944
    /// Synchronizes open buffers with a batch of entry changes from a local
    /// worktree.
    ///
    /// For every changed path that maps to an open buffer (by entry id, or
    /// failing that by path), the buffer's `File` is recomputed from the
    /// worktree snapshot. If the file changed, the update is sent to
    /// collaborators when the project has a remote id, and the buffer is
    /// notified. Buffers whose absolute path changed are afterwards
    /// re-registered with the language servers.
    fn update_local_worktree_buffers(
        &mut self,
        worktree_handle: &ModelHandle<Worktree>,
        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();

        // Collected during the loop and processed afterwards.
        let mut renamed_buffers = Vec::new();
        for (path, entry_id, _) in changes {
            let worktree_id = worktree_handle.read(cx).id();
            let project_path = ProjectPath {
                worktree_id,
                path: path.clone(),
            };

            // Prefer the lookup by entry id and fall back to the path.
            // Changes that match no known buffer are skipped.
            let buffer_id = match self.local_buffer_ids_by_entry_id.get(entry_id) {
                Some(&buffer_id) => buffer_id,
                None => match self.local_buffer_ids_by_path.get(&project_path) {
                    Some(&buffer_id) => buffer_id,
                    None => continue,
                },
            };

            let open_buffer = self.opened_buffers.get(&buffer_id);
            let buffer = if let Some(buffer) = open_buffer.and_then(|buffer| buffer.upgrade(cx)) {
                buffer
            } else {
                // The buffer can no longer be upgraded: drop its bookkeeping entries.
                self.opened_buffers.remove(&buffer_id);
                self.local_buffer_ids_by_path.remove(&project_path);
                self.local_buffer_ids_by_entry_id.remove(entry_id);
                continue;
            };

            buffer.update(cx, |buffer, cx| {
                if let Some(old_file) = File::from_dyn(buffer.file()) {
                    // Ignore buffers whose file belongs to a different worktree.
                    if old_file.worktree != *worktree_handle {
                        return;
                    }

                    // Resolve the buffer's file in the new snapshot: first by its
                    // entry id, then by its old path; if neither exists, keep the
                    // old identity and mark the file as deleted.
                    let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id) {
                        File {
                            is_local: true,
                            entry_id: entry.id,
                            mtime: entry.mtime,
                            path: entry.path.clone(),
                            worktree: worktree_handle.clone(),
                            is_deleted: false,
                        }
                    } else if let Some(entry) = snapshot.entry_for_path(old_file.path().as_ref()) {
                        File {
                            is_local: true,
                            entry_id: entry.id,
                            mtime: entry.mtime,
                            path: entry.path.clone(),
                            worktree: worktree_handle.clone(),
                            is_deleted: false,
                        }
                    } else {
                        File {
                            is_local: true,
                            entry_id: old_file.entry_id,
                            path: old_file.path().clone(),
                            mtime: old_file.mtime(),
                            worktree: worktree_handle.clone(),
                            is_deleted: true,
                        }
                    };

                    let old_path = old_file.abs_path(cx);
                    if new_file.abs_path(cx) != old_path {
                        renamed_buffers.push((cx.handle(), old_file.clone()));
                        // NOTE(review): the key removed here equals the key inserted
                        // below (both are built from `worktree_id` and `path`), so any
                        // mapping for the buffer's *old* path is left in place —
                        // confirm whether the old path should be removed instead.
                        self.local_buffer_ids_by_path.remove(&project_path);
                        self.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id,
                                path: path.clone(),
                            },
                            buffer_id,
                        );
                    }

                    // Keep the entry-id index pointing at the buffer's current entry.
                    if new_file.entry_id != *entry_id {
                        self.local_buffer_ids_by_entry_id.remove(entry_id);
                        self.local_buffer_ids_by_entry_id
                            .insert(new_file.entry_id, buffer_id);
                    }

                    if new_file != *old_file {
                        // Collaborators are only informed when the project has a remote id.
                        if let Some(project_id) = self.remote_id() {
                            self.client
                                .send(proto::UpdateBufferFile {
                                    project_id,
                                    buffer_id: buffer_id as u64,
                                    file: Some(new_file.to_proto()),
                                })
                                .log_err();
                        }

                        buffer.file_updated(Arc::new(new_file), cx).detach();
                    }
                }
            });
        }

        // Re-register buffers whose path changed: unregister under the old
        // file, re-detect the language, then register again.
        for (buffer, old_file) in renamed_buffers {
            self.unregister_buffer_from_language_servers(&buffer, &old_file, cx);
            self.detect_language_for_buffer(&buffer, cx);
            self.register_buffer_with_language_servers(&buffer, cx);
        }
    }
5056
5057 fn update_local_worktree_language_servers(
5058 &mut self,
5059 worktree_handle: &ModelHandle<Worktree>,
5060 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
5061 cx: &mut ModelContext<Self>,
5062 ) {
5063 if changes.is_empty() {
5064 return;
5065 }
5066
5067 let worktree_id = worktree_handle.read(cx).id();
5068 let mut language_server_ids = self
5069 .language_server_ids
5070 .iter()
5071 .filter_map(|((server_worktree_id, _), server_id)| {
5072 (*server_worktree_id == worktree_id).then_some(*server_id)
5073 })
5074 .collect::<Vec<_>>();
5075 language_server_ids.sort();
5076 language_server_ids.dedup();
5077
5078 let abs_path = worktree_handle.read(cx).abs_path();
5079 for server_id in &language_server_ids {
5080 if let Some(server) = self.language_servers.get(server_id) {
5081 if let LanguageServerState::Running {
5082 server,
5083 watched_paths,
5084 ..
5085 } = server
5086 {
5087 if let Some(watched_paths) = watched_paths.get(&worktree_id) {
5088 let params = lsp::DidChangeWatchedFilesParams {
5089 changes: changes
5090 .iter()
5091 .filter_map(|(path, _, change)| {
5092 if !watched_paths.is_match(&path) {
5093 return None;
5094 }
5095 let typ = match change {
5096 PathChange::Loaded => return None,
5097 PathChange::Added => lsp::FileChangeType::CREATED,
5098 PathChange::Removed => lsp::FileChangeType::DELETED,
5099 PathChange::Updated => lsp::FileChangeType::CHANGED,
5100 PathChange::AddedOrUpdated => lsp::FileChangeType::CHANGED,
5101 };
5102 Some(lsp::FileEvent {
5103 uri: lsp::Url::from_file_path(abs_path.join(path)).unwrap(),
5104 typ,
5105 })
5106 })
5107 .collect(),
5108 };
5109
5110 if !params.changes.is_empty() {
5111 server
5112 .notify::<lsp::notification::DidChangeWatchedFiles>(params)
5113 .log_err();
5114 }
5115 }
5116 }
5117 }
5118 }
5119 }
5120
5121 fn update_local_worktree_buffers_git_repos(
5122 &mut self,
5123 worktree_handle: ModelHandle<Worktree>,
5124 changed_repos: &UpdatedGitRepositoriesSet,
5125 cx: &mut ModelContext<Self>,
5126 ) {
5127 debug_assert!(worktree_handle.read(cx).is_local());
5128
5129 // Identify the loading buffers whose containing repository that has changed.
5130 let future_buffers = self
5131 .loading_buffers_by_path
5132 .iter()
5133 .filter_map(|(project_path, receiver)| {
5134 if project_path.worktree_id != worktree_handle.read(cx).id() {
5135 return None;
5136 }
5137 let path = &project_path.path;
5138 changed_repos.iter().find(|(work_dir, change)| {
5139 path.starts_with(work_dir) && change.git_dir_changed
5140 })?;
5141 let receiver = receiver.clone();
5142 let path = path.clone();
5143 Some(async move {
5144 wait_for_loading_buffer(receiver)
5145 .await
5146 .ok()
5147 .map(|buffer| (buffer, path))
5148 })
5149 })
5150 .collect::<FuturesUnordered<_>>();
5151
5152 // Identify the current buffers whose containing repository has changed.
5153 let current_buffers = self
5154 .opened_buffers
5155 .values()
5156 .filter_map(|buffer| {
5157 let buffer = buffer.upgrade(cx)?;
5158 let file = File::from_dyn(buffer.read(cx).file())?;
5159 if file.worktree != worktree_handle {
5160 return None;
5161 }
5162 let path = file.path();
5163 changed_repos.iter().find(|(work_dir, change)| {
5164 path.starts_with(work_dir) && change.git_dir_changed
5165 })?;
5166 Some((buffer, path.clone()))
5167 })
5168 .collect::<Vec<_>>();
5169
5170 if future_buffers.len() + current_buffers.len() == 0 {
5171 return;
5172 }
5173
5174 let remote_id = self.remote_id();
5175 let client = self.client.clone();
5176 cx.spawn_weak(move |_, mut cx| async move {
5177 // Wait for all of the buffers to load.
5178 let future_buffers = future_buffers.collect::<Vec<_>>().await;
5179
5180 // Reload the diff base for every buffer whose containing git repository has changed.
5181 let snapshot =
5182 worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot());
5183 let diff_bases_by_buffer = cx
5184 .background()
5185 .spawn(async move {
5186 future_buffers
5187 .into_iter()
5188 .filter_map(|e| e)
5189 .chain(current_buffers)
5190 .filter_map(|(buffer, path)| {
5191 let (work_directory, repo) =
5192 snapshot.repository_and_work_directory_for_path(&path)?;
5193 let repo = snapshot.get_local_repo(&repo)?;
5194 let relative_path = path.strip_prefix(&work_directory).ok()?;
5195 let base_text = repo.repo_ptr.lock().load_index_text(&relative_path);
5196 Some((buffer, base_text))
5197 })
5198 .collect::<Vec<_>>()
5199 })
5200 .await;
5201
5202 // Assign the new diff bases on all of the buffers.
5203 for (buffer, diff_base) in diff_bases_by_buffer {
5204 let buffer_id = buffer.update(&mut cx, |buffer, cx| {
5205 buffer.set_diff_base(diff_base.clone(), cx);
5206 buffer.remote_id()
5207 });
5208 if let Some(project_id) = remote_id {
5209 client
5210 .send(proto::UpdateDiffBase {
5211 project_id,
5212 buffer_id,
5213 diff_base,
5214 })
5215 .log_err();
5216 }
5217 }
5218 })
5219 .detach();
5220 }
5221
5222 fn update_local_worktree_settings(
5223 &mut self,
5224 worktree: &ModelHandle<Worktree>,
5225 changes: &UpdatedEntriesSet,
5226 cx: &mut ModelContext<Self>,
5227 ) {
5228 let project_id = self.remote_id();
5229 let worktree_id = worktree.id();
5230 let worktree = worktree.read(cx).as_local().unwrap();
5231 let remote_worktree_id = worktree.id();
5232
5233 let mut settings_contents = Vec::new();
5234 for (path, _, change) in changes.iter() {
5235 if path.ends_with(&*LOCAL_SETTINGS_RELATIVE_PATH) {
5236 let settings_dir = Arc::from(
5237 path.ancestors()
5238 .nth(LOCAL_SETTINGS_RELATIVE_PATH.components().count())
5239 .unwrap(),
5240 );
5241 let fs = self.fs.clone();
5242 let removed = *change == PathChange::Removed;
5243 let abs_path = worktree.absolutize(path);
5244 settings_contents.push(async move {
5245 (settings_dir, (!removed).then_some(fs.load(&abs_path).await))
5246 });
5247 }
5248 }
5249
5250 if settings_contents.is_empty() {
5251 return;
5252 }
5253
5254 let client = self.client.clone();
5255 cx.spawn_weak(move |_, mut cx| async move {
5256 let settings_contents: Vec<(Arc<Path>, _)> =
5257 futures::future::join_all(settings_contents).await;
5258 cx.update(|cx| {
5259 cx.update_global::<SettingsStore, _, _>(|store, cx| {
5260 for (directory, file_content) in settings_contents {
5261 let file_content = file_content.and_then(|content| content.log_err());
5262 store
5263 .set_local_settings(
5264 worktree_id,
5265 directory.clone(),
5266 file_content.as_ref().map(String::as_str),
5267 cx,
5268 )
5269 .log_err();
5270 if let Some(remote_id) = project_id {
5271 client
5272 .send(proto::UpdateWorktreeSettings {
5273 project_id: remote_id,
5274 worktree_id: remote_worktree_id.to_proto(),
5275 path: directory.to_string_lossy().into_owned(),
5276 content: file_content,
5277 })
5278 .log_err();
5279 }
5280 }
5281 });
5282 });
5283 })
5284 .detach();
5285 }
5286
5287 pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
5288 let new_active_entry = entry.and_then(|project_path| {
5289 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
5290 let entry = worktree.read(cx).entry_for_path(project_path.path)?;
5291 Some(entry.id)
5292 });
5293 if new_active_entry != self.active_entry {
5294 self.active_entry = new_active_entry;
5295 cx.emit(Event::ActiveEntryChanged(new_active_entry));
5296 }
5297 }
5298
5299 pub fn language_servers_running_disk_based_diagnostics(
5300 &self,
5301 ) -> impl Iterator<Item = LanguageServerId> + '_ {
5302 self.language_server_statuses
5303 .iter()
5304 .filter_map(|(id, status)| {
5305 if status.has_pending_diagnostic_updates {
5306 Some(*id)
5307 } else {
5308 None
5309 }
5310 })
5311 }
5312
5313 pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
5314 let mut summary = DiagnosticSummary::default();
5315 for (_, _, path_summary) in self.diagnostic_summaries(cx) {
5316 summary.error_count += path_summary.error_count;
5317 summary.warning_count += path_summary.warning_count;
5318 }
5319 summary
5320 }
5321
5322 pub fn diagnostic_summaries<'a>(
5323 &'a self,
5324 cx: &'a AppContext,
5325 ) -> impl Iterator<Item = (ProjectPath, LanguageServerId, DiagnosticSummary)> + 'a {
5326 self.visible_worktrees(cx).flat_map(move |worktree| {
5327 let worktree = worktree.read(cx);
5328 let worktree_id = worktree.id();
5329 worktree
5330 .diagnostic_summaries()
5331 .map(move |(path, server_id, summary)| {
5332 (ProjectPath { worktree_id, path }, server_id, summary)
5333 })
5334 })
5335 }
5336
5337 pub fn disk_based_diagnostics_started(
5338 &mut self,
5339 language_server_id: LanguageServerId,
5340 cx: &mut ModelContext<Self>,
5341 ) {
5342 cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
5343 }
5344
5345 pub fn disk_based_diagnostics_finished(
5346 &mut self,
5347 language_server_id: LanguageServerId,
5348 cx: &mut ModelContext<Self>,
5349 ) {
5350 cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
5351 }
5352
    /// Returns the id of the currently active project entry, if any.
    pub fn active_entry(&self) -> Option<ProjectEntryId> {
        self.active_entry
    }
5356
5357 pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
5358 self.worktree_for_id(path.worktree_id, cx)?
5359 .read(cx)
5360 .entry_for_path(&path.path)
5361 .cloned()
5362 }
5363
5364 pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
5365 let worktree = self.worktree_for_entry(entry_id, cx)?;
5366 let worktree = worktree.read(cx);
5367 let worktree_id = worktree.id();
5368 let path = worktree.entry_for_id(entry_id)?.path.clone();
5369 Some(ProjectPath { worktree_id, path })
5370 }
5371
5372 pub fn absolute_path(&self, project_path: &ProjectPath, cx: &AppContext) -> Option<PathBuf> {
5373 let workspace_root = self
5374 .worktree_for_id(project_path.worktree_id, cx)?
5375 .read(cx)
5376 .abs_path();
5377 let project_path = project_path.path.as_ref();
5378
5379 Some(if project_path == Path::new("") {
5380 workspace_root.to_path_buf()
5381 } else {
5382 workspace_root.join(project_path)
5383 })
5384 }
5385
5386 // RPC message handlers
5387
5388 async fn handle_unshare_project(
5389 this: ModelHandle<Self>,
5390 _: TypedEnvelope<proto::UnshareProject>,
5391 _: Arc<Client>,
5392 mut cx: AsyncAppContext,
5393 ) -> Result<()> {
5394 this.update(&mut cx, |this, cx| {
5395 if this.is_local() {
5396 this.unshare(cx)?;
5397 } else {
5398 this.disconnected_from_host(cx);
5399 }
5400 Ok(())
5401 })
5402 }
5403
5404 async fn handle_add_collaborator(
5405 this: ModelHandle<Self>,
5406 mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
5407 _: Arc<Client>,
5408 mut cx: AsyncAppContext,
5409 ) -> Result<()> {
5410 let collaborator = envelope
5411 .payload
5412 .collaborator
5413 .take()
5414 .ok_or_else(|| anyhow!("empty collaborator"))?;
5415
5416 let collaborator = Collaborator::from_proto(collaborator)?;
5417 this.update(&mut cx, |this, cx| {
5418 this.shared_buffers.remove(&collaborator.peer_id);
5419 this.collaborators
5420 .insert(collaborator.peer_id, collaborator);
5421 cx.notify();
5422 });
5423
5424 Ok(())
5425 }
5426
5427 async fn handle_update_project_collaborator(
5428 this: ModelHandle<Self>,
5429 envelope: TypedEnvelope<proto::UpdateProjectCollaborator>,
5430 _: Arc<Client>,
5431 mut cx: AsyncAppContext,
5432 ) -> Result<()> {
5433 let old_peer_id = envelope
5434 .payload
5435 .old_peer_id
5436 .ok_or_else(|| anyhow!("missing old peer id"))?;
5437 let new_peer_id = envelope
5438 .payload
5439 .new_peer_id
5440 .ok_or_else(|| anyhow!("missing new peer id"))?;
5441 this.update(&mut cx, |this, cx| {
5442 let collaborator = this
5443 .collaborators
5444 .remove(&old_peer_id)
5445 .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?;
5446 let is_host = collaborator.replica_id == 0;
5447 this.collaborators.insert(new_peer_id, collaborator);
5448
5449 let buffers = this.shared_buffers.remove(&old_peer_id);
5450 log::info!(
5451 "peer {} became {}. moving buffers {:?}",
5452 old_peer_id,
5453 new_peer_id,
5454 &buffers
5455 );
5456 if let Some(buffers) = buffers {
5457 this.shared_buffers.insert(new_peer_id, buffers);
5458 }
5459
5460 if is_host {
5461 this.opened_buffers
5462 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
5463 this.buffer_ordered_messages_tx
5464 .unbounded_send(BufferOrderedMessage::Resync)
5465 .unwrap();
5466 }
5467
5468 cx.emit(Event::CollaboratorUpdated {
5469 old_peer_id,
5470 new_peer_id,
5471 });
5472 cx.notify();
5473 Ok(())
5474 })
5475 }
5476
5477 async fn handle_remove_collaborator(
5478 this: ModelHandle<Self>,
5479 envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
5480 _: Arc<Client>,
5481 mut cx: AsyncAppContext,
5482 ) -> Result<()> {
5483 this.update(&mut cx, |this, cx| {
5484 let peer_id = envelope
5485 .payload
5486 .peer_id
5487 .ok_or_else(|| anyhow!("invalid peer id"))?;
5488 let replica_id = this
5489 .collaborators
5490 .remove(&peer_id)
5491 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
5492 .replica_id;
5493 for buffer in this.opened_buffers.values() {
5494 if let Some(buffer) = buffer.upgrade(cx) {
5495 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
5496 }
5497 }
5498 this.shared_buffers.remove(&peer_id);
5499
5500 cx.emit(Event::CollaboratorLeft(peer_id));
5501 cx.notify();
5502 Ok(())
5503 })
5504 }
5505
5506 async fn handle_update_project(
5507 this: ModelHandle<Self>,
5508 envelope: TypedEnvelope<proto::UpdateProject>,
5509 _: Arc<Client>,
5510 mut cx: AsyncAppContext,
5511 ) -> Result<()> {
5512 this.update(&mut cx, |this, cx| {
5513 // Don't handle messages that were sent before the response to us joining the project
5514 if envelope.message_id > this.join_project_response_message_id {
5515 this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
5516 }
5517 Ok(())
5518 })
5519 }
5520
5521 async fn handle_update_worktree(
5522 this: ModelHandle<Self>,
5523 envelope: TypedEnvelope<proto::UpdateWorktree>,
5524 _: Arc<Client>,
5525 mut cx: AsyncAppContext,
5526 ) -> Result<()> {
5527 this.update(&mut cx, |this, cx| {
5528 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5529 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
5530 worktree.update(cx, |worktree, _| {
5531 let worktree = worktree.as_remote_mut().unwrap();
5532 worktree.update_from_remote(envelope.payload);
5533 });
5534 }
5535 Ok(())
5536 })
5537 }
5538
5539 async fn handle_update_worktree_settings(
5540 this: ModelHandle<Self>,
5541 envelope: TypedEnvelope<proto::UpdateWorktreeSettings>,
5542 _: Arc<Client>,
5543 mut cx: AsyncAppContext,
5544 ) -> Result<()> {
5545 this.update(&mut cx, |this, cx| {
5546 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5547 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
5548 cx.update_global::<SettingsStore, _, _>(|store, cx| {
5549 store
5550 .set_local_settings(
5551 worktree.id(),
5552 PathBuf::from(&envelope.payload.path).into(),
5553 envelope.payload.content.as_ref().map(String::as_str),
5554 cx,
5555 )
5556 .log_err();
5557 });
5558 }
5559 Ok(())
5560 })
5561 }
5562
5563 async fn handle_create_project_entry(
5564 this: ModelHandle<Self>,
5565 envelope: TypedEnvelope<proto::CreateProjectEntry>,
5566 _: Arc<Client>,
5567 mut cx: AsyncAppContext,
5568 ) -> Result<proto::ProjectEntryResponse> {
5569 let worktree = this.update(&mut cx, |this, cx| {
5570 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5571 this.worktree_for_id(worktree_id, cx)
5572 .ok_or_else(|| anyhow!("worktree not found"))
5573 })?;
5574 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5575 let entry = worktree
5576 .update(&mut cx, |worktree, cx| {
5577 let worktree = worktree.as_local_mut().unwrap();
5578 let path = PathBuf::from(envelope.payload.path);
5579 worktree.create_entry(path, envelope.payload.is_directory, cx)
5580 })
5581 .await?;
5582 Ok(proto::ProjectEntryResponse {
5583 entry: Some((&entry).into()),
5584 worktree_scan_id: worktree_scan_id as u64,
5585 })
5586 }
5587
5588 async fn handle_rename_project_entry(
5589 this: ModelHandle<Self>,
5590 envelope: TypedEnvelope<proto::RenameProjectEntry>,
5591 _: Arc<Client>,
5592 mut cx: AsyncAppContext,
5593 ) -> Result<proto::ProjectEntryResponse> {
5594 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5595 let worktree = this.read_with(&cx, |this, cx| {
5596 this.worktree_for_entry(entry_id, cx)
5597 .ok_or_else(|| anyhow!("worktree not found"))
5598 })?;
5599 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5600 let entry = worktree
5601 .update(&mut cx, |worktree, cx| {
5602 let new_path = PathBuf::from(envelope.payload.new_path);
5603 worktree
5604 .as_local_mut()
5605 .unwrap()
5606 .rename_entry(entry_id, new_path, cx)
5607 .ok_or_else(|| anyhow!("invalid entry"))
5608 })?
5609 .await?;
5610 Ok(proto::ProjectEntryResponse {
5611 entry: Some((&entry).into()),
5612 worktree_scan_id: worktree_scan_id as u64,
5613 })
5614 }
5615
5616 async fn handle_copy_project_entry(
5617 this: ModelHandle<Self>,
5618 envelope: TypedEnvelope<proto::CopyProjectEntry>,
5619 _: Arc<Client>,
5620 mut cx: AsyncAppContext,
5621 ) -> Result<proto::ProjectEntryResponse> {
5622 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5623 let worktree = this.read_with(&cx, |this, cx| {
5624 this.worktree_for_entry(entry_id, cx)
5625 .ok_or_else(|| anyhow!("worktree not found"))
5626 })?;
5627 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5628 let entry = worktree
5629 .update(&mut cx, |worktree, cx| {
5630 let new_path = PathBuf::from(envelope.payload.new_path);
5631 worktree
5632 .as_local_mut()
5633 .unwrap()
5634 .copy_entry(entry_id, new_path, cx)
5635 .ok_or_else(|| anyhow!("invalid entry"))
5636 })?
5637 .await?;
5638 Ok(proto::ProjectEntryResponse {
5639 entry: Some((&entry).into()),
5640 worktree_scan_id: worktree_scan_id as u64,
5641 })
5642 }
5643
5644 async fn handle_delete_project_entry(
5645 this: ModelHandle<Self>,
5646 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
5647 _: Arc<Client>,
5648 mut cx: AsyncAppContext,
5649 ) -> Result<proto::ProjectEntryResponse> {
5650 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5651
5652 this.update(&mut cx, |_, cx| cx.emit(Event::DeletedEntry(entry_id)));
5653
5654 let worktree = this.read_with(&cx, |this, cx| {
5655 this.worktree_for_entry(entry_id, cx)
5656 .ok_or_else(|| anyhow!("worktree not found"))
5657 })?;
5658 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5659 worktree
5660 .update(&mut cx, |worktree, cx| {
5661 worktree
5662 .as_local_mut()
5663 .unwrap()
5664 .delete_entry(entry_id, cx)
5665 .ok_or_else(|| anyhow!("invalid entry"))
5666 })?
5667 .await?;
5668 Ok(proto::ProjectEntryResponse {
5669 entry: None,
5670 worktree_scan_id: worktree_scan_id as u64,
5671 })
5672 }
5673
5674 async fn handle_update_diagnostic_summary(
5675 this: ModelHandle<Self>,
5676 envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
5677 _: Arc<Client>,
5678 mut cx: AsyncAppContext,
5679 ) -> Result<()> {
5680 this.update(&mut cx, |this, cx| {
5681 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5682 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
5683 if let Some(summary) = envelope.payload.summary {
5684 let project_path = ProjectPath {
5685 worktree_id,
5686 path: Path::new(&summary.path).into(),
5687 };
5688 worktree.update(cx, |worktree, _| {
5689 worktree
5690 .as_remote_mut()
5691 .unwrap()
5692 .update_diagnostic_summary(project_path.path.clone(), &summary);
5693 });
5694 cx.emit(Event::DiagnosticsUpdated {
5695 language_server_id: LanguageServerId(summary.language_server_id as usize),
5696 path: project_path,
5697 });
5698 }
5699 }
5700 Ok(())
5701 })
5702 }
5703
5704 async fn handle_start_language_server(
5705 this: ModelHandle<Self>,
5706 envelope: TypedEnvelope<proto::StartLanguageServer>,
5707 _: Arc<Client>,
5708 mut cx: AsyncAppContext,
5709 ) -> Result<()> {
5710 let server = envelope
5711 .payload
5712 .server
5713 .ok_or_else(|| anyhow!("invalid server"))?;
5714 this.update(&mut cx, |this, cx| {
5715 this.language_server_statuses.insert(
5716 LanguageServerId(server.id as usize),
5717 LanguageServerStatus {
5718 name: server.name,
5719 pending_work: Default::default(),
5720 has_pending_diagnostic_updates: false,
5721 progress_tokens: Default::default(),
5722 },
5723 );
5724 cx.notify();
5725 });
5726 Ok(())
5727 }
5728
5729 async fn handle_update_language_server(
5730 this: ModelHandle<Self>,
5731 envelope: TypedEnvelope<proto::UpdateLanguageServer>,
5732 _: Arc<Client>,
5733 mut cx: AsyncAppContext,
5734 ) -> Result<()> {
5735 this.update(&mut cx, |this, cx| {
5736 let language_server_id = LanguageServerId(envelope.payload.language_server_id as usize);
5737
5738 match envelope
5739 .payload
5740 .variant
5741 .ok_or_else(|| anyhow!("invalid variant"))?
5742 {
5743 proto::update_language_server::Variant::WorkStart(payload) => {
5744 this.on_lsp_work_start(
5745 language_server_id,
5746 payload.token,
5747 LanguageServerProgress {
5748 message: payload.message,
5749 percentage: payload.percentage.map(|p| p as usize),
5750 last_update_at: Instant::now(),
5751 },
5752 cx,
5753 );
5754 }
5755
5756 proto::update_language_server::Variant::WorkProgress(payload) => {
5757 this.on_lsp_work_progress(
5758 language_server_id,
5759 payload.token,
5760 LanguageServerProgress {
5761 message: payload.message,
5762 percentage: payload.percentage.map(|p| p as usize),
5763 last_update_at: Instant::now(),
5764 },
5765 cx,
5766 );
5767 }
5768
5769 proto::update_language_server::Variant::WorkEnd(payload) => {
5770 this.on_lsp_work_end(language_server_id, payload.token, cx);
5771 }
5772
5773 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
5774 this.disk_based_diagnostics_started(language_server_id, cx);
5775 }
5776
5777 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
5778 this.disk_based_diagnostics_finished(language_server_id, cx)
5779 }
5780 }
5781
5782 Ok(())
5783 })
5784 }
5785
5786 async fn handle_update_buffer(
5787 this: ModelHandle<Self>,
5788 envelope: TypedEnvelope<proto::UpdateBuffer>,
5789 _: Arc<Client>,
5790 mut cx: AsyncAppContext,
5791 ) -> Result<proto::Ack> {
5792 this.update(&mut cx, |this, cx| {
5793 let payload = envelope.payload.clone();
5794 let buffer_id = payload.buffer_id;
5795 let ops = payload
5796 .operations
5797 .into_iter()
5798 .map(language::proto::deserialize_operation)
5799 .collect::<Result<Vec<_>, _>>()?;
5800 let is_remote = this.is_remote();
5801 match this.opened_buffers.entry(buffer_id) {
5802 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
5803 OpenBuffer::Strong(buffer) => {
5804 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
5805 }
5806 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
5807 OpenBuffer::Weak(_) => {}
5808 },
5809 hash_map::Entry::Vacant(e) => {
5810 assert!(
5811 is_remote,
5812 "received buffer update from {:?}",
5813 envelope.original_sender_id
5814 );
5815 e.insert(OpenBuffer::Operations(ops));
5816 }
5817 }
5818 Ok(proto::Ack {})
5819 })
5820 }
5821
5822 async fn handle_create_buffer_for_peer(
5823 this: ModelHandle<Self>,
5824 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
5825 _: Arc<Client>,
5826 mut cx: AsyncAppContext,
5827 ) -> Result<()> {
5828 this.update(&mut cx, |this, cx| {
5829 match envelope
5830 .payload
5831 .variant
5832 .ok_or_else(|| anyhow!("missing variant"))?
5833 {
5834 proto::create_buffer_for_peer::Variant::State(mut state) => {
5835 let mut buffer_file = None;
5836 if let Some(file) = state.file.take() {
5837 let worktree_id = WorktreeId::from_proto(file.worktree_id);
5838 let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
5839 anyhow!("no worktree found for id {}", file.worktree_id)
5840 })?;
5841 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
5842 as Arc<dyn language::File>);
5843 }
5844
5845 let buffer_id = state.id;
5846 let buffer = cx.add_model(|_| {
5847 Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
5848 });
5849 this.incomplete_remote_buffers
5850 .insert(buffer_id, Some(buffer));
5851 }
5852 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
5853 let buffer = this
5854 .incomplete_remote_buffers
5855 .get(&chunk.buffer_id)
5856 .cloned()
5857 .flatten()
5858 .ok_or_else(|| {
5859 anyhow!(
5860 "received chunk for buffer {} without initial state",
5861 chunk.buffer_id
5862 )
5863 })?;
5864 let operations = chunk
5865 .operations
5866 .into_iter()
5867 .map(language::proto::deserialize_operation)
5868 .collect::<Result<Vec<_>>>()?;
5869 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
5870
5871 if chunk.is_last {
5872 this.incomplete_remote_buffers.remove(&chunk.buffer_id);
5873 this.register_buffer(&buffer, cx)?;
5874 }
5875 }
5876 }
5877
5878 Ok(())
5879 })
5880 }
5881
5882 async fn handle_update_diff_base(
5883 this: ModelHandle<Self>,
5884 envelope: TypedEnvelope<proto::UpdateDiffBase>,
5885 _: Arc<Client>,
5886 mut cx: AsyncAppContext,
5887 ) -> Result<()> {
5888 this.update(&mut cx, |this, cx| {
5889 let buffer_id = envelope.payload.buffer_id;
5890 let diff_base = envelope.payload.diff_base;
5891 if let Some(buffer) = this
5892 .opened_buffers
5893 .get_mut(&buffer_id)
5894 .and_then(|b| b.upgrade(cx))
5895 .or_else(|| {
5896 this.incomplete_remote_buffers
5897 .get(&buffer_id)
5898 .cloned()
5899 .flatten()
5900 })
5901 {
5902 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
5903 }
5904 Ok(())
5905 })
5906 }
5907
5908 async fn handle_update_buffer_file(
5909 this: ModelHandle<Self>,
5910 envelope: TypedEnvelope<proto::UpdateBufferFile>,
5911 _: Arc<Client>,
5912 mut cx: AsyncAppContext,
5913 ) -> Result<()> {
5914 let buffer_id = envelope.payload.buffer_id;
5915
5916 this.update(&mut cx, |this, cx| {
5917 let payload = envelope.payload.clone();
5918 if let Some(buffer) = this
5919 .opened_buffers
5920 .get(&buffer_id)
5921 .and_then(|b| b.upgrade(cx))
5922 .or_else(|| {
5923 this.incomplete_remote_buffers
5924 .get(&buffer_id)
5925 .cloned()
5926 .flatten()
5927 })
5928 {
5929 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
5930 let worktree = this
5931 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
5932 .ok_or_else(|| anyhow!("no such worktree"))?;
5933 let file = File::from_proto(file, worktree, cx)?;
5934 buffer.update(cx, |buffer, cx| {
5935 buffer.file_updated(Arc::new(file), cx).detach();
5936 });
5937 this.detect_language_for_buffer(&buffer, cx);
5938 }
5939 Ok(())
5940 })
5941 }
5942
5943 async fn handle_save_buffer(
5944 this: ModelHandle<Self>,
5945 envelope: TypedEnvelope<proto::SaveBuffer>,
5946 _: Arc<Client>,
5947 mut cx: AsyncAppContext,
5948 ) -> Result<proto::BufferSaved> {
5949 let buffer_id = envelope.payload.buffer_id;
5950 let (project_id, buffer) = this.update(&mut cx, |this, cx| {
5951 let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
5952 let buffer = this
5953 .opened_buffers
5954 .get(&buffer_id)
5955 .and_then(|buffer| buffer.upgrade(cx))
5956 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
5957 anyhow::Ok((project_id, buffer))
5958 })?;
5959 buffer
5960 .update(&mut cx, |buffer, _| {
5961 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
5962 })
5963 .await?;
5964 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
5965
5966 let (saved_version, fingerprint, mtime) = this
5967 .update(&mut cx, |this, cx| this.save_buffer(buffer, cx))
5968 .await?;
5969 Ok(proto::BufferSaved {
5970 project_id,
5971 buffer_id,
5972 version: serialize_version(&saved_version),
5973 mtime: Some(mtime.into()),
5974 fingerprint: language::proto::serialize_fingerprint(fingerprint),
5975 })
5976 }
5977
5978 async fn handle_reload_buffers(
5979 this: ModelHandle<Self>,
5980 envelope: TypedEnvelope<proto::ReloadBuffers>,
5981 _: Arc<Client>,
5982 mut cx: AsyncAppContext,
5983 ) -> Result<proto::ReloadBuffersResponse> {
5984 let sender_id = envelope.original_sender_id()?;
5985 let reload = this.update(&mut cx, |this, cx| {
5986 let mut buffers = HashSet::default();
5987 for buffer_id in &envelope.payload.buffer_ids {
5988 buffers.insert(
5989 this.opened_buffers
5990 .get(buffer_id)
5991 .and_then(|buffer| buffer.upgrade(cx))
5992 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5993 );
5994 }
5995 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
5996 })?;
5997
5998 let project_transaction = reload.await?;
5999 let project_transaction = this.update(&mut cx, |this, cx| {
6000 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6001 });
6002 Ok(proto::ReloadBuffersResponse {
6003 transaction: Some(project_transaction),
6004 })
6005 }
6006
6007 async fn handle_synchronize_buffers(
6008 this: ModelHandle<Self>,
6009 envelope: TypedEnvelope<proto::SynchronizeBuffers>,
6010 _: Arc<Client>,
6011 mut cx: AsyncAppContext,
6012 ) -> Result<proto::SynchronizeBuffersResponse> {
6013 let project_id = envelope.payload.project_id;
6014 let mut response = proto::SynchronizeBuffersResponse {
6015 buffers: Default::default(),
6016 };
6017
6018 this.update(&mut cx, |this, cx| {
6019 let Some(guest_id) = envelope.original_sender_id else {
6020 error!("missing original_sender_id on SynchronizeBuffers request");
6021 return;
6022 };
6023
6024 this.shared_buffers.entry(guest_id).or_default().clear();
6025 for buffer in envelope.payload.buffers {
6026 let buffer_id = buffer.id;
6027 let remote_version = language::proto::deserialize_version(&buffer.version);
6028 if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
6029 this.shared_buffers
6030 .entry(guest_id)
6031 .or_default()
6032 .insert(buffer_id);
6033
6034 let buffer = buffer.read(cx);
6035 response.buffers.push(proto::BufferVersion {
6036 id: buffer_id,
6037 version: language::proto::serialize_version(&buffer.version),
6038 });
6039
6040 let operations = buffer.serialize_ops(Some(remote_version), cx);
6041 let client = this.client.clone();
6042 if let Some(file) = buffer.file() {
6043 client
6044 .send(proto::UpdateBufferFile {
6045 project_id,
6046 buffer_id: buffer_id as u64,
6047 file: Some(file.to_proto()),
6048 })
6049 .log_err();
6050 }
6051
6052 client
6053 .send(proto::UpdateDiffBase {
6054 project_id,
6055 buffer_id: buffer_id as u64,
6056 diff_base: buffer.diff_base().map(Into::into),
6057 })
6058 .log_err();
6059
6060 client
6061 .send(proto::BufferReloaded {
6062 project_id,
6063 buffer_id,
6064 version: language::proto::serialize_version(buffer.saved_version()),
6065 mtime: Some(buffer.saved_mtime().into()),
6066 fingerprint: language::proto::serialize_fingerprint(
6067 buffer.saved_version_fingerprint(),
6068 ),
6069 line_ending: language::proto::serialize_line_ending(
6070 buffer.line_ending(),
6071 ) as i32,
6072 })
6073 .log_err();
6074
6075 cx.background()
6076 .spawn(
6077 async move {
6078 let operations = operations.await;
6079 for chunk in split_operations(operations) {
6080 client
6081 .request(proto::UpdateBuffer {
6082 project_id,
6083 buffer_id,
6084 operations: chunk,
6085 })
6086 .await?;
6087 }
6088 anyhow::Ok(())
6089 }
6090 .log_err(),
6091 )
6092 .detach();
6093 }
6094 }
6095 });
6096
6097 Ok(response)
6098 }
6099
6100 async fn handle_format_buffers(
6101 this: ModelHandle<Self>,
6102 envelope: TypedEnvelope<proto::FormatBuffers>,
6103 _: Arc<Client>,
6104 mut cx: AsyncAppContext,
6105 ) -> Result<proto::FormatBuffersResponse> {
6106 let sender_id = envelope.original_sender_id()?;
6107 let format = this.update(&mut cx, |this, cx| {
6108 let mut buffers = HashSet::default();
6109 for buffer_id in &envelope.payload.buffer_ids {
6110 buffers.insert(
6111 this.opened_buffers
6112 .get(buffer_id)
6113 .and_then(|buffer| buffer.upgrade(cx))
6114 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
6115 );
6116 }
6117 let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
6118 Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
6119 })?;
6120
6121 let project_transaction = format.await?;
6122 let project_transaction = this.update(&mut cx, |this, cx| {
6123 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6124 });
6125 Ok(proto::FormatBuffersResponse {
6126 transaction: Some(project_transaction),
6127 })
6128 }
6129
6130 async fn handle_apply_additional_edits_for_completion(
6131 this: ModelHandle<Self>,
6132 envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
6133 _: Arc<Client>,
6134 mut cx: AsyncAppContext,
6135 ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
6136 let (buffer, completion) = this.update(&mut cx, |this, cx| {
6137 let buffer = this
6138 .opened_buffers
6139 .get(&envelope.payload.buffer_id)
6140 .and_then(|buffer| buffer.upgrade(cx))
6141 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6142 let language = buffer.read(cx).language();
6143 let completion = language::proto::deserialize_completion(
6144 envelope
6145 .payload
6146 .completion
6147 .ok_or_else(|| anyhow!("invalid completion"))?,
6148 language.cloned(),
6149 );
6150 Ok::<_, anyhow::Error>((buffer, completion))
6151 })?;
6152
6153 let completion = completion.await?;
6154
6155 let apply_additional_edits = this.update(&mut cx, |this, cx| {
6156 this.apply_additional_edits_for_completion(buffer, completion, false, cx)
6157 });
6158
6159 Ok(proto::ApplyCompletionAdditionalEditsResponse {
6160 transaction: apply_additional_edits
6161 .await?
6162 .as_ref()
6163 .map(language::proto::serialize_transaction),
6164 })
6165 }
6166
6167 async fn handle_apply_code_action(
6168 this: ModelHandle<Self>,
6169 envelope: TypedEnvelope<proto::ApplyCodeAction>,
6170 _: Arc<Client>,
6171 mut cx: AsyncAppContext,
6172 ) -> Result<proto::ApplyCodeActionResponse> {
6173 let sender_id = envelope.original_sender_id()?;
6174 let action = language::proto::deserialize_code_action(
6175 envelope
6176 .payload
6177 .action
6178 .ok_or_else(|| anyhow!("invalid action"))?,
6179 )?;
6180 let apply_code_action = this.update(&mut cx, |this, cx| {
6181 let buffer = this
6182 .opened_buffers
6183 .get(&envelope.payload.buffer_id)
6184 .and_then(|buffer| buffer.upgrade(cx))
6185 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6186 Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
6187 })?;
6188
6189 let project_transaction = apply_code_action.await?;
6190 let project_transaction = this.update(&mut cx, |this, cx| {
6191 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6192 });
6193 Ok(proto::ApplyCodeActionResponse {
6194 transaction: Some(project_transaction),
6195 })
6196 }
6197
6198 async fn handle_on_type_formatting(
6199 this: ModelHandle<Self>,
6200 envelope: TypedEnvelope<proto::OnTypeFormatting>,
6201 _: Arc<Client>,
6202 mut cx: AsyncAppContext,
6203 ) -> Result<proto::OnTypeFormattingResponse> {
6204 let on_type_formatting = this.update(&mut cx, |this, cx| {
6205 let buffer = this
6206 .opened_buffers
6207 .get(&envelope.payload.buffer_id)
6208 .and_then(|buffer| buffer.upgrade(cx))
6209 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6210 let position = envelope
6211 .payload
6212 .position
6213 .and_then(deserialize_anchor)
6214 .ok_or_else(|| anyhow!("invalid position"))?;
6215 Ok::<_, anyhow::Error>(this.apply_on_type_formatting(
6216 buffer,
6217 position,
6218 envelope.payload.trigger.clone(),
6219 cx,
6220 ))
6221 })?;
6222
6223 let transaction = on_type_formatting
6224 .await?
6225 .as_ref()
6226 .map(language::proto::serialize_transaction);
6227 Ok(proto::OnTypeFormattingResponse { transaction })
6228 }
6229
6230 async fn handle_lsp_command<T: LspCommand>(
6231 this: ModelHandle<Self>,
6232 envelope: TypedEnvelope<T::ProtoRequest>,
6233 _: Arc<Client>,
6234 mut cx: AsyncAppContext,
6235 ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
6236 where
6237 <T::LspRequest as lsp::request::Request>::Result: Send,
6238 {
6239 let sender_id = envelope.original_sender_id()?;
6240 let buffer_id = T::buffer_id_from_proto(&envelope.payload);
6241 let buffer_handle = this.read_with(&cx, |this, _| {
6242 this.opened_buffers
6243 .get(&buffer_id)
6244 .and_then(|buffer| buffer.upgrade(&cx))
6245 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
6246 })?;
6247 let request = T::from_proto(
6248 envelope.payload,
6249 this.clone(),
6250 buffer_handle.clone(),
6251 cx.clone(),
6252 )
6253 .await?;
6254 let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
6255 let response = this
6256 .update(&mut cx, |this, cx| {
6257 this.request_lsp(buffer_handle, request, cx)
6258 })
6259 .await?;
6260 this.update(&mut cx, |this, cx| {
6261 Ok(T::response_to_proto(
6262 response,
6263 this,
6264 sender_id,
6265 &buffer_version,
6266 cx,
6267 ))
6268 })
6269 }
6270
6271 async fn handle_get_project_symbols(
6272 this: ModelHandle<Self>,
6273 envelope: TypedEnvelope<proto::GetProjectSymbols>,
6274 _: Arc<Client>,
6275 mut cx: AsyncAppContext,
6276 ) -> Result<proto::GetProjectSymbolsResponse> {
6277 let symbols = this
6278 .update(&mut cx, |this, cx| {
6279 this.symbols(&envelope.payload.query, cx)
6280 })
6281 .await?;
6282
6283 Ok(proto::GetProjectSymbolsResponse {
6284 symbols: symbols.iter().map(serialize_symbol).collect(),
6285 })
6286 }
6287
6288 async fn handle_search_project(
6289 this: ModelHandle<Self>,
6290 envelope: TypedEnvelope<proto::SearchProject>,
6291 _: Arc<Client>,
6292 mut cx: AsyncAppContext,
6293 ) -> Result<proto::SearchProjectResponse> {
6294 let peer_id = envelope.original_sender_id()?;
6295 let query = SearchQuery::from_proto(envelope.payload)?;
6296 let result = this
6297 .update(&mut cx, |this, cx| this.search(query, cx))
6298 .await?;
6299
6300 this.update(&mut cx, |this, cx| {
6301 let mut locations = Vec::new();
6302 for (buffer, ranges) in result {
6303 for range in ranges {
6304 let start = serialize_anchor(&range.start);
6305 let end = serialize_anchor(&range.end);
6306 let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
6307 locations.push(proto::Location {
6308 buffer_id,
6309 start: Some(start),
6310 end: Some(end),
6311 });
6312 }
6313 }
6314 Ok(proto::SearchProjectResponse { locations })
6315 })
6316 }
6317
6318 async fn handle_open_buffer_for_symbol(
6319 this: ModelHandle<Self>,
6320 envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
6321 _: Arc<Client>,
6322 mut cx: AsyncAppContext,
6323 ) -> Result<proto::OpenBufferForSymbolResponse> {
6324 let peer_id = envelope.original_sender_id()?;
6325 let symbol = envelope
6326 .payload
6327 .symbol
6328 .ok_or_else(|| anyhow!("invalid symbol"))?;
6329 let symbol = this
6330 .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
6331 .await?;
6332 let symbol = this.read_with(&cx, |this, _| {
6333 let signature = this.symbol_signature(&symbol.path);
6334 if signature == symbol.signature {
6335 Ok(symbol)
6336 } else {
6337 Err(anyhow!("invalid symbol signature"))
6338 }
6339 })?;
6340 let buffer = this
6341 .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
6342 .await?;
6343
6344 Ok(proto::OpenBufferForSymbolResponse {
6345 buffer_id: this.update(&mut cx, |this, cx| {
6346 this.create_buffer_for_peer(&buffer, peer_id, cx)
6347 }),
6348 })
6349 }
6350
6351 fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
6352 let mut hasher = Sha256::new();
6353 hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
6354 hasher.update(project_path.path.to_string_lossy().as_bytes());
6355 hasher.update(self.nonce.to_be_bytes());
6356 hasher.finalize().as_slice().try_into().unwrap()
6357 }
6358
6359 async fn handle_open_buffer_by_id(
6360 this: ModelHandle<Self>,
6361 envelope: TypedEnvelope<proto::OpenBufferById>,
6362 _: Arc<Client>,
6363 mut cx: AsyncAppContext,
6364 ) -> Result<proto::OpenBufferResponse> {
6365 let peer_id = envelope.original_sender_id()?;
6366 let buffer = this
6367 .update(&mut cx, |this, cx| {
6368 this.open_buffer_by_id(envelope.payload.id, cx)
6369 })
6370 .await?;
6371 this.update(&mut cx, |this, cx| {
6372 Ok(proto::OpenBufferResponse {
6373 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6374 })
6375 })
6376 }
6377
6378 async fn handle_open_buffer_by_path(
6379 this: ModelHandle<Self>,
6380 envelope: TypedEnvelope<proto::OpenBufferByPath>,
6381 _: Arc<Client>,
6382 mut cx: AsyncAppContext,
6383 ) -> Result<proto::OpenBufferResponse> {
6384 let peer_id = envelope.original_sender_id()?;
6385 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6386 let open_buffer = this.update(&mut cx, |this, cx| {
6387 this.open_buffer(
6388 ProjectPath {
6389 worktree_id,
6390 path: PathBuf::from(envelope.payload.path).into(),
6391 },
6392 cx,
6393 )
6394 });
6395
6396 let buffer = open_buffer.await?;
6397 this.update(&mut cx, |this, cx| {
6398 Ok(proto::OpenBufferResponse {
6399 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6400 })
6401 })
6402 }
6403
6404 fn serialize_project_transaction_for_peer(
6405 &mut self,
6406 project_transaction: ProjectTransaction,
6407 peer_id: proto::PeerId,
6408 cx: &mut AppContext,
6409 ) -> proto::ProjectTransaction {
6410 let mut serialized_transaction = proto::ProjectTransaction {
6411 buffer_ids: Default::default(),
6412 transactions: Default::default(),
6413 };
6414 for (buffer, transaction) in project_transaction.0 {
6415 serialized_transaction
6416 .buffer_ids
6417 .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
6418 serialized_transaction
6419 .transactions
6420 .push(language::proto::serialize_transaction(&transaction));
6421 }
6422 serialized_transaction
6423 }
6424
6425 fn deserialize_project_transaction(
6426 &mut self,
6427 message: proto::ProjectTransaction,
6428 push_to_history: bool,
6429 cx: &mut ModelContext<Self>,
6430 ) -> Task<Result<ProjectTransaction>> {
6431 cx.spawn(|this, mut cx| async move {
6432 let mut project_transaction = ProjectTransaction::default();
6433 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
6434 {
6435 let buffer = this
6436 .update(&mut cx, |this, cx| {
6437 this.wait_for_remote_buffer(buffer_id, cx)
6438 })
6439 .await?;
6440 let transaction = language::proto::deserialize_transaction(transaction)?;
6441 project_transaction.0.insert(buffer, transaction);
6442 }
6443
6444 for (buffer, transaction) in &project_transaction.0 {
6445 buffer
6446 .update(&mut cx, |buffer, _| {
6447 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
6448 })
6449 .await?;
6450
6451 if push_to_history {
6452 buffer.update(&mut cx, |buffer, _| {
6453 buffer.push_transaction(transaction.clone(), Instant::now());
6454 });
6455 }
6456 }
6457
6458 Ok(project_transaction)
6459 })
6460 }
6461
6462 fn create_buffer_for_peer(
6463 &mut self,
6464 buffer: &ModelHandle<Buffer>,
6465 peer_id: proto::PeerId,
6466 cx: &mut AppContext,
6467 ) -> u64 {
6468 let buffer_id = buffer.read(cx).remote_id();
6469 if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
6470 updates_tx
6471 .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
6472 .ok();
6473 }
6474 buffer_id
6475 }
6476
6477 fn wait_for_remote_buffer(
6478 &mut self,
6479 id: u64,
6480 cx: &mut ModelContext<Self>,
6481 ) -> Task<Result<ModelHandle<Buffer>>> {
6482 let mut opened_buffer_rx = self.opened_buffer.1.clone();
6483
6484 cx.spawn_weak(|this, mut cx| async move {
6485 let buffer = loop {
6486 let Some(this) = this.upgrade(&cx) else {
6487 return Err(anyhow!("project dropped"));
6488 };
6489
6490 let buffer = this.read_with(&cx, |this, cx| {
6491 this.opened_buffers
6492 .get(&id)
6493 .and_then(|buffer| buffer.upgrade(cx))
6494 });
6495
6496 if let Some(buffer) = buffer {
6497 break buffer;
6498 } else if this.read_with(&cx, |this, _| this.is_read_only()) {
6499 return Err(anyhow!("disconnected before buffer {} could be opened", id));
6500 }
6501
6502 this.update(&mut cx, |this, _| {
6503 this.incomplete_remote_buffers.entry(id).or_default();
6504 });
6505 drop(this);
6506
6507 opened_buffer_rx
6508 .next()
6509 .await
6510 .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
6511 };
6512
6513 Ok(buffer)
6514 })
6515 }
6516
    /// Exchanges buffer versions with the host of a remote project and sends back any
    /// buffer operations requested by the response.
    ///
    /// Returns an already-failed task when the project is local (or has no client state)
    /// or when sharing has stopped. Otherwise the spawned task:
    /// 1. sends a `SynchronizeBuffers` request listing every upgradable open buffer and
    ///    its version;
    /// 2. for each buffer in the response, serializes this replica's operations relative
    ///    to the version reported there and sends them in chunks via `UpdateBuffer`;
    /// 3. fires a detached `OpenBufferById` request for every id in
    ///    `incomplete_remote_buffers`.
    ///
    /// The task's result is the collected outcome of all the update-sending tasks.
    fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Only a remote project that is still being shared can be synchronized.
        let project_id = match self.client_state.as_ref() {
            Some(ProjectClientState::Remote {
                sharing_has_stopped,
                remote_id,
                ..
            }) => {
                if *sharing_has_stopped {
                    return Task::ready(Err(anyhow!(
                        "can't synchronize remote buffers on a readonly project"
                    )));
                } else {
                    *remote_id
                }
            }
            Some(ProjectClientState::Local { .. }) | None => {
                return Task::ready(Err(anyhow!(
                    "can't synchronize remote buffers on a local project"
                )))
            }
        };

        let client = self.client.clone();
        cx.spawn(|this, cx| async move {
            // Snapshot the versions of all live buffers plus the ids of buffers that are
            // still incomplete.
            let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| {
                let buffers = this
                    .opened_buffers
                    .iter()
                    .filter_map(|(id, buffer)| {
                        let buffer = buffer.upgrade(cx)?;
                        Some(proto::BufferVersion {
                            id: *id,
                            version: language::proto::serialize_version(&buffer.read(cx).version),
                        })
                    })
                    .collect();
                let incomplete_buffer_ids = this
                    .incomplete_remote_buffers
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();

                (buffers, incomplete_buffer_ids)
            });
            let response = client
                .request(proto::SynchronizeBuffers {
                    project_id,
                    buffers,
                })
                .await?;

            // One task per buffer in the response; buffers that are no longer present
            // locally yield an immediately-successful task.
            let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| {
                let client = client.clone();
                let buffer_id = buffer.id;
                let remote_version = language::proto::deserialize_version(&buffer.version);
                this.read_with(&cx, |this, cx| {
                    if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
                        let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx);
                        cx.background().spawn(async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id,
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        })
                    } else {
                        Task::ready(Ok(()))
                    }
                })
            });

            // Any incomplete buffers have open requests waiting. Request that the host
            // create these buffers for us again to unblock any waiting futures.
            for id in incomplete_buffer_ids {
                cx.background()
                    .spawn(client.request(proto::OpenBufferById { project_id, id }))
                    .detach();
            }

            futures::future::join_all(send_updates_for_buffers)
                .await
                .into_iter()
                .collect()
        })
    }
6608
6609 pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
6610 self.worktrees(cx)
6611 .map(|worktree| {
6612 let worktree = worktree.read(cx);
6613 proto::WorktreeMetadata {
6614 id: worktree.id().to_proto(),
6615 root_name: worktree.root_name().into(),
6616 visible: worktree.is_visible(),
6617 abs_path: worktree.abs_path().to_string_lossy().into(),
6618 }
6619 })
6620 .collect()
6621 }
6622
6623 fn set_worktrees_from_proto(
6624 &mut self,
6625 worktrees: Vec<proto::WorktreeMetadata>,
6626 cx: &mut ModelContext<Project>,
6627 ) -> Result<()> {
6628 let replica_id = self.replica_id();
6629 let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
6630
6631 let mut old_worktrees_by_id = self
6632 .worktrees
6633 .drain(..)
6634 .filter_map(|worktree| {
6635 let worktree = worktree.upgrade(cx)?;
6636 Some((worktree.read(cx).id(), worktree))
6637 })
6638 .collect::<HashMap<_, _>>();
6639
6640 for worktree in worktrees {
6641 if let Some(old_worktree) =
6642 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
6643 {
6644 self.worktrees.push(WorktreeHandle::Strong(old_worktree));
6645 } else {
6646 let worktree =
6647 Worktree::remote(remote_id, replica_id, worktree, self.client.clone(), cx);
6648 let _ = self.add_worktree(&worktree, cx);
6649 }
6650 }
6651
6652 self.metadata_changed(cx);
6653 for id in old_worktrees_by_id.keys() {
6654 cx.emit(Event::WorktreeRemoved(*id));
6655 }
6656
6657 Ok(())
6658 }
6659
6660 fn set_collaborators_from_proto(
6661 &mut self,
6662 messages: Vec<proto::Collaborator>,
6663 cx: &mut ModelContext<Self>,
6664 ) -> Result<()> {
6665 let mut collaborators = HashMap::default();
6666 for message in messages {
6667 let collaborator = Collaborator::from_proto(message)?;
6668 collaborators.insert(collaborator.peer_id, collaborator);
6669 }
6670 for old_peer_id in self.collaborators.keys() {
6671 if !collaborators.contains_key(old_peer_id) {
6672 cx.emit(Event::CollaboratorLeft(*old_peer_id));
6673 }
6674 }
6675 self.collaborators = collaborators;
6676 Ok(())
6677 }
6678
6679 fn deserialize_symbol(
6680 &self,
6681 serialized_symbol: proto::Symbol,
6682 ) -> impl Future<Output = Result<Symbol>> {
6683 let languages = self.languages.clone();
6684 async move {
6685 let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
6686 let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
6687 let start = serialized_symbol
6688 .start
6689 .ok_or_else(|| anyhow!("invalid start"))?;
6690 let end = serialized_symbol
6691 .end
6692 .ok_or_else(|| anyhow!("invalid end"))?;
6693 let kind = unsafe { mem::transmute(serialized_symbol.kind) };
6694 let path = ProjectPath {
6695 worktree_id,
6696 path: PathBuf::from(serialized_symbol.path).into(),
6697 };
6698 let language = languages
6699 .language_for_file(&path.path, None)
6700 .await
6701 .log_err();
6702 Ok(Symbol {
6703 language_server_name: LanguageServerName(
6704 serialized_symbol.language_server_name.into(),
6705 ),
6706 source_worktree_id,
6707 path,
6708 label: {
6709 match language {
6710 Some(language) => {
6711 language
6712 .label_for_symbol(&serialized_symbol.name, kind)
6713 .await
6714 }
6715 None => None,
6716 }
6717 .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
6718 },
6719
6720 name: serialized_symbol.name,
6721 range: Unclipped(PointUtf16::new(start.row, start.column))
6722 ..Unclipped(PointUtf16::new(end.row, end.column)),
6723 kind,
6724 signature: serialized_symbol
6725 .signature
6726 .try_into()
6727 .map_err(|_| anyhow!("invalid signature"))?,
6728 })
6729 }
6730 }
6731
6732 async fn handle_buffer_saved(
6733 this: ModelHandle<Self>,
6734 envelope: TypedEnvelope<proto::BufferSaved>,
6735 _: Arc<Client>,
6736 mut cx: AsyncAppContext,
6737 ) -> Result<()> {
6738 let fingerprint = deserialize_fingerprint(&envelope.payload.fingerprint)?;
6739 let version = deserialize_version(&envelope.payload.version);
6740 let mtime = envelope
6741 .payload
6742 .mtime
6743 .ok_or_else(|| anyhow!("missing mtime"))?
6744 .into();
6745
6746 this.update(&mut cx, |this, cx| {
6747 let buffer = this
6748 .opened_buffers
6749 .get(&envelope.payload.buffer_id)
6750 .and_then(|buffer| buffer.upgrade(cx))
6751 .or_else(|| {
6752 this.incomplete_remote_buffers
6753 .get(&envelope.payload.buffer_id)
6754 .and_then(|b| b.clone())
6755 });
6756 if let Some(buffer) = buffer {
6757 buffer.update(cx, |buffer, cx| {
6758 buffer.did_save(version, fingerprint, mtime, cx);
6759 });
6760 }
6761 Ok(())
6762 })
6763 }
6764
6765 async fn handle_buffer_reloaded(
6766 this: ModelHandle<Self>,
6767 envelope: TypedEnvelope<proto::BufferReloaded>,
6768 _: Arc<Client>,
6769 mut cx: AsyncAppContext,
6770 ) -> Result<()> {
6771 let payload = envelope.payload;
6772 let version = deserialize_version(&payload.version);
6773 let fingerprint = deserialize_fingerprint(&payload.fingerprint)?;
6774 let line_ending = deserialize_line_ending(
6775 proto::LineEnding::from_i32(payload.line_ending)
6776 .ok_or_else(|| anyhow!("missing line ending"))?,
6777 );
6778 let mtime = payload
6779 .mtime
6780 .ok_or_else(|| anyhow!("missing mtime"))?
6781 .into();
6782 this.update(&mut cx, |this, cx| {
6783 let buffer = this
6784 .opened_buffers
6785 .get(&payload.buffer_id)
6786 .and_then(|buffer| buffer.upgrade(cx))
6787 .or_else(|| {
6788 this.incomplete_remote_buffers
6789 .get(&payload.buffer_id)
6790 .cloned()
6791 .flatten()
6792 });
6793 if let Some(buffer) = buffer {
6794 buffer.update(cx, |buffer, cx| {
6795 buffer.did_reload(version, fingerprint, line_ending, mtime, cx);
6796 });
6797 }
6798 Ok(())
6799 })
6800 }
6801
    /// Converts text edits received from a language server into anchor-based edits.
    ///
    /// The edits are interpreted against the snapshot returned by
    /// `buffer_snapshot_for_lsp_version` for `server_id` and `version`; the conversion
    /// itself runs on the background executor. The resulting task fails if that snapshot
    /// cannot be obtained.
    ///
    /// Processing steps, in order:
    /// - sort the edits by start position;
    /// - clip each range to the snapshot;
    /// - merge edits that touch, or that are separated only by a single line break;
    /// - for merged edits spanning several rows, line-diff old against new text and emit
    ///   only the changed hunks.
    #[allow(clippy::type_complexity)]
    fn edits_from_lsp(
        &mut self,
        buffer: &ModelHandle<Buffer>,
        lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
        server_id: LanguageServerId,
        version: Option<i32>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
        let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx);
        cx.background().spawn(async move {
            let snapshot = snapshot?;
            let mut lsp_edits = lsp_edits
                .into_iter()
                .map(|edit| (range_from_lsp(edit.range), edit.new_text))
                .collect::<Vec<_>>();
            lsp_edits.sort_by_key(|(range, _)| range.start);

            let mut lsp_edits = lsp_edits.into_iter().peekable();
            let mut edits = Vec::new();
            while let Some((range, mut new_text)) = lsp_edits.next() {
                // Clip invalid ranges provided by the language server.
                let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
                    ..snapshot.clip_point_utf16(range.end, Bias::Left);

                // Combine any LSP edits that are adjacent.
                //
                // Also, combine LSP edits that are separated from each other by only
                // a newline. This is important because for some code actions,
                // Rust-analyzer rewrites the entire buffer via a series of edits that
                // are separated by unchanged newline characters.
                //
                // In order for the diffing logic below to work properly, any edits that
                // cancel each other out must be combined into one.
                while let Some((next_range, next_text)) = lsp_edits.peek() {
                    if next_range.start.0 > range.end {
                        // Stop merging unless the next edit starts at column 0 of the
                        // very next row and the current edit ends at the end of its row.
                        if next_range.start.0.row > range.end.row + 1
                            || next_range.start.0.column > 0
                            || snapshot.clip_point_utf16(
                                Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
                                Bias::Left,
                            ) > range.end
                        {
                            break;
                        }
                        // Re-insert the unchanged line break that separated the edits.
                        new_text.push('\n');
                    }
                    range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
                    new_text.push_str(next_text);
                    lsp_edits.next();
                }

                // For multiline edits, perform a diff of the old and new text so that
                // we can identify the changes more precisely, preserving the locations
                // of any anchors positioned in the unchanged regions.
                if range.end.row > range.start.row {
                    let mut offset = range.start.to_offset(&snapshot);
                    let old_text = snapshot.text_for_range(range).collect::<String>();

                    let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
                    // True when unchanged text was seen since the last emitted edit;
                    // consecutive delete/insert changes are folded into that last edit.
                    let mut moved_since_edit = true;
                    for change in diff.iter_all_changes() {
                        let tag = change.tag();
                        let value = change.value();
                        match tag {
                            ChangeTag::Equal => {
                                offset += value.len();
                                moved_since_edit = true;
                            }
                            ChangeTag::Delete => {
                                let start = snapshot.anchor_after(offset);
                                let end = snapshot.anchor_before(offset + value.len());
                                if moved_since_edit {
                                    edits.push((start..end, String::new()));
                                } else {
                                    edits.last_mut().unwrap().0.end = end;
                                }
                                offset += value.len();
                                moved_since_edit = false;
                            }
                            ChangeTag::Insert => {
                                if moved_since_edit {
                                    let anchor = snapshot.anchor_after(offset);
                                    edits.push((anchor..anchor, value.to_string()));
                                } else {
                                    edits.last_mut().unwrap().1.push_str(value);
                                }
                                moved_since_edit = false;
                            }
                        }
                    }
                } else if range.end == range.start {
                    // Pure insertion: an empty range anchored at the insertion point.
                    let anchor = snapshot.anchor_after(range.start);
                    edits.push((anchor..anchor, new_text));
                } else {
                    // Single-row replacement.
                    let edit_start = snapshot.anchor_after(range.start);
                    let edit_end = snapshot.anchor_before(range.end);
                    edits.push((edit_start..edit_end, new_text));
                }
            }

            Ok(edits)
        })
    }
6906
6907 fn buffer_snapshot_for_lsp_version(
6908 &mut self,
6909 buffer: &ModelHandle<Buffer>,
6910 server_id: LanguageServerId,
6911 version: Option<i32>,
6912 cx: &AppContext,
6913 ) -> Result<TextBufferSnapshot> {
6914 const OLD_VERSIONS_TO_RETAIN: i32 = 10;
6915
6916 if let Some(version) = version {
6917 let buffer_id = buffer.read(cx).remote_id();
6918 let snapshots = self
6919 .buffer_snapshots
6920 .get_mut(&buffer_id)
6921 .and_then(|m| m.get_mut(&server_id))
6922 .ok_or_else(|| {
6923 anyhow!("no snapshots found for buffer {buffer_id} and server {server_id}")
6924 })?;
6925
6926 let found_snapshot = snapshots
6927 .binary_search_by_key(&version, |e| e.version)
6928 .map(|ix| snapshots[ix].snapshot.clone())
6929 .map_err(|_| {
6930 anyhow!("snapshot not found for buffer {buffer_id} server {server_id} at version {version}")
6931 })?;
6932
6933 snapshots.retain(|snapshot| snapshot.version + OLD_VERSIONS_TO_RETAIN >= version);
6934 Ok(found_snapshot)
6935 } else {
6936 Ok((buffer.read(cx)).text_snapshot())
6937 }
6938 }
6939
6940 pub fn language_servers(
6941 &self,
6942 ) -> impl '_ + Iterator<Item = (LanguageServerId, LanguageServerName, WorktreeId)> {
6943 self.language_server_ids
6944 .iter()
6945 .map(|((worktree_id, server_name), server_id)| {
6946 (*server_id, server_name.clone(), *worktree_id)
6947 })
6948 }
6949
6950 pub fn language_server_for_id(&self, id: LanguageServerId) -> Option<Arc<LanguageServer>> {
6951 if let LanguageServerState::Running { server, .. } = self.language_servers.get(&id)? {
6952 Some(server.clone())
6953 } else {
6954 None
6955 }
6956 }
6957
6958 pub fn language_servers_for_buffer(
6959 &self,
6960 buffer: &Buffer,
6961 cx: &AppContext,
6962 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6963 self.language_server_ids_for_buffer(buffer, cx)
6964 .into_iter()
6965 .filter_map(|server_id| {
6966 let server = self.language_servers.get(&server_id)?;
6967 if let LanguageServerState::Running {
6968 adapter, server, ..
6969 } = server
6970 {
6971 Some((adapter, server))
6972 } else {
6973 None
6974 }
6975 })
6976 }
6977
6978 fn primary_language_servers_for_buffer(
6979 &self,
6980 buffer: &Buffer,
6981 cx: &AppContext,
6982 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6983 self.language_servers_for_buffer(buffer, cx).next()
6984 }
6985
6986 fn language_server_for_buffer(
6987 &self,
6988 buffer: &Buffer,
6989 server_id: LanguageServerId,
6990 cx: &AppContext,
6991 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6992 self.language_servers_for_buffer(buffer, cx)
6993 .find(|(_, s)| s.server_id() == server_id)
6994 }
6995
6996 fn language_server_ids_for_buffer(
6997 &self,
6998 buffer: &Buffer,
6999 cx: &AppContext,
7000 ) -> Vec<LanguageServerId> {
7001 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
7002 let worktree_id = file.worktree_id(cx);
7003 language
7004 .lsp_adapters()
7005 .iter()
7006 .flat_map(|adapter| {
7007 let key = (worktree_id, adapter.name.clone());
7008 self.language_server_ids.get(&key).copied()
7009 })
7010 .collect()
7011 } else {
7012 Vec::new()
7013 }
7014 }
7015}
7016
7017impl WorktreeHandle {
7018 pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
7019 match self {
7020 WorktreeHandle::Strong(handle) => Some(handle.clone()),
7021 WorktreeHandle::Weak(handle) => handle.upgrade(cx),
7022 }
7023 }
7024
7025 pub fn handle_id(&self) -> usize {
7026 match self {
7027 WorktreeHandle::Strong(handle) => handle.id(),
7028 WorktreeHandle::Weak(handle) => handle.id(),
7029 }
7030 }
7031}
7032
7033impl OpenBuffer {
7034 pub fn upgrade(&self, cx: &impl BorrowAppContext) -> Option<ModelHandle<Buffer>> {
7035 match self {
7036 OpenBuffer::Strong(handle) => Some(handle.clone()),
7037 OpenBuffer::Weak(handle) => handle.upgrade(cx),
7038 OpenBuffer::Operations(_) => None,
7039 }
7040 }
7041}
7042
/// A worktree snapshot exposed as a candidate set for fuzzy path matching.
pub struct PathMatchCandidateSet {
    /// Snapshot whose file entries are offered as match candidates.
    pub snapshot: Snapshot,
    /// Selects `file_count()` over `visible_file_count()` for the candidate count and is
    /// forwarded to `Snapshot::files` when iterating candidates.
    pub include_ignored: bool,
    /// When set (and the root entry is not a file), the candidate prefix is the root name
    /// followed by `/`.
    pub include_root_name: bool,
}
7048
7049impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
7050 type Candidates = PathMatchCandidateSetIter<'a>;
7051
7052 fn id(&self) -> usize {
7053 self.snapshot.id().to_usize()
7054 }
7055
7056 fn len(&self) -> usize {
7057 if self.include_ignored {
7058 self.snapshot.file_count()
7059 } else {
7060 self.snapshot.visible_file_count()
7061 }
7062 }
7063
7064 fn prefix(&self) -> Arc<str> {
7065 if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
7066 self.snapshot.root_name().into()
7067 } else if self.include_root_name {
7068 format!("{}/", self.snapshot.root_name()).into()
7069 } else {
7070 "".into()
7071 }
7072 }
7073
7074 fn candidates(&'a self, start: usize) -> Self::Candidates {
7075 PathMatchCandidateSetIter {
7076 traversal: self.snapshot.files(self.include_ignored, start),
7077 }
7078 }
7079}
7080
/// Iterator over the fuzzy-match candidates of a `PathMatchCandidateSet`.
pub struct PathMatchCandidateSetIter<'a> {
    /// Underlying traversal of the snapshot's file entries.
    traversal: Traversal<'a>,
}
7084
7085impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
7086 type Item = fuzzy::PathMatchCandidate<'a>;
7087
7088 fn next(&mut self) -> Option<Self::Item> {
7089 self.traversal.next().map(|entry| {
7090 if let EntryKind::File(char_bag) = entry.kind {
7091 fuzzy::PathMatchCandidate {
7092 path: &entry.path,
7093 char_bag,
7094 }
7095 } else {
7096 unreachable!()
7097 }
7098 })
7099 }
7100}
7101
impl Entity for Project {
    type Event = Event;

    /// Called when the project model is released.
    ///
    /// A local project is unshared (the result is ignored). A remote project sends a
    /// `LeaveProject` message (send errors are ignored) and then runs the
    /// disconnected-from-host cleanup. A project without client state does nothing.
    fn release(&mut self, cx: &mut gpui::AppContext) {
        match &self.client_state {
            Some(ProjectClientState::Local { .. }) => {
                let _ = self.unshare_internal(cx);
            }
            Some(ProjectClientState::Remote { remote_id, .. }) => {
                let _ = self.client.send(proto::LeaveProject {
                    project_id: *remote_id,
                });
                self.disconnected_from_host_internal(cx);
            }
            _ => {}
        }
    }

    /// Called before the application quits.
    ///
    /// Drains every language server from the project and returns a future that shuts all
    /// of them down concurrently. Servers that are still starting are awaited first and
    /// then shut down.
    fn app_will_quit(
        &mut self,
        _: &mut AppContext,
    ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
        let shutdown_futures = self
            .language_servers
            .drain()
            .map(|(_, server_state)| async {
                // `?` ends this server's future early when there is nothing to await.
                match server_state {
                    LanguageServerState::Running { server, .. } => server.shutdown()?.await,
                    LanguageServerState::Starting(starting_server) => {
                        starting_server.await?.shutdown()?.await
                    }
                }
            })
            .collect::<Vec<_>>();

        Some(
            async move {
                futures::future::join_all(shutdown_futures).await;
            }
            .boxed(),
        )
    }
}
7145
7146impl Collaborator {
7147 fn from_proto(message: proto::Collaborator) -> Result<Self> {
7148 Ok(Self {
7149 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
7150 replica_id: message.replica_id as ReplicaId,
7151 })
7152 }
7153}
7154
7155impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
7156 fn from((worktree_id, path): (WorktreeId, P)) -> Self {
7157 Self {
7158 worktree_id,
7159 path: path.as_ref().into(),
7160 }
7161 }
7162}
7163
7164fn split_operations(
7165 mut operations: Vec<proto::Operation>,
7166) -> impl Iterator<Item = Vec<proto::Operation>> {
7167 #[cfg(any(test, feature = "test-support"))]
7168 const CHUNK_SIZE: usize = 5;
7169
7170 #[cfg(not(any(test, feature = "test-support")))]
7171 const CHUNK_SIZE: usize = 100;
7172
7173 let mut done = false;
7174 std::iter::from_fn(move || {
7175 if done {
7176 return None;
7177 }
7178
7179 let operations = operations
7180 .drain(..cmp::min(CHUNK_SIZE, operations.len()))
7181 .collect::<Vec<_>>();
7182 if operations.is_empty() {
7183 done = true;
7184 }
7185 Some(operations)
7186 })
7187}
7188
7189fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
7190 proto::Symbol {
7191 language_server_name: symbol.language_server_name.0.to_string(),
7192 source_worktree_id: symbol.source_worktree_id.to_proto(),
7193 worktree_id: symbol.path.worktree_id.to_proto(),
7194 path: symbol.path.path.to_string_lossy().to_string(),
7195 name: symbol.name.clone(),
7196 kind: unsafe { mem::transmute(symbol.kind) },
7197 start: Some(proto::PointUtf16 {
7198 row: symbol.range.start.0.row,
7199 column: symbol.range.start.0.column,
7200 }),
7201 end: Some(proto::PointUtf16 {
7202 row: symbol.range.end.0.row,
7203 column: symbol.range.end.0.column,
7204 }),
7205 signature: symbol.signature.to_vec(),
7206 }
7207}
7208
/// Expresses `path` relative to `base`.
///
/// The shared leading components are dropped; every remaining component of `base` becomes
/// a `..`, followed by the remaining components of `path`. Identical inputs produce an
/// empty path. No filesystem access is performed.
fn relativize_path(base: &Path, path: &Path) -> PathBuf {
    let mut target_iter = path.components();
    let mut base_iter = base.components();
    let mut relative: Vec<Component> = Vec::new();

    loop {
        let step = (target_iter.next(), base_iter.next());
        match step {
            (None, None) => break,
            (Some(segment), None) => {
                // `base` is exhausted: the rest of `path` is the answer's tail.
                relative.push(segment);
                relative.extend(target_iter.by_ref());
                break;
            }
            // `path` is exhausted: climb out of each remaining `base` component.
            (None, Some(_)) => relative.push(Component::ParentDir),
            (Some(segment), Some(origin)) => {
                if relative.is_empty() && segment == origin {
                    // Still inside the common prefix.
                    continue;
                }
                if origin == Component::CurDir {
                    relative.push(segment);
                    continue;
                }
                // First divergence: climb out of the rest of `base`, then descend
                // into the rest of `path`.
                relative.push(Component::ParentDir);
                relative.extend(base_iter.by_ref().map(|_| Component::ParentDir));
                relative.push(segment);
                relative.extend(target_iter.by_ref());
                break;
            }
        }
    }

    relative
        .iter()
        .map(|component| component.as_os_str())
        .collect()
}
7237
7238impl Item for Buffer {
7239 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
7240 File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
7241 }
7242
7243 fn project_path(&self, cx: &AppContext) -> Option<ProjectPath> {
7244 File::from_dyn(self.file()).map(|file| ProjectPath {
7245 worktree_id: file.worktree_id(cx),
7246 path: file.path().clone(),
7247 })
7248 }
7249}
7250
7251async fn wait_for_loading_buffer(
7252 mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
7253) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
7254 loop {
7255 if let Some(result) = receiver.borrow().as_ref() {
7256 match result {
7257 Ok(buffer) => return Ok(buffer.to_owned()),
7258 Err(e) => return Err(e.to_owned()),
7259 }
7260 }
7261 receiver.next().await;
7262 }
7263}