1mod ignore;
2mod lsp_command;
3pub mod search;
4pub mod worktree;
5
6#[cfg(test)]
7mod project_tests;
8
9use anyhow::{anyhow, Context, Result};
10use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
11use clock::ReplicaId;
12use collections::{hash_map, BTreeMap, HashMap, HashSet};
13use futures::{
14 channel::{mpsc, oneshot},
15 future::Shared,
16 AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
17};
18
19use gpui::{
20 AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
21 MutableAppContext, Task, UpgradeModelHandle, WeakModelHandle,
22};
23use language::{
24 point_to_lsp,
25 proto::{
26 deserialize_anchor, deserialize_line_ending, deserialize_version, serialize_anchor,
27 serialize_version,
28 },
29 range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CharKind, CodeAction,
30 CodeLabel, Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Event as BufferEvent,
31 File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt,
32 Operation, Patch, PointUtf16, TextBufferSnapshot, ToOffset, ToPointUtf16, Transaction,
33 Unclipped,
34};
35use lsp::{
36 DiagnosticSeverity, DiagnosticTag, DocumentHighlightKind, LanguageServer, LanguageString,
37 MarkedString,
38};
39use lsp_command::*;
40use parking_lot::Mutex;
41use postage::watch;
42use rand::prelude::*;
43use search::SearchQuery;
44use serde::Serialize;
45use settings::{FormatOnSave, Formatter, Settings};
46use sha2::{Digest, Sha256};
47use similar::{ChangeTag, TextDiff};
48use std::{
49 cell::RefCell,
50 cmp::{self, Ordering},
51 convert::TryInto,
52 hash::Hash,
53 mem,
54 num::NonZeroU32,
55 ops::Range,
56 path::{Component, Path, PathBuf},
57 rc::Rc,
58 str,
59 sync::{
60 atomic::{AtomicUsize, Ordering::SeqCst},
61 Arc,
62 },
63 time::Instant,
64};
65use terminal::{Terminal, TerminalBuilder};
66use thiserror::Error;
67use util::{defer, post_inc, ResultExt, TryFutureExt as _};
68
69pub use fs::*;
70pub use worktree::*;
71
72pub trait Item: Entity {
73 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
74}
75
76// Language server state is stored across 3 collections:
77// language_servers =>
78// a mapping from unique server id to LanguageServerState which can either be a task for a
79// server in the process of starting, or a running server with adapter and language server arcs
80// language_server_ids => a mapping from worktreeId and server name to the unique server id
81// language_server_statuses => a mapping from unique server id to the current server status
82//
83// Multiple worktrees can map to the same language server for example when you jump to the definition
84// of a file in the standard library. So language_server_ids is used to look up which server is active
85// for a given worktree and language server name
86//
87// When starting a language server, first the id map is checked to make sure a server isn't already available
88// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and and
89// the Starting variant of LanguageServerState is stored in the language_servers map.
90pub struct Project {
91 worktrees: Vec<WorktreeHandle>,
92 active_entry: Option<ProjectEntryId>,
93 languages: Arc<LanguageRegistry>,
94 language_servers: HashMap<usize, LanguageServerState>,
95 language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
96 language_server_statuses: BTreeMap<usize, LanguageServerStatus>,
97 language_server_settings: Arc<Mutex<serde_json::Value>>,
98 last_workspace_edits_by_language_server: HashMap<usize, ProjectTransaction>,
99 next_language_server_id: usize,
100 client: Arc<client::Client>,
101 next_entry_id: Arc<AtomicUsize>,
102 next_diagnostic_group_id: usize,
103 user_store: ModelHandle<UserStore>,
104 fs: Arc<dyn Fs>,
105 client_state: Option<ProjectClientState>,
106 collaborators: HashMap<PeerId, Collaborator>,
107 client_subscriptions: Vec<client::Subscription>,
108 _subscriptions: Vec<gpui::Subscription>,
109 opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
110 shared_buffers: HashMap<PeerId, HashSet<u64>>,
111 #[allow(clippy::type_complexity)]
112 loading_buffers: HashMap<
113 ProjectPath,
114 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
115 >,
116 #[allow(clippy::type_complexity)]
117 loading_local_worktrees:
118 HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
119 opened_buffers: HashMap<u64, OpenBuffer>,
120 incomplete_buffers: HashMap<u64, ModelHandle<Buffer>>,
121 buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
122 buffers_being_formatted: HashSet<usize>,
123 nonce: u128,
124 _maintain_buffer_languages: Task<()>,
125}
126
127#[derive(Error, Debug)]
128pub enum JoinProjectError {
129 #[error("host declined join request")]
130 HostDeclined,
131 #[error("host closed the project")]
132 HostClosedProject,
133 #[error("host went offline")]
134 HostWentOffline,
135 #[error("{0}")]
136 Other(#[from] anyhow::Error),
137}
138
139enum OpenBuffer {
140 Strong(ModelHandle<Buffer>),
141 Weak(WeakModelHandle<Buffer>),
142 Operations(Vec<Operation>),
143}
144
145enum WorktreeHandle {
146 Strong(ModelHandle<Worktree>),
147 Weak(WeakModelHandle<Worktree>),
148}
149
150enum ProjectClientState {
151 Local {
152 remote_id: u64,
153 metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
154 _maintain_metadata: Task<()>,
155 _detect_unshare: Task<Option<()>>,
156 },
157 Remote {
158 sharing_has_stopped: bool,
159 remote_id: u64,
160 replica_id: ReplicaId,
161 _detect_unshare: Task<Option<()>>,
162 },
163}
164
165#[derive(Clone, Debug)]
166pub struct Collaborator {
167 pub peer_id: PeerId,
168 pub replica_id: ReplicaId,
169}
170
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub enum Event {
173 ActiveEntryChanged(Option<ProjectEntryId>),
174 WorktreeAdded,
175 WorktreeRemoved(WorktreeId),
176 DiskBasedDiagnosticsStarted {
177 language_server_id: usize,
178 },
179 DiskBasedDiagnosticsFinished {
180 language_server_id: usize,
181 },
182 DiagnosticsUpdated {
183 path: ProjectPath,
184 language_server_id: usize,
185 },
186 RemoteIdChanged(Option<u64>),
187 DisconnectedFromHost,
188 CollaboratorLeft(PeerId),
189}
190
191pub enum LanguageServerState {
192 Starting(Task<Option<Arc<LanguageServer>>>),
193 Running {
194 language: Arc<Language>,
195 adapter: Arc<CachedLspAdapter>,
196 server: Arc<LanguageServer>,
197 },
198}
199
200#[derive(Serialize)]
201pub struct LanguageServerStatus {
202 pub name: String,
203 pub pending_work: BTreeMap<String, LanguageServerProgress>,
204 pub has_pending_diagnostic_updates: bool,
205 progress_tokens: HashSet<String>,
206}
207
208#[derive(Clone, Debug, Serialize)]
209pub struct LanguageServerProgress {
210 pub message: Option<String>,
211 pub percentage: Option<usize>,
212 #[serde(skip_serializing)]
213 pub last_update_at: Instant,
214}
215
216#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
217pub struct ProjectPath {
218 pub worktree_id: WorktreeId,
219 pub path: Arc<Path>,
220}
221
222#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
223pub struct DiagnosticSummary {
224 pub language_server_id: usize,
225 pub error_count: usize,
226 pub warning_count: usize,
227}
228
229#[derive(Debug, Clone)]
230pub struct Location {
231 pub buffer: ModelHandle<Buffer>,
232 pub range: Range<language::Anchor>,
233}
234
235#[derive(Debug, Clone)]
236pub struct LocationLink {
237 pub origin: Option<Location>,
238 pub target: Location,
239}
240
241#[derive(Debug)]
242pub struct DocumentHighlight {
243 pub range: Range<language::Anchor>,
244 pub kind: DocumentHighlightKind,
245}
246
247#[derive(Clone, Debug)]
248pub struct Symbol {
249 pub language_server_name: LanguageServerName,
250 pub source_worktree_id: WorktreeId,
251 pub path: ProjectPath,
252 pub label: CodeLabel,
253 pub name: String,
254 pub kind: lsp::SymbolKind,
255 pub range: Range<Unclipped<PointUtf16>>,
256 pub signature: [u8; 32],
257}
258
259#[derive(Clone, Debug, PartialEq)]
260pub struct HoverBlock {
261 pub text: String,
262 pub language: Option<String>,
263}
264
265impl HoverBlock {
266 fn try_new(marked_string: MarkedString) -> Option<Self> {
267 let result = match marked_string {
268 MarkedString::LanguageString(LanguageString { language, value }) => HoverBlock {
269 text: value,
270 language: Some(language),
271 },
272 MarkedString::String(text) => HoverBlock {
273 text,
274 language: None,
275 },
276 };
277 if result.text.is_empty() {
278 None
279 } else {
280 Some(result)
281 }
282 }
283}
284
285#[derive(Debug)]
286pub struct Hover {
287 pub contents: Vec<HoverBlock>,
288 pub range: Option<Range<language::Anchor>>,
289}
290
291#[derive(Default)]
292pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
293
294impl DiagnosticSummary {
295 fn new<'a, T: 'a>(
296 language_server_id: usize,
297 diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>,
298 ) -> Self {
299 let mut this = Self {
300 language_server_id,
301 error_count: 0,
302 warning_count: 0,
303 };
304
305 for entry in diagnostics {
306 if entry.diagnostic.is_primary {
307 match entry.diagnostic.severity {
308 DiagnosticSeverity::ERROR => this.error_count += 1,
309 DiagnosticSeverity::WARNING => this.warning_count += 1,
310 _ => {}
311 }
312 }
313 }
314
315 this
316 }
317
318 pub fn is_empty(&self) -> bool {
319 self.error_count == 0 && self.warning_count == 0
320 }
321
322 pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
323 proto::DiagnosticSummary {
324 path: path.to_string_lossy().to_string(),
325 language_server_id: self.language_server_id as u64,
326 error_count: self.error_count as u32,
327 warning_count: self.warning_count as u32,
328 }
329 }
330}
331
332#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
333pub struct ProjectEntryId(usize);
334
335impl ProjectEntryId {
336 pub const MAX: Self = Self(usize::MAX);
337
338 pub fn new(counter: &AtomicUsize) -> Self {
339 Self(counter.fetch_add(1, SeqCst))
340 }
341
342 pub fn from_proto(id: u64) -> Self {
343 Self(id as usize)
344 }
345
346 pub fn to_proto(&self) -> u64 {
347 self.0 as u64
348 }
349
350 pub fn to_usize(&self) -> usize {
351 self.0
352 }
353}
354
355#[derive(Debug, Clone, Copy, PartialEq, Eq)]
356pub enum FormatTrigger {
357 Save,
358 Manual,
359}
360
361impl FormatTrigger {
362 fn from_proto(value: i32) -> FormatTrigger {
363 match value {
364 0 => FormatTrigger::Save,
365 1 => FormatTrigger::Manual,
366 _ => FormatTrigger::Save,
367 }
368 }
369}
370
371impl Project {
372 pub fn init(client: &Arc<Client>) {
373 client.add_model_message_handler(Self::handle_add_collaborator);
374 client.add_model_message_handler(Self::handle_buffer_reloaded);
375 client.add_model_message_handler(Self::handle_buffer_saved);
376 client.add_model_message_handler(Self::handle_start_language_server);
377 client.add_model_message_handler(Self::handle_update_language_server);
378 client.add_model_message_handler(Self::handle_remove_collaborator);
379 client.add_model_message_handler(Self::handle_update_project);
380 client.add_model_message_handler(Self::handle_unshare_project);
381 client.add_model_message_handler(Self::handle_create_buffer_for_peer);
382 client.add_model_message_handler(Self::handle_update_buffer_file);
383 client.add_model_message_handler(Self::handle_update_buffer);
384 client.add_model_message_handler(Self::handle_update_diagnostic_summary);
385 client.add_model_message_handler(Self::handle_update_worktree);
386 client.add_model_request_handler(Self::handle_create_project_entry);
387 client.add_model_request_handler(Self::handle_rename_project_entry);
388 client.add_model_request_handler(Self::handle_copy_project_entry);
389 client.add_model_request_handler(Self::handle_delete_project_entry);
390 client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
391 client.add_model_request_handler(Self::handle_apply_code_action);
392 client.add_model_request_handler(Self::handle_reload_buffers);
393 client.add_model_request_handler(Self::handle_format_buffers);
394 client.add_model_request_handler(Self::handle_get_code_actions);
395 client.add_model_request_handler(Self::handle_get_completions);
396 client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
397 client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
398 client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
399 client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
400 client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
401 client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
402 client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
403 client.add_model_request_handler(Self::handle_search_project);
404 client.add_model_request_handler(Self::handle_get_project_symbols);
405 client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
406 client.add_model_request_handler(Self::handle_open_buffer_by_id);
407 client.add_model_request_handler(Self::handle_open_buffer_by_path);
408 client.add_model_request_handler(Self::handle_save_buffer);
409 client.add_model_message_handler(Self::handle_update_diff_base);
410 }
411
412 pub fn local(
413 client: Arc<Client>,
414 user_store: ModelHandle<UserStore>,
415 languages: Arc<LanguageRegistry>,
416 fs: Arc<dyn Fs>,
417 cx: &mut MutableAppContext,
418 ) -> ModelHandle<Self> {
419 cx.add_model(|cx: &mut ModelContext<Self>| Self {
420 worktrees: Default::default(),
421 collaborators: Default::default(),
422 opened_buffers: Default::default(),
423 shared_buffers: Default::default(),
424 incomplete_buffers: Default::default(),
425 loading_buffers: Default::default(),
426 loading_local_worktrees: Default::default(),
427 buffer_snapshots: Default::default(),
428 client_state: None,
429 opened_buffer: watch::channel(),
430 client_subscriptions: Vec::new(),
431 _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
432 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
433 active_entry: None,
434 languages,
435 client,
436 user_store,
437 fs,
438 next_entry_id: Default::default(),
439 next_diagnostic_group_id: Default::default(),
440 language_servers: Default::default(),
441 language_server_ids: Default::default(),
442 language_server_statuses: Default::default(),
443 last_workspace_edits_by_language_server: Default::default(),
444 language_server_settings: Default::default(),
445 buffers_being_formatted: Default::default(),
446 next_language_server_id: 0,
447 nonce: StdRng::from_entropy().gen(),
448 })
449 }
450
451 pub async fn remote(
452 remote_id: u64,
453 client: Arc<Client>,
454 user_store: ModelHandle<UserStore>,
455 languages: Arc<LanguageRegistry>,
456 fs: Arc<dyn Fs>,
457 mut cx: AsyncAppContext,
458 ) -> Result<ModelHandle<Self>, JoinProjectError> {
459 client.authenticate_and_connect(true, &cx).await?;
460
461 let subscription = client.subscribe_to_entity(remote_id);
462 let response = client
463 .request(proto::JoinProject {
464 project_id: remote_id,
465 })
466 .await?;
467 let this = cx.add_model(|cx| {
468 let replica_id = response.replica_id as ReplicaId;
469
470 let mut worktrees = Vec::new();
471 for worktree in response.worktrees {
472 let worktree = cx.update(|cx| {
473 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
474 });
475 worktrees.push(worktree);
476 }
477
478 let mut this = Self {
479 worktrees: Vec::new(),
480 loading_buffers: Default::default(),
481 opened_buffer: watch::channel(),
482 shared_buffers: Default::default(),
483 incomplete_buffers: Default::default(),
484 loading_local_worktrees: Default::default(),
485 active_entry: None,
486 collaborators: Default::default(),
487 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
488 languages,
489 user_store: user_store.clone(),
490 fs,
491 next_entry_id: Default::default(),
492 next_diagnostic_group_id: Default::default(),
493 client_subscriptions: Default::default(),
494 _subscriptions: Default::default(),
495 client: client.clone(),
496 client_state: Some(ProjectClientState::Remote {
497 sharing_has_stopped: false,
498 remote_id,
499 replica_id,
500 _detect_unshare: cx.spawn_weak(move |this, mut cx| {
501 async move {
502 let mut status = client.status();
503 let is_connected =
504 status.next().await.map_or(false, |s| s.is_connected());
505 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
506 if !is_connected || status.next().await.is_some() {
507 if let Some(this) = this.upgrade(&cx) {
508 this.update(&mut cx, |this, cx| this.disconnected_from_host(cx))
509 }
510 }
511 Ok(())
512 }
513 .log_err()
514 }),
515 }),
516 language_servers: Default::default(),
517 language_server_ids: Default::default(),
518 language_server_settings: Default::default(),
519 language_server_statuses: response
520 .language_servers
521 .into_iter()
522 .map(|server| {
523 (
524 server.id as usize,
525 LanguageServerStatus {
526 name: server.name,
527 pending_work: Default::default(),
528 has_pending_diagnostic_updates: false,
529 progress_tokens: Default::default(),
530 },
531 )
532 })
533 .collect(),
534 last_workspace_edits_by_language_server: Default::default(),
535 next_language_server_id: 0,
536 opened_buffers: Default::default(),
537 buffers_being_formatted: Default::default(),
538 buffer_snapshots: Default::default(),
539 nonce: StdRng::from_entropy().gen(),
540 };
541 for worktree in worktrees {
542 let _ = this.add_worktree(&worktree, cx);
543 }
544 this
545 });
546 let subscription = subscription.set_model(&this, &mut cx);
547
548 let user_ids = response
549 .collaborators
550 .iter()
551 .map(|peer| peer.user_id)
552 .collect();
553 user_store
554 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
555 .await?;
556 let mut collaborators = HashMap::default();
557 for message in response.collaborators {
558 let collaborator = Collaborator::from_proto(message);
559 collaborators.insert(collaborator.peer_id, collaborator);
560 }
561
562 this.update(&mut cx, |this, _| {
563 this.collaborators = collaborators;
564 this.client_subscriptions.push(subscription);
565 });
566
567 Ok(this)
568 }
569
570 #[cfg(any(test, feature = "test-support"))]
571 pub async fn test(
572 fs: Arc<dyn Fs>,
573 root_paths: impl IntoIterator<Item = &Path>,
574 cx: &mut gpui::TestAppContext,
575 ) -> ModelHandle<Project> {
576 if !cx.read(|cx| cx.has_global::<Settings>()) {
577 cx.update(|cx| {
578 cx.set_global(Settings::test(cx));
579 cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf()))
580 });
581 }
582
583 let languages = Arc::new(LanguageRegistry::test());
584 let http_client = client::test::FakeHttpClient::with_404_response();
585 let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
586 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
587 let project = cx.update(|cx| Project::local(client, user_store, languages, fs, cx));
588 for path in root_paths {
589 let (tree, _) = project
590 .update(cx, |project, cx| {
591 project.find_or_create_local_worktree(path, true, cx)
592 })
593 .await
594 .unwrap();
595 tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
596 .await;
597 }
598 project
599 }
600
601 fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
602 let settings = cx.global::<Settings>();
603
604 let mut language_servers_to_start = Vec::new();
605 for buffer in self.opened_buffers.values() {
606 if let Some(buffer) = buffer.upgrade(cx) {
607 let buffer = buffer.read(cx);
608 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language())
609 {
610 if settings.enable_language_server(Some(&language.name())) {
611 let worktree = file.worktree.read(cx);
612 language_servers_to_start.push((
613 worktree.id(),
614 worktree.as_local().unwrap().abs_path().clone(),
615 language.clone(),
616 ));
617 }
618 }
619 }
620 }
621
622 let mut language_servers_to_stop = Vec::new();
623 for language in self.languages.to_vec() {
624 if let Some(lsp_adapter) = language.lsp_adapter() {
625 if !settings.enable_language_server(Some(&language.name())) {
626 let lsp_name = &lsp_adapter.name;
627 for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
628 if lsp_name == started_lsp_name {
629 language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
630 }
631 }
632 }
633 }
634 }
635
636 // Stop all newly-disabled language servers.
637 for (worktree_id, adapter_name) in language_servers_to_stop {
638 self.stop_language_server(worktree_id, adapter_name, cx)
639 .detach();
640 }
641
642 // Start all the newly-enabled language servers.
643 for (worktree_id, worktree_path, language) in language_servers_to_start {
644 self.start_language_server(worktree_id, worktree_path, language, cx);
645 }
646
647 cx.notify();
648 }
649
650 pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
651 self.opened_buffers
652 .get(&remote_id)
653 .and_then(|buffer| buffer.upgrade(cx))
654 }
655
656 pub fn languages(&self) -> &Arc<LanguageRegistry> {
657 &self.languages
658 }
659
660 pub fn client(&self) -> Arc<Client> {
661 self.client.clone()
662 }
663
664 pub fn user_store(&self) -> ModelHandle<UserStore> {
665 self.user_store.clone()
666 }
667
668 #[cfg(any(test, feature = "test-support"))]
669 pub fn check_invariants(&self, cx: &AppContext) {
670 if self.is_local() {
671 let mut worktree_root_paths = HashMap::default();
672 for worktree in self.worktrees(cx) {
673 let worktree = worktree.read(cx);
674 let abs_path = worktree.as_local().unwrap().abs_path().clone();
675 let prev_worktree_id = worktree_root_paths.insert(abs_path.clone(), worktree.id());
676 assert_eq!(
677 prev_worktree_id,
678 None,
679 "abs path {:?} for worktree {:?} is not unique ({:?} was already registered with the same path)",
680 abs_path,
681 worktree.id(),
682 prev_worktree_id
683 )
684 }
685 } else {
686 let replica_id = self.replica_id();
687 for buffer in self.opened_buffers.values() {
688 if let Some(buffer) = buffer.upgrade(cx) {
689 let buffer = buffer.read(cx);
690 assert_eq!(
691 buffer.deferred_ops_len(),
692 0,
693 "replica {}, buffer {} has deferred operations",
694 replica_id,
695 buffer.remote_id()
696 );
697 }
698 }
699 }
700 }
701
702 #[cfg(any(test, feature = "test-support"))]
703 pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
704 let path = path.into();
705 if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
706 self.opened_buffers.iter().any(|(_, buffer)| {
707 if let Some(buffer) = buffer.upgrade(cx) {
708 if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
709 if file.worktree == worktree && file.path() == &path.path {
710 return true;
711 }
712 }
713 }
714 false
715 })
716 } else {
717 false
718 }
719 }
720
721 pub fn fs(&self) -> &Arc<dyn Fs> {
722 &self.fs
723 }
724
725 pub fn remote_id(&self) -> Option<u64> {
726 match self.client_state.as_ref()? {
727 ProjectClientState::Local { remote_id, .. }
728 | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
729 }
730 }
731
732 pub fn replica_id(&self) -> ReplicaId {
733 match &self.client_state {
734 Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
735 _ => 0,
736 }
737 }
738
739 fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
740 let (tx, rx) = oneshot::channel();
741 if let Some(ProjectClientState::Local {
742 metadata_changed, ..
743 }) = &mut self.client_state
744 {
745 let _ = metadata_changed.unbounded_send(tx);
746 }
747 cx.notify();
748
749 async move {
750 // If the project is shared, this will resolve when the `_maintain_metadata` task has
751 // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
752 // will get dropped.
753 let _ = rx.await;
754 }
755 }
756
757 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
758 &self.collaborators
759 }
760
761 /// Collect all worktrees, including ones that don't appear in the project panel
762 pub fn worktrees<'a>(
763 &'a self,
764 cx: &'a AppContext,
765 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
766 self.worktrees
767 .iter()
768 .filter_map(move |worktree| worktree.upgrade(cx))
769 }
770
771 /// Collect all user-visible worktrees, the ones that appear in the project panel
772 pub fn visible_worktrees<'a>(
773 &'a self,
774 cx: &'a AppContext,
775 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
776 self.worktrees.iter().filter_map(|worktree| {
777 worktree.upgrade(cx).and_then(|worktree| {
778 if worktree.read(cx).is_visible() {
779 Some(worktree)
780 } else {
781 None
782 }
783 })
784 })
785 }
786
787 pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
788 self.visible_worktrees(cx)
789 .map(|tree| tree.read(cx).root_name())
790 }
791
792 pub fn worktree_for_id(
793 &self,
794 id: WorktreeId,
795 cx: &AppContext,
796 ) -> Option<ModelHandle<Worktree>> {
797 self.worktrees(cx)
798 .find(|worktree| worktree.read(cx).id() == id)
799 }
800
801 pub fn worktree_for_entry(
802 &self,
803 entry_id: ProjectEntryId,
804 cx: &AppContext,
805 ) -> Option<ModelHandle<Worktree>> {
806 self.worktrees(cx)
807 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
808 }
809
810 pub fn worktree_id_for_entry(
811 &self,
812 entry_id: ProjectEntryId,
813 cx: &AppContext,
814 ) -> Option<WorktreeId> {
815 self.worktree_for_entry(entry_id, cx)
816 .map(|worktree| worktree.read(cx).id())
817 }
818
819 pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
820 paths.iter().all(|path| self.contains_path(path, cx))
821 }
822
823 pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
824 for worktree in self.worktrees(cx) {
825 let worktree = worktree.read(cx).as_local();
826 if worktree.map_or(false, |w| w.contains_abs_path(path)) {
827 return true;
828 }
829 }
830 false
831 }
832
833 pub fn create_entry(
834 &mut self,
835 project_path: impl Into<ProjectPath>,
836 is_directory: bool,
837 cx: &mut ModelContext<Self>,
838 ) -> Option<Task<Result<Entry>>> {
839 let project_path = project_path.into();
840 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
841 if self.is_local() {
842 Some(worktree.update(cx, |worktree, cx| {
843 worktree
844 .as_local_mut()
845 .unwrap()
846 .create_entry(project_path.path, is_directory, cx)
847 }))
848 } else {
849 let client = self.client.clone();
850 let project_id = self.remote_id().unwrap();
851 Some(cx.spawn_weak(|_, mut cx| async move {
852 let response = client
853 .request(proto::CreateProjectEntry {
854 worktree_id: project_path.worktree_id.to_proto(),
855 project_id,
856 path: project_path.path.to_string_lossy().into(),
857 is_directory,
858 })
859 .await?;
860 let entry = response
861 .entry
862 .ok_or_else(|| anyhow!("missing entry in response"))?;
863 worktree
864 .update(&mut cx, |worktree, cx| {
865 worktree.as_remote_mut().unwrap().insert_entry(
866 entry,
867 response.worktree_scan_id as usize,
868 cx,
869 )
870 })
871 .await
872 }))
873 }
874 }
875
876 pub fn copy_entry(
877 &mut self,
878 entry_id: ProjectEntryId,
879 new_path: impl Into<Arc<Path>>,
880 cx: &mut ModelContext<Self>,
881 ) -> Option<Task<Result<Entry>>> {
882 let worktree = self.worktree_for_entry(entry_id, cx)?;
883 let new_path = new_path.into();
884 if self.is_local() {
885 worktree.update(cx, |worktree, cx| {
886 worktree
887 .as_local_mut()
888 .unwrap()
889 .copy_entry(entry_id, new_path, cx)
890 })
891 } else {
892 let client = self.client.clone();
893 let project_id = self.remote_id().unwrap();
894
895 Some(cx.spawn_weak(|_, mut cx| async move {
896 let response = client
897 .request(proto::CopyProjectEntry {
898 project_id,
899 entry_id: entry_id.to_proto(),
900 new_path: new_path.to_string_lossy().into(),
901 })
902 .await?;
903 let entry = response
904 .entry
905 .ok_or_else(|| anyhow!("missing entry in response"))?;
906 worktree
907 .update(&mut cx, |worktree, cx| {
908 worktree.as_remote_mut().unwrap().insert_entry(
909 entry,
910 response.worktree_scan_id as usize,
911 cx,
912 )
913 })
914 .await
915 }))
916 }
917 }
918
919 pub fn rename_entry(
920 &mut self,
921 entry_id: ProjectEntryId,
922 new_path: impl Into<Arc<Path>>,
923 cx: &mut ModelContext<Self>,
924 ) -> Option<Task<Result<Entry>>> {
925 let worktree = self.worktree_for_entry(entry_id, cx)?;
926 let new_path = new_path.into();
927 if self.is_local() {
928 worktree.update(cx, |worktree, cx| {
929 worktree
930 .as_local_mut()
931 .unwrap()
932 .rename_entry(entry_id, new_path, cx)
933 })
934 } else {
935 let client = self.client.clone();
936 let project_id = self.remote_id().unwrap();
937
938 Some(cx.spawn_weak(|_, mut cx| async move {
939 let response = client
940 .request(proto::RenameProjectEntry {
941 project_id,
942 entry_id: entry_id.to_proto(),
943 new_path: new_path.to_string_lossy().into(),
944 })
945 .await?;
946 let entry = response
947 .entry
948 .ok_or_else(|| anyhow!("missing entry in response"))?;
949 worktree
950 .update(&mut cx, |worktree, cx| {
951 worktree.as_remote_mut().unwrap().insert_entry(
952 entry,
953 response.worktree_scan_id as usize,
954 cx,
955 )
956 })
957 .await
958 }))
959 }
960 }
961
962 pub fn delete_entry(
963 &mut self,
964 entry_id: ProjectEntryId,
965 cx: &mut ModelContext<Self>,
966 ) -> Option<Task<Result<()>>> {
967 let worktree = self.worktree_for_entry(entry_id, cx)?;
968 if self.is_local() {
969 worktree.update(cx, |worktree, cx| {
970 worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
971 })
972 } else {
973 let client = self.client.clone();
974 let project_id = self.remote_id().unwrap();
975 Some(cx.spawn_weak(|_, mut cx| async move {
976 let response = client
977 .request(proto::DeleteProjectEntry {
978 project_id,
979 entry_id: entry_id.to_proto(),
980 })
981 .await?;
982 worktree
983 .update(&mut cx, move |worktree, cx| {
984 worktree.as_remote_mut().unwrap().delete_entry(
985 entry_id,
986 response.worktree_scan_id as usize,
987 cx,
988 )
989 })
990 .await
991 }))
992 }
993 }
994
995 pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
996 if self.client_state.is_some() {
997 return Task::ready(Err(anyhow!("project was already shared")));
998 }
999
1000 let mut worktree_share_tasks = Vec::new();
1001
1002 for open_buffer in self.opened_buffers.values_mut() {
1003 match open_buffer {
1004 OpenBuffer::Strong(_) => {}
1005 OpenBuffer::Weak(buffer) => {
1006 if let Some(buffer) = buffer.upgrade(cx) {
1007 *open_buffer = OpenBuffer::Strong(buffer);
1008 }
1009 }
1010 OpenBuffer::Operations(_) => unreachable!(),
1011 }
1012 }
1013
1014 for worktree_handle in self.worktrees.iter_mut() {
1015 match worktree_handle {
1016 WorktreeHandle::Strong(_) => {}
1017 WorktreeHandle::Weak(worktree) => {
1018 if let Some(worktree) = worktree.upgrade(cx) {
1019 *worktree_handle = WorktreeHandle::Strong(worktree);
1020 }
1021 }
1022 }
1023 }
1024
1025 for (server_id, status) in &self.language_server_statuses {
1026 self.client
1027 .send(proto::StartLanguageServer {
1028 project_id,
1029 server: Some(proto::LanguageServer {
1030 id: *server_id as u64,
1031 name: status.name.clone(),
1032 }),
1033 })
1034 .log_err();
1035 }
1036
1037 for worktree in self.worktrees(cx).collect::<Vec<_>>() {
1038 worktree.update(cx, |worktree, cx| {
1039 let worktree = worktree.as_local_mut().unwrap();
1040 worktree_share_tasks.push(worktree.share(project_id, cx));
1041 });
1042 }
1043
1044 self.client_subscriptions.push(
1045 self.client
1046 .subscribe_to_entity(project_id)
1047 .set_model(&cx.handle(), &mut cx.to_async()),
1048 );
1049 let _ = self.metadata_changed(cx);
1050 cx.emit(Event::RemoteIdChanged(Some(project_id)));
1051 cx.notify();
1052
1053 let mut status = self.client.status();
1054 let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
1055 self.client_state = Some(ProjectClientState::Local {
1056 remote_id: project_id,
1057 metadata_changed: metadata_changed_tx,
1058 _maintain_metadata: cx.spawn_weak(move |this, cx| async move {
1059 while let Some(tx) = metadata_changed_rx.next().await {
1060 let mut txs = vec![tx];
1061 while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
1062 txs.push(next_tx);
1063 }
1064
1065 let Some(this) = this.upgrade(&cx) else { break };
1066 this.read_with(&cx, |this, cx| {
1067 let worktrees = this
1068 .worktrees
1069 .iter()
1070 .filter_map(|worktree| {
1071 worktree.upgrade(cx).map(|worktree| {
1072 worktree.read(cx).as_local().unwrap().metadata_proto()
1073 })
1074 })
1075 .collect();
1076 this.client.request(proto::UpdateProject {
1077 project_id,
1078 worktrees,
1079 })
1080 })
1081 .await
1082 .log_err();
1083
1084 for tx in txs {
1085 let _ = tx.send(());
1086 }
1087 }
1088 }),
1089 _detect_unshare: cx.spawn_weak(move |this, mut cx| {
1090 async move {
1091 let is_connected = status.next().await.map_or(false, |s| s.is_connected());
1092 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
1093 if !is_connected || status.next().await.is_some() {
1094 if let Some(this) = this.upgrade(&cx) {
1095 let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
1096 }
1097 }
1098 Ok(())
1099 }
1100 .log_err()
1101 }),
1102 });
1103
1104 cx.foreground().spawn(async move {
1105 futures::future::try_join_all(worktree_share_tasks).await?;
1106 Ok(())
1107 })
1108 }
1109
1110 pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1111 if self.is_remote() {
1112 return Err(anyhow!("attempted to unshare a remote project"));
1113 }
1114
1115 if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1116 self.collaborators.clear();
1117 self.shared_buffers.clear();
1118 self.client_subscriptions.clear();
1119
1120 for worktree_handle in self.worktrees.iter_mut() {
1121 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1122 let is_visible = worktree.update(cx, |worktree, _| {
1123 worktree.as_local_mut().unwrap().unshare();
1124 worktree.is_visible()
1125 });
1126 if !is_visible {
1127 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1128 }
1129 }
1130 }
1131
1132 for open_buffer in self.opened_buffers.values_mut() {
1133 if let OpenBuffer::Strong(buffer) = open_buffer {
1134 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1135 }
1136 }
1137
1138 let _ = self.metadata_changed(cx);
1139 cx.notify();
1140 self.client.send(proto::UnshareProject {
1141 project_id: remote_id,
1142 })?;
1143
1144 Ok(())
1145 } else {
1146 Err(anyhow!("attempted to unshare an unshared project"))
1147 }
1148 }
1149
1150 fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
1151 if let Some(ProjectClientState::Remote {
1152 sharing_has_stopped,
1153 ..
1154 }) = &mut self.client_state
1155 {
1156 *sharing_has_stopped = true;
1157 self.collaborators.clear();
1158 for worktree in &self.worktrees {
1159 if let Some(worktree) = worktree.upgrade(cx) {
1160 worktree.update(cx, |worktree, _| {
1161 if let Some(worktree) = worktree.as_remote_mut() {
1162 worktree.disconnected_from_host();
1163 }
1164 });
1165 }
1166 }
1167 cx.emit(Event::DisconnectedFromHost);
1168 cx.notify();
1169
1170 // Wake up all futures currently waiting on a buffer to get opened,
1171 // to give them a chance to fail now that we've disconnected.
1172 *self.opened_buffer.0.borrow_mut() = ();
1173 }
1174 }
1175
1176 pub fn is_read_only(&self) -> bool {
1177 match &self.client_state {
1178 Some(ProjectClientState::Remote {
1179 sharing_has_stopped,
1180 ..
1181 }) => *sharing_has_stopped,
1182 _ => false,
1183 }
1184 }
1185
1186 pub fn is_local(&self) -> bool {
1187 match &self.client_state {
1188 Some(ProjectClientState::Remote { .. }) => false,
1189 _ => true,
1190 }
1191 }
1192
1193 pub fn is_remote(&self) -> bool {
1194 !self.is_local()
1195 }
1196
1197 pub fn create_terminal(
1198 &mut self,
1199 working_directory: Option<PathBuf>,
1200 window_id: usize,
1201 cx: &mut ModelContext<Self>,
1202 ) -> Result<ModelHandle<Terminal>> {
1203 if self.is_remote() {
1204 return Err(anyhow!(
1205 "creating terminals as a guest is not supported yet"
1206 ));
1207 } else {
1208 let settings = cx.global::<Settings>();
1209 let shell = settings.terminal_shell();
1210 let envs = settings.terminal_env();
1211 let scroll = settings.terminal_scroll();
1212
1213 TerminalBuilder::new(
1214 working_directory.clone(),
1215 shell,
1216 envs,
1217 settings.terminal_overrides.blinking.clone(),
1218 scroll,
1219 window_id,
1220 )
1221 .map(|builder| cx.add_model(|cx| builder.subscribe(cx)))
1222 }
1223 }
1224
1225 pub fn create_buffer(
1226 &mut self,
1227 text: &str,
1228 language: Option<Arc<Language>>,
1229 cx: &mut ModelContext<Self>,
1230 ) -> Result<ModelHandle<Buffer>> {
1231 if self.is_remote() {
1232 return Err(anyhow!("creating buffers as a guest is not supported yet"));
1233 }
1234
1235 let buffer = cx.add_model(|cx| {
1236 Buffer::new(self.replica_id(), text, cx)
1237 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1238 });
1239 self.register_buffer(&buffer, cx)?;
1240 Ok(buffer)
1241 }
1242
1243 pub fn open_path(
1244 &mut self,
1245 path: impl Into<ProjectPath>,
1246 cx: &mut ModelContext<Self>,
1247 ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1248 let task = self.open_buffer(path, cx);
1249 cx.spawn_weak(|_, cx| async move {
1250 let buffer = task.await?;
1251 let project_entry_id = buffer
1252 .read_with(&cx, |buffer, cx| {
1253 File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1254 })
1255 .ok_or_else(|| anyhow!("no project entry"))?;
1256 Ok((project_entry_id, buffer.into()))
1257 })
1258 }
1259
1260 pub fn open_local_buffer(
1261 &mut self,
1262 abs_path: impl AsRef<Path>,
1263 cx: &mut ModelContext<Self>,
1264 ) -> Task<Result<ModelHandle<Buffer>>> {
1265 if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1266 self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1267 } else {
1268 Task::ready(Err(anyhow!("no such path")))
1269 }
1270 }
1271
1272 pub fn open_buffer(
1273 &mut self,
1274 path: impl Into<ProjectPath>,
1275 cx: &mut ModelContext<Self>,
1276 ) -> Task<Result<ModelHandle<Buffer>>> {
1277 let project_path = path.into();
1278 let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
1279 worktree
1280 } else {
1281 return Task::ready(Err(anyhow!("no such worktree")));
1282 };
1283
1284 // If there is already a buffer for the given path, then return it.
1285 let existing_buffer = self.get_open_buffer(&project_path, cx);
1286 if let Some(existing_buffer) = existing_buffer {
1287 return Task::ready(Ok(existing_buffer));
1288 }
1289
1290 let mut loading_watch = match self.loading_buffers.entry(project_path.clone()) {
1291 // If the given path is already being loaded, then wait for that existing
1292 // task to complete and return the same buffer.
1293 hash_map::Entry::Occupied(e) => e.get().clone(),
1294
1295 // Otherwise, record the fact that this path is now being loaded.
1296 hash_map::Entry::Vacant(entry) => {
1297 let (mut tx, rx) = postage::watch::channel();
1298 entry.insert(rx.clone());
1299
1300 let load_buffer = if worktree.read(cx).is_local() {
1301 self.open_local_buffer_internal(&project_path.path, &worktree, cx)
1302 } else {
1303 self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
1304 };
1305
1306 cx.spawn(move |this, mut cx| async move {
1307 let load_result = load_buffer.await;
1308 *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
1309 // Record the fact that the buffer is no longer loading.
1310 this.loading_buffers.remove(&project_path);
1311 let buffer = load_result.map_err(Arc::new)?;
1312 Ok(buffer)
1313 }));
1314 })
1315 .detach();
1316 rx
1317 }
1318 };
1319
1320 cx.foreground().spawn(async move {
1321 loop {
1322 if let Some(result) = loading_watch.borrow().as_ref() {
1323 match result {
1324 Ok(buffer) => return Ok(buffer.clone()),
1325 Err(error) => return Err(anyhow!("{}", error)),
1326 }
1327 }
1328 loading_watch.next().await;
1329 }
1330 })
1331 }
1332
1333 fn open_local_buffer_internal(
1334 &mut self,
1335 path: &Arc<Path>,
1336 worktree: &ModelHandle<Worktree>,
1337 cx: &mut ModelContext<Self>,
1338 ) -> Task<Result<ModelHandle<Buffer>>> {
1339 let load_buffer = worktree.update(cx, |worktree, cx| {
1340 let worktree = worktree.as_local_mut().unwrap();
1341 worktree.load_buffer(path, cx)
1342 });
1343 cx.spawn(|this, mut cx| async move {
1344 let buffer = load_buffer.await?;
1345 this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1346 Ok(buffer)
1347 })
1348 }
1349
1350 fn open_remote_buffer_internal(
1351 &mut self,
1352 path: &Arc<Path>,
1353 worktree: &ModelHandle<Worktree>,
1354 cx: &mut ModelContext<Self>,
1355 ) -> Task<Result<ModelHandle<Buffer>>> {
1356 let rpc = self.client.clone();
1357 let project_id = self.remote_id().unwrap();
1358 let remote_worktree_id = worktree.read(cx).id();
1359 let path = path.clone();
1360 let path_string = path.to_string_lossy().to_string();
1361 cx.spawn(|this, mut cx| async move {
1362 let response = rpc
1363 .request(proto::OpenBufferByPath {
1364 project_id,
1365 worktree_id: remote_worktree_id.to_proto(),
1366 path: path_string,
1367 })
1368 .await?;
1369 this.update(&mut cx, |this, cx| {
1370 this.wait_for_buffer(response.buffer_id, cx)
1371 })
1372 .await
1373 })
1374 }
1375
1376 /// LanguageServerName is owned, because it is inserted into a map
1377 fn open_local_buffer_via_lsp(
1378 &mut self,
1379 abs_path: lsp::Url,
1380 language_server_id: usize,
1381 language_server_name: LanguageServerName,
1382 cx: &mut ModelContext<Self>,
1383 ) -> Task<Result<ModelHandle<Buffer>>> {
1384 cx.spawn(|this, mut cx| async move {
1385 let abs_path = abs_path
1386 .to_file_path()
1387 .map_err(|_| anyhow!("can't convert URI to path"))?;
1388 let (worktree, relative_path) = if let Some(result) =
1389 this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1390 {
1391 result
1392 } else {
1393 let worktree = this
1394 .update(&mut cx, |this, cx| {
1395 this.create_local_worktree(&abs_path, false, cx)
1396 })
1397 .await?;
1398 this.update(&mut cx, |this, cx| {
1399 this.language_server_ids.insert(
1400 (worktree.read(cx).id(), language_server_name),
1401 language_server_id,
1402 );
1403 });
1404 (worktree, PathBuf::new())
1405 };
1406
1407 let project_path = ProjectPath {
1408 worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1409 path: relative_path.into(),
1410 };
1411 this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1412 .await
1413 })
1414 }
1415
1416 pub fn open_buffer_by_id(
1417 &mut self,
1418 id: u64,
1419 cx: &mut ModelContext<Self>,
1420 ) -> Task<Result<ModelHandle<Buffer>>> {
1421 if let Some(buffer) = self.buffer_for_id(id, cx) {
1422 Task::ready(Ok(buffer))
1423 } else if self.is_local() {
1424 Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1425 } else if let Some(project_id) = self.remote_id() {
1426 let request = self
1427 .client
1428 .request(proto::OpenBufferById { project_id, id });
1429 cx.spawn(|this, mut cx| async move {
1430 let buffer_id = request.await?.buffer_id;
1431 this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
1432 .await
1433 })
1434 } else {
1435 Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1436 }
1437 }
1438
1439 pub fn save_buffer_as(
1440 &mut self,
1441 buffer: ModelHandle<Buffer>,
1442 abs_path: PathBuf,
1443 cx: &mut ModelContext<Project>,
1444 ) -> Task<Result<()>> {
1445 let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1446 let old_path =
1447 File::from_dyn(buffer.read(cx).file()).and_then(|f| Some(f.as_local()?.abs_path(cx)));
1448 cx.spawn(|this, mut cx| async move {
1449 if let Some(old_path) = old_path {
1450 this.update(&mut cx, |this, cx| {
1451 this.unregister_buffer_from_language_server(&buffer, old_path, cx);
1452 });
1453 }
1454 let (worktree, path) = worktree_task.await?;
1455 worktree
1456 .update(&mut cx, |worktree, cx| {
1457 worktree
1458 .as_local_mut()
1459 .unwrap()
1460 .save_buffer_as(buffer.clone(), path, cx)
1461 })
1462 .await?;
1463 this.update(&mut cx, |this, cx| {
1464 this.assign_language_to_buffer(&buffer, cx);
1465 this.register_buffer_with_language_server(&buffer, cx);
1466 });
1467 Ok(())
1468 })
1469 }
1470
1471 pub fn get_open_buffer(
1472 &mut self,
1473 path: &ProjectPath,
1474 cx: &mut ModelContext<Self>,
1475 ) -> Option<ModelHandle<Buffer>> {
1476 let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1477 self.opened_buffers.values().find_map(|buffer| {
1478 let buffer = buffer.upgrade(cx)?;
1479 let file = File::from_dyn(buffer.read(cx).file())?;
1480 if file.worktree == worktree && file.path() == &path.path {
1481 Some(buffer)
1482 } else {
1483 None
1484 }
1485 })
1486 }
1487
1488 fn register_buffer(
1489 &mut self,
1490 buffer: &ModelHandle<Buffer>,
1491 cx: &mut ModelContext<Self>,
1492 ) -> Result<()> {
1493 let remote_id = buffer.read(cx).remote_id();
1494 let open_buffer = if self.is_remote() || self.is_shared() {
1495 OpenBuffer::Strong(buffer.clone())
1496 } else {
1497 OpenBuffer::Weak(buffer.downgrade())
1498 };
1499
1500 match self.opened_buffers.insert(remote_id, open_buffer) {
1501 None => {}
1502 Some(OpenBuffer::Operations(operations)) => {
1503 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
1504 }
1505 Some(OpenBuffer::Weak(existing_handle)) => {
1506 if existing_handle.upgrade(cx).is_some() {
1507 Err(anyhow!(
1508 "already registered buffer with remote id {}",
1509 remote_id
1510 ))?
1511 }
1512 }
1513 Some(OpenBuffer::Strong(_)) => Err(anyhow!(
1514 "already registered buffer with remote id {}",
1515 remote_id
1516 ))?,
1517 }
1518 cx.subscribe(buffer, |this, buffer, event, cx| {
1519 this.on_buffer_event(buffer, event, cx);
1520 })
1521 .detach();
1522
1523 self.assign_language_to_buffer(buffer, cx);
1524 self.register_buffer_with_language_server(buffer, cx);
1525 cx.observe_release(buffer, |this, buffer, cx| {
1526 if let Some(file) = File::from_dyn(buffer.file()) {
1527 if file.is_local() {
1528 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1529 if let Some((_, server)) = this.language_server_for_buffer(buffer, cx) {
1530 server
1531 .notify::<lsp::notification::DidCloseTextDocument>(
1532 lsp::DidCloseTextDocumentParams {
1533 text_document: lsp::TextDocumentIdentifier::new(uri),
1534 },
1535 )
1536 .log_err();
1537 }
1538 }
1539 }
1540 })
1541 .detach();
1542
1543 *self.opened_buffer.0.borrow_mut() = ();
1544 Ok(())
1545 }
1546
1547 fn register_buffer_with_language_server(
1548 &mut self,
1549 buffer_handle: &ModelHandle<Buffer>,
1550 cx: &mut ModelContext<Self>,
1551 ) {
1552 let buffer = buffer_handle.read(cx);
1553 let buffer_id = buffer.remote_id();
1554 if let Some(file) = File::from_dyn(buffer.file()) {
1555 if file.is_local() {
1556 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1557 let initial_snapshot = buffer.text_snapshot();
1558
1559 let mut language_server = None;
1560 let mut language_id = None;
1561 if let Some(language) = buffer.language() {
1562 let worktree_id = file.worktree_id(cx);
1563 if let Some(adapter) = language.lsp_adapter() {
1564 language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
1565 language_server = self
1566 .language_server_ids
1567 .get(&(worktree_id, adapter.name.clone()))
1568 .and_then(|id| self.language_servers.get(id))
1569 .and_then(|server_state| {
1570 if let LanguageServerState::Running { server, .. } = server_state {
1571 Some(server.clone())
1572 } else {
1573 None
1574 }
1575 });
1576 }
1577 }
1578
1579 if let Some(local_worktree) = file.worktree.read(cx).as_local() {
1580 if let Some(diagnostics) = local_worktree.diagnostics_for_path(file.path()) {
1581 self.update_buffer_diagnostics(buffer_handle, diagnostics, None, cx)
1582 .log_err();
1583 }
1584 }
1585
1586 if let Some(server) = language_server {
1587 server
1588 .notify::<lsp::notification::DidOpenTextDocument>(
1589 lsp::DidOpenTextDocumentParams {
1590 text_document: lsp::TextDocumentItem::new(
1591 uri,
1592 language_id.unwrap_or_default(),
1593 0,
1594 initial_snapshot.text(),
1595 ),
1596 },
1597 )
1598 .log_err();
1599 buffer_handle.update(cx, |buffer, cx| {
1600 buffer.set_completion_triggers(
1601 server
1602 .capabilities()
1603 .completion_provider
1604 .as_ref()
1605 .and_then(|provider| provider.trigger_characters.clone())
1606 .unwrap_or_default(),
1607 cx,
1608 )
1609 });
1610 self.buffer_snapshots
1611 .insert(buffer_id, vec![(0, initial_snapshot)]);
1612 }
1613 }
1614 }
1615 }
1616
1617 fn unregister_buffer_from_language_server(
1618 &mut self,
1619 buffer: &ModelHandle<Buffer>,
1620 old_path: PathBuf,
1621 cx: &mut ModelContext<Self>,
1622 ) {
1623 buffer.update(cx, |buffer, cx| {
1624 buffer.update_diagnostics(Default::default(), cx);
1625 self.buffer_snapshots.remove(&buffer.remote_id());
1626 if let Some((_, language_server)) = self.language_server_for_buffer(buffer, cx) {
1627 language_server
1628 .notify::<lsp::notification::DidCloseTextDocument>(
1629 lsp::DidCloseTextDocumentParams {
1630 text_document: lsp::TextDocumentIdentifier::new(
1631 lsp::Url::from_file_path(old_path).unwrap(),
1632 ),
1633 },
1634 )
1635 .log_err();
1636 }
1637 });
1638 }
1639
1640 fn on_buffer_event(
1641 &mut self,
1642 buffer: ModelHandle<Buffer>,
1643 event: &BufferEvent,
1644 cx: &mut ModelContext<Self>,
1645 ) -> Option<()> {
1646 match event {
1647 BufferEvent::Operation(operation) => {
1648 if let Some(project_id) = self.remote_id() {
1649 let request = self.client.request(proto::UpdateBuffer {
1650 project_id,
1651 buffer_id: buffer.read(cx).remote_id(),
1652 operations: vec![language::proto::serialize_operation(operation)],
1653 });
1654 cx.background().spawn(request).detach_and_log_err(cx);
1655 }
1656 }
1657 BufferEvent::Edited { .. } => {
1658 let language_server = self
1659 .language_server_for_buffer(buffer.read(cx), cx)
1660 .map(|(_, server)| server.clone())?;
1661 let buffer = buffer.read(cx);
1662 let file = File::from_dyn(buffer.file())?;
1663 let abs_path = file.as_local()?.abs_path(cx);
1664 let uri = lsp::Url::from_file_path(abs_path).unwrap();
1665 let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?;
1666 let (version, prev_snapshot) = buffer_snapshots.last()?;
1667 let next_snapshot = buffer.text_snapshot();
1668 let next_version = version + 1;
1669
1670 let content_changes = buffer
1671 .edits_since::<(PointUtf16, usize)>(prev_snapshot.version())
1672 .map(|edit| {
1673 let edit_start = edit.new.start.0;
1674 let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
1675 let new_text = next_snapshot
1676 .text_for_range(edit.new.start.1..edit.new.end.1)
1677 .collect();
1678 lsp::TextDocumentContentChangeEvent {
1679 range: Some(lsp::Range::new(
1680 point_to_lsp(edit_start),
1681 point_to_lsp(edit_end),
1682 )),
1683 range_length: None,
1684 text: new_text,
1685 }
1686 })
1687 .collect();
1688
1689 buffer_snapshots.push((next_version, next_snapshot));
1690
1691 language_server
1692 .notify::<lsp::notification::DidChangeTextDocument>(
1693 lsp::DidChangeTextDocumentParams {
1694 text_document: lsp::VersionedTextDocumentIdentifier::new(
1695 uri,
1696 next_version,
1697 ),
1698 content_changes,
1699 },
1700 )
1701 .log_err();
1702 }
1703 BufferEvent::Saved => {
1704 let file = File::from_dyn(buffer.read(cx).file())?;
1705 let worktree_id = file.worktree_id(cx);
1706 let abs_path = file.as_local()?.abs_path(cx);
1707 let text_document = lsp::TextDocumentIdentifier {
1708 uri: lsp::Url::from_file_path(abs_path).unwrap(),
1709 };
1710
1711 for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
1712 server
1713 .notify::<lsp::notification::DidSaveTextDocument>(
1714 lsp::DidSaveTextDocumentParams {
1715 text_document: text_document.clone(),
1716 text: None,
1717 },
1718 )
1719 .log_err();
1720 }
1721
1722 // After saving a buffer, simulate disk-based diagnostics being finished for languages
1723 // that don't support a disk-based progress token.
1724 let (lsp_adapter, language_server) =
1725 self.language_server_for_buffer(buffer.read(cx), cx)?;
1726 if lsp_adapter.disk_based_diagnostics_progress_token.is_none() {
1727 let server_id = language_server.server_id();
1728 self.disk_based_diagnostics_finished(server_id, cx);
1729 self.broadcast_language_server_update(
1730 server_id,
1731 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
1732 proto::LspDiskBasedDiagnosticsUpdated {},
1733 ),
1734 );
1735 }
1736 }
1737 _ => {}
1738 }
1739
1740 None
1741 }
1742
1743 fn language_servers_for_worktree(
1744 &self,
1745 worktree_id: WorktreeId,
1746 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
1747 self.language_server_ids
1748 .iter()
1749 .filter_map(move |((language_server_worktree_id, _), id)| {
1750 if *language_server_worktree_id == worktree_id {
1751 if let Some(LanguageServerState::Running {
1752 adapter,
1753 language,
1754 server,
1755 }) = self.language_servers.get(id)
1756 {
1757 return Some((adapter, language, server));
1758 }
1759 }
1760 None
1761 })
1762 }
1763
1764 fn maintain_buffer_languages(
1765 languages: &LanguageRegistry,
1766 cx: &mut ModelContext<Project>,
1767 ) -> Task<()> {
1768 let mut subscription = languages.subscribe();
1769 cx.spawn_weak(|project, mut cx| async move {
1770 while let Some(()) = subscription.next().await {
1771 if let Some(project) = project.upgrade(&cx) {
1772 project.update(&mut cx, |project, cx| {
1773 let mut buffers_without_language = Vec::new();
1774 for buffer in project.opened_buffers.values() {
1775 if let Some(buffer) = buffer.upgrade(cx) {
1776 if buffer.read(cx).language().is_none() {
1777 buffers_without_language.push(buffer);
1778 }
1779 }
1780 }
1781
1782 for buffer in buffers_without_language {
1783 project.assign_language_to_buffer(&buffer, cx);
1784 project.register_buffer_with_language_server(&buffer, cx);
1785 }
1786 });
1787 }
1788 }
1789 })
1790 }
1791
1792 fn assign_language_to_buffer(
1793 &mut self,
1794 buffer: &ModelHandle<Buffer>,
1795 cx: &mut ModelContext<Self>,
1796 ) -> Option<()> {
1797 // If the buffer has a language, set it and start the language server if we haven't already.
1798 let full_path = buffer.read(cx).file()?.full_path(cx);
1799 let new_language = self.languages.select_language(&full_path)?;
1800 buffer.update(cx, |buffer, cx| {
1801 if buffer.language().map_or(true, |old_language| {
1802 !Arc::ptr_eq(old_language, &new_language)
1803 }) {
1804 buffer.set_language_registry(self.languages.clone());
1805 buffer.set_language(Some(new_language.clone()), cx);
1806 }
1807 });
1808
1809 let file = File::from_dyn(buffer.read(cx).file())?;
1810 let worktree = file.worktree.read(cx).as_local()?;
1811 let worktree_id = worktree.id();
1812 let worktree_abs_path = worktree.abs_path().clone();
1813 self.start_language_server(worktree_id, worktree_abs_path, new_language, cx);
1814
1815 None
1816 }
1817
1818 fn merge_json_value_into(source: serde_json::Value, target: &mut serde_json::Value) {
1819 use serde_json::Value;
1820
1821 match (source, target) {
1822 (Value::Object(source), Value::Object(target)) => {
1823 for (key, value) in source {
1824 if let Some(target) = target.get_mut(&key) {
1825 Self::merge_json_value_into(value, target);
1826 } else {
1827 target.insert(key.clone(), value);
1828 }
1829 }
1830 }
1831
1832 (source, target) => *target = source,
1833 }
1834 }
1835
1836 fn start_language_server(
1837 &mut self,
1838 worktree_id: WorktreeId,
1839 worktree_path: Arc<Path>,
1840 language: Arc<Language>,
1841 cx: &mut ModelContext<Self>,
1842 ) {
1843 if !cx
1844 .global::<Settings>()
1845 .enable_language_server(Some(&language.name()))
1846 {
1847 return;
1848 }
1849
1850 let adapter = if let Some(adapter) = language.lsp_adapter() {
1851 adapter
1852 } else {
1853 return;
1854 };
1855 let key = (worktree_id, adapter.name.clone());
1856
1857 let mut initialization_options = adapter.initialization_options.clone();
1858
1859 let lsp = &cx.global::<Settings>().lsp.get(&adapter.name.0);
1860 let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
1861 match (&mut initialization_options, override_options) {
1862 (Some(initialization_options), Some(override_options)) => {
1863 Self::merge_json_value_into(override_options, initialization_options);
1864 }
1865
1866 (None, override_options) => initialization_options = override_options,
1867
1868 _ => {}
1869 }
1870
1871 self.language_server_ids
1872 .entry(key.clone())
1873 .or_insert_with(|| {
1874 let server_id = post_inc(&mut self.next_language_server_id);
1875 let language_server = self.languages.start_language_server(
1876 server_id,
1877 language.clone(),
1878 worktree_path,
1879 self.client.http_client(),
1880 cx,
1881 );
1882 self.language_servers.insert(
1883 server_id,
1884 LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
1885 let language_server = language_server?.await.log_err()?;
1886 let language_server = language_server
1887 .initialize(initialization_options)
1888 .await
1889 .log_err()?;
1890 let this = this.upgrade(&cx)?;
1891
1892 language_server
1893 .on_notification::<lsp::notification::PublishDiagnostics, _>({
1894 let this = this.downgrade();
1895 let adapter = adapter.clone();
1896 move |mut params, cx| {
1897 let this = this;
1898 let adapter = adapter.clone();
1899 cx.spawn(|mut cx| async move {
1900 adapter.process_diagnostics(&mut params).await;
1901 if let Some(this) = this.upgrade(&cx) {
1902 this.update(&mut cx, |this, cx| {
1903 this.update_diagnostics(
1904 server_id,
1905 params,
1906 &adapter.disk_based_diagnostic_sources,
1907 cx,
1908 )
1909 .log_err();
1910 });
1911 }
1912 })
1913 .detach();
1914 }
1915 })
1916 .detach();
1917
1918 language_server
1919 .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
1920 let settings = this.read_with(&cx, |this, _| {
1921 this.language_server_settings.clone()
1922 });
1923 move |params, _| {
1924 let settings = settings.lock().clone();
1925 async move {
1926 Ok(params
1927 .items
1928 .into_iter()
1929 .map(|item| {
1930 if let Some(section) = &item.section {
1931 settings
1932 .get(section)
1933 .cloned()
1934 .unwrap_or(serde_json::Value::Null)
1935 } else {
1936 settings.clone()
1937 }
1938 })
1939 .collect())
1940 }
1941 }
1942 })
1943 .detach();
1944
1945 // Even though we don't have handling for these requests, respond to them to
1946 // avoid stalling any language server like `gopls` which waits for a response
1947 // to these requests when initializing.
1948 language_server
1949 .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
1950 let this = this.downgrade();
1951 move |params, mut cx| async move {
1952 if let Some(this) = this.upgrade(&cx) {
1953 this.update(&mut cx, |this, _| {
1954 if let Some(status) =
1955 this.language_server_statuses.get_mut(&server_id)
1956 {
1957 if let lsp::NumberOrString::String(token) =
1958 params.token
1959 {
1960 status.progress_tokens.insert(token);
1961 }
1962 }
1963 });
1964 }
1965 Ok(())
1966 }
1967 })
1968 .detach();
1969 language_server
1970 .on_request::<lsp::request::RegisterCapability, _, _>(|_, _| async {
1971 Ok(())
1972 })
1973 .detach();
1974
1975 language_server
1976 .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
1977 let this = this.downgrade();
1978 let adapter = adapter.clone();
1979 let language_server = language_server.clone();
1980 move |params, cx| {
1981 Self::on_lsp_workspace_edit(
1982 this,
1983 params,
1984 server_id,
1985 adapter.clone(),
1986 language_server.clone(),
1987 cx,
1988 )
1989 }
1990 })
1991 .detach();
1992
1993 let disk_based_diagnostics_progress_token =
1994 adapter.disk_based_diagnostics_progress_token.clone();
1995
1996 language_server
1997 .on_notification::<lsp::notification::Progress, _>({
1998 let this = this.downgrade();
1999 move |params, mut cx| {
2000 if let Some(this) = this.upgrade(&cx) {
2001 this.update(&mut cx, |this, cx| {
2002 this.on_lsp_progress(
2003 params,
2004 server_id,
2005 disk_based_diagnostics_progress_token.clone(),
2006 cx,
2007 );
2008 });
2009 }
2010 }
2011 })
2012 .detach();
2013
2014 this.update(&mut cx, |this, cx| {
2015 // If the language server for this key doesn't match the server id, don't store the
2016 // server. Which will cause it to be dropped, killing the process
2017 if this
2018 .language_server_ids
2019 .get(&key)
2020 .map(|id| id != &server_id)
2021 .unwrap_or(false)
2022 {
2023 return None;
2024 }
2025
2026 // Update language_servers collection with Running variant of LanguageServerState
2027 // indicating that the server is up and running and ready
2028 this.language_servers.insert(
2029 server_id,
2030 LanguageServerState::Running {
2031 adapter: adapter.clone(),
2032 language,
2033 server: language_server.clone(),
2034 },
2035 );
2036 this.language_server_statuses.insert(
2037 server_id,
2038 LanguageServerStatus {
2039 name: language_server.name().to_string(),
2040 pending_work: Default::default(),
2041 has_pending_diagnostic_updates: false,
2042 progress_tokens: Default::default(),
2043 },
2044 );
2045 language_server
2046 .notify::<lsp::notification::DidChangeConfiguration>(
2047 lsp::DidChangeConfigurationParams {
2048 settings: this.language_server_settings.lock().clone(),
2049 },
2050 )
2051 .ok();
2052
2053 if let Some(project_id) = this.remote_id() {
2054 this.client
2055 .send(proto::StartLanguageServer {
2056 project_id,
2057 server: Some(proto::LanguageServer {
2058 id: server_id as u64,
2059 name: language_server.name().to_string(),
2060 }),
2061 })
2062 .log_err();
2063 }
2064
2065 // Tell the language server about every open buffer in the worktree that matches the language.
2066 for buffer in this.opened_buffers.values() {
2067 if let Some(buffer_handle) = buffer.upgrade(cx) {
2068 let buffer = buffer_handle.read(cx);
2069 let file = if let Some(file) = File::from_dyn(buffer.file()) {
2070 file
2071 } else {
2072 continue;
2073 };
2074 let language = if let Some(language) = buffer.language() {
2075 language
2076 } else {
2077 continue;
2078 };
2079 if file.worktree.read(cx).id() != key.0
2080 || language.lsp_adapter().map(|a| a.name.clone())
2081 != Some(key.1.clone())
2082 {
2083 continue;
2084 }
2085
2086 let file = file.as_local()?;
2087 let versions = this
2088 .buffer_snapshots
2089 .entry(buffer.remote_id())
2090 .or_insert_with(|| vec![(0, buffer.text_snapshot())]);
2091 let (version, initial_snapshot) = versions.last().unwrap();
2092 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
2093 language_server
2094 .notify::<lsp::notification::DidOpenTextDocument>(
2095 lsp::DidOpenTextDocumentParams {
2096 text_document: lsp::TextDocumentItem::new(
2097 uri,
2098 adapter
2099 .language_ids
2100 .get(language.name().as_ref())
2101 .cloned()
2102 .unwrap_or_default(),
2103 *version,
2104 initial_snapshot.text(),
2105 ),
2106 },
2107 )
2108 .log_err()?;
2109 buffer_handle.update(cx, |buffer, cx| {
2110 buffer.set_completion_triggers(
2111 language_server
2112 .capabilities()
2113 .completion_provider
2114 .as_ref()
2115 .and_then(|provider| {
2116 provider.trigger_characters.clone()
2117 })
2118 .unwrap_or_default(),
2119 cx,
2120 )
2121 });
2122 }
2123 }
2124
2125 cx.notify();
2126 Some(language_server)
2127 })
2128 })),
2129 );
2130
2131 server_id
2132 });
2133 }
2134
2135 // Returns a list of all of the worktrees which no longer have a language server and the root path
2136 // for the stopped server
2137 fn stop_language_server(
2138 &mut self,
2139 worktree_id: WorktreeId,
2140 adapter_name: LanguageServerName,
2141 cx: &mut ModelContext<Self>,
2142 ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
2143 let key = (worktree_id, adapter_name);
2144 if let Some(server_id) = self.language_server_ids.remove(&key) {
2145 // Remove other entries for this language server as well
2146 let mut orphaned_worktrees = vec![worktree_id];
2147 let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
2148 for other_key in other_keys {
2149 if self.language_server_ids.get(&other_key) == Some(&server_id) {
2150 self.language_server_ids.remove(&other_key);
2151 orphaned_worktrees.push(other_key.0);
2152 }
2153 }
2154
2155 self.language_server_statuses.remove(&server_id);
2156 cx.notify();
2157
2158 let server_state = self.language_servers.remove(&server_id);
2159 cx.spawn_weak(|this, mut cx| async move {
2160 let mut root_path = None;
2161
2162 let server = match server_state {
2163 Some(LanguageServerState::Starting(started_language_server)) => {
2164 started_language_server.await
2165 }
2166 Some(LanguageServerState::Running { server, .. }) => Some(server),
2167 None => None,
2168 };
2169
2170 if let Some(server) = server {
2171 root_path = Some(server.root_path().clone());
2172 if let Some(shutdown) = server.shutdown() {
2173 shutdown.await;
2174 }
2175 }
2176
2177 if let Some(this) = this.upgrade(&cx) {
2178 this.update(&mut cx, |this, cx| {
2179 this.language_server_statuses.remove(&server_id);
2180 cx.notify();
2181 });
2182 }
2183
2184 (root_path, orphaned_worktrees)
2185 })
2186 } else {
2187 Task::ready((None, Vec::new()))
2188 }
2189 }
2190
2191 pub fn restart_language_servers_for_buffers(
2192 &mut self,
2193 buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
2194 cx: &mut ModelContext<Self>,
2195 ) -> Option<()> {
2196 let language_server_lookup_info: HashSet<(WorktreeId, Arc<Path>, PathBuf)> = buffers
2197 .into_iter()
2198 .filter_map(|buffer| {
2199 let file = File::from_dyn(buffer.read(cx).file())?;
2200 let worktree = file.worktree.read(cx).as_local()?;
2201 let worktree_id = worktree.id();
2202 let worktree_abs_path = worktree.abs_path().clone();
2203 let full_path = file.full_path(cx);
2204 Some((worktree_id, worktree_abs_path, full_path))
2205 })
2206 .collect();
2207 for (worktree_id, worktree_abs_path, full_path) in language_server_lookup_info {
2208 let language = self.languages.select_language(&full_path)?;
2209 self.restart_language_server(worktree_id, worktree_abs_path, language, cx);
2210 }
2211
2212 None
2213 }
2214
2215 fn restart_language_server(
2216 &mut self,
2217 worktree_id: WorktreeId,
2218 fallback_path: Arc<Path>,
2219 language: Arc<Language>,
2220 cx: &mut ModelContext<Self>,
2221 ) {
2222 let adapter = if let Some(adapter) = language.lsp_adapter() {
2223 adapter
2224 } else {
2225 return;
2226 };
2227
2228 let server_name = adapter.name.clone();
2229 let stop = self.stop_language_server(worktree_id, server_name.clone(), cx);
2230 cx.spawn_weak(|this, mut cx| async move {
2231 let (original_root_path, orphaned_worktrees) = stop.await;
2232 if let Some(this) = this.upgrade(&cx) {
2233 this.update(&mut cx, |this, cx| {
2234 // Attempt to restart using original server path. Fallback to passed in
2235 // path if we could not retrieve the root path
2236 let root_path = original_root_path
2237 .map(|path_buf| Arc::from(path_buf.as_path()))
2238 .unwrap_or(fallback_path);
2239
2240 this.start_language_server(worktree_id, root_path, language, cx);
2241
2242 // Lookup new server id and set it for each of the orphaned worktrees
2243 if let Some(new_server_id) = this
2244 .language_server_ids
2245 .get(&(worktree_id, server_name.clone()))
2246 .cloned()
2247 {
2248 for orphaned_worktree in orphaned_worktrees {
2249 this.language_server_ids
2250 .insert((orphaned_worktree, server_name.clone()), new_server_id);
2251 }
2252 }
2253 });
2254 }
2255 })
2256 .detach();
2257 }
2258
2259 fn on_lsp_progress(
2260 &mut self,
2261 progress: lsp::ProgressParams,
2262 server_id: usize,
2263 disk_based_diagnostics_progress_token: Option<String>,
2264 cx: &mut ModelContext<Self>,
2265 ) {
2266 let token = match progress.token {
2267 lsp::NumberOrString::String(token) => token,
2268 lsp::NumberOrString::Number(token) => {
2269 log::info!("skipping numeric progress token {}", token);
2270 return;
2271 }
2272 };
2273 let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
2274 let language_server_status =
2275 if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
2276 status
2277 } else {
2278 return;
2279 };
2280
2281 if !language_server_status.progress_tokens.contains(&token) {
2282 return;
2283 }
2284
2285 let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
2286 .as_ref()
2287 .map_or(false, |disk_based_token| {
2288 token.starts_with(disk_based_token)
2289 });
2290
2291 match progress {
2292 lsp::WorkDoneProgress::Begin(report) => {
2293 if is_disk_based_diagnostics_progress {
2294 language_server_status.has_pending_diagnostic_updates = true;
2295 self.disk_based_diagnostics_started(server_id, cx);
2296 self.broadcast_language_server_update(
2297 server_id,
2298 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
2299 proto::LspDiskBasedDiagnosticsUpdating {},
2300 ),
2301 );
2302 } else {
2303 self.on_lsp_work_start(
2304 server_id,
2305 token.clone(),
2306 LanguageServerProgress {
2307 message: report.message.clone(),
2308 percentage: report.percentage.map(|p| p as usize),
2309 last_update_at: Instant::now(),
2310 },
2311 cx,
2312 );
2313 self.broadcast_language_server_update(
2314 server_id,
2315 proto::update_language_server::Variant::WorkStart(proto::LspWorkStart {
2316 token,
2317 message: report.message,
2318 percentage: report.percentage.map(|p| p as u32),
2319 }),
2320 );
2321 }
2322 }
2323 lsp::WorkDoneProgress::Report(report) => {
2324 if !is_disk_based_diagnostics_progress {
2325 self.on_lsp_work_progress(
2326 server_id,
2327 token.clone(),
2328 LanguageServerProgress {
2329 message: report.message.clone(),
2330 percentage: report.percentage.map(|p| p as usize),
2331 last_update_at: Instant::now(),
2332 },
2333 cx,
2334 );
2335 self.broadcast_language_server_update(
2336 server_id,
2337 proto::update_language_server::Variant::WorkProgress(
2338 proto::LspWorkProgress {
2339 token,
2340 message: report.message,
2341 percentage: report.percentage.map(|p| p as u32),
2342 },
2343 ),
2344 );
2345 }
2346 }
2347 lsp::WorkDoneProgress::End(_) => {
2348 language_server_status.progress_tokens.remove(&token);
2349
2350 if is_disk_based_diagnostics_progress {
2351 language_server_status.has_pending_diagnostic_updates = false;
2352 self.disk_based_diagnostics_finished(server_id, cx);
2353 self.broadcast_language_server_update(
2354 server_id,
2355 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
2356 proto::LspDiskBasedDiagnosticsUpdated {},
2357 ),
2358 );
2359 } else {
2360 self.on_lsp_work_end(server_id, token.clone(), cx);
2361 self.broadcast_language_server_update(
2362 server_id,
2363 proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
2364 token,
2365 }),
2366 );
2367 }
2368 }
2369 }
2370 }
2371
2372 fn on_lsp_work_start(
2373 &mut self,
2374 language_server_id: usize,
2375 token: String,
2376 progress: LanguageServerProgress,
2377 cx: &mut ModelContext<Self>,
2378 ) {
2379 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2380 status.pending_work.insert(token, progress);
2381 cx.notify();
2382 }
2383 }
2384
2385 fn on_lsp_work_progress(
2386 &mut self,
2387 language_server_id: usize,
2388 token: String,
2389 progress: LanguageServerProgress,
2390 cx: &mut ModelContext<Self>,
2391 ) {
2392 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2393 let entry = status
2394 .pending_work
2395 .entry(token)
2396 .or_insert(LanguageServerProgress {
2397 message: Default::default(),
2398 percentage: Default::default(),
2399 last_update_at: progress.last_update_at,
2400 });
2401 if progress.message.is_some() {
2402 entry.message = progress.message;
2403 }
2404 if progress.percentage.is_some() {
2405 entry.percentage = progress.percentage;
2406 }
2407 entry.last_update_at = progress.last_update_at;
2408 cx.notify();
2409 }
2410 }
2411
2412 fn on_lsp_work_end(
2413 &mut self,
2414 language_server_id: usize,
2415 token: String,
2416 cx: &mut ModelContext<Self>,
2417 ) {
2418 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2419 status.pending_work.remove(&token);
2420 cx.notify();
2421 }
2422 }
2423
2424 async fn on_lsp_workspace_edit(
2425 this: WeakModelHandle<Self>,
2426 params: lsp::ApplyWorkspaceEditParams,
2427 server_id: usize,
2428 adapter: Arc<CachedLspAdapter>,
2429 language_server: Arc<LanguageServer>,
2430 mut cx: AsyncAppContext,
2431 ) -> Result<lsp::ApplyWorkspaceEditResponse> {
2432 let this = this
2433 .upgrade(&cx)
2434 .ok_or_else(|| anyhow!("project project closed"))?;
2435 let transaction = Self::deserialize_workspace_edit(
2436 this.clone(),
2437 params.edit,
2438 true,
2439 adapter.clone(),
2440 language_server.clone(),
2441 &mut cx,
2442 )
2443 .await
2444 .log_err();
2445 this.update(&mut cx, |this, _| {
2446 if let Some(transaction) = transaction {
2447 this.last_workspace_edits_by_language_server
2448 .insert(server_id, transaction);
2449 }
2450 });
2451 Ok(lsp::ApplyWorkspaceEditResponse {
2452 applied: true,
2453 failed_change: None,
2454 failure_reason: None,
2455 })
2456 }
2457
2458 fn broadcast_language_server_update(
2459 &self,
2460 language_server_id: usize,
2461 event: proto::update_language_server::Variant,
2462 ) {
2463 if let Some(project_id) = self.remote_id() {
2464 self.client
2465 .send(proto::UpdateLanguageServer {
2466 project_id,
2467 language_server_id: language_server_id as u64,
2468 variant: Some(event),
2469 })
2470 .log_err();
2471 }
2472 }
2473
2474 pub fn set_language_server_settings(&mut self, settings: serde_json::Value) {
2475 for server_state in self.language_servers.values() {
2476 if let LanguageServerState::Running { server, .. } = server_state {
2477 server
2478 .notify::<lsp::notification::DidChangeConfiguration>(
2479 lsp::DidChangeConfigurationParams {
2480 settings: settings.clone(),
2481 },
2482 )
2483 .ok();
2484 }
2485 }
2486 *self.language_server_settings.lock() = settings;
2487 }
2488
2489 pub fn language_server_statuses(
2490 &self,
2491 ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
2492 self.language_server_statuses.values()
2493 }
2494
2495 pub fn update_diagnostics(
2496 &mut self,
2497 language_server_id: usize,
2498 params: lsp::PublishDiagnosticsParams,
2499 disk_based_sources: &[String],
2500 cx: &mut ModelContext<Self>,
2501 ) -> Result<()> {
2502 let abs_path = params
2503 .uri
2504 .to_file_path()
2505 .map_err(|_| anyhow!("URI is not a file"))?;
2506 let mut diagnostics = Vec::default();
2507 let mut primary_diagnostic_group_ids = HashMap::default();
2508 let mut sources_by_group_id = HashMap::default();
2509 let mut supporting_diagnostics = HashMap::default();
2510 for diagnostic in ¶ms.diagnostics {
2511 let source = diagnostic.source.as_ref();
2512 let code = diagnostic.code.as_ref().map(|code| match code {
2513 lsp::NumberOrString::Number(code) => code.to_string(),
2514 lsp::NumberOrString::String(code) => code.clone(),
2515 });
2516 let range = range_from_lsp(diagnostic.range);
2517 let is_supporting = diagnostic
2518 .related_information
2519 .as_ref()
2520 .map_or(false, |infos| {
2521 infos.iter().any(|info| {
2522 primary_diagnostic_group_ids.contains_key(&(
2523 source,
2524 code.clone(),
2525 range_from_lsp(info.location.range),
2526 ))
2527 })
2528 });
2529
2530 let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
2531 tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
2532 });
2533
2534 if is_supporting {
2535 supporting_diagnostics.insert(
2536 (source, code.clone(), range),
2537 (diagnostic.severity, is_unnecessary),
2538 );
2539 } else {
2540 let group_id = post_inc(&mut self.next_diagnostic_group_id);
2541 let is_disk_based =
2542 source.map_or(false, |source| disk_based_sources.contains(source));
2543
2544 sources_by_group_id.insert(group_id, source);
2545 primary_diagnostic_group_ids
2546 .insert((source, code.clone(), range.clone()), group_id);
2547
2548 diagnostics.push(DiagnosticEntry {
2549 range,
2550 diagnostic: Diagnostic {
2551 code: code.clone(),
2552 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
2553 message: diagnostic.message.clone(),
2554 group_id,
2555 is_primary: true,
2556 is_valid: true,
2557 is_disk_based,
2558 is_unnecessary,
2559 },
2560 });
2561 if let Some(infos) = &diagnostic.related_information {
2562 for info in infos {
2563 if info.location.uri == params.uri && !info.message.is_empty() {
2564 let range = range_from_lsp(info.location.range);
2565 diagnostics.push(DiagnosticEntry {
2566 range,
2567 diagnostic: Diagnostic {
2568 code: code.clone(),
2569 severity: DiagnosticSeverity::INFORMATION,
2570 message: info.message.clone(),
2571 group_id,
2572 is_primary: false,
2573 is_valid: true,
2574 is_disk_based,
2575 is_unnecessary: false,
2576 },
2577 });
2578 }
2579 }
2580 }
2581 }
2582 }
2583
2584 for entry in &mut diagnostics {
2585 let diagnostic = &mut entry.diagnostic;
2586 if !diagnostic.is_primary {
2587 let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
2588 if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
2589 source,
2590 diagnostic.code.clone(),
2591 entry.range.clone(),
2592 )) {
2593 if let Some(severity) = severity {
2594 diagnostic.severity = severity;
2595 }
2596 diagnostic.is_unnecessary = is_unnecessary;
2597 }
2598 }
2599 }
2600
2601 self.update_diagnostic_entries(
2602 language_server_id,
2603 abs_path,
2604 params.version,
2605 diagnostics,
2606 cx,
2607 )?;
2608 Ok(())
2609 }
2610
2611 pub fn update_diagnostic_entries(
2612 &mut self,
2613 language_server_id: usize,
2614 abs_path: PathBuf,
2615 version: Option<i32>,
2616 diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2617 cx: &mut ModelContext<Project>,
2618 ) -> Result<(), anyhow::Error> {
2619 let (worktree, relative_path) = self
2620 .find_local_worktree(&abs_path, cx)
2621 .ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
2622
2623 let project_path = ProjectPath {
2624 worktree_id: worktree.read(cx).id(),
2625 path: relative_path.into(),
2626 };
2627 if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
2628 self.update_buffer_diagnostics(&buffer, diagnostics.clone(), version, cx)?;
2629 }
2630
2631 let updated = worktree.update(cx, |worktree, cx| {
2632 worktree
2633 .as_local_mut()
2634 .ok_or_else(|| anyhow!("not a local worktree"))?
2635 .update_diagnostics(
2636 language_server_id,
2637 project_path.path.clone(),
2638 diagnostics,
2639 cx,
2640 )
2641 })?;
2642 if updated {
2643 cx.emit(Event::DiagnosticsUpdated {
2644 language_server_id,
2645 path: project_path,
2646 });
2647 }
2648 Ok(())
2649 }
2650
2651 fn update_buffer_diagnostics(
2652 &mut self,
2653 buffer: &ModelHandle<Buffer>,
2654 mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2655 version: Option<i32>,
2656 cx: &mut ModelContext<Self>,
2657 ) -> Result<()> {
2658 fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
2659 Ordering::Equal
2660 .then_with(|| b.is_primary.cmp(&a.is_primary))
2661 .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
2662 .then_with(|| a.severity.cmp(&b.severity))
2663 .then_with(|| a.message.cmp(&b.message))
2664 }
2665
2666 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx)?;
2667
2668 diagnostics.sort_unstable_by(|a, b| {
2669 Ordering::Equal
2670 .then_with(|| a.range.start.cmp(&b.range.start))
2671 .then_with(|| b.range.end.cmp(&a.range.end))
2672 .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
2673 });
2674
2675 let mut sanitized_diagnostics = Vec::new();
2676 let edits_since_save = Patch::new(
2677 snapshot
2678 .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
2679 .collect(),
2680 );
2681 for entry in diagnostics {
2682 let start;
2683 let end;
2684 if entry.diagnostic.is_disk_based {
2685 // Some diagnostics are based on files on disk instead of buffers'
2686 // current contents. Adjust these diagnostics' ranges to reflect
2687 // any unsaved edits.
2688 start = edits_since_save.old_to_new(entry.range.start);
2689 end = edits_since_save.old_to_new(entry.range.end);
2690 } else {
2691 start = entry.range.start;
2692 end = entry.range.end;
2693 }
2694
2695 let mut range = snapshot.clip_point_utf16(start, Bias::Left)
2696 ..snapshot.clip_point_utf16(end, Bias::Right);
2697
2698 // Expand empty ranges by one codepoint
2699 if range.start == range.end {
2700 // This will be go to the next boundary when being clipped
2701 range.end.column += 1;
2702 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
2703 if range.start == range.end && range.end.column > 0 {
2704 range.start.column -= 1;
2705 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
2706 }
2707 }
2708
2709 sanitized_diagnostics.push(DiagnosticEntry {
2710 range,
2711 diagnostic: entry.diagnostic,
2712 });
2713 }
2714 drop(edits_since_save);
2715
2716 let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
2717 buffer.update(cx, |buffer, cx| buffer.update_diagnostics(set, cx));
2718 Ok(())
2719 }
2720
2721 pub fn reload_buffers(
2722 &self,
2723 buffers: HashSet<ModelHandle<Buffer>>,
2724 push_to_history: bool,
2725 cx: &mut ModelContext<Self>,
2726 ) -> Task<Result<ProjectTransaction>> {
2727 let mut local_buffers = Vec::new();
2728 let mut remote_buffers = None;
2729 for buffer_handle in buffers {
2730 let buffer = buffer_handle.read(cx);
2731 if buffer.is_dirty() {
2732 if let Some(file) = File::from_dyn(buffer.file()) {
2733 if file.is_local() {
2734 local_buffers.push(buffer_handle);
2735 } else {
2736 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
2737 }
2738 }
2739 }
2740 }
2741
2742 let remote_buffers = self.remote_id().zip(remote_buffers);
2743 let client = self.client.clone();
2744
2745 cx.spawn(|this, mut cx| async move {
2746 let mut project_transaction = ProjectTransaction::default();
2747
2748 if let Some((project_id, remote_buffers)) = remote_buffers {
2749 let response = client
2750 .request(proto::ReloadBuffers {
2751 project_id,
2752 buffer_ids: remote_buffers
2753 .iter()
2754 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
2755 .collect(),
2756 })
2757 .await?
2758 .transaction
2759 .ok_or_else(|| anyhow!("missing transaction"))?;
2760 project_transaction = this
2761 .update(&mut cx, |this, cx| {
2762 this.deserialize_project_transaction(response, push_to_history, cx)
2763 })
2764 .await?;
2765 }
2766
2767 for buffer in local_buffers {
2768 let transaction = buffer
2769 .update(&mut cx, |buffer, cx| buffer.reload(cx))
2770 .await?;
2771 buffer.update(&mut cx, |buffer, cx| {
2772 if let Some(transaction) = transaction {
2773 if !push_to_history {
2774 buffer.forget_transaction(transaction.id);
2775 }
2776 project_transaction.0.insert(cx.handle(), transaction);
2777 }
2778 });
2779 }
2780
2781 Ok(project_transaction)
2782 })
2783 }
2784
2785 pub fn format(
2786 &self,
2787 buffers: HashSet<ModelHandle<Buffer>>,
2788 push_to_history: bool,
2789 trigger: FormatTrigger,
2790 cx: &mut ModelContext<Project>,
2791 ) -> Task<Result<ProjectTransaction>> {
2792 let mut local_buffers = Vec::new();
2793 let mut remote_buffers = None;
2794 for buffer_handle in buffers {
2795 let buffer = buffer_handle.read(cx);
2796 if let Some(file) = File::from_dyn(buffer.file()) {
2797 if let Some(buffer_abs_path) = file.as_local().map(|f| f.abs_path(cx)) {
2798 if let Some((_, server)) = self.language_server_for_buffer(buffer, cx) {
2799 local_buffers.push((buffer_handle, buffer_abs_path, server.clone()));
2800 }
2801 } else {
2802 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
2803 }
2804 } else {
2805 return Task::ready(Ok(Default::default()));
2806 }
2807 }
2808
2809 let remote_buffers = self.remote_id().zip(remote_buffers);
2810 let client = self.client.clone();
2811
2812 cx.spawn(|this, mut cx| async move {
2813 let mut project_transaction = ProjectTransaction::default();
2814
2815 if let Some((project_id, remote_buffers)) = remote_buffers {
2816 let response = client
2817 .request(proto::FormatBuffers {
2818 project_id,
2819 trigger: trigger as i32,
2820 buffer_ids: remote_buffers
2821 .iter()
2822 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
2823 .collect(),
2824 })
2825 .await?
2826 .transaction
2827 .ok_or_else(|| anyhow!("missing transaction"))?;
2828 project_transaction = this
2829 .update(&mut cx, |this, cx| {
2830 this.deserialize_project_transaction(response, push_to_history, cx)
2831 })
2832 .await?;
2833 }
2834
2835 // Do not allow multiple concurrent formatting requests for the
2836 // same buffer.
2837 this.update(&mut cx, |this, _| {
2838 local_buffers
2839 .retain(|(buffer, _, _)| this.buffers_being_formatted.insert(buffer.id()));
2840 });
2841 let _cleanup = defer({
2842 let this = this.clone();
2843 let mut cx = cx.clone();
2844 let local_buffers = &local_buffers;
2845 move || {
2846 this.update(&mut cx, |this, _| {
2847 for (buffer, _, _) in local_buffers {
2848 this.buffers_being_formatted.remove(&buffer.id());
2849 }
2850 });
2851 }
2852 });
2853
2854 for (buffer, buffer_abs_path, language_server) in &local_buffers {
2855 let (format_on_save, formatter, tab_size) = buffer.read_with(&cx, |buffer, cx| {
2856 let settings = cx.global::<Settings>();
2857 let language_name = buffer.language().map(|language| language.name());
2858 (
2859 settings.format_on_save(language_name.as_deref()),
2860 settings.formatter(language_name.as_deref()),
2861 settings.tab_size(language_name.as_deref()),
2862 )
2863 });
2864
2865 let transaction = match (formatter, format_on_save) {
2866 (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => continue,
2867
2868 (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
2869 | (_, FormatOnSave::LanguageServer) => Self::format_via_lsp(
2870 &this,
2871 &buffer,
2872 &buffer_abs_path,
2873 &language_server,
2874 tab_size,
2875 &mut cx,
2876 )
2877 .await
2878 .context("failed to format via language server")?,
2879
2880 (
2881 Formatter::External { command, arguments },
2882 FormatOnSave::On | FormatOnSave::Off,
2883 )
2884 | (_, FormatOnSave::External { command, arguments }) => {
2885 Self::format_via_external_command(
2886 &buffer,
2887 &buffer_abs_path,
2888 &command,
2889 &arguments,
2890 &mut cx,
2891 )
2892 .await
2893 .context(format!(
2894 "failed to format via external command {:?}",
2895 command
2896 ))?
2897 }
2898 };
2899
2900 if let Some(transaction) = transaction {
2901 if !push_to_history {
2902 buffer.update(&mut cx, |buffer, _| {
2903 buffer.forget_transaction(transaction.id)
2904 });
2905 }
2906 project_transaction.0.insert(buffer.clone(), transaction);
2907 }
2908 }
2909
2910 Ok(project_transaction)
2911 })
2912 }
2913
2914 async fn format_via_lsp(
2915 this: &ModelHandle<Self>,
2916 buffer: &ModelHandle<Buffer>,
2917 abs_path: &Path,
2918 language_server: &Arc<LanguageServer>,
2919 tab_size: NonZeroU32,
2920 cx: &mut AsyncAppContext,
2921 ) -> Result<Option<Transaction>> {
2922 let text_document =
2923 lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(abs_path).unwrap());
2924 let capabilities = &language_server.capabilities();
2925 let lsp_edits = if capabilities
2926 .document_formatting_provider
2927 .as_ref()
2928 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
2929 {
2930 language_server
2931 .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
2932 text_document,
2933 options: lsp::FormattingOptions {
2934 tab_size: tab_size.into(),
2935 insert_spaces: true,
2936 insert_final_newline: Some(true),
2937 ..Default::default()
2938 },
2939 work_done_progress_params: Default::default(),
2940 })
2941 .await?
2942 } else if capabilities
2943 .document_range_formatting_provider
2944 .as_ref()
2945 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
2946 {
2947 let buffer_start = lsp::Position::new(0, 0);
2948 let buffer_end =
2949 buffer.read_with(cx, |buffer, _| point_to_lsp(buffer.max_point_utf16()));
2950 language_server
2951 .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
2952 text_document,
2953 range: lsp::Range::new(buffer_start, buffer_end),
2954 options: lsp::FormattingOptions {
2955 tab_size: tab_size.into(),
2956 insert_spaces: true,
2957 insert_final_newline: Some(true),
2958 ..Default::default()
2959 },
2960 work_done_progress_params: Default::default(),
2961 })
2962 .await?
2963 } else {
2964 None
2965 };
2966
2967 if let Some(lsp_edits) = lsp_edits {
2968 let edits = this
2969 .update(cx, |this, cx| {
2970 this.edits_from_lsp(buffer, lsp_edits, None, cx)
2971 })
2972 .await?;
2973 buffer.update(cx, |buffer, cx| {
2974 buffer.finalize_last_transaction();
2975 buffer.start_transaction();
2976 for (range, text) in edits {
2977 buffer.edit([(range, text)], None, cx);
2978 }
2979 if buffer.end_transaction(cx).is_some() {
2980 let transaction = buffer.finalize_last_transaction().unwrap().clone();
2981 Ok(Some(transaction))
2982 } else {
2983 Ok(None)
2984 }
2985 })
2986 } else {
2987 Ok(None)
2988 }
2989 }
2990
2991 async fn format_via_external_command(
2992 buffer: &ModelHandle<Buffer>,
2993 buffer_abs_path: &Path,
2994 command: &str,
2995 arguments: &[String],
2996 cx: &mut AsyncAppContext,
2997 ) -> Result<Option<Transaction>> {
2998 let working_dir_path = buffer.read_with(cx, |buffer, cx| {
2999 let file = File::from_dyn(buffer.file())?;
3000 let worktree = file.worktree.read(cx).as_local()?;
3001 let mut worktree_path = worktree.abs_path().to_path_buf();
3002 if worktree.root_entry()?.is_file() {
3003 worktree_path.pop();
3004 }
3005 Some(worktree_path)
3006 });
3007
3008 if let Some(working_dir_path) = working_dir_path {
3009 let mut child =
3010 smol::process::Command::new(command)
3011 .args(arguments.iter().map(|arg| {
3012 arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
3013 }))
3014 .current_dir(&working_dir_path)
3015 .stdin(smol::process::Stdio::piped())
3016 .stdout(smol::process::Stdio::piped())
3017 .stderr(smol::process::Stdio::piped())
3018 .spawn()?;
3019 let stdin = child
3020 .stdin
3021 .as_mut()
3022 .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
3023 let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
3024 for chunk in text.chunks() {
3025 stdin.write_all(chunk.as_bytes()).await?;
3026 }
3027 stdin.flush().await?;
3028
3029 let output = child.output().await?;
3030 if !output.status.success() {
3031 return Err(anyhow!(
3032 "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
3033 output.status.code(),
3034 String::from_utf8_lossy(&output.stdout),
3035 String::from_utf8_lossy(&output.stderr),
3036 ));
3037 }
3038
3039 let stdout = String::from_utf8(output.stdout)?;
3040 let diff = buffer
3041 .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
3042 .await;
3043 Ok(buffer.update(cx, |buffer, cx| buffer.apply_diff(diff, cx).cloned()))
3044 } else {
3045 Ok(None)
3046 }
3047 }
3048
3049 pub fn definition<T: ToPointUtf16>(
3050 &self,
3051 buffer: &ModelHandle<Buffer>,
3052 position: T,
3053 cx: &mut ModelContext<Self>,
3054 ) -> Task<Result<Vec<LocationLink>>> {
3055 let position = position.to_point_utf16(buffer.read(cx));
3056 self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
3057 }
3058
3059 pub fn type_definition<T: ToPointUtf16>(
3060 &self,
3061 buffer: &ModelHandle<Buffer>,
3062 position: T,
3063 cx: &mut ModelContext<Self>,
3064 ) -> Task<Result<Vec<LocationLink>>> {
3065 let position = position.to_point_utf16(buffer.read(cx));
3066 self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
3067 }
3068
3069 pub fn references<T: ToPointUtf16>(
3070 &self,
3071 buffer: &ModelHandle<Buffer>,
3072 position: T,
3073 cx: &mut ModelContext<Self>,
3074 ) -> Task<Result<Vec<Location>>> {
3075 let position = position.to_point_utf16(buffer.read(cx));
3076 self.request_lsp(buffer.clone(), GetReferences { position }, cx)
3077 }
3078
3079 pub fn document_highlights<T: ToPointUtf16>(
3080 &self,
3081 buffer: &ModelHandle<Buffer>,
3082 position: T,
3083 cx: &mut ModelContext<Self>,
3084 ) -> Task<Result<Vec<DocumentHighlight>>> {
3085 let position = position.to_point_utf16(buffer.read(cx));
3086 self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
3087 }
3088
3089 pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
3090 if self.is_local() {
3091 let mut requests = Vec::new();
3092 for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
3093 let worktree_id = *worktree_id;
3094 if let Some(worktree) = self
3095 .worktree_for_id(worktree_id, cx)
3096 .and_then(|worktree| worktree.read(cx).as_local())
3097 {
3098 if let Some(LanguageServerState::Running {
3099 adapter,
3100 language,
3101 server,
3102 }) = self.language_servers.get(server_id)
3103 {
3104 let adapter = adapter.clone();
3105 let language = language.clone();
3106 let worktree_abs_path = worktree.abs_path().clone();
3107 requests.push(
3108 server
3109 .request::<lsp::request::WorkspaceSymbol>(
3110 lsp::WorkspaceSymbolParams {
3111 query: query.to_string(),
3112 ..Default::default()
3113 },
3114 )
3115 .log_err()
3116 .map(move |response| {
3117 (
3118 adapter,
3119 language,
3120 worktree_id,
3121 worktree_abs_path,
3122 response.unwrap_or_default(),
3123 )
3124 }),
3125 );
3126 }
3127 }
3128 }
3129
3130 cx.spawn_weak(|this, cx| async move {
3131 let responses = futures::future::join_all(requests).await;
3132 let this = if let Some(this) = this.upgrade(&cx) {
3133 this
3134 } else {
3135 return Ok(Default::default());
3136 };
3137 let symbols = this.read_with(&cx, |this, cx| {
3138 let mut symbols = Vec::new();
3139 for (
3140 adapter,
3141 adapter_language,
3142 source_worktree_id,
3143 worktree_abs_path,
3144 response,
3145 ) in responses
3146 {
3147 symbols.extend(response.into_iter().flatten().filter_map(|lsp_symbol| {
3148 let abs_path = lsp_symbol.location.uri.to_file_path().ok()?;
3149 let mut worktree_id = source_worktree_id;
3150 let path;
3151 if let Some((worktree, rel_path)) =
3152 this.find_local_worktree(&abs_path, cx)
3153 {
3154 worktree_id = worktree.read(cx).id();
3155 path = rel_path;
3156 } else {
3157 path = relativize_path(&worktree_abs_path, &abs_path);
3158 }
3159
3160 let project_path = ProjectPath {
3161 worktree_id,
3162 path: path.into(),
3163 };
3164 let signature = this.symbol_signature(&project_path);
3165 let language = this
3166 .languages
3167 .select_language(&project_path.path)
3168 .unwrap_or(adapter_language.clone());
3169 let language_server_name = adapter.name.clone();
3170 Some(async move {
3171 let label = language
3172 .label_for_symbol(&lsp_symbol.name, lsp_symbol.kind)
3173 .await;
3174
3175 Symbol {
3176 language_server_name,
3177 source_worktree_id,
3178 path: project_path,
3179 label: label.unwrap_or_else(|| {
3180 CodeLabel::plain(lsp_symbol.name.clone(), None)
3181 }),
3182 kind: lsp_symbol.kind,
3183 name: lsp_symbol.name,
3184 range: range_from_lsp(lsp_symbol.location.range),
3185 signature,
3186 }
3187 })
3188 }));
3189 }
3190 symbols
3191 });
3192 Ok(futures::future::join_all(symbols).await)
3193 })
3194 } else if let Some(project_id) = self.remote_id() {
3195 let request = self.client.request(proto::GetProjectSymbols {
3196 project_id,
3197 query: query.to_string(),
3198 });
3199 cx.spawn_weak(|this, cx| async move {
3200 let response = request.await?;
3201 let mut symbols = Vec::new();
3202 if let Some(this) = this.upgrade(&cx) {
3203 let new_symbols = this.read_with(&cx, |this, _| {
3204 response
3205 .symbols
3206 .into_iter()
3207 .map(|symbol| this.deserialize_symbol(symbol))
3208 .collect::<Vec<_>>()
3209 });
3210 symbols = futures::future::join_all(new_symbols)
3211 .await
3212 .into_iter()
3213 .filter_map(|symbol| symbol.log_err())
3214 .collect::<Vec<_>>();
3215 }
3216 Ok(symbols)
3217 })
3218 } else {
3219 Task::ready(Ok(Default::default()))
3220 }
3221 }
3222
3223 pub fn open_buffer_for_symbol(
3224 &mut self,
3225 symbol: &Symbol,
3226 cx: &mut ModelContext<Self>,
3227 ) -> Task<Result<ModelHandle<Buffer>>> {
3228 if self.is_local() {
3229 let language_server_id = if let Some(id) = self.language_server_ids.get(&(
3230 symbol.source_worktree_id,
3231 symbol.language_server_name.clone(),
3232 )) {
3233 *id
3234 } else {
3235 return Task::ready(Err(anyhow!(
3236 "language server for worktree and language not found"
3237 )));
3238 };
3239
3240 let worktree_abs_path = if let Some(worktree_abs_path) = self
3241 .worktree_for_id(symbol.path.worktree_id, cx)
3242 .and_then(|worktree| worktree.read(cx).as_local())
3243 .map(|local_worktree| local_worktree.abs_path())
3244 {
3245 worktree_abs_path
3246 } else {
3247 return Task::ready(Err(anyhow!("worktree not found for symbol")));
3248 };
3249 let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
3250 let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
3251 uri
3252 } else {
3253 return Task::ready(Err(anyhow!("invalid symbol path")));
3254 };
3255
3256 self.open_local_buffer_via_lsp(
3257 symbol_uri,
3258 language_server_id,
3259 symbol.language_server_name.clone(),
3260 cx,
3261 )
3262 } else if let Some(project_id) = self.remote_id() {
3263 let request = self.client.request(proto::OpenBufferForSymbol {
3264 project_id,
3265 symbol: Some(serialize_symbol(symbol)),
3266 });
3267 cx.spawn(|this, mut cx| async move {
3268 let response = request.await?;
3269 this.update(&mut cx, |this, cx| {
3270 this.wait_for_buffer(response.buffer_id, cx)
3271 })
3272 .await
3273 })
3274 } else {
3275 Task::ready(Err(anyhow!("project does not have a remote id")))
3276 }
3277 }
3278
3279 pub fn hover<T: ToPointUtf16>(
3280 &self,
3281 buffer: &ModelHandle<Buffer>,
3282 position: T,
3283 cx: &mut ModelContext<Self>,
3284 ) -> Task<Result<Option<Hover>>> {
3285 let position = position.to_point_utf16(buffer.read(cx));
3286 self.request_lsp(buffer.clone(), GetHover { position }, cx)
3287 }
3288
3289 pub fn completions<T: ToPointUtf16>(
3290 &self,
3291 source_buffer_handle: &ModelHandle<Buffer>,
3292 position: T,
3293 cx: &mut ModelContext<Self>,
3294 ) -> Task<Result<Vec<Completion>>> {
3295 let source_buffer_handle = source_buffer_handle.clone();
3296 let source_buffer = source_buffer_handle.read(cx);
3297 let buffer_id = source_buffer.remote_id();
3298 let language = source_buffer.language().cloned();
3299 let worktree;
3300 let buffer_abs_path;
3301 if let Some(file) = File::from_dyn(source_buffer.file()) {
3302 worktree = file.worktree.clone();
3303 buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
3304 } else {
3305 return Task::ready(Ok(Default::default()));
3306 };
3307
3308 let position = Unclipped(position.to_point_utf16(source_buffer));
3309 let anchor = source_buffer.anchor_after(position);
3310
3311 if worktree.read(cx).as_local().is_some() {
3312 let buffer_abs_path = buffer_abs_path.unwrap();
3313 let lang_server =
3314 if let Some((_, server)) = self.language_server_for_buffer(source_buffer, cx) {
3315 server.clone()
3316 } else {
3317 return Task::ready(Ok(Default::default()));
3318 };
3319
3320 cx.spawn(|_, cx| async move {
3321 let completions = lang_server
3322 .request::<lsp::request::Completion>(lsp::CompletionParams {
3323 text_document_position: lsp::TextDocumentPositionParams::new(
3324 lsp::TextDocumentIdentifier::new(
3325 lsp::Url::from_file_path(buffer_abs_path).unwrap(),
3326 ),
3327 point_to_lsp(position.0),
3328 ),
3329 context: Default::default(),
3330 work_done_progress_params: Default::default(),
3331 partial_result_params: Default::default(),
3332 })
3333 .await
3334 .context("lsp completion request failed")?;
3335
3336 let completions = if let Some(completions) = completions {
3337 match completions {
3338 lsp::CompletionResponse::Array(completions) => completions,
3339 lsp::CompletionResponse::List(list) => list.items,
3340 }
3341 } else {
3342 Default::default()
3343 };
3344
3345 let completions = source_buffer_handle.read_with(&cx, |this, _| {
3346 let snapshot = this.snapshot();
3347 let clipped_position = this.clip_point_utf16(position, Bias::Left);
3348 let mut range_for_token = None;
3349 completions
3350 .into_iter()
3351 .filter_map(move |mut lsp_completion| {
3352 // For now, we can only handle additional edits if they are returned
3353 // when resolving the completion, not if they are present initially.
3354 if lsp_completion
3355 .additional_text_edits
3356 .as_ref()
3357 .map_or(false, |edits| !edits.is_empty())
3358 {
3359 return None;
3360 }
3361
3362 let (old_range, mut new_text) = match lsp_completion.text_edit.as_ref()
3363 {
3364 // If the language server provides a range to overwrite, then
3365 // check that the range is valid.
3366 Some(lsp::CompletionTextEdit::Edit(edit)) => {
3367 let range = range_from_lsp(edit.range);
3368 let start = snapshot.clip_point_utf16(range.start, Bias::Left);
3369 let end = snapshot.clip_point_utf16(range.end, Bias::Left);
3370 if start != range.start.0 || end != range.end.0 {
3371 log::info!("completion out of expected range");
3372 return None;
3373 }
3374 (
3375 snapshot.anchor_before(start)..snapshot.anchor_after(end),
3376 edit.new_text.clone(),
3377 )
3378 }
3379 // If the language server does not provide a range, then infer
3380 // the range based on the syntax tree.
3381 None => {
3382 if position.0 != clipped_position {
3383 log::info!("completion out of expected range");
3384 return None;
3385 }
3386 let Range { start, end } = range_for_token
3387 .get_or_insert_with(|| {
3388 let offset = position.to_offset(&snapshot);
3389 let (range, kind) = snapshot.surrounding_word(offset);
3390 if kind == Some(CharKind::Word) {
3391 range
3392 } else {
3393 offset..offset
3394 }
3395 })
3396 .clone();
3397 let text = lsp_completion
3398 .insert_text
3399 .as_ref()
3400 .unwrap_or(&lsp_completion.label)
3401 .clone();
3402 (
3403 snapshot.anchor_before(start)..snapshot.anchor_after(end),
3404 text,
3405 )
3406 }
3407 Some(lsp::CompletionTextEdit::InsertAndReplace(_)) => {
3408 log::info!("unsupported insert/replace completion");
3409 return None;
3410 }
3411 };
3412
3413 LineEnding::normalize(&mut new_text);
3414 let language = language.clone();
3415 Some(async move {
3416 let mut label = None;
3417 if let Some(language) = language {
3418 language.process_completion(&mut lsp_completion).await;
3419 label = language.label_for_completion(&lsp_completion).await;
3420 }
3421 Completion {
3422 old_range,
3423 new_text,
3424 label: label.unwrap_or_else(|| {
3425 CodeLabel::plain(
3426 lsp_completion.label.clone(),
3427 lsp_completion.filter_text.as_deref(),
3428 )
3429 }),
3430 lsp_completion,
3431 }
3432 })
3433 })
3434 });
3435
3436 Ok(futures::future::join_all(completions).await)
3437 })
3438 } else if let Some(project_id) = self.remote_id() {
3439 let rpc = self.client.clone();
3440 let message = proto::GetCompletions {
3441 project_id,
3442 buffer_id,
3443 position: Some(language::proto::serialize_anchor(&anchor)),
3444 version: serialize_version(&source_buffer.version()),
3445 };
3446 cx.spawn_weak(|this, mut cx| async move {
3447 let response = rpc.request(message).await?;
3448
3449 if this
3450 .upgrade(&cx)
3451 .ok_or_else(|| anyhow!("project was dropped"))?
3452 .read_with(&cx, |this, _| this.is_read_only())
3453 {
3454 return Err(anyhow!(
3455 "failed to get completions: project was disconnected"
3456 ));
3457 } else {
3458 source_buffer_handle
3459 .update(&mut cx, |buffer, _| {
3460 buffer.wait_for_version(deserialize_version(response.version))
3461 })
3462 .await;
3463
3464 let completions = response.completions.into_iter().map(|completion| {
3465 language::proto::deserialize_completion(completion, language.clone())
3466 });
3467 futures::future::try_join_all(completions).await
3468 }
3469 })
3470 } else {
3471 Task::ready(Ok(Default::default()))
3472 }
3473 }
3474
3475 pub fn apply_additional_edits_for_completion(
3476 &self,
3477 buffer_handle: ModelHandle<Buffer>,
3478 completion: Completion,
3479 push_to_history: bool,
3480 cx: &mut ModelContext<Self>,
3481 ) -> Task<Result<Option<Transaction>>> {
3482 let buffer = buffer_handle.read(cx);
3483 let buffer_id = buffer.remote_id();
3484
3485 if self.is_local() {
3486 let lang_server = match self.language_server_for_buffer(buffer, cx) {
3487 Some((_, server)) => server.clone(),
3488 _ => return Task::ready(Ok(Default::default())),
3489 };
3490
3491 cx.spawn(|this, mut cx| async move {
3492 let resolved_completion = lang_server
3493 .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
3494 .await?;
3495
3496 if let Some(edits) = resolved_completion.additional_text_edits {
3497 let edits = this
3498 .update(&mut cx, |this, cx| {
3499 this.edits_from_lsp(&buffer_handle, edits, None, cx)
3500 })
3501 .await?;
3502
3503 buffer_handle.update(&mut cx, |buffer, cx| {
3504 buffer.finalize_last_transaction();
3505 buffer.start_transaction();
3506
3507 for (range, text) in edits {
3508 let primary = &completion.old_range;
3509 let start_within = primary.start.cmp(&range.start, buffer).is_le()
3510 && primary.end.cmp(&range.start, buffer).is_ge();
3511 let end_within = range.start.cmp(&primary.end, buffer).is_le()
3512 && range.end.cmp(&primary.end, buffer).is_ge();
3513
3514 //Skip addtional edits which overlap with the primary completion edit
3515 //https://github.com/zed-industries/zed/pull/1871
3516 if !start_within && !end_within {
3517 buffer.edit([(range, text)], None, cx);
3518 }
3519 }
3520
3521 let transaction = if buffer.end_transaction(cx).is_some() {
3522 let transaction = buffer.finalize_last_transaction().unwrap().clone();
3523 if !push_to_history {
3524 buffer.forget_transaction(transaction.id);
3525 }
3526 Some(transaction)
3527 } else {
3528 None
3529 };
3530 Ok(transaction)
3531 })
3532 } else {
3533 Ok(None)
3534 }
3535 })
3536 } else if let Some(project_id) = self.remote_id() {
3537 let client = self.client.clone();
3538 cx.spawn(|_, mut cx| async move {
3539 let response = client
3540 .request(proto::ApplyCompletionAdditionalEdits {
3541 project_id,
3542 buffer_id,
3543 completion: Some(language::proto::serialize_completion(&completion)),
3544 })
3545 .await?;
3546
3547 if let Some(transaction) = response.transaction {
3548 let transaction = language::proto::deserialize_transaction(transaction)?;
3549 buffer_handle
3550 .update(&mut cx, |buffer, _| {
3551 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
3552 })
3553 .await;
3554 if push_to_history {
3555 buffer_handle.update(&mut cx, |buffer, _| {
3556 buffer.push_transaction(transaction.clone(), Instant::now());
3557 });
3558 }
3559 Ok(Some(transaction))
3560 } else {
3561 Ok(None)
3562 }
3563 })
3564 } else {
3565 Task::ready(Err(anyhow!("project does not have a remote id")))
3566 }
3567 }
3568
3569 pub fn code_actions<T: Clone + ToOffset>(
3570 &self,
3571 buffer_handle: &ModelHandle<Buffer>,
3572 range: Range<T>,
3573 cx: &mut ModelContext<Self>,
3574 ) -> Task<Result<Vec<CodeAction>>> {
3575 let buffer_handle = buffer_handle.clone();
3576 let buffer = buffer_handle.read(cx);
3577 let snapshot = buffer.snapshot();
3578 let relevant_diagnostics = snapshot
3579 .diagnostics_in_range::<usize, usize>(range.to_offset(&snapshot), false)
3580 .map(|entry| entry.to_lsp_diagnostic_stub())
3581 .collect();
3582 let buffer_id = buffer.remote_id();
3583 let worktree;
3584 let buffer_abs_path;
3585 if let Some(file) = File::from_dyn(buffer.file()) {
3586 worktree = file.worktree.clone();
3587 buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
3588 } else {
3589 return Task::ready(Ok(Default::default()));
3590 };
3591 let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
3592
3593 if worktree.read(cx).as_local().is_some() {
3594 let buffer_abs_path = buffer_abs_path.unwrap();
3595 let lang_server = if let Some((_, server)) = self.language_server_for_buffer(buffer, cx)
3596 {
3597 server.clone()
3598 } else {
3599 return Task::ready(Ok(Default::default()));
3600 };
3601
3602 let lsp_range = range_to_lsp(range.to_point_utf16(buffer));
3603 cx.foreground().spawn(async move {
3604 if lang_server.capabilities().code_action_provider.is_none() {
3605 return Ok(Default::default());
3606 }
3607
3608 Ok(lang_server
3609 .request::<lsp::request::CodeActionRequest>(lsp::CodeActionParams {
3610 text_document: lsp::TextDocumentIdentifier::new(
3611 lsp::Url::from_file_path(buffer_abs_path).unwrap(),
3612 ),
3613 range: lsp_range,
3614 work_done_progress_params: Default::default(),
3615 partial_result_params: Default::default(),
3616 context: lsp::CodeActionContext {
3617 diagnostics: relevant_diagnostics,
3618 only: Some(vec![
3619 lsp::CodeActionKind::EMPTY,
3620 lsp::CodeActionKind::QUICKFIX,
3621 lsp::CodeActionKind::REFACTOR,
3622 lsp::CodeActionKind::REFACTOR_EXTRACT,
3623 lsp::CodeActionKind::SOURCE,
3624 ]),
3625 },
3626 })
3627 .await?
3628 .unwrap_or_default()
3629 .into_iter()
3630 .filter_map(|entry| {
3631 if let lsp::CodeActionOrCommand::CodeAction(lsp_action) = entry {
3632 Some(CodeAction {
3633 range: range.clone(),
3634 lsp_action,
3635 })
3636 } else {
3637 None
3638 }
3639 })
3640 .collect())
3641 })
3642 } else if let Some(project_id) = self.remote_id() {
3643 let rpc = self.client.clone();
3644 let version = buffer.version();
3645 cx.spawn_weak(|this, mut cx| async move {
3646 let response = rpc
3647 .request(proto::GetCodeActions {
3648 project_id,
3649 buffer_id,
3650 start: Some(language::proto::serialize_anchor(&range.start)),
3651 end: Some(language::proto::serialize_anchor(&range.end)),
3652 version: serialize_version(&version),
3653 })
3654 .await?;
3655
3656 if this
3657 .upgrade(&cx)
3658 .ok_or_else(|| anyhow!("project was dropped"))?
3659 .read_with(&cx, |this, _| this.is_read_only())
3660 {
3661 return Err(anyhow!(
3662 "failed to get code actions: project was disconnected"
3663 ));
3664 } else {
3665 buffer_handle
3666 .update(&mut cx, |buffer, _| {
3667 buffer.wait_for_version(deserialize_version(response.version))
3668 })
3669 .await;
3670
3671 response
3672 .actions
3673 .into_iter()
3674 .map(language::proto::deserialize_code_action)
3675 .collect()
3676 }
3677 })
3678 } else {
3679 Task::ready(Ok(Default::default()))
3680 }
3681 }
3682
3683 pub fn apply_code_action(
3684 &self,
3685 buffer_handle: ModelHandle<Buffer>,
3686 mut action: CodeAction,
3687 push_to_history: bool,
3688 cx: &mut ModelContext<Self>,
3689 ) -> Task<Result<ProjectTransaction>> {
3690 if self.is_local() {
3691 let buffer = buffer_handle.read(cx);
3692 let (lsp_adapter, lang_server) =
3693 if let Some((adapter, server)) = self.language_server_for_buffer(buffer, cx) {
3694 (adapter.clone(), server.clone())
3695 } else {
3696 return Task::ready(Ok(Default::default()));
3697 };
3698 let range = action.range.to_point_utf16(buffer);
3699
3700 cx.spawn(|this, mut cx| async move {
3701 if let Some(lsp_range) = action
3702 .lsp_action
3703 .data
3704 .as_mut()
3705 .and_then(|d| d.get_mut("codeActionParams"))
3706 .and_then(|d| d.get_mut("range"))
3707 {
3708 *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
3709 action.lsp_action = lang_server
3710 .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
3711 .await?;
3712 } else {
3713 let actions = this
3714 .update(&mut cx, |this, cx| {
3715 this.code_actions(&buffer_handle, action.range, cx)
3716 })
3717 .await?;
3718 action.lsp_action = actions
3719 .into_iter()
3720 .find(|a| a.lsp_action.title == action.lsp_action.title)
3721 .ok_or_else(|| anyhow!("code action is outdated"))?
3722 .lsp_action;
3723 }
3724
3725 if let Some(edit) = action.lsp_action.edit {
3726 if edit.changes.is_some() || edit.document_changes.is_some() {
3727 return Self::deserialize_workspace_edit(
3728 this,
3729 edit,
3730 push_to_history,
3731 lsp_adapter.clone(),
3732 lang_server.clone(),
3733 &mut cx,
3734 )
3735 .await;
3736 }
3737 }
3738
3739 if let Some(command) = action.lsp_action.command {
3740 this.update(&mut cx, |this, _| {
3741 this.last_workspace_edits_by_language_server
3742 .remove(&lang_server.server_id());
3743 });
3744 lang_server
3745 .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
3746 command: command.command,
3747 arguments: command.arguments.unwrap_or_default(),
3748 ..Default::default()
3749 })
3750 .await?;
3751 return Ok(this.update(&mut cx, |this, _| {
3752 this.last_workspace_edits_by_language_server
3753 .remove(&lang_server.server_id())
3754 .unwrap_or_default()
3755 }));
3756 }
3757
3758 Ok(ProjectTransaction::default())
3759 })
3760 } else if let Some(project_id) = self.remote_id() {
3761 let client = self.client.clone();
3762 let request = proto::ApplyCodeAction {
3763 project_id,
3764 buffer_id: buffer_handle.read(cx).remote_id(),
3765 action: Some(language::proto::serialize_code_action(&action)),
3766 };
3767 cx.spawn(|this, mut cx| async move {
3768 let response = client
3769 .request(request)
3770 .await?
3771 .transaction
3772 .ok_or_else(|| anyhow!("missing transaction"))?;
3773 this.update(&mut cx, |this, cx| {
3774 this.deserialize_project_transaction(response, push_to_history, cx)
3775 })
3776 .await
3777 })
3778 } else {
3779 Task::ready(Err(anyhow!("project does not have a remote id")))
3780 }
3781 }
3782
3783 async fn deserialize_workspace_edit(
3784 this: ModelHandle<Self>,
3785 edit: lsp::WorkspaceEdit,
3786 push_to_history: bool,
3787 lsp_adapter: Arc<CachedLspAdapter>,
3788 language_server: Arc<LanguageServer>,
3789 cx: &mut AsyncAppContext,
3790 ) -> Result<ProjectTransaction> {
3791 let fs = this.read_with(cx, |this, _| this.fs.clone());
3792 let mut operations = Vec::new();
3793 if let Some(document_changes) = edit.document_changes {
3794 match document_changes {
3795 lsp::DocumentChanges::Edits(edits) => {
3796 operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
3797 }
3798 lsp::DocumentChanges::Operations(ops) => operations = ops,
3799 }
3800 } else if let Some(changes) = edit.changes {
3801 operations.extend(changes.into_iter().map(|(uri, edits)| {
3802 lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
3803 text_document: lsp::OptionalVersionedTextDocumentIdentifier {
3804 uri,
3805 version: None,
3806 },
3807 edits: edits.into_iter().map(lsp::OneOf::Left).collect(),
3808 })
3809 }));
3810 }
3811
3812 let mut project_transaction = ProjectTransaction::default();
3813 for operation in operations {
3814 match operation {
3815 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
3816 let abs_path = op
3817 .uri
3818 .to_file_path()
3819 .map_err(|_| anyhow!("can't convert URI to path"))?;
3820
3821 if let Some(parent_path) = abs_path.parent() {
3822 fs.create_dir(parent_path).await?;
3823 }
3824 if abs_path.ends_with("/") {
3825 fs.create_dir(&abs_path).await?;
3826 } else {
3827 fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
3828 .await?;
3829 }
3830 }
3831 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
3832 let source_abs_path = op
3833 .old_uri
3834 .to_file_path()
3835 .map_err(|_| anyhow!("can't convert URI to path"))?;
3836 let target_abs_path = op
3837 .new_uri
3838 .to_file_path()
3839 .map_err(|_| anyhow!("can't convert URI to path"))?;
3840 fs.rename(
3841 &source_abs_path,
3842 &target_abs_path,
3843 op.options.map(Into::into).unwrap_or_default(),
3844 )
3845 .await?;
3846 }
3847 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
3848 let abs_path = op
3849 .uri
3850 .to_file_path()
3851 .map_err(|_| anyhow!("can't convert URI to path"))?;
3852 let options = op.options.map(Into::into).unwrap_or_default();
3853 if abs_path.ends_with("/") {
3854 fs.remove_dir(&abs_path, options).await?;
3855 } else {
3856 fs.remove_file(&abs_path, options).await?;
3857 }
3858 }
3859 lsp::DocumentChangeOperation::Edit(op) => {
3860 let buffer_to_edit = this
3861 .update(cx, |this, cx| {
3862 this.open_local_buffer_via_lsp(
3863 op.text_document.uri,
3864 language_server.server_id(),
3865 lsp_adapter.name.clone(),
3866 cx,
3867 )
3868 })
3869 .await?;
3870
3871 let edits = this
3872 .update(cx, |this, cx| {
3873 let edits = op.edits.into_iter().map(|edit| match edit {
3874 lsp::OneOf::Left(edit) => edit,
3875 lsp::OneOf::Right(edit) => edit.text_edit,
3876 });
3877 this.edits_from_lsp(
3878 &buffer_to_edit,
3879 edits,
3880 op.text_document.version,
3881 cx,
3882 )
3883 })
3884 .await?;
3885
3886 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
3887 buffer.finalize_last_transaction();
3888 buffer.start_transaction();
3889 for (range, text) in edits {
3890 buffer.edit([(range, text)], None, cx);
3891 }
3892 let transaction = if buffer.end_transaction(cx).is_some() {
3893 let transaction = buffer.finalize_last_transaction().unwrap().clone();
3894 if !push_to_history {
3895 buffer.forget_transaction(transaction.id);
3896 }
3897 Some(transaction)
3898 } else {
3899 None
3900 };
3901
3902 transaction
3903 });
3904 if let Some(transaction) = transaction {
3905 project_transaction.0.insert(buffer_to_edit, transaction);
3906 }
3907 }
3908 }
3909 }
3910
3911 Ok(project_transaction)
3912 }
3913
3914 pub fn prepare_rename<T: ToPointUtf16>(
3915 &self,
3916 buffer: ModelHandle<Buffer>,
3917 position: T,
3918 cx: &mut ModelContext<Self>,
3919 ) -> Task<Result<Option<Range<Anchor>>>> {
3920 let position = position.to_point_utf16(buffer.read(cx));
3921 self.request_lsp(buffer, PrepareRename { position }, cx)
3922 }
3923
3924 pub fn perform_rename<T: ToPointUtf16>(
3925 &self,
3926 buffer: ModelHandle<Buffer>,
3927 position: T,
3928 new_name: String,
3929 push_to_history: bool,
3930 cx: &mut ModelContext<Self>,
3931 ) -> Task<Result<ProjectTransaction>> {
3932 let position = position.to_point_utf16(buffer.read(cx));
3933 self.request_lsp(
3934 buffer,
3935 PerformRename {
3936 position,
3937 new_name,
3938 push_to_history,
3939 },
3940 cx,
3941 )
3942 }
3943
3944 #[allow(clippy::type_complexity)]
3945 pub fn search(
3946 &self,
3947 query: SearchQuery,
3948 cx: &mut ModelContext<Self>,
3949 ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
3950 if self.is_local() {
3951 let snapshots = self
3952 .visible_worktrees(cx)
3953 .filter_map(|tree| {
3954 let tree = tree.read(cx).as_local()?;
3955 Some(tree.snapshot())
3956 })
3957 .collect::<Vec<_>>();
3958
3959 let background = cx.background().clone();
3960 let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
3961 if path_count == 0 {
3962 return Task::ready(Ok(Default::default()));
3963 }
3964 let workers = background.num_cpus().min(path_count);
3965 let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
3966 cx.background()
3967 .spawn({
3968 let fs = self.fs.clone();
3969 let background = cx.background().clone();
3970 let query = query.clone();
3971 async move {
3972 let fs = &fs;
3973 let query = &query;
3974 let matching_paths_tx = &matching_paths_tx;
3975 let paths_per_worker = (path_count + workers - 1) / workers;
3976 let snapshots = &snapshots;
3977 background
3978 .scoped(|scope| {
3979 for worker_ix in 0..workers {
3980 let worker_start_ix = worker_ix * paths_per_worker;
3981 let worker_end_ix = worker_start_ix + paths_per_worker;
3982 scope.spawn(async move {
3983 let mut snapshot_start_ix = 0;
3984 let mut abs_path = PathBuf::new();
3985 for snapshot in snapshots {
3986 let snapshot_end_ix =
3987 snapshot_start_ix + snapshot.visible_file_count();
3988 if worker_end_ix <= snapshot_start_ix {
3989 break;
3990 } else if worker_start_ix > snapshot_end_ix {
3991 snapshot_start_ix = snapshot_end_ix;
3992 continue;
3993 } else {
3994 let start_in_snapshot = worker_start_ix
3995 .saturating_sub(snapshot_start_ix);
3996 let end_in_snapshot =
3997 cmp::min(worker_end_ix, snapshot_end_ix)
3998 - snapshot_start_ix;
3999
4000 for entry in snapshot
4001 .files(false, start_in_snapshot)
4002 .take(end_in_snapshot - start_in_snapshot)
4003 {
4004 if matching_paths_tx.is_closed() {
4005 break;
4006 }
4007
4008 abs_path.clear();
4009 abs_path.push(&snapshot.abs_path());
4010 abs_path.push(&entry.path);
4011 let matches = if let Some(file) =
4012 fs.open_sync(&abs_path).await.log_err()
4013 {
4014 query.detect(file).unwrap_or(false)
4015 } else {
4016 false
4017 };
4018
4019 if matches {
4020 let project_path =
4021 (snapshot.id(), entry.path.clone());
4022 if matching_paths_tx
4023 .send(project_path)
4024 .await
4025 .is_err()
4026 {
4027 break;
4028 }
4029 }
4030 }
4031
4032 snapshot_start_ix = snapshot_end_ix;
4033 }
4034 }
4035 });
4036 }
4037 })
4038 .await;
4039 }
4040 })
4041 .detach();
4042
4043 let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
4044 let open_buffers = self
4045 .opened_buffers
4046 .values()
4047 .filter_map(|b| b.upgrade(cx))
4048 .collect::<HashSet<_>>();
4049 cx.spawn(|this, cx| async move {
4050 for buffer in &open_buffers {
4051 let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
4052 buffers_tx.send((buffer.clone(), snapshot)).await?;
4053 }
4054
4055 let open_buffers = Rc::new(RefCell::new(open_buffers));
4056 while let Some(project_path) = matching_paths_rx.next().await {
4057 if buffers_tx.is_closed() {
4058 break;
4059 }
4060
4061 let this = this.clone();
4062 let open_buffers = open_buffers.clone();
4063 let buffers_tx = buffers_tx.clone();
4064 cx.spawn(|mut cx| async move {
4065 if let Some(buffer) = this
4066 .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
4067 .await
4068 .log_err()
4069 {
4070 if open_buffers.borrow_mut().insert(buffer.clone()) {
4071 let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
4072 buffers_tx.send((buffer, snapshot)).await?;
4073 }
4074 }
4075
4076 Ok::<_, anyhow::Error>(())
4077 })
4078 .detach();
4079 }
4080
4081 Ok::<_, anyhow::Error>(())
4082 })
4083 .detach_and_log_err(cx);
4084
4085 let background = cx.background().clone();
4086 cx.background().spawn(async move {
4087 let query = &query;
4088 let mut matched_buffers = Vec::new();
4089 for _ in 0..workers {
4090 matched_buffers.push(HashMap::default());
4091 }
4092 background
4093 .scoped(|scope| {
4094 for worker_matched_buffers in matched_buffers.iter_mut() {
4095 let mut buffers_rx = buffers_rx.clone();
4096 scope.spawn(async move {
4097 while let Some((buffer, snapshot)) = buffers_rx.next().await {
4098 let buffer_matches = query
4099 .search(snapshot.as_rope())
4100 .await
4101 .iter()
4102 .map(|range| {
4103 snapshot.anchor_before(range.start)
4104 ..snapshot.anchor_after(range.end)
4105 })
4106 .collect::<Vec<_>>();
4107 if !buffer_matches.is_empty() {
4108 worker_matched_buffers
4109 .insert(buffer.clone(), buffer_matches);
4110 }
4111 }
4112 });
4113 }
4114 })
4115 .await;
4116 Ok(matched_buffers.into_iter().flatten().collect())
4117 })
4118 } else if let Some(project_id) = self.remote_id() {
4119 let request = self.client.request(query.to_proto(project_id));
4120 cx.spawn(|this, mut cx| async move {
4121 let response = request.await?;
4122 let mut result = HashMap::default();
4123 for location in response.locations {
4124 let target_buffer = this
4125 .update(&mut cx, |this, cx| {
4126 this.wait_for_buffer(location.buffer_id, cx)
4127 })
4128 .await?;
4129 let start = location
4130 .start
4131 .and_then(deserialize_anchor)
4132 .ok_or_else(|| anyhow!("missing target start"))?;
4133 let end = location
4134 .end
4135 .and_then(deserialize_anchor)
4136 .ok_or_else(|| anyhow!("missing target end"))?;
4137 result
4138 .entry(target_buffer)
4139 .or_insert(Vec::new())
4140 .push(start..end)
4141 }
4142 Ok(result)
4143 })
4144 } else {
4145 Task::ready(Ok(Default::default()))
4146 }
4147 }
4148
4149 fn request_lsp<R: LspCommand>(
4150 &self,
4151 buffer_handle: ModelHandle<Buffer>,
4152 request: R,
4153 cx: &mut ModelContext<Self>,
4154 ) -> Task<Result<R::Response>>
4155 where
4156 <R::LspRequest as lsp::request::Request>::Result: Send,
4157 {
4158 let buffer = buffer_handle.read(cx);
4159 if self.is_local() {
4160 let file = File::from_dyn(buffer.file()).and_then(File::as_local);
4161 if let Some((file, language_server)) = file.zip(
4162 self.language_server_for_buffer(buffer, cx)
4163 .map(|(_, server)| server.clone()),
4164 ) {
4165 let lsp_params = request.to_lsp(&file.abs_path(cx), cx);
4166 return cx.spawn(|this, cx| async move {
4167 if !request.check_capabilities(language_server.capabilities()) {
4168 return Ok(Default::default());
4169 }
4170
4171 let response = language_server
4172 .request::<R::LspRequest>(lsp_params)
4173 .await
4174 .context("lsp request failed")?;
4175 request
4176 .response_from_lsp(response, this, buffer_handle, cx)
4177 .await
4178 });
4179 }
4180 } else if let Some(project_id) = self.remote_id() {
4181 let rpc = self.client.clone();
4182 let message = request.to_proto(project_id, buffer);
4183 return cx.spawn(|this, cx| async move {
4184 let response = rpc.request(message).await?;
4185 if this.read_with(&cx, |this, _| this.is_read_only()) {
4186 Err(anyhow!("disconnected before completing request"))
4187 } else {
4188 request
4189 .response_from_proto(response, this, buffer_handle, cx)
4190 .await
4191 }
4192 });
4193 }
4194 Task::ready(Ok(Default::default()))
4195 }
4196
4197 pub fn find_or_create_local_worktree(
4198 &mut self,
4199 abs_path: impl AsRef<Path>,
4200 visible: bool,
4201 cx: &mut ModelContext<Self>,
4202 ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
4203 let abs_path = abs_path.as_ref();
4204 if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
4205 Task::ready(Ok((tree, relative_path)))
4206 } else {
4207 let worktree = self.create_local_worktree(abs_path, visible, cx);
4208 cx.foreground()
4209 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
4210 }
4211 }
4212
4213 pub fn find_local_worktree(
4214 &self,
4215 abs_path: &Path,
4216 cx: &AppContext,
4217 ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
4218 for tree in &self.worktrees {
4219 if let Some(tree) = tree.upgrade(cx) {
4220 if let Some(relative_path) = tree
4221 .read(cx)
4222 .as_local()
4223 .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
4224 {
4225 return Some((tree.clone(), relative_path.into()));
4226 }
4227 }
4228 }
4229 None
4230 }
4231
4232 pub fn is_shared(&self) -> bool {
4233 match &self.client_state {
4234 Some(ProjectClientState::Local { .. }) => true,
4235 _ => false,
4236 }
4237 }
4238
4239 fn create_local_worktree(
4240 &mut self,
4241 abs_path: impl AsRef<Path>,
4242 visible: bool,
4243 cx: &mut ModelContext<Self>,
4244 ) -> Task<Result<ModelHandle<Worktree>>> {
4245 let fs = self.fs.clone();
4246 let client = self.client.clone();
4247 let next_entry_id = self.next_entry_id.clone();
4248 let path: Arc<Path> = abs_path.as_ref().into();
4249 let task = self
4250 .loading_local_worktrees
4251 .entry(path.clone())
4252 .or_insert_with(|| {
4253 cx.spawn(|project, mut cx| {
4254 async move {
4255 let worktree = Worktree::local(
4256 client.clone(),
4257 path.clone(),
4258 visible,
4259 fs,
4260 next_entry_id,
4261 &mut cx,
4262 )
4263 .await;
4264 project.update(&mut cx, |project, _| {
4265 project.loading_local_worktrees.remove(&path);
4266 });
4267 let worktree = worktree?;
4268
4269 project
4270 .update(&mut cx, |project, cx| project.add_worktree(&worktree, cx))
4271 .await;
4272
4273 if let Some(project_id) =
4274 project.read_with(&cx, |project, _| project.remote_id())
4275 {
4276 worktree
4277 .update(&mut cx, |worktree, cx| {
4278 worktree.as_local_mut().unwrap().share(project_id, cx)
4279 })
4280 .await
4281 .log_err();
4282 }
4283
4284 Ok(worktree)
4285 }
4286 .map_err(Arc::new)
4287 })
4288 .shared()
4289 })
4290 .clone();
4291 cx.foreground().spawn(async move {
4292 match task.await {
4293 Ok(worktree) => Ok(worktree),
4294 Err(err) => Err(anyhow!("{}", err)),
4295 }
4296 })
4297 }
4298
4299 pub fn remove_worktree(
4300 &mut self,
4301 id_to_remove: WorktreeId,
4302 cx: &mut ModelContext<Self>,
4303 ) -> impl Future<Output = ()> {
4304 self.worktrees.retain(|worktree| {
4305 if let Some(worktree) = worktree.upgrade(cx) {
4306 let id = worktree.read(cx).id();
4307 if id == id_to_remove {
4308 cx.emit(Event::WorktreeRemoved(id));
4309 false
4310 } else {
4311 true
4312 }
4313 } else {
4314 false
4315 }
4316 });
4317 self.metadata_changed(cx)
4318 }
4319
4320 fn add_worktree(
4321 &mut self,
4322 worktree: &ModelHandle<Worktree>,
4323 cx: &mut ModelContext<Self>,
4324 ) -> impl Future<Output = ()> {
4325 cx.observe(worktree, |_, _, cx| cx.notify()).detach();
4326 if worktree.read(cx).is_local() {
4327 cx.subscribe(worktree, |this, worktree, event, cx| match event {
4328 worktree::Event::UpdatedEntries => this.update_local_worktree_buffers(worktree, cx),
4329 worktree::Event::UpdatedGitRepositories(updated_repos) => {
4330 this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
4331 }
4332 })
4333 .detach();
4334 }
4335
4336 let push_strong_handle = {
4337 let worktree = worktree.read(cx);
4338 self.is_shared() || worktree.is_visible() || worktree.is_remote()
4339 };
4340 if push_strong_handle {
4341 self.worktrees
4342 .push(WorktreeHandle::Strong(worktree.clone()));
4343 } else {
4344 self.worktrees
4345 .push(WorktreeHandle::Weak(worktree.downgrade()));
4346 }
4347
4348 cx.observe_release(worktree, |this, worktree, cx| {
4349 let _ = this.remove_worktree(worktree.id(), cx);
4350 })
4351 .detach();
4352
4353 cx.emit(Event::WorktreeAdded);
4354 self.metadata_changed(cx)
4355 }
4356
4357 fn update_local_worktree_buffers(
4358 &mut self,
4359 worktree_handle: ModelHandle<Worktree>,
4360 cx: &mut ModelContext<Self>,
4361 ) {
4362 let snapshot = worktree_handle.read(cx).snapshot();
4363 let mut buffers_to_delete = Vec::new();
4364 let mut renamed_buffers = Vec::new();
4365 for (buffer_id, buffer) in &self.opened_buffers {
4366 if let Some(buffer) = buffer.upgrade(cx) {
4367 buffer.update(cx, |buffer, cx| {
4368 if let Some(old_file) = File::from_dyn(buffer.file()) {
4369 if old_file.worktree != worktree_handle {
4370 return;
4371 }
4372
4373 let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id)
4374 {
4375 File {
4376 is_local: true,
4377 entry_id: entry.id,
4378 mtime: entry.mtime,
4379 path: entry.path.clone(),
4380 worktree: worktree_handle.clone(),
4381 is_deleted: false,
4382 }
4383 } else if let Some(entry) =
4384 snapshot.entry_for_path(old_file.path().as_ref())
4385 {
4386 File {
4387 is_local: true,
4388 entry_id: entry.id,
4389 mtime: entry.mtime,
4390 path: entry.path.clone(),
4391 worktree: worktree_handle.clone(),
4392 is_deleted: false,
4393 }
4394 } else {
4395 File {
4396 is_local: true,
4397 entry_id: old_file.entry_id,
4398 path: old_file.path().clone(),
4399 mtime: old_file.mtime(),
4400 worktree: worktree_handle.clone(),
4401 is_deleted: true,
4402 }
4403 };
4404
4405 let old_path = old_file.abs_path(cx);
4406 if new_file.abs_path(cx) != old_path {
4407 renamed_buffers.push((cx.handle(), old_path));
4408 }
4409
4410 if let Some(project_id) = self.remote_id() {
4411 self.client
4412 .send(proto::UpdateBufferFile {
4413 project_id,
4414 buffer_id: *buffer_id as u64,
4415 file: Some(new_file.to_proto()),
4416 })
4417 .log_err();
4418 }
4419 buffer.file_updated(Arc::new(new_file), cx).detach();
4420 }
4421 });
4422 } else {
4423 buffers_to_delete.push(*buffer_id);
4424 }
4425 }
4426
4427 for buffer_id in buffers_to_delete {
4428 self.opened_buffers.remove(&buffer_id);
4429 }
4430
4431 for (buffer, old_path) in renamed_buffers {
4432 self.unregister_buffer_from_language_server(&buffer, old_path, cx);
4433 self.assign_language_to_buffer(&buffer, cx);
4434 self.register_buffer_with_language_server(&buffer, cx);
4435 }
4436 }
4437
4438 fn update_local_worktree_buffers_git_repos(
4439 &mut self,
4440 worktree: ModelHandle<Worktree>,
4441 repos: &[GitRepositoryEntry],
4442 cx: &mut ModelContext<Self>,
4443 ) {
4444 for (_, buffer) in &self.opened_buffers {
4445 if let Some(buffer) = buffer.upgrade(cx) {
4446 let file = match File::from_dyn(buffer.read(cx).file()) {
4447 Some(file) => file,
4448 None => continue,
4449 };
4450 if file.worktree != worktree {
4451 continue;
4452 }
4453
4454 let path = file.path().clone();
4455
4456 let repo = match repos.iter().find(|repo| repo.manages(&path)) {
4457 Some(repo) => repo.clone(),
4458 None => return,
4459 };
4460
4461 let relative_repo = match path.strip_prefix(repo.content_path) {
4462 Ok(relative_repo) => relative_repo.to_owned(),
4463 Err(_) => return,
4464 };
4465
4466 let remote_id = self.remote_id();
4467 let client = self.client.clone();
4468
4469 cx.spawn(|_, mut cx| async move {
4470 let diff_base = cx
4471 .background()
4472 .spawn(async move { repo.repo.lock().load_index_text(&relative_repo) })
4473 .await;
4474
4475 let buffer_id = buffer.update(&mut cx, |buffer, cx| {
4476 buffer.set_diff_base(diff_base.clone(), cx);
4477 buffer.remote_id()
4478 });
4479
4480 if let Some(project_id) = remote_id {
4481 client
4482 .send(proto::UpdateDiffBase {
4483 project_id,
4484 buffer_id: buffer_id as u64,
4485 diff_base,
4486 })
4487 .log_err();
4488 }
4489 })
4490 .detach();
4491 }
4492 }
4493 }
4494
4495 pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
4496 let new_active_entry = entry.and_then(|project_path| {
4497 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
4498 let entry = worktree.read(cx).entry_for_path(project_path.path)?;
4499 Some(entry.id)
4500 });
4501 if new_active_entry != self.active_entry {
4502 self.active_entry = new_active_entry;
4503 cx.emit(Event::ActiveEntryChanged(new_active_entry));
4504 }
4505 }
4506
4507 pub fn language_servers_running_disk_based_diagnostics(
4508 &self,
4509 ) -> impl Iterator<Item = usize> + '_ {
4510 self.language_server_statuses
4511 .iter()
4512 .filter_map(|(id, status)| {
4513 if status.has_pending_diagnostic_updates {
4514 Some(*id)
4515 } else {
4516 None
4517 }
4518 })
4519 }
4520
4521 pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
4522 let mut summary = DiagnosticSummary::default();
4523 for (_, path_summary) in self.diagnostic_summaries(cx) {
4524 summary.error_count += path_summary.error_count;
4525 summary.warning_count += path_summary.warning_count;
4526 }
4527 summary
4528 }
4529
4530 pub fn diagnostic_summaries<'a>(
4531 &'a self,
4532 cx: &'a AppContext,
4533 ) -> impl Iterator<Item = (ProjectPath, DiagnosticSummary)> + 'a {
4534 self.visible_worktrees(cx).flat_map(move |worktree| {
4535 let worktree = worktree.read(cx);
4536 let worktree_id = worktree.id();
4537 worktree
4538 .diagnostic_summaries()
4539 .map(move |(path, summary)| (ProjectPath { worktree_id, path }, summary))
4540 })
4541 }
4542
4543 pub fn disk_based_diagnostics_started(
4544 &mut self,
4545 language_server_id: usize,
4546 cx: &mut ModelContext<Self>,
4547 ) {
4548 cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
4549 }
4550
4551 pub fn disk_based_diagnostics_finished(
4552 &mut self,
4553 language_server_id: usize,
4554 cx: &mut ModelContext<Self>,
4555 ) {
4556 cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
4557 }
4558
4559 pub fn active_entry(&self) -> Option<ProjectEntryId> {
4560 self.active_entry
4561 }
4562
4563 pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
4564 self.worktree_for_id(path.worktree_id, cx)?
4565 .read(cx)
4566 .entry_for_path(&path.path)
4567 .cloned()
4568 }
4569
4570 pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
4571 let worktree = self.worktree_for_entry(entry_id, cx)?;
4572 let worktree = worktree.read(cx);
4573 let worktree_id = worktree.id();
4574 let path = worktree.entry_for_id(entry_id)?.path.clone();
4575 Some(ProjectPath { worktree_id, path })
4576 }
4577
4578 // RPC message handlers
4579
4580 async fn handle_unshare_project(
4581 this: ModelHandle<Self>,
4582 _: TypedEnvelope<proto::UnshareProject>,
4583 _: Arc<Client>,
4584 mut cx: AsyncAppContext,
4585 ) -> Result<()> {
4586 this.update(&mut cx, |this, cx| {
4587 if this.is_local() {
4588 this.unshare(cx)?;
4589 } else {
4590 this.disconnected_from_host(cx);
4591 }
4592 Ok(())
4593 })
4594 }
4595
4596 async fn handle_add_collaborator(
4597 this: ModelHandle<Self>,
4598 mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
4599 _: Arc<Client>,
4600 mut cx: AsyncAppContext,
4601 ) -> Result<()> {
4602 let collaborator = envelope
4603 .payload
4604 .collaborator
4605 .take()
4606 .ok_or_else(|| anyhow!("empty collaborator"))?;
4607
4608 let collaborator = Collaborator::from_proto(collaborator);
4609 this.update(&mut cx, |this, cx| {
4610 this.collaborators
4611 .insert(collaborator.peer_id, collaborator);
4612 cx.notify();
4613 });
4614
4615 Ok(())
4616 }
4617
4618 async fn handle_remove_collaborator(
4619 this: ModelHandle<Self>,
4620 envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
4621 _: Arc<Client>,
4622 mut cx: AsyncAppContext,
4623 ) -> Result<()> {
4624 this.update(&mut cx, |this, cx| {
4625 let peer_id = PeerId(envelope.payload.peer_id);
4626 let replica_id = this
4627 .collaborators
4628 .remove(&peer_id)
4629 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
4630 .replica_id;
4631 for buffer in this.opened_buffers.values() {
4632 if let Some(buffer) = buffer.upgrade(cx) {
4633 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
4634 }
4635 }
4636 this.shared_buffers.remove(&peer_id);
4637
4638 cx.emit(Event::CollaboratorLeft(peer_id));
4639 cx.notify();
4640 Ok(())
4641 })
4642 }
4643
4644 async fn handle_update_project(
4645 this: ModelHandle<Self>,
4646 envelope: TypedEnvelope<proto::UpdateProject>,
4647 client: Arc<Client>,
4648 mut cx: AsyncAppContext,
4649 ) -> Result<()> {
4650 this.update(&mut cx, |this, cx| {
4651 let replica_id = this.replica_id();
4652 let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
4653
4654 let mut old_worktrees_by_id = this
4655 .worktrees
4656 .drain(..)
4657 .filter_map(|worktree| {
4658 let worktree = worktree.upgrade(cx)?;
4659 Some((worktree.read(cx).id(), worktree))
4660 })
4661 .collect::<HashMap<_, _>>();
4662
4663 for worktree in envelope.payload.worktrees {
4664 if let Some(old_worktree) =
4665 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
4666 {
4667 this.worktrees.push(WorktreeHandle::Strong(old_worktree));
4668 } else {
4669 let worktree =
4670 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
4671 let _ = this.add_worktree(&worktree, cx);
4672 }
4673 }
4674
4675 let _ = this.metadata_changed(cx);
4676 for (id, _) in old_worktrees_by_id {
4677 cx.emit(Event::WorktreeRemoved(id));
4678 }
4679
4680 Ok(())
4681 })
4682 }
4683
4684 async fn handle_update_worktree(
4685 this: ModelHandle<Self>,
4686 envelope: TypedEnvelope<proto::UpdateWorktree>,
4687 _: Arc<Client>,
4688 mut cx: AsyncAppContext,
4689 ) -> Result<()> {
4690 this.update(&mut cx, |this, cx| {
4691 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4692 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4693 worktree.update(cx, |worktree, _| {
4694 let worktree = worktree.as_remote_mut().unwrap();
4695 worktree.update_from_remote(envelope.payload);
4696 });
4697 }
4698 Ok(())
4699 })
4700 }
4701
4702 async fn handle_create_project_entry(
4703 this: ModelHandle<Self>,
4704 envelope: TypedEnvelope<proto::CreateProjectEntry>,
4705 _: Arc<Client>,
4706 mut cx: AsyncAppContext,
4707 ) -> Result<proto::ProjectEntryResponse> {
4708 let worktree = this.update(&mut cx, |this, cx| {
4709 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4710 this.worktree_for_id(worktree_id, cx)
4711 .ok_or_else(|| anyhow!("worktree not found"))
4712 })?;
4713 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4714 let entry = worktree
4715 .update(&mut cx, |worktree, cx| {
4716 let worktree = worktree.as_local_mut().unwrap();
4717 let path = PathBuf::from(envelope.payload.path);
4718 worktree.create_entry(path, envelope.payload.is_directory, cx)
4719 })
4720 .await?;
4721 Ok(proto::ProjectEntryResponse {
4722 entry: Some((&entry).into()),
4723 worktree_scan_id: worktree_scan_id as u64,
4724 })
4725 }
4726
4727 async fn handle_rename_project_entry(
4728 this: ModelHandle<Self>,
4729 envelope: TypedEnvelope<proto::RenameProjectEntry>,
4730 _: Arc<Client>,
4731 mut cx: AsyncAppContext,
4732 ) -> Result<proto::ProjectEntryResponse> {
4733 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4734 let worktree = this.read_with(&cx, |this, cx| {
4735 this.worktree_for_entry(entry_id, cx)
4736 .ok_or_else(|| anyhow!("worktree not found"))
4737 })?;
4738 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4739 let entry = worktree
4740 .update(&mut cx, |worktree, cx| {
4741 let new_path = PathBuf::from(envelope.payload.new_path);
4742 worktree
4743 .as_local_mut()
4744 .unwrap()
4745 .rename_entry(entry_id, new_path, cx)
4746 .ok_or_else(|| anyhow!("invalid entry"))
4747 })?
4748 .await?;
4749 Ok(proto::ProjectEntryResponse {
4750 entry: Some((&entry).into()),
4751 worktree_scan_id: worktree_scan_id as u64,
4752 })
4753 }
4754
4755 async fn handle_copy_project_entry(
4756 this: ModelHandle<Self>,
4757 envelope: TypedEnvelope<proto::CopyProjectEntry>,
4758 _: Arc<Client>,
4759 mut cx: AsyncAppContext,
4760 ) -> Result<proto::ProjectEntryResponse> {
4761 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4762 let worktree = this.read_with(&cx, |this, cx| {
4763 this.worktree_for_entry(entry_id, cx)
4764 .ok_or_else(|| anyhow!("worktree not found"))
4765 })?;
4766 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4767 let entry = worktree
4768 .update(&mut cx, |worktree, cx| {
4769 let new_path = PathBuf::from(envelope.payload.new_path);
4770 worktree
4771 .as_local_mut()
4772 .unwrap()
4773 .copy_entry(entry_id, new_path, cx)
4774 .ok_or_else(|| anyhow!("invalid entry"))
4775 })?
4776 .await?;
4777 Ok(proto::ProjectEntryResponse {
4778 entry: Some((&entry).into()),
4779 worktree_scan_id: worktree_scan_id as u64,
4780 })
4781 }
4782
4783 async fn handle_delete_project_entry(
4784 this: ModelHandle<Self>,
4785 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
4786 _: Arc<Client>,
4787 mut cx: AsyncAppContext,
4788 ) -> Result<proto::ProjectEntryResponse> {
4789 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4790 let worktree = this.read_with(&cx, |this, cx| {
4791 this.worktree_for_entry(entry_id, cx)
4792 .ok_or_else(|| anyhow!("worktree not found"))
4793 })?;
4794 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4795 worktree
4796 .update(&mut cx, |worktree, cx| {
4797 worktree
4798 .as_local_mut()
4799 .unwrap()
4800 .delete_entry(entry_id, cx)
4801 .ok_or_else(|| anyhow!("invalid entry"))
4802 })?
4803 .await?;
4804 Ok(proto::ProjectEntryResponse {
4805 entry: None,
4806 worktree_scan_id: worktree_scan_id as u64,
4807 })
4808 }
4809
4810 async fn handle_update_diagnostic_summary(
4811 this: ModelHandle<Self>,
4812 envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
4813 _: Arc<Client>,
4814 mut cx: AsyncAppContext,
4815 ) -> Result<()> {
4816 this.update(&mut cx, |this, cx| {
4817 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4818 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4819 if let Some(summary) = envelope.payload.summary {
4820 let project_path = ProjectPath {
4821 worktree_id,
4822 path: Path::new(&summary.path).into(),
4823 };
4824 worktree.update(cx, |worktree, _| {
4825 worktree
4826 .as_remote_mut()
4827 .unwrap()
4828 .update_diagnostic_summary(project_path.path.clone(), &summary);
4829 });
4830 cx.emit(Event::DiagnosticsUpdated {
4831 language_server_id: summary.language_server_id as usize,
4832 path: project_path,
4833 });
4834 }
4835 }
4836 Ok(())
4837 })
4838 }
4839
4840 async fn handle_start_language_server(
4841 this: ModelHandle<Self>,
4842 envelope: TypedEnvelope<proto::StartLanguageServer>,
4843 _: Arc<Client>,
4844 mut cx: AsyncAppContext,
4845 ) -> Result<()> {
4846 let server = envelope
4847 .payload
4848 .server
4849 .ok_or_else(|| anyhow!("invalid server"))?;
4850 this.update(&mut cx, |this, cx| {
4851 this.language_server_statuses.insert(
4852 server.id as usize,
4853 LanguageServerStatus {
4854 name: server.name,
4855 pending_work: Default::default(),
4856 has_pending_diagnostic_updates: false,
4857 progress_tokens: Default::default(),
4858 },
4859 );
4860 cx.notify();
4861 });
4862 Ok(())
4863 }
4864
4865 async fn handle_update_language_server(
4866 this: ModelHandle<Self>,
4867 envelope: TypedEnvelope<proto::UpdateLanguageServer>,
4868 _: Arc<Client>,
4869 mut cx: AsyncAppContext,
4870 ) -> Result<()> {
4871 let language_server_id = envelope.payload.language_server_id as usize;
4872 match envelope
4873 .payload
4874 .variant
4875 .ok_or_else(|| anyhow!("invalid variant"))?
4876 {
4877 proto::update_language_server::Variant::WorkStart(payload) => {
4878 this.update(&mut cx, |this, cx| {
4879 this.on_lsp_work_start(
4880 language_server_id,
4881 payload.token,
4882 LanguageServerProgress {
4883 message: payload.message,
4884 percentage: payload.percentage.map(|p| p as usize),
4885 last_update_at: Instant::now(),
4886 },
4887 cx,
4888 );
4889 })
4890 }
4891 proto::update_language_server::Variant::WorkProgress(payload) => {
4892 this.update(&mut cx, |this, cx| {
4893 this.on_lsp_work_progress(
4894 language_server_id,
4895 payload.token,
4896 LanguageServerProgress {
4897 message: payload.message,
4898 percentage: payload.percentage.map(|p| p as usize),
4899 last_update_at: Instant::now(),
4900 },
4901 cx,
4902 );
4903 })
4904 }
4905 proto::update_language_server::Variant::WorkEnd(payload) => {
4906 this.update(&mut cx, |this, cx| {
4907 this.on_lsp_work_end(language_server_id, payload.token, cx);
4908 })
4909 }
4910 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
4911 this.update(&mut cx, |this, cx| {
4912 this.disk_based_diagnostics_started(language_server_id, cx);
4913 })
4914 }
4915 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
4916 this.update(&mut cx, |this, cx| {
4917 this.disk_based_diagnostics_finished(language_server_id, cx)
4918 });
4919 }
4920 }
4921
4922 Ok(())
4923 }
4924
4925 async fn handle_update_buffer(
4926 this: ModelHandle<Self>,
4927 envelope: TypedEnvelope<proto::UpdateBuffer>,
4928 _: Arc<Client>,
4929 mut cx: AsyncAppContext,
4930 ) -> Result<()> {
4931 this.update(&mut cx, |this, cx| {
4932 let payload = envelope.payload.clone();
4933 let buffer_id = payload.buffer_id;
4934 let ops = payload
4935 .operations
4936 .into_iter()
4937 .map(language::proto::deserialize_operation)
4938 .collect::<Result<Vec<_>, _>>()?;
4939 let is_remote = this.is_remote();
4940 match this.opened_buffers.entry(buffer_id) {
4941 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
4942 OpenBuffer::Strong(buffer) => {
4943 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
4944 }
4945 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
4946 OpenBuffer::Weak(_) => {}
4947 },
4948 hash_map::Entry::Vacant(e) => {
4949 assert!(
4950 is_remote,
4951 "received buffer update from {:?}",
4952 envelope.original_sender_id
4953 );
4954 e.insert(OpenBuffer::Operations(ops));
4955 }
4956 }
4957 Ok(())
4958 })
4959 }
4960
4961 async fn handle_create_buffer_for_peer(
4962 this: ModelHandle<Self>,
4963 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
4964 _: Arc<Client>,
4965 mut cx: AsyncAppContext,
4966 ) -> Result<()> {
4967 this.update(&mut cx, |this, cx| {
4968 match envelope
4969 .payload
4970 .variant
4971 .ok_or_else(|| anyhow!("missing variant"))?
4972 {
4973 proto::create_buffer_for_peer::Variant::State(mut state) => {
4974 let mut buffer_file = None;
4975 if let Some(file) = state.file.take() {
4976 let worktree_id = WorktreeId::from_proto(file.worktree_id);
4977 let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
4978 anyhow!("no worktree found for id {}", file.worktree_id)
4979 })?;
4980 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
4981 as Arc<dyn language::File>);
4982 }
4983
4984 let buffer_id = state.id;
4985 let buffer = cx.add_model(|_| {
4986 Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
4987 });
4988 this.incomplete_buffers.insert(buffer_id, buffer);
4989 }
4990 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
4991 let buffer = this
4992 .incomplete_buffers
4993 .get(&chunk.buffer_id)
4994 .ok_or_else(|| {
4995 anyhow!(
4996 "received chunk for buffer {} without initial state",
4997 chunk.buffer_id
4998 )
4999 })?
5000 .clone();
5001 let operations = chunk
5002 .operations
5003 .into_iter()
5004 .map(language::proto::deserialize_operation)
5005 .collect::<Result<Vec<_>>>()?;
5006 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
5007
5008 if chunk.is_last {
5009 this.incomplete_buffers.remove(&chunk.buffer_id);
5010 this.register_buffer(&buffer, cx)?;
5011 }
5012 }
5013 }
5014
5015 Ok(())
5016 })
5017 }
5018
5019 async fn handle_update_diff_base(
5020 this: ModelHandle<Self>,
5021 envelope: TypedEnvelope<proto::UpdateDiffBase>,
5022 _: Arc<Client>,
5023 mut cx: AsyncAppContext,
5024 ) -> Result<()> {
5025 this.update(&mut cx, |this, cx| {
5026 let buffer_id = envelope.payload.buffer_id;
5027 let diff_base = envelope.payload.diff_base;
5028 let buffer = this
5029 .opened_buffers
5030 .get_mut(&buffer_id)
5031 .and_then(|b| b.upgrade(cx))
5032 .ok_or_else(|| anyhow!("No such buffer {}", buffer_id))?;
5033
5034 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
5035
5036 Ok(())
5037 })
5038 }
5039
5040 async fn handle_update_buffer_file(
5041 this: ModelHandle<Self>,
5042 envelope: TypedEnvelope<proto::UpdateBufferFile>,
5043 _: Arc<Client>,
5044 mut cx: AsyncAppContext,
5045 ) -> Result<()> {
5046 this.update(&mut cx, |this, cx| {
5047 let payload = envelope.payload.clone();
5048 let buffer_id = payload.buffer_id;
5049 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
5050 let worktree = this
5051 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
5052 .ok_or_else(|| anyhow!("no such worktree"))?;
5053 let file = File::from_proto(file, worktree, cx)?;
5054 let buffer = this
5055 .opened_buffers
5056 .get_mut(&buffer_id)
5057 .and_then(|b| b.upgrade(cx))
5058 .ok_or_else(|| anyhow!("no such buffer"))?;
5059 buffer.update(cx, |buffer, cx| {
5060 buffer.file_updated(Arc::new(file), cx).detach();
5061 });
5062 this.assign_language_to_buffer(&buffer, cx);
5063 Ok(())
5064 })
5065 }
5066
5067 async fn handle_save_buffer(
5068 this: ModelHandle<Self>,
5069 envelope: TypedEnvelope<proto::SaveBuffer>,
5070 _: Arc<Client>,
5071 mut cx: AsyncAppContext,
5072 ) -> Result<proto::BufferSaved> {
5073 let buffer_id = envelope.payload.buffer_id;
5074 let requested_version = deserialize_version(envelope.payload.version);
5075
5076 let (project_id, buffer) = this.update(&mut cx, |this, cx| {
5077 let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
5078 let buffer = this
5079 .opened_buffers
5080 .get(&buffer_id)
5081 .and_then(|buffer| buffer.upgrade(cx))
5082 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
5083 Ok::<_, anyhow::Error>((project_id, buffer))
5084 })?;
5085 buffer
5086 .update(&mut cx, |buffer, _| {
5087 buffer.wait_for_version(requested_version)
5088 })
5089 .await;
5090
5091 let (saved_version, fingerprint, mtime) =
5092 buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?;
5093 Ok(proto::BufferSaved {
5094 project_id,
5095 buffer_id,
5096 version: serialize_version(&saved_version),
5097 mtime: Some(mtime.into()),
5098 fingerprint,
5099 })
5100 }
5101
5102 async fn handle_reload_buffers(
5103 this: ModelHandle<Self>,
5104 envelope: TypedEnvelope<proto::ReloadBuffers>,
5105 _: Arc<Client>,
5106 mut cx: AsyncAppContext,
5107 ) -> Result<proto::ReloadBuffersResponse> {
5108 let sender_id = envelope.original_sender_id()?;
5109 let reload = this.update(&mut cx, |this, cx| {
5110 let mut buffers = HashSet::default();
5111 for buffer_id in &envelope.payload.buffer_ids {
5112 buffers.insert(
5113 this.opened_buffers
5114 .get(buffer_id)
5115 .and_then(|buffer| buffer.upgrade(cx))
5116 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5117 );
5118 }
5119 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
5120 })?;
5121
5122 let project_transaction = reload.await?;
5123 let project_transaction = this.update(&mut cx, |this, cx| {
5124 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5125 });
5126 Ok(proto::ReloadBuffersResponse {
5127 transaction: Some(project_transaction),
5128 })
5129 }
5130
5131 async fn handle_format_buffers(
5132 this: ModelHandle<Self>,
5133 envelope: TypedEnvelope<proto::FormatBuffers>,
5134 _: Arc<Client>,
5135 mut cx: AsyncAppContext,
5136 ) -> Result<proto::FormatBuffersResponse> {
5137 let sender_id = envelope.original_sender_id()?;
5138 let format = this.update(&mut cx, |this, cx| {
5139 let mut buffers = HashSet::default();
5140 for buffer_id in &envelope.payload.buffer_ids {
5141 buffers.insert(
5142 this.opened_buffers
5143 .get(buffer_id)
5144 .and_then(|buffer| buffer.upgrade(cx))
5145 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5146 );
5147 }
5148 let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
5149 Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
5150 })?;
5151
5152 let project_transaction = format.await?;
5153 let project_transaction = this.update(&mut cx, |this, cx| {
5154 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5155 });
5156 Ok(proto::FormatBuffersResponse {
5157 transaction: Some(project_transaction),
5158 })
5159 }
5160
5161 async fn handle_get_completions(
5162 this: ModelHandle<Self>,
5163 envelope: TypedEnvelope<proto::GetCompletions>,
5164 _: Arc<Client>,
5165 mut cx: AsyncAppContext,
5166 ) -> Result<proto::GetCompletionsResponse> {
5167 let buffer = this.read_with(&cx, |this, cx| {
5168 this.opened_buffers
5169 .get(&envelope.payload.buffer_id)
5170 .and_then(|buffer| buffer.upgrade(cx))
5171 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
5172 })?;
5173
5174 let position = envelope
5175 .payload
5176 .position
5177 .and_then(language::proto::deserialize_anchor)
5178 .map(|p| {
5179 buffer.read_with(&cx, |buffer, _| {
5180 buffer.clip_point_utf16(Unclipped(p.to_point_utf16(buffer)), Bias::Left)
5181 })
5182 })
5183 .ok_or_else(|| anyhow!("invalid position"))?;
5184
5185 let version = deserialize_version(envelope.payload.version);
5186 buffer
5187 .update(&mut cx, |buffer, _| buffer.wait_for_version(version))
5188 .await;
5189 let version = buffer.read_with(&cx, |buffer, _| buffer.version());
5190
5191 let completions = this
5192 .update(&mut cx, |this, cx| this.completions(&buffer, position, cx))
5193 .await?;
5194
5195 Ok(proto::GetCompletionsResponse {
5196 completions: completions
5197 .iter()
5198 .map(language::proto::serialize_completion)
5199 .collect(),
5200 version: serialize_version(&version),
5201 })
5202 }
5203
5204 async fn handle_apply_additional_edits_for_completion(
5205 this: ModelHandle<Self>,
5206 envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
5207 _: Arc<Client>,
5208 mut cx: AsyncAppContext,
5209 ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
5210 let (buffer, completion) = this.update(&mut cx, |this, cx| {
5211 let buffer = this
5212 .opened_buffers
5213 .get(&envelope.payload.buffer_id)
5214 .and_then(|buffer| buffer.upgrade(cx))
5215 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5216 let language = buffer.read(cx).language();
5217 let completion = language::proto::deserialize_completion(
5218 envelope
5219 .payload
5220 .completion
5221 .ok_or_else(|| anyhow!("invalid completion"))?,
5222 language.cloned(),
5223 );
5224 Ok::<_, anyhow::Error>((buffer, completion))
5225 })?;
5226
5227 let completion = completion.await?;
5228
5229 let apply_additional_edits = this.update(&mut cx, |this, cx| {
5230 this.apply_additional_edits_for_completion(buffer, completion, false, cx)
5231 });
5232
5233 Ok(proto::ApplyCompletionAdditionalEditsResponse {
5234 transaction: apply_additional_edits
5235 .await?
5236 .as_ref()
5237 .map(language::proto::serialize_transaction),
5238 })
5239 }
5240
5241 async fn handle_get_code_actions(
5242 this: ModelHandle<Self>,
5243 envelope: TypedEnvelope<proto::GetCodeActions>,
5244 _: Arc<Client>,
5245 mut cx: AsyncAppContext,
5246 ) -> Result<proto::GetCodeActionsResponse> {
5247 let start = envelope
5248 .payload
5249 .start
5250 .and_then(language::proto::deserialize_anchor)
5251 .ok_or_else(|| anyhow!("invalid start"))?;
5252 let end = envelope
5253 .payload
5254 .end
5255 .and_then(language::proto::deserialize_anchor)
5256 .ok_or_else(|| anyhow!("invalid end"))?;
5257 let buffer = this.update(&mut cx, |this, cx| {
5258 this.opened_buffers
5259 .get(&envelope.payload.buffer_id)
5260 .and_then(|buffer| buffer.upgrade(cx))
5261 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
5262 })?;
5263 buffer
5264 .update(&mut cx, |buffer, _| {
5265 buffer.wait_for_version(deserialize_version(envelope.payload.version))
5266 })
5267 .await;
5268
5269 let version = buffer.read_with(&cx, |buffer, _| buffer.version());
5270 let code_actions = this.update(&mut cx, |this, cx| {
5271 Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx))
5272 })?;
5273
5274 Ok(proto::GetCodeActionsResponse {
5275 actions: code_actions
5276 .await?
5277 .iter()
5278 .map(language::proto::serialize_code_action)
5279 .collect(),
5280 version: serialize_version(&version),
5281 })
5282 }
5283
5284 async fn handle_apply_code_action(
5285 this: ModelHandle<Self>,
5286 envelope: TypedEnvelope<proto::ApplyCodeAction>,
5287 _: Arc<Client>,
5288 mut cx: AsyncAppContext,
5289 ) -> Result<proto::ApplyCodeActionResponse> {
5290 let sender_id = envelope.original_sender_id()?;
5291 let action = language::proto::deserialize_code_action(
5292 envelope
5293 .payload
5294 .action
5295 .ok_or_else(|| anyhow!("invalid action"))?,
5296 )?;
5297 let apply_code_action = this.update(&mut cx, |this, cx| {
5298 let buffer = this
5299 .opened_buffers
5300 .get(&envelope.payload.buffer_id)
5301 .and_then(|buffer| buffer.upgrade(cx))
5302 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5303 Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
5304 })?;
5305
5306 let project_transaction = apply_code_action.await?;
5307 let project_transaction = this.update(&mut cx, |this, cx| {
5308 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5309 });
5310 Ok(proto::ApplyCodeActionResponse {
5311 transaction: Some(project_transaction),
5312 })
5313 }
5314
5315 async fn handle_lsp_command<T: LspCommand>(
5316 this: ModelHandle<Self>,
5317 envelope: TypedEnvelope<T::ProtoRequest>,
5318 _: Arc<Client>,
5319 mut cx: AsyncAppContext,
5320 ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
5321 where
5322 <T::LspRequest as lsp::request::Request>::Result: Send,
5323 {
5324 let sender_id = envelope.original_sender_id()?;
5325 let buffer_id = T::buffer_id_from_proto(&envelope.payload);
5326 let buffer_handle = this.read_with(&cx, |this, _| {
5327 this.opened_buffers
5328 .get(&buffer_id)
5329 .and_then(|buffer| buffer.upgrade(&cx))
5330 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
5331 })?;
5332 let request = T::from_proto(
5333 envelope.payload,
5334 this.clone(),
5335 buffer_handle.clone(),
5336 cx.clone(),
5337 )
5338 .await?;
5339 let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
5340 let response = this
5341 .update(&mut cx, |this, cx| {
5342 this.request_lsp(buffer_handle, request, cx)
5343 })
5344 .await?;
5345 this.update(&mut cx, |this, cx| {
5346 Ok(T::response_to_proto(
5347 response,
5348 this,
5349 sender_id,
5350 &buffer_version,
5351 cx,
5352 ))
5353 })
5354 }
5355
5356 async fn handle_get_project_symbols(
5357 this: ModelHandle<Self>,
5358 envelope: TypedEnvelope<proto::GetProjectSymbols>,
5359 _: Arc<Client>,
5360 mut cx: AsyncAppContext,
5361 ) -> Result<proto::GetProjectSymbolsResponse> {
5362 let symbols = this
5363 .update(&mut cx, |this, cx| {
5364 this.symbols(&envelope.payload.query, cx)
5365 })
5366 .await?;
5367
5368 Ok(proto::GetProjectSymbolsResponse {
5369 symbols: symbols.iter().map(serialize_symbol).collect(),
5370 })
5371 }
5372
5373 async fn handle_search_project(
5374 this: ModelHandle<Self>,
5375 envelope: TypedEnvelope<proto::SearchProject>,
5376 _: Arc<Client>,
5377 mut cx: AsyncAppContext,
5378 ) -> Result<proto::SearchProjectResponse> {
5379 let peer_id = envelope.original_sender_id()?;
5380 let query = SearchQuery::from_proto(envelope.payload)?;
5381 let result = this
5382 .update(&mut cx, |this, cx| this.search(query, cx))
5383 .await?;
5384
5385 this.update(&mut cx, |this, cx| {
5386 let mut locations = Vec::new();
5387 for (buffer, ranges) in result {
5388 for range in ranges {
5389 let start = serialize_anchor(&range.start);
5390 let end = serialize_anchor(&range.end);
5391 let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
5392 locations.push(proto::Location {
5393 buffer_id,
5394 start: Some(start),
5395 end: Some(end),
5396 });
5397 }
5398 }
5399 Ok(proto::SearchProjectResponse { locations })
5400 })
5401 }
5402
5403 async fn handle_open_buffer_for_symbol(
5404 this: ModelHandle<Self>,
5405 envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
5406 _: Arc<Client>,
5407 mut cx: AsyncAppContext,
5408 ) -> Result<proto::OpenBufferForSymbolResponse> {
5409 let peer_id = envelope.original_sender_id()?;
5410 let symbol = envelope
5411 .payload
5412 .symbol
5413 .ok_or_else(|| anyhow!("invalid symbol"))?;
5414 let symbol = this
5415 .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
5416 .await?;
5417 let symbol = this.read_with(&cx, |this, _| {
5418 let signature = this.symbol_signature(&symbol.path);
5419 if signature == symbol.signature {
5420 Ok(symbol)
5421 } else {
5422 Err(anyhow!("invalid symbol signature"))
5423 }
5424 })?;
5425 let buffer = this
5426 .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
5427 .await?;
5428
5429 Ok(proto::OpenBufferForSymbolResponse {
5430 buffer_id: this.update(&mut cx, |this, cx| {
5431 this.create_buffer_for_peer(&buffer, peer_id, cx)
5432 }),
5433 })
5434 }
5435
5436 fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
5437 let mut hasher = Sha256::new();
5438 hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
5439 hasher.update(project_path.path.to_string_lossy().as_bytes());
5440 hasher.update(self.nonce.to_be_bytes());
5441 hasher.finalize().as_slice().try_into().unwrap()
5442 }
5443
5444 async fn handle_open_buffer_by_id(
5445 this: ModelHandle<Self>,
5446 envelope: TypedEnvelope<proto::OpenBufferById>,
5447 _: Arc<Client>,
5448 mut cx: AsyncAppContext,
5449 ) -> Result<proto::OpenBufferResponse> {
5450 let peer_id = envelope.original_sender_id()?;
5451 let buffer = this
5452 .update(&mut cx, |this, cx| {
5453 this.open_buffer_by_id(envelope.payload.id, cx)
5454 })
5455 .await?;
5456 this.update(&mut cx, |this, cx| {
5457 Ok(proto::OpenBufferResponse {
5458 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5459 })
5460 })
5461 }
5462
5463 async fn handle_open_buffer_by_path(
5464 this: ModelHandle<Self>,
5465 envelope: TypedEnvelope<proto::OpenBufferByPath>,
5466 _: Arc<Client>,
5467 mut cx: AsyncAppContext,
5468 ) -> Result<proto::OpenBufferResponse> {
5469 let peer_id = envelope.original_sender_id()?;
5470 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5471 let open_buffer = this.update(&mut cx, |this, cx| {
5472 this.open_buffer(
5473 ProjectPath {
5474 worktree_id,
5475 path: PathBuf::from(envelope.payload.path).into(),
5476 },
5477 cx,
5478 )
5479 });
5480
5481 let buffer = open_buffer.await?;
5482 this.update(&mut cx, |this, cx| {
5483 Ok(proto::OpenBufferResponse {
5484 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5485 })
5486 })
5487 }
5488
5489 fn serialize_project_transaction_for_peer(
5490 &mut self,
5491 project_transaction: ProjectTransaction,
5492 peer_id: PeerId,
5493 cx: &AppContext,
5494 ) -> proto::ProjectTransaction {
5495 let mut serialized_transaction = proto::ProjectTransaction {
5496 buffer_ids: Default::default(),
5497 transactions: Default::default(),
5498 };
5499 for (buffer, transaction) in project_transaction.0 {
5500 serialized_transaction
5501 .buffer_ids
5502 .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
5503 serialized_transaction
5504 .transactions
5505 .push(language::proto::serialize_transaction(&transaction));
5506 }
5507 serialized_transaction
5508 }
5509
5510 fn deserialize_project_transaction(
5511 &mut self,
5512 message: proto::ProjectTransaction,
5513 push_to_history: bool,
5514 cx: &mut ModelContext<Self>,
5515 ) -> Task<Result<ProjectTransaction>> {
5516 cx.spawn(|this, mut cx| async move {
5517 let mut project_transaction = ProjectTransaction::default();
5518 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
5519 {
5520 let buffer = this
5521 .update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
5522 .await?;
5523 let transaction = language::proto::deserialize_transaction(transaction)?;
5524 project_transaction.0.insert(buffer, transaction);
5525 }
5526
5527 for (buffer, transaction) in &project_transaction.0 {
5528 buffer
5529 .update(&mut cx, |buffer, _| {
5530 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
5531 })
5532 .await;
5533
5534 if push_to_history {
5535 buffer.update(&mut cx, |buffer, _| {
5536 buffer.push_transaction(transaction.clone(), Instant::now());
5537 });
5538 }
5539 }
5540
5541 Ok(project_transaction)
5542 })
5543 }
5544
5545 fn create_buffer_for_peer(
5546 &mut self,
5547 buffer: &ModelHandle<Buffer>,
5548 peer_id: PeerId,
5549 cx: &AppContext,
5550 ) -> u64 {
5551 let buffer_id = buffer.read(cx).remote_id();
5552 if let Some(project_id) = self.remote_id() {
5553 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
5554 if shared_buffers.insert(buffer_id) {
5555 let buffer = buffer.read(cx);
5556 let state = buffer.to_proto();
5557 let operations = buffer.serialize_ops(cx);
5558 let client = self.client.clone();
5559 cx.background()
5560 .spawn(
5561 async move {
5562 let mut operations = operations.await;
5563
5564 client.send(proto::CreateBufferForPeer {
5565 project_id,
5566 peer_id: peer_id.0,
5567 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
5568 })?;
5569
5570 loop {
5571 #[cfg(any(test, feature = "test-support"))]
5572 const CHUNK_SIZE: usize = 5;
5573
5574 #[cfg(not(any(test, feature = "test-support")))]
5575 const CHUNK_SIZE: usize = 100;
5576
5577 let chunk = operations
5578 .drain(..cmp::min(CHUNK_SIZE, operations.len()))
5579 .collect();
5580 let is_last = operations.is_empty();
5581 client.send(proto::CreateBufferForPeer {
5582 project_id,
5583 peer_id: peer_id.0,
5584 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
5585 proto::BufferChunk {
5586 buffer_id,
5587 operations: chunk,
5588 is_last,
5589 },
5590 )),
5591 })?;
5592
5593 if is_last {
5594 break;
5595 }
5596 }
5597
5598 Ok(())
5599 }
5600 .log_err(),
5601 )
5602 .detach();
5603 }
5604 }
5605
5606 buffer_id
5607 }
5608
5609 fn wait_for_buffer(
5610 &self,
5611 id: u64,
5612 cx: &mut ModelContext<Self>,
5613 ) -> Task<Result<ModelHandle<Buffer>>> {
5614 let mut opened_buffer_rx = self.opened_buffer.1.clone();
5615 cx.spawn(|this, mut cx| async move {
5616 let buffer = loop {
5617 let buffer = this.read_with(&cx, |this, cx| {
5618 this.opened_buffers
5619 .get(&id)
5620 .and_then(|buffer| buffer.upgrade(cx))
5621 });
5622 if let Some(buffer) = buffer {
5623 break buffer;
5624 } else if this.read_with(&cx, |this, _| this.is_read_only()) {
5625 return Err(anyhow!("disconnected before buffer {} could be opened", id));
5626 }
5627
5628 opened_buffer_rx
5629 .next()
5630 .await
5631 .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
5632 };
5633 buffer.update(&mut cx, |buffer, cx| buffer.git_diff_recalc(cx));
5634 Ok(buffer)
5635 })
5636 }
5637
5638 fn deserialize_symbol(
5639 &self,
5640 serialized_symbol: proto::Symbol,
5641 ) -> impl Future<Output = Result<Symbol>> {
5642 let languages = self.languages.clone();
5643 async move {
5644 let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
5645 let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
5646 let start = serialized_symbol
5647 .start
5648 .ok_or_else(|| anyhow!("invalid start"))?;
5649 let end = serialized_symbol
5650 .end
5651 .ok_or_else(|| anyhow!("invalid end"))?;
5652 let kind = unsafe { mem::transmute(serialized_symbol.kind) };
5653 let path = ProjectPath {
5654 worktree_id,
5655 path: PathBuf::from(serialized_symbol.path).into(),
5656 };
5657 let language = languages.select_language(&path.path);
5658 Ok(Symbol {
5659 language_server_name: LanguageServerName(
5660 serialized_symbol.language_server_name.into(),
5661 ),
5662 source_worktree_id,
5663 path,
5664 label: {
5665 match language {
5666 Some(language) => {
5667 language
5668 .label_for_symbol(&serialized_symbol.name, kind)
5669 .await
5670 }
5671 None => None,
5672 }
5673 .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
5674 },
5675
5676 name: serialized_symbol.name,
5677 range: Unclipped(PointUtf16::new(start.row, start.column))
5678 ..Unclipped(PointUtf16::new(end.row, end.column)),
5679 kind,
5680 signature: serialized_symbol
5681 .signature
5682 .try_into()
5683 .map_err(|_| anyhow!("invalid signature"))?,
5684 })
5685 }
5686 }
5687
5688 async fn handle_buffer_saved(
5689 this: ModelHandle<Self>,
5690 envelope: TypedEnvelope<proto::BufferSaved>,
5691 _: Arc<Client>,
5692 mut cx: AsyncAppContext,
5693 ) -> Result<()> {
5694 let version = deserialize_version(envelope.payload.version);
5695 let mtime = envelope
5696 .payload
5697 .mtime
5698 .ok_or_else(|| anyhow!("missing mtime"))?
5699 .into();
5700
5701 this.update(&mut cx, |this, cx| {
5702 let buffer = this
5703 .opened_buffers
5704 .get(&envelope.payload.buffer_id)
5705 .and_then(|buffer| buffer.upgrade(cx));
5706 if let Some(buffer) = buffer {
5707 buffer.update(cx, |buffer, cx| {
5708 buffer.did_save(version, envelope.payload.fingerprint, mtime, None, cx);
5709 });
5710 }
5711 Ok(())
5712 })
5713 }
5714
5715 async fn handle_buffer_reloaded(
5716 this: ModelHandle<Self>,
5717 envelope: TypedEnvelope<proto::BufferReloaded>,
5718 _: Arc<Client>,
5719 mut cx: AsyncAppContext,
5720 ) -> Result<()> {
5721 let payload = envelope.payload;
5722 let version = deserialize_version(payload.version);
5723 let line_ending = deserialize_line_ending(
5724 proto::LineEnding::from_i32(payload.line_ending)
5725 .ok_or_else(|| anyhow!("missing line ending"))?,
5726 );
5727 let mtime = payload
5728 .mtime
5729 .ok_or_else(|| anyhow!("missing mtime"))?
5730 .into();
5731 this.update(&mut cx, |this, cx| {
5732 let buffer = this
5733 .opened_buffers
5734 .get(&payload.buffer_id)
5735 .and_then(|buffer| buffer.upgrade(cx));
5736 if let Some(buffer) = buffer {
5737 buffer.update(cx, |buffer, cx| {
5738 buffer.did_reload(version, payload.fingerprint, line_ending, mtime, cx);
5739 });
5740 }
5741 Ok(())
5742 })
5743 }
5744
5745 #[allow(clippy::type_complexity)]
5746 fn edits_from_lsp(
5747 &mut self,
5748 buffer: &ModelHandle<Buffer>,
5749 lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
5750 version: Option<i32>,
5751 cx: &mut ModelContext<Self>,
5752 ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
5753 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx);
5754 cx.background().spawn(async move {
5755 let snapshot = snapshot?;
5756 let mut lsp_edits = lsp_edits
5757 .into_iter()
5758 .map(|edit| (range_from_lsp(edit.range), edit.new_text))
5759 .collect::<Vec<_>>();
5760 lsp_edits.sort_by_key(|(range, _)| range.start);
5761
5762 let mut lsp_edits = lsp_edits.into_iter().peekable();
5763 let mut edits = Vec::new();
5764 while let Some((range, mut new_text)) = lsp_edits.next() {
5765 // Clip invalid ranges provided by the language server.
5766 let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
5767 ..snapshot.clip_point_utf16(range.end, Bias::Left);
5768
5769 // Combine any LSP edits that are adjacent.
5770 //
5771 // Also, combine LSP edits that are separated from each other by only
5772 // a newline. This is important because for some code actions,
5773 // Rust-analyzer rewrites the entire buffer via a series of edits that
5774 // are separated by unchanged newline characters.
5775 //
5776 // In order for the diffing logic below to work properly, any edits that
5777 // cancel each other out must be combined into one.
5778 while let Some((next_range, next_text)) = lsp_edits.peek() {
5779 if next_range.start.0 > range.end {
5780 if next_range.start.0.row > range.end.row + 1
5781 || next_range.start.0.column > 0
5782 || snapshot.clip_point_utf16(
5783 Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
5784 Bias::Left,
5785 ) > range.end
5786 {
5787 break;
5788 }
5789 new_text.push('\n');
5790 }
5791 range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
5792 new_text.push_str(next_text);
5793 lsp_edits.next();
5794 }
5795
5796 // For multiline edits, perform a diff of the old and new text so that
5797 // we can identify the changes more precisely, preserving the locations
5798 // of any anchors positioned in the unchanged regions.
5799 if range.end.row > range.start.row {
5800 let mut offset = range.start.to_offset(&snapshot);
5801 let old_text = snapshot.text_for_range(range).collect::<String>();
5802
5803 let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
5804 let mut moved_since_edit = true;
5805 for change in diff.iter_all_changes() {
5806 let tag = change.tag();
5807 let value = change.value();
5808 match tag {
5809 ChangeTag::Equal => {
5810 offset += value.len();
5811 moved_since_edit = true;
5812 }
5813 ChangeTag::Delete => {
5814 let start = snapshot.anchor_after(offset);
5815 let end = snapshot.anchor_before(offset + value.len());
5816 if moved_since_edit {
5817 edits.push((start..end, String::new()));
5818 } else {
5819 edits.last_mut().unwrap().0.end = end;
5820 }
5821 offset += value.len();
5822 moved_since_edit = false;
5823 }
5824 ChangeTag::Insert => {
5825 if moved_since_edit {
5826 let anchor = snapshot.anchor_after(offset);
5827 edits.push((anchor..anchor, value.to_string()));
5828 } else {
5829 edits.last_mut().unwrap().1.push_str(value);
5830 }
5831 moved_since_edit = false;
5832 }
5833 }
5834 }
5835 } else if range.end == range.start {
5836 let anchor = snapshot.anchor_after(range.start);
5837 edits.push((anchor..anchor, new_text));
5838 } else {
5839 let edit_start = snapshot.anchor_after(range.start);
5840 let edit_end = snapshot.anchor_before(range.end);
5841 edits.push((edit_start..edit_end, new_text));
5842 }
5843 }
5844
5845 Ok(edits)
5846 })
5847 }
5848
5849 fn buffer_snapshot_for_lsp_version(
5850 &mut self,
5851 buffer: &ModelHandle<Buffer>,
5852 version: Option<i32>,
5853 cx: &AppContext,
5854 ) -> Result<TextBufferSnapshot> {
5855 const OLD_VERSIONS_TO_RETAIN: i32 = 10;
5856
5857 if let Some(version) = version {
5858 let buffer_id = buffer.read(cx).remote_id();
5859 let snapshots = self
5860 .buffer_snapshots
5861 .get_mut(&buffer_id)
5862 .ok_or_else(|| anyhow!("no snapshot found for buffer {}", buffer_id))?;
5863 let mut found_snapshot = None;
5864 snapshots.retain(|(snapshot_version, snapshot)| {
5865 if snapshot_version + OLD_VERSIONS_TO_RETAIN < version {
5866 false
5867 } else {
5868 if *snapshot_version == version {
5869 found_snapshot = Some(snapshot.clone());
5870 }
5871 true
5872 }
5873 });
5874
5875 found_snapshot.ok_or_else(|| {
5876 anyhow!(
5877 "snapshot not found for buffer {} at version {}",
5878 buffer_id,
5879 version
5880 )
5881 })
5882 } else {
5883 Ok((buffer.read(cx)).text_snapshot())
5884 }
5885 }
5886
5887 fn language_server_for_buffer(
5888 &self,
5889 buffer: &Buffer,
5890 cx: &AppContext,
5891 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
5892 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
5893 let name = language.lsp_adapter()?.name.clone();
5894 let worktree_id = file.worktree_id(cx);
5895 let key = (worktree_id, name);
5896
5897 if let Some(server_id) = self.language_server_ids.get(&key) {
5898 if let Some(LanguageServerState::Running {
5899 adapter, server, ..
5900 }) = self.language_servers.get(server_id)
5901 {
5902 return Some((adapter, server));
5903 }
5904 }
5905 }
5906
5907 None
5908 }
5909}
5910
5911impl WorktreeHandle {
5912 pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
5913 match self {
5914 WorktreeHandle::Strong(handle) => Some(handle.clone()),
5915 WorktreeHandle::Weak(handle) => handle.upgrade(cx),
5916 }
5917 }
5918}
5919
5920impl OpenBuffer {
5921 pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
5922 match self {
5923 OpenBuffer::Strong(handle) => Some(handle.clone()),
5924 OpenBuffer::Weak(handle) => handle.upgrade(cx),
5925 OpenBuffer::Operations(_) => None,
5926 }
5927 }
5928}
5929
5930pub struct PathMatchCandidateSet {
5931 pub snapshot: Snapshot,
5932 pub include_ignored: bool,
5933 pub include_root_name: bool,
5934}
5935
5936impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
5937 type Candidates = PathMatchCandidateSetIter<'a>;
5938
5939 fn id(&self) -> usize {
5940 self.snapshot.id().to_usize()
5941 }
5942
5943 fn len(&self) -> usize {
5944 if self.include_ignored {
5945 self.snapshot.file_count()
5946 } else {
5947 self.snapshot.visible_file_count()
5948 }
5949 }
5950
5951 fn prefix(&self) -> Arc<str> {
5952 if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
5953 self.snapshot.root_name().into()
5954 } else if self.include_root_name {
5955 format!("{}/", self.snapshot.root_name()).into()
5956 } else {
5957 "".into()
5958 }
5959 }
5960
5961 fn candidates(&'a self, start: usize) -> Self::Candidates {
5962 PathMatchCandidateSetIter {
5963 traversal: self.snapshot.files(self.include_ignored, start),
5964 }
5965 }
5966}
5967
5968pub struct PathMatchCandidateSetIter<'a> {
5969 traversal: Traversal<'a>,
5970}
5971
5972impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
5973 type Item = fuzzy::PathMatchCandidate<'a>;
5974
5975 fn next(&mut self) -> Option<Self::Item> {
5976 self.traversal.next().map(|entry| {
5977 if let EntryKind::File(char_bag) = entry.kind {
5978 fuzzy::PathMatchCandidate {
5979 path: &entry.path,
5980 char_bag,
5981 }
5982 } else {
5983 unreachable!()
5984 }
5985 })
5986 }
5987}
5988
5989impl Entity for Project {
5990 type Event = Event;
5991
5992 fn release(&mut self, _: &mut gpui::MutableAppContext) {
5993 match &self.client_state {
5994 Some(ProjectClientState::Local { remote_id, .. }) => {
5995 self.client
5996 .send(proto::UnshareProject {
5997 project_id: *remote_id,
5998 })
5999 .log_err();
6000 }
6001 Some(ProjectClientState::Remote { remote_id, .. }) => {
6002 self.client
6003 .send(proto::LeaveProject {
6004 project_id: *remote_id,
6005 })
6006 .log_err();
6007 }
6008 _ => {}
6009 }
6010 }
6011
6012 fn app_will_quit(
6013 &mut self,
6014 _: &mut MutableAppContext,
6015 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
6016 let shutdown_futures = self
6017 .language_servers
6018 .drain()
6019 .map(|(_, server_state)| async {
6020 match server_state {
6021 LanguageServerState::Running { server, .. } => server.shutdown()?.await,
6022 LanguageServerState::Starting(starting_server) => {
6023 starting_server.await?.shutdown()?.await
6024 }
6025 }
6026 })
6027 .collect::<Vec<_>>();
6028
6029 Some(
6030 async move {
6031 futures::future::join_all(shutdown_futures).await;
6032 }
6033 .boxed(),
6034 )
6035 }
6036}
6037
6038impl Collaborator {
6039 fn from_proto(message: proto::Collaborator) -> Self {
6040 Self {
6041 peer_id: PeerId(message.peer_id),
6042 replica_id: message.replica_id as ReplicaId,
6043 }
6044 }
6045}
6046
6047impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
6048 fn from((worktree_id, path): (WorktreeId, P)) -> Self {
6049 Self {
6050 worktree_id,
6051 path: path.as_ref().into(),
6052 }
6053 }
6054}
6055
6056fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
6057 proto::Symbol {
6058 language_server_name: symbol.language_server_name.0.to_string(),
6059 source_worktree_id: symbol.source_worktree_id.to_proto(),
6060 worktree_id: symbol.path.worktree_id.to_proto(),
6061 path: symbol.path.path.to_string_lossy().to_string(),
6062 name: symbol.name.clone(),
6063 kind: unsafe { mem::transmute(symbol.kind) },
6064 start: Some(proto::PointUtf16 {
6065 row: symbol.range.start.0.row,
6066 column: symbol.range.start.0.column,
6067 }),
6068 end: Some(proto::PointUtf16 {
6069 row: symbol.range.end.0.row,
6070 column: symbol.range.end.0.column,
6071 }),
6072 signature: symbol.signature.to_vec(),
6073 }
6074}
6075
6076fn relativize_path(base: &Path, path: &Path) -> PathBuf {
6077 let mut path_components = path.components();
6078 let mut base_components = base.components();
6079 let mut components: Vec<Component> = Vec::new();
6080 loop {
6081 match (path_components.next(), base_components.next()) {
6082 (None, None) => break,
6083 (Some(a), None) => {
6084 components.push(a);
6085 components.extend(path_components.by_ref());
6086 break;
6087 }
6088 (None, _) => components.push(Component::ParentDir),
6089 (Some(a), Some(b)) if components.is_empty() && a == b => (),
6090 (Some(a), Some(b)) if b == Component::CurDir => components.push(a),
6091 (Some(a), Some(_)) => {
6092 components.push(Component::ParentDir);
6093 for _ in base_components {
6094 components.push(Component::ParentDir);
6095 }
6096 components.push(a);
6097 components.extend(path_components.by_ref());
6098 break;
6099 }
6100 }
6101 }
6102 components.iter().map(|c| c.as_os_str()).collect()
6103}
6104
6105impl Item for Buffer {
6106 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
6107 File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
6108 }
6109}