1mod ignore;
2mod lsp_command;
3pub mod project_settings;
4pub mod search;
5pub mod terminals;
6pub mod worktree;
7
8#[cfg(test)]
9mod project_tests;
10#[cfg(test)]
11mod worktree_tests;
12
13use anyhow::{anyhow, Context, Result};
14use client::{proto, Client, TypedEnvelope, UserStore};
15use clock::ReplicaId;
16use collections::{hash_map, BTreeMap, HashMap, HashSet};
17use copilot::Copilot;
18use futures::{
19 channel::{
20 mpsc::{self, UnboundedReceiver},
21 oneshot,
22 },
23 future::{try_join_all, Shared},
24 stream::FuturesUnordered,
25 AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
26};
27use globset::{Glob, GlobSet, GlobSetBuilder};
28use gpui::{
29 AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, ModelContext,
30 ModelHandle, Task, WeakModelHandle,
31};
32use itertools::Itertools;
33use language::{
34 language_settings::{language_settings, FormatOnSave, Formatter, InlayHintKind},
35 point_to_lsp,
36 proto::{
37 deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
38 serialize_anchor, serialize_version,
39 },
40 range_from_lsp, range_to_lsp, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
41 Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
42 Language, LanguageRegistry, LanguageServerName, LocalFile, LspAdapterDelegate, OffsetRangeExt,
43 Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, ToOffset,
44 ToPointUtf16, Transaction, Unclipped,
45};
46use log::error;
47use lsp::{
48 DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions,
49 DocumentHighlightKind, LanguageServer, LanguageServerBinary, LanguageServerId, OneOf,
50};
51use lsp_command::*;
52use postage::watch;
53use project_settings::ProjectSettings;
54use rand::prelude::*;
55use search::SearchQuery;
56use serde::Serialize;
57use settings::SettingsStore;
58use sha2::{Digest, Sha256};
59use similar::{ChangeTag, TextDiff};
60use std::{
61 cell::RefCell,
62 cmp::{self, Ordering},
63 convert::TryInto,
64 hash::Hash,
65 mem,
66 num::NonZeroU32,
67 ops::Range,
68 path::{self, Component, Path, PathBuf},
69 process::Stdio,
70 rc::Rc,
71 str,
72 sync::{
73 atomic::{AtomicUsize, Ordering::SeqCst},
74 Arc,
75 },
76 time::{Duration, Instant},
77};
78use terminals::Terminals;
79use text::Anchor;
80use util::{
81 debug_panic, defer, http::HttpClient, merge_json_value_into,
82 paths::LOCAL_SETTINGS_RELATIVE_PATH, post_inc, ResultExt, TryFutureExt as _,
83};
84
85pub use fs::*;
86pub use worktree::*;
87
88pub trait Item {
89 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
90 fn project_path(&self, cx: &AppContext) -> Option<ProjectPath>;
91}
92
93// Language server state is stored across 3 collections:
94// language_servers =>
95// a mapping from unique server id to LanguageServerState which can either be a task for a
96// server in the process of starting, or a running server with adapter and language server arcs
97// language_server_ids => a mapping from worktreeId and server name to the unique server id
98// language_server_statuses => a mapping from unique server id to the current server status
99//
100// Multiple worktrees can map to the same language server for example when you jump to the definition
101// of a file in the standard library. So language_server_ids is used to look up which server is active
102// for a given worktree and language server name
103//
104// When starting a language server, first the id map is checked to make sure a server isn't already available
105// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and and
106// the Starting variant of LanguageServerState is stored in the language_servers map.
107pub struct Project {
108 worktrees: Vec<WorktreeHandle>,
109 active_entry: Option<ProjectEntryId>,
110 buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
111 languages: Arc<LanguageRegistry>,
112 language_servers: HashMap<LanguageServerId, LanguageServerState>,
113 language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
114 language_server_statuses: BTreeMap<LanguageServerId, LanguageServerStatus>,
115 last_workspace_edits_by_language_server: HashMap<LanguageServerId, ProjectTransaction>,
116 client: Arc<client::Client>,
117 next_entry_id: Arc<AtomicUsize>,
118 join_project_response_message_id: u32,
119 next_diagnostic_group_id: usize,
120 user_store: ModelHandle<UserStore>,
121 fs: Arc<dyn Fs>,
122 client_state: Option<ProjectClientState>,
123 collaborators: HashMap<proto::PeerId, Collaborator>,
124 client_subscriptions: Vec<client::Subscription>,
125 _subscriptions: Vec<gpui::Subscription>,
126 next_buffer_id: u64,
127 opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
128 shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
129 #[allow(clippy::type_complexity)]
130 loading_buffers_by_path: HashMap<
131 ProjectPath,
132 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
133 >,
134 #[allow(clippy::type_complexity)]
135 loading_local_worktrees:
136 HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
137 opened_buffers: HashMap<u64, OpenBuffer>,
138 local_buffer_ids_by_path: HashMap<ProjectPath, u64>,
139 local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, u64>,
140 /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
141 /// Used for re-issuing buffer requests when peers temporarily disconnect
142 incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
143 buffer_snapshots: HashMap<u64, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
144 buffers_being_formatted: HashSet<u64>,
145 buffers_needing_diff: HashSet<WeakModelHandle<Buffer>>,
146 git_diff_debouncer: DelayedDebounced,
147 nonce: u128,
148 _maintain_buffer_languages: Task<()>,
149 _maintain_workspace_config: Task<()>,
150 terminals: Terminals,
151 copilot_enabled: bool,
152}
153
154struct DelayedDebounced {
155 task: Option<Task<()>>,
156 cancel_channel: Option<oneshot::Sender<()>>,
157}
158
159impl DelayedDebounced {
160 fn new() -> DelayedDebounced {
161 DelayedDebounced {
162 task: None,
163 cancel_channel: None,
164 }
165 }
166
167 fn fire_new<F>(&mut self, delay: Duration, cx: &mut ModelContext<Project>, func: F)
168 where
169 F: 'static + FnOnce(&mut Project, &mut ModelContext<Project>) -> Task<()>,
170 {
171 if let Some(channel) = self.cancel_channel.take() {
172 _ = channel.send(());
173 }
174
175 let (sender, mut receiver) = oneshot::channel::<()>();
176 self.cancel_channel = Some(sender);
177
178 let previous_task = self.task.take();
179 self.task = Some(cx.spawn(|workspace, mut cx| async move {
180 let mut timer = cx.background().timer(delay).fuse();
181 if let Some(previous_task) = previous_task {
182 previous_task.await;
183 }
184
185 futures::select_biased! {
186 _ = receiver => return,
187 _ = timer => {}
188 }
189
190 workspace
191 .update(&mut cx, |workspace, cx| (func)(workspace, cx))
192 .await;
193 }));
194 }
195}
196
197struct LspBufferSnapshot {
198 version: i32,
199 snapshot: TextBufferSnapshot,
200}
201
202/// Message ordered with respect to buffer operations
203enum BufferOrderedMessage {
204 Operation {
205 buffer_id: u64,
206 operation: proto::Operation,
207 },
208 LanguageServerUpdate {
209 language_server_id: LanguageServerId,
210 message: proto::update_language_server::Variant,
211 },
212 Resync,
213}
214
215enum LocalProjectUpdate {
216 WorktreesChanged,
217 CreateBufferForPeer {
218 peer_id: proto::PeerId,
219 buffer_id: u64,
220 },
221}
222
223enum OpenBuffer {
224 Strong(ModelHandle<Buffer>),
225 Weak(WeakModelHandle<Buffer>),
226 Operations(Vec<Operation>),
227}
228
229#[derive(Clone)]
230enum WorktreeHandle {
231 Strong(ModelHandle<Worktree>),
232 Weak(WeakModelHandle<Worktree>),
233}
234
235enum ProjectClientState {
236 Local {
237 remote_id: u64,
238 updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
239 _send_updates: Task<()>,
240 },
241 Remote {
242 sharing_has_stopped: bool,
243 remote_id: u64,
244 replica_id: ReplicaId,
245 },
246}
247
248#[derive(Clone, Debug)]
249pub struct Collaborator {
250 pub peer_id: proto::PeerId,
251 pub replica_id: ReplicaId,
252}
253
254#[derive(Clone, Debug, PartialEq)]
255pub enum Event {
256 LanguageServerAdded(LanguageServerId),
257 LanguageServerRemoved(LanguageServerId),
258 LanguageServerLog(LanguageServerId, String),
259 Notification(String),
260 ActiveEntryChanged(Option<ProjectEntryId>),
261 WorktreeAdded,
262 WorktreeRemoved(WorktreeId),
263 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
264 DiskBasedDiagnosticsStarted {
265 language_server_id: LanguageServerId,
266 },
267 DiskBasedDiagnosticsFinished {
268 language_server_id: LanguageServerId,
269 },
270 DiagnosticsUpdated {
271 path: ProjectPath,
272 language_server_id: LanguageServerId,
273 },
274 RemoteIdChanged(Option<u64>),
275 DisconnectedFromHost,
276 Closed,
277 DeletedEntry(ProjectEntryId),
278 CollaboratorUpdated {
279 old_peer_id: proto::PeerId,
280 new_peer_id: proto::PeerId,
281 },
282 CollaboratorLeft(proto::PeerId),
283 RefreshInlays,
284}
285
286pub enum LanguageServerState {
287 Starting(Task<Option<Arc<LanguageServer>>>),
288
289 Running {
290 language: Arc<Language>,
291 adapter: Arc<CachedLspAdapter>,
292 server: Arc<LanguageServer>,
293 watched_paths: HashMap<WorktreeId, GlobSet>,
294 simulate_disk_based_diagnostics_completion: Option<Task<()>>,
295 },
296}
297
298#[derive(Serialize)]
299pub struct LanguageServerStatus {
300 pub name: String,
301 pub pending_work: BTreeMap<String, LanguageServerProgress>,
302 pub has_pending_diagnostic_updates: bool,
303 progress_tokens: HashSet<String>,
304}
305
306#[derive(Clone, Debug, Serialize)]
307pub struct LanguageServerProgress {
308 pub message: Option<String>,
309 pub percentage: Option<usize>,
310 #[serde(skip_serializing)]
311 pub last_update_at: Instant,
312}
313
314#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
315pub struct ProjectPath {
316 pub worktree_id: WorktreeId,
317 pub path: Arc<Path>,
318}
319
320#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
321pub struct DiagnosticSummary {
322 pub error_count: usize,
323 pub warning_count: usize,
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Hash)]
327pub struct Location {
328 pub buffer: ModelHandle<Buffer>,
329 pub range: Range<language::Anchor>,
330}
331
332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
333pub struct InlayHint {
334 pub buffer_id: u64,
335 pub position: language::Anchor,
336 pub label: InlayHintLabel,
337 pub kind: Option<InlayHintKind>,
338 pub padding_left: bool,
339 pub padding_right: bool,
340 pub tooltip: Option<InlayHintTooltip>,
341}
342
343impl InlayHint {
344 pub fn text(&self) -> String {
345 match &self.label {
346 InlayHintLabel::String(s) => s.to_owned(),
347 InlayHintLabel::LabelParts(parts) => parts.iter().map(|part| &part.value).join(""),
348 }
349 }
350}
351
352#[derive(Debug, Clone, PartialEq, Eq, Hash)]
353pub enum InlayHintLabel {
354 String(String),
355 LabelParts(Vec<InlayHintLabelPart>),
356}
357
358#[derive(Debug, Clone, PartialEq, Eq, Hash)]
359pub struct InlayHintLabelPart {
360 pub value: String,
361 pub tooltip: Option<InlayHintLabelPartTooltip>,
362 pub location: Option<Location>,
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Hash)]
366pub enum InlayHintTooltip {
367 String(String),
368 MarkupContent(MarkupContent),
369}
370
371#[derive(Debug, Clone, PartialEq, Eq, Hash)]
372pub enum InlayHintLabelPartTooltip {
373 String(String),
374 MarkupContent(MarkupContent),
375}
376
377#[derive(Debug, Clone, PartialEq, Eq, Hash)]
378pub struct MarkupContent {
379 pub kind: String,
380 pub value: String,
381}
382
383#[derive(Debug, Clone)]
384pub struct LocationLink {
385 pub origin: Option<Location>,
386 pub target: Location,
387}
388
389#[derive(Debug)]
390pub struct DocumentHighlight {
391 pub range: Range<language::Anchor>,
392 pub kind: DocumentHighlightKind,
393}
394
395#[derive(Clone, Debug)]
396pub struct Symbol {
397 pub language_server_name: LanguageServerName,
398 pub source_worktree_id: WorktreeId,
399 pub path: ProjectPath,
400 pub label: CodeLabel,
401 pub name: String,
402 pub kind: lsp::SymbolKind,
403 pub range: Range<Unclipped<PointUtf16>>,
404 pub signature: [u8; 32],
405}
406
407#[derive(Clone, Debug, PartialEq)]
408pub struct HoverBlock {
409 pub text: String,
410 pub kind: HoverBlockKind,
411}
412
413#[derive(Clone, Debug, PartialEq)]
414pub enum HoverBlockKind {
415 PlainText,
416 Markdown,
417 Code { language: String },
418}
419
420#[derive(Debug)]
421pub struct Hover {
422 pub contents: Vec<HoverBlock>,
423 pub range: Option<Range<language::Anchor>>,
424 pub language: Option<Arc<Language>>,
425}
426
427#[derive(Default)]
428pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
429
430impl DiagnosticSummary {
431 fn new<'a, T: 'a>(diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>) -> Self {
432 let mut this = Self {
433 error_count: 0,
434 warning_count: 0,
435 };
436
437 for entry in diagnostics {
438 if entry.diagnostic.is_primary {
439 match entry.diagnostic.severity {
440 DiagnosticSeverity::ERROR => this.error_count += 1,
441 DiagnosticSeverity::WARNING => this.warning_count += 1,
442 _ => {}
443 }
444 }
445 }
446
447 this
448 }
449
450 pub fn is_empty(&self) -> bool {
451 self.error_count == 0 && self.warning_count == 0
452 }
453
454 pub fn to_proto(
455 &self,
456 language_server_id: LanguageServerId,
457 path: &Path,
458 ) -> proto::DiagnosticSummary {
459 proto::DiagnosticSummary {
460 path: path.to_string_lossy().to_string(),
461 language_server_id: language_server_id.0 as u64,
462 error_count: self.error_count as u32,
463 warning_count: self.warning_count as u32,
464 }
465 }
466}
467
468#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
469pub struct ProjectEntryId(usize);
470
471impl ProjectEntryId {
472 pub const MAX: Self = Self(usize::MAX);
473
474 pub fn new(counter: &AtomicUsize) -> Self {
475 Self(counter.fetch_add(1, SeqCst))
476 }
477
478 pub fn from_proto(id: u64) -> Self {
479 Self(id as usize)
480 }
481
482 pub fn to_proto(&self) -> u64 {
483 self.0 as u64
484 }
485
486 pub fn to_usize(&self) -> usize {
487 self.0
488 }
489}
490
491#[derive(Debug, Clone, Copy, PartialEq, Eq)]
492pub enum FormatTrigger {
493 Save,
494 Manual,
495}
496
497struct ProjectLspAdapterDelegate {
498 project: ModelHandle<Project>,
499 http_client: Arc<dyn HttpClient>,
500}
501
502impl FormatTrigger {
503 fn from_proto(value: i32) -> FormatTrigger {
504 match value {
505 0 => FormatTrigger::Save,
506 1 => FormatTrigger::Manual,
507 _ => FormatTrigger::Save,
508 }
509 }
510}
511
512impl Project {
513 pub fn init_settings(cx: &mut AppContext) {
514 settings::register::<ProjectSettings>(cx);
515 }
516
517 pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
518 Self::init_settings(cx);
519
520 client.add_model_message_handler(Self::handle_add_collaborator);
521 client.add_model_message_handler(Self::handle_update_project_collaborator);
522 client.add_model_message_handler(Self::handle_remove_collaborator);
523 client.add_model_message_handler(Self::handle_buffer_reloaded);
524 client.add_model_message_handler(Self::handle_buffer_saved);
525 client.add_model_message_handler(Self::handle_start_language_server);
526 client.add_model_message_handler(Self::handle_update_language_server);
527 client.add_model_message_handler(Self::handle_update_project);
528 client.add_model_message_handler(Self::handle_unshare_project);
529 client.add_model_message_handler(Self::handle_create_buffer_for_peer);
530 client.add_model_message_handler(Self::handle_update_buffer_file);
531 client.add_model_request_handler(Self::handle_update_buffer);
532 client.add_model_message_handler(Self::handle_update_diagnostic_summary);
533 client.add_model_message_handler(Self::handle_update_worktree);
534 client.add_model_message_handler(Self::handle_update_worktree_settings);
535 client.add_model_request_handler(Self::handle_create_project_entry);
536 client.add_model_request_handler(Self::handle_rename_project_entry);
537 client.add_model_request_handler(Self::handle_copy_project_entry);
538 client.add_model_request_handler(Self::handle_delete_project_entry);
539 client.add_model_request_handler(Self::handle_expand_project_entry);
540 client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
541 client.add_model_request_handler(Self::handle_apply_code_action);
542 client.add_model_request_handler(Self::handle_on_type_formatting);
543 client.add_model_request_handler(Self::handle_inlay_hints);
544 client.add_model_request_handler(Self::handle_refresh_inlay_hints);
545 client.add_model_request_handler(Self::handle_reload_buffers);
546 client.add_model_request_handler(Self::handle_synchronize_buffers);
547 client.add_model_request_handler(Self::handle_format_buffers);
548 client.add_model_request_handler(Self::handle_lsp_command::<GetCodeActions>);
549 client.add_model_request_handler(Self::handle_lsp_command::<GetCompletions>);
550 client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
551 client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
552 client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
553 client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
554 client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
555 client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
556 client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
557 client.add_model_request_handler(Self::handle_search_project);
558 client.add_model_request_handler(Self::handle_get_project_symbols);
559 client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
560 client.add_model_request_handler(Self::handle_open_buffer_by_id);
561 client.add_model_request_handler(Self::handle_open_buffer_by_path);
562 client.add_model_request_handler(Self::handle_save_buffer);
563 client.add_model_message_handler(Self::handle_update_diff_base);
564 }
565
566 pub fn local(
567 client: Arc<Client>,
568 user_store: ModelHandle<UserStore>,
569 languages: Arc<LanguageRegistry>,
570 fs: Arc<dyn Fs>,
571 cx: &mut AppContext,
572 ) -> ModelHandle<Self> {
573 cx.add_model(|cx: &mut ModelContext<Self>| {
574 let (tx, rx) = mpsc::unbounded();
575 cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
576 .detach();
577 Self {
578 worktrees: Default::default(),
579 buffer_ordered_messages_tx: tx,
580 collaborators: Default::default(),
581 next_buffer_id: 0,
582 opened_buffers: Default::default(),
583 shared_buffers: Default::default(),
584 incomplete_remote_buffers: Default::default(),
585 loading_buffers_by_path: Default::default(),
586 loading_local_worktrees: Default::default(),
587 local_buffer_ids_by_path: Default::default(),
588 local_buffer_ids_by_entry_id: Default::default(),
589 buffer_snapshots: Default::default(),
590 join_project_response_message_id: 0,
591 client_state: None,
592 opened_buffer: watch::channel(),
593 client_subscriptions: Vec::new(),
594 _subscriptions: vec![
595 cx.observe_global::<SettingsStore, _>(Self::on_settings_changed)
596 ],
597 _maintain_buffer_languages: Self::maintain_buffer_languages(languages.clone(), cx),
598 _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
599 active_entry: None,
600 languages,
601 client,
602 user_store,
603 fs,
604 next_entry_id: Default::default(),
605 next_diagnostic_group_id: Default::default(),
606 language_servers: Default::default(),
607 language_server_ids: Default::default(),
608 language_server_statuses: Default::default(),
609 last_workspace_edits_by_language_server: Default::default(),
610 buffers_being_formatted: Default::default(),
611 buffers_needing_diff: Default::default(),
612 git_diff_debouncer: DelayedDebounced::new(),
613 nonce: StdRng::from_entropy().gen(),
614 terminals: Terminals {
615 local_handles: Vec::new(),
616 },
617 copilot_enabled: Copilot::global(cx).is_some(),
618 }
619 })
620 }
621
622 pub async fn remote(
623 remote_id: u64,
624 client: Arc<Client>,
625 user_store: ModelHandle<UserStore>,
626 languages: Arc<LanguageRegistry>,
627 fs: Arc<dyn Fs>,
628 mut cx: AsyncAppContext,
629 ) -> Result<ModelHandle<Self>> {
630 client.authenticate_and_connect(true, &cx).await?;
631
632 let subscription = client.subscribe_to_entity(remote_id)?;
633 let response = client
634 .request_envelope(proto::JoinProject {
635 project_id: remote_id,
636 })
637 .await?;
638 let this = cx.add_model(|cx| {
639 let replica_id = response.payload.replica_id as ReplicaId;
640
641 let mut worktrees = Vec::new();
642 for worktree in response.payload.worktrees {
643 let worktree = cx.update(|cx| {
644 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
645 });
646 worktrees.push(worktree);
647 }
648
649 let (tx, rx) = mpsc::unbounded();
650 cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
651 .detach();
652 let mut this = Self {
653 worktrees: Vec::new(),
654 buffer_ordered_messages_tx: tx,
655 loading_buffers_by_path: Default::default(),
656 next_buffer_id: 0,
657 opened_buffer: watch::channel(),
658 shared_buffers: Default::default(),
659 incomplete_remote_buffers: Default::default(),
660 loading_local_worktrees: Default::default(),
661 local_buffer_ids_by_path: Default::default(),
662 local_buffer_ids_by_entry_id: Default::default(),
663 active_entry: None,
664 collaborators: Default::default(),
665 join_project_response_message_id: response.message_id,
666 _maintain_buffer_languages: Self::maintain_buffer_languages(languages.clone(), cx),
667 _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
668 languages,
669 user_store: user_store.clone(),
670 fs,
671 next_entry_id: Default::default(),
672 next_diagnostic_group_id: Default::default(),
673 client_subscriptions: Default::default(),
674 _subscriptions: Default::default(),
675 client: client.clone(),
676 client_state: Some(ProjectClientState::Remote {
677 sharing_has_stopped: false,
678 remote_id,
679 replica_id,
680 }),
681 language_servers: Default::default(),
682 language_server_ids: Default::default(),
683 language_server_statuses: response
684 .payload
685 .language_servers
686 .into_iter()
687 .map(|server| {
688 (
689 LanguageServerId(server.id as usize),
690 LanguageServerStatus {
691 name: server.name,
692 pending_work: Default::default(),
693 has_pending_diagnostic_updates: false,
694 progress_tokens: Default::default(),
695 },
696 )
697 })
698 .collect(),
699 last_workspace_edits_by_language_server: Default::default(),
700 opened_buffers: Default::default(),
701 buffers_being_formatted: Default::default(),
702 buffers_needing_diff: Default::default(),
703 git_diff_debouncer: DelayedDebounced::new(),
704 buffer_snapshots: Default::default(),
705 nonce: StdRng::from_entropy().gen(),
706 terminals: Terminals {
707 local_handles: Vec::new(),
708 },
709 copilot_enabled: Copilot::global(cx).is_some(),
710 };
711 for worktree in worktrees {
712 let _ = this.add_worktree(&worktree, cx);
713 }
714 this
715 });
716 let subscription = subscription.set_model(&this, &mut cx);
717
718 let user_ids = response
719 .payload
720 .collaborators
721 .iter()
722 .map(|peer| peer.user_id)
723 .collect();
724 user_store
725 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
726 .await?;
727
728 this.update(&mut cx, |this, cx| {
729 this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
730 this.client_subscriptions.push(subscription);
731 anyhow::Ok(())
732 })?;
733
734 Ok(this)
735 }
736
737 #[cfg(any(test, feature = "test-support"))]
738 pub async fn test(
739 fs: Arc<dyn Fs>,
740 root_paths: impl IntoIterator<Item = &Path>,
741 cx: &mut gpui::TestAppContext,
742 ) -> ModelHandle<Project> {
743 let mut languages = LanguageRegistry::test();
744 languages.set_executor(cx.background());
745 let http_client = util::http::FakeHttpClient::with_404_response();
746 let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
747 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
748 let project =
749 cx.update(|cx| Project::local(client, user_store, Arc::new(languages), fs, cx));
750 for path in root_paths {
751 let (tree, _) = project
752 .update(cx, |project, cx| {
753 project.find_or_create_local_worktree(path, true, cx)
754 })
755 .await
756 .unwrap();
757 tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
758 .await;
759 }
760 project
761 }
762
763 fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
764 let mut language_servers_to_start = Vec::new();
765 for buffer in self.opened_buffers.values() {
766 if let Some(buffer) = buffer.upgrade(cx) {
767 let buffer = buffer.read(cx);
768 if let Some((file, language)) = buffer.file().zip(buffer.language()) {
769 let settings = language_settings(Some(language), Some(file), cx);
770 if settings.enable_language_server {
771 if let Some(file) = File::from_dyn(Some(file)) {
772 language_servers_to_start
773 .push((file.worktree.clone(), language.clone()));
774 }
775 }
776 }
777 }
778 }
779
780 let mut language_servers_to_stop = Vec::new();
781 let mut language_servers_to_restart = Vec::new();
782 let languages = self.languages.to_vec();
783 let project_settings = settings::get::<ProjectSettings>(cx).clone();
784 for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
785 let language = languages.iter().find_map(|l| {
786 let adapter = l
787 .lsp_adapters()
788 .iter()
789 .find(|adapter| &adapter.name == started_lsp_name)?;
790 Some((l, adapter))
791 });
792 if let Some((language, adapter)) = language {
793 let worktree = self.worktree_for_id(*worktree_id, cx);
794 let file = worktree.as_ref().and_then(|tree| {
795 tree.update(cx, |tree, cx| tree.root_file(cx).map(|f| f as _))
796 });
797 if !language_settings(Some(language), file.as_ref(), cx).enable_language_server {
798 language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
799 } else if let Some(worktree) = worktree {
800 let new_lsp_settings = project_settings
801 .lsp
802 .get(&adapter.name.0)
803 .and_then(|s| s.initialization_options.as_ref());
804 if adapter.initialization_options.as_ref() != new_lsp_settings {
805 language_servers_to_restart.push((worktree, Arc::clone(language)));
806 }
807 }
808 }
809 }
810
811 // Stop all newly-disabled language servers.
812 for (worktree_id, adapter_name) in language_servers_to_stop {
813 self.stop_language_server(worktree_id, adapter_name, cx)
814 .detach();
815 }
816
817 // Start all the newly-enabled language servers.
818 for (worktree, language) in language_servers_to_start {
819 let worktree_path = worktree.read(cx).abs_path();
820 self.start_language_servers(&worktree, worktree_path, language, cx);
821 }
822
823 // Restart all language servers with changed initialization options.
824 for (worktree, language) in language_servers_to_restart {
825 self.restart_language_servers(worktree, language, cx);
826 }
827
828 if !self.copilot_enabled && Copilot::global(cx).is_some() {
829 self.copilot_enabled = true;
830 for buffer in self.opened_buffers.values() {
831 if let Some(buffer) = buffer.upgrade(cx) {
832 self.register_buffer_with_copilot(&buffer, cx);
833 }
834 }
835 }
836
837 cx.notify();
838 }
839
840 pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
841 self.opened_buffers
842 .get(&remote_id)
843 .and_then(|buffer| buffer.upgrade(cx))
844 }
845
846 pub fn languages(&self) -> &Arc<LanguageRegistry> {
847 &self.languages
848 }
849
850 pub fn client(&self) -> Arc<Client> {
851 self.client.clone()
852 }
853
854 pub fn user_store(&self) -> ModelHandle<UserStore> {
855 self.user_store.clone()
856 }
857
858 #[cfg(any(test, feature = "test-support"))]
859 pub fn opened_buffers(&self, cx: &AppContext) -> Vec<ModelHandle<Buffer>> {
860 self.opened_buffers
861 .values()
862 .filter_map(|b| b.upgrade(cx))
863 .collect()
864 }
865
866 #[cfg(any(test, feature = "test-support"))]
867 pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
868 let path = path.into();
869 if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
870 self.opened_buffers.iter().any(|(_, buffer)| {
871 if let Some(buffer) = buffer.upgrade(cx) {
872 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
873 if file.worktree == worktree && file.path() == &path.path {
874 return true;
875 }
876 }
877 }
878 false
879 })
880 } else {
881 false
882 }
883 }
884
885 pub fn fs(&self) -> &Arc<dyn Fs> {
886 &self.fs
887 }
888
889 pub fn remote_id(&self) -> Option<u64> {
890 match self.client_state.as_ref()? {
891 ProjectClientState::Local { remote_id, .. }
892 | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
893 }
894 }
895
896 pub fn replica_id(&self) -> ReplicaId {
897 match &self.client_state {
898 Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
899 _ => 0,
900 }
901 }
902
903 fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
904 if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
905 updates_tx
906 .unbounded_send(LocalProjectUpdate::WorktreesChanged)
907 .ok();
908 }
909 cx.notify();
910 }
911
912 pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
913 &self.collaborators
914 }
915
916 /// Collect all worktrees, including ones that don't appear in the project panel
917 pub fn worktrees<'a>(
918 &'a self,
919 cx: &'a AppContext,
920 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
921 self.worktrees
922 .iter()
923 .filter_map(move |worktree| worktree.upgrade(cx))
924 }
925
926 /// Collect all user-visible worktrees, the ones that appear in the project panel
927 pub fn visible_worktrees<'a>(
928 &'a self,
929 cx: &'a AppContext,
930 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
931 self.worktrees.iter().filter_map(|worktree| {
932 worktree.upgrade(cx).and_then(|worktree| {
933 if worktree.read(cx).is_visible() {
934 Some(worktree)
935 } else {
936 None
937 }
938 })
939 })
940 }
941
942 pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
943 self.visible_worktrees(cx)
944 .map(|tree| tree.read(cx).root_name())
945 }
946
947 pub fn worktree_for_id(
948 &self,
949 id: WorktreeId,
950 cx: &AppContext,
951 ) -> Option<ModelHandle<Worktree>> {
952 self.worktrees(cx)
953 .find(|worktree| worktree.read(cx).id() == id)
954 }
955
956 pub fn worktree_for_entry(
957 &self,
958 entry_id: ProjectEntryId,
959 cx: &AppContext,
960 ) -> Option<ModelHandle<Worktree>> {
961 self.worktrees(cx)
962 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
963 }
964
965 pub fn worktree_id_for_entry(
966 &self,
967 entry_id: ProjectEntryId,
968 cx: &AppContext,
969 ) -> Option<WorktreeId> {
970 self.worktree_for_entry(entry_id, cx)
971 .map(|worktree| worktree.read(cx).id())
972 }
973
974 pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
975 paths.iter().all(|path| self.contains_path(path, cx))
976 }
977
978 pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
979 for worktree in self.worktrees(cx) {
980 let worktree = worktree.read(cx).as_local();
981 if worktree.map_or(false, |w| w.contains_abs_path(path)) {
982 return true;
983 }
984 }
985 false
986 }
987
988 pub fn create_entry(
989 &mut self,
990 project_path: impl Into<ProjectPath>,
991 is_directory: bool,
992 cx: &mut ModelContext<Self>,
993 ) -> Option<Task<Result<Entry>>> {
994 let project_path = project_path.into();
995 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
996 if self.is_local() {
997 Some(worktree.update(cx, |worktree, cx| {
998 worktree
999 .as_local_mut()
1000 .unwrap()
1001 .create_entry(project_path.path, is_directory, cx)
1002 }))
1003 } else {
1004 let client = self.client.clone();
1005 let project_id = self.remote_id().unwrap();
1006 Some(cx.spawn_weak(|_, mut cx| async move {
1007 let response = client
1008 .request(proto::CreateProjectEntry {
1009 worktree_id: project_path.worktree_id.to_proto(),
1010 project_id,
1011 path: project_path.path.to_string_lossy().into(),
1012 is_directory,
1013 })
1014 .await?;
1015 let entry = response
1016 .entry
1017 .ok_or_else(|| anyhow!("missing entry in response"))?;
1018 worktree
1019 .update(&mut cx, |worktree, cx| {
1020 worktree.as_remote_mut().unwrap().insert_entry(
1021 entry,
1022 response.worktree_scan_id as usize,
1023 cx,
1024 )
1025 })
1026 .await
1027 }))
1028 }
1029 }
1030
1031 pub fn copy_entry(
1032 &mut self,
1033 entry_id: ProjectEntryId,
1034 new_path: impl Into<Arc<Path>>,
1035 cx: &mut ModelContext<Self>,
1036 ) -> Option<Task<Result<Entry>>> {
1037 let worktree = self.worktree_for_entry(entry_id, cx)?;
1038 let new_path = new_path.into();
1039 if self.is_local() {
1040 worktree.update(cx, |worktree, cx| {
1041 worktree
1042 .as_local_mut()
1043 .unwrap()
1044 .copy_entry(entry_id, new_path, cx)
1045 })
1046 } else {
1047 let client = self.client.clone();
1048 let project_id = self.remote_id().unwrap();
1049
1050 Some(cx.spawn_weak(|_, mut cx| async move {
1051 let response = client
1052 .request(proto::CopyProjectEntry {
1053 project_id,
1054 entry_id: entry_id.to_proto(),
1055 new_path: new_path.to_string_lossy().into(),
1056 })
1057 .await?;
1058 let entry = response
1059 .entry
1060 .ok_or_else(|| anyhow!("missing entry in response"))?;
1061 worktree
1062 .update(&mut cx, |worktree, cx| {
1063 worktree.as_remote_mut().unwrap().insert_entry(
1064 entry,
1065 response.worktree_scan_id as usize,
1066 cx,
1067 )
1068 })
1069 .await
1070 }))
1071 }
1072 }
1073
1074 pub fn rename_entry(
1075 &mut self,
1076 entry_id: ProjectEntryId,
1077 new_path: impl Into<Arc<Path>>,
1078 cx: &mut ModelContext<Self>,
1079 ) -> Option<Task<Result<Entry>>> {
1080 let worktree = self.worktree_for_entry(entry_id, cx)?;
1081 let new_path = new_path.into();
1082 if self.is_local() {
1083 worktree.update(cx, |worktree, cx| {
1084 worktree
1085 .as_local_mut()
1086 .unwrap()
1087 .rename_entry(entry_id, new_path, cx)
1088 })
1089 } else {
1090 let client = self.client.clone();
1091 let project_id = self.remote_id().unwrap();
1092
1093 Some(cx.spawn_weak(|_, mut cx| async move {
1094 let response = client
1095 .request(proto::RenameProjectEntry {
1096 project_id,
1097 entry_id: entry_id.to_proto(),
1098 new_path: new_path.to_string_lossy().into(),
1099 })
1100 .await?;
1101 let entry = response
1102 .entry
1103 .ok_or_else(|| anyhow!("missing entry in response"))?;
1104 worktree
1105 .update(&mut cx, |worktree, cx| {
1106 worktree.as_remote_mut().unwrap().insert_entry(
1107 entry,
1108 response.worktree_scan_id as usize,
1109 cx,
1110 )
1111 })
1112 .await
1113 }))
1114 }
1115 }
1116
1117 pub fn delete_entry(
1118 &mut self,
1119 entry_id: ProjectEntryId,
1120 cx: &mut ModelContext<Self>,
1121 ) -> Option<Task<Result<()>>> {
1122 let worktree = self.worktree_for_entry(entry_id, cx)?;
1123
1124 cx.emit(Event::DeletedEntry(entry_id));
1125
1126 if self.is_local() {
1127 worktree.update(cx, |worktree, cx| {
1128 worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
1129 })
1130 } else {
1131 let client = self.client.clone();
1132 let project_id = self.remote_id().unwrap();
1133 Some(cx.spawn_weak(|_, mut cx| async move {
1134 let response = client
1135 .request(proto::DeleteProjectEntry {
1136 project_id,
1137 entry_id: entry_id.to_proto(),
1138 })
1139 .await?;
1140 worktree
1141 .update(&mut cx, move |worktree, cx| {
1142 worktree.as_remote_mut().unwrap().delete_entry(
1143 entry_id,
1144 response.worktree_scan_id as usize,
1145 cx,
1146 )
1147 })
1148 .await
1149 }))
1150 }
1151 }
1152
1153 pub fn expand_entry(
1154 &mut self,
1155 worktree_id: WorktreeId,
1156 entry_id: ProjectEntryId,
1157 cx: &mut ModelContext<Self>,
1158 ) -> Option<Task<Result<()>>> {
1159 let worktree = self.worktree_for_id(worktree_id, cx)?;
1160 if self.is_local() {
1161 worktree.update(cx, |worktree, cx| {
1162 worktree.as_local_mut().unwrap().expand_entry(entry_id, cx)
1163 })
1164 } else {
1165 let worktree = worktree.downgrade();
1166 let request = self.client.request(proto::ExpandProjectEntry {
1167 project_id: self.remote_id().unwrap(),
1168 entry_id: entry_id.to_proto(),
1169 });
1170 Some(cx.spawn_weak(|_, mut cx| async move {
1171 let response = request.await?;
1172 if let Some(worktree) = worktree.upgrade(&cx) {
1173 worktree
1174 .update(&mut cx, |worktree, _| {
1175 worktree
1176 .as_remote_mut()
1177 .unwrap()
1178 .wait_for_snapshot(response.worktree_scan_id as usize)
1179 })
1180 .await?;
1181 }
1182 Ok(())
1183 }))
1184 }
1185 }
1186
1187 pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Result<()> {
1188 if self.client_state.is_some() {
1189 return Err(anyhow!("project was already shared"));
1190 }
1191 self.client_subscriptions.push(
1192 self.client
1193 .subscribe_to_entity(project_id)?
1194 .set_model(&cx.handle(), &mut cx.to_async()),
1195 );
1196
1197 for open_buffer in self.opened_buffers.values_mut() {
1198 match open_buffer {
1199 OpenBuffer::Strong(_) => {}
1200 OpenBuffer::Weak(buffer) => {
1201 if let Some(buffer) = buffer.upgrade(cx) {
1202 *open_buffer = OpenBuffer::Strong(buffer);
1203 }
1204 }
1205 OpenBuffer::Operations(_) => unreachable!(),
1206 }
1207 }
1208
1209 for worktree_handle in self.worktrees.iter_mut() {
1210 match worktree_handle {
1211 WorktreeHandle::Strong(_) => {}
1212 WorktreeHandle::Weak(worktree) => {
1213 if let Some(worktree) = worktree.upgrade(cx) {
1214 *worktree_handle = WorktreeHandle::Strong(worktree);
1215 }
1216 }
1217 }
1218 }
1219
1220 for (server_id, status) in &self.language_server_statuses {
1221 self.client
1222 .send(proto::StartLanguageServer {
1223 project_id,
1224 server: Some(proto::LanguageServer {
1225 id: server_id.0 as u64,
1226 name: status.name.clone(),
1227 }),
1228 })
1229 .log_err();
1230 }
1231
1232 let store = cx.global::<SettingsStore>();
1233 for worktree in self.worktrees(cx) {
1234 let worktree_id = worktree.read(cx).id().to_proto();
1235 for (path, content) in store.local_settings(worktree.id()) {
1236 self.client
1237 .send(proto::UpdateWorktreeSettings {
1238 project_id,
1239 worktree_id,
1240 path: path.to_string_lossy().into(),
1241 content: Some(content),
1242 })
1243 .log_err();
1244 }
1245 }
1246
1247 let (updates_tx, mut updates_rx) = mpsc::unbounded();
1248 let client = self.client.clone();
1249 self.client_state = Some(ProjectClientState::Local {
1250 remote_id: project_id,
1251 updates_tx,
1252 _send_updates: cx.spawn_weak(move |this, mut cx| async move {
1253 while let Some(update) = updates_rx.next().await {
1254 let Some(this) = this.upgrade(&cx) else { break };
1255
1256 match update {
1257 LocalProjectUpdate::WorktreesChanged => {
1258 let worktrees = this
1259 .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
1260 let update_project = this
1261 .read_with(&cx, |this, cx| {
1262 this.client.request(proto::UpdateProject {
1263 project_id,
1264 worktrees: this.worktree_metadata_protos(cx),
1265 })
1266 })
1267 .await;
1268 if update_project.is_ok() {
1269 for worktree in worktrees {
1270 worktree.update(&mut cx, |worktree, cx| {
1271 let worktree = worktree.as_local_mut().unwrap();
1272 worktree.share(project_id, cx).detach_and_log_err(cx)
1273 });
1274 }
1275 }
1276 }
1277 LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
1278 let buffer = this.update(&mut cx, |this, _| {
1279 let buffer = this.opened_buffers.get(&buffer_id).unwrap();
1280 let shared_buffers =
1281 this.shared_buffers.entry(peer_id).or_default();
1282 if shared_buffers.insert(buffer_id) {
1283 if let OpenBuffer::Strong(buffer) = buffer {
1284 Some(buffer.clone())
1285 } else {
1286 None
1287 }
1288 } else {
1289 None
1290 }
1291 });
1292
1293 let Some(buffer) = buffer else { continue };
1294 let operations =
1295 buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
1296 let operations = operations.await;
1297 let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
1298
1299 let initial_state = proto::CreateBufferForPeer {
1300 project_id,
1301 peer_id: Some(peer_id),
1302 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1303 };
1304 if client.send(initial_state).log_err().is_some() {
1305 let client = client.clone();
1306 cx.background()
1307 .spawn(async move {
1308 let mut chunks = split_operations(operations).peekable();
1309 while let Some(chunk) = chunks.next() {
1310 let is_last = chunks.peek().is_none();
1311 client.send(proto::CreateBufferForPeer {
1312 project_id,
1313 peer_id: Some(peer_id),
1314 variant: Some(
1315 proto::create_buffer_for_peer::Variant::Chunk(
1316 proto::BufferChunk {
1317 buffer_id,
1318 operations: chunk,
1319 is_last,
1320 },
1321 ),
1322 ),
1323 })?;
1324 }
1325 anyhow::Ok(())
1326 })
1327 .await
1328 .log_err();
1329 }
1330 }
1331 }
1332 }
1333 }),
1334 });
1335
1336 self.metadata_changed(cx);
1337 cx.emit(Event::RemoteIdChanged(Some(project_id)));
1338 cx.notify();
1339 Ok(())
1340 }
1341
1342 pub fn reshared(
1343 &mut self,
1344 message: proto::ResharedProject,
1345 cx: &mut ModelContext<Self>,
1346 ) -> Result<()> {
1347 self.shared_buffers.clear();
1348 self.set_collaborators_from_proto(message.collaborators, cx)?;
1349 self.metadata_changed(cx);
1350 Ok(())
1351 }
1352
1353 pub fn rejoined(
1354 &mut self,
1355 message: proto::RejoinedProject,
1356 message_id: u32,
1357 cx: &mut ModelContext<Self>,
1358 ) -> Result<()> {
1359 cx.update_global::<SettingsStore, _, _>(|store, cx| {
1360 for worktree in &self.worktrees {
1361 store
1362 .clear_local_settings(worktree.handle_id(), cx)
1363 .log_err();
1364 }
1365 });
1366
1367 self.join_project_response_message_id = message_id;
1368 self.set_worktrees_from_proto(message.worktrees, cx)?;
1369 self.set_collaborators_from_proto(message.collaborators, cx)?;
1370 self.language_server_statuses = message
1371 .language_servers
1372 .into_iter()
1373 .map(|server| {
1374 (
1375 LanguageServerId(server.id as usize),
1376 LanguageServerStatus {
1377 name: server.name,
1378 pending_work: Default::default(),
1379 has_pending_diagnostic_updates: false,
1380 progress_tokens: Default::default(),
1381 },
1382 )
1383 })
1384 .collect();
1385 self.buffer_ordered_messages_tx
1386 .unbounded_send(BufferOrderedMessage::Resync)
1387 .unwrap();
1388 cx.notify();
1389 Ok(())
1390 }
1391
1392 pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1393 self.unshare_internal(cx)?;
1394 self.metadata_changed(cx);
1395 cx.notify();
1396 Ok(())
1397 }
1398
1399 fn unshare_internal(&mut self, cx: &mut AppContext) -> Result<()> {
1400 if self.is_remote() {
1401 return Err(anyhow!("attempted to unshare a remote project"));
1402 }
1403
1404 if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1405 self.collaborators.clear();
1406 self.shared_buffers.clear();
1407 self.client_subscriptions.clear();
1408
1409 for worktree_handle in self.worktrees.iter_mut() {
1410 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1411 let is_visible = worktree.update(cx, |worktree, _| {
1412 worktree.as_local_mut().unwrap().unshare();
1413 worktree.is_visible()
1414 });
1415 if !is_visible {
1416 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1417 }
1418 }
1419 }
1420
1421 for open_buffer in self.opened_buffers.values_mut() {
1422 // Wake up any tasks waiting for peers' edits to this buffer.
1423 if let Some(buffer) = open_buffer.upgrade(cx) {
1424 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1425 }
1426
1427 if let OpenBuffer::Strong(buffer) = open_buffer {
1428 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1429 }
1430 }
1431
1432 self.client.send(proto::UnshareProject {
1433 project_id: remote_id,
1434 })?;
1435
1436 Ok(())
1437 } else {
1438 Err(anyhow!("attempted to unshare an unshared project"))
1439 }
1440 }
1441
1442 pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
1443 self.disconnected_from_host_internal(cx);
1444 cx.emit(Event::DisconnectedFromHost);
1445 cx.notify();
1446 }
1447
1448 fn disconnected_from_host_internal(&mut self, cx: &mut AppContext) {
1449 if let Some(ProjectClientState::Remote {
1450 sharing_has_stopped,
1451 ..
1452 }) = &mut self.client_state
1453 {
1454 *sharing_has_stopped = true;
1455
1456 self.collaborators.clear();
1457
1458 for worktree in &self.worktrees {
1459 if let Some(worktree) = worktree.upgrade(cx) {
1460 worktree.update(cx, |worktree, _| {
1461 if let Some(worktree) = worktree.as_remote_mut() {
1462 worktree.disconnected_from_host();
1463 }
1464 });
1465 }
1466 }
1467
1468 for open_buffer in self.opened_buffers.values_mut() {
1469 // Wake up any tasks waiting for peers' edits to this buffer.
1470 if let Some(buffer) = open_buffer.upgrade(cx) {
1471 buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1472 }
1473
1474 if let OpenBuffer::Strong(buffer) = open_buffer {
1475 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1476 }
1477 }
1478
1479 // Wake up all futures currently waiting on a buffer to get opened,
1480 // to give them a chance to fail now that we've disconnected.
1481 *self.opened_buffer.0.borrow_mut() = ();
1482 }
1483 }
1484
1485 pub fn close(&mut self, cx: &mut ModelContext<Self>) {
1486 cx.emit(Event::Closed);
1487 }
1488
1489 pub fn is_read_only(&self) -> bool {
1490 match &self.client_state {
1491 Some(ProjectClientState::Remote {
1492 sharing_has_stopped,
1493 ..
1494 }) => *sharing_has_stopped,
1495 _ => false,
1496 }
1497 }
1498
1499 pub fn is_local(&self) -> bool {
1500 match &self.client_state {
1501 Some(ProjectClientState::Remote { .. }) => false,
1502 _ => true,
1503 }
1504 }
1505
1506 pub fn is_remote(&self) -> bool {
1507 !self.is_local()
1508 }
1509
1510 pub fn create_buffer(
1511 &mut self,
1512 text: &str,
1513 language: Option<Arc<Language>>,
1514 cx: &mut ModelContext<Self>,
1515 ) -> Result<ModelHandle<Buffer>> {
1516 if self.is_remote() {
1517 return Err(anyhow!("creating buffers as a guest is not supported yet"));
1518 }
1519
1520 let buffer = cx.add_model(|cx| {
1521 Buffer::new(self.replica_id(), text, cx)
1522 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1523 });
1524 self.register_buffer(&buffer, cx)?;
1525 Ok(buffer)
1526 }
1527
1528 pub fn open_path(
1529 &mut self,
1530 path: impl Into<ProjectPath>,
1531 cx: &mut ModelContext<Self>,
1532 ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1533 let task = self.open_buffer(path, cx);
1534 cx.spawn_weak(|_, cx| async move {
1535 let buffer = task.await?;
1536 let project_entry_id = buffer
1537 .read_with(&cx, |buffer, cx| {
1538 File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1539 })
1540 .ok_or_else(|| anyhow!("no project entry"))?;
1541
1542 let buffer: &AnyModelHandle = &buffer;
1543 Ok((project_entry_id, buffer.clone()))
1544 })
1545 }
1546
1547 pub fn open_local_buffer(
1548 &mut self,
1549 abs_path: impl AsRef<Path>,
1550 cx: &mut ModelContext<Self>,
1551 ) -> Task<Result<ModelHandle<Buffer>>> {
1552 if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1553 self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1554 } else {
1555 Task::ready(Err(anyhow!("no such path")))
1556 }
1557 }
1558
1559 pub fn open_buffer(
1560 &mut self,
1561 path: impl Into<ProjectPath>,
1562 cx: &mut ModelContext<Self>,
1563 ) -> Task<Result<ModelHandle<Buffer>>> {
1564 let project_path = path.into();
1565 let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
1566 worktree
1567 } else {
1568 return Task::ready(Err(anyhow!("no such worktree")));
1569 };
1570
1571 // If there is already a buffer for the given path, then return it.
1572 let existing_buffer = self.get_open_buffer(&project_path, cx);
1573 if let Some(existing_buffer) = existing_buffer {
1574 return Task::ready(Ok(existing_buffer));
1575 }
1576
1577 let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
1578 // If the given path is already being loaded, then wait for that existing
1579 // task to complete and return the same buffer.
1580 hash_map::Entry::Occupied(e) => e.get().clone(),
1581
1582 // Otherwise, record the fact that this path is now being loaded.
1583 hash_map::Entry::Vacant(entry) => {
1584 let (mut tx, rx) = postage::watch::channel();
1585 entry.insert(rx.clone());
1586
1587 let load_buffer = if worktree.read(cx).is_local() {
1588 self.open_local_buffer_internal(&project_path.path, &worktree, cx)
1589 } else {
1590 self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
1591 };
1592
1593 cx.spawn(move |this, mut cx| async move {
1594 let load_result = load_buffer.await;
1595 *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
1596 // Record the fact that the buffer is no longer loading.
1597 this.loading_buffers_by_path.remove(&project_path);
1598 let buffer = load_result.map_err(Arc::new)?;
1599 Ok(buffer)
1600 }));
1601 })
1602 .detach();
1603 rx
1604 }
1605 };
1606
1607 cx.foreground().spawn(async move {
1608 wait_for_loading_buffer(loading_watch)
1609 .await
1610 .map_err(|error| anyhow!("{}", error))
1611 })
1612 }
1613
1614 fn open_local_buffer_internal(
1615 &mut self,
1616 path: &Arc<Path>,
1617 worktree: &ModelHandle<Worktree>,
1618 cx: &mut ModelContext<Self>,
1619 ) -> Task<Result<ModelHandle<Buffer>>> {
1620 let buffer_id = post_inc(&mut self.next_buffer_id);
1621 let load_buffer = worktree.update(cx, |worktree, cx| {
1622 let worktree = worktree.as_local_mut().unwrap();
1623 worktree.load_buffer(buffer_id, path, cx)
1624 });
1625 cx.spawn(|this, mut cx| async move {
1626 let buffer = load_buffer.await?;
1627 this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1628 Ok(buffer)
1629 })
1630 }
1631
1632 fn open_remote_buffer_internal(
1633 &mut self,
1634 path: &Arc<Path>,
1635 worktree: &ModelHandle<Worktree>,
1636 cx: &mut ModelContext<Self>,
1637 ) -> Task<Result<ModelHandle<Buffer>>> {
1638 let rpc = self.client.clone();
1639 let project_id = self.remote_id().unwrap();
1640 let remote_worktree_id = worktree.read(cx).id();
1641 let path = path.clone();
1642 let path_string = path.to_string_lossy().to_string();
1643 cx.spawn(|this, mut cx| async move {
1644 let response = rpc
1645 .request(proto::OpenBufferByPath {
1646 project_id,
1647 worktree_id: remote_worktree_id.to_proto(),
1648 path: path_string,
1649 })
1650 .await?;
1651 this.update(&mut cx, |this, cx| {
1652 this.wait_for_remote_buffer(response.buffer_id, cx)
1653 })
1654 .await
1655 })
1656 }
1657
1658 /// LanguageServerName is owned, because it is inserted into a map
1659 fn open_local_buffer_via_lsp(
1660 &mut self,
1661 abs_path: lsp::Url,
1662 language_server_id: LanguageServerId,
1663 language_server_name: LanguageServerName,
1664 cx: &mut ModelContext<Self>,
1665 ) -> Task<Result<ModelHandle<Buffer>>> {
1666 cx.spawn(|this, mut cx| async move {
1667 let abs_path = abs_path
1668 .to_file_path()
1669 .map_err(|_| anyhow!("can't convert URI to path"))?;
1670 let (worktree, relative_path) = if let Some(result) =
1671 this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1672 {
1673 result
1674 } else {
1675 let worktree = this
1676 .update(&mut cx, |this, cx| {
1677 this.create_local_worktree(&abs_path, false, cx)
1678 })
1679 .await?;
1680 this.update(&mut cx, |this, cx| {
1681 this.language_server_ids.insert(
1682 (worktree.read(cx).id(), language_server_name),
1683 language_server_id,
1684 );
1685 });
1686 (worktree, PathBuf::new())
1687 };
1688
1689 let project_path = ProjectPath {
1690 worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1691 path: relative_path.into(),
1692 };
1693 this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1694 .await
1695 })
1696 }
1697
1698 pub fn open_buffer_by_id(
1699 &mut self,
1700 id: u64,
1701 cx: &mut ModelContext<Self>,
1702 ) -> Task<Result<ModelHandle<Buffer>>> {
1703 if let Some(buffer) = self.buffer_for_id(id, cx) {
1704 Task::ready(Ok(buffer))
1705 } else if self.is_local() {
1706 Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1707 } else if let Some(project_id) = self.remote_id() {
1708 let request = self
1709 .client
1710 .request(proto::OpenBufferById { project_id, id });
1711 cx.spawn(|this, mut cx| async move {
1712 let buffer_id = request.await?.buffer_id;
1713 this.update(&mut cx, |this, cx| {
1714 this.wait_for_remote_buffer(buffer_id, cx)
1715 })
1716 .await
1717 })
1718 } else {
1719 Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1720 }
1721 }
1722
1723 pub fn save_buffers(
1724 &self,
1725 buffers: HashSet<ModelHandle<Buffer>>,
1726 cx: &mut ModelContext<Self>,
1727 ) -> Task<Result<()>> {
1728 cx.spawn(|this, mut cx| async move {
1729 let save_tasks = buffers
1730 .into_iter()
1731 .map(|buffer| this.update(&mut cx, |this, cx| this.save_buffer(buffer, cx)));
1732 try_join_all(save_tasks).await?;
1733 Ok(())
1734 })
1735 }
1736
1737 pub fn save_buffer(
1738 &self,
1739 buffer: ModelHandle<Buffer>,
1740 cx: &mut ModelContext<Self>,
1741 ) -> Task<Result<()>> {
1742 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1743 return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1744 };
1745 let worktree = file.worktree.clone();
1746 let path = file.path.clone();
1747 worktree.update(cx, |worktree, cx| match worktree {
1748 Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx),
1749 Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx),
1750 })
1751 }
1752
1753 pub fn save_buffer_as(
1754 &mut self,
1755 buffer: ModelHandle<Buffer>,
1756 abs_path: PathBuf,
1757 cx: &mut ModelContext<Self>,
1758 ) -> Task<Result<()>> {
1759 let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1760 let old_file = File::from_dyn(buffer.read(cx).file())
1761 .filter(|f| f.is_local())
1762 .cloned();
1763 cx.spawn(|this, mut cx| async move {
1764 if let Some(old_file) = &old_file {
1765 this.update(&mut cx, |this, cx| {
1766 this.unregister_buffer_from_language_servers(&buffer, old_file, cx);
1767 });
1768 }
1769 let (worktree, path) = worktree_task.await?;
1770 worktree
1771 .update(&mut cx, |worktree, cx| match worktree {
1772 Worktree::Local(worktree) => {
1773 worktree.save_buffer(buffer.clone(), path.into(), true, cx)
1774 }
1775 Worktree::Remote(_) => panic!("cannot remote buffers as new files"),
1776 })
1777 .await?;
1778 this.update(&mut cx, |this, cx| {
1779 this.detect_language_for_buffer(&buffer, cx);
1780 this.register_buffer_with_language_servers(&buffer, cx);
1781 });
1782 Ok(())
1783 })
1784 }
1785
1786 pub fn get_open_buffer(
1787 &mut self,
1788 path: &ProjectPath,
1789 cx: &mut ModelContext<Self>,
1790 ) -> Option<ModelHandle<Buffer>> {
1791 let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1792 self.opened_buffers.values().find_map(|buffer| {
1793 let buffer = buffer.upgrade(cx)?;
1794 let file = File::from_dyn(buffer.read(cx).file())?;
1795 if file.worktree == worktree && file.path() == &path.path {
1796 Some(buffer)
1797 } else {
1798 None
1799 }
1800 })
1801 }
1802
1803 fn register_buffer(
1804 &mut self,
1805 buffer: &ModelHandle<Buffer>,
1806 cx: &mut ModelContext<Self>,
1807 ) -> Result<()> {
1808 self.request_buffer_diff_recalculation(buffer, cx);
1809 buffer.update(cx, |buffer, _| {
1810 buffer.set_language_registry(self.languages.clone())
1811 });
1812
1813 let remote_id = buffer.read(cx).remote_id();
1814 let is_remote = self.is_remote();
1815 let open_buffer = if is_remote || self.is_shared() {
1816 OpenBuffer::Strong(buffer.clone())
1817 } else {
1818 OpenBuffer::Weak(buffer.downgrade())
1819 };
1820
1821 match self.opened_buffers.entry(remote_id) {
1822 hash_map::Entry::Vacant(entry) => {
1823 entry.insert(open_buffer);
1824 }
1825 hash_map::Entry::Occupied(mut entry) => {
1826 if let OpenBuffer::Operations(operations) = entry.get_mut() {
1827 buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
1828 } else if entry.get().upgrade(cx).is_some() {
1829 if is_remote {
1830 return Ok(());
1831 } else {
1832 debug_panic!("buffer {} was already registered", remote_id);
1833 Err(anyhow!("buffer {} was already registered", remote_id))?;
1834 }
1835 }
1836 entry.insert(open_buffer);
1837 }
1838 }
1839 cx.subscribe(buffer, |this, buffer, event, cx| {
1840 this.on_buffer_event(buffer, event, cx);
1841 })
1842 .detach();
1843
1844 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1845 if file.is_local {
1846 self.local_buffer_ids_by_path.insert(
1847 ProjectPath {
1848 worktree_id: file.worktree_id(cx),
1849 path: file.path.clone(),
1850 },
1851 remote_id,
1852 );
1853
1854 self.local_buffer_ids_by_entry_id
1855 .insert(file.entry_id, remote_id);
1856 }
1857 }
1858
1859 self.detect_language_for_buffer(buffer, cx);
1860 self.register_buffer_with_language_servers(buffer, cx);
1861 self.register_buffer_with_copilot(buffer, cx);
1862 cx.observe_release(buffer, |this, buffer, cx| {
1863 if let Some(file) = File::from_dyn(buffer.file()) {
1864 if file.is_local() {
1865 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1866 for server in this.language_servers_for_buffer(buffer, cx) {
1867 server
1868 .1
1869 .notify::<lsp::notification::DidCloseTextDocument>(
1870 lsp::DidCloseTextDocumentParams {
1871 text_document: lsp::TextDocumentIdentifier::new(uri.clone()),
1872 },
1873 )
1874 .log_err();
1875 }
1876 }
1877 }
1878 })
1879 .detach();
1880
1881 *self.opened_buffer.0.borrow_mut() = ();
1882 Ok(())
1883 }
1884
1885 fn register_buffer_with_language_servers(
1886 &mut self,
1887 buffer_handle: &ModelHandle<Buffer>,
1888 cx: &mut ModelContext<Self>,
1889 ) {
1890 let buffer = buffer_handle.read(cx);
1891 let buffer_id = buffer.remote_id();
1892
1893 if let Some(file) = File::from_dyn(buffer.file()) {
1894 if !file.is_local() {
1895 return;
1896 }
1897
1898 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1899 let initial_snapshot = buffer.text_snapshot();
1900 let language = buffer.language().cloned();
1901 let worktree_id = file.worktree_id(cx);
1902
1903 if let Some(local_worktree) = file.worktree.read(cx).as_local() {
1904 for (server_id, diagnostics) in local_worktree.diagnostics_for_path(file.path()) {
1905 self.update_buffer_diagnostics(buffer_handle, server_id, None, diagnostics, cx)
1906 .log_err();
1907 }
1908 }
1909
1910 if let Some(language) = language {
1911 for adapter in language.lsp_adapters() {
1912 let language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
1913 let server = self
1914 .language_server_ids
1915 .get(&(worktree_id, adapter.name.clone()))
1916 .and_then(|id| self.language_servers.get(id))
1917 .and_then(|server_state| {
1918 if let LanguageServerState::Running { server, .. } = server_state {
1919 Some(server.clone())
1920 } else {
1921 None
1922 }
1923 });
1924 let server = match server {
1925 Some(server) => server,
1926 None => continue,
1927 };
1928
1929 server
1930 .notify::<lsp::notification::DidOpenTextDocument>(
1931 lsp::DidOpenTextDocumentParams {
1932 text_document: lsp::TextDocumentItem::new(
1933 uri.clone(),
1934 language_id.unwrap_or_default(),
1935 0,
1936 initial_snapshot.text(),
1937 ),
1938 },
1939 )
1940 .log_err();
1941
1942 buffer_handle.update(cx, |buffer, cx| {
1943 buffer.set_completion_triggers(
1944 server
1945 .capabilities()
1946 .completion_provider
1947 .as_ref()
1948 .and_then(|provider| provider.trigger_characters.clone())
1949 .unwrap_or_default(),
1950 cx,
1951 );
1952 });
1953
1954 let snapshot = LspBufferSnapshot {
1955 version: 0,
1956 snapshot: initial_snapshot.clone(),
1957 };
1958 self.buffer_snapshots
1959 .entry(buffer_id)
1960 .or_default()
1961 .insert(server.server_id(), vec![snapshot]);
1962 }
1963 }
1964 }
1965 }
1966
1967 fn unregister_buffer_from_language_servers(
1968 &mut self,
1969 buffer: &ModelHandle<Buffer>,
1970 old_file: &File,
1971 cx: &mut ModelContext<Self>,
1972 ) {
1973 let old_path = match old_file.as_local() {
1974 Some(local) => local.abs_path(cx),
1975 None => return,
1976 };
1977
1978 buffer.update(cx, |buffer, cx| {
1979 let worktree_id = old_file.worktree_id(cx);
1980 let ids = &self.language_server_ids;
1981
1982 let language = buffer.language().cloned();
1983 let adapters = language.iter().flat_map(|language| language.lsp_adapters());
1984 for &server_id in adapters.flat_map(|a| ids.get(&(worktree_id, a.name.clone()))) {
1985 buffer.update_diagnostics(server_id, Default::default(), cx);
1986 }
1987
1988 self.buffer_snapshots.remove(&buffer.remote_id());
1989 let file_url = lsp::Url::from_file_path(old_path).unwrap();
1990 for (_, language_server) in self.language_servers_for_buffer(buffer, cx) {
1991 language_server
1992 .notify::<lsp::notification::DidCloseTextDocument>(
1993 lsp::DidCloseTextDocumentParams {
1994 text_document: lsp::TextDocumentIdentifier::new(file_url.clone()),
1995 },
1996 )
1997 .log_err();
1998 }
1999 });
2000 }
2001
2002 fn register_buffer_with_copilot(
2003 &self,
2004 buffer_handle: &ModelHandle<Buffer>,
2005 cx: &mut ModelContext<Self>,
2006 ) {
2007 if let Some(copilot) = Copilot::global(cx) {
2008 copilot.update(cx, |copilot, cx| copilot.register_buffer(buffer_handle, cx));
2009 }
2010 }
2011
2012 async fn send_buffer_ordered_messages(
2013 this: WeakModelHandle<Self>,
2014 rx: UnboundedReceiver<BufferOrderedMessage>,
2015 mut cx: AsyncAppContext,
2016 ) -> Option<()> {
2017 const MAX_BATCH_SIZE: usize = 128;
2018
2019 let mut operations_by_buffer_id = HashMap::default();
2020 async fn flush_operations(
2021 this: &ModelHandle<Project>,
2022 operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
2023 needs_resync_with_host: &mut bool,
2024 is_local: bool,
2025 cx: &AsyncAppContext,
2026 ) {
2027 for (buffer_id, operations) in operations_by_buffer_id.drain() {
2028 let request = this.read_with(cx, |this, _| {
2029 let project_id = this.remote_id()?;
2030 Some(this.client.request(proto::UpdateBuffer {
2031 buffer_id,
2032 project_id,
2033 operations,
2034 }))
2035 });
2036 if let Some(request) = request {
2037 if request.await.is_err() && !is_local {
2038 *needs_resync_with_host = true;
2039 break;
2040 }
2041 }
2042 }
2043 }
2044
2045 let mut needs_resync_with_host = false;
2046 let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
2047
2048 while let Some(changes) = changes.next().await {
2049 let this = this.upgrade(&mut cx)?;
2050 let is_local = this.read_with(&cx, |this, _| this.is_local());
2051
2052 for change in changes {
2053 match change {
2054 BufferOrderedMessage::Operation {
2055 buffer_id,
2056 operation,
2057 } => {
2058 if needs_resync_with_host {
2059 continue;
2060 }
2061
2062 operations_by_buffer_id
2063 .entry(buffer_id)
2064 .or_insert(Vec::new())
2065 .push(operation);
2066 }
2067
2068 BufferOrderedMessage::Resync => {
2069 operations_by_buffer_id.clear();
2070 if this
2071 .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
2072 .await
2073 .is_ok()
2074 {
2075 needs_resync_with_host = false;
2076 }
2077 }
2078
2079 BufferOrderedMessage::LanguageServerUpdate {
2080 language_server_id,
2081 message,
2082 } => {
2083 flush_operations(
2084 &this,
2085 &mut operations_by_buffer_id,
2086 &mut needs_resync_with_host,
2087 is_local,
2088 &cx,
2089 )
2090 .await;
2091
2092 this.read_with(&cx, |this, _| {
2093 if let Some(project_id) = this.remote_id() {
2094 this.client
2095 .send(proto::UpdateLanguageServer {
2096 project_id,
2097 language_server_id: language_server_id.0 as u64,
2098 variant: Some(message),
2099 })
2100 .log_err();
2101 }
2102 });
2103 }
2104 }
2105 }
2106
2107 flush_operations(
2108 &this,
2109 &mut operations_by_buffer_id,
2110 &mut needs_resync_with_host,
2111 is_local,
2112 &cx,
2113 )
2114 .await;
2115 }
2116
2117 None
2118 }
2119
2120 fn on_buffer_event(
2121 &mut self,
2122 buffer: ModelHandle<Buffer>,
2123 event: &BufferEvent,
2124 cx: &mut ModelContext<Self>,
2125 ) -> Option<()> {
2126 if matches!(
2127 event,
2128 BufferEvent::Edited { .. } | BufferEvent::Reloaded | BufferEvent::DiffBaseChanged
2129 ) {
2130 self.request_buffer_diff_recalculation(&buffer, cx);
2131 }
2132
2133 match event {
2134 BufferEvent::Operation(operation) => {
2135 self.buffer_ordered_messages_tx
2136 .unbounded_send(BufferOrderedMessage::Operation {
2137 buffer_id: buffer.read(cx).remote_id(),
2138 operation: language::proto::serialize_operation(operation),
2139 })
2140 .ok();
2141 }
2142
2143 BufferEvent::Edited { .. } => {
2144 let buffer = buffer.read(cx);
2145 let file = File::from_dyn(buffer.file())?;
2146 let abs_path = file.as_local()?.abs_path(cx);
2147 let uri = lsp::Url::from_file_path(abs_path).unwrap();
2148 let next_snapshot = buffer.text_snapshot();
2149
2150 let language_servers: Vec<_> = self
2151 .language_servers_for_buffer(buffer, cx)
2152 .map(|i| i.1.clone())
2153 .collect();
2154
2155 for language_server in language_servers {
2156 let language_server = language_server.clone();
2157
2158 let buffer_snapshots = self
2159 .buffer_snapshots
2160 .get_mut(&buffer.remote_id())
2161 .and_then(|m| m.get_mut(&language_server.server_id()))?;
2162 let previous_snapshot = buffer_snapshots.last()?;
2163 let next_version = previous_snapshot.version + 1;
2164
2165 let content_changes = buffer
2166 .edits_since::<(PointUtf16, usize)>(previous_snapshot.snapshot.version())
2167 .map(|edit| {
2168 let edit_start = edit.new.start.0;
2169 let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
2170 let new_text = next_snapshot
2171 .text_for_range(edit.new.start.1..edit.new.end.1)
2172 .collect();
2173 lsp::TextDocumentContentChangeEvent {
2174 range: Some(lsp::Range::new(
2175 point_to_lsp(edit_start),
2176 point_to_lsp(edit_end),
2177 )),
2178 range_length: None,
2179 text: new_text,
2180 }
2181 })
2182 .collect();
2183
2184 buffer_snapshots.push(LspBufferSnapshot {
2185 version: next_version,
2186 snapshot: next_snapshot.clone(),
2187 });
2188
2189 language_server
2190 .notify::<lsp::notification::DidChangeTextDocument>(
2191 lsp::DidChangeTextDocumentParams {
2192 text_document: lsp::VersionedTextDocumentIdentifier::new(
2193 uri.clone(),
2194 next_version,
2195 ),
2196 content_changes,
2197 },
2198 )
2199 .log_err();
2200 }
2201 }
2202
2203 BufferEvent::Saved => {
2204 let file = File::from_dyn(buffer.read(cx).file())?;
2205 let worktree_id = file.worktree_id(cx);
2206 let abs_path = file.as_local()?.abs_path(cx);
2207 let text_document = lsp::TextDocumentIdentifier {
2208 uri: lsp::Url::from_file_path(abs_path).unwrap(),
2209 };
2210
2211 for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
2212 server
2213 .notify::<lsp::notification::DidSaveTextDocument>(
2214 lsp::DidSaveTextDocumentParams {
2215 text_document: text_document.clone(),
2216 text: None,
2217 },
2218 )
2219 .log_err();
2220 }
2221
2222 let language_server_ids = self.language_server_ids_for_buffer(buffer.read(cx), cx);
2223 for language_server_id in language_server_ids {
2224 if let Some(LanguageServerState::Running {
2225 adapter,
2226 simulate_disk_based_diagnostics_completion,
2227 ..
2228 }) = self.language_servers.get_mut(&language_server_id)
2229 {
2230 // After saving a buffer using a language server that doesn't provide
2231 // a disk-based progress token, kick off a timer that will reset every
2232 // time the buffer is saved. If the timer eventually fires, simulate
2233 // disk-based diagnostics being finished so that other pieces of UI
2234 // (e.g., project diagnostics view, diagnostic status bar) can update.
2235 // We don't emit an event right away because the language server might take
2236 // some time to publish diagnostics.
2237 if adapter.disk_based_diagnostics_progress_token.is_none() {
2238 const DISK_BASED_DIAGNOSTICS_DEBOUNCE: Duration =
2239 Duration::from_secs(1);
2240
2241 let task = cx.spawn_weak(|this, mut cx| async move {
2242 cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
2243 if let Some(this) = this.upgrade(&cx) {
2244 this.update(&mut cx, |this, cx| {
2245 this.disk_based_diagnostics_finished(
2246 language_server_id,
2247 cx,
2248 );
2249 this.buffer_ordered_messages_tx
2250 .unbounded_send(
2251 BufferOrderedMessage::LanguageServerUpdate {
2252 language_server_id,
2253 message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
2254 },
2255 )
2256 .ok();
2257 });
2258 }
2259 });
2260 *simulate_disk_based_diagnostics_completion = Some(task);
2261 }
2262 }
2263 }
2264 }
2265
2266 _ => {}
2267 }
2268
2269 None
2270 }
2271
2272 fn request_buffer_diff_recalculation(
2273 &mut self,
2274 buffer: &ModelHandle<Buffer>,
2275 cx: &mut ModelContext<Self>,
2276 ) {
2277 self.buffers_needing_diff.insert(buffer.downgrade());
2278 let first_insertion = self.buffers_needing_diff.len() == 1;
2279
2280 let settings = settings::get::<ProjectSettings>(cx);
2281 let delay = if let Some(delay) = settings.git.gutter_debounce {
2282 delay
2283 } else {
2284 if first_insertion {
2285 let this = cx.weak_handle();
2286 cx.defer(move |cx| {
2287 if let Some(this) = this.upgrade(cx) {
2288 this.update(cx, |this, cx| {
2289 this.recalculate_buffer_diffs(cx).detach();
2290 });
2291 }
2292 });
2293 }
2294 return;
2295 };
2296
2297 const MIN_DELAY: u64 = 50;
2298 let delay = delay.max(MIN_DELAY);
2299 let duration = Duration::from_millis(delay);
2300
2301 self.git_diff_debouncer
2302 .fire_new(duration, cx, move |this, cx| {
2303 this.recalculate_buffer_diffs(cx)
2304 });
2305 }
2306
2307 fn recalculate_buffer_diffs(&mut self, cx: &mut ModelContext<Self>) -> Task<()> {
2308 cx.spawn(|this, mut cx| async move {
2309 let buffers: Vec<_> = this.update(&mut cx, |this, _| {
2310 this.buffers_needing_diff.drain().collect()
2311 });
2312
2313 let tasks: Vec<_> = this.update(&mut cx, |_, cx| {
2314 buffers
2315 .iter()
2316 .filter_map(|buffer| {
2317 let buffer = buffer.upgrade(cx)?;
2318 buffer.update(cx, |buffer, cx| buffer.git_diff_recalc(cx))
2319 })
2320 .collect()
2321 });
2322
2323 futures::future::join_all(tasks).await;
2324
2325 this.update(&mut cx, |this, cx| {
2326 if !this.buffers_needing_diff.is_empty() {
2327 this.recalculate_buffer_diffs(cx).detach();
2328 } else {
2329 // TODO: Would a `ModelContext<Project>.notify()` suffice here?
2330 for buffer in buffers {
2331 if let Some(buffer) = buffer.upgrade(cx) {
2332 buffer.update(cx, |_, cx| cx.notify());
2333 }
2334 }
2335 }
2336 });
2337 })
2338 }
2339
2340 fn language_servers_for_worktree(
2341 &self,
2342 worktree_id: WorktreeId,
2343 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
2344 self.language_server_ids
2345 .iter()
2346 .filter_map(move |((language_server_worktree_id, _), id)| {
2347 if *language_server_worktree_id == worktree_id {
2348 if let Some(LanguageServerState::Running {
2349 adapter,
2350 language,
2351 server,
2352 ..
2353 }) = self.language_servers.get(id)
2354 {
2355 return Some((adapter, language, server));
2356 }
2357 }
2358 None
2359 })
2360 }
2361
2362 fn maintain_buffer_languages(
2363 languages: Arc<LanguageRegistry>,
2364 cx: &mut ModelContext<Project>,
2365 ) -> Task<()> {
2366 let mut subscription = languages.subscribe();
2367 let mut prev_reload_count = languages.reload_count();
2368 cx.spawn_weak(|project, mut cx| async move {
2369 while let Some(()) = subscription.next().await {
2370 if let Some(project) = project.upgrade(&cx) {
2371 // If the language registry has been reloaded, then remove and
2372 // re-assign the languages on all open buffers.
2373 let reload_count = languages.reload_count();
2374 if reload_count > prev_reload_count {
2375 prev_reload_count = reload_count;
2376 project.update(&mut cx, |this, cx| {
2377 let buffers = this
2378 .opened_buffers
2379 .values()
2380 .filter_map(|b| b.upgrade(cx))
2381 .collect::<Vec<_>>();
2382 for buffer in buffers {
2383 if let Some(f) = File::from_dyn(buffer.read(cx).file()).cloned() {
2384 this.unregister_buffer_from_language_servers(&buffer, &f, cx);
2385 buffer.update(cx, |buffer, cx| buffer.set_language(None, cx));
2386 }
2387 }
2388 });
2389 }
2390
2391 project.update(&mut cx, |project, cx| {
2392 let mut plain_text_buffers = Vec::new();
2393 let mut buffers_with_unknown_injections = Vec::new();
2394 for buffer in project.opened_buffers.values() {
2395 if let Some(handle) = buffer.upgrade(cx) {
2396 let buffer = &handle.read(cx);
2397 if buffer.language().is_none()
2398 || buffer.language() == Some(&*language::PLAIN_TEXT)
2399 {
2400 plain_text_buffers.push(handle);
2401 } else if buffer.contains_unknown_injections() {
2402 buffers_with_unknown_injections.push(handle);
2403 }
2404 }
2405 }
2406
2407 for buffer in plain_text_buffers {
2408 project.detect_language_for_buffer(&buffer, cx);
2409 project.register_buffer_with_language_servers(&buffer, cx);
2410 }
2411
2412 for buffer in buffers_with_unknown_injections {
2413 buffer.update(cx, |buffer, cx| buffer.reparse(cx));
2414 }
2415 });
2416 }
2417 }
2418 })
2419 }
2420
2421 fn maintain_workspace_config(
2422 languages: Arc<LanguageRegistry>,
2423 cx: &mut ModelContext<Project>,
2424 ) -> Task<()> {
2425 let (mut settings_changed_tx, mut settings_changed_rx) = watch::channel();
2426 let _ = postage::stream::Stream::try_recv(&mut settings_changed_rx);
2427
2428 let settings_observation = cx.observe_global::<SettingsStore, _>(move |_, _| {
2429 *settings_changed_tx.borrow_mut() = ();
2430 });
2431 cx.spawn_weak(|this, mut cx| async move {
2432 while let Some(_) = settings_changed_rx.next().await {
2433 let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
2434 if let Some(this) = this.upgrade(&cx) {
2435 this.read_with(&cx, |this, _| {
2436 for server_state in this.language_servers.values() {
2437 if let LanguageServerState::Running { server, .. } = server_state {
2438 server
2439 .notify::<lsp::notification::DidChangeConfiguration>(
2440 lsp::DidChangeConfigurationParams {
2441 settings: workspace_config.clone(),
2442 },
2443 )
2444 .ok();
2445 }
2446 }
2447 })
2448 } else {
2449 break;
2450 }
2451 }
2452
2453 drop(settings_observation);
2454 })
2455 }
2456
2457 fn detect_language_for_buffer(
2458 &mut self,
2459 buffer_handle: &ModelHandle<Buffer>,
2460 cx: &mut ModelContext<Self>,
2461 ) -> Option<()> {
2462 // If the buffer has a language, set it and start the language server if we haven't already.
2463 let buffer = buffer_handle.read(cx);
2464 let full_path = buffer.file()?.full_path(cx);
2465 let content = buffer.as_rope();
2466 let new_language = self
2467 .languages
2468 .language_for_file(&full_path, Some(content))
2469 .now_or_never()?
2470 .ok()?;
2471 self.set_language_for_buffer(buffer_handle, new_language, cx);
2472 None
2473 }
2474
2475 pub fn set_language_for_buffer(
2476 &mut self,
2477 buffer: &ModelHandle<Buffer>,
2478 new_language: Arc<Language>,
2479 cx: &mut ModelContext<Self>,
2480 ) {
2481 buffer.update(cx, |buffer, cx| {
2482 if buffer.language().map_or(true, |old_language| {
2483 !Arc::ptr_eq(old_language, &new_language)
2484 }) {
2485 buffer.set_language(Some(new_language.clone()), cx);
2486 }
2487 });
2488
2489 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2490 let worktree = file.worktree.clone();
2491 if let Some(tree) = worktree.read(cx).as_local() {
2492 self.start_language_servers(&worktree, tree.abs_path().clone(), new_language, cx);
2493 }
2494 }
2495 }
2496
2497 fn start_language_servers(
2498 &mut self,
2499 worktree: &ModelHandle<Worktree>,
2500 worktree_path: Arc<Path>,
2501 language: Arc<Language>,
2502 cx: &mut ModelContext<Self>,
2503 ) {
2504 let root_file = worktree.update(cx, |tree, cx| tree.root_file(cx));
2505 let settings = language_settings(Some(&language), root_file.map(|f| f as _).as_ref(), cx);
2506 if !settings.enable_language_server {
2507 return;
2508 }
2509
2510 let worktree_id = worktree.read(cx).id();
2511 for adapter in language.lsp_adapters() {
2512 self.start_language_server(
2513 worktree_id,
2514 worktree_path.clone(),
2515 adapter.clone(),
2516 language.clone(),
2517 cx,
2518 );
2519 }
2520 }
2521
2522 fn start_language_server(
2523 &mut self,
2524 worktree_id: WorktreeId,
2525 worktree_path: Arc<Path>,
2526 adapter: Arc<CachedLspAdapter>,
2527 language: Arc<Language>,
2528 cx: &mut ModelContext<Self>,
2529 ) {
2530 let key = (worktree_id, adapter.name.clone());
2531 if self.language_server_ids.contains_key(&key) {
2532 return;
2533 }
2534
2535 let pending_server = match self.languages.create_pending_language_server(
2536 language.clone(),
2537 adapter.clone(),
2538 worktree_path,
2539 ProjectLspAdapterDelegate::new(self, cx),
2540 cx,
2541 ) {
2542 Some(pending_server) => pending_server,
2543 None => return,
2544 };
2545
2546 let project_settings = settings::get::<ProjectSettings>(cx);
2547 let lsp = project_settings.lsp.get(&adapter.name.0);
2548 let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
2549
2550 let mut initialization_options = adapter.initialization_options.clone();
2551 match (&mut initialization_options, override_options) {
2552 (Some(initialization_options), Some(override_options)) => {
2553 merge_json_value_into(override_options, initialization_options);
2554 }
2555 (None, override_options) => initialization_options = override_options,
2556 _ => {}
2557 }
2558
2559 let server_id = pending_server.server_id;
2560 let container_dir = pending_server.container_dir.clone();
2561 let state = LanguageServerState::Starting({
2562 let adapter = adapter.clone();
2563 let server_name = adapter.name.0.clone();
2564 let languages = self.languages.clone();
2565 let language = language.clone();
2566 let key = key.clone();
2567
2568 cx.spawn_weak(|this, mut cx| async move {
2569 let result = Self::setup_and_insert_language_server(
2570 this,
2571 initialization_options,
2572 pending_server,
2573 adapter.clone(),
2574 languages,
2575 language.clone(),
2576 server_id,
2577 key,
2578 &mut cx,
2579 )
2580 .await;
2581
2582 match result {
2583 Ok(server) => server,
2584
2585 Err(err) => {
2586 log::error!("failed to start language server {:?}: {}", server_name, err);
2587
2588 if let Some(this) = this.upgrade(&cx) {
2589 if let Some(container_dir) = container_dir {
2590 let installation_test_binary = adapter
2591 .installation_test_binary(container_dir.to_path_buf())
2592 .await;
2593
2594 this.update(&mut cx, |_, cx| {
2595 Self::check_errored_server(
2596 language,
2597 adapter,
2598 server_id,
2599 installation_test_binary,
2600 cx,
2601 )
2602 });
2603 }
2604 }
2605
2606 None
2607 }
2608 }
2609 })
2610 });
2611
2612 self.language_servers.insert(server_id, state);
2613 self.language_server_ids.insert(key, server_id);
2614 }
2615
2616 fn reinstall_language_server(
2617 &mut self,
2618 language: Arc<Language>,
2619 adapter: Arc<CachedLspAdapter>,
2620 server_id: LanguageServerId,
2621 cx: &mut ModelContext<Self>,
2622 ) -> Option<Task<()>> {
2623 log::info!("beginning to reinstall server");
2624
2625 let existing_server = match self.language_servers.remove(&server_id) {
2626 Some(LanguageServerState::Running { server, .. }) => Some(server),
2627 _ => None,
2628 };
2629
2630 for worktree in &self.worktrees {
2631 if let Some(worktree) = worktree.upgrade(cx) {
2632 let key = (worktree.read(cx).id(), adapter.name.clone());
2633 self.language_server_ids.remove(&key);
2634 }
2635 }
2636
2637 Some(cx.spawn(move |this, mut cx| async move {
2638 if let Some(task) = existing_server.and_then(|server| server.shutdown()) {
2639 log::info!("shutting down existing server");
2640 task.await;
2641 }
2642
2643 // TODO: This is race-safe with regards to preventing new instances from
2644 // starting while deleting, but existing instances in other projects are going
2645 // to be very confused and messed up
2646 this.update(&mut cx, |this, cx| {
2647 this.languages.delete_server_container(adapter.clone(), cx)
2648 })
2649 .await;
2650
2651 this.update(&mut cx, |this, mut cx| {
2652 let worktrees = this.worktrees.clone();
2653 for worktree in worktrees {
2654 let worktree = match worktree.upgrade(cx) {
2655 Some(worktree) => worktree.read(cx),
2656 None => continue,
2657 };
2658 let worktree_id = worktree.id();
2659 let root_path = worktree.abs_path();
2660
2661 this.start_language_server(
2662 worktree_id,
2663 root_path,
2664 adapter.clone(),
2665 language.clone(),
2666 &mut cx,
2667 );
2668 }
2669 })
2670 }))
2671 }
2672
2673 async fn setup_and_insert_language_server(
2674 this: WeakModelHandle<Self>,
2675 initialization_options: Option<serde_json::Value>,
2676 pending_server: PendingLanguageServer,
2677 adapter: Arc<CachedLspAdapter>,
2678 languages: Arc<LanguageRegistry>,
2679 language: Arc<Language>,
2680 server_id: LanguageServerId,
2681 key: (WorktreeId, LanguageServerName),
2682 cx: &mut AsyncAppContext,
2683 ) -> Result<Option<Arc<LanguageServer>>> {
2684 let setup = Self::setup_pending_language_server(
2685 this,
2686 initialization_options,
2687 pending_server,
2688 adapter.clone(),
2689 languages,
2690 server_id,
2691 cx,
2692 );
2693
2694 let language_server = match setup.await? {
2695 Some(language_server) => language_server,
2696 None => return Ok(None),
2697 };
2698
2699 let this = match this.upgrade(cx) {
2700 Some(this) => this,
2701 None => return Err(anyhow!("failed to upgrade project handle")),
2702 };
2703
2704 this.update(cx, |this, cx| {
2705 this.insert_newly_running_language_server(
2706 language,
2707 adapter,
2708 language_server.clone(),
2709 server_id,
2710 key,
2711 cx,
2712 )
2713 })?;
2714
2715 Ok(Some(language_server))
2716 }
2717
2718 async fn setup_pending_language_server(
2719 this: WeakModelHandle<Self>,
2720 initialization_options: Option<serde_json::Value>,
2721 pending_server: PendingLanguageServer,
2722 adapter: Arc<CachedLspAdapter>,
2723 languages: Arc<LanguageRegistry>,
2724 server_id: LanguageServerId,
2725 cx: &mut AsyncAppContext,
2726 ) -> Result<Option<Arc<LanguageServer>>> {
2727 let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
2728 let language_server = match pending_server.task.await? {
2729 Some(server) => server.initialize(initialization_options).await?,
2730 None => {
2731 return Ok(None);
2732 }
2733 };
2734
2735 language_server
2736 .on_notification::<lsp::notification::LogMessage, _>({
2737 move |params, mut cx| {
2738 if let Some(this) = this.upgrade(&cx) {
2739 this.update(&mut cx, |_, cx| {
2740 cx.emit(Event::LanguageServerLog(server_id, params.message))
2741 });
2742 }
2743 }
2744 })
2745 .detach();
2746
2747 language_server
2748 .on_notification::<lsp::notification::PublishDiagnostics, _>({
2749 let adapter = adapter.clone();
2750 move |mut params, cx| {
2751 let this = this;
2752 let adapter = adapter.clone();
2753 cx.spawn(|mut cx| async move {
2754 adapter.process_diagnostics(&mut params).await;
2755 if let Some(this) = this.upgrade(&cx) {
2756 this.update(&mut cx, |this, cx| {
2757 this.update_diagnostics(
2758 server_id,
2759 params,
2760 &adapter.disk_based_diagnostic_sources,
2761 cx,
2762 )
2763 .log_err();
2764 });
2765 }
2766 })
2767 .detach();
2768 }
2769 })
2770 .detach();
2771
2772 language_server
2773 .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
2774 let languages = languages.clone();
2775 move |params, mut cx| {
2776 let languages = languages.clone();
2777 async move {
2778 let workspace_config =
2779 cx.update(|cx| languages.workspace_configuration(cx)).await;
2780 Ok(params
2781 .items
2782 .into_iter()
2783 .map(|item| {
2784 if let Some(section) = &item.section {
2785 workspace_config
2786 .get(section)
2787 .cloned()
2788 .unwrap_or(serde_json::Value::Null)
2789 } else {
2790 workspace_config.clone()
2791 }
2792 })
2793 .collect())
2794 }
2795 }
2796 })
2797 .detach();
2798
2799 // Even though we don't have handling for these requests, respond to them to
2800 // avoid stalling any language server like `gopls` which waits for a response
2801 // to these requests when initializing.
2802 language_server
2803 .on_request::<lsp::request::WorkDoneProgressCreate, _, _>(
2804 move |params, mut cx| async move {
2805 if let Some(this) = this.upgrade(&cx) {
2806 this.update(&mut cx, |this, _| {
2807 if let Some(status) = this.language_server_statuses.get_mut(&server_id)
2808 {
2809 if let lsp::NumberOrString::String(token) = params.token {
2810 status.progress_tokens.insert(token);
2811 }
2812 }
2813 });
2814 }
2815 Ok(())
2816 },
2817 )
2818 .detach();
2819 language_server
2820 .on_request::<lsp::request::RegisterCapability, _, _>({
2821 move |params, mut cx| async move {
2822 let this = this
2823 .upgrade(&cx)
2824 .ok_or_else(|| anyhow!("project dropped"))?;
2825 for reg in params.registrations {
2826 if reg.method == "workspace/didChangeWatchedFiles" {
2827 if let Some(options) = reg.register_options {
2828 let options = serde_json::from_value(options)?;
2829 this.update(&mut cx, |this, cx| {
2830 this.on_lsp_did_change_watched_files(server_id, options, cx);
2831 });
2832 }
2833 }
2834 }
2835 Ok(())
2836 }
2837 })
2838 .detach();
2839
2840 language_server
2841 .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
2842 let adapter = adapter.clone();
2843 move |params, cx| {
2844 Self::on_lsp_workspace_edit(this, params, server_id, adapter.clone(), cx)
2845 }
2846 })
2847 .detach();
2848
2849 language_server
2850 .on_request::<lsp::request::InlayHintRefreshRequest, _, _>({
2851 move |(), mut cx| async move {
2852 let this = this
2853 .upgrade(&cx)
2854 .ok_or_else(|| anyhow!("project dropped"))?;
2855 this.update(&mut cx, |project, cx| {
2856 cx.emit(Event::RefreshInlays);
2857 project.remote_id().map(|project_id| {
2858 project.client.send(proto::RefreshInlayHints { project_id })
2859 })
2860 })
2861 .transpose()?;
2862 Ok(())
2863 }
2864 })
2865 .detach();
2866
2867 let disk_based_diagnostics_progress_token =
2868 adapter.disk_based_diagnostics_progress_token.clone();
2869
2870 language_server
2871 .on_notification::<lsp::notification::Progress, _>(move |params, mut cx| {
2872 if let Some(this) = this.upgrade(&cx) {
2873 this.update(&mut cx, |this, cx| {
2874 this.on_lsp_progress(
2875 params,
2876 server_id,
2877 disk_based_diagnostics_progress_token.clone(),
2878 cx,
2879 );
2880 });
2881 }
2882 })
2883 .detach();
2884
2885 language_server
2886 .notify::<lsp::notification::DidChangeConfiguration>(
2887 lsp::DidChangeConfigurationParams {
2888 settings: workspace_config,
2889 },
2890 )
2891 .ok();
2892
2893 Ok(Some(language_server))
2894 }
2895
2896 fn insert_newly_running_language_server(
2897 &mut self,
2898 language: Arc<Language>,
2899 adapter: Arc<CachedLspAdapter>,
2900 language_server: Arc<LanguageServer>,
2901 server_id: LanguageServerId,
2902 key: (WorktreeId, LanguageServerName),
2903 cx: &mut ModelContext<Self>,
2904 ) -> Result<()> {
2905 // If the language server for this key doesn't match the server id, don't store the
2906 // server. Which will cause it to be dropped, killing the process
2907 if self
2908 .language_server_ids
2909 .get(&key)
2910 .map(|id| id != &server_id)
2911 .unwrap_or(false)
2912 {
2913 return Ok(());
2914 }
2915
2916 // Update language_servers collection with Running variant of LanguageServerState
2917 // indicating that the server is up and running and ready
2918 self.language_servers.insert(
2919 server_id,
2920 LanguageServerState::Running {
2921 adapter: adapter.clone(),
2922 language: language.clone(),
2923 watched_paths: Default::default(),
2924 server: language_server.clone(),
2925 simulate_disk_based_diagnostics_completion: None,
2926 },
2927 );
2928
2929 self.language_server_statuses.insert(
2930 server_id,
2931 LanguageServerStatus {
2932 name: language_server.name().to_string(),
2933 pending_work: Default::default(),
2934 has_pending_diagnostic_updates: false,
2935 progress_tokens: Default::default(),
2936 },
2937 );
2938
2939 cx.emit(Event::LanguageServerAdded(server_id));
2940
2941 if let Some(project_id) = self.remote_id() {
2942 self.client.send(proto::StartLanguageServer {
2943 project_id,
2944 server: Some(proto::LanguageServer {
2945 id: server_id.0 as u64,
2946 name: language_server.name().to_string(),
2947 }),
2948 })?;
2949 }
2950
2951 // Tell the language server about every open buffer in the worktree that matches the language.
2952 for buffer in self.opened_buffers.values() {
2953 if let Some(buffer_handle) = buffer.upgrade(cx) {
2954 let buffer = buffer_handle.read(cx);
2955 let file = match File::from_dyn(buffer.file()) {
2956 Some(file) => file,
2957 None => continue,
2958 };
2959 let language = match buffer.language() {
2960 Some(language) => language,
2961 None => continue,
2962 };
2963
2964 if file.worktree.read(cx).id() != key.0
2965 || !language.lsp_adapters().iter().any(|a| a.name == key.1)
2966 {
2967 continue;
2968 }
2969
2970 let file = match file.as_local() {
2971 Some(file) => file,
2972 None => continue,
2973 };
2974
2975 let versions = self
2976 .buffer_snapshots
2977 .entry(buffer.remote_id())
2978 .or_default()
2979 .entry(server_id)
2980 .or_insert_with(|| {
2981 vec![LspBufferSnapshot {
2982 version: 0,
2983 snapshot: buffer.text_snapshot(),
2984 }]
2985 });
2986
2987 let snapshot = versions.last().unwrap();
2988 let version = snapshot.version;
2989 let initial_snapshot = &snapshot.snapshot;
2990 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
2991 language_server.notify::<lsp::notification::DidOpenTextDocument>(
2992 lsp::DidOpenTextDocumentParams {
2993 text_document: lsp::TextDocumentItem::new(
2994 uri,
2995 adapter
2996 .language_ids
2997 .get(language.name().as_ref())
2998 .cloned()
2999 .unwrap_or_default(),
3000 version,
3001 initial_snapshot.text(),
3002 ),
3003 },
3004 )?;
3005
3006 buffer_handle.update(cx, |buffer, cx| {
3007 buffer.set_completion_triggers(
3008 language_server
3009 .capabilities()
3010 .completion_provider
3011 .as_ref()
3012 .and_then(|provider| provider.trigger_characters.clone())
3013 .unwrap_or_default(),
3014 cx,
3015 )
3016 });
3017 }
3018 }
3019
3020 cx.notify();
3021 Ok(())
3022 }
3023
3024 // Returns a list of all of the worktrees which no longer have a language server and the root path
3025 // for the stopped server
3026 fn stop_language_server(
3027 &mut self,
3028 worktree_id: WorktreeId,
3029 adapter_name: LanguageServerName,
3030 cx: &mut ModelContext<Self>,
3031 ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
3032 let key = (worktree_id, adapter_name);
3033 if let Some(server_id) = self.language_server_ids.remove(&key) {
3034 // Remove other entries for this language server as well
3035 let mut orphaned_worktrees = vec![worktree_id];
3036 let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
3037 for other_key in other_keys {
3038 if self.language_server_ids.get(&other_key) == Some(&server_id) {
3039 self.language_server_ids.remove(&other_key);
3040 orphaned_worktrees.push(other_key.0);
3041 }
3042 }
3043
3044 for buffer in self.opened_buffers.values() {
3045 if let Some(buffer) = buffer.upgrade(cx) {
3046 buffer.update(cx, |buffer, cx| {
3047 buffer.update_diagnostics(server_id, Default::default(), cx);
3048 });
3049 }
3050 }
3051 for worktree in &self.worktrees {
3052 if let Some(worktree) = worktree.upgrade(cx) {
3053 worktree.update(cx, |worktree, cx| {
3054 if let Some(worktree) = worktree.as_local_mut() {
3055 worktree.clear_diagnostics_for_language_server(server_id, cx);
3056 }
3057 });
3058 }
3059 }
3060
3061 self.language_server_statuses.remove(&server_id);
3062 cx.notify();
3063
3064 let server_state = self.language_servers.remove(&server_id);
3065 cx.emit(Event::LanguageServerRemoved(server_id));
3066 cx.spawn_weak(|this, mut cx| async move {
3067 let mut root_path = None;
3068
3069 let server = match server_state {
3070 Some(LanguageServerState::Starting(task)) => task.await,
3071 Some(LanguageServerState::Running { server, .. }) => Some(server),
3072 None => None,
3073 };
3074
3075 if let Some(server) = server {
3076 root_path = Some(server.root_path().clone());
3077 if let Some(shutdown) = server.shutdown() {
3078 shutdown.await;
3079 }
3080 }
3081
3082 if let Some(this) = this.upgrade(&cx) {
3083 this.update(&mut cx, |this, cx| {
3084 this.language_server_statuses.remove(&server_id);
3085 cx.notify();
3086 });
3087 }
3088
3089 (root_path, orphaned_worktrees)
3090 })
3091 } else {
3092 Task::ready((None, Vec::new()))
3093 }
3094 }
3095
3096 pub fn restart_language_servers_for_buffers(
3097 &mut self,
3098 buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
3099 cx: &mut ModelContext<Self>,
3100 ) -> Option<()> {
3101 let language_server_lookup_info: HashSet<(ModelHandle<Worktree>, Arc<Language>)> = buffers
3102 .into_iter()
3103 .filter_map(|buffer| {
3104 let buffer = buffer.read(cx);
3105 let file = File::from_dyn(buffer.file())?;
3106 let full_path = file.full_path(cx);
3107 let language = self
3108 .languages
3109 .language_for_file(&full_path, Some(buffer.as_rope()))
3110 .now_or_never()?
3111 .ok()?;
3112 Some((file.worktree.clone(), language))
3113 })
3114 .collect();
3115 for (worktree, language) in language_server_lookup_info {
3116 self.restart_language_servers(worktree, language, cx);
3117 }
3118
3119 None
3120 }
3121
3122 // TODO This will break in the case where the adapter's root paths and worktrees are not equal
3123 fn restart_language_servers(
3124 &mut self,
3125 worktree: ModelHandle<Worktree>,
3126 language: Arc<Language>,
3127 cx: &mut ModelContext<Self>,
3128 ) {
3129 let worktree_id = worktree.read(cx).id();
3130 let fallback_path = worktree.read(cx).abs_path();
3131
3132 let mut stops = Vec::new();
3133 for adapter in language.lsp_adapters() {
3134 stops.push(self.stop_language_server(worktree_id, adapter.name.clone(), cx));
3135 }
3136
3137 if stops.is_empty() {
3138 return;
3139 }
3140 let mut stops = stops.into_iter();
3141
3142 cx.spawn_weak(|this, mut cx| async move {
3143 let (original_root_path, mut orphaned_worktrees) = stops.next().unwrap().await;
3144 for stop in stops {
3145 let (_, worktrees) = stop.await;
3146 orphaned_worktrees.extend_from_slice(&worktrees);
3147 }
3148
3149 let this = match this.upgrade(&cx) {
3150 Some(this) => this,
3151 None => return,
3152 };
3153
3154 this.update(&mut cx, |this, cx| {
3155 // Attempt to restart using original server path. Fallback to passed in
3156 // path if we could not retrieve the root path
3157 let root_path = original_root_path
3158 .map(|path_buf| Arc::from(path_buf.as_path()))
3159 .unwrap_or(fallback_path);
3160
3161 this.start_language_servers(&worktree, root_path, language.clone(), cx);
3162
3163 // Lookup new server ids and set them for each of the orphaned worktrees
3164 for adapter in language.lsp_adapters() {
3165 if let Some(new_server_id) = this
3166 .language_server_ids
3167 .get(&(worktree_id, adapter.name.clone()))
3168 .cloned()
3169 {
3170 for &orphaned_worktree in &orphaned_worktrees {
3171 this.language_server_ids
3172 .insert((orphaned_worktree, adapter.name.clone()), new_server_id);
3173 }
3174 }
3175 }
3176 });
3177 })
3178 .detach();
3179 }
3180
3181 fn check_errored_server(
3182 language: Arc<Language>,
3183 adapter: Arc<CachedLspAdapter>,
3184 server_id: LanguageServerId,
3185 installation_test_binary: Option<LanguageServerBinary>,
3186 cx: &mut ModelContext<Self>,
3187 ) {
3188 if !adapter.can_be_reinstalled() {
3189 log::info!(
3190 "Validation check requested for {:?} but it cannot be reinstalled",
3191 adapter.name.0
3192 );
3193 return;
3194 }
3195
3196 cx.spawn(|this, mut cx| async move {
3197 log::info!("About to spawn test binary");
3198
3199 // A lack of test binary counts as a failure
3200 let process = installation_test_binary.and_then(|binary| {
3201 smol::process::Command::new(&binary.path)
3202 .current_dir(&binary.path)
3203 .args(binary.arguments)
3204 .stdin(Stdio::piped())
3205 .stdout(Stdio::piped())
3206 .stderr(Stdio::inherit())
3207 .kill_on_drop(true)
3208 .spawn()
3209 .ok()
3210 });
3211
3212 const PROCESS_TIMEOUT: Duration = Duration::from_secs(5);
3213 let mut timeout = cx.background().timer(PROCESS_TIMEOUT).fuse();
3214
3215 let mut errored = false;
3216 if let Some(mut process) = process {
3217 futures::select! {
3218 status = process.status().fuse() => match status {
3219 Ok(status) => errored = !status.success(),
3220 Err(_) => errored = true,
3221 },
3222
3223 _ = timeout => {
3224 log::info!("test binary time-ed out, this counts as a success");
3225 _ = process.kill();
3226 }
3227 }
3228 } else {
3229 log::warn!("test binary failed to launch");
3230 errored = true;
3231 }
3232
3233 if errored {
3234 log::warn!("test binary check failed");
3235 let task = this.update(&mut cx, move |this, mut cx| {
3236 this.reinstall_language_server(language, adapter, server_id, &mut cx)
3237 });
3238
3239 if let Some(task) = task {
3240 task.await;
3241 }
3242 }
3243 })
3244 .detach();
3245 }
3246
3247 fn on_lsp_progress(
3248 &mut self,
3249 progress: lsp::ProgressParams,
3250 language_server_id: LanguageServerId,
3251 disk_based_diagnostics_progress_token: Option<String>,
3252 cx: &mut ModelContext<Self>,
3253 ) {
3254 let token = match progress.token {
3255 lsp::NumberOrString::String(token) => token,
3256 lsp::NumberOrString::Number(token) => {
3257 log::info!("skipping numeric progress token {}", token);
3258 return;
3259 }
3260 };
3261 let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
3262 let language_server_status =
3263 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3264 status
3265 } else {
3266 return;
3267 };
3268
3269 if !language_server_status.progress_tokens.contains(&token) {
3270 return;
3271 }
3272
3273 let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
3274 .as_ref()
3275 .map_or(false, |disk_based_token| {
3276 token.starts_with(disk_based_token)
3277 });
3278
3279 match progress {
3280 lsp::WorkDoneProgress::Begin(report) => {
3281 if is_disk_based_diagnostics_progress {
3282 language_server_status.has_pending_diagnostic_updates = true;
3283 self.disk_based_diagnostics_started(language_server_id, cx);
3284 self.buffer_ordered_messages_tx
3285 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
3286 language_server_id,
3287 message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
3288 })
3289 .ok();
3290 } else {
3291 self.on_lsp_work_start(
3292 language_server_id,
3293 token.clone(),
3294 LanguageServerProgress {
3295 message: report.message.clone(),
3296 percentage: report.percentage.map(|p| p as usize),
3297 last_update_at: Instant::now(),
3298 },
3299 cx,
3300 );
3301 self.buffer_ordered_messages_tx
3302 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
3303 language_server_id,
3304 message: proto::update_language_server::Variant::WorkStart(
3305 proto::LspWorkStart {
3306 token,
3307 message: report.message,
3308 percentage: report.percentage.map(|p| p as u32),
3309 },
3310 ),
3311 })
3312 .ok();
3313 }
3314 }
3315 lsp::WorkDoneProgress::Report(report) => {
3316 if !is_disk_based_diagnostics_progress {
3317 self.on_lsp_work_progress(
3318 language_server_id,
3319 token.clone(),
3320 LanguageServerProgress {
3321 message: report.message.clone(),
3322 percentage: report.percentage.map(|p| p as usize),
3323 last_update_at: Instant::now(),
3324 },
3325 cx,
3326 );
3327 self.buffer_ordered_messages_tx
3328 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
3329 language_server_id,
3330 message: proto::update_language_server::Variant::WorkProgress(
3331 proto::LspWorkProgress {
3332 token,
3333 message: report.message,
3334 percentage: report.percentage.map(|p| p as u32),
3335 },
3336 ),
3337 })
3338 .ok();
3339 }
3340 }
3341 lsp::WorkDoneProgress::End(_) => {
3342 language_server_status.progress_tokens.remove(&token);
3343
3344 if is_disk_based_diagnostics_progress {
3345 language_server_status.has_pending_diagnostic_updates = false;
3346 self.disk_based_diagnostics_finished(language_server_id, cx);
3347 self.buffer_ordered_messages_tx
3348 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
3349 language_server_id,
3350 message:
3351 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
3352 Default::default(),
3353 ),
3354 })
3355 .ok();
3356 } else {
3357 self.on_lsp_work_end(language_server_id, token.clone(), cx);
3358 self.buffer_ordered_messages_tx
3359 .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
3360 language_server_id,
3361 message: proto::update_language_server::Variant::WorkEnd(
3362 proto::LspWorkEnd { token },
3363 ),
3364 })
3365 .ok();
3366 }
3367 }
3368 }
3369 }
3370
3371 fn on_lsp_work_start(
3372 &mut self,
3373 language_server_id: LanguageServerId,
3374 token: String,
3375 progress: LanguageServerProgress,
3376 cx: &mut ModelContext<Self>,
3377 ) {
3378 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3379 status.pending_work.insert(token, progress);
3380 cx.notify();
3381 }
3382 }
3383
3384 fn on_lsp_work_progress(
3385 &mut self,
3386 language_server_id: LanguageServerId,
3387 token: String,
3388 progress: LanguageServerProgress,
3389 cx: &mut ModelContext<Self>,
3390 ) {
3391 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3392 let entry = status
3393 .pending_work
3394 .entry(token)
3395 .or_insert(LanguageServerProgress {
3396 message: Default::default(),
3397 percentage: Default::default(),
3398 last_update_at: progress.last_update_at,
3399 });
3400 if progress.message.is_some() {
3401 entry.message = progress.message;
3402 }
3403 if progress.percentage.is_some() {
3404 entry.percentage = progress.percentage;
3405 }
3406 entry.last_update_at = progress.last_update_at;
3407 cx.notify();
3408 }
3409 }
3410
3411 fn on_lsp_work_end(
3412 &mut self,
3413 language_server_id: LanguageServerId,
3414 token: String,
3415 cx: &mut ModelContext<Self>,
3416 ) {
3417 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
3418 cx.emit(Event::RefreshInlays);
3419 status.pending_work.remove(&token);
3420 cx.notify();
3421 }
3422 }
3423
3424 fn on_lsp_did_change_watched_files(
3425 &mut self,
3426 language_server_id: LanguageServerId,
3427 params: DidChangeWatchedFilesRegistrationOptions,
3428 cx: &mut ModelContext<Self>,
3429 ) {
3430 if let Some(LanguageServerState::Running { watched_paths, .. }) =
3431 self.language_servers.get_mut(&language_server_id)
3432 {
3433 let mut builders = HashMap::default();
3434 for watcher in params.watchers {
3435 for worktree in &self.worktrees {
3436 if let Some(worktree) = worktree.upgrade(cx) {
3437 let glob_is_inside_worktree = worktree.update(cx, |tree, _| {
3438 if let Some(abs_path) = tree.abs_path().to_str() {
3439 let relative_glob_pattern = match &watcher.glob_pattern {
3440 lsp::GlobPattern::String(s) => s
3441 .strip_prefix(abs_path)
3442 .and_then(|s| s.strip_prefix(std::path::MAIN_SEPARATOR)),
3443 lsp::GlobPattern::Relative(rp) => {
3444 let base_uri = match &rp.base_uri {
3445 lsp::OneOf::Left(workspace_folder) => {
3446 &workspace_folder.uri
3447 }
3448 lsp::OneOf::Right(base_uri) => base_uri,
3449 };
3450 base_uri.to_file_path().ok().and_then(|file_path| {
3451 (file_path.to_str() == Some(abs_path))
3452 .then_some(rp.pattern.as_str())
3453 })
3454 }
3455 };
3456 if let Some(relative_glob_pattern) = relative_glob_pattern {
3457 let literal_prefix =
3458 glob_literal_prefix(&relative_glob_pattern);
3459 tree.as_local_mut()
3460 .unwrap()
3461 .add_path_prefix_to_scan(Path::new(literal_prefix).into());
3462 if let Some(glob) = Glob::new(relative_glob_pattern).log_err() {
3463 builders
3464 .entry(tree.id())
3465 .or_insert_with(|| GlobSetBuilder::new())
3466 .add(glob);
3467 }
3468 return true;
3469 }
3470 }
3471 false
3472 });
3473 if glob_is_inside_worktree {
3474 break;
3475 }
3476 }
3477 }
3478 }
3479
3480 watched_paths.clear();
3481 for (worktree_id, builder) in builders {
3482 if let Ok(globset) = builder.build() {
3483 watched_paths.insert(worktree_id, globset);
3484 }
3485 }
3486
3487 cx.notify();
3488 }
3489 }
3490
3491 async fn on_lsp_workspace_edit(
3492 this: WeakModelHandle<Self>,
3493 params: lsp::ApplyWorkspaceEditParams,
3494 server_id: LanguageServerId,
3495 adapter: Arc<CachedLspAdapter>,
3496 mut cx: AsyncAppContext,
3497 ) -> Result<lsp::ApplyWorkspaceEditResponse> {
3498 let this = this
3499 .upgrade(&cx)
3500 .ok_or_else(|| anyhow!("project project closed"))?;
3501 let language_server = this
3502 .read_with(&cx, |this, _| this.language_server_for_id(server_id))
3503 .ok_or_else(|| anyhow!("language server not found"))?;
3504 let transaction = Self::deserialize_workspace_edit(
3505 this.clone(),
3506 params.edit,
3507 true,
3508 adapter.clone(),
3509 language_server.clone(),
3510 &mut cx,
3511 )
3512 .await
3513 .log_err();
3514 this.update(&mut cx, |this, _| {
3515 if let Some(transaction) = transaction {
3516 this.last_workspace_edits_by_language_server
3517 .insert(server_id, transaction);
3518 }
3519 });
3520 Ok(lsp::ApplyWorkspaceEditResponse {
3521 applied: true,
3522 failed_change: None,
3523 failure_reason: None,
3524 })
3525 }
3526
3527 pub fn language_server_statuses(
3528 &self,
3529 ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
3530 self.language_server_statuses.values()
3531 }
3532
3533 pub fn update_diagnostics(
3534 &mut self,
3535 language_server_id: LanguageServerId,
3536 mut params: lsp::PublishDiagnosticsParams,
3537 disk_based_sources: &[String],
3538 cx: &mut ModelContext<Self>,
3539 ) -> Result<()> {
3540 let abs_path = params
3541 .uri
3542 .to_file_path()
3543 .map_err(|_| anyhow!("URI is not a file"))?;
3544 let mut diagnostics = Vec::default();
3545 let mut primary_diagnostic_group_ids = HashMap::default();
3546 let mut sources_by_group_id = HashMap::default();
3547 let mut supporting_diagnostics = HashMap::default();
3548
3549 // Ensure that primary diagnostics are always the most severe
3550 params.diagnostics.sort_by_key(|item| item.severity);
3551
3552 for diagnostic in ¶ms.diagnostics {
3553 let source = diagnostic.source.as_ref();
3554 let code = diagnostic.code.as_ref().map(|code| match code {
3555 lsp::NumberOrString::Number(code) => code.to_string(),
3556 lsp::NumberOrString::String(code) => code.clone(),
3557 });
3558 let range = range_from_lsp(diagnostic.range);
3559 let is_supporting = diagnostic
3560 .related_information
3561 .as_ref()
3562 .map_or(false, |infos| {
3563 infos.iter().any(|info| {
3564 primary_diagnostic_group_ids.contains_key(&(
3565 source,
3566 code.clone(),
3567 range_from_lsp(info.location.range),
3568 ))
3569 })
3570 });
3571
3572 let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
3573 tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
3574 });
3575
3576 if is_supporting {
3577 supporting_diagnostics.insert(
3578 (source, code.clone(), range),
3579 (diagnostic.severity, is_unnecessary),
3580 );
3581 } else {
3582 let group_id = post_inc(&mut self.next_diagnostic_group_id);
3583 let is_disk_based =
3584 source.map_or(false, |source| disk_based_sources.contains(source));
3585
3586 sources_by_group_id.insert(group_id, source);
3587 primary_diagnostic_group_ids
3588 .insert((source, code.clone(), range.clone()), group_id);
3589
3590 diagnostics.push(DiagnosticEntry {
3591 range,
3592 diagnostic: Diagnostic {
3593 source: diagnostic.source.clone(),
3594 code: code.clone(),
3595 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
3596 message: diagnostic.message.clone(),
3597 group_id,
3598 is_primary: true,
3599 is_valid: true,
3600 is_disk_based,
3601 is_unnecessary,
3602 },
3603 });
3604 if let Some(infos) = &diagnostic.related_information {
3605 for info in infos {
3606 if info.location.uri == params.uri && !info.message.is_empty() {
3607 let range = range_from_lsp(info.location.range);
3608 diagnostics.push(DiagnosticEntry {
3609 range,
3610 diagnostic: Diagnostic {
3611 source: diagnostic.source.clone(),
3612 code: code.clone(),
3613 severity: DiagnosticSeverity::INFORMATION,
3614 message: info.message.clone(),
3615 group_id,
3616 is_primary: false,
3617 is_valid: true,
3618 is_disk_based,
3619 is_unnecessary: false,
3620 },
3621 });
3622 }
3623 }
3624 }
3625 }
3626 }
3627
3628 for entry in &mut diagnostics {
3629 let diagnostic = &mut entry.diagnostic;
3630 if !diagnostic.is_primary {
3631 let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
3632 if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
3633 source,
3634 diagnostic.code.clone(),
3635 entry.range.clone(),
3636 )) {
3637 if let Some(severity) = severity {
3638 diagnostic.severity = severity;
3639 }
3640 diagnostic.is_unnecessary = is_unnecessary;
3641 }
3642 }
3643 }
3644
3645 self.update_diagnostic_entries(
3646 language_server_id,
3647 abs_path,
3648 params.version,
3649 diagnostics,
3650 cx,
3651 )?;
3652 Ok(())
3653 }
3654
3655 pub fn update_diagnostic_entries(
3656 &mut self,
3657 server_id: LanguageServerId,
3658 abs_path: PathBuf,
3659 version: Option<i32>,
3660 diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3661 cx: &mut ModelContext<Project>,
3662 ) -> Result<(), anyhow::Error> {
3663 let (worktree, relative_path) = self
3664 .find_local_worktree(&abs_path, cx)
3665 .ok_or_else(|| anyhow!("no worktree found for diagnostics path {abs_path:?}"))?;
3666
3667 let project_path = ProjectPath {
3668 worktree_id: worktree.read(cx).id(),
3669 path: relative_path.into(),
3670 };
3671
3672 if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
3673 self.update_buffer_diagnostics(&buffer, server_id, version, diagnostics.clone(), cx)?;
3674 }
3675
3676 let updated = worktree.update(cx, |worktree, cx| {
3677 worktree
3678 .as_local_mut()
3679 .ok_or_else(|| anyhow!("not a local worktree"))?
3680 .update_diagnostics(server_id, project_path.path.clone(), diagnostics, cx)
3681 })?;
3682 if updated {
3683 cx.emit(Event::DiagnosticsUpdated {
3684 language_server_id: server_id,
3685 path: project_path,
3686 });
3687 }
3688 Ok(())
3689 }
3690
3691 fn update_buffer_diagnostics(
3692 &mut self,
3693 buffer: &ModelHandle<Buffer>,
3694 server_id: LanguageServerId,
3695 version: Option<i32>,
3696 mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3697 cx: &mut ModelContext<Self>,
3698 ) -> Result<()> {
3699 fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
3700 Ordering::Equal
3701 .then_with(|| b.is_primary.cmp(&a.is_primary))
3702 .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
3703 .then_with(|| a.severity.cmp(&b.severity))
3704 .then_with(|| a.message.cmp(&b.message))
3705 }
3706
3707 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx)?;
3708
3709 diagnostics.sort_unstable_by(|a, b| {
3710 Ordering::Equal
3711 .then_with(|| a.range.start.cmp(&b.range.start))
3712 .then_with(|| b.range.end.cmp(&a.range.end))
3713 .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
3714 });
3715
3716 let mut sanitized_diagnostics = Vec::new();
3717 let edits_since_save = Patch::new(
3718 snapshot
3719 .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
3720 .collect(),
3721 );
3722 for entry in diagnostics {
3723 let start;
3724 let end;
3725 if entry.diagnostic.is_disk_based {
3726 // Some diagnostics are based on files on disk instead of buffers'
3727 // current contents. Adjust these diagnostics' ranges to reflect
3728 // any unsaved edits.
3729 start = edits_since_save.old_to_new(entry.range.start);
3730 end = edits_since_save.old_to_new(entry.range.end);
3731 } else {
3732 start = entry.range.start;
3733 end = entry.range.end;
3734 }
3735
3736 let mut range = snapshot.clip_point_utf16(start, Bias::Left)
3737 ..snapshot.clip_point_utf16(end, Bias::Right);
3738
3739 // Expand empty ranges by one codepoint
3740 if range.start == range.end {
3741 // This will be go to the next boundary when being clipped
3742 range.end.column += 1;
3743 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
3744 if range.start == range.end && range.end.column > 0 {
3745 range.start.column -= 1;
3746 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
3747 }
3748 }
3749
3750 sanitized_diagnostics.push(DiagnosticEntry {
3751 range,
3752 diagnostic: entry.diagnostic,
3753 });
3754 }
3755 drop(edits_since_save);
3756
3757 let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
3758 buffer.update(cx, |buffer, cx| {
3759 buffer.update_diagnostics(server_id, set, cx)
3760 });
3761 Ok(())
3762 }
3763
3764 pub fn reload_buffers(
3765 &self,
3766 buffers: HashSet<ModelHandle<Buffer>>,
3767 push_to_history: bool,
3768 cx: &mut ModelContext<Self>,
3769 ) -> Task<Result<ProjectTransaction>> {
3770 let mut local_buffers = Vec::new();
3771 let mut remote_buffers = None;
3772 for buffer_handle in buffers {
3773 let buffer = buffer_handle.read(cx);
3774 if buffer.is_dirty() {
3775 if let Some(file) = File::from_dyn(buffer.file()) {
3776 if file.is_local() {
3777 local_buffers.push(buffer_handle);
3778 } else {
3779 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
3780 }
3781 }
3782 }
3783 }
3784
3785 let remote_buffers = self.remote_id().zip(remote_buffers);
3786 let client = self.client.clone();
3787
3788 cx.spawn(|this, mut cx| async move {
3789 let mut project_transaction = ProjectTransaction::default();
3790
3791 if let Some((project_id, remote_buffers)) = remote_buffers {
3792 let response = client
3793 .request(proto::ReloadBuffers {
3794 project_id,
3795 buffer_ids: remote_buffers
3796 .iter()
3797 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3798 .collect(),
3799 })
3800 .await?
3801 .transaction
3802 .ok_or_else(|| anyhow!("missing transaction"))?;
3803 project_transaction = this
3804 .update(&mut cx, |this, cx| {
3805 this.deserialize_project_transaction(response, push_to_history, cx)
3806 })
3807 .await?;
3808 }
3809
3810 for buffer in local_buffers {
3811 let transaction = buffer
3812 .update(&mut cx, |buffer, cx| buffer.reload(cx))
3813 .await?;
3814 buffer.update(&mut cx, |buffer, cx| {
3815 if let Some(transaction) = transaction {
3816 if !push_to_history {
3817 buffer.forget_transaction(transaction.id);
3818 }
3819 project_transaction.0.insert(cx.handle(), transaction);
3820 }
3821 });
3822 }
3823
3824 Ok(project_transaction)
3825 })
3826 }
3827
3828 pub fn format(
3829 &self,
3830 buffers: HashSet<ModelHandle<Buffer>>,
3831 push_to_history: bool,
3832 trigger: FormatTrigger,
3833 cx: &mut ModelContext<Project>,
3834 ) -> Task<Result<ProjectTransaction>> {
3835 if self.is_local() {
3836 let mut buffers_with_paths_and_servers = buffers
3837 .into_iter()
3838 .filter_map(|buffer_handle| {
3839 let buffer = buffer_handle.read(cx);
3840 let file = File::from_dyn(buffer.file())?;
3841 let buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
3842 let server = self
3843 .primary_language_servers_for_buffer(buffer, cx)
3844 .map(|s| s.1.clone());
3845 Some((buffer_handle, buffer_abs_path, server))
3846 })
3847 .collect::<Vec<_>>();
3848
3849 cx.spawn(|this, mut cx| async move {
3850 // Do not allow multiple concurrent formatting requests for the
3851 // same buffer.
3852 this.update(&mut cx, |this, cx| {
3853 buffers_with_paths_and_servers.retain(|(buffer, _, _)| {
3854 this.buffers_being_formatted
3855 .insert(buffer.read(cx).remote_id())
3856 });
3857 });
3858
3859 let _cleanup = defer({
3860 let this = this.clone();
3861 let mut cx = cx.clone();
3862 let buffers = &buffers_with_paths_and_servers;
3863 move || {
3864 this.update(&mut cx, |this, cx| {
3865 for (buffer, _, _) in buffers {
3866 this.buffers_being_formatted
3867 .remove(&buffer.read(cx).remote_id());
3868 }
3869 });
3870 }
3871 });
3872
3873 let mut project_transaction = ProjectTransaction::default();
3874 for (buffer, buffer_abs_path, language_server) in &buffers_with_paths_and_servers {
3875 let settings = buffer.read_with(&cx, |buffer, cx| {
3876 language_settings(buffer.language(), buffer.file(), cx).clone()
3877 });
3878
3879 let remove_trailing_whitespace = settings.remove_trailing_whitespace_on_save;
3880 let ensure_final_newline = settings.ensure_final_newline_on_save;
3881 let format_on_save = settings.format_on_save.clone();
3882 let formatter = settings.formatter.clone();
3883 let tab_size = settings.tab_size;
3884
3885 // First, format buffer's whitespace according to the settings.
3886 let trailing_whitespace_diff = if remove_trailing_whitespace {
3887 Some(
3888 buffer
3889 .read_with(&cx, |b, cx| b.remove_trailing_whitespace(cx))
3890 .await,
3891 )
3892 } else {
3893 None
3894 };
3895 let whitespace_transaction_id = buffer.update(&mut cx, |buffer, cx| {
3896 buffer.finalize_last_transaction();
3897 buffer.start_transaction();
3898 if let Some(diff) = trailing_whitespace_diff {
3899 buffer.apply_diff(diff, cx);
3900 }
3901 if ensure_final_newline {
3902 buffer.ensure_final_newline(cx);
3903 }
3904 buffer.end_transaction(cx)
3905 });
3906
3907 // Currently, formatting operations are represented differently depending on
3908 // whether they come from a language server or an external command.
3909 enum FormatOperation {
3910 Lsp(Vec<(Range<Anchor>, String)>),
3911 External(Diff),
3912 }
3913
3914 // Apply language-specific formatting using either a language server
3915 // or external command.
3916 let mut format_operation = None;
3917 match (formatter, format_on_save) {
3918 (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => {}
3919
3920 (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
3921 | (_, FormatOnSave::LanguageServer) => {
3922 if let Some((language_server, buffer_abs_path)) =
3923 language_server.as_ref().zip(buffer_abs_path.as_ref())
3924 {
3925 format_operation = Some(FormatOperation::Lsp(
3926 Self::format_via_lsp(
3927 &this,
3928 &buffer,
3929 buffer_abs_path,
3930 &language_server,
3931 tab_size,
3932 &mut cx,
3933 )
3934 .await
3935 .context("failed to format via language server")?,
3936 ));
3937 }
3938 }
3939
3940 (
3941 Formatter::External { command, arguments },
3942 FormatOnSave::On | FormatOnSave::Off,
3943 )
3944 | (_, FormatOnSave::External { command, arguments }) => {
3945 if let Some(buffer_abs_path) = buffer_abs_path {
3946 format_operation = Self::format_via_external_command(
3947 &buffer,
3948 &buffer_abs_path,
3949 &command,
3950 &arguments,
3951 &mut cx,
3952 )
3953 .await
3954 .context(format!(
3955 "failed to format via external command {:?}",
3956 command
3957 ))?
3958 .map(FormatOperation::External);
3959 }
3960 }
3961 };
3962
3963 buffer.update(&mut cx, |b, cx| {
3964 // If the buffer had its whitespace formatted and was edited while the language-specific
3965 // formatting was being computed, avoid applying the language-specific formatting, because
3966 // it can't be grouped with the whitespace formatting in the undo history.
3967 if let Some(transaction_id) = whitespace_transaction_id {
3968 if b.peek_undo_stack()
3969 .map_or(true, |e| e.transaction_id() != transaction_id)
3970 {
3971 format_operation.take();
3972 }
3973 }
3974
3975 // Apply any language-specific formatting, and group the two formatting operations
3976 // in the buffer's undo history.
3977 if let Some(operation) = format_operation {
3978 match operation {
3979 FormatOperation::Lsp(edits) => {
3980 b.edit(edits, None, cx);
3981 }
3982 FormatOperation::External(diff) => {
3983 b.apply_diff(diff, cx);
3984 }
3985 }
3986
3987 if let Some(transaction_id) = whitespace_transaction_id {
3988 b.group_until_transaction(transaction_id);
3989 }
3990 }
3991
3992 if let Some(transaction) = b.finalize_last_transaction().cloned() {
3993 if !push_to_history {
3994 b.forget_transaction(transaction.id);
3995 }
3996 project_transaction.0.insert(buffer.clone(), transaction);
3997 }
3998 });
3999 }
4000
4001 Ok(project_transaction)
4002 })
4003 } else {
4004 let remote_id = self.remote_id();
4005 let client = self.client.clone();
4006 cx.spawn(|this, mut cx| async move {
4007 let mut project_transaction = ProjectTransaction::default();
4008 if let Some(project_id) = remote_id {
4009 let response = client
4010 .request(proto::FormatBuffers {
4011 project_id,
4012 trigger: trigger as i32,
4013 buffer_ids: buffers
4014 .iter()
4015 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
4016 .collect(),
4017 })
4018 .await?
4019 .transaction
4020 .ok_or_else(|| anyhow!("missing transaction"))?;
4021 project_transaction = this
4022 .update(&mut cx, |this, cx| {
4023 this.deserialize_project_transaction(response, push_to_history, cx)
4024 })
4025 .await?;
4026 }
4027 Ok(project_transaction)
4028 })
4029 }
4030 }
4031
4032 async fn format_via_lsp(
4033 this: &ModelHandle<Self>,
4034 buffer: &ModelHandle<Buffer>,
4035 abs_path: &Path,
4036 language_server: &Arc<LanguageServer>,
4037 tab_size: NonZeroU32,
4038 cx: &mut AsyncAppContext,
4039 ) -> Result<Vec<(Range<Anchor>, String)>> {
4040 let uri = lsp::Url::from_file_path(abs_path)
4041 .map_err(|_| anyhow!("failed to convert abs path to uri"))?;
4042 let text_document = lsp::TextDocumentIdentifier::new(uri);
4043 let capabilities = &language_server.capabilities();
4044
4045 let formatting_provider = capabilities.document_formatting_provider.as_ref();
4046 let range_formatting_provider = capabilities.document_range_formatting_provider.as_ref();
4047
4048 let lsp_edits = if matches!(formatting_provider, Some(p) if *p != OneOf::Left(false)) {
4049 language_server
4050 .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
4051 text_document,
4052 options: lsp_command::lsp_formatting_options(tab_size.get()),
4053 work_done_progress_params: Default::default(),
4054 })
4055 .await?
4056 } else if matches!(range_formatting_provider, Some(p) if *p != OneOf::Left(false)) {
4057 let buffer_start = lsp::Position::new(0, 0);
4058 let buffer_end = buffer.read_with(cx, |b, _| point_to_lsp(b.max_point_utf16()));
4059
4060 language_server
4061 .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
4062 text_document,
4063 range: lsp::Range::new(buffer_start, buffer_end),
4064 options: lsp_command::lsp_formatting_options(tab_size.get()),
4065 work_done_progress_params: Default::default(),
4066 })
4067 .await?
4068 } else {
4069 None
4070 };
4071
4072 if let Some(lsp_edits) = lsp_edits {
4073 this.update(cx, |this, cx| {
4074 this.edits_from_lsp(buffer, lsp_edits, language_server.server_id(), None, cx)
4075 })
4076 .await
4077 } else {
4078 Ok(Vec::new())
4079 }
4080 }
4081
4082 async fn format_via_external_command(
4083 buffer: &ModelHandle<Buffer>,
4084 buffer_abs_path: &Path,
4085 command: &str,
4086 arguments: &[String],
4087 cx: &mut AsyncAppContext,
4088 ) -> Result<Option<Diff>> {
4089 let working_dir_path = buffer.read_with(cx, |buffer, cx| {
4090 let file = File::from_dyn(buffer.file())?;
4091 let worktree = file.worktree.read(cx).as_local()?;
4092 let mut worktree_path = worktree.abs_path().to_path_buf();
4093 if worktree.root_entry()?.is_file() {
4094 worktree_path.pop();
4095 }
4096 Some(worktree_path)
4097 });
4098
4099 if let Some(working_dir_path) = working_dir_path {
4100 let mut child =
4101 smol::process::Command::new(command)
4102 .args(arguments.iter().map(|arg| {
4103 arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
4104 }))
4105 .current_dir(&working_dir_path)
4106 .stdin(smol::process::Stdio::piped())
4107 .stdout(smol::process::Stdio::piped())
4108 .stderr(smol::process::Stdio::piped())
4109 .spawn()?;
4110 let stdin = child
4111 .stdin
4112 .as_mut()
4113 .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
4114 let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
4115 for chunk in text.chunks() {
4116 stdin.write_all(chunk.as_bytes()).await?;
4117 }
4118 stdin.flush().await?;
4119
4120 let output = child.output().await?;
4121 if !output.status.success() {
4122 return Err(anyhow!(
4123 "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
4124 output.status.code(),
4125 String::from_utf8_lossy(&output.stdout),
4126 String::from_utf8_lossy(&output.stderr),
4127 ));
4128 }
4129
4130 let stdout = String::from_utf8(output.stdout)?;
4131 Ok(Some(
4132 buffer
4133 .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
4134 .await,
4135 ))
4136 } else {
4137 Ok(None)
4138 }
4139 }
4140
4141 pub fn definition<T: ToPointUtf16>(
4142 &self,
4143 buffer: &ModelHandle<Buffer>,
4144 position: T,
4145 cx: &mut ModelContext<Self>,
4146 ) -> Task<Result<Vec<LocationLink>>> {
4147 let position = position.to_point_utf16(buffer.read(cx));
4148 self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
4149 }
4150
4151 pub fn type_definition<T: ToPointUtf16>(
4152 &self,
4153 buffer: &ModelHandle<Buffer>,
4154 position: T,
4155 cx: &mut ModelContext<Self>,
4156 ) -> Task<Result<Vec<LocationLink>>> {
4157 let position = position.to_point_utf16(buffer.read(cx));
4158 self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
4159 }
4160
4161 pub fn references<T: ToPointUtf16>(
4162 &self,
4163 buffer: &ModelHandle<Buffer>,
4164 position: T,
4165 cx: &mut ModelContext<Self>,
4166 ) -> Task<Result<Vec<Location>>> {
4167 let position = position.to_point_utf16(buffer.read(cx));
4168 self.request_lsp(buffer.clone(), GetReferences { position }, cx)
4169 }
4170
4171 pub fn document_highlights<T: ToPointUtf16>(
4172 &self,
4173 buffer: &ModelHandle<Buffer>,
4174 position: T,
4175 cx: &mut ModelContext<Self>,
4176 ) -> Task<Result<Vec<DocumentHighlight>>> {
4177 let position = position.to_point_utf16(buffer.read(cx));
4178 self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
4179 }
4180
4181 pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
4182 if self.is_local() {
4183 let mut requests = Vec::new();
4184 for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
4185 let worktree_id = *worktree_id;
4186 let worktree_handle = self.worktree_for_id(worktree_id, cx);
4187 let worktree = match worktree_handle.and_then(|tree| tree.read(cx).as_local()) {
4188 Some(worktree) => worktree,
4189 None => continue,
4190 };
4191 let worktree_abs_path = worktree.abs_path().clone();
4192
4193 let (adapter, language, server) = match self.language_servers.get(server_id) {
4194 Some(LanguageServerState::Running {
4195 adapter,
4196 language,
4197 server,
4198 ..
4199 }) => (adapter.clone(), language.clone(), server),
4200
4201 _ => continue,
4202 };
4203
4204 requests.push(
4205 server
4206 .request::<lsp::request::WorkspaceSymbolRequest>(
4207 lsp::WorkspaceSymbolParams {
4208 query: query.to_string(),
4209 ..Default::default()
4210 },
4211 )
4212 .log_err()
4213 .map(move |response| {
4214 let lsp_symbols = response.flatten().map(|symbol_response| match symbol_response {
4215 lsp::WorkspaceSymbolResponse::Flat(flat_responses) => {
4216 flat_responses.into_iter().map(|lsp_symbol| {
4217 (lsp_symbol.name, lsp_symbol.kind, lsp_symbol.location)
4218 }).collect::<Vec<_>>()
4219 }
4220 lsp::WorkspaceSymbolResponse::Nested(nested_responses) => {
4221 nested_responses.into_iter().filter_map(|lsp_symbol| {
4222 let location = match lsp_symbol.location {
4223 OneOf::Left(location) => location,
4224 OneOf::Right(_) => {
4225 error!("Unexpected: client capabilities forbid symbol resolutions in workspace.symbol.resolveSupport");
4226 return None
4227 }
4228 };
4229 Some((lsp_symbol.name, lsp_symbol.kind, location))
4230 }).collect::<Vec<_>>()
4231 }
4232 }).unwrap_or_default();
4233
4234 (
4235 adapter,
4236 language,
4237 worktree_id,
4238 worktree_abs_path,
4239 lsp_symbols,
4240 )
4241 }),
4242 );
4243 }
4244
4245 cx.spawn_weak(|this, cx| async move {
4246 let responses = futures::future::join_all(requests).await;
4247 let this = match this.upgrade(&cx) {
4248 Some(this) => this,
4249 None => return Ok(Vec::new()),
4250 };
4251
4252 let symbols = this.read_with(&cx, |this, cx| {
4253 let mut symbols = Vec::new();
4254 for (
4255 adapter,
4256 adapter_language,
4257 source_worktree_id,
4258 worktree_abs_path,
4259 lsp_symbols,
4260 ) in responses
4261 {
4262 symbols.extend(lsp_symbols.into_iter().filter_map(
4263 |(symbol_name, symbol_kind, symbol_location)| {
4264 let abs_path = symbol_location.uri.to_file_path().ok()?;
4265 let mut worktree_id = source_worktree_id;
4266 let path;
4267 if let Some((worktree, rel_path)) =
4268 this.find_local_worktree(&abs_path, cx)
4269 {
4270 worktree_id = worktree.read(cx).id();
4271 path = rel_path;
4272 } else {
4273 path = relativize_path(&worktree_abs_path, &abs_path);
4274 }
4275
4276 let project_path = ProjectPath {
4277 worktree_id,
4278 path: path.into(),
4279 };
4280 let signature = this.symbol_signature(&project_path);
4281 let adapter_language = adapter_language.clone();
4282 let language = this
4283 .languages
4284 .language_for_file(&project_path.path, None)
4285 .unwrap_or_else(move |_| adapter_language);
4286 let language_server_name = adapter.name.clone();
4287 Some(async move {
4288 let language = language.await;
4289 let label =
4290 language.label_for_symbol(&symbol_name, symbol_kind).await;
4291
4292 Symbol {
4293 language_server_name,
4294 source_worktree_id,
4295 path: project_path,
4296 label: label.unwrap_or_else(|| {
4297 CodeLabel::plain(symbol_name.clone(), None)
4298 }),
4299 kind: symbol_kind,
4300 name: symbol_name,
4301 range: range_from_lsp(symbol_location.range),
4302 signature,
4303 }
4304 })
4305 },
4306 ));
4307 }
4308
4309 symbols
4310 });
4311
4312 Ok(futures::future::join_all(symbols).await)
4313 })
4314 } else if let Some(project_id) = self.remote_id() {
4315 let request = self.client.request(proto::GetProjectSymbols {
4316 project_id,
4317 query: query.to_string(),
4318 });
4319 cx.spawn_weak(|this, cx| async move {
4320 let response = request.await?;
4321 let mut symbols = Vec::new();
4322 if let Some(this) = this.upgrade(&cx) {
4323 let new_symbols = this.read_with(&cx, |this, _| {
4324 response
4325 .symbols
4326 .into_iter()
4327 .map(|symbol| this.deserialize_symbol(symbol))
4328 .collect::<Vec<_>>()
4329 });
4330 symbols = futures::future::join_all(new_symbols)
4331 .await
4332 .into_iter()
4333 .filter_map(|symbol| symbol.log_err())
4334 .collect::<Vec<_>>();
4335 }
4336 Ok(symbols)
4337 })
4338 } else {
4339 Task::ready(Ok(Default::default()))
4340 }
4341 }
4342
4343 pub fn open_buffer_for_symbol(
4344 &mut self,
4345 symbol: &Symbol,
4346 cx: &mut ModelContext<Self>,
4347 ) -> Task<Result<ModelHandle<Buffer>>> {
4348 if self.is_local() {
4349 let language_server_id = if let Some(id) = self.language_server_ids.get(&(
4350 symbol.source_worktree_id,
4351 symbol.language_server_name.clone(),
4352 )) {
4353 *id
4354 } else {
4355 return Task::ready(Err(anyhow!(
4356 "language server for worktree and language not found"
4357 )));
4358 };
4359
4360 let worktree_abs_path = if let Some(worktree_abs_path) = self
4361 .worktree_for_id(symbol.path.worktree_id, cx)
4362 .and_then(|worktree| worktree.read(cx).as_local())
4363 .map(|local_worktree| local_worktree.abs_path())
4364 {
4365 worktree_abs_path
4366 } else {
4367 return Task::ready(Err(anyhow!("worktree not found for symbol")));
4368 };
4369 let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
4370 let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
4371 uri
4372 } else {
4373 return Task::ready(Err(anyhow!("invalid symbol path")));
4374 };
4375
4376 self.open_local_buffer_via_lsp(
4377 symbol_uri,
4378 language_server_id,
4379 symbol.language_server_name.clone(),
4380 cx,
4381 )
4382 } else if let Some(project_id) = self.remote_id() {
4383 let request = self.client.request(proto::OpenBufferForSymbol {
4384 project_id,
4385 symbol: Some(serialize_symbol(symbol)),
4386 });
4387 cx.spawn(|this, mut cx| async move {
4388 let response = request.await?;
4389 this.update(&mut cx, |this, cx| {
4390 this.wait_for_remote_buffer(response.buffer_id, cx)
4391 })
4392 .await
4393 })
4394 } else {
4395 Task::ready(Err(anyhow!("project does not have a remote id")))
4396 }
4397 }
4398
4399 pub fn hover<T: ToPointUtf16>(
4400 &self,
4401 buffer: &ModelHandle<Buffer>,
4402 position: T,
4403 cx: &mut ModelContext<Self>,
4404 ) -> Task<Result<Option<Hover>>> {
4405 let position = position.to_point_utf16(buffer.read(cx));
4406 self.request_lsp(buffer.clone(), GetHover { position }, cx)
4407 }
4408
4409 pub fn completions<T: ToPointUtf16>(
4410 &self,
4411 buffer: &ModelHandle<Buffer>,
4412 position: T,
4413 cx: &mut ModelContext<Self>,
4414 ) -> Task<Result<Vec<Completion>>> {
4415 let position = position.to_point_utf16(buffer.read(cx));
4416 self.request_lsp(buffer.clone(), GetCompletions { position }, cx)
4417 }
4418
4419 pub fn apply_additional_edits_for_completion(
4420 &self,
4421 buffer_handle: ModelHandle<Buffer>,
4422 completion: Completion,
4423 push_to_history: bool,
4424 cx: &mut ModelContext<Self>,
4425 ) -> Task<Result<Option<Transaction>>> {
4426 let buffer = buffer_handle.read(cx);
4427 let buffer_id = buffer.remote_id();
4428
4429 if self.is_local() {
4430 let lang_server = match self.primary_language_servers_for_buffer(buffer, cx) {
4431 Some((_, server)) => server.clone(),
4432 _ => return Task::ready(Ok(Default::default())),
4433 };
4434
4435 cx.spawn(|this, mut cx| async move {
4436 let resolved_completion = lang_server
4437 .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
4438 .await?;
4439
4440 if let Some(edits) = resolved_completion.additional_text_edits {
4441 let edits = this
4442 .update(&mut cx, |this, cx| {
4443 this.edits_from_lsp(
4444 &buffer_handle,
4445 edits,
4446 lang_server.server_id(),
4447 None,
4448 cx,
4449 )
4450 })
4451 .await?;
4452
4453 buffer_handle.update(&mut cx, |buffer, cx| {
4454 buffer.finalize_last_transaction();
4455 buffer.start_transaction();
4456
4457 for (range, text) in edits {
4458 let primary = &completion.old_range;
4459 let start_within = primary.start.cmp(&range.start, buffer).is_le()
4460 && primary.end.cmp(&range.start, buffer).is_ge();
4461 let end_within = range.start.cmp(&primary.end, buffer).is_le()
4462 && range.end.cmp(&primary.end, buffer).is_ge();
4463
4464 //Skip additional edits which overlap with the primary completion edit
4465 //https://github.com/zed-industries/zed/pull/1871
4466 if !start_within && !end_within {
4467 buffer.edit([(range, text)], None, cx);
4468 }
4469 }
4470
4471 let transaction = if buffer.end_transaction(cx).is_some() {
4472 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4473 if !push_to_history {
4474 buffer.forget_transaction(transaction.id);
4475 }
4476 Some(transaction)
4477 } else {
4478 None
4479 };
4480 Ok(transaction)
4481 })
4482 } else {
4483 Ok(None)
4484 }
4485 })
4486 } else if let Some(project_id) = self.remote_id() {
4487 let client = self.client.clone();
4488 cx.spawn(|_, mut cx| async move {
4489 let response = client
4490 .request(proto::ApplyCompletionAdditionalEdits {
4491 project_id,
4492 buffer_id,
4493 completion: Some(language::proto::serialize_completion(&completion)),
4494 })
4495 .await?;
4496
4497 if let Some(transaction) = response.transaction {
4498 let transaction = language::proto::deserialize_transaction(transaction)?;
4499 buffer_handle
4500 .update(&mut cx, |buffer, _| {
4501 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
4502 })
4503 .await?;
4504 if push_to_history {
4505 buffer_handle.update(&mut cx, |buffer, _| {
4506 buffer.push_transaction(transaction.clone(), Instant::now());
4507 });
4508 }
4509 Ok(Some(transaction))
4510 } else {
4511 Ok(None)
4512 }
4513 })
4514 } else {
4515 Task::ready(Err(anyhow!("project does not have a remote id")))
4516 }
4517 }
4518
4519 pub fn code_actions<T: Clone + ToOffset>(
4520 &self,
4521 buffer_handle: &ModelHandle<Buffer>,
4522 range: Range<T>,
4523 cx: &mut ModelContext<Self>,
4524 ) -> Task<Result<Vec<CodeAction>>> {
4525 let buffer = buffer_handle.read(cx);
4526 let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
4527 self.request_lsp(buffer_handle.clone(), GetCodeActions { range }, cx)
4528 }
4529
4530 pub fn apply_code_action(
4531 &self,
4532 buffer_handle: ModelHandle<Buffer>,
4533 mut action: CodeAction,
4534 push_to_history: bool,
4535 cx: &mut ModelContext<Self>,
4536 ) -> Task<Result<ProjectTransaction>> {
4537 if self.is_local() {
4538 let buffer = buffer_handle.read(cx);
4539 let (lsp_adapter, lang_server) = if let Some((adapter, server)) =
4540 self.language_server_for_buffer(buffer, action.server_id, cx)
4541 {
4542 (adapter.clone(), server.clone())
4543 } else {
4544 return Task::ready(Ok(Default::default()));
4545 };
4546 let range = action.range.to_point_utf16(buffer);
4547
4548 cx.spawn(|this, mut cx| async move {
4549 if let Some(lsp_range) = action
4550 .lsp_action
4551 .data
4552 .as_mut()
4553 .and_then(|d| d.get_mut("codeActionParams"))
4554 .and_then(|d| d.get_mut("range"))
4555 {
4556 *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
4557 action.lsp_action = lang_server
4558 .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
4559 .await?;
4560 } else {
4561 let actions = this
4562 .update(&mut cx, |this, cx| {
4563 this.code_actions(&buffer_handle, action.range, cx)
4564 })
4565 .await?;
4566 action.lsp_action = actions
4567 .into_iter()
4568 .find(|a| a.lsp_action.title == action.lsp_action.title)
4569 .ok_or_else(|| anyhow!("code action is outdated"))?
4570 .lsp_action;
4571 }
4572
4573 if let Some(edit) = action.lsp_action.edit {
4574 if edit.changes.is_some() || edit.document_changes.is_some() {
4575 return Self::deserialize_workspace_edit(
4576 this,
4577 edit,
4578 push_to_history,
4579 lsp_adapter.clone(),
4580 lang_server.clone(),
4581 &mut cx,
4582 )
4583 .await;
4584 }
4585 }
4586
4587 if let Some(command) = action.lsp_action.command {
4588 this.update(&mut cx, |this, _| {
4589 this.last_workspace_edits_by_language_server
4590 .remove(&lang_server.server_id());
4591 });
4592
4593 let result = lang_server
4594 .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
4595 command: command.command,
4596 arguments: command.arguments.unwrap_or_default(),
4597 ..Default::default()
4598 })
4599 .await;
4600
4601 if let Err(err) = result {
4602 // TODO: LSP ERROR
4603 return Err(err);
4604 }
4605
4606 return Ok(this.update(&mut cx, |this, _| {
4607 this.last_workspace_edits_by_language_server
4608 .remove(&lang_server.server_id())
4609 .unwrap_or_default()
4610 }));
4611 }
4612
4613 Ok(ProjectTransaction::default())
4614 })
4615 } else if let Some(project_id) = self.remote_id() {
4616 let client = self.client.clone();
4617 let request = proto::ApplyCodeAction {
4618 project_id,
4619 buffer_id: buffer_handle.read(cx).remote_id(),
4620 action: Some(language::proto::serialize_code_action(&action)),
4621 };
4622 cx.spawn(|this, mut cx| async move {
4623 let response = client
4624 .request(request)
4625 .await?
4626 .transaction
4627 .ok_or_else(|| anyhow!("missing transaction"))?;
4628 this.update(&mut cx, |this, cx| {
4629 this.deserialize_project_transaction(response, push_to_history, cx)
4630 })
4631 .await
4632 })
4633 } else {
4634 Task::ready(Err(anyhow!("project does not have a remote id")))
4635 }
4636 }
4637
4638 fn apply_on_type_formatting(
4639 &self,
4640 buffer: ModelHandle<Buffer>,
4641 position: Anchor,
4642 trigger: String,
4643 cx: &mut ModelContext<Self>,
4644 ) -> Task<Result<Option<Transaction>>> {
4645 if self.is_local() {
4646 cx.spawn(|this, mut cx| async move {
4647 // Do not allow multiple concurrent formatting requests for the
4648 // same buffer.
4649 this.update(&mut cx, |this, cx| {
4650 this.buffers_being_formatted
4651 .insert(buffer.read(cx).remote_id())
4652 });
4653
4654 let _cleanup = defer({
4655 let this = this.clone();
4656 let mut cx = cx.clone();
4657 let closure_buffer = buffer.clone();
4658 move || {
4659 this.update(&mut cx, |this, cx| {
4660 this.buffers_being_formatted
4661 .remove(&closure_buffer.read(cx).remote_id());
4662 });
4663 }
4664 });
4665
4666 buffer
4667 .update(&mut cx, |buffer, _| {
4668 buffer.wait_for_edits(Some(position.timestamp))
4669 })
4670 .await?;
4671 this.update(&mut cx, |this, cx| {
4672 let position = position.to_point_utf16(buffer.read(cx));
4673 this.on_type_format(buffer, position, trigger, false, cx)
4674 })
4675 .await
4676 })
4677 } else if let Some(project_id) = self.remote_id() {
4678 let client = self.client.clone();
4679 let request = proto::OnTypeFormatting {
4680 project_id,
4681 buffer_id: buffer.read(cx).remote_id(),
4682 position: Some(serialize_anchor(&position)),
4683 trigger,
4684 version: serialize_version(&buffer.read(cx).version()),
4685 };
4686 cx.spawn(|_, _| async move {
4687 client
4688 .request(request)
4689 .await?
4690 .transaction
4691 .map(language::proto::deserialize_transaction)
4692 .transpose()
4693 })
4694 } else {
4695 Task::ready(Err(anyhow!("project does not have a remote id")))
4696 }
4697 }
4698
4699 async fn deserialize_edits(
4700 this: ModelHandle<Self>,
4701 buffer_to_edit: ModelHandle<Buffer>,
4702 edits: Vec<lsp::TextEdit>,
4703 push_to_history: bool,
4704 _: Arc<CachedLspAdapter>,
4705 language_server: Arc<LanguageServer>,
4706 cx: &mut AsyncAppContext,
4707 ) -> Result<Option<Transaction>> {
4708 let edits = this
4709 .update(cx, |this, cx| {
4710 this.edits_from_lsp(
4711 &buffer_to_edit,
4712 edits,
4713 language_server.server_id(),
4714 None,
4715 cx,
4716 )
4717 })
4718 .await?;
4719
4720 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4721 buffer.finalize_last_transaction();
4722 buffer.start_transaction();
4723 for (range, text) in edits {
4724 buffer.edit([(range, text)], None, cx);
4725 }
4726
4727 if buffer.end_transaction(cx).is_some() {
4728 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4729 if !push_to_history {
4730 buffer.forget_transaction(transaction.id);
4731 }
4732 Some(transaction)
4733 } else {
4734 None
4735 }
4736 });
4737
4738 Ok(transaction)
4739 }
4740
4741 async fn deserialize_workspace_edit(
4742 this: ModelHandle<Self>,
4743 edit: lsp::WorkspaceEdit,
4744 push_to_history: bool,
4745 lsp_adapter: Arc<CachedLspAdapter>,
4746 language_server: Arc<LanguageServer>,
4747 cx: &mut AsyncAppContext,
4748 ) -> Result<ProjectTransaction> {
4749 let fs = this.read_with(cx, |this, _| this.fs.clone());
4750 let mut operations = Vec::new();
4751 if let Some(document_changes) = edit.document_changes {
4752 match document_changes {
4753 lsp::DocumentChanges::Edits(edits) => {
4754 operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
4755 }
4756 lsp::DocumentChanges::Operations(ops) => operations = ops,
4757 }
4758 } else if let Some(changes) = edit.changes {
4759 operations.extend(changes.into_iter().map(|(uri, edits)| {
4760 lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
4761 text_document: lsp::OptionalVersionedTextDocumentIdentifier {
4762 uri,
4763 version: None,
4764 },
4765 edits: edits.into_iter().map(OneOf::Left).collect(),
4766 })
4767 }));
4768 }
4769
4770 let mut project_transaction = ProjectTransaction::default();
4771 for operation in operations {
4772 match operation {
4773 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
4774 let abs_path = op
4775 .uri
4776 .to_file_path()
4777 .map_err(|_| anyhow!("can't convert URI to path"))?;
4778
4779 if let Some(parent_path) = abs_path.parent() {
4780 fs.create_dir(parent_path).await?;
4781 }
4782 if abs_path.ends_with("/") {
4783 fs.create_dir(&abs_path).await?;
4784 } else {
4785 fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
4786 .await?;
4787 }
4788 }
4789
4790 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
4791 let source_abs_path = op
4792 .old_uri
4793 .to_file_path()
4794 .map_err(|_| anyhow!("can't convert URI to path"))?;
4795 let target_abs_path = op
4796 .new_uri
4797 .to_file_path()
4798 .map_err(|_| anyhow!("can't convert URI to path"))?;
4799 fs.rename(
4800 &source_abs_path,
4801 &target_abs_path,
4802 op.options.map(Into::into).unwrap_or_default(),
4803 )
4804 .await?;
4805 }
4806
4807 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
4808 let abs_path = op
4809 .uri
4810 .to_file_path()
4811 .map_err(|_| anyhow!("can't convert URI to path"))?;
4812 let options = op.options.map(Into::into).unwrap_or_default();
4813 if abs_path.ends_with("/") {
4814 fs.remove_dir(&abs_path, options).await?;
4815 } else {
4816 fs.remove_file(&abs_path, options).await?;
4817 }
4818 }
4819
4820 lsp::DocumentChangeOperation::Edit(op) => {
4821 let buffer_to_edit = this
4822 .update(cx, |this, cx| {
4823 this.open_local_buffer_via_lsp(
4824 op.text_document.uri,
4825 language_server.server_id(),
4826 lsp_adapter.name.clone(),
4827 cx,
4828 )
4829 })
4830 .await?;
4831
4832 let edits = this
4833 .update(cx, |this, cx| {
4834 let edits = op.edits.into_iter().map(|edit| match edit {
4835 OneOf::Left(edit) => edit,
4836 OneOf::Right(edit) => edit.text_edit,
4837 });
4838 this.edits_from_lsp(
4839 &buffer_to_edit,
4840 edits,
4841 language_server.server_id(),
4842 op.text_document.version,
4843 cx,
4844 )
4845 })
4846 .await?;
4847
4848 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4849 buffer.finalize_last_transaction();
4850 buffer.start_transaction();
4851 for (range, text) in edits {
4852 buffer.edit([(range, text)], None, cx);
4853 }
4854 let transaction = if buffer.end_transaction(cx).is_some() {
4855 let transaction = buffer.finalize_last_transaction().unwrap().clone();
4856 if !push_to_history {
4857 buffer.forget_transaction(transaction.id);
4858 }
4859 Some(transaction)
4860 } else {
4861 None
4862 };
4863
4864 transaction
4865 });
4866 if let Some(transaction) = transaction {
4867 project_transaction.0.insert(buffer_to_edit, transaction);
4868 }
4869 }
4870 }
4871 }
4872
4873 Ok(project_transaction)
4874 }
4875
4876 pub fn prepare_rename<T: ToPointUtf16>(
4877 &self,
4878 buffer: ModelHandle<Buffer>,
4879 position: T,
4880 cx: &mut ModelContext<Self>,
4881 ) -> Task<Result<Option<Range<Anchor>>>> {
4882 let position = position.to_point_utf16(buffer.read(cx));
4883 self.request_lsp(buffer, PrepareRename { position }, cx)
4884 }
4885
4886 pub fn perform_rename<T: ToPointUtf16>(
4887 &self,
4888 buffer: ModelHandle<Buffer>,
4889 position: T,
4890 new_name: String,
4891 push_to_history: bool,
4892 cx: &mut ModelContext<Self>,
4893 ) -> Task<Result<ProjectTransaction>> {
4894 let position = position.to_point_utf16(buffer.read(cx));
4895 self.request_lsp(
4896 buffer,
4897 PerformRename {
4898 position,
4899 new_name,
4900 push_to_history,
4901 },
4902 cx,
4903 )
4904 }
4905
4906 pub fn on_type_format<T: ToPointUtf16>(
4907 &self,
4908 buffer: ModelHandle<Buffer>,
4909 position: T,
4910 trigger: String,
4911 push_to_history: bool,
4912 cx: &mut ModelContext<Self>,
4913 ) -> Task<Result<Option<Transaction>>> {
4914 let (position, tab_size) = buffer.read_with(cx, |buffer, cx| {
4915 let position = position.to_point_utf16(buffer);
4916 (
4917 position,
4918 language_settings(buffer.language_at(position).as_ref(), buffer.file(), cx)
4919 .tab_size,
4920 )
4921 });
4922 self.request_lsp(
4923 buffer.clone(),
4924 OnTypeFormatting {
4925 position,
4926 trigger,
4927 options: lsp_command::lsp_formatting_options(tab_size.get()).into(),
4928 push_to_history,
4929 },
4930 cx,
4931 )
4932 }
4933
4934 pub fn inlay_hints<T: ToOffset>(
4935 &self,
4936 buffer_handle: ModelHandle<Buffer>,
4937 range: Range<T>,
4938 cx: &mut ModelContext<Self>,
4939 ) -> Task<Result<Vec<InlayHint>>> {
4940 let buffer = buffer_handle.read(cx);
4941 let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
4942 let range_start = range.start;
4943 let range_end = range.end;
4944 let buffer_id = buffer.remote_id();
4945 let buffer_version = buffer.version().clone();
4946 let lsp_request = InlayHints { range };
4947
4948 if self.is_local() {
4949 let lsp_request_task = self.request_lsp(buffer_handle.clone(), lsp_request, cx);
4950 cx.spawn(|_, mut cx| async move {
4951 buffer_handle
4952 .update(&mut cx, |buffer, _| {
4953 buffer.wait_for_edits(vec![range_start.timestamp, range_end.timestamp])
4954 })
4955 .await
4956 .context("waiting for inlay hint request range edits")?;
4957 lsp_request_task.await.context("inlay hints LSP request")
4958 })
4959 } else if let Some(project_id) = self.remote_id() {
4960 let client = self.client.clone();
4961 let request = proto::InlayHints {
4962 project_id,
4963 buffer_id,
4964 start: Some(serialize_anchor(&range_start)),
4965 end: Some(serialize_anchor(&range_end)),
4966 version: serialize_version(&buffer_version),
4967 };
4968 cx.spawn(|project, cx| async move {
4969 let response = client
4970 .request(request)
4971 .await
4972 .context("inlay hints proto request")?;
4973 let hints_request_result = LspCommand::response_from_proto(
4974 lsp_request,
4975 response,
4976 project,
4977 buffer_handle.clone(),
4978 cx,
4979 )
4980 .await;
4981
4982 hints_request_result.context("inlay hints proto response conversion")
4983 })
4984 } else {
4985 Task::ready(Err(anyhow!("project does not have a remote id")))
4986 }
4987 }
4988
4989 #[allow(clippy::type_complexity)]
4990 pub fn search(
4991 &self,
4992 query: SearchQuery,
4993 cx: &mut ModelContext<Self>,
4994 ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
4995 if self.is_local() {
4996 let snapshots = self
4997 .visible_worktrees(cx)
4998 .filter_map(|tree| {
4999 let tree = tree.read(cx).as_local()?;
5000 Some(tree.snapshot())
5001 })
5002 .collect::<Vec<_>>();
5003
5004 let background = cx.background().clone();
5005 let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
5006 if path_count == 0 {
5007 return Task::ready(Ok(Default::default()));
5008 }
5009 let workers = background.num_cpus().min(path_count);
5010 let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
5011 cx.background()
5012 .spawn({
5013 let fs = self.fs.clone();
5014 let background = cx.background().clone();
5015 let query = query.clone();
5016 async move {
5017 let fs = &fs;
5018 let query = &query;
5019 let matching_paths_tx = &matching_paths_tx;
5020 let paths_per_worker = (path_count + workers - 1) / workers;
5021 let snapshots = &snapshots;
5022 background
5023 .scoped(|scope| {
5024 for worker_ix in 0..workers {
5025 let worker_start_ix = worker_ix * paths_per_worker;
5026 let worker_end_ix = worker_start_ix + paths_per_worker;
5027 scope.spawn(async move {
5028 let mut snapshot_start_ix = 0;
5029 let mut abs_path = PathBuf::new();
5030 for snapshot in snapshots {
5031 let snapshot_end_ix =
5032 snapshot_start_ix + snapshot.visible_file_count();
5033 if worker_end_ix <= snapshot_start_ix {
5034 break;
5035 } else if worker_start_ix > snapshot_end_ix {
5036 snapshot_start_ix = snapshot_end_ix;
5037 continue;
5038 } else {
5039 let start_in_snapshot = worker_start_ix
5040 .saturating_sub(snapshot_start_ix);
5041 let end_in_snapshot =
5042 cmp::min(worker_end_ix, snapshot_end_ix)
5043 - snapshot_start_ix;
5044
5045 for entry in snapshot
5046 .files(false, start_in_snapshot)
5047 .take(end_in_snapshot - start_in_snapshot)
5048 {
5049 if matching_paths_tx.is_closed() {
5050 break;
5051 }
5052 let matches = if query
5053 .file_matches(Some(&entry.path))
5054 {
5055 abs_path.clear();
5056 abs_path.push(&snapshot.abs_path());
5057 abs_path.push(&entry.path);
5058 if let Some(file) =
5059 fs.open_sync(&abs_path).await.log_err()
5060 {
5061 query.detect(file).unwrap_or(false)
5062 } else {
5063 false
5064 }
5065 } else {
5066 false
5067 };
5068
5069 if matches {
5070 let project_path =
5071 (snapshot.id(), entry.path.clone());
5072 if matching_paths_tx
5073 .send(project_path)
5074 .await
5075 .is_err()
5076 {
5077 break;
5078 }
5079 }
5080 }
5081
5082 snapshot_start_ix = snapshot_end_ix;
5083 }
5084 }
5085 });
5086 }
5087 })
5088 .await;
5089 }
5090 })
5091 .detach();
5092
5093 let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
5094 let open_buffers = self
5095 .opened_buffers
5096 .values()
5097 .filter_map(|b| b.upgrade(cx))
5098 .collect::<HashSet<_>>();
5099 cx.spawn(|this, cx| async move {
5100 for buffer in &open_buffers {
5101 let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
5102 buffers_tx.send((buffer.clone(), snapshot)).await?;
5103 }
5104
5105 let open_buffers = Rc::new(RefCell::new(open_buffers));
5106 while let Some(project_path) = matching_paths_rx.next().await {
5107 if buffers_tx.is_closed() {
5108 break;
5109 }
5110
5111 let this = this.clone();
5112 let open_buffers = open_buffers.clone();
5113 let buffers_tx = buffers_tx.clone();
5114 cx.spawn(|mut cx| async move {
5115 if let Some(buffer) = this
5116 .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
5117 .await
5118 .log_err()
5119 {
5120 if open_buffers.borrow_mut().insert(buffer.clone()) {
5121 let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
5122 buffers_tx.send((buffer, snapshot)).await?;
5123 }
5124 }
5125
5126 Ok::<_, anyhow::Error>(())
5127 })
5128 .detach();
5129 }
5130
5131 Ok::<_, anyhow::Error>(())
5132 })
5133 .detach_and_log_err(cx);
5134
5135 let background = cx.background().clone();
5136 cx.background().spawn(async move {
5137 let query = &query;
5138 let mut matched_buffers = Vec::new();
5139 for _ in 0..workers {
5140 matched_buffers.push(HashMap::default());
5141 }
5142 background
5143 .scoped(|scope| {
5144 for worker_matched_buffers in matched_buffers.iter_mut() {
5145 let mut buffers_rx = buffers_rx.clone();
5146 scope.spawn(async move {
5147 while let Some((buffer, snapshot)) = buffers_rx.next().await {
5148 let buffer_matches = if query.file_matches(
5149 snapshot.file().map(|file| file.path().as_ref()),
5150 ) {
5151 query
5152 .search(snapshot.as_rope())
5153 .await
5154 .iter()
5155 .map(|range| {
5156 snapshot.anchor_before(range.start)
5157 ..snapshot.anchor_after(range.end)
5158 })
5159 .collect()
5160 } else {
5161 Vec::new()
5162 };
5163 if !buffer_matches.is_empty() {
5164 worker_matched_buffers
5165 .insert(buffer.clone(), buffer_matches);
5166 }
5167 }
5168 });
5169 }
5170 })
5171 .await;
5172 Ok(matched_buffers.into_iter().flatten().collect())
5173 })
5174 } else if let Some(project_id) = self.remote_id() {
5175 let request = self.client.request(query.to_proto(project_id));
5176 cx.spawn(|this, mut cx| async move {
5177 let response = request.await?;
5178 let mut result = HashMap::default();
5179 for location in response.locations {
5180 let target_buffer = this
5181 .update(&mut cx, |this, cx| {
5182 this.wait_for_remote_buffer(location.buffer_id, cx)
5183 })
5184 .await?;
5185 let start = location
5186 .start
5187 .and_then(deserialize_anchor)
5188 .ok_or_else(|| anyhow!("missing target start"))?;
5189 let end = location
5190 .end
5191 .and_then(deserialize_anchor)
5192 .ok_or_else(|| anyhow!("missing target end"))?;
5193 result
5194 .entry(target_buffer)
5195 .or_insert(Vec::new())
5196 .push(start..end)
5197 }
5198 Ok(result)
5199 })
5200 } else {
5201 Task::ready(Ok(Default::default()))
5202 }
5203 }
5204
5205 // TODO: Wire this up to allow selecting a server?
5206 fn request_lsp<R: LspCommand>(
5207 &self,
5208 buffer_handle: ModelHandle<Buffer>,
5209 request: R,
5210 cx: &mut ModelContext<Self>,
5211 ) -> Task<Result<R::Response>>
5212 where
5213 <R::LspRequest as lsp::request::Request>::Result: Send,
5214 {
5215 let buffer = buffer_handle.read(cx);
5216 if self.is_local() {
5217 let file = File::from_dyn(buffer.file()).and_then(File::as_local);
5218 if let Some((file, language_server)) = file.zip(
5219 self.primary_language_servers_for_buffer(buffer, cx)
5220 .map(|(_, server)| server.clone()),
5221 ) {
5222 let lsp_params = request.to_lsp(&file.abs_path(cx), buffer, &language_server, cx);
5223 return cx.spawn(|this, cx| async move {
5224 if !request.check_capabilities(language_server.capabilities()) {
5225 return Ok(Default::default());
5226 }
5227
5228 let result = language_server.request::<R::LspRequest>(lsp_params).await;
5229 let response = match result {
5230 Ok(response) => response,
5231
5232 Err(err) => {
5233 log::warn!(
5234 "Generic lsp request to {} failed: {}",
5235 language_server.name(),
5236 err
5237 );
5238 return Err(err);
5239 }
5240 };
5241
5242 request
5243 .response_from_lsp(
5244 response,
5245 this,
5246 buffer_handle,
5247 language_server.server_id(),
5248 cx,
5249 )
5250 .await
5251 });
5252 }
5253 } else if let Some(project_id) = self.remote_id() {
5254 let rpc = self.client.clone();
5255 let message = request.to_proto(project_id, buffer);
5256 return cx.spawn_weak(|this, cx| async move {
5257 // Ensure the project is still alive by the time the task
5258 // is scheduled.
5259 this.upgrade(&cx)
5260 .ok_or_else(|| anyhow!("project dropped"))?;
5261
5262 let response = rpc.request(message).await?;
5263
5264 let this = this
5265 .upgrade(&cx)
5266 .ok_or_else(|| anyhow!("project dropped"))?;
5267 if this.read_with(&cx, |this, _| this.is_read_only()) {
5268 Err(anyhow!("disconnected before completing request"))
5269 } else {
5270 request
5271 .response_from_proto(response, this, buffer_handle, cx)
5272 .await
5273 }
5274 });
5275 }
5276 Task::ready(Ok(Default::default()))
5277 }
5278
5279 pub fn find_or_create_local_worktree(
5280 &mut self,
5281 abs_path: impl AsRef<Path>,
5282 visible: bool,
5283 cx: &mut ModelContext<Self>,
5284 ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
5285 let abs_path = abs_path.as_ref();
5286 if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
5287 Task::ready(Ok((tree, relative_path)))
5288 } else {
5289 let worktree = self.create_local_worktree(abs_path, visible, cx);
5290 cx.foreground()
5291 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
5292 }
5293 }
5294
5295 pub fn find_local_worktree(
5296 &self,
5297 abs_path: &Path,
5298 cx: &AppContext,
5299 ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
5300 for tree in &self.worktrees {
5301 if let Some(tree) = tree.upgrade(cx) {
5302 if let Some(relative_path) = tree
5303 .read(cx)
5304 .as_local()
5305 .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
5306 {
5307 return Some((tree.clone(), relative_path.into()));
5308 }
5309 }
5310 }
5311 None
5312 }
5313
5314 pub fn is_shared(&self) -> bool {
5315 match &self.client_state {
5316 Some(ProjectClientState::Local { .. }) => true,
5317 _ => false,
5318 }
5319 }
5320
5321 fn create_local_worktree(
5322 &mut self,
5323 abs_path: impl AsRef<Path>,
5324 visible: bool,
5325 cx: &mut ModelContext<Self>,
5326 ) -> Task<Result<ModelHandle<Worktree>>> {
5327 let fs = self.fs.clone();
5328 let client = self.client.clone();
5329 let next_entry_id = self.next_entry_id.clone();
5330 let path: Arc<Path> = abs_path.as_ref().into();
5331 let task = self
5332 .loading_local_worktrees
5333 .entry(path.clone())
5334 .or_insert_with(|| {
5335 cx.spawn(|project, mut cx| {
5336 async move {
5337 let worktree = Worktree::local(
5338 client.clone(),
5339 path.clone(),
5340 visible,
5341 fs,
5342 next_entry_id,
5343 &mut cx,
5344 )
5345 .await;
5346
5347 project.update(&mut cx, |project, _| {
5348 project.loading_local_worktrees.remove(&path);
5349 });
5350
5351 let worktree = worktree?;
5352 project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
5353 Ok(worktree)
5354 }
5355 .map_err(Arc::new)
5356 })
5357 .shared()
5358 })
5359 .clone();
5360 cx.foreground().spawn(async move {
5361 match task.await {
5362 Ok(worktree) => Ok(worktree),
5363 Err(err) => Err(anyhow!("{}", err)),
5364 }
5365 })
5366 }
5367
5368 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
5369 self.worktrees.retain(|worktree| {
5370 if let Some(worktree) = worktree.upgrade(cx) {
5371 let id = worktree.read(cx).id();
5372 if id == id_to_remove {
5373 cx.emit(Event::WorktreeRemoved(id));
5374 false
5375 } else {
5376 true
5377 }
5378 } else {
5379 false
5380 }
5381 });
5382 self.metadata_changed(cx);
5383 }
5384
5385 fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
5386 cx.observe(worktree, |_, _, cx| cx.notify()).detach();
5387 if worktree.read(cx).is_local() {
5388 cx.subscribe(worktree, |this, worktree, event, cx| match event {
5389 worktree::Event::UpdatedEntries(changes) => {
5390 this.update_local_worktree_buffers(&worktree, changes, cx);
5391 this.update_local_worktree_language_servers(&worktree, changes, cx);
5392 this.update_local_worktree_settings(&worktree, changes, cx);
5393 cx.emit(Event::WorktreeUpdatedEntries(
5394 worktree.read(cx).id(),
5395 changes.clone(),
5396 ));
5397 }
5398 worktree::Event::UpdatedGitRepositories(updated_repos) => {
5399 this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
5400 }
5401 })
5402 .detach();
5403 }
5404
5405 let push_strong_handle = {
5406 let worktree = worktree.read(cx);
5407 self.is_shared() || worktree.is_visible() || worktree.is_remote()
5408 };
5409 if push_strong_handle {
5410 self.worktrees
5411 .push(WorktreeHandle::Strong(worktree.clone()));
5412 } else {
5413 self.worktrees
5414 .push(WorktreeHandle::Weak(worktree.downgrade()));
5415 }
5416
5417 let handle_id = worktree.id();
5418 cx.observe_release(worktree, move |this, worktree, cx| {
5419 let _ = this.remove_worktree(worktree.id(), cx);
5420 cx.update_global::<SettingsStore, _, _>(|store, cx| {
5421 store.clear_local_settings(handle_id, cx).log_err()
5422 });
5423 })
5424 .detach();
5425
5426 cx.emit(Event::WorktreeAdded);
5427 self.metadata_changed(cx);
5428 }
5429
5430 fn update_local_worktree_buffers(
5431 &mut self,
5432 worktree_handle: &ModelHandle<Worktree>,
5433 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
5434 cx: &mut ModelContext<Self>,
5435 ) {
5436 let snapshot = worktree_handle.read(cx).snapshot();
5437
5438 let mut renamed_buffers = Vec::new();
5439 for (path, entry_id, _) in changes {
5440 let worktree_id = worktree_handle.read(cx).id();
5441 let project_path = ProjectPath {
5442 worktree_id,
5443 path: path.clone(),
5444 };
5445
5446 let buffer_id = match self.local_buffer_ids_by_entry_id.get(entry_id) {
5447 Some(&buffer_id) => buffer_id,
5448 None => match self.local_buffer_ids_by_path.get(&project_path) {
5449 Some(&buffer_id) => buffer_id,
5450 None => continue,
5451 },
5452 };
5453
5454 let open_buffer = self.opened_buffers.get(&buffer_id);
5455 let buffer = if let Some(buffer) = open_buffer.and_then(|buffer| buffer.upgrade(cx)) {
5456 buffer
5457 } else {
5458 self.opened_buffers.remove(&buffer_id);
5459 self.local_buffer_ids_by_path.remove(&project_path);
5460 self.local_buffer_ids_by_entry_id.remove(entry_id);
5461 continue;
5462 };
5463
5464 buffer.update(cx, |buffer, cx| {
5465 if let Some(old_file) = File::from_dyn(buffer.file()) {
5466 if old_file.worktree != *worktree_handle {
5467 return;
5468 }
5469
5470 let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id) {
5471 File {
5472 is_local: true,
5473 entry_id: entry.id,
5474 mtime: entry.mtime,
5475 path: entry.path.clone(),
5476 worktree: worktree_handle.clone(),
5477 is_deleted: false,
5478 }
5479 } else if let Some(entry) = snapshot.entry_for_path(old_file.path().as_ref()) {
5480 File {
5481 is_local: true,
5482 entry_id: entry.id,
5483 mtime: entry.mtime,
5484 path: entry.path.clone(),
5485 worktree: worktree_handle.clone(),
5486 is_deleted: false,
5487 }
5488 } else {
5489 File {
5490 is_local: true,
5491 entry_id: old_file.entry_id,
5492 path: old_file.path().clone(),
5493 mtime: old_file.mtime(),
5494 worktree: worktree_handle.clone(),
5495 is_deleted: true,
5496 }
5497 };
5498
5499 let old_path = old_file.abs_path(cx);
5500 if new_file.abs_path(cx) != old_path {
5501 renamed_buffers.push((cx.handle(), old_file.clone()));
5502 self.local_buffer_ids_by_path.remove(&project_path);
5503 self.local_buffer_ids_by_path.insert(
5504 ProjectPath {
5505 worktree_id,
5506 path: path.clone(),
5507 },
5508 buffer_id,
5509 );
5510 }
5511
5512 if new_file.entry_id != *entry_id {
5513 self.local_buffer_ids_by_entry_id.remove(entry_id);
5514 self.local_buffer_ids_by_entry_id
5515 .insert(new_file.entry_id, buffer_id);
5516 }
5517
5518 if new_file != *old_file {
5519 if let Some(project_id) = self.remote_id() {
5520 self.client
5521 .send(proto::UpdateBufferFile {
5522 project_id,
5523 buffer_id: buffer_id as u64,
5524 file: Some(new_file.to_proto()),
5525 })
5526 .log_err();
5527 }
5528
5529 buffer.file_updated(Arc::new(new_file), cx).detach();
5530 }
5531 }
5532 });
5533 }
5534
5535 for (buffer, old_file) in renamed_buffers {
5536 self.unregister_buffer_from_language_servers(&buffer, &old_file, cx);
5537 self.detect_language_for_buffer(&buffer, cx);
5538 self.register_buffer_with_language_servers(&buffer, cx);
5539 }
5540 }
5541
5542 fn update_local_worktree_language_servers(
5543 &mut self,
5544 worktree_handle: &ModelHandle<Worktree>,
5545 changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
5546 cx: &mut ModelContext<Self>,
5547 ) {
5548 if changes.is_empty() {
5549 return;
5550 }
5551
5552 let worktree_id = worktree_handle.read(cx).id();
5553 let mut language_server_ids = self
5554 .language_server_ids
5555 .iter()
5556 .filter_map(|((server_worktree_id, _), server_id)| {
5557 (*server_worktree_id == worktree_id).then_some(*server_id)
5558 })
5559 .collect::<Vec<_>>();
5560 language_server_ids.sort();
5561 language_server_ids.dedup();
5562
5563 let abs_path = worktree_handle.read(cx).abs_path();
5564 for server_id in &language_server_ids {
5565 if let Some(LanguageServerState::Running {
5566 server,
5567 watched_paths,
5568 ..
5569 }) = self.language_servers.get(server_id)
5570 {
5571 if let Some(watched_paths) = watched_paths.get(&worktree_id) {
5572 let params = lsp::DidChangeWatchedFilesParams {
5573 changes: changes
5574 .iter()
5575 .filter_map(|(path, _, change)| {
5576 if !watched_paths.is_match(&path) {
5577 return None;
5578 }
5579 let typ = match change {
5580 PathChange::Loaded => return None,
5581 PathChange::Added => lsp::FileChangeType::CREATED,
5582 PathChange::Removed => lsp::FileChangeType::DELETED,
5583 PathChange::Updated => lsp::FileChangeType::CHANGED,
5584 PathChange::AddedOrUpdated => lsp::FileChangeType::CHANGED,
5585 };
5586 Some(lsp::FileEvent {
5587 uri: lsp::Url::from_file_path(abs_path.join(path)).unwrap(),
5588 typ,
5589 })
5590 })
5591 .collect(),
5592 };
5593
5594 if !params.changes.is_empty() {
5595 server
5596 .notify::<lsp::notification::DidChangeWatchedFiles>(params)
5597 .log_err();
5598 }
5599 }
5600 }
5601 }
5602 }
5603
5604 fn update_local_worktree_buffers_git_repos(
5605 &mut self,
5606 worktree_handle: ModelHandle<Worktree>,
5607 changed_repos: &UpdatedGitRepositoriesSet,
5608 cx: &mut ModelContext<Self>,
5609 ) {
5610 debug_assert!(worktree_handle.read(cx).is_local());
5611
5612 // Identify the loading buffers whose containing repository that has changed.
5613 let future_buffers = self
5614 .loading_buffers_by_path
5615 .iter()
5616 .filter_map(|(project_path, receiver)| {
5617 if project_path.worktree_id != worktree_handle.read(cx).id() {
5618 return None;
5619 }
5620 let path = &project_path.path;
5621 changed_repos
5622 .iter()
5623 .find(|(work_dir, _)| path.starts_with(work_dir))?;
5624 let receiver = receiver.clone();
5625 let path = path.clone();
5626 Some(async move {
5627 wait_for_loading_buffer(receiver)
5628 .await
5629 .ok()
5630 .map(|buffer| (buffer, path))
5631 })
5632 })
5633 .collect::<FuturesUnordered<_>>();
5634
5635 // Identify the current buffers whose containing repository has changed.
5636 let current_buffers = self
5637 .opened_buffers
5638 .values()
5639 .filter_map(|buffer| {
5640 let buffer = buffer.upgrade(cx)?;
5641 let file = File::from_dyn(buffer.read(cx).file())?;
5642 if file.worktree != worktree_handle {
5643 return None;
5644 }
5645 let path = file.path();
5646 changed_repos
5647 .iter()
5648 .find(|(work_dir, _)| path.starts_with(work_dir))?;
5649 Some((buffer, path.clone()))
5650 })
5651 .collect::<Vec<_>>();
5652
5653 if future_buffers.len() + current_buffers.len() == 0 {
5654 return;
5655 }
5656
5657 let remote_id = self.remote_id();
5658 let client = self.client.clone();
5659 cx.spawn_weak(move |_, mut cx| async move {
5660 // Wait for all of the buffers to load.
5661 let future_buffers = future_buffers.collect::<Vec<_>>().await;
5662
5663 // Reload the diff base for every buffer whose containing git repository has changed.
5664 let snapshot =
5665 worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot());
5666 let diff_bases_by_buffer = cx
5667 .background()
5668 .spawn(async move {
5669 future_buffers
5670 .into_iter()
5671 .filter_map(|e| e)
5672 .chain(current_buffers)
5673 .filter_map(|(buffer, path)| {
5674 let (work_directory, repo) =
5675 snapshot.repository_and_work_directory_for_path(&path)?;
5676 let repo = snapshot.get_local_repo(&repo)?;
5677 let relative_path = path.strip_prefix(&work_directory).ok()?;
5678 let base_text = repo.repo_ptr.lock().load_index_text(&relative_path);
5679 Some((buffer, base_text))
5680 })
5681 .collect::<Vec<_>>()
5682 })
5683 .await;
5684
5685 // Assign the new diff bases on all of the buffers.
5686 for (buffer, diff_base) in diff_bases_by_buffer {
5687 let buffer_id = buffer.update(&mut cx, |buffer, cx| {
5688 buffer.set_diff_base(diff_base.clone(), cx);
5689 buffer.remote_id()
5690 });
5691 if let Some(project_id) = remote_id {
5692 client
5693 .send(proto::UpdateDiffBase {
5694 project_id,
5695 buffer_id,
5696 diff_base,
5697 })
5698 .log_err();
5699 }
5700 }
5701 })
5702 .detach();
5703 }
5704
5705 fn update_local_worktree_settings(
5706 &mut self,
5707 worktree: &ModelHandle<Worktree>,
5708 changes: &UpdatedEntriesSet,
5709 cx: &mut ModelContext<Self>,
5710 ) {
5711 let project_id = self.remote_id();
5712 let worktree_id = worktree.id();
5713 let worktree = worktree.read(cx).as_local().unwrap();
5714 let remote_worktree_id = worktree.id();
5715
5716 let mut settings_contents = Vec::new();
5717 for (path, _, change) in changes.iter() {
5718 if path.ends_with(&*LOCAL_SETTINGS_RELATIVE_PATH) {
5719 let settings_dir = Arc::from(
5720 path.ancestors()
5721 .nth(LOCAL_SETTINGS_RELATIVE_PATH.components().count())
5722 .unwrap(),
5723 );
5724 let fs = self.fs.clone();
5725 let removed = *change == PathChange::Removed;
5726 let abs_path = worktree.absolutize(path);
5727 settings_contents.push(async move {
5728 (settings_dir, (!removed).then_some(fs.load(&abs_path).await))
5729 });
5730 }
5731 }
5732
5733 if settings_contents.is_empty() {
5734 return;
5735 }
5736
5737 let client = self.client.clone();
5738 cx.spawn_weak(move |_, mut cx| async move {
5739 let settings_contents: Vec<(Arc<Path>, _)> =
5740 futures::future::join_all(settings_contents).await;
5741 cx.update(|cx| {
5742 cx.update_global::<SettingsStore, _, _>(|store, cx| {
5743 for (directory, file_content) in settings_contents {
5744 let file_content = file_content.and_then(|content| content.log_err());
5745 store
5746 .set_local_settings(
5747 worktree_id,
5748 directory.clone(),
5749 file_content.as_ref().map(String::as_str),
5750 cx,
5751 )
5752 .log_err();
5753 if let Some(remote_id) = project_id {
5754 client
5755 .send(proto::UpdateWorktreeSettings {
5756 project_id: remote_id,
5757 worktree_id: remote_worktree_id.to_proto(),
5758 path: directory.to_string_lossy().into_owned(),
5759 content: file_content,
5760 })
5761 .log_err();
5762 }
5763 }
5764 });
5765 });
5766 })
5767 .detach();
5768 }
5769
5770 pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
5771 let new_active_entry = entry.and_then(|project_path| {
5772 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
5773 let entry = worktree.read(cx).entry_for_path(project_path.path)?;
5774 Some(entry.id)
5775 });
5776 if new_active_entry != self.active_entry {
5777 self.active_entry = new_active_entry;
5778 cx.emit(Event::ActiveEntryChanged(new_active_entry));
5779 }
5780 }
5781
5782 pub fn language_servers_running_disk_based_diagnostics(
5783 &self,
5784 ) -> impl Iterator<Item = LanguageServerId> + '_ {
5785 self.language_server_statuses
5786 .iter()
5787 .filter_map(|(id, status)| {
5788 if status.has_pending_diagnostic_updates {
5789 Some(*id)
5790 } else {
5791 None
5792 }
5793 })
5794 }
5795
5796 pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
5797 let mut summary = DiagnosticSummary::default();
5798 for (_, _, path_summary) in self.diagnostic_summaries(cx) {
5799 summary.error_count += path_summary.error_count;
5800 summary.warning_count += path_summary.warning_count;
5801 }
5802 summary
5803 }
5804
5805 pub fn diagnostic_summaries<'a>(
5806 &'a self,
5807 cx: &'a AppContext,
5808 ) -> impl Iterator<Item = (ProjectPath, LanguageServerId, DiagnosticSummary)> + 'a {
5809 self.visible_worktrees(cx).flat_map(move |worktree| {
5810 let worktree = worktree.read(cx);
5811 let worktree_id = worktree.id();
5812 worktree
5813 .diagnostic_summaries()
5814 .map(move |(path, server_id, summary)| {
5815 (ProjectPath { worktree_id, path }, server_id, summary)
5816 })
5817 })
5818 }
5819
5820 pub fn disk_based_diagnostics_started(
5821 &mut self,
5822 language_server_id: LanguageServerId,
5823 cx: &mut ModelContext<Self>,
5824 ) {
5825 cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
5826 }
5827
5828 pub fn disk_based_diagnostics_finished(
5829 &mut self,
5830 language_server_id: LanguageServerId,
5831 cx: &mut ModelContext<Self>,
5832 ) {
5833 cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
5834 }
5835
5836 pub fn active_entry(&self) -> Option<ProjectEntryId> {
5837 self.active_entry
5838 }
5839
5840 pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
5841 self.worktree_for_id(path.worktree_id, cx)?
5842 .read(cx)
5843 .entry_for_path(&path.path)
5844 .cloned()
5845 }
5846
5847 pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
5848 let worktree = self.worktree_for_entry(entry_id, cx)?;
5849 let worktree = worktree.read(cx);
5850 let worktree_id = worktree.id();
5851 let path = worktree.entry_for_id(entry_id)?.path.clone();
5852 Some(ProjectPath { worktree_id, path })
5853 }
5854
5855 pub fn absolute_path(&self, project_path: &ProjectPath, cx: &AppContext) -> Option<PathBuf> {
5856 let workspace_root = self
5857 .worktree_for_id(project_path.worktree_id, cx)?
5858 .read(cx)
5859 .abs_path();
5860 let project_path = project_path.path.as_ref();
5861
5862 Some(if project_path == Path::new("") {
5863 workspace_root.to_path_buf()
5864 } else {
5865 workspace_root.join(project_path)
5866 })
5867 }
5868
5869 // RPC message handlers
5870
5871 async fn handle_unshare_project(
5872 this: ModelHandle<Self>,
5873 _: TypedEnvelope<proto::UnshareProject>,
5874 _: Arc<Client>,
5875 mut cx: AsyncAppContext,
5876 ) -> Result<()> {
5877 this.update(&mut cx, |this, cx| {
5878 if this.is_local() {
5879 this.unshare(cx)?;
5880 } else {
5881 this.disconnected_from_host(cx);
5882 }
5883 Ok(())
5884 })
5885 }
5886
5887 async fn handle_add_collaborator(
5888 this: ModelHandle<Self>,
5889 mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
5890 _: Arc<Client>,
5891 mut cx: AsyncAppContext,
5892 ) -> Result<()> {
5893 let collaborator = envelope
5894 .payload
5895 .collaborator
5896 .take()
5897 .ok_or_else(|| anyhow!("empty collaborator"))?;
5898
5899 let collaborator = Collaborator::from_proto(collaborator)?;
5900 this.update(&mut cx, |this, cx| {
5901 this.shared_buffers.remove(&collaborator.peer_id);
5902 this.collaborators
5903 .insert(collaborator.peer_id, collaborator);
5904 cx.notify();
5905 });
5906
5907 Ok(())
5908 }
5909
5910 async fn handle_update_project_collaborator(
5911 this: ModelHandle<Self>,
5912 envelope: TypedEnvelope<proto::UpdateProjectCollaborator>,
5913 _: Arc<Client>,
5914 mut cx: AsyncAppContext,
5915 ) -> Result<()> {
5916 let old_peer_id = envelope
5917 .payload
5918 .old_peer_id
5919 .ok_or_else(|| anyhow!("missing old peer id"))?;
5920 let new_peer_id = envelope
5921 .payload
5922 .new_peer_id
5923 .ok_or_else(|| anyhow!("missing new peer id"))?;
5924 this.update(&mut cx, |this, cx| {
5925 let collaborator = this
5926 .collaborators
5927 .remove(&old_peer_id)
5928 .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?;
5929 let is_host = collaborator.replica_id == 0;
5930 this.collaborators.insert(new_peer_id, collaborator);
5931
5932 let buffers = this.shared_buffers.remove(&old_peer_id);
5933 log::info!(
5934 "peer {} became {}. moving buffers {:?}",
5935 old_peer_id,
5936 new_peer_id,
5937 &buffers
5938 );
5939 if let Some(buffers) = buffers {
5940 this.shared_buffers.insert(new_peer_id, buffers);
5941 }
5942
5943 if is_host {
5944 this.opened_buffers
5945 .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
5946 this.buffer_ordered_messages_tx
5947 .unbounded_send(BufferOrderedMessage::Resync)
5948 .unwrap();
5949 }
5950
5951 cx.emit(Event::CollaboratorUpdated {
5952 old_peer_id,
5953 new_peer_id,
5954 });
5955 cx.notify();
5956 Ok(())
5957 })
5958 }
5959
5960 async fn handle_remove_collaborator(
5961 this: ModelHandle<Self>,
5962 envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
5963 _: Arc<Client>,
5964 mut cx: AsyncAppContext,
5965 ) -> Result<()> {
5966 this.update(&mut cx, |this, cx| {
5967 let peer_id = envelope
5968 .payload
5969 .peer_id
5970 .ok_or_else(|| anyhow!("invalid peer id"))?;
5971 let replica_id = this
5972 .collaborators
5973 .remove(&peer_id)
5974 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
5975 .replica_id;
5976 for buffer in this.opened_buffers.values() {
5977 if let Some(buffer) = buffer.upgrade(cx) {
5978 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
5979 }
5980 }
5981 this.shared_buffers.remove(&peer_id);
5982
5983 cx.emit(Event::CollaboratorLeft(peer_id));
5984 cx.notify();
5985 Ok(())
5986 })
5987 }
5988
5989 async fn handle_update_project(
5990 this: ModelHandle<Self>,
5991 envelope: TypedEnvelope<proto::UpdateProject>,
5992 _: Arc<Client>,
5993 mut cx: AsyncAppContext,
5994 ) -> Result<()> {
5995 this.update(&mut cx, |this, cx| {
5996 // Don't handle messages that were sent before the response to us joining the project
5997 if envelope.message_id > this.join_project_response_message_id {
5998 this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
5999 }
6000 Ok(())
6001 })
6002 }
6003
6004 async fn handle_update_worktree(
6005 this: ModelHandle<Self>,
6006 envelope: TypedEnvelope<proto::UpdateWorktree>,
6007 _: Arc<Client>,
6008 mut cx: AsyncAppContext,
6009 ) -> Result<()> {
6010 this.update(&mut cx, |this, cx| {
6011 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6012 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
6013 worktree.update(cx, |worktree, _| {
6014 let worktree = worktree.as_remote_mut().unwrap();
6015 worktree.update_from_remote(envelope.payload);
6016 });
6017 }
6018 Ok(())
6019 })
6020 }
6021
6022 async fn handle_update_worktree_settings(
6023 this: ModelHandle<Self>,
6024 envelope: TypedEnvelope<proto::UpdateWorktreeSettings>,
6025 _: Arc<Client>,
6026 mut cx: AsyncAppContext,
6027 ) -> Result<()> {
6028 this.update(&mut cx, |this, cx| {
6029 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6030 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
6031 cx.update_global::<SettingsStore, _, _>(|store, cx| {
6032 store
6033 .set_local_settings(
6034 worktree.id(),
6035 PathBuf::from(&envelope.payload.path).into(),
6036 envelope.payload.content.as_ref().map(String::as_str),
6037 cx,
6038 )
6039 .log_err();
6040 });
6041 }
6042 Ok(())
6043 })
6044 }
6045
6046 async fn handle_create_project_entry(
6047 this: ModelHandle<Self>,
6048 envelope: TypedEnvelope<proto::CreateProjectEntry>,
6049 _: Arc<Client>,
6050 mut cx: AsyncAppContext,
6051 ) -> Result<proto::ProjectEntryResponse> {
6052 let worktree = this.update(&mut cx, |this, cx| {
6053 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6054 this.worktree_for_id(worktree_id, cx)
6055 .ok_or_else(|| anyhow!("worktree not found"))
6056 })?;
6057 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
6058 let entry = worktree
6059 .update(&mut cx, |worktree, cx| {
6060 let worktree = worktree.as_local_mut().unwrap();
6061 let path = PathBuf::from(envelope.payload.path);
6062 worktree.create_entry(path, envelope.payload.is_directory, cx)
6063 })
6064 .await?;
6065 Ok(proto::ProjectEntryResponse {
6066 entry: Some((&entry).into()),
6067 worktree_scan_id: worktree_scan_id as u64,
6068 })
6069 }
6070
6071 async fn handle_rename_project_entry(
6072 this: ModelHandle<Self>,
6073 envelope: TypedEnvelope<proto::RenameProjectEntry>,
6074 _: Arc<Client>,
6075 mut cx: AsyncAppContext,
6076 ) -> Result<proto::ProjectEntryResponse> {
6077 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
6078 let worktree = this.read_with(&cx, |this, cx| {
6079 this.worktree_for_entry(entry_id, cx)
6080 .ok_or_else(|| anyhow!("worktree not found"))
6081 })?;
6082 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
6083 let entry = worktree
6084 .update(&mut cx, |worktree, cx| {
6085 let new_path = PathBuf::from(envelope.payload.new_path);
6086 worktree
6087 .as_local_mut()
6088 .unwrap()
6089 .rename_entry(entry_id, new_path, cx)
6090 .ok_or_else(|| anyhow!("invalid entry"))
6091 })?
6092 .await?;
6093 Ok(proto::ProjectEntryResponse {
6094 entry: Some((&entry).into()),
6095 worktree_scan_id: worktree_scan_id as u64,
6096 })
6097 }
6098
6099 async fn handle_copy_project_entry(
6100 this: ModelHandle<Self>,
6101 envelope: TypedEnvelope<proto::CopyProjectEntry>,
6102 _: Arc<Client>,
6103 mut cx: AsyncAppContext,
6104 ) -> Result<proto::ProjectEntryResponse> {
6105 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
6106 let worktree = this.read_with(&cx, |this, cx| {
6107 this.worktree_for_entry(entry_id, cx)
6108 .ok_or_else(|| anyhow!("worktree not found"))
6109 })?;
6110 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
6111 let entry = worktree
6112 .update(&mut cx, |worktree, cx| {
6113 let new_path = PathBuf::from(envelope.payload.new_path);
6114 worktree
6115 .as_local_mut()
6116 .unwrap()
6117 .copy_entry(entry_id, new_path, cx)
6118 .ok_or_else(|| anyhow!("invalid entry"))
6119 })?
6120 .await?;
6121 Ok(proto::ProjectEntryResponse {
6122 entry: Some((&entry).into()),
6123 worktree_scan_id: worktree_scan_id as u64,
6124 })
6125 }
6126
6127 async fn handle_delete_project_entry(
6128 this: ModelHandle<Self>,
6129 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
6130 _: Arc<Client>,
6131 mut cx: AsyncAppContext,
6132 ) -> Result<proto::ProjectEntryResponse> {
6133 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
6134
6135 this.update(&mut cx, |_, cx| cx.emit(Event::DeletedEntry(entry_id)));
6136
6137 let worktree = this.read_with(&cx, |this, cx| {
6138 this.worktree_for_entry(entry_id, cx)
6139 .ok_or_else(|| anyhow!("worktree not found"))
6140 })?;
6141 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
6142 worktree
6143 .update(&mut cx, |worktree, cx| {
6144 worktree
6145 .as_local_mut()
6146 .unwrap()
6147 .delete_entry(entry_id, cx)
6148 .ok_or_else(|| anyhow!("invalid entry"))
6149 })?
6150 .await?;
6151 Ok(proto::ProjectEntryResponse {
6152 entry: None,
6153 worktree_scan_id: worktree_scan_id as u64,
6154 })
6155 }
6156
6157 async fn handle_expand_project_entry(
6158 this: ModelHandle<Self>,
6159 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
6160 _: Arc<Client>,
6161 mut cx: AsyncAppContext,
6162 ) -> Result<proto::ExpandProjectEntryResponse> {
6163 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
6164 let worktree = this
6165 .read_with(&cx, |this, cx| this.worktree_for_entry(entry_id, cx))
6166 .ok_or_else(|| anyhow!("invalid request"))?;
6167 worktree
6168 .update(&mut cx, |worktree, cx| {
6169 worktree
6170 .as_local_mut()
6171 .unwrap()
6172 .expand_entry(entry_id, cx)
6173 .ok_or_else(|| anyhow!("invalid entry"))
6174 })?
6175 .await?;
6176 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id()) as u64;
6177 Ok(proto::ExpandProjectEntryResponse { worktree_scan_id })
6178 }
6179
6180 async fn handle_update_diagnostic_summary(
6181 this: ModelHandle<Self>,
6182 envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
6183 _: Arc<Client>,
6184 mut cx: AsyncAppContext,
6185 ) -> Result<()> {
6186 this.update(&mut cx, |this, cx| {
6187 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6188 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
6189 if let Some(summary) = envelope.payload.summary {
6190 let project_path = ProjectPath {
6191 worktree_id,
6192 path: Path::new(&summary.path).into(),
6193 };
6194 worktree.update(cx, |worktree, _| {
6195 worktree
6196 .as_remote_mut()
6197 .unwrap()
6198 .update_diagnostic_summary(project_path.path.clone(), &summary);
6199 });
6200 cx.emit(Event::DiagnosticsUpdated {
6201 language_server_id: LanguageServerId(summary.language_server_id as usize),
6202 path: project_path,
6203 });
6204 }
6205 }
6206 Ok(())
6207 })
6208 }
6209
6210 async fn handle_start_language_server(
6211 this: ModelHandle<Self>,
6212 envelope: TypedEnvelope<proto::StartLanguageServer>,
6213 _: Arc<Client>,
6214 mut cx: AsyncAppContext,
6215 ) -> Result<()> {
6216 let server = envelope
6217 .payload
6218 .server
6219 .ok_or_else(|| anyhow!("invalid server"))?;
6220 this.update(&mut cx, |this, cx| {
6221 this.language_server_statuses.insert(
6222 LanguageServerId(server.id as usize),
6223 LanguageServerStatus {
6224 name: server.name,
6225 pending_work: Default::default(),
6226 has_pending_diagnostic_updates: false,
6227 progress_tokens: Default::default(),
6228 },
6229 );
6230 cx.notify();
6231 });
6232 Ok(())
6233 }
6234
6235 async fn handle_update_language_server(
6236 this: ModelHandle<Self>,
6237 envelope: TypedEnvelope<proto::UpdateLanguageServer>,
6238 _: Arc<Client>,
6239 mut cx: AsyncAppContext,
6240 ) -> Result<()> {
6241 this.update(&mut cx, |this, cx| {
6242 let language_server_id = LanguageServerId(envelope.payload.language_server_id as usize);
6243
6244 match envelope
6245 .payload
6246 .variant
6247 .ok_or_else(|| anyhow!("invalid variant"))?
6248 {
6249 proto::update_language_server::Variant::WorkStart(payload) => {
6250 this.on_lsp_work_start(
6251 language_server_id,
6252 payload.token,
6253 LanguageServerProgress {
6254 message: payload.message,
6255 percentage: payload.percentage.map(|p| p as usize),
6256 last_update_at: Instant::now(),
6257 },
6258 cx,
6259 );
6260 }
6261
6262 proto::update_language_server::Variant::WorkProgress(payload) => {
6263 this.on_lsp_work_progress(
6264 language_server_id,
6265 payload.token,
6266 LanguageServerProgress {
6267 message: payload.message,
6268 percentage: payload.percentage.map(|p| p as usize),
6269 last_update_at: Instant::now(),
6270 },
6271 cx,
6272 );
6273 }
6274
6275 proto::update_language_server::Variant::WorkEnd(payload) => {
6276 this.on_lsp_work_end(language_server_id, payload.token, cx);
6277 }
6278
6279 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
6280 this.disk_based_diagnostics_started(language_server_id, cx);
6281 }
6282
6283 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
6284 this.disk_based_diagnostics_finished(language_server_id, cx)
6285 }
6286 }
6287
6288 Ok(())
6289 })
6290 }
6291
6292 async fn handle_update_buffer(
6293 this: ModelHandle<Self>,
6294 envelope: TypedEnvelope<proto::UpdateBuffer>,
6295 _: Arc<Client>,
6296 mut cx: AsyncAppContext,
6297 ) -> Result<proto::Ack> {
6298 this.update(&mut cx, |this, cx| {
6299 let payload = envelope.payload.clone();
6300 let buffer_id = payload.buffer_id;
6301 let ops = payload
6302 .operations
6303 .into_iter()
6304 .map(language::proto::deserialize_operation)
6305 .collect::<Result<Vec<_>, _>>()?;
6306 let is_remote = this.is_remote();
6307 match this.opened_buffers.entry(buffer_id) {
6308 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
6309 OpenBuffer::Strong(buffer) => {
6310 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
6311 }
6312 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
6313 OpenBuffer::Weak(_) => {}
6314 },
6315 hash_map::Entry::Vacant(e) => {
6316 assert!(
6317 is_remote,
6318 "received buffer update from {:?}",
6319 envelope.original_sender_id
6320 );
6321 e.insert(OpenBuffer::Operations(ops));
6322 }
6323 }
6324 Ok(proto::Ack {})
6325 })
6326 }
6327
6328 async fn handle_create_buffer_for_peer(
6329 this: ModelHandle<Self>,
6330 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
6331 _: Arc<Client>,
6332 mut cx: AsyncAppContext,
6333 ) -> Result<()> {
6334 this.update(&mut cx, |this, cx| {
6335 match envelope
6336 .payload
6337 .variant
6338 .ok_or_else(|| anyhow!("missing variant"))?
6339 {
6340 proto::create_buffer_for_peer::Variant::State(mut state) => {
6341 let mut buffer_file = None;
6342 if let Some(file) = state.file.take() {
6343 let worktree_id = WorktreeId::from_proto(file.worktree_id);
6344 let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
6345 anyhow!("no worktree found for id {}", file.worktree_id)
6346 })?;
6347 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
6348 as Arc<dyn language::File>);
6349 }
6350
6351 let buffer_id = state.id;
6352 let buffer = cx.add_model(|_| {
6353 Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
6354 });
6355 this.incomplete_remote_buffers
6356 .insert(buffer_id, Some(buffer));
6357 }
6358 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
6359 let buffer = this
6360 .incomplete_remote_buffers
6361 .get(&chunk.buffer_id)
6362 .cloned()
6363 .flatten()
6364 .ok_or_else(|| {
6365 anyhow!(
6366 "received chunk for buffer {} without initial state",
6367 chunk.buffer_id
6368 )
6369 })?;
6370 let operations = chunk
6371 .operations
6372 .into_iter()
6373 .map(language::proto::deserialize_operation)
6374 .collect::<Result<Vec<_>>>()?;
6375 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
6376
6377 if chunk.is_last {
6378 this.incomplete_remote_buffers.remove(&chunk.buffer_id);
6379 this.register_buffer(&buffer, cx)?;
6380 }
6381 }
6382 }
6383
6384 Ok(())
6385 })
6386 }
6387
6388 async fn handle_update_diff_base(
6389 this: ModelHandle<Self>,
6390 envelope: TypedEnvelope<proto::UpdateDiffBase>,
6391 _: Arc<Client>,
6392 mut cx: AsyncAppContext,
6393 ) -> Result<()> {
6394 this.update(&mut cx, |this, cx| {
6395 let buffer_id = envelope.payload.buffer_id;
6396 let diff_base = envelope.payload.diff_base;
6397 if let Some(buffer) = this
6398 .opened_buffers
6399 .get_mut(&buffer_id)
6400 .and_then(|b| b.upgrade(cx))
6401 .or_else(|| {
6402 this.incomplete_remote_buffers
6403 .get(&buffer_id)
6404 .cloned()
6405 .flatten()
6406 })
6407 {
6408 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
6409 }
6410 Ok(())
6411 })
6412 }
6413
6414 async fn handle_update_buffer_file(
6415 this: ModelHandle<Self>,
6416 envelope: TypedEnvelope<proto::UpdateBufferFile>,
6417 _: Arc<Client>,
6418 mut cx: AsyncAppContext,
6419 ) -> Result<()> {
6420 let buffer_id = envelope.payload.buffer_id;
6421
6422 this.update(&mut cx, |this, cx| {
6423 let payload = envelope.payload.clone();
6424 if let Some(buffer) = this
6425 .opened_buffers
6426 .get(&buffer_id)
6427 .and_then(|b| b.upgrade(cx))
6428 .or_else(|| {
6429 this.incomplete_remote_buffers
6430 .get(&buffer_id)
6431 .cloned()
6432 .flatten()
6433 })
6434 {
6435 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
6436 let worktree = this
6437 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
6438 .ok_or_else(|| anyhow!("no such worktree"))?;
6439 let file = File::from_proto(file, worktree, cx)?;
6440 buffer.update(cx, |buffer, cx| {
6441 buffer.file_updated(Arc::new(file), cx).detach();
6442 });
6443 this.detect_language_for_buffer(&buffer, cx);
6444 }
6445 Ok(())
6446 })
6447 }
6448
6449 async fn handle_save_buffer(
6450 this: ModelHandle<Self>,
6451 envelope: TypedEnvelope<proto::SaveBuffer>,
6452 _: Arc<Client>,
6453 mut cx: AsyncAppContext,
6454 ) -> Result<proto::BufferSaved> {
6455 let buffer_id = envelope.payload.buffer_id;
6456 let (project_id, buffer) = this.update(&mut cx, |this, cx| {
6457 let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
6458 let buffer = this
6459 .opened_buffers
6460 .get(&buffer_id)
6461 .and_then(|buffer| buffer.upgrade(cx))
6462 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
6463 anyhow::Ok((project_id, buffer))
6464 })?;
6465 buffer
6466 .update(&mut cx, |buffer, _| {
6467 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
6468 })
6469 .await?;
6470 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
6471
6472 this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))
6473 .await?;
6474 Ok(buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
6475 project_id,
6476 buffer_id,
6477 version: serialize_version(buffer.saved_version()),
6478 mtime: Some(buffer.saved_mtime().into()),
6479 fingerprint: language::proto::serialize_fingerprint(buffer.saved_version_fingerprint()),
6480 }))
6481 }
6482
6483 async fn handle_reload_buffers(
6484 this: ModelHandle<Self>,
6485 envelope: TypedEnvelope<proto::ReloadBuffers>,
6486 _: Arc<Client>,
6487 mut cx: AsyncAppContext,
6488 ) -> Result<proto::ReloadBuffersResponse> {
6489 let sender_id = envelope.original_sender_id()?;
6490 let reload = this.update(&mut cx, |this, cx| {
6491 let mut buffers = HashSet::default();
6492 for buffer_id in &envelope.payload.buffer_ids {
6493 buffers.insert(
6494 this.opened_buffers
6495 .get(buffer_id)
6496 .and_then(|buffer| buffer.upgrade(cx))
6497 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
6498 );
6499 }
6500 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
6501 })?;
6502
6503 let project_transaction = reload.await?;
6504 let project_transaction = this.update(&mut cx, |this, cx| {
6505 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6506 });
6507 Ok(proto::ReloadBuffersResponse {
6508 transaction: Some(project_transaction),
6509 })
6510 }
6511
6512 async fn handle_synchronize_buffers(
6513 this: ModelHandle<Self>,
6514 envelope: TypedEnvelope<proto::SynchronizeBuffers>,
6515 _: Arc<Client>,
6516 mut cx: AsyncAppContext,
6517 ) -> Result<proto::SynchronizeBuffersResponse> {
6518 let project_id = envelope.payload.project_id;
6519 let mut response = proto::SynchronizeBuffersResponse {
6520 buffers: Default::default(),
6521 };
6522
6523 this.update(&mut cx, |this, cx| {
6524 let Some(guest_id) = envelope.original_sender_id else {
6525 error!("missing original_sender_id on SynchronizeBuffers request");
6526 return;
6527 };
6528
6529 this.shared_buffers.entry(guest_id).or_default().clear();
6530 for buffer in envelope.payload.buffers {
6531 let buffer_id = buffer.id;
6532 let remote_version = language::proto::deserialize_version(&buffer.version);
6533 if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
6534 this.shared_buffers
6535 .entry(guest_id)
6536 .or_default()
6537 .insert(buffer_id);
6538
6539 let buffer = buffer.read(cx);
6540 response.buffers.push(proto::BufferVersion {
6541 id: buffer_id,
6542 version: language::proto::serialize_version(&buffer.version),
6543 });
6544
6545 let operations = buffer.serialize_ops(Some(remote_version), cx);
6546 let client = this.client.clone();
6547 if let Some(file) = buffer.file() {
6548 client
6549 .send(proto::UpdateBufferFile {
6550 project_id,
6551 buffer_id: buffer_id as u64,
6552 file: Some(file.to_proto()),
6553 })
6554 .log_err();
6555 }
6556
6557 client
6558 .send(proto::UpdateDiffBase {
6559 project_id,
6560 buffer_id: buffer_id as u64,
6561 diff_base: buffer.diff_base().map(Into::into),
6562 })
6563 .log_err();
6564
6565 client
6566 .send(proto::BufferReloaded {
6567 project_id,
6568 buffer_id,
6569 version: language::proto::serialize_version(buffer.saved_version()),
6570 mtime: Some(buffer.saved_mtime().into()),
6571 fingerprint: language::proto::serialize_fingerprint(
6572 buffer.saved_version_fingerprint(),
6573 ),
6574 line_ending: language::proto::serialize_line_ending(
6575 buffer.line_ending(),
6576 ) as i32,
6577 })
6578 .log_err();
6579
6580 cx.background()
6581 .spawn(
6582 async move {
6583 let operations = operations.await;
6584 for chunk in split_operations(operations) {
6585 client
6586 .request(proto::UpdateBuffer {
6587 project_id,
6588 buffer_id,
6589 operations: chunk,
6590 })
6591 .await?;
6592 }
6593 anyhow::Ok(())
6594 }
6595 .log_err(),
6596 )
6597 .detach();
6598 }
6599 }
6600 });
6601
6602 Ok(response)
6603 }
6604
6605 async fn handle_format_buffers(
6606 this: ModelHandle<Self>,
6607 envelope: TypedEnvelope<proto::FormatBuffers>,
6608 _: Arc<Client>,
6609 mut cx: AsyncAppContext,
6610 ) -> Result<proto::FormatBuffersResponse> {
6611 let sender_id = envelope.original_sender_id()?;
6612 let format = this.update(&mut cx, |this, cx| {
6613 let mut buffers = HashSet::default();
6614 for buffer_id in &envelope.payload.buffer_ids {
6615 buffers.insert(
6616 this.opened_buffers
6617 .get(buffer_id)
6618 .and_then(|buffer| buffer.upgrade(cx))
6619 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
6620 );
6621 }
6622 let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
6623 Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
6624 })?;
6625
6626 let project_transaction = format.await?;
6627 let project_transaction = this.update(&mut cx, |this, cx| {
6628 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6629 });
6630 Ok(proto::FormatBuffersResponse {
6631 transaction: Some(project_transaction),
6632 })
6633 }
6634
6635 async fn handle_apply_additional_edits_for_completion(
6636 this: ModelHandle<Self>,
6637 envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
6638 _: Arc<Client>,
6639 mut cx: AsyncAppContext,
6640 ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
6641 let (buffer, completion) = this.update(&mut cx, |this, cx| {
6642 let buffer = this
6643 .opened_buffers
6644 .get(&envelope.payload.buffer_id)
6645 .and_then(|buffer| buffer.upgrade(cx))
6646 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6647 let language = buffer.read(cx).language();
6648 let completion = language::proto::deserialize_completion(
6649 envelope
6650 .payload
6651 .completion
6652 .ok_or_else(|| anyhow!("invalid completion"))?,
6653 language.cloned(),
6654 );
6655 Ok::<_, anyhow::Error>((buffer, completion))
6656 })?;
6657
6658 let completion = completion.await?;
6659
6660 let apply_additional_edits = this.update(&mut cx, |this, cx| {
6661 this.apply_additional_edits_for_completion(buffer, completion, false, cx)
6662 });
6663
6664 Ok(proto::ApplyCompletionAdditionalEditsResponse {
6665 transaction: apply_additional_edits
6666 .await?
6667 .as_ref()
6668 .map(language::proto::serialize_transaction),
6669 })
6670 }
6671
6672 async fn handle_apply_code_action(
6673 this: ModelHandle<Self>,
6674 envelope: TypedEnvelope<proto::ApplyCodeAction>,
6675 _: Arc<Client>,
6676 mut cx: AsyncAppContext,
6677 ) -> Result<proto::ApplyCodeActionResponse> {
6678 let sender_id = envelope.original_sender_id()?;
6679 let action = language::proto::deserialize_code_action(
6680 envelope
6681 .payload
6682 .action
6683 .ok_or_else(|| anyhow!("invalid action"))?,
6684 )?;
6685 let apply_code_action = this.update(&mut cx, |this, cx| {
6686 let buffer = this
6687 .opened_buffers
6688 .get(&envelope.payload.buffer_id)
6689 .and_then(|buffer| buffer.upgrade(cx))
6690 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6691 Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
6692 })?;
6693
6694 let project_transaction = apply_code_action.await?;
6695 let project_transaction = this.update(&mut cx, |this, cx| {
6696 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
6697 });
6698 Ok(proto::ApplyCodeActionResponse {
6699 transaction: Some(project_transaction),
6700 })
6701 }
6702
6703 async fn handle_on_type_formatting(
6704 this: ModelHandle<Self>,
6705 envelope: TypedEnvelope<proto::OnTypeFormatting>,
6706 _: Arc<Client>,
6707 mut cx: AsyncAppContext,
6708 ) -> Result<proto::OnTypeFormattingResponse> {
6709 let on_type_formatting = this.update(&mut cx, |this, cx| {
6710 let buffer = this
6711 .opened_buffers
6712 .get(&envelope.payload.buffer_id)
6713 .and_then(|buffer| buffer.upgrade(cx))
6714 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
6715 let position = envelope
6716 .payload
6717 .position
6718 .and_then(deserialize_anchor)
6719 .ok_or_else(|| anyhow!("invalid position"))?;
6720 Ok::<_, anyhow::Error>(this.apply_on_type_formatting(
6721 buffer,
6722 position,
6723 envelope.payload.trigger.clone(),
6724 cx,
6725 ))
6726 })?;
6727
6728 let transaction = on_type_formatting
6729 .await?
6730 .as_ref()
6731 .map(language::proto::serialize_transaction);
6732 Ok(proto::OnTypeFormattingResponse { transaction })
6733 }
6734
6735 async fn handle_inlay_hints(
6736 this: ModelHandle<Self>,
6737 envelope: TypedEnvelope<proto::InlayHints>,
6738 _: Arc<Client>,
6739 mut cx: AsyncAppContext,
6740 ) -> Result<proto::InlayHintsResponse> {
6741 let sender_id = envelope.original_sender_id()?;
6742 let buffer = this.update(&mut cx, |this, cx| {
6743 this.opened_buffers
6744 .get(&envelope.payload.buffer_id)
6745 .and_then(|buffer| buffer.upgrade(cx))
6746 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
6747 })?;
6748 let buffer_version = deserialize_version(&envelope.payload.version);
6749
6750 buffer
6751 .update(&mut cx, |buffer, _| {
6752 buffer.wait_for_version(buffer_version.clone())
6753 })
6754 .await
6755 .with_context(|| {
6756 format!(
6757 "waiting for version {:?} for buffer {}",
6758 buffer_version,
6759 buffer.id()
6760 )
6761 })?;
6762
6763 let start = envelope
6764 .payload
6765 .start
6766 .and_then(deserialize_anchor)
6767 .context("missing range start")?;
6768 let end = envelope
6769 .payload
6770 .end
6771 .and_then(deserialize_anchor)
6772 .context("missing range end")?;
6773 let buffer_hints = this
6774 .update(&mut cx, |project, cx| {
6775 project.inlay_hints(buffer, start..end, cx)
6776 })
6777 .await
6778 .context("inlay hints fetch")?;
6779
6780 Ok(this.update(&mut cx, |project, cx| {
6781 InlayHints::response_to_proto(buffer_hints, project, sender_id, &buffer_version, cx)
6782 }))
6783 }
6784
6785 async fn handle_refresh_inlay_hints(
6786 this: ModelHandle<Self>,
6787 _: TypedEnvelope<proto::RefreshInlayHints>,
6788 _: Arc<Client>,
6789 mut cx: AsyncAppContext,
6790 ) -> Result<proto::Ack> {
6791 this.update(&mut cx, |_, cx| {
6792 cx.emit(Event::RefreshInlays);
6793 });
6794 Ok(proto::Ack {})
6795 }
6796
6797 async fn handle_lsp_command<T: LspCommand>(
6798 this: ModelHandle<Self>,
6799 envelope: TypedEnvelope<T::ProtoRequest>,
6800 _: Arc<Client>,
6801 mut cx: AsyncAppContext,
6802 ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
6803 where
6804 <T::LspRequest as lsp::request::Request>::Result: Send,
6805 {
6806 let sender_id = envelope.original_sender_id()?;
6807 let buffer_id = T::buffer_id_from_proto(&envelope.payload);
6808 let buffer_handle = this.read_with(&cx, |this, _| {
6809 this.opened_buffers
6810 .get(&buffer_id)
6811 .and_then(|buffer| buffer.upgrade(&cx))
6812 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
6813 })?;
6814 let request = T::from_proto(
6815 envelope.payload,
6816 this.clone(),
6817 buffer_handle.clone(),
6818 cx.clone(),
6819 )
6820 .await?;
6821 let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
6822 let response = this
6823 .update(&mut cx, |this, cx| {
6824 this.request_lsp(buffer_handle, request, cx)
6825 })
6826 .await?;
6827 this.update(&mut cx, |this, cx| {
6828 Ok(T::response_to_proto(
6829 response,
6830 this,
6831 sender_id,
6832 &buffer_version,
6833 cx,
6834 ))
6835 })
6836 }
6837
6838 async fn handle_get_project_symbols(
6839 this: ModelHandle<Self>,
6840 envelope: TypedEnvelope<proto::GetProjectSymbols>,
6841 _: Arc<Client>,
6842 mut cx: AsyncAppContext,
6843 ) -> Result<proto::GetProjectSymbolsResponse> {
6844 let symbols = this
6845 .update(&mut cx, |this, cx| {
6846 this.symbols(&envelope.payload.query, cx)
6847 })
6848 .await?;
6849
6850 Ok(proto::GetProjectSymbolsResponse {
6851 symbols: symbols.iter().map(serialize_symbol).collect(),
6852 })
6853 }
6854
6855 async fn handle_search_project(
6856 this: ModelHandle<Self>,
6857 envelope: TypedEnvelope<proto::SearchProject>,
6858 _: Arc<Client>,
6859 mut cx: AsyncAppContext,
6860 ) -> Result<proto::SearchProjectResponse> {
6861 let peer_id = envelope.original_sender_id()?;
6862 let query = SearchQuery::from_proto(envelope.payload)?;
6863 let result = this
6864 .update(&mut cx, |this, cx| this.search(query, cx))
6865 .await?;
6866
6867 this.update(&mut cx, |this, cx| {
6868 let mut locations = Vec::new();
6869 for (buffer, ranges) in result {
6870 for range in ranges {
6871 let start = serialize_anchor(&range.start);
6872 let end = serialize_anchor(&range.end);
6873 let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
6874 locations.push(proto::Location {
6875 buffer_id,
6876 start: Some(start),
6877 end: Some(end),
6878 });
6879 }
6880 }
6881 Ok(proto::SearchProjectResponse { locations })
6882 })
6883 }
6884
6885 async fn handle_open_buffer_for_symbol(
6886 this: ModelHandle<Self>,
6887 envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
6888 _: Arc<Client>,
6889 mut cx: AsyncAppContext,
6890 ) -> Result<proto::OpenBufferForSymbolResponse> {
6891 let peer_id = envelope.original_sender_id()?;
6892 let symbol = envelope
6893 .payload
6894 .symbol
6895 .ok_or_else(|| anyhow!("invalid symbol"))?;
6896 let symbol = this
6897 .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
6898 .await?;
6899 let symbol = this.read_with(&cx, |this, _| {
6900 let signature = this.symbol_signature(&symbol.path);
6901 if signature == symbol.signature {
6902 Ok(symbol)
6903 } else {
6904 Err(anyhow!("invalid symbol signature"))
6905 }
6906 })?;
6907 let buffer = this
6908 .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
6909 .await?;
6910
6911 Ok(proto::OpenBufferForSymbolResponse {
6912 buffer_id: this.update(&mut cx, |this, cx| {
6913 this.create_buffer_for_peer(&buffer, peer_id, cx)
6914 }),
6915 })
6916 }
6917
6918 fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
6919 let mut hasher = Sha256::new();
6920 hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
6921 hasher.update(project_path.path.to_string_lossy().as_bytes());
6922 hasher.update(self.nonce.to_be_bytes());
6923 hasher.finalize().as_slice().try_into().unwrap()
6924 }
6925
6926 async fn handle_open_buffer_by_id(
6927 this: ModelHandle<Self>,
6928 envelope: TypedEnvelope<proto::OpenBufferById>,
6929 _: Arc<Client>,
6930 mut cx: AsyncAppContext,
6931 ) -> Result<proto::OpenBufferResponse> {
6932 let peer_id = envelope.original_sender_id()?;
6933 let buffer = this
6934 .update(&mut cx, |this, cx| {
6935 this.open_buffer_by_id(envelope.payload.id, cx)
6936 })
6937 .await?;
6938 this.update(&mut cx, |this, cx| {
6939 Ok(proto::OpenBufferResponse {
6940 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6941 })
6942 })
6943 }
6944
6945 async fn handle_open_buffer_by_path(
6946 this: ModelHandle<Self>,
6947 envelope: TypedEnvelope<proto::OpenBufferByPath>,
6948 _: Arc<Client>,
6949 mut cx: AsyncAppContext,
6950 ) -> Result<proto::OpenBufferResponse> {
6951 let peer_id = envelope.original_sender_id()?;
6952 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6953 let open_buffer = this.update(&mut cx, |this, cx| {
6954 this.open_buffer(
6955 ProjectPath {
6956 worktree_id,
6957 path: PathBuf::from(envelope.payload.path).into(),
6958 },
6959 cx,
6960 )
6961 });
6962
6963 let buffer = open_buffer.await?;
6964 this.update(&mut cx, |this, cx| {
6965 Ok(proto::OpenBufferResponse {
6966 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6967 })
6968 })
6969 }
6970
6971 fn serialize_project_transaction_for_peer(
6972 &mut self,
6973 project_transaction: ProjectTransaction,
6974 peer_id: proto::PeerId,
6975 cx: &mut AppContext,
6976 ) -> proto::ProjectTransaction {
6977 let mut serialized_transaction = proto::ProjectTransaction {
6978 buffer_ids: Default::default(),
6979 transactions: Default::default(),
6980 };
6981 for (buffer, transaction) in project_transaction.0 {
6982 serialized_transaction
6983 .buffer_ids
6984 .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
6985 serialized_transaction
6986 .transactions
6987 .push(language::proto::serialize_transaction(&transaction));
6988 }
6989 serialized_transaction
6990 }
6991
6992 fn deserialize_project_transaction(
6993 &mut self,
6994 message: proto::ProjectTransaction,
6995 push_to_history: bool,
6996 cx: &mut ModelContext<Self>,
6997 ) -> Task<Result<ProjectTransaction>> {
6998 cx.spawn(|this, mut cx| async move {
6999 let mut project_transaction = ProjectTransaction::default();
7000 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
7001 {
7002 let buffer = this
7003 .update(&mut cx, |this, cx| {
7004 this.wait_for_remote_buffer(buffer_id, cx)
7005 })
7006 .await?;
7007 let transaction = language::proto::deserialize_transaction(transaction)?;
7008 project_transaction.0.insert(buffer, transaction);
7009 }
7010
7011 for (buffer, transaction) in &project_transaction.0 {
7012 buffer
7013 .update(&mut cx, |buffer, _| {
7014 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
7015 })
7016 .await?;
7017
7018 if push_to_history {
7019 buffer.update(&mut cx, |buffer, _| {
7020 buffer.push_transaction(transaction.clone(), Instant::now());
7021 });
7022 }
7023 }
7024
7025 Ok(project_transaction)
7026 })
7027 }
7028
7029 fn create_buffer_for_peer(
7030 &mut self,
7031 buffer: &ModelHandle<Buffer>,
7032 peer_id: proto::PeerId,
7033 cx: &mut AppContext,
7034 ) -> u64 {
7035 let buffer_id = buffer.read(cx).remote_id();
7036 if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
7037 updates_tx
7038 .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
7039 .ok();
7040 }
7041 buffer_id
7042 }
7043
7044 fn wait_for_remote_buffer(
7045 &mut self,
7046 id: u64,
7047 cx: &mut ModelContext<Self>,
7048 ) -> Task<Result<ModelHandle<Buffer>>> {
7049 let mut opened_buffer_rx = self.opened_buffer.1.clone();
7050
7051 cx.spawn_weak(|this, mut cx| async move {
7052 let buffer = loop {
7053 let Some(this) = this.upgrade(&cx) else {
7054 return Err(anyhow!("project dropped"));
7055 };
7056
7057 let buffer = this.read_with(&cx, |this, cx| {
7058 this.opened_buffers
7059 .get(&id)
7060 .and_then(|buffer| buffer.upgrade(cx))
7061 });
7062
7063 if let Some(buffer) = buffer {
7064 break buffer;
7065 } else if this.read_with(&cx, |this, _| this.is_read_only()) {
7066 return Err(anyhow!("disconnected before buffer {} could be opened", id));
7067 }
7068
7069 this.update(&mut cx, |this, _| {
7070 this.incomplete_remote_buffers.entry(id).or_default();
7071 });
7072 drop(this);
7073
7074 opened_buffer_rx
7075 .next()
7076 .await
7077 .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
7078 };
7079
7080 Ok(buffer)
7081 })
7082 }
7083
7084 fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
7085 let project_id = match self.client_state.as_ref() {
7086 Some(ProjectClientState::Remote {
7087 sharing_has_stopped,
7088 remote_id,
7089 ..
7090 }) => {
7091 if *sharing_has_stopped {
7092 return Task::ready(Err(anyhow!(
7093 "can't synchronize remote buffers on a readonly project"
7094 )));
7095 } else {
7096 *remote_id
7097 }
7098 }
7099 Some(ProjectClientState::Local { .. }) | None => {
7100 return Task::ready(Err(anyhow!(
7101 "can't synchronize remote buffers on a local project"
7102 )))
7103 }
7104 };
7105
7106 let client = self.client.clone();
7107 cx.spawn(|this, cx| async move {
7108 let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| {
7109 let buffers = this
7110 .opened_buffers
7111 .iter()
7112 .filter_map(|(id, buffer)| {
7113 let buffer = buffer.upgrade(cx)?;
7114 Some(proto::BufferVersion {
7115 id: *id,
7116 version: language::proto::serialize_version(&buffer.read(cx).version),
7117 })
7118 })
7119 .collect();
7120 let incomplete_buffer_ids = this
7121 .incomplete_remote_buffers
7122 .keys()
7123 .copied()
7124 .collect::<Vec<_>>();
7125
7126 (buffers, incomplete_buffer_ids)
7127 });
7128 let response = client
7129 .request(proto::SynchronizeBuffers {
7130 project_id,
7131 buffers,
7132 })
7133 .await?;
7134
7135 let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| {
7136 let client = client.clone();
7137 let buffer_id = buffer.id;
7138 let remote_version = language::proto::deserialize_version(&buffer.version);
7139 this.read_with(&cx, |this, cx| {
7140 if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
7141 let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx);
7142 cx.background().spawn(async move {
7143 let operations = operations.await;
7144 for chunk in split_operations(operations) {
7145 client
7146 .request(proto::UpdateBuffer {
7147 project_id,
7148 buffer_id,
7149 operations: chunk,
7150 })
7151 .await?;
7152 }
7153 anyhow::Ok(())
7154 })
7155 } else {
7156 Task::ready(Ok(()))
7157 }
7158 })
7159 });
7160
7161 // Any incomplete buffers have open requests waiting. Request that the host sends
7162 // creates these buffers for us again to unblock any waiting futures.
7163 for id in incomplete_buffer_ids {
7164 cx.background()
7165 .spawn(client.request(proto::OpenBufferById { project_id, id }))
7166 .detach();
7167 }
7168
7169 futures::future::join_all(send_updates_for_buffers)
7170 .await
7171 .into_iter()
7172 .collect()
7173 })
7174 }
7175
7176 pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
7177 self.worktrees(cx)
7178 .map(|worktree| {
7179 let worktree = worktree.read(cx);
7180 proto::WorktreeMetadata {
7181 id: worktree.id().to_proto(),
7182 root_name: worktree.root_name().into(),
7183 visible: worktree.is_visible(),
7184 abs_path: worktree.abs_path().to_string_lossy().into(),
7185 }
7186 })
7187 .collect()
7188 }
7189
7190 fn set_worktrees_from_proto(
7191 &mut self,
7192 worktrees: Vec<proto::WorktreeMetadata>,
7193 cx: &mut ModelContext<Project>,
7194 ) -> Result<()> {
7195 let replica_id = self.replica_id();
7196 let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
7197
7198 let mut old_worktrees_by_id = self
7199 .worktrees
7200 .drain(..)
7201 .filter_map(|worktree| {
7202 let worktree = worktree.upgrade(cx)?;
7203 Some((worktree.read(cx).id(), worktree))
7204 })
7205 .collect::<HashMap<_, _>>();
7206
7207 for worktree in worktrees {
7208 if let Some(old_worktree) =
7209 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
7210 {
7211 self.worktrees.push(WorktreeHandle::Strong(old_worktree));
7212 } else {
7213 let worktree =
7214 Worktree::remote(remote_id, replica_id, worktree, self.client.clone(), cx);
7215 let _ = self.add_worktree(&worktree, cx);
7216 }
7217 }
7218
7219 self.metadata_changed(cx);
7220 for id in old_worktrees_by_id.keys() {
7221 cx.emit(Event::WorktreeRemoved(*id));
7222 }
7223
7224 Ok(())
7225 }
7226
7227 fn set_collaborators_from_proto(
7228 &mut self,
7229 messages: Vec<proto::Collaborator>,
7230 cx: &mut ModelContext<Self>,
7231 ) -> Result<()> {
7232 let mut collaborators = HashMap::default();
7233 for message in messages {
7234 let collaborator = Collaborator::from_proto(message)?;
7235 collaborators.insert(collaborator.peer_id, collaborator);
7236 }
7237 for old_peer_id in self.collaborators.keys() {
7238 if !collaborators.contains_key(old_peer_id) {
7239 cx.emit(Event::CollaboratorLeft(*old_peer_id));
7240 }
7241 }
7242 self.collaborators = collaborators;
7243 Ok(())
7244 }
7245
7246 fn deserialize_symbol(
7247 &self,
7248 serialized_symbol: proto::Symbol,
7249 ) -> impl Future<Output = Result<Symbol>> {
7250 let languages = self.languages.clone();
7251 async move {
7252 let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
7253 let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
7254 let start = serialized_symbol
7255 .start
7256 .ok_or_else(|| anyhow!("invalid start"))?;
7257 let end = serialized_symbol
7258 .end
7259 .ok_or_else(|| anyhow!("invalid end"))?;
7260 let kind = unsafe { mem::transmute(serialized_symbol.kind) };
7261 let path = ProjectPath {
7262 worktree_id,
7263 path: PathBuf::from(serialized_symbol.path).into(),
7264 };
7265 let language = languages
7266 .language_for_file(&path.path, None)
7267 .await
7268 .log_err();
7269 Ok(Symbol {
7270 language_server_name: LanguageServerName(
7271 serialized_symbol.language_server_name.into(),
7272 ),
7273 source_worktree_id,
7274 path,
7275 label: {
7276 match language {
7277 Some(language) => {
7278 language
7279 .label_for_symbol(&serialized_symbol.name, kind)
7280 .await
7281 }
7282 None => None,
7283 }
7284 .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
7285 },
7286
7287 name: serialized_symbol.name,
7288 range: Unclipped(PointUtf16::new(start.row, start.column))
7289 ..Unclipped(PointUtf16::new(end.row, end.column)),
7290 kind,
7291 signature: serialized_symbol
7292 .signature
7293 .try_into()
7294 .map_err(|_| anyhow!("invalid signature"))?,
7295 })
7296 }
7297 }
7298
7299 async fn handle_buffer_saved(
7300 this: ModelHandle<Self>,
7301 envelope: TypedEnvelope<proto::BufferSaved>,
7302 _: Arc<Client>,
7303 mut cx: AsyncAppContext,
7304 ) -> Result<()> {
7305 let fingerprint = deserialize_fingerprint(&envelope.payload.fingerprint)?;
7306 let version = deserialize_version(&envelope.payload.version);
7307 let mtime = envelope
7308 .payload
7309 .mtime
7310 .ok_or_else(|| anyhow!("missing mtime"))?
7311 .into();
7312
7313 this.update(&mut cx, |this, cx| {
7314 let buffer = this
7315 .opened_buffers
7316 .get(&envelope.payload.buffer_id)
7317 .and_then(|buffer| buffer.upgrade(cx))
7318 .or_else(|| {
7319 this.incomplete_remote_buffers
7320 .get(&envelope.payload.buffer_id)
7321 .and_then(|b| b.clone())
7322 });
7323 if let Some(buffer) = buffer {
7324 buffer.update(cx, |buffer, cx| {
7325 buffer.did_save(version, fingerprint, mtime, cx);
7326 });
7327 }
7328 Ok(())
7329 })
7330 }
7331
7332 async fn handle_buffer_reloaded(
7333 this: ModelHandle<Self>,
7334 envelope: TypedEnvelope<proto::BufferReloaded>,
7335 _: Arc<Client>,
7336 mut cx: AsyncAppContext,
7337 ) -> Result<()> {
7338 let payload = envelope.payload;
7339 let version = deserialize_version(&payload.version);
7340 let fingerprint = deserialize_fingerprint(&payload.fingerprint)?;
7341 let line_ending = deserialize_line_ending(
7342 proto::LineEnding::from_i32(payload.line_ending)
7343 .ok_or_else(|| anyhow!("missing line ending"))?,
7344 );
7345 let mtime = payload
7346 .mtime
7347 .ok_or_else(|| anyhow!("missing mtime"))?
7348 .into();
7349 this.update(&mut cx, |this, cx| {
7350 let buffer = this
7351 .opened_buffers
7352 .get(&payload.buffer_id)
7353 .and_then(|buffer| buffer.upgrade(cx))
7354 .or_else(|| {
7355 this.incomplete_remote_buffers
7356 .get(&payload.buffer_id)
7357 .cloned()
7358 .flatten()
7359 });
7360 if let Some(buffer) = buffer {
7361 buffer.update(cx, |buffer, cx| {
7362 buffer.did_reload(version, fingerprint, line_ending, mtime, cx);
7363 });
7364 }
7365 Ok(())
7366 })
7367 }
7368
7369 #[allow(clippy::type_complexity)]
7370 fn edits_from_lsp(
7371 &mut self,
7372 buffer: &ModelHandle<Buffer>,
7373 lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
7374 server_id: LanguageServerId,
7375 version: Option<i32>,
7376 cx: &mut ModelContext<Self>,
7377 ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
7378 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx);
7379 cx.background().spawn(async move {
7380 let snapshot = snapshot?;
7381 let mut lsp_edits = lsp_edits
7382 .into_iter()
7383 .map(|edit| (range_from_lsp(edit.range), edit.new_text))
7384 .collect::<Vec<_>>();
7385 lsp_edits.sort_by_key(|(range, _)| range.start);
7386
7387 let mut lsp_edits = lsp_edits.into_iter().peekable();
7388 let mut edits = Vec::new();
7389 while let Some((range, mut new_text)) = lsp_edits.next() {
7390 // Clip invalid ranges provided by the language server.
7391 let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
7392 ..snapshot.clip_point_utf16(range.end, Bias::Left);
7393
7394 // Combine any LSP edits that are adjacent.
7395 //
7396 // Also, combine LSP edits that are separated from each other by only
7397 // a newline. This is important because for some code actions,
7398 // Rust-analyzer rewrites the entire buffer via a series of edits that
7399 // are separated by unchanged newline characters.
7400 //
7401 // In order for the diffing logic below to work properly, any edits that
7402 // cancel each other out must be combined into one.
7403 while let Some((next_range, next_text)) = lsp_edits.peek() {
7404 if next_range.start.0 > range.end {
7405 if next_range.start.0.row > range.end.row + 1
7406 || next_range.start.0.column > 0
7407 || snapshot.clip_point_utf16(
7408 Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
7409 Bias::Left,
7410 ) > range.end
7411 {
7412 break;
7413 }
7414 new_text.push('\n');
7415 }
7416 range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
7417 new_text.push_str(next_text);
7418 lsp_edits.next();
7419 }
7420
7421 // For multiline edits, perform a diff of the old and new text so that
7422 // we can identify the changes more precisely, preserving the locations
7423 // of any anchors positioned in the unchanged regions.
7424 if range.end.row > range.start.row {
7425 let mut offset = range.start.to_offset(&snapshot);
7426 let old_text = snapshot.text_for_range(range).collect::<String>();
7427
7428 let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
7429 let mut moved_since_edit = true;
7430 for change in diff.iter_all_changes() {
7431 let tag = change.tag();
7432 let value = change.value();
7433 match tag {
7434 ChangeTag::Equal => {
7435 offset += value.len();
7436 moved_since_edit = true;
7437 }
7438 ChangeTag::Delete => {
7439 let start = snapshot.anchor_after(offset);
7440 let end = snapshot.anchor_before(offset + value.len());
7441 if moved_since_edit {
7442 edits.push((start..end, String::new()));
7443 } else {
7444 edits.last_mut().unwrap().0.end = end;
7445 }
7446 offset += value.len();
7447 moved_since_edit = false;
7448 }
7449 ChangeTag::Insert => {
7450 if moved_since_edit {
7451 let anchor = snapshot.anchor_after(offset);
7452 edits.push((anchor..anchor, value.to_string()));
7453 } else {
7454 edits.last_mut().unwrap().1.push_str(value);
7455 }
7456 moved_since_edit = false;
7457 }
7458 }
7459 }
7460 } else if range.end == range.start {
7461 let anchor = snapshot.anchor_after(range.start);
7462 edits.push((anchor..anchor, new_text));
7463 } else {
7464 let edit_start = snapshot.anchor_after(range.start);
7465 let edit_end = snapshot.anchor_before(range.end);
7466 edits.push((edit_start..edit_end, new_text));
7467 }
7468 }
7469
7470 Ok(edits)
7471 })
7472 }
7473
7474 fn buffer_snapshot_for_lsp_version(
7475 &mut self,
7476 buffer: &ModelHandle<Buffer>,
7477 server_id: LanguageServerId,
7478 version: Option<i32>,
7479 cx: &AppContext,
7480 ) -> Result<TextBufferSnapshot> {
7481 const OLD_VERSIONS_TO_RETAIN: i32 = 10;
7482
7483 if let Some(version) = version {
7484 let buffer_id = buffer.read(cx).remote_id();
7485 let snapshots = self
7486 .buffer_snapshots
7487 .get_mut(&buffer_id)
7488 .and_then(|m| m.get_mut(&server_id))
7489 .ok_or_else(|| {
7490 anyhow!("no snapshots found for buffer {buffer_id} and server {server_id}")
7491 })?;
7492
7493 let found_snapshot = snapshots
7494 .binary_search_by_key(&version, |e| e.version)
7495 .map(|ix| snapshots[ix].snapshot.clone())
7496 .map_err(|_| {
7497 anyhow!("snapshot not found for buffer {buffer_id} server {server_id} at version {version}")
7498 })?;
7499
7500 snapshots.retain(|snapshot| snapshot.version + OLD_VERSIONS_TO_RETAIN >= version);
7501 Ok(found_snapshot)
7502 } else {
7503 Ok((buffer.read(cx)).text_snapshot())
7504 }
7505 }
7506
7507 pub fn language_servers(
7508 &self,
7509 ) -> impl '_ + Iterator<Item = (LanguageServerId, LanguageServerName, WorktreeId)> {
7510 self.language_server_ids
7511 .iter()
7512 .map(|((worktree_id, server_name), server_id)| {
7513 (*server_id, server_name.clone(), *worktree_id)
7514 })
7515 }
7516
7517 pub fn language_server_for_id(&self, id: LanguageServerId) -> Option<Arc<LanguageServer>> {
7518 if let LanguageServerState::Running { server, .. } = self.language_servers.get(&id)? {
7519 Some(server.clone())
7520 } else {
7521 None
7522 }
7523 }
7524
7525 pub fn language_servers_for_buffer(
7526 &self,
7527 buffer: &Buffer,
7528 cx: &AppContext,
7529 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
7530 self.language_server_ids_for_buffer(buffer, cx)
7531 .into_iter()
7532 .filter_map(|server_id| match self.language_servers.get(&server_id)? {
7533 LanguageServerState::Running {
7534 adapter, server, ..
7535 } => Some((adapter, server)),
7536 _ => None,
7537 })
7538 }
7539
7540 fn primary_language_servers_for_buffer(
7541 &self,
7542 buffer: &Buffer,
7543 cx: &AppContext,
7544 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
7545 self.language_servers_for_buffer(buffer, cx).next()
7546 }
7547
7548 fn language_server_for_buffer(
7549 &self,
7550 buffer: &Buffer,
7551 server_id: LanguageServerId,
7552 cx: &AppContext,
7553 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
7554 self.language_servers_for_buffer(buffer, cx)
7555 .find(|(_, s)| s.server_id() == server_id)
7556 }
7557
7558 fn language_server_ids_for_buffer(
7559 &self,
7560 buffer: &Buffer,
7561 cx: &AppContext,
7562 ) -> Vec<LanguageServerId> {
7563 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
7564 let worktree_id = file.worktree_id(cx);
7565 language
7566 .lsp_adapters()
7567 .iter()
7568 .flat_map(|adapter| {
7569 let key = (worktree_id, adapter.name.clone());
7570 self.language_server_ids.get(&key).copied()
7571 })
7572 .collect()
7573 } else {
7574 Vec::new()
7575 }
7576 }
7577}
7578
7579fn glob_literal_prefix<'a>(glob: &'a str) -> &'a str {
7580 let mut literal_end = 0;
7581 for (i, part) in glob.split(path::MAIN_SEPARATOR).enumerate() {
7582 if part.contains(&['*', '?', '{', '}']) {
7583 break;
7584 } else {
7585 if i > 0 {
7586 // Acount for separator prior to this part
7587 literal_end += path::MAIN_SEPARATOR.len_utf8();
7588 }
7589 literal_end += part.len();
7590 }
7591 }
7592 &glob[..literal_end]
7593}
7594
7595impl WorktreeHandle {
7596 pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
7597 match self {
7598 WorktreeHandle::Strong(handle) => Some(handle.clone()),
7599 WorktreeHandle::Weak(handle) => handle.upgrade(cx),
7600 }
7601 }
7602
7603 pub fn handle_id(&self) -> usize {
7604 match self {
7605 WorktreeHandle::Strong(handle) => handle.id(),
7606 WorktreeHandle::Weak(handle) => handle.id(),
7607 }
7608 }
7609}
7610
7611impl OpenBuffer {
7612 pub fn upgrade(&self, cx: &impl BorrowAppContext) -> Option<ModelHandle<Buffer>> {
7613 match self {
7614 OpenBuffer::Strong(handle) => Some(handle.clone()),
7615 OpenBuffer::Weak(handle) => handle.upgrade(cx),
7616 OpenBuffer::Operations(_) => None,
7617 }
7618 }
7619}
7620
7621pub struct PathMatchCandidateSet {
7622 pub snapshot: Snapshot,
7623 pub include_ignored: bool,
7624 pub include_root_name: bool,
7625}
7626
7627impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
7628 type Candidates = PathMatchCandidateSetIter<'a>;
7629
7630 fn id(&self) -> usize {
7631 self.snapshot.id().to_usize()
7632 }
7633
7634 fn len(&self) -> usize {
7635 if self.include_ignored {
7636 self.snapshot.file_count()
7637 } else {
7638 self.snapshot.visible_file_count()
7639 }
7640 }
7641
7642 fn prefix(&self) -> Arc<str> {
7643 if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
7644 self.snapshot.root_name().into()
7645 } else if self.include_root_name {
7646 format!("{}/", self.snapshot.root_name()).into()
7647 } else {
7648 "".into()
7649 }
7650 }
7651
7652 fn candidates(&'a self, start: usize) -> Self::Candidates {
7653 PathMatchCandidateSetIter {
7654 traversal: self.snapshot.files(self.include_ignored, start),
7655 }
7656 }
7657}
7658
7659pub struct PathMatchCandidateSetIter<'a> {
7660 traversal: Traversal<'a>,
7661}
7662
7663impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
7664 type Item = fuzzy::PathMatchCandidate<'a>;
7665
7666 fn next(&mut self) -> Option<Self::Item> {
7667 self.traversal.next().map(|entry| {
7668 if let EntryKind::File(char_bag) = entry.kind {
7669 fuzzy::PathMatchCandidate {
7670 path: &entry.path,
7671 char_bag,
7672 }
7673 } else {
7674 unreachable!()
7675 }
7676 })
7677 }
7678}
7679
7680impl Entity for Project {
7681 type Event = Event;
7682
7683 fn release(&mut self, cx: &mut gpui::AppContext) {
7684 match &self.client_state {
7685 Some(ProjectClientState::Local { .. }) => {
7686 let _ = self.unshare_internal(cx);
7687 }
7688 Some(ProjectClientState::Remote { remote_id, .. }) => {
7689 let _ = self.client.send(proto::LeaveProject {
7690 project_id: *remote_id,
7691 });
7692 self.disconnected_from_host_internal(cx);
7693 }
7694 _ => {}
7695 }
7696 }
7697
7698 fn app_will_quit(
7699 &mut self,
7700 _: &mut AppContext,
7701 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
7702 let shutdown_futures = self
7703 .language_servers
7704 .drain()
7705 .map(|(_, server_state)| async {
7706 use LanguageServerState::*;
7707 match server_state {
7708 Running { server, .. } => server.shutdown()?.await,
7709 Starting(task) => task.await?.shutdown()?.await,
7710 }
7711 })
7712 .collect::<Vec<_>>();
7713
7714 Some(
7715 async move {
7716 futures::future::join_all(shutdown_futures).await;
7717 }
7718 .boxed(),
7719 )
7720 }
7721}
7722
7723impl Collaborator {
7724 fn from_proto(message: proto::Collaborator) -> Result<Self> {
7725 Ok(Self {
7726 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
7727 replica_id: message.replica_id as ReplicaId,
7728 })
7729 }
7730}
7731
7732impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
7733 fn from((worktree_id, path): (WorktreeId, P)) -> Self {
7734 Self {
7735 worktree_id,
7736 path: path.as_ref().into(),
7737 }
7738 }
7739}
7740
7741impl ProjectLspAdapterDelegate {
7742 fn new(project: &Project, cx: &ModelContext<Project>) -> Arc<Self> {
7743 Arc::new(Self {
7744 project: cx.handle(),
7745 http_client: project.client.http_client(),
7746 })
7747 }
7748}
7749
7750impl LspAdapterDelegate for ProjectLspAdapterDelegate {
7751 fn show_notification(&self, message: &str, cx: &mut AppContext) {
7752 self.project
7753 .update(cx, |_, cx| cx.emit(Event::Notification(message.to_owned())));
7754 }
7755
7756 fn http_client(&self) -> Arc<dyn HttpClient> {
7757 self.http_client.clone()
7758 }
7759}
7760
7761fn split_operations(
7762 mut operations: Vec<proto::Operation>,
7763) -> impl Iterator<Item = Vec<proto::Operation>> {
7764 #[cfg(any(test, feature = "test-support"))]
7765 const CHUNK_SIZE: usize = 5;
7766
7767 #[cfg(not(any(test, feature = "test-support")))]
7768 const CHUNK_SIZE: usize = 100;
7769
7770 let mut done = false;
7771 std::iter::from_fn(move || {
7772 if done {
7773 return None;
7774 }
7775
7776 let operations = operations
7777 .drain(..cmp::min(CHUNK_SIZE, operations.len()))
7778 .collect::<Vec<_>>();
7779 if operations.is_empty() {
7780 done = true;
7781 }
7782 Some(operations)
7783 })
7784}
7785
7786fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
7787 proto::Symbol {
7788 language_server_name: symbol.language_server_name.0.to_string(),
7789 source_worktree_id: symbol.source_worktree_id.to_proto(),
7790 worktree_id: symbol.path.worktree_id.to_proto(),
7791 path: symbol.path.path.to_string_lossy().to_string(),
7792 name: symbol.name.clone(),
7793 kind: unsafe { mem::transmute(symbol.kind) },
7794 start: Some(proto::PointUtf16 {
7795 row: symbol.range.start.0.row,
7796 column: symbol.range.start.0.column,
7797 }),
7798 end: Some(proto::PointUtf16 {
7799 row: symbol.range.end.0.row,
7800 column: symbol.range.end.0.column,
7801 }),
7802 signature: symbol.signature.to_vec(),
7803 }
7804}
7805
7806fn relativize_path(base: &Path, path: &Path) -> PathBuf {
7807 let mut path_components = path.components();
7808 let mut base_components = base.components();
7809 let mut components: Vec<Component> = Vec::new();
7810 loop {
7811 match (path_components.next(), base_components.next()) {
7812 (None, None) => break,
7813 (Some(a), None) => {
7814 components.push(a);
7815 components.extend(path_components.by_ref());
7816 break;
7817 }
7818 (None, _) => components.push(Component::ParentDir),
7819 (Some(a), Some(b)) if components.is_empty() && a == b => (),
7820 (Some(a), Some(b)) if b == Component::CurDir => components.push(a),
7821 (Some(a), Some(_)) => {
7822 components.push(Component::ParentDir);
7823 for _ in base_components {
7824 components.push(Component::ParentDir);
7825 }
7826 components.push(a);
7827 components.extend(path_components.by_ref());
7828 break;
7829 }
7830 }
7831 }
7832 components.iter().map(|c| c.as_os_str()).collect()
7833}
7834
7835impl Item for Buffer {
7836 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
7837 File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
7838 }
7839
7840 fn project_path(&self, cx: &AppContext) -> Option<ProjectPath> {
7841 File::from_dyn(self.file()).map(|file| ProjectPath {
7842 worktree_id: file.worktree_id(cx),
7843 path: file.path().clone(),
7844 })
7845 }
7846}
7847
7848async fn wait_for_loading_buffer(
7849 mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
7850) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
7851 loop {
7852 if let Some(result) = receiver.borrow().as_ref() {
7853 match result {
7854 Ok(buffer) => return Ok(buffer.to_owned()),
7855 Err(e) => return Err(e.to_owned()),
7856 }
7857 }
7858 receiver.next().await;
7859 }
7860}