1mod ignore;
2mod lsp_command;
3pub mod search;
4pub mod worktree;
5
6#[cfg(test)]
7mod project_tests;
8
9use anyhow::{anyhow, Context, Result};
10use client::{proto, Client, TypedEnvelope, UserStore};
11use clock::ReplicaId;
12use collections::{hash_map, BTreeMap, HashMap, HashSet};
13use futures::{
14 channel::{mpsc, oneshot},
15 future::Shared,
16 select_biased, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
17};
18use gpui::{
19 AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
20 MutableAppContext, Task, UpgradeModelHandle, WeakModelHandle,
21};
22use language::{
23 point_to_lsp,
24 proto::{
25 deserialize_anchor, deserialize_line_ending, deserialize_version, serialize_anchor,
26 serialize_version,
27 },
28 range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CharKind, CodeAction,
29 CodeLabel, Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Event as BufferEvent,
30 File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt,
31 Operation, Patch, PointUtf16, TextBufferSnapshot, ToOffset, ToPointUtf16, Transaction,
32 Unclipped,
33};
34use lsp::{
35 DiagnosticSeverity, DiagnosticTag, DocumentHighlightKind, LanguageServer, LanguageString,
36 MarkedString,
37};
38use lsp_command::*;
39use parking_lot::Mutex;
40use postage::watch;
41use rand::prelude::*;
42use search::SearchQuery;
43use serde::Serialize;
44use settings::{FormatOnSave, Formatter, Settings};
45use sha2::{Digest, Sha256};
46use similar::{ChangeTag, TextDiff};
47use std::{
48 cell::RefCell,
49 cmp::{self, Ordering},
50 convert::TryInto,
51 hash::Hash,
52 mem,
53 num::NonZeroU32,
54 ops::Range,
55 path::{Component, Path, PathBuf},
56 rc::Rc,
57 str,
58 sync::{
59 atomic::{AtomicUsize, Ordering::SeqCst},
60 Arc,
61 },
62 time::Instant,
63};
64use terminal::{Terminal, TerminalBuilder};
65use thiserror::Error;
66use util::{defer, post_inc, ResultExt, TryFutureExt as _};
67
68pub use fs::*;
69pub use worktree::*;
70
71pub trait Item: Entity {
72 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
73}
74
75// Language server state is stored across 3 collections:
76// language_servers =>
77// a mapping from unique server id to LanguageServerState which can either be a task for a
78// server in the process of starting, or a running server with adapter and language server arcs
79// language_server_ids => a mapping from worktreeId and server name to the unique server id
80// language_server_statuses => a mapping from unique server id to the current server status
81//
82// Multiple worktrees can map to the same language server for example when you jump to the definition
83// of a file in the standard library. So language_server_ids is used to look up which server is active
84// for a given worktree and language server name
85//
86// When starting a language server, first the id map is checked to make sure a server isn't already available
// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and
88// the Starting variant of LanguageServerState is stored in the language_servers map.
89pub struct Project {
90 worktrees: Vec<WorktreeHandle>,
91 active_entry: Option<ProjectEntryId>,
92 languages: Arc<LanguageRegistry>,
93 language_servers: HashMap<usize, LanguageServerState>,
94 language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
95 language_server_statuses: BTreeMap<usize, LanguageServerStatus>,
96 language_server_settings: Arc<Mutex<serde_json::Value>>,
97 last_workspace_edits_by_language_server: HashMap<usize, ProjectTransaction>,
98 next_language_server_id: usize,
99 client: Arc<client::Client>,
100 next_entry_id: Arc<AtomicUsize>,
101 next_diagnostic_group_id: usize,
102 user_store: ModelHandle<UserStore>,
103 fs: Arc<dyn Fs>,
104 client_state: Option<ProjectClientState>,
105 collaborators: HashMap<proto::PeerId, Collaborator>,
106 client_subscriptions: Vec<client::Subscription>,
107 _subscriptions: Vec<gpui::Subscription>,
108 opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
109 shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
110 #[allow(clippy::type_complexity)]
111 loading_buffers: HashMap<
112 ProjectPath,
113 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
114 >,
115 #[allow(clippy::type_complexity)]
116 loading_local_worktrees:
117 HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
118 opened_buffers: HashMap<u64, OpenBuffer>,
119 incomplete_buffers: HashMap<u64, ModelHandle<Buffer>>,
120 buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
121 buffers_being_formatted: HashSet<usize>,
122 nonce: u128,
123 _maintain_buffer_languages: Task<()>,
124}
125
126#[derive(Error, Debug)]
127pub enum JoinProjectError {
128 #[error("host declined join request")]
129 HostDeclined,
130 #[error("host closed the project")]
131 HostClosedProject,
132 #[error("host went offline")]
133 HostWentOffline,
134 #[error("{0}")]
135 Other(#[from] anyhow::Error),
136}
137
138enum OpenBuffer {
139 Strong(ModelHandle<Buffer>),
140 Weak(WeakModelHandle<Buffer>),
141 Operations(Vec<Operation>),
142}
143
144enum WorktreeHandle {
145 Strong(ModelHandle<Worktree>),
146 Weak(WeakModelHandle<Worktree>),
147}
148
149enum ProjectClientState {
150 Local {
151 remote_id: u64,
152 metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
153 _maintain_metadata: Task<()>,
154 },
155 Remote {
156 sharing_has_stopped: bool,
157 remote_id: u64,
158 replica_id: ReplicaId,
159 },
160}
161
162#[derive(Clone, Debug)]
163pub struct Collaborator {
164 pub peer_id: proto::PeerId,
165 pub replica_id: ReplicaId,
166}
167
168#[derive(Clone, Debug, PartialEq, Eq)]
169pub enum Event {
170 ActiveEntryChanged(Option<ProjectEntryId>),
171 WorktreeAdded,
172 WorktreeRemoved(WorktreeId),
173 DiskBasedDiagnosticsStarted {
174 language_server_id: usize,
175 },
176 DiskBasedDiagnosticsFinished {
177 language_server_id: usize,
178 },
179 DiagnosticsUpdated {
180 path: ProjectPath,
181 language_server_id: usize,
182 },
183 RemoteIdChanged(Option<u64>),
184 DisconnectedFromHost,
185 CollaboratorLeft(proto::PeerId),
186}
187
188pub enum LanguageServerState {
189 Starting(Task<Option<Arc<LanguageServer>>>),
190 Running {
191 language: Arc<Language>,
192 adapter: Arc<CachedLspAdapter>,
193 server: Arc<LanguageServer>,
194 },
195}
196
197#[derive(Serialize)]
198pub struct LanguageServerStatus {
199 pub name: String,
200 pub pending_work: BTreeMap<String, LanguageServerProgress>,
201 pub has_pending_diagnostic_updates: bool,
202 progress_tokens: HashSet<String>,
203}
204
205#[derive(Clone, Debug, Serialize)]
206pub struct LanguageServerProgress {
207 pub message: Option<String>,
208 pub percentage: Option<usize>,
209 #[serde(skip_serializing)]
210 pub last_update_at: Instant,
211}
212
213#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
214pub struct ProjectPath {
215 pub worktree_id: WorktreeId,
216 pub path: Arc<Path>,
217}
218
219#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
220pub struct DiagnosticSummary {
221 pub language_server_id: usize,
222 pub error_count: usize,
223 pub warning_count: usize,
224}
225
226#[derive(Debug, Clone)]
227pub struct Location {
228 pub buffer: ModelHandle<Buffer>,
229 pub range: Range<language::Anchor>,
230}
231
232#[derive(Debug, Clone)]
233pub struct LocationLink {
234 pub origin: Option<Location>,
235 pub target: Location,
236}
237
238#[derive(Debug)]
239pub struct DocumentHighlight {
240 pub range: Range<language::Anchor>,
241 pub kind: DocumentHighlightKind,
242}
243
244#[derive(Clone, Debug)]
245pub struct Symbol {
246 pub language_server_name: LanguageServerName,
247 pub source_worktree_id: WorktreeId,
248 pub path: ProjectPath,
249 pub label: CodeLabel,
250 pub name: String,
251 pub kind: lsp::SymbolKind,
252 pub range: Range<Unclipped<PointUtf16>>,
253 pub signature: [u8; 32],
254}
255
256#[derive(Clone, Debug, PartialEq)]
257pub struct HoverBlock {
258 pub text: String,
259 pub language: Option<String>,
260}
261
262impl HoverBlock {
263 fn try_new(marked_string: MarkedString) -> Option<Self> {
264 let result = match marked_string {
265 MarkedString::LanguageString(LanguageString { language, value }) => HoverBlock {
266 text: value,
267 language: Some(language),
268 },
269 MarkedString::String(text) => HoverBlock {
270 text,
271 language: None,
272 },
273 };
274 if result.text.is_empty() {
275 None
276 } else {
277 Some(result)
278 }
279 }
280}
281
282#[derive(Debug)]
283pub struct Hover {
284 pub contents: Vec<HoverBlock>,
285 pub range: Option<Range<language::Anchor>>,
286}
287
288#[derive(Default)]
289pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
290
291impl DiagnosticSummary {
292 fn new<'a, T: 'a>(
293 language_server_id: usize,
294 diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>,
295 ) -> Self {
296 let mut this = Self {
297 language_server_id,
298 error_count: 0,
299 warning_count: 0,
300 };
301
302 for entry in diagnostics {
303 if entry.diagnostic.is_primary {
304 match entry.diagnostic.severity {
305 DiagnosticSeverity::ERROR => this.error_count += 1,
306 DiagnosticSeverity::WARNING => this.warning_count += 1,
307 _ => {}
308 }
309 }
310 }
311
312 this
313 }
314
315 pub fn is_empty(&self) -> bool {
316 self.error_count == 0 && self.warning_count == 0
317 }
318
319 pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
320 proto::DiagnosticSummary {
321 path: path.to_string_lossy().to_string(),
322 language_server_id: self.language_server_id as u64,
323 error_count: self.error_count as u32,
324 warning_count: self.warning_count as u32,
325 }
326 }
327}
328
/// Identifier of an entry within a project; a thin wrapper around a `usize`.
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ProjectEntryId(usize);

impl ProjectEntryId {
    /// The largest representable id.
    pub const MAX: Self = Self(usize::MAX);

    /// Allocates a fresh id from `counter`, advancing the counter by one.
    pub fn new(counter: &AtomicUsize) -> Self {
        let next = counter.fetch_add(1, SeqCst);
        ProjectEntryId(next)
    }

    /// Builds an id from its protobuf (`u64`) representation.
    pub fn from_proto(id: u64) -> Self {
        let raw = id as usize;
        ProjectEntryId(raw)
    }

    /// Returns the protobuf (`u64`) representation of this id.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw as u64
    }

    /// Returns the underlying `usize` value.
    pub fn to_usize(&self) -> usize {
        let Self(raw) = *self;
        raw
    }
}
351
/// What caused a formatting request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FormatTrigger {
    /// Formatting was triggered by saving.
    Save,
    /// Formatting was requested explicitly.
    Manual,
}

impl FormatTrigger {
    /// Decodes the protobuf value: `1` is `Manual`; `0` and every unrecognized value decode
    /// to `Save`.
    fn from_proto(value: i32) -> FormatTrigger {
        if value == 1 {
            FormatTrigger::Manual
        } else {
            FormatTrigger::Save
        }
    }
}
367
368impl Project {
369 pub fn init(client: &Arc<Client>) {
370 client.add_model_message_handler(Self::handle_add_collaborator);
371 client.add_model_message_handler(Self::handle_buffer_reloaded);
372 client.add_model_message_handler(Self::handle_buffer_saved);
373 client.add_model_message_handler(Self::handle_start_language_server);
374 client.add_model_message_handler(Self::handle_update_language_server);
375 client.add_model_message_handler(Self::handle_remove_collaborator);
376 client.add_model_message_handler(Self::handle_update_project);
377 client.add_model_message_handler(Self::handle_unshare_project);
378 client.add_model_message_handler(Self::handle_create_buffer_for_peer);
379 client.add_model_message_handler(Self::handle_update_buffer_file);
380 client.add_model_message_handler(Self::handle_update_buffer);
381 client.add_model_message_handler(Self::handle_update_diagnostic_summary);
382 client.add_model_message_handler(Self::handle_update_worktree);
383 client.add_model_request_handler(Self::handle_create_project_entry);
384 client.add_model_request_handler(Self::handle_rename_project_entry);
385 client.add_model_request_handler(Self::handle_copy_project_entry);
386 client.add_model_request_handler(Self::handle_delete_project_entry);
387 client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
388 client.add_model_request_handler(Self::handle_apply_code_action);
389 client.add_model_request_handler(Self::handle_reload_buffers);
390 client.add_model_request_handler(Self::handle_format_buffers);
391 client.add_model_request_handler(Self::handle_get_code_actions);
392 client.add_model_request_handler(Self::handle_get_completions);
393 client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
394 client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
395 client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
396 client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
397 client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
398 client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
399 client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
400 client.add_model_request_handler(Self::handle_search_project);
401 client.add_model_request_handler(Self::handle_get_project_symbols);
402 client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
403 client.add_model_request_handler(Self::handle_open_buffer_by_id);
404 client.add_model_request_handler(Self::handle_open_buffer_by_path);
405 client.add_model_request_handler(Self::handle_save_buffer);
406 client.add_model_message_handler(Self::handle_update_diff_base);
407 }
408
409 pub fn local(
410 client: Arc<Client>,
411 user_store: ModelHandle<UserStore>,
412 languages: Arc<LanguageRegistry>,
413 fs: Arc<dyn Fs>,
414 cx: &mut MutableAppContext,
415 ) -> ModelHandle<Self> {
416 cx.add_model(|cx: &mut ModelContext<Self>| Self {
417 worktrees: Default::default(),
418 collaborators: Default::default(),
419 opened_buffers: Default::default(),
420 shared_buffers: Default::default(),
421 incomplete_buffers: Default::default(),
422 loading_buffers: Default::default(),
423 loading_local_worktrees: Default::default(),
424 buffer_snapshots: Default::default(),
425 client_state: None,
426 opened_buffer: watch::channel(),
427 client_subscriptions: Vec::new(),
428 _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
429 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
430 active_entry: None,
431 languages,
432 client,
433 user_store,
434 fs,
435 next_entry_id: Default::default(),
436 next_diagnostic_group_id: Default::default(),
437 language_servers: Default::default(),
438 language_server_ids: Default::default(),
439 language_server_statuses: Default::default(),
440 last_workspace_edits_by_language_server: Default::default(),
441 language_server_settings: Default::default(),
442 buffers_being_formatted: Default::default(),
443 next_language_server_id: 0,
444 nonce: StdRng::from_entropy().gen(),
445 })
446 }
447
448 pub async fn remote(
449 remote_id: u64,
450 client: Arc<Client>,
451 user_store: ModelHandle<UserStore>,
452 languages: Arc<LanguageRegistry>,
453 fs: Arc<dyn Fs>,
454 mut cx: AsyncAppContext,
455 ) -> Result<ModelHandle<Self>, JoinProjectError> {
456 client.authenticate_and_connect(true, &cx).await?;
457
458 let subscription = client.subscribe_to_entity(remote_id);
459 let response = client
460 .request(proto::JoinProject {
461 project_id: remote_id,
462 })
463 .await?;
464 let this = cx.add_model(|cx| {
465 let replica_id = response.replica_id as ReplicaId;
466
467 let mut worktrees = Vec::new();
468 for worktree in response.worktrees {
469 let worktree = cx.update(|cx| {
470 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
471 });
472 worktrees.push(worktree);
473 }
474
475 let mut this = Self {
476 worktrees: Vec::new(),
477 loading_buffers: Default::default(),
478 opened_buffer: watch::channel(),
479 shared_buffers: Default::default(),
480 incomplete_buffers: Default::default(),
481 loading_local_worktrees: Default::default(),
482 active_entry: None,
483 collaborators: Default::default(),
484 _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
485 languages,
486 user_store: user_store.clone(),
487 fs,
488 next_entry_id: Default::default(),
489 next_diagnostic_group_id: Default::default(),
490 client_subscriptions: Default::default(),
491 _subscriptions: Default::default(),
492 client: client.clone(),
493 client_state: Some(ProjectClientState::Remote {
494 sharing_has_stopped: false,
495 remote_id,
496 replica_id,
497 }),
498 language_servers: Default::default(),
499 language_server_ids: Default::default(),
500 language_server_settings: Default::default(),
501 language_server_statuses: response
502 .language_servers
503 .into_iter()
504 .map(|server| {
505 (
506 server.id as usize,
507 LanguageServerStatus {
508 name: server.name,
509 pending_work: Default::default(),
510 has_pending_diagnostic_updates: false,
511 progress_tokens: Default::default(),
512 },
513 )
514 })
515 .collect(),
516 last_workspace_edits_by_language_server: Default::default(),
517 next_language_server_id: 0,
518 opened_buffers: Default::default(),
519 buffers_being_formatted: Default::default(),
520 buffer_snapshots: Default::default(),
521 nonce: StdRng::from_entropy().gen(),
522 };
523 for worktree in worktrees {
524 let _ = this.add_worktree(&worktree, cx);
525 }
526 this
527 });
528 let subscription = subscription.set_model(&this, &mut cx);
529
530 let user_ids = response
531 .collaborators
532 .iter()
533 .map(|peer| peer.user_id)
534 .collect();
535 user_store
536 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
537 .await?;
538
539 this.update(&mut cx, |this, cx| {
540 this.set_collaborators_from_proto(response.collaborators, cx)?;
541 this.client_subscriptions.push(subscription);
542 anyhow::Ok(())
543 })?;
544
545 Ok(this)
546 }
547
/// Creates a local project for tests, backed by `fs`, with one worktree per root path.
///
/// Test `Settings` and a `/tmp/` `HomeDir` global are installed when no `Settings` global
/// exists yet. Each worktree is scanned to completion before the project is returned.
///
/// # Panics
///
/// Panics if a worktree cannot be created for one of `root_paths`.
#[cfg(any(test, feature = "test-support"))]
pub async fn test(
    fs: Arc<dyn Fs>,
    root_paths: impl IntoIterator<Item = &Path>,
    cx: &mut gpui::TestAppContext,
) -> ModelHandle<Project> {
    let has_settings = cx.read(|cx| cx.has_global::<Settings>());
    if !has_settings {
        cx.update(|cx| {
            cx.set_global(Settings::test(cx));
            cx.set_global(HomeDir(PathBuf::from("/tmp/")));
        });
    }

    let languages = Arc::new(LanguageRegistry::test());
    let http_client = client::test::FakeHttpClient::with_404_response();
    let client = cx.update(|cx| Client::new(http_client.clone(), cx));
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
    let project = cx.update(|cx| Project::local(client, user_store, languages, fs, cx));

    for root_path in root_paths {
        let create_worktree = project.update(cx, |project, cx| {
            project.find_or_create_local_worktree(root_path, true, cx)
        });
        let (tree, _) = create_worktree.await.unwrap();

        // Wait for the initial scan so callers see a fully populated worktree.
        tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
    }
    project
}
578
579 fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
580 let settings = cx.global::<Settings>();
581
582 let mut language_servers_to_start = Vec::new();
583 for buffer in self.opened_buffers.values() {
584 if let Some(buffer) = buffer.upgrade(cx) {
585 let buffer = buffer.read(cx);
586 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language())
587 {
588 if settings.enable_language_server(Some(&language.name())) {
589 let worktree = file.worktree.read(cx);
590 language_servers_to_start.push((
591 worktree.id(),
592 worktree.as_local().unwrap().abs_path().clone(),
593 language.clone(),
594 ));
595 }
596 }
597 }
598 }
599
600 let mut language_servers_to_stop = Vec::new();
601 for language in self.languages.to_vec() {
602 if let Some(lsp_adapter) = language.lsp_adapter() {
603 if !settings.enable_language_server(Some(&language.name())) {
604 let lsp_name = &lsp_adapter.name;
605 for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
606 if lsp_name == started_lsp_name {
607 language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
608 }
609 }
610 }
611 }
612 }
613
614 // Stop all newly-disabled language servers.
615 for (worktree_id, adapter_name) in language_servers_to_stop {
616 self.stop_language_server(worktree_id, adapter_name, cx)
617 .detach();
618 }
619
620 // Start all the newly-enabled language servers.
621 for (worktree_id, worktree_path, language) in language_servers_to_start {
622 self.start_language_server(worktree_id, worktree_path, language, cx);
623 }
624
625 cx.notify();
626 }
627
628 pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
629 self.opened_buffers
630 .get(&remote_id)
631 .and_then(|buffer| buffer.upgrade(cx))
632 }
633
634 pub fn languages(&self) -> &Arc<LanguageRegistry> {
635 &self.languages
636 }
637
638 pub fn client(&self) -> Arc<Client> {
639 self.client.clone()
640 }
641
642 pub fn user_store(&self) -> ModelHandle<UserStore> {
643 self.user_store.clone()
644 }
645
/// Asserts internal invariants of the project (test support only).
///
/// * Local projects: no two worktrees may share the same absolute path.
/// * Other projects: no open buffer may have deferred operations.
///
/// # Panics
///
/// Panics when an invariant is violated, or when a worktree of a local project is not local.
#[cfg(any(test, feature = "test-support"))]
pub fn check_invariants(&self, cx: &AppContext) {
    if !self.is_local() {
        let replica_id = self.replica_id();
        for open_buffer in self.opened_buffers.values() {
            let Some(handle) = open_buffer.upgrade(cx) else {
                continue;
            };
            let buffer = handle.read(cx);
            assert_eq!(
                buffer.deferred_ops_len(),
                0,
                "replica {}, buffer {} has deferred operations",
                replica_id,
                buffer.remote_id()
            );
        }
        return;
    }

    // Maps each worktree's absolute path to the id of the worktree that claimed it first.
    let mut worktree_root_paths = HashMap::default();
    for handle in self.worktrees(cx) {
        let worktree = handle.read(cx);
        let abs_path = worktree.as_local().unwrap().abs_path().clone();
        let prev_worktree_id = worktree_root_paths.insert(abs_path.clone(), worktree.id());
        assert_eq!(
            prev_worktree_id,
            None,
            "abs path {:?} for worktree {:?} is not unique ({:?} was already registered with the same path)",
            abs_path,
            worktree.id(),
            prev_worktree_id
        );
    }
}
679
/// Returns `true` when a live open buffer is backed by the file at `path` (test support
/// only). Returns `false` when the path's worktree is not part of the project.
#[cfg(any(test, feature = "test-support"))]
pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
    let path = path.into();
    let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) else {
        return false;
    };

    self.opened_buffers.values().any(|open_buffer| {
        let Some(buffer) = open_buffer.upgrade(cx) else {
            return false;
        };
        File::from_dyn(buffer.read(cx).file()).map_or(false, |file| {
            file.worktree == worktree && file.path() == &path.path
        })
    })
}
698
699 pub fn fs(&self) -> &Arc<dyn Fs> {
700 &self.fs
701 }
702
703 pub fn remote_id(&self) -> Option<u64> {
704 match self.client_state.as_ref()? {
705 ProjectClientState::Local { remote_id, .. }
706 | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
707 }
708 }
709
710 pub fn replica_id(&self) -> ReplicaId {
711 match &self.client_state {
712 Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
713 _ => 0,
714 }
715 }
716
717 fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
718 let (tx, rx) = oneshot::channel();
719 if let Some(ProjectClientState::Local {
720 metadata_changed, ..
721 }) = &mut self.client_state
722 {
723 let _ = metadata_changed.unbounded_send(tx);
724 }
725 cx.notify();
726
727 async move {
728 // If the project is shared, this will resolve when the `_maintain_metadata` task has
729 // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
730 // will get dropped.
731 let _ = rx.await;
732 }
733 }
734
735 pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
736 &self.collaborators
737 }
738
739 /// Collect all worktrees, including ones that don't appear in the project panel
740 pub fn worktrees<'a>(
741 &'a self,
742 cx: &'a AppContext,
743 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
744 self.worktrees
745 .iter()
746 .filter_map(move |worktree| worktree.upgrade(cx))
747 }
748
749 /// Collect all user-visible worktrees, the ones that appear in the project panel
750 pub fn visible_worktrees<'a>(
751 &'a self,
752 cx: &'a AppContext,
753 ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
754 self.worktrees.iter().filter_map(|worktree| {
755 worktree.upgrade(cx).and_then(|worktree| {
756 if worktree.read(cx).is_visible() {
757 Some(worktree)
758 } else {
759 None
760 }
761 })
762 })
763 }
764
765 pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
766 self.visible_worktrees(cx)
767 .map(|tree| tree.read(cx).root_name())
768 }
769
770 pub fn worktree_for_id(
771 &self,
772 id: WorktreeId,
773 cx: &AppContext,
774 ) -> Option<ModelHandle<Worktree>> {
775 self.worktrees(cx)
776 .find(|worktree| worktree.read(cx).id() == id)
777 }
778
779 pub fn worktree_for_entry(
780 &self,
781 entry_id: ProjectEntryId,
782 cx: &AppContext,
783 ) -> Option<ModelHandle<Worktree>> {
784 self.worktrees(cx)
785 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
786 }
787
788 pub fn worktree_id_for_entry(
789 &self,
790 entry_id: ProjectEntryId,
791 cx: &AppContext,
792 ) -> Option<WorktreeId> {
793 self.worktree_for_entry(entry_id, cx)
794 .map(|worktree| worktree.read(cx).id())
795 }
796
797 pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
798 paths.iter().all(|path| self.contains_path(path, cx))
799 }
800
801 pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
802 for worktree in self.worktrees(cx) {
803 let worktree = worktree.read(cx).as_local();
804 if worktree.map_or(false, |w| w.contains_abs_path(path)) {
805 return true;
806 }
807 }
808 false
809 }
810
811 pub fn create_entry(
812 &mut self,
813 project_path: impl Into<ProjectPath>,
814 is_directory: bool,
815 cx: &mut ModelContext<Self>,
816 ) -> Option<Task<Result<Entry>>> {
817 let project_path = project_path.into();
818 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
819 if self.is_local() {
820 Some(worktree.update(cx, |worktree, cx| {
821 worktree
822 .as_local_mut()
823 .unwrap()
824 .create_entry(project_path.path, is_directory, cx)
825 }))
826 } else {
827 let client = self.client.clone();
828 let project_id = self.remote_id().unwrap();
829 Some(cx.spawn_weak(|_, mut cx| async move {
830 let response = client
831 .request(proto::CreateProjectEntry {
832 worktree_id: project_path.worktree_id.to_proto(),
833 project_id,
834 path: project_path.path.to_string_lossy().into(),
835 is_directory,
836 })
837 .await?;
838 let entry = response
839 .entry
840 .ok_or_else(|| anyhow!("missing entry in response"))?;
841 worktree
842 .update(&mut cx, |worktree, cx| {
843 worktree.as_remote_mut().unwrap().insert_entry(
844 entry,
845 response.worktree_scan_id as usize,
846 cx,
847 )
848 })
849 .await
850 }))
851 }
852 }
853
854 pub fn copy_entry(
855 &mut self,
856 entry_id: ProjectEntryId,
857 new_path: impl Into<Arc<Path>>,
858 cx: &mut ModelContext<Self>,
859 ) -> Option<Task<Result<Entry>>> {
860 let worktree = self.worktree_for_entry(entry_id, cx)?;
861 let new_path = new_path.into();
862 if self.is_local() {
863 worktree.update(cx, |worktree, cx| {
864 worktree
865 .as_local_mut()
866 .unwrap()
867 .copy_entry(entry_id, new_path, cx)
868 })
869 } else {
870 let client = self.client.clone();
871 let project_id = self.remote_id().unwrap();
872
873 Some(cx.spawn_weak(|_, mut cx| async move {
874 let response = client
875 .request(proto::CopyProjectEntry {
876 project_id,
877 entry_id: entry_id.to_proto(),
878 new_path: new_path.to_string_lossy().into(),
879 })
880 .await?;
881 let entry = response
882 .entry
883 .ok_or_else(|| anyhow!("missing entry in response"))?;
884 worktree
885 .update(&mut cx, |worktree, cx| {
886 worktree.as_remote_mut().unwrap().insert_entry(
887 entry,
888 response.worktree_scan_id as usize,
889 cx,
890 )
891 })
892 .await
893 }))
894 }
895 }
896
897 pub fn rename_entry(
898 &mut self,
899 entry_id: ProjectEntryId,
900 new_path: impl Into<Arc<Path>>,
901 cx: &mut ModelContext<Self>,
902 ) -> Option<Task<Result<Entry>>> {
903 let worktree = self.worktree_for_entry(entry_id, cx)?;
904 let new_path = new_path.into();
905 if self.is_local() {
906 worktree.update(cx, |worktree, cx| {
907 worktree
908 .as_local_mut()
909 .unwrap()
910 .rename_entry(entry_id, new_path, cx)
911 })
912 } else {
913 let client = self.client.clone();
914 let project_id = self.remote_id().unwrap();
915
916 Some(cx.spawn_weak(|_, mut cx| async move {
917 let response = client
918 .request(proto::RenameProjectEntry {
919 project_id,
920 entry_id: entry_id.to_proto(),
921 new_path: new_path.to_string_lossy().into(),
922 })
923 .await?;
924 let entry = response
925 .entry
926 .ok_or_else(|| anyhow!("missing entry in response"))?;
927 worktree
928 .update(&mut cx, |worktree, cx| {
929 worktree.as_remote_mut().unwrap().insert_entry(
930 entry,
931 response.worktree_scan_id as usize,
932 cx,
933 )
934 })
935 .await
936 }))
937 }
938 }
939
940 pub fn delete_entry(
941 &mut self,
942 entry_id: ProjectEntryId,
943 cx: &mut ModelContext<Self>,
944 ) -> Option<Task<Result<()>>> {
945 let worktree = self.worktree_for_entry(entry_id, cx)?;
946 if self.is_local() {
947 worktree.update(cx, |worktree, cx| {
948 worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
949 })
950 } else {
951 let client = self.client.clone();
952 let project_id = self.remote_id().unwrap();
953 Some(cx.spawn_weak(|_, mut cx| async move {
954 let response = client
955 .request(proto::DeleteProjectEntry {
956 project_id,
957 entry_id: entry_id.to_proto(),
958 })
959 .await?;
960 worktree
961 .update(&mut cx, move |worktree, cx| {
962 worktree.as_remote_mut().unwrap().delete_entry(
963 entry_id,
964 response.worktree_scan_id as usize,
965 cx,
966 )
967 })
968 .await
969 }))
970 }
971 }
972
973 pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
974 if self.client_state.is_some() {
975 return Task::ready(Err(anyhow!("project was already shared")));
976 }
977
978 let mut worktree_share_tasks = Vec::new();
979
980 for open_buffer in self.opened_buffers.values_mut() {
981 match open_buffer {
982 OpenBuffer::Strong(_) => {}
983 OpenBuffer::Weak(buffer) => {
984 if let Some(buffer) = buffer.upgrade(cx) {
985 *open_buffer = OpenBuffer::Strong(buffer);
986 }
987 }
988 OpenBuffer::Operations(_) => unreachable!(),
989 }
990 }
991
992 for worktree_handle in self.worktrees.iter_mut() {
993 match worktree_handle {
994 WorktreeHandle::Strong(_) => {}
995 WorktreeHandle::Weak(worktree) => {
996 if let Some(worktree) = worktree.upgrade(cx) {
997 *worktree_handle = WorktreeHandle::Strong(worktree);
998 }
999 }
1000 }
1001 }
1002
1003 for (server_id, status) in &self.language_server_statuses {
1004 self.client
1005 .send(proto::StartLanguageServer {
1006 project_id,
1007 server: Some(proto::LanguageServer {
1008 id: *server_id as u64,
1009 name: status.name.clone(),
1010 }),
1011 })
1012 .log_err();
1013 }
1014
1015 for worktree in self.worktrees(cx).collect::<Vec<_>>() {
1016 worktree.update(cx, |worktree, cx| {
1017 let worktree = worktree.as_local_mut().unwrap();
1018 worktree_share_tasks.push(worktree.share(project_id, cx));
1019 });
1020 }
1021
1022 self.client_subscriptions.push(
1023 self.client
1024 .subscribe_to_entity(project_id)
1025 .set_model(&cx.handle(), &mut cx.to_async()),
1026 );
1027 let _ = self.metadata_changed(cx);
1028 cx.emit(Event::RemoteIdChanged(Some(project_id)));
1029 cx.notify();
1030
1031 let mut status = self.client.status();
1032 let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
1033 self.client_state = Some(ProjectClientState::Local {
1034 remote_id: project_id,
1035 metadata_changed: metadata_changed_tx,
1036 _maintain_metadata: cx.spawn_weak(move |this, cx| async move {
1037 let mut txs = Vec::new();
1038 loop {
1039 select_biased! {
1040 tx = metadata_changed_rx.next().fuse() => {
1041 let Some(tx) = tx else { break };
1042 txs.push(tx);
1043 while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
1044 txs.push(next_tx);
1045 }
1046 }
1047 status = status.next().fuse() => {
1048 let Some(status) = status else { break };
1049 if !status.is_connected() {
1050 continue
1051 }
1052 }
1053 }
1054
1055 let Some(this) = this.upgrade(&cx) else { break };
1056 this.read_with(&cx, |this, cx| {
1057 this.client.request(proto::UpdateProject {
1058 project_id,
1059 worktrees: this.worktree_metadata_protos(cx),
1060 })
1061 })
1062 .await
1063 .log_err();
1064
1065 for tx in txs.drain(..) {
1066 let _ = tx.send(());
1067 }
1068 }
1069 }),
1070 });
1071
1072 cx.foreground().spawn(async move {
1073 futures::future::try_join_all(worktree_share_tasks).await?;
1074 Ok(())
1075 })
1076 }
1077
1078 pub fn reshared(
1079 &mut self,
1080 message: proto::ResharedProject,
1081 cx: &mut ModelContext<Self>,
1082 ) -> Result<()> {
1083 self.set_collaborators_from_proto(message.collaborators, cx)?;
1084 Ok(())
1085 }
1086
1087 pub fn rejoined(
1088 &mut self,
1089 message: proto::RejoinedProject,
1090 cx: &mut ModelContext<Self>,
1091 ) -> Result<()> {
1092 self.set_collaborators_from_proto(message.collaborators, cx)?;
1093 Ok(())
1094 }
1095
1096 pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
1097 self.worktrees(cx)
1098 .map(|worktree| {
1099 let worktree = worktree.read(cx);
1100 proto::WorktreeMetadata {
1101 id: worktree.id().to_proto(),
1102 root_name: worktree.root_name().into(),
1103 visible: worktree.is_visible(),
1104 abs_path: worktree.abs_path().to_string_lossy().into(),
1105 }
1106 })
1107 .collect()
1108 }
1109
1110 pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1111 if self.is_remote() {
1112 return Err(anyhow!("attempted to unshare a remote project"));
1113 }
1114
1115 if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1116 self.collaborators.clear();
1117 self.shared_buffers.clear();
1118 self.client_subscriptions.clear();
1119
1120 for worktree_handle in self.worktrees.iter_mut() {
1121 if let WorktreeHandle::Strong(worktree) = worktree_handle {
1122 let is_visible = worktree.update(cx, |worktree, _| {
1123 worktree.as_local_mut().unwrap().unshare();
1124 worktree.is_visible()
1125 });
1126 if !is_visible {
1127 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1128 }
1129 }
1130 }
1131
1132 for open_buffer in self.opened_buffers.values_mut() {
1133 if let OpenBuffer::Strong(buffer) = open_buffer {
1134 *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1135 }
1136 }
1137
1138 let _ = self.metadata_changed(cx);
1139 cx.notify();
1140 self.client.send(proto::UnshareProject {
1141 project_id: remote_id,
1142 })?;
1143
1144 Ok(())
1145 } else {
1146 Err(anyhow!("attempted to unshare an unshared project"))
1147 }
1148 }
1149
1150 fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
1151 if let Some(ProjectClientState::Remote {
1152 sharing_has_stopped,
1153 ..
1154 }) = &mut self.client_state
1155 {
1156 *sharing_has_stopped = true;
1157 self.collaborators.clear();
1158 for worktree in &self.worktrees {
1159 if let Some(worktree) = worktree.upgrade(cx) {
1160 worktree.update(cx, |worktree, _| {
1161 if let Some(worktree) = worktree.as_remote_mut() {
1162 worktree.disconnected_from_host();
1163 }
1164 });
1165 }
1166 }
1167 cx.emit(Event::DisconnectedFromHost);
1168 cx.notify();
1169
1170 // Wake up all futures currently waiting on a buffer to get opened,
1171 // to give them a chance to fail now that we've disconnected.
1172 *self.opened_buffer.0.borrow_mut() = ();
1173 }
1174 }
1175
1176 pub fn is_read_only(&self) -> bool {
1177 match &self.client_state {
1178 Some(ProjectClientState::Remote {
1179 sharing_has_stopped,
1180 ..
1181 }) => *sharing_has_stopped,
1182 _ => false,
1183 }
1184 }
1185
1186 pub fn is_local(&self) -> bool {
1187 match &self.client_state {
1188 Some(ProjectClientState::Remote { .. }) => false,
1189 _ => true,
1190 }
1191 }
1192
1193 pub fn is_remote(&self) -> bool {
1194 !self.is_local()
1195 }
1196
1197 pub fn create_terminal(
1198 &mut self,
1199 working_directory: Option<PathBuf>,
1200 window_id: usize,
1201 cx: &mut ModelContext<Self>,
1202 ) -> Result<ModelHandle<Terminal>> {
1203 if self.is_remote() {
1204 return Err(anyhow!(
1205 "creating terminals as a guest is not supported yet"
1206 ));
1207 } else {
1208 let settings = cx.global::<Settings>();
1209 let shell = settings.terminal_shell();
1210 let envs = settings.terminal_env();
1211 let scroll = settings.terminal_scroll();
1212
1213 TerminalBuilder::new(
1214 working_directory.clone(),
1215 shell,
1216 envs,
1217 settings.terminal_overrides.blinking.clone(),
1218 scroll,
1219 window_id,
1220 )
1221 .map(|builder| cx.add_model(|cx| builder.subscribe(cx)))
1222 }
1223 }
1224
1225 pub fn create_buffer(
1226 &mut self,
1227 text: &str,
1228 language: Option<Arc<Language>>,
1229 cx: &mut ModelContext<Self>,
1230 ) -> Result<ModelHandle<Buffer>> {
1231 if self.is_remote() {
1232 return Err(anyhow!("creating buffers as a guest is not supported yet"));
1233 }
1234
1235 let buffer = cx.add_model(|cx| {
1236 Buffer::new(self.replica_id(), text, cx)
1237 .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1238 });
1239 self.register_buffer(&buffer, cx)?;
1240 Ok(buffer)
1241 }
1242
1243 pub fn open_path(
1244 &mut self,
1245 path: impl Into<ProjectPath>,
1246 cx: &mut ModelContext<Self>,
1247 ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1248 let task = self.open_buffer(path, cx);
1249 cx.spawn_weak(|_, cx| async move {
1250 let buffer = task.await?;
1251 let project_entry_id = buffer
1252 .read_with(&cx, |buffer, cx| {
1253 File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1254 })
1255 .ok_or_else(|| anyhow!("no project entry"))?;
1256 Ok((project_entry_id, buffer.into()))
1257 })
1258 }
1259
1260 pub fn open_local_buffer(
1261 &mut self,
1262 abs_path: impl AsRef<Path>,
1263 cx: &mut ModelContext<Self>,
1264 ) -> Task<Result<ModelHandle<Buffer>>> {
1265 if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1266 self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1267 } else {
1268 Task::ready(Err(anyhow!("no such path")))
1269 }
1270 }
1271
    /// Opens the buffer for `path`, returning a task that resolves to its handle.
    ///
    /// * Fails immediately with "no such worktree" when the path's worktree id is unknown.
    /// * Returns the existing handle right away when a buffer for the path is already open.
    /// * Otherwise starts a local or remote load, depending on the worktree, and publishes
    ///   the outcome through a watch channel stored in `loading_buffers`, so that
    ///   concurrent requests for the same path share a single load.
    pub fn open_buffer(
        &mut self,
        path: impl Into<ProjectPath>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let project_path = path.into();
        let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
            worktree
        } else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        // If there is already a buffer for the given path, then return it.
        let existing_buffer = self.get_open_buffer(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let mut loading_watch = match self.loading_buffers.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let load_buffer = if worktree.read(cx).is_local() {
                    self.open_local_buffer_internal(&project_path.path, &worktree, cx)
                } else {
                    self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
                };

                // Publish the load result to every waiter once the load finishes.
                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers.remove(&project_path);
                        // The error is wrapped in an `Arc` before being stored in the channel.
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    }));
                })
                .detach();
                rx
            }
        };

        // Wait until the watch holds a result, then hand back a clone of the buffer
        // handle or a fresh error built from the stored error's display text.
        cx.foreground().spawn(async move {
            loop {
                if let Some(result) = loading_watch.borrow().as_ref() {
                    match result {
                        Ok(buffer) => return Ok(buffer.clone()),
                        Err(error) => return Err(anyhow!("{}", error)),
                    }
                }
                loading_watch.next().await;
            }
        })
    }
1332
1333 fn open_local_buffer_internal(
1334 &mut self,
1335 path: &Arc<Path>,
1336 worktree: &ModelHandle<Worktree>,
1337 cx: &mut ModelContext<Self>,
1338 ) -> Task<Result<ModelHandle<Buffer>>> {
1339 let load_buffer = worktree.update(cx, |worktree, cx| {
1340 let worktree = worktree.as_local_mut().unwrap();
1341 worktree.load_buffer(path, cx)
1342 });
1343 cx.spawn(|this, mut cx| async move {
1344 let buffer = load_buffer.await?;
1345 this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1346 Ok(buffer)
1347 })
1348 }
1349
1350 fn open_remote_buffer_internal(
1351 &mut self,
1352 path: &Arc<Path>,
1353 worktree: &ModelHandle<Worktree>,
1354 cx: &mut ModelContext<Self>,
1355 ) -> Task<Result<ModelHandle<Buffer>>> {
1356 let rpc = self.client.clone();
1357 let project_id = self.remote_id().unwrap();
1358 let remote_worktree_id = worktree.read(cx).id();
1359 let path = path.clone();
1360 let path_string = path.to_string_lossy().to_string();
1361 cx.spawn(|this, mut cx| async move {
1362 let response = rpc
1363 .request(proto::OpenBufferByPath {
1364 project_id,
1365 worktree_id: remote_worktree_id.to_proto(),
1366 path: path_string,
1367 })
1368 .await?;
1369 this.update(&mut cx, |this, cx| {
1370 this.wait_for_buffer(response.buffer_id, cx)
1371 })
1372 .await
1373 })
1374 }
1375
1376 /// LanguageServerName is owned, because it is inserted into a map
1377 fn open_local_buffer_via_lsp(
1378 &mut self,
1379 abs_path: lsp::Url,
1380 language_server_id: usize,
1381 language_server_name: LanguageServerName,
1382 cx: &mut ModelContext<Self>,
1383 ) -> Task<Result<ModelHandle<Buffer>>> {
1384 cx.spawn(|this, mut cx| async move {
1385 let abs_path = abs_path
1386 .to_file_path()
1387 .map_err(|_| anyhow!("can't convert URI to path"))?;
1388 let (worktree, relative_path) = if let Some(result) =
1389 this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1390 {
1391 result
1392 } else {
1393 let worktree = this
1394 .update(&mut cx, |this, cx| {
1395 this.create_local_worktree(&abs_path, false, cx)
1396 })
1397 .await?;
1398 this.update(&mut cx, |this, cx| {
1399 this.language_server_ids.insert(
1400 (worktree.read(cx).id(), language_server_name),
1401 language_server_id,
1402 );
1403 });
1404 (worktree, PathBuf::new())
1405 };
1406
1407 let project_path = ProjectPath {
1408 worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1409 path: relative_path.into(),
1410 };
1411 this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1412 .await
1413 })
1414 }
1415
1416 pub fn open_buffer_by_id(
1417 &mut self,
1418 id: u64,
1419 cx: &mut ModelContext<Self>,
1420 ) -> Task<Result<ModelHandle<Buffer>>> {
1421 if let Some(buffer) = self.buffer_for_id(id, cx) {
1422 Task::ready(Ok(buffer))
1423 } else if self.is_local() {
1424 Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1425 } else if let Some(project_id) = self.remote_id() {
1426 let request = self
1427 .client
1428 .request(proto::OpenBufferById { project_id, id });
1429 cx.spawn(|this, mut cx| async move {
1430 let buffer_id = request.await?.buffer_id;
1431 this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
1432 .await
1433 })
1434 } else {
1435 Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1436 }
1437 }
1438
1439 pub fn save_buffer_as(
1440 &mut self,
1441 buffer: ModelHandle<Buffer>,
1442 abs_path: PathBuf,
1443 cx: &mut ModelContext<Project>,
1444 ) -> Task<Result<()>> {
1445 let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1446 let old_path =
1447 File::from_dyn(buffer.read(cx).file()).and_then(|f| Some(f.as_local()?.abs_path(cx)));
1448 cx.spawn(|this, mut cx| async move {
1449 if let Some(old_path) = old_path {
1450 this.update(&mut cx, |this, cx| {
1451 this.unregister_buffer_from_language_server(&buffer, old_path, cx);
1452 });
1453 }
1454 let (worktree, path) = worktree_task.await?;
1455 worktree
1456 .update(&mut cx, |worktree, cx| {
1457 worktree
1458 .as_local_mut()
1459 .unwrap()
1460 .save_buffer_as(buffer.clone(), path, cx)
1461 })
1462 .await?;
1463 this.update(&mut cx, |this, cx| {
1464 this.assign_language_to_buffer(&buffer, cx);
1465 this.register_buffer_with_language_server(&buffer, cx);
1466 });
1467 Ok(())
1468 })
1469 }
1470
1471 pub fn get_open_buffer(
1472 &mut self,
1473 path: &ProjectPath,
1474 cx: &mut ModelContext<Self>,
1475 ) -> Option<ModelHandle<Buffer>> {
1476 let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1477 self.opened_buffers.values().find_map(|buffer| {
1478 let buffer = buffer.upgrade(cx)?;
1479 let file = File::from_dyn(buffer.read(cx).file())?;
1480 if file.worktree == worktree && file.path() == &path.path {
1481 Some(buffer)
1482 } else {
1483 None
1484 }
1485 })
1486 }
1487
1488 fn register_buffer(
1489 &mut self,
1490 buffer: &ModelHandle<Buffer>,
1491 cx: &mut ModelContext<Self>,
1492 ) -> Result<()> {
1493 let remote_id = buffer.read(cx).remote_id();
1494 let open_buffer = if self.is_remote() || self.is_shared() {
1495 OpenBuffer::Strong(buffer.clone())
1496 } else {
1497 OpenBuffer::Weak(buffer.downgrade())
1498 };
1499
1500 match self.opened_buffers.insert(remote_id, open_buffer) {
1501 None => {}
1502 Some(OpenBuffer::Operations(operations)) => {
1503 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
1504 }
1505 Some(OpenBuffer::Weak(existing_handle)) => {
1506 if existing_handle.upgrade(cx).is_some() {
1507 Err(anyhow!(
1508 "already registered buffer with remote id {}",
1509 remote_id
1510 ))?
1511 }
1512 }
1513 Some(OpenBuffer::Strong(_)) => Err(anyhow!(
1514 "already registered buffer with remote id {}",
1515 remote_id
1516 ))?,
1517 }
1518 cx.subscribe(buffer, |this, buffer, event, cx| {
1519 this.on_buffer_event(buffer, event, cx);
1520 })
1521 .detach();
1522
1523 self.assign_language_to_buffer(buffer, cx);
1524 self.register_buffer_with_language_server(buffer, cx);
1525 cx.observe_release(buffer, |this, buffer, cx| {
1526 if let Some(file) = File::from_dyn(buffer.file()) {
1527 if file.is_local() {
1528 let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1529 if let Some((_, server)) = this.language_server_for_buffer(buffer, cx) {
1530 server
1531 .notify::<lsp::notification::DidCloseTextDocument>(
1532 lsp::DidCloseTextDocumentParams {
1533 text_document: lsp::TextDocumentIdentifier::new(uri),
1534 },
1535 )
1536 .log_err();
1537 }
1538 }
1539 }
1540 })
1541 .detach();
1542
1543 *self.opened_buffer.0.borrow_mut() = ();
1544 Ok(())
1545 }
1546
    /// Connects a buffer backed by a local file to its language server.
    ///
    /// Does nothing for buffers without a `File` or whose file is not local. Otherwise:
    /// * diagnostics the local worktree holds for the file's path are applied to the buffer;
    /// * if a running language server is registered for the buffer's worktree and language
    ///   adapter, it is sent `textDocument/didOpen` with version 0, the server's completion
    ///   trigger characters are installed on the buffer, and the initial text snapshot is
    ///   stored in `buffer_snapshots` as version 0.
    ///
    /// Panics if the file's absolute path cannot be converted into a file URL.
    fn register_buffer_with_language_server(
        &mut self,
        buffer_handle: &ModelHandle<Buffer>,
        cx: &mut ModelContext<Self>,
    ) {
        let buffer = buffer_handle.read(cx);
        let buffer_id = buffer.remote_id();
        if let Some(file) = File::from_dyn(buffer.file()) {
            if file.is_local() {
                let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
                let initial_snapshot = buffer.text_snapshot();

                // Look up the running server (if any) for this worktree and adapter.
                // Servers that are still starting are ignored here.
                let mut language_server = None;
                let mut language_id = None;
                if let Some(language) = buffer.language() {
                    let worktree_id = file.worktree_id(cx);
                    if let Some(adapter) = language.lsp_adapter() {
                        language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
                        language_server = self
                            .language_server_ids
                            .get(&(worktree_id, adapter.name.clone()))
                            .and_then(|id| self.language_servers.get(id))
                            .and_then(|server_state| {
                                if let LanguageServerState::Running { server, .. } = server_state {
                                    Some(server.clone())
                                } else {
                                    None
                                }
                            });
                    }
                }

                // Apply diagnostics the worktree already has for this path.
                if let Some(local_worktree) = file.worktree.read(cx).as_local() {
                    if let Some(diagnostics) = local_worktree.diagnostics_for_path(file.path()) {
                        self.update_buffer_diagnostics(buffer_handle, diagnostics, None, cx)
                            .log_err();
                    }
                }

                if let Some(server) = language_server {
                    // The language id falls back to its default value when the adapter
                    // has no entry for this language name.
                    server
                        .notify::<lsp::notification::DidOpenTextDocument>(
                            lsp::DidOpenTextDocumentParams {
                                text_document: lsp::TextDocumentItem::new(
                                    uri,
                                    language_id.unwrap_or_default(),
                                    0,
                                    initial_snapshot.text(),
                                ),
                            },
                        )
                        .log_err();
                    buffer_handle.update(cx, |buffer, cx| {
                        buffer.set_completion_triggers(
                            server
                                .capabilities()
                                .completion_provider
                                .as_ref()
                                .and_then(|provider| provider.trigger_characters.clone())
                                .unwrap_or_default(),
                            cx,
                        )
                    });
                    // Version 0 matches the version sent in `didOpen` above.
                    self.buffer_snapshots
                        .insert(buffer_id, vec![(0, initial_snapshot)]);
                }
            }
        }
    }
1616
1617 fn unregister_buffer_from_language_server(
1618 &mut self,
1619 buffer: &ModelHandle<Buffer>,
1620 old_path: PathBuf,
1621 cx: &mut ModelContext<Self>,
1622 ) {
1623 buffer.update(cx, |buffer, cx| {
1624 buffer.update_diagnostics(Default::default(), cx);
1625 self.buffer_snapshots.remove(&buffer.remote_id());
1626 if let Some((_, language_server)) = self.language_server_for_buffer(buffer, cx) {
1627 language_server
1628 .notify::<lsp::notification::DidCloseTextDocument>(
1629 lsp::DidCloseTextDocumentParams {
1630 text_document: lsp::TextDocumentIdentifier::new(
1631 lsp::Url::from_file_path(old_path).unwrap(),
1632 ),
1633 },
1634 )
1635 .log_err();
1636 }
1637 });
1638 }
1639
    /// Reacts to an event emitted by a registered buffer.
    ///
    /// * `Operation`: when the project has a remote id, the serialized operation is sent
    ///   in a `proto::UpdateBuffer` request.
    /// * `Edited`: sends `textDocument/didChange` with the edits made since the last
    ///   recorded snapshot to the buffer's language server and records a new snapshot.
    /// * `Saved`: sends `textDocument/didSave` to every running language server of the
    ///   file's worktree and, for adapters without a disk-based diagnostics progress
    ///   token, reports disk-based diagnostics as finished.
    ///
    /// Always returns `None`; the `Option<()>` return type only exists so that `?` can be
    /// used to leave the handler early when a prerequisite is missing.
    fn on_buffer_event(
        &mut self,
        buffer: ModelHandle<Buffer>,
        event: &BufferEvent,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        match event {
            BufferEvent::Operation(operation) => {
                if let Some(project_id) = self.remote_id() {
                    let request = self.client.request(proto::UpdateBuffer {
                        project_id,
                        buffer_id: buffer.read(cx).remote_id(),
                        operations: vec![language::proto::serialize_operation(operation)],
                    });
                    cx.background().spawn(request).detach_and_log_err(cx);
                }
            }
            BufferEvent::Edited { .. } => {
                let language_server = self
                    .language_server_for_buffer(buffer.read(cx), cx)
                    .map(|(_, server)| server.clone())?;
                let buffer = buffer.read(cx);
                let file = File::from_dyn(buffer.file())?;
                let abs_path = file.as_local()?.abs_path(cx);
                let uri = lsp::Url::from_file_path(abs_path).unwrap();
                // Without a recorded snapshot there is nothing to diff against, so bail out.
                let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?;
                let (version, prev_snapshot) = buffer_snapshots.last()?;
                let next_snapshot = buffer.text_snapshot();
                let next_version = version + 1;

                let content_changes = buffer
                    .edits_since::<(PointUtf16, usize)>(prev_snapshot.version())
                    .map(|edit| {
                        // The replaced range starts at the edit's new start point and
                        // extends by the extent of the old text.
                        let edit_start = edit.new.start.0;
                        let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
                        let new_text = next_snapshot
                            .text_for_range(edit.new.start.1..edit.new.end.1)
                            .collect();
                        lsp::TextDocumentContentChangeEvent {
                            range: Some(lsp::Range::new(
                                point_to_lsp(edit_start),
                                point_to_lsp(edit_end),
                            )),
                            range_length: None,
                            text: new_text,
                        }
                    })
                    .collect();

                buffer_snapshots.push((next_version, next_snapshot));

                language_server
                    .notify::<lsp::notification::DidChangeTextDocument>(
                        lsp::DidChangeTextDocumentParams {
                            text_document: lsp::VersionedTextDocumentIdentifier::new(
                                uri,
                                next_version,
                            ),
                            content_changes,
                        },
                    )
                    .log_err();
            }
            BufferEvent::Saved => {
                let file = File::from_dyn(buffer.read(cx).file())?;
                let worktree_id = file.worktree_id(cx);
                let abs_path = file.as_local()?.abs_path(cx);
                let text_document = lsp::TextDocumentIdentifier {
                    uri: lsp::Url::from_file_path(abs_path).unwrap(),
                };

                // Every running server of the worktree is notified, not only the one
                // associated with this buffer.
                for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
                    server
                        .notify::<lsp::notification::DidSaveTextDocument>(
                            lsp::DidSaveTextDocumentParams {
                                text_document: text_document.clone(),
                                text: None,
                            },
                        )
                        .log_err();
                }

                // After saving a buffer, simulate disk-based diagnostics being finished for languages
                // that don't support a disk-based progress token.
                let (lsp_adapter, language_server) =
                    self.language_server_for_buffer(buffer.read(cx), cx)?;
                if lsp_adapter.disk_based_diagnostics_progress_token.is_none() {
                    let server_id = language_server.server_id();
                    self.disk_based_diagnostics_finished(server_id, cx);
                    self.broadcast_language_server_update(
                        server_id,
                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
                            proto::LspDiskBasedDiagnosticsUpdated {},
                        ),
                    );
                }
            }
            _ => {}
        }

        None
    }
1742
1743 fn language_servers_for_worktree(
1744 &self,
1745 worktree_id: WorktreeId,
1746 ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
1747 self.language_server_ids
1748 .iter()
1749 .filter_map(move |((language_server_worktree_id, _), id)| {
1750 if *language_server_worktree_id == worktree_id {
1751 if let Some(LanguageServerState::Running {
1752 adapter,
1753 language,
1754 server,
1755 }) = self.language_servers.get(id)
1756 {
1757 return Some((adapter, language, server));
1758 }
1759 }
1760 None
1761 })
1762 }
1763
1764 fn maintain_buffer_languages(
1765 languages: &LanguageRegistry,
1766 cx: &mut ModelContext<Project>,
1767 ) -> Task<()> {
1768 let mut subscription = languages.subscribe();
1769 cx.spawn_weak(|project, mut cx| async move {
1770 while let Some(()) = subscription.next().await {
1771 if let Some(project) = project.upgrade(&cx) {
1772 project.update(&mut cx, |project, cx| {
1773 let mut buffers_without_language = Vec::new();
1774 for buffer in project.opened_buffers.values() {
1775 if let Some(buffer) = buffer.upgrade(cx) {
1776 if buffer.read(cx).language().is_none() {
1777 buffers_without_language.push(buffer);
1778 }
1779 }
1780 }
1781
1782 for buffer in buffers_without_language {
1783 project.assign_language_to_buffer(&buffer, cx);
1784 project.register_buffer_with_language_server(&buffer, cx);
1785 }
1786 });
1787 }
1788 }
1789 })
1790 }
1791
1792 fn assign_language_to_buffer(
1793 &mut self,
1794 buffer: &ModelHandle<Buffer>,
1795 cx: &mut ModelContext<Self>,
1796 ) -> Option<()> {
1797 // If the buffer has a language, set it and start the language server if we haven't already.
1798 let full_path = buffer.read(cx).file()?.full_path(cx);
1799 let new_language = self.languages.select_language(&full_path)?;
1800 buffer.update(cx, |buffer, cx| {
1801 if buffer.language().map_or(true, |old_language| {
1802 !Arc::ptr_eq(old_language, &new_language)
1803 }) {
1804 buffer.set_language_registry(self.languages.clone());
1805 buffer.set_language(Some(new_language.clone()), cx);
1806 }
1807 });
1808
1809 let file = File::from_dyn(buffer.read(cx).file())?;
1810 let worktree = file.worktree.read(cx).as_local()?;
1811 let worktree_id = worktree.id();
1812 let worktree_abs_path = worktree.abs_path().clone();
1813 self.start_language_server(worktree_id, worktree_abs_path, new_language, cx);
1814
1815 None
1816 }
1817
1818 fn merge_json_value_into(source: serde_json::Value, target: &mut serde_json::Value) {
1819 use serde_json::Value;
1820
1821 match (source, target) {
1822 (Value::Object(source), Value::Object(target)) => {
1823 for (key, value) in source {
1824 if let Some(target) = target.get_mut(&key) {
1825 Self::merge_json_value_into(value, target);
1826 } else {
1827 target.insert(key.clone(), value);
1828 }
1829 }
1830 }
1831
1832 (source, target) => *target = source,
1833 }
1834 }
1835
    /// Starts a language server for `language` in the given worktree, unless a server id
    /// is already registered for the `(worktree_id, adapter name)` key.
    ///
    /// Returns without doing anything when language servers are disabled for the language
    /// in `Settings` or when the language has no LSP adapter.
    ///
    /// A new server is recorded as `LanguageServerState::Starting` with a task that
    /// initializes it, installs the notification and request handlers, and then stores it
    /// as `LanguageServerState::Running`, announces it to collaborators when the project
    /// has a remote id, and sends `textDocument/didOpen` for the matching open buffers.
    fn start_language_server(
        &mut self,
        worktree_id: WorktreeId,
        worktree_path: Arc<Path>,
        language: Arc<Language>,
        cx: &mut ModelContext<Self>,
    ) {
        if !cx
            .global::<Settings>()
            .enable_language_server(Some(&language.name()))
        {
            return;
        }

        let adapter = if let Some(adapter) = language.lsp_adapter() {
            adapter
        } else {
            return;
        };
        let key = (worktree_id, adapter.name.clone());

        // Start from the adapter's initialization options and merge in any options
        // configured for this server in the user's settings.
        let mut initialization_options = adapter.initialization_options.clone();

        let lsp = &cx.global::<Settings>().lsp.get(&adapter.name.0);
        let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
        match (&mut initialization_options, override_options) {
            (Some(initialization_options), Some(override_options)) => {
                Self::merge_json_value_into(override_options, initialization_options);
            }

            (None, override_options) => initialization_options = override_options,

            _ => {}
        }

        // The closure below only runs when no server id exists for this key yet.
        self.language_server_ids
            .entry(key.clone())
            .or_insert_with(|| {
                let server_id = post_inc(&mut self.next_language_server_id);
                let language_server = self.languages.start_language_server(
                    server_id,
                    language.clone(),
                    worktree_path,
                    self.client.http_client(),
                    cx,
                );
                self.language_servers.insert(
                    server_id,
                    LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
                        let language_server = language_server?.await.log_err()?;
                        let language_server = language_server
                            .initialize(initialization_options)
                            .await
                            .log_err()?;
                        // Give up if the project was dropped while the server was starting.
                        let this = this.upgrade(&cx)?;

                        // Let the adapter post-process published diagnostics before they
                        // are applied to the project.
                        language_server
                            .on_notification::<lsp::notification::PublishDiagnostics, _>({
                                let this = this.downgrade();
                                let adapter = adapter.clone();
                                move |mut params, cx| {
                                    let this = this;
                                    let adapter = adapter.clone();
                                    cx.spawn(|mut cx| async move {
                                        adapter.process_diagnostics(&mut params).await;
                                        if let Some(this) = this.upgrade(&cx) {
                                            this.update(&mut cx, |this, cx| {
                                                this.update_diagnostics(
                                                    server_id,
                                                    params,
                                                    &adapter.disk_based_diagnostic_sources,
                                                    cx,
                                                )
                                                .log_err();
                                            });
                                        }
                                    })
                                    .detach();
                                }
                            })
                            .detach();

                        // Answer configuration requests from the project's language server
                        // settings: the named section when one is requested (or null if
                        // it is missing), otherwise the whole settings value.
                        language_server
                            .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
                                let settings = this.read_with(&cx, |this, _| {
                                    this.language_server_settings.clone()
                                });
                                move |params, _| {
                                    let settings = settings.lock().clone();
                                    async move {
                                        Ok(params
                                            .items
                                            .into_iter()
                                            .map(|item| {
                                                if let Some(section) = &item.section {
                                                    settings
                                                        .get(section)
                                                        .cloned()
                                                        .unwrap_or(serde_json::Value::Null)
                                                } else {
                                                    settings.clone()
                                                }
                                            })
                                            .collect())
                                    }
                                }
                            })
                            .detach();

                        // Even though we don't have handling for these requests, respond to them to
                        // avoid stalling any language server like `gopls` which waits for a response
                        // to these requests when initializing.
                        language_server
                            .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
                                let this = this.downgrade();
                                move |params, mut cx| async move {
                                    if let Some(this) = this.upgrade(&cx) {
                                        this.update(&mut cx, |this, _| {
                                            if let Some(status) =
                                                this.language_server_statuses.get_mut(&server_id)
                                            {
                                                // Only string tokens are recorded.
                                                if let lsp::NumberOrString::String(token) =
                                                    params.token
                                                {
                                                    status.progress_tokens.insert(token);
                                                }
                                            }
                                        });
                                    }
                                    Ok(())
                                }
                            })
                            .detach();
                        language_server
                            .on_request::<lsp::request::RegisterCapability, _, _>(|_, _| async {
                                Ok(())
                            })
                            .detach();

                        language_server
                            .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
                                let this = this.downgrade();
                                let adapter = adapter.clone();
                                let language_server = language_server.clone();
                                move |params, cx| {
                                    Self::on_lsp_workspace_edit(
                                        this,
                                        params,
                                        server_id,
                                        adapter.clone(),
                                        language_server.clone(),
                                        cx,
                                    )
                                }
                            })
                            .detach();

                        let disk_based_diagnostics_progress_token =
                            adapter.disk_based_diagnostics_progress_token.clone();

                        language_server
                            .on_notification::<lsp::notification::Progress, _>({
                                let this = this.downgrade();
                                move |params, mut cx| {
                                    if let Some(this) = this.upgrade(&cx) {
                                        this.update(&mut cx, |this, cx| {
                                            this.on_lsp_progress(
                                                params,
                                                server_id,
                                                disk_based_diagnostics_progress_token.clone(),
                                                cx,
                                            );
                                        });
                                    }
                                }
                            })
                            .detach();

                        this.update(&mut cx, |this, cx| {
                            // If the language server for this key doesn't match the server id, don't store the
                            // server. Which will cause it to be dropped, killing the process
                            if this
                                .language_server_ids
                                .get(&key)
                                .map(|id| id != &server_id)
                                .unwrap_or(false)
                            {
                                return None;
                            }

                            // Update language_servers collection with Running variant of LanguageServerState
                            // indicating that the server is up and running and ready
                            this.language_servers.insert(
                                server_id,
                                LanguageServerState::Running {
                                    adapter: adapter.clone(),
                                    language,
                                    server: language_server.clone(),
                                },
                            );
                            this.language_server_statuses.insert(
                                server_id,
                                LanguageServerStatus {
                                    name: language_server.name().to_string(),
                                    pending_work: Default::default(),
                                    has_pending_diagnostic_updates: false,
                                    progress_tokens: Default::default(),
                                },
                            );
                            language_server
                                .notify::<lsp::notification::DidChangeConfiguration>(
                                    lsp::DidChangeConfigurationParams {
                                        settings: this.language_server_settings.lock().clone(),
                                    },
                                )
                                .ok();

                            // Announce the server to collaborators when the project is shared.
                            if let Some(project_id) = this.remote_id() {
                                this.client
                                    .send(proto::StartLanguageServer {
                                        project_id,
                                        server: Some(proto::LanguageServer {
                                            id: server_id as u64,
                                            name: language_server.name().to_string(),
                                        }),
                                    })
                                    .log_err();
                            }

                            // Tell the language server about every open buffer in the worktree that matches the language.
                            for buffer in this.opened_buffers.values() {
                                if let Some(buffer_handle) = buffer.upgrade(cx) {
                                    let buffer = buffer_handle.read(cx);
                                    let file = if let Some(file) = File::from_dyn(buffer.file()) {
                                        file
                                    } else {
                                        continue;
                                    };
                                    let language = if let Some(language) = buffer.language() {
                                        language
                                    } else {
                                        continue;
                                    };
                                    if file.worktree.read(cx).id() != key.0
                                        || language.lsp_adapter().map(|a| a.name.clone())
                                            != Some(key.1.clone())
                                    {
                                        continue;
                                    }

                                    // NOTE(review): this `?` (and the one after `log_err()`
                                    // below) returns `None` from the whole closure, which
                                    // skips the remaining buffers and `cx.notify()`.
                                    let file = file.as_local()?;
                                    let versions = this
                                        .buffer_snapshots
                                        .entry(buffer.remote_id())
                                        .or_insert_with(|| vec![(0, buffer.text_snapshot())]);
                                    let (version, initial_snapshot) = versions.last().unwrap();
                                    let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
                                    language_server
                                        .notify::<lsp::notification::DidOpenTextDocument>(
                                            lsp::DidOpenTextDocumentParams {
                                                text_document: lsp::TextDocumentItem::new(
                                                    uri,
                                                    adapter
                                                        .language_ids
                                                        .get(language.name().as_ref())
                                                        .cloned()
                                                        .unwrap_or_default(),
                                                    *version,
                                                    initial_snapshot.text(),
                                                ),
                                            },
                                        )
                                        .log_err()?;
                                    buffer_handle.update(cx, |buffer, cx| {
                                        buffer.set_completion_triggers(
                                            language_server
                                                .capabilities()
                                                .completion_provider
                                                .as_ref()
                                                .and_then(|provider| {
                                                    provider.trigger_characters.clone()
                                                })
                                                .unwrap_or_default(),
                                            cx,
                                        )
                                    });
                                }
                            }

                            cx.notify();
                            Some(language_server)
                        })
                    })),
                );

                server_id
            });
    }
2134
2135 // Returns a list of all of the worktrees which no longer have a language server and the root path
2136 // for the stopped server
2137 fn stop_language_server(
2138 &mut self,
2139 worktree_id: WorktreeId,
2140 adapter_name: LanguageServerName,
2141 cx: &mut ModelContext<Self>,
2142 ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
2143 let key = (worktree_id, adapter_name);
2144 if let Some(server_id) = self.language_server_ids.remove(&key) {
2145 // Remove other entries for this language server as well
2146 let mut orphaned_worktrees = vec![worktree_id];
2147 let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
2148 for other_key in other_keys {
2149 if self.language_server_ids.get(&other_key) == Some(&server_id) {
2150 self.language_server_ids.remove(&other_key);
2151 orphaned_worktrees.push(other_key.0);
2152 }
2153 }
2154
2155 self.language_server_statuses.remove(&server_id);
2156 cx.notify();
2157
2158 let server_state = self.language_servers.remove(&server_id);
2159 cx.spawn_weak(|this, mut cx| async move {
2160 let mut root_path = None;
2161
2162 let server = match server_state {
2163 Some(LanguageServerState::Starting(started_language_server)) => {
2164 started_language_server.await
2165 }
2166 Some(LanguageServerState::Running { server, .. }) => Some(server),
2167 None => None,
2168 };
2169
2170 if let Some(server) = server {
2171 root_path = Some(server.root_path().clone());
2172 if let Some(shutdown) = server.shutdown() {
2173 shutdown.await;
2174 }
2175 }
2176
2177 if let Some(this) = this.upgrade(&cx) {
2178 this.update(&mut cx, |this, cx| {
2179 this.language_server_statuses.remove(&server_id);
2180 cx.notify();
2181 });
2182 }
2183
2184 (root_path, orphaned_worktrees)
2185 })
2186 } else {
2187 Task::ready((None, Vec::new()))
2188 }
2189 }
2190
2191 pub fn restart_language_servers_for_buffers(
2192 &mut self,
2193 buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
2194 cx: &mut ModelContext<Self>,
2195 ) -> Option<()> {
2196 let language_server_lookup_info: HashSet<(WorktreeId, Arc<Path>, PathBuf)> = buffers
2197 .into_iter()
2198 .filter_map(|buffer| {
2199 let file = File::from_dyn(buffer.read(cx).file())?;
2200 let worktree = file.worktree.read(cx).as_local()?;
2201 let worktree_id = worktree.id();
2202 let worktree_abs_path = worktree.abs_path().clone();
2203 let full_path = file.full_path(cx);
2204 Some((worktree_id, worktree_abs_path, full_path))
2205 })
2206 .collect();
2207 for (worktree_id, worktree_abs_path, full_path) in language_server_lookup_info {
2208 let language = self.languages.select_language(&full_path)?;
2209 self.restart_language_server(worktree_id, worktree_abs_path, language, cx);
2210 }
2211
2212 None
2213 }
2214
2215 fn restart_language_server(
2216 &mut self,
2217 worktree_id: WorktreeId,
2218 fallback_path: Arc<Path>,
2219 language: Arc<Language>,
2220 cx: &mut ModelContext<Self>,
2221 ) {
2222 let adapter = if let Some(adapter) = language.lsp_adapter() {
2223 adapter
2224 } else {
2225 return;
2226 };
2227
2228 let server_name = adapter.name.clone();
2229 let stop = self.stop_language_server(worktree_id, server_name.clone(), cx);
2230 cx.spawn_weak(|this, mut cx| async move {
2231 let (original_root_path, orphaned_worktrees) = stop.await;
2232 if let Some(this) = this.upgrade(&cx) {
2233 this.update(&mut cx, |this, cx| {
2234 // Attempt to restart using original server path. Fallback to passed in
2235 // path if we could not retrieve the root path
2236 let root_path = original_root_path
2237 .map(|path_buf| Arc::from(path_buf.as_path()))
2238 .unwrap_or(fallback_path);
2239
2240 this.start_language_server(worktree_id, root_path, language, cx);
2241
2242 // Lookup new server id and set it for each of the orphaned worktrees
2243 if let Some(new_server_id) = this
2244 .language_server_ids
2245 .get(&(worktree_id, server_name.clone()))
2246 .cloned()
2247 {
2248 for orphaned_worktree in orphaned_worktrees {
2249 this.language_server_ids
2250 .insert((orphaned_worktree, server_name.clone()), new_server_id);
2251 }
2252 }
2253 });
2254 }
2255 })
2256 .detach();
2257 }
2258
2259 fn on_lsp_progress(
2260 &mut self,
2261 progress: lsp::ProgressParams,
2262 server_id: usize,
2263 disk_based_diagnostics_progress_token: Option<String>,
2264 cx: &mut ModelContext<Self>,
2265 ) {
2266 let token = match progress.token {
2267 lsp::NumberOrString::String(token) => token,
2268 lsp::NumberOrString::Number(token) => {
2269 log::info!("skipping numeric progress token {}", token);
2270 return;
2271 }
2272 };
2273 let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
2274 let language_server_status =
2275 if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
2276 status
2277 } else {
2278 return;
2279 };
2280
2281 if !language_server_status.progress_tokens.contains(&token) {
2282 return;
2283 }
2284
2285 let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
2286 .as_ref()
2287 .map_or(false, |disk_based_token| {
2288 token.starts_with(disk_based_token)
2289 });
2290
2291 match progress {
2292 lsp::WorkDoneProgress::Begin(report) => {
2293 if is_disk_based_diagnostics_progress {
2294 language_server_status.has_pending_diagnostic_updates = true;
2295 self.disk_based_diagnostics_started(server_id, cx);
2296 self.broadcast_language_server_update(
2297 server_id,
2298 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
2299 proto::LspDiskBasedDiagnosticsUpdating {},
2300 ),
2301 );
2302 } else {
2303 self.on_lsp_work_start(
2304 server_id,
2305 token.clone(),
2306 LanguageServerProgress {
2307 message: report.message.clone(),
2308 percentage: report.percentage.map(|p| p as usize),
2309 last_update_at: Instant::now(),
2310 },
2311 cx,
2312 );
2313 self.broadcast_language_server_update(
2314 server_id,
2315 proto::update_language_server::Variant::WorkStart(proto::LspWorkStart {
2316 token,
2317 message: report.message,
2318 percentage: report.percentage.map(|p| p as u32),
2319 }),
2320 );
2321 }
2322 }
2323 lsp::WorkDoneProgress::Report(report) => {
2324 if !is_disk_based_diagnostics_progress {
2325 self.on_lsp_work_progress(
2326 server_id,
2327 token.clone(),
2328 LanguageServerProgress {
2329 message: report.message.clone(),
2330 percentage: report.percentage.map(|p| p as usize),
2331 last_update_at: Instant::now(),
2332 },
2333 cx,
2334 );
2335 self.broadcast_language_server_update(
2336 server_id,
2337 proto::update_language_server::Variant::WorkProgress(
2338 proto::LspWorkProgress {
2339 token,
2340 message: report.message,
2341 percentage: report.percentage.map(|p| p as u32),
2342 },
2343 ),
2344 );
2345 }
2346 }
2347 lsp::WorkDoneProgress::End(_) => {
2348 language_server_status.progress_tokens.remove(&token);
2349
2350 if is_disk_based_diagnostics_progress {
2351 language_server_status.has_pending_diagnostic_updates = false;
2352 self.disk_based_diagnostics_finished(server_id, cx);
2353 self.broadcast_language_server_update(
2354 server_id,
2355 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
2356 proto::LspDiskBasedDiagnosticsUpdated {},
2357 ),
2358 );
2359 } else {
2360 self.on_lsp_work_end(server_id, token.clone(), cx);
2361 self.broadcast_language_server_update(
2362 server_id,
2363 proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
2364 token,
2365 }),
2366 );
2367 }
2368 }
2369 }
2370 }
2371
2372 fn on_lsp_work_start(
2373 &mut self,
2374 language_server_id: usize,
2375 token: String,
2376 progress: LanguageServerProgress,
2377 cx: &mut ModelContext<Self>,
2378 ) {
2379 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2380 status.pending_work.insert(token, progress);
2381 cx.notify();
2382 }
2383 }
2384
2385 fn on_lsp_work_progress(
2386 &mut self,
2387 language_server_id: usize,
2388 token: String,
2389 progress: LanguageServerProgress,
2390 cx: &mut ModelContext<Self>,
2391 ) {
2392 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2393 let entry = status
2394 .pending_work
2395 .entry(token)
2396 .or_insert(LanguageServerProgress {
2397 message: Default::default(),
2398 percentage: Default::default(),
2399 last_update_at: progress.last_update_at,
2400 });
2401 if progress.message.is_some() {
2402 entry.message = progress.message;
2403 }
2404 if progress.percentage.is_some() {
2405 entry.percentage = progress.percentage;
2406 }
2407 entry.last_update_at = progress.last_update_at;
2408 cx.notify();
2409 }
2410 }
2411
2412 fn on_lsp_work_end(
2413 &mut self,
2414 language_server_id: usize,
2415 token: String,
2416 cx: &mut ModelContext<Self>,
2417 ) {
2418 if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2419 status.pending_work.remove(&token);
2420 cx.notify();
2421 }
2422 }
2423
2424 async fn on_lsp_workspace_edit(
2425 this: WeakModelHandle<Self>,
2426 params: lsp::ApplyWorkspaceEditParams,
2427 server_id: usize,
2428 adapter: Arc<CachedLspAdapter>,
2429 language_server: Arc<LanguageServer>,
2430 mut cx: AsyncAppContext,
2431 ) -> Result<lsp::ApplyWorkspaceEditResponse> {
2432 let this = this
2433 .upgrade(&cx)
2434 .ok_or_else(|| anyhow!("project project closed"))?;
2435 let transaction = Self::deserialize_workspace_edit(
2436 this.clone(),
2437 params.edit,
2438 true,
2439 adapter.clone(),
2440 language_server.clone(),
2441 &mut cx,
2442 )
2443 .await
2444 .log_err();
2445 this.update(&mut cx, |this, _| {
2446 if let Some(transaction) = transaction {
2447 this.last_workspace_edits_by_language_server
2448 .insert(server_id, transaction);
2449 }
2450 });
2451 Ok(lsp::ApplyWorkspaceEditResponse {
2452 applied: true,
2453 failed_change: None,
2454 failure_reason: None,
2455 })
2456 }
2457
2458 fn broadcast_language_server_update(
2459 &self,
2460 language_server_id: usize,
2461 event: proto::update_language_server::Variant,
2462 ) {
2463 if let Some(project_id) = self.remote_id() {
2464 self.client
2465 .send(proto::UpdateLanguageServer {
2466 project_id,
2467 language_server_id: language_server_id as u64,
2468 variant: Some(event),
2469 })
2470 .log_err();
2471 }
2472 }
2473
2474 pub fn set_language_server_settings(&mut self, settings: serde_json::Value) {
2475 for server_state in self.language_servers.values() {
2476 if let LanguageServerState::Running { server, .. } = server_state {
2477 server
2478 .notify::<lsp::notification::DidChangeConfiguration>(
2479 lsp::DidChangeConfigurationParams {
2480 settings: settings.clone(),
2481 },
2482 )
2483 .ok();
2484 }
2485 }
2486 *self.language_server_settings.lock() = settings;
2487 }
2488
2489 pub fn language_server_statuses(
2490 &self,
2491 ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
2492 self.language_server_statuses.values()
2493 }
2494
2495 pub fn update_diagnostics(
2496 &mut self,
2497 language_server_id: usize,
2498 params: lsp::PublishDiagnosticsParams,
2499 disk_based_sources: &[String],
2500 cx: &mut ModelContext<Self>,
2501 ) -> Result<()> {
2502 let abs_path = params
2503 .uri
2504 .to_file_path()
2505 .map_err(|_| anyhow!("URI is not a file"))?;
2506 let mut diagnostics = Vec::default();
2507 let mut primary_diagnostic_group_ids = HashMap::default();
2508 let mut sources_by_group_id = HashMap::default();
2509 let mut supporting_diagnostics = HashMap::default();
2510 for diagnostic in ¶ms.diagnostics {
2511 let source = diagnostic.source.as_ref();
2512 let code = diagnostic.code.as_ref().map(|code| match code {
2513 lsp::NumberOrString::Number(code) => code.to_string(),
2514 lsp::NumberOrString::String(code) => code.clone(),
2515 });
2516 let range = range_from_lsp(diagnostic.range);
2517 let is_supporting = diagnostic
2518 .related_information
2519 .as_ref()
2520 .map_or(false, |infos| {
2521 infos.iter().any(|info| {
2522 primary_diagnostic_group_ids.contains_key(&(
2523 source,
2524 code.clone(),
2525 range_from_lsp(info.location.range),
2526 ))
2527 })
2528 });
2529
2530 let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
2531 tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
2532 });
2533
2534 if is_supporting {
2535 supporting_diagnostics.insert(
2536 (source, code.clone(), range),
2537 (diagnostic.severity, is_unnecessary),
2538 );
2539 } else {
2540 let group_id = post_inc(&mut self.next_diagnostic_group_id);
2541 let is_disk_based =
2542 source.map_or(false, |source| disk_based_sources.contains(source));
2543
2544 sources_by_group_id.insert(group_id, source);
2545 primary_diagnostic_group_ids
2546 .insert((source, code.clone(), range.clone()), group_id);
2547
2548 diagnostics.push(DiagnosticEntry {
2549 range,
2550 diagnostic: Diagnostic {
2551 code: code.clone(),
2552 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
2553 message: diagnostic.message.clone(),
2554 group_id,
2555 is_primary: true,
2556 is_valid: true,
2557 is_disk_based,
2558 is_unnecessary,
2559 },
2560 });
2561 if let Some(infos) = &diagnostic.related_information {
2562 for info in infos {
2563 if info.location.uri == params.uri && !info.message.is_empty() {
2564 let range = range_from_lsp(info.location.range);
2565 diagnostics.push(DiagnosticEntry {
2566 range,
2567 diagnostic: Diagnostic {
2568 code: code.clone(),
2569 severity: DiagnosticSeverity::INFORMATION,
2570 message: info.message.clone(),
2571 group_id,
2572 is_primary: false,
2573 is_valid: true,
2574 is_disk_based,
2575 is_unnecessary: false,
2576 },
2577 });
2578 }
2579 }
2580 }
2581 }
2582 }
2583
2584 for entry in &mut diagnostics {
2585 let diagnostic = &mut entry.diagnostic;
2586 if !diagnostic.is_primary {
2587 let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
2588 if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
2589 source,
2590 diagnostic.code.clone(),
2591 entry.range.clone(),
2592 )) {
2593 if let Some(severity) = severity {
2594 diagnostic.severity = severity;
2595 }
2596 diagnostic.is_unnecessary = is_unnecessary;
2597 }
2598 }
2599 }
2600
2601 self.update_diagnostic_entries(
2602 language_server_id,
2603 abs_path,
2604 params.version,
2605 diagnostics,
2606 cx,
2607 )?;
2608 Ok(())
2609 }
2610
2611 pub fn update_diagnostic_entries(
2612 &mut self,
2613 language_server_id: usize,
2614 abs_path: PathBuf,
2615 version: Option<i32>,
2616 diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2617 cx: &mut ModelContext<Project>,
2618 ) -> Result<(), anyhow::Error> {
2619 let (worktree, relative_path) = self
2620 .find_local_worktree(&abs_path, cx)
2621 .ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
2622
2623 let project_path = ProjectPath {
2624 worktree_id: worktree.read(cx).id(),
2625 path: relative_path.into(),
2626 };
2627 if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
2628 self.update_buffer_diagnostics(&buffer, diagnostics.clone(), version, cx)?;
2629 }
2630
2631 let updated = worktree.update(cx, |worktree, cx| {
2632 worktree
2633 .as_local_mut()
2634 .ok_or_else(|| anyhow!("not a local worktree"))?
2635 .update_diagnostics(
2636 language_server_id,
2637 project_path.path.clone(),
2638 diagnostics,
2639 cx,
2640 )
2641 })?;
2642 if updated {
2643 cx.emit(Event::DiagnosticsUpdated {
2644 language_server_id,
2645 path: project_path,
2646 });
2647 }
2648 Ok(())
2649 }
2650
2651 fn update_buffer_diagnostics(
2652 &mut self,
2653 buffer: &ModelHandle<Buffer>,
2654 mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2655 version: Option<i32>,
2656 cx: &mut ModelContext<Self>,
2657 ) -> Result<()> {
2658 fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
2659 Ordering::Equal
2660 .then_with(|| b.is_primary.cmp(&a.is_primary))
2661 .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
2662 .then_with(|| a.severity.cmp(&b.severity))
2663 .then_with(|| a.message.cmp(&b.message))
2664 }
2665
2666 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx)?;
2667
2668 diagnostics.sort_unstable_by(|a, b| {
2669 Ordering::Equal
2670 .then_with(|| a.range.start.cmp(&b.range.start))
2671 .then_with(|| b.range.end.cmp(&a.range.end))
2672 .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
2673 });
2674
2675 let mut sanitized_diagnostics = Vec::new();
2676 let edits_since_save = Patch::new(
2677 snapshot
2678 .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
2679 .collect(),
2680 );
2681 for entry in diagnostics {
2682 let start;
2683 let end;
2684 if entry.diagnostic.is_disk_based {
2685 // Some diagnostics are based on files on disk instead of buffers'
2686 // current contents. Adjust these diagnostics' ranges to reflect
2687 // any unsaved edits.
2688 start = edits_since_save.old_to_new(entry.range.start);
2689 end = edits_since_save.old_to_new(entry.range.end);
2690 } else {
2691 start = entry.range.start;
2692 end = entry.range.end;
2693 }
2694
2695 let mut range = snapshot.clip_point_utf16(start, Bias::Left)
2696 ..snapshot.clip_point_utf16(end, Bias::Right);
2697
2698 // Expand empty ranges by one codepoint
2699 if range.start == range.end {
2700 // This will be go to the next boundary when being clipped
2701 range.end.column += 1;
2702 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
2703 if range.start == range.end && range.end.column > 0 {
2704 range.start.column -= 1;
2705 range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
2706 }
2707 }
2708
2709 sanitized_diagnostics.push(DiagnosticEntry {
2710 range,
2711 diagnostic: entry.diagnostic,
2712 });
2713 }
2714 drop(edits_since_save);
2715
2716 let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
2717 buffer.update(cx, |buffer, cx| buffer.update_diagnostics(set, cx));
2718 Ok(())
2719 }
2720
2721 pub fn reload_buffers(
2722 &self,
2723 buffers: HashSet<ModelHandle<Buffer>>,
2724 push_to_history: bool,
2725 cx: &mut ModelContext<Self>,
2726 ) -> Task<Result<ProjectTransaction>> {
2727 let mut local_buffers = Vec::new();
2728 let mut remote_buffers = None;
2729 for buffer_handle in buffers {
2730 let buffer = buffer_handle.read(cx);
2731 if buffer.is_dirty() {
2732 if let Some(file) = File::from_dyn(buffer.file()) {
2733 if file.is_local() {
2734 local_buffers.push(buffer_handle);
2735 } else {
2736 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
2737 }
2738 }
2739 }
2740 }
2741
2742 let remote_buffers = self.remote_id().zip(remote_buffers);
2743 let client = self.client.clone();
2744
2745 cx.spawn(|this, mut cx| async move {
2746 let mut project_transaction = ProjectTransaction::default();
2747
2748 if let Some((project_id, remote_buffers)) = remote_buffers {
2749 let response = client
2750 .request(proto::ReloadBuffers {
2751 project_id,
2752 buffer_ids: remote_buffers
2753 .iter()
2754 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
2755 .collect(),
2756 })
2757 .await?
2758 .transaction
2759 .ok_or_else(|| anyhow!("missing transaction"))?;
2760 project_transaction = this
2761 .update(&mut cx, |this, cx| {
2762 this.deserialize_project_transaction(response, push_to_history, cx)
2763 })
2764 .await?;
2765 }
2766
2767 for buffer in local_buffers {
2768 let transaction = buffer
2769 .update(&mut cx, |buffer, cx| buffer.reload(cx))
2770 .await?;
2771 buffer.update(&mut cx, |buffer, cx| {
2772 if let Some(transaction) = transaction {
2773 if !push_to_history {
2774 buffer.forget_transaction(transaction.id);
2775 }
2776 project_transaction.0.insert(cx.handle(), transaction);
2777 }
2778 });
2779 }
2780
2781 Ok(project_transaction)
2782 })
2783 }
2784
2785 pub fn format(
2786 &self,
2787 buffers: HashSet<ModelHandle<Buffer>>,
2788 push_to_history: bool,
2789 trigger: FormatTrigger,
2790 cx: &mut ModelContext<Project>,
2791 ) -> Task<Result<ProjectTransaction>> {
2792 let mut local_buffers = Vec::new();
2793 let mut remote_buffers = None;
2794 for buffer_handle in buffers {
2795 let buffer = buffer_handle.read(cx);
2796 if let Some(file) = File::from_dyn(buffer.file()) {
2797 if let Some(buffer_abs_path) = file.as_local().map(|f| f.abs_path(cx)) {
2798 if let Some((_, server)) = self.language_server_for_buffer(buffer, cx) {
2799 local_buffers.push((buffer_handle, buffer_abs_path, server.clone()));
2800 }
2801 } else {
2802 remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
2803 }
2804 } else {
2805 return Task::ready(Ok(Default::default()));
2806 }
2807 }
2808
2809 let remote_buffers = self.remote_id().zip(remote_buffers);
2810 let client = self.client.clone();
2811
2812 cx.spawn(|this, mut cx| async move {
2813 let mut project_transaction = ProjectTransaction::default();
2814
2815 if let Some((project_id, remote_buffers)) = remote_buffers {
2816 let response = client
2817 .request(proto::FormatBuffers {
2818 project_id,
2819 trigger: trigger as i32,
2820 buffer_ids: remote_buffers
2821 .iter()
2822 .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
2823 .collect(),
2824 })
2825 .await?
2826 .transaction
2827 .ok_or_else(|| anyhow!("missing transaction"))?;
2828 project_transaction = this
2829 .update(&mut cx, |this, cx| {
2830 this.deserialize_project_transaction(response, push_to_history, cx)
2831 })
2832 .await?;
2833 }
2834
2835 // Do not allow multiple concurrent formatting requests for the
2836 // same buffer.
2837 this.update(&mut cx, |this, _| {
2838 local_buffers
2839 .retain(|(buffer, _, _)| this.buffers_being_formatted.insert(buffer.id()));
2840 });
2841 let _cleanup = defer({
2842 let this = this.clone();
2843 let mut cx = cx.clone();
2844 let local_buffers = &local_buffers;
2845 move || {
2846 this.update(&mut cx, |this, _| {
2847 for (buffer, _, _) in local_buffers {
2848 this.buffers_being_formatted.remove(&buffer.id());
2849 }
2850 });
2851 }
2852 });
2853
2854 for (buffer, buffer_abs_path, language_server) in &local_buffers {
2855 let (format_on_save, formatter, tab_size) = buffer.read_with(&cx, |buffer, cx| {
2856 let settings = cx.global::<Settings>();
2857 let language_name = buffer.language().map(|language| language.name());
2858 (
2859 settings.format_on_save(language_name.as_deref()),
2860 settings.formatter(language_name.as_deref()),
2861 settings.tab_size(language_name.as_deref()),
2862 )
2863 });
2864
2865 let transaction = match (formatter, format_on_save) {
2866 (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => continue,
2867
2868 (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
2869 | (_, FormatOnSave::LanguageServer) => Self::format_via_lsp(
2870 &this,
2871 &buffer,
2872 &buffer_abs_path,
2873 &language_server,
2874 tab_size,
2875 &mut cx,
2876 )
2877 .await
2878 .context("failed to format via language server")?,
2879
2880 (
2881 Formatter::External { command, arguments },
2882 FormatOnSave::On | FormatOnSave::Off,
2883 )
2884 | (_, FormatOnSave::External { command, arguments }) => {
2885 Self::format_via_external_command(
2886 &buffer,
2887 &buffer_abs_path,
2888 &command,
2889 &arguments,
2890 &mut cx,
2891 )
2892 .await
2893 .context(format!(
2894 "failed to format via external command {:?}",
2895 command
2896 ))?
2897 }
2898 };
2899
2900 if let Some(transaction) = transaction {
2901 if !push_to_history {
2902 buffer.update(&mut cx, |buffer, _| {
2903 buffer.forget_transaction(transaction.id)
2904 });
2905 }
2906 project_transaction.0.insert(buffer.clone(), transaction);
2907 }
2908 }
2909
2910 Ok(project_transaction)
2911 })
2912 }
2913
2914 async fn format_via_lsp(
2915 this: &ModelHandle<Self>,
2916 buffer: &ModelHandle<Buffer>,
2917 abs_path: &Path,
2918 language_server: &Arc<LanguageServer>,
2919 tab_size: NonZeroU32,
2920 cx: &mut AsyncAppContext,
2921 ) -> Result<Option<Transaction>> {
2922 let text_document =
2923 lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(abs_path).unwrap());
2924 let capabilities = &language_server.capabilities();
2925 let lsp_edits = if capabilities
2926 .document_formatting_provider
2927 .as_ref()
2928 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
2929 {
2930 language_server
2931 .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
2932 text_document,
2933 options: lsp::FormattingOptions {
2934 tab_size: tab_size.into(),
2935 insert_spaces: true,
2936 insert_final_newline: Some(true),
2937 ..Default::default()
2938 },
2939 work_done_progress_params: Default::default(),
2940 })
2941 .await?
2942 } else if capabilities
2943 .document_range_formatting_provider
2944 .as_ref()
2945 .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
2946 {
2947 let buffer_start = lsp::Position::new(0, 0);
2948 let buffer_end =
2949 buffer.read_with(cx, |buffer, _| point_to_lsp(buffer.max_point_utf16()));
2950 language_server
2951 .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
2952 text_document,
2953 range: lsp::Range::new(buffer_start, buffer_end),
2954 options: lsp::FormattingOptions {
2955 tab_size: tab_size.into(),
2956 insert_spaces: true,
2957 insert_final_newline: Some(true),
2958 ..Default::default()
2959 },
2960 work_done_progress_params: Default::default(),
2961 })
2962 .await?
2963 } else {
2964 None
2965 };
2966
2967 if let Some(lsp_edits) = lsp_edits {
2968 let edits = this
2969 .update(cx, |this, cx| {
2970 this.edits_from_lsp(buffer, lsp_edits, None, cx)
2971 })
2972 .await?;
2973 buffer.update(cx, |buffer, cx| {
2974 buffer.finalize_last_transaction();
2975 buffer.start_transaction();
2976 for (range, text) in edits {
2977 buffer.edit([(range, text)], None, cx);
2978 }
2979 if buffer.end_transaction(cx).is_some() {
2980 let transaction = buffer.finalize_last_transaction().unwrap().clone();
2981 Ok(Some(transaction))
2982 } else {
2983 Ok(None)
2984 }
2985 })
2986 } else {
2987 Ok(None)
2988 }
2989 }
2990
2991 async fn format_via_external_command(
2992 buffer: &ModelHandle<Buffer>,
2993 buffer_abs_path: &Path,
2994 command: &str,
2995 arguments: &[String],
2996 cx: &mut AsyncAppContext,
2997 ) -> Result<Option<Transaction>> {
2998 let working_dir_path = buffer.read_with(cx, |buffer, cx| {
2999 let file = File::from_dyn(buffer.file())?;
3000 let worktree = file.worktree.read(cx).as_local()?;
3001 let mut worktree_path = worktree.abs_path().to_path_buf();
3002 if worktree.root_entry()?.is_file() {
3003 worktree_path.pop();
3004 }
3005 Some(worktree_path)
3006 });
3007
3008 if let Some(working_dir_path) = working_dir_path {
3009 let mut child =
3010 smol::process::Command::new(command)
3011 .args(arguments.iter().map(|arg| {
3012 arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
3013 }))
3014 .current_dir(&working_dir_path)
3015 .stdin(smol::process::Stdio::piped())
3016 .stdout(smol::process::Stdio::piped())
3017 .stderr(smol::process::Stdio::piped())
3018 .spawn()?;
3019 let stdin = child
3020 .stdin
3021 .as_mut()
3022 .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
3023 let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
3024 for chunk in text.chunks() {
3025 stdin.write_all(chunk.as_bytes()).await?;
3026 }
3027 stdin.flush().await?;
3028
3029 let output = child.output().await?;
3030 if !output.status.success() {
3031 return Err(anyhow!(
3032 "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
3033 output.status.code(),
3034 String::from_utf8_lossy(&output.stdout),
3035 String::from_utf8_lossy(&output.stderr),
3036 ));
3037 }
3038
3039 let stdout = String::from_utf8(output.stdout)?;
3040 let diff = buffer
3041 .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
3042 .await;
3043 Ok(buffer.update(cx, |buffer, cx| buffer.apply_diff(diff, cx).cloned()))
3044 } else {
3045 Ok(None)
3046 }
3047 }
3048
3049 pub fn definition<T: ToPointUtf16>(
3050 &self,
3051 buffer: &ModelHandle<Buffer>,
3052 position: T,
3053 cx: &mut ModelContext<Self>,
3054 ) -> Task<Result<Vec<LocationLink>>> {
3055 let position = position.to_point_utf16(buffer.read(cx));
3056 self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
3057 }
3058
3059 pub fn type_definition<T: ToPointUtf16>(
3060 &self,
3061 buffer: &ModelHandle<Buffer>,
3062 position: T,
3063 cx: &mut ModelContext<Self>,
3064 ) -> Task<Result<Vec<LocationLink>>> {
3065 let position = position.to_point_utf16(buffer.read(cx));
3066 self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
3067 }
3068
3069 pub fn references<T: ToPointUtf16>(
3070 &self,
3071 buffer: &ModelHandle<Buffer>,
3072 position: T,
3073 cx: &mut ModelContext<Self>,
3074 ) -> Task<Result<Vec<Location>>> {
3075 let position = position.to_point_utf16(buffer.read(cx));
3076 self.request_lsp(buffer.clone(), GetReferences { position }, cx)
3077 }
3078
3079 pub fn document_highlights<T: ToPointUtf16>(
3080 &self,
3081 buffer: &ModelHandle<Buffer>,
3082 position: T,
3083 cx: &mut ModelContext<Self>,
3084 ) -> Task<Result<Vec<DocumentHighlight>>> {
3085 let position = position.to_point_utf16(buffer.read(cx));
3086 self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
3087 }
3088
3089 pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
3090 if self.is_local() {
3091 let mut requests = Vec::new();
3092 for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
3093 let worktree_id = *worktree_id;
3094 if let Some(worktree) = self
3095 .worktree_for_id(worktree_id, cx)
3096 .and_then(|worktree| worktree.read(cx).as_local())
3097 {
3098 if let Some(LanguageServerState::Running {
3099 adapter,
3100 language,
3101 server,
3102 }) = self.language_servers.get(server_id)
3103 {
3104 let adapter = adapter.clone();
3105 let language = language.clone();
3106 let worktree_abs_path = worktree.abs_path().clone();
3107 requests.push(
3108 server
3109 .request::<lsp::request::WorkspaceSymbol>(
3110 lsp::WorkspaceSymbolParams {
3111 query: query.to_string(),
3112 ..Default::default()
3113 },
3114 )
3115 .log_err()
3116 .map(move |response| {
3117 (
3118 adapter,
3119 language,
3120 worktree_id,
3121 worktree_abs_path,
3122 response.unwrap_or_default(),
3123 )
3124 }),
3125 );
3126 }
3127 }
3128 }
3129
3130 cx.spawn_weak(|this, cx| async move {
3131 let responses = futures::future::join_all(requests).await;
3132 let this = if let Some(this) = this.upgrade(&cx) {
3133 this
3134 } else {
3135 return Ok(Default::default());
3136 };
3137 let symbols = this.read_with(&cx, |this, cx| {
3138 let mut symbols = Vec::new();
3139 for (
3140 adapter,
3141 adapter_language,
3142 source_worktree_id,
3143 worktree_abs_path,
3144 response,
3145 ) in responses
3146 {
3147 symbols.extend(response.into_iter().flatten().filter_map(|lsp_symbol| {
3148 let abs_path = lsp_symbol.location.uri.to_file_path().ok()?;
3149 let mut worktree_id = source_worktree_id;
3150 let path;
3151 if let Some((worktree, rel_path)) =
3152 this.find_local_worktree(&abs_path, cx)
3153 {
3154 worktree_id = worktree.read(cx).id();
3155 path = rel_path;
3156 } else {
3157 path = relativize_path(&worktree_abs_path, &abs_path);
3158 }
3159
3160 let project_path = ProjectPath {
3161 worktree_id,
3162 path: path.into(),
3163 };
3164 let signature = this.symbol_signature(&project_path);
3165 let language = this
3166 .languages
3167 .select_language(&project_path.path)
3168 .unwrap_or(adapter_language.clone());
3169 let language_server_name = adapter.name.clone();
3170 Some(async move {
3171 let label = language
3172 .label_for_symbol(&lsp_symbol.name, lsp_symbol.kind)
3173 .await;
3174
3175 Symbol {
3176 language_server_name,
3177 source_worktree_id,
3178 path: project_path,
3179 label: label.unwrap_or_else(|| {
3180 CodeLabel::plain(lsp_symbol.name.clone(), None)
3181 }),
3182 kind: lsp_symbol.kind,
3183 name: lsp_symbol.name,
3184 range: range_from_lsp(lsp_symbol.location.range),
3185 signature,
3186 }
3187 })
3188 }));
3189 }
3190 symbols
3191 });
3192 Ok(futures::future::join_all(symbols).await)
3193 })
3194 } else if let Some(project_id) = self.remote_id() {
3195 let request = self.client.request(proto::GetProjectSymbols {
3196 project_id,
3197 query: query.to_string(),
3198 });
3199 cx.spawn_weak(|this, cx| async move {
3200 let response = request.await?;
3201 let mut symbols = Vec::new();
3202 if let Some(this) = this.upgrade(&cx) {
3203 let new_symbols = this.read_with(&cx, |this, _| {
3204 response
3205 .symbols
3206 .into_iter()
3207 .map(|symbol| this.deserialize_symbol(symbol))
3208 .collect::<Vec<_>>()
3209 });
3210 symbols = futures::future::join_all(new_symbols)
3211 .await
3212 .into_iter()
3213 .filter_map(|symbol| symbol.log_err())
3214 .collect::<Vec<_>>();
3215 }
3216 Ok(symbols)
3217 })
3218 } else {
3219 Task::ready(Ok(Default::default()))
3220 }
3221 }
3222
3223 pub fn open_buffer_for_symbol(
3224 &mut self,
3225 symbol: &Symbol,
3226 cx: &mut ModelContext<Self>,
3227 ) -> Task<Result<ModelHandle<Buffer>>> {
3228 if self.is_local() {
3229 let language_server_id = if let Some(id) = self.language_server_ids.get(&(
3230 symbol.source_worktree_id,
3231 symbol.language_server_name.clone(),
3232 )) {
3233 *id
3234 } else {
3235 return Task::ready(Err(anyhow!(
3236 "language server for worktree and language not found"
3237 )));
3238 };
3239
3240 let worktree_abs_path = if let Some(worktree_abs_path) = self
3241 .worktree_for_id(symbol.path.worktree_id, cx)
3242 .and_then(|worktree| worktree.read(cx).as_local())
3243 .map(|local_worktree| local_worktree.abs_path())
3244 {
3245 worktree_abs_path
3246 } else {
3247 return Task::ready(Err(anyhow!("worktree not found for symbol")));
3248 };
3249 let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
3250 let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
3251 uri
3252 } else {
3253 return Task::ready(Err(anyhow!("invalid symbol path")));
3254 };
3255
3256 self.open_local_buffer_via_lsp(
3257 symbol_uri,
3258 language_server_id,
3259 symbol.language_server_name.clone(),
3260 cx,
3261 )
3262 } else if let Some(project_id) = self.remote_id() {
3263 let request = self.client.request(proto::OpenBufferForSymbol {
3264 project_id,
3265 symbol: Some(serialize_symbol(symbol)),
3266 });
3267 cx.spawn(|this, mut cx| async move {
3268 let response = request.await?;
3269 this.update(&mut cx, |this, cx| {
3270 this.wait_for_buffer(response.buffer_id, cx)
3271 })
3272 .await
3273 })
3274 } else {
3275 Task::ready(Err(anyhow!("project does not have a remote id")))
3276 }
3277 }
3278
3279 pub fn hover<T: ToPointUtf16>(
3280 &self,
3281 buffer: &ModelHandle<Buffer>,
3282 position: T,
3283 cx: &mut ModelContext<Self>,
3284 ) -> Task<Result<Option<Hover>>> {
3285 let position = position.to_point_utf16(buffer.read(cx));
3286 self.request_lsp(buffer.clone(), GetHover { position }, cx)
3287 }
3288
/// Requests completions at `position` in the given buffer.
///
/// * If the buffer's worktree is local, an LSP `Completion` request is sent
///   to the buffer's language server and each returned item is converted
///   into a [`Completion`], dropping items this code cannot apply (see the
///   comments in the body).
/// * Otherwise, if the project has a remote id, a `proto::GetCompletions`
///   request is sent and the response is deserialized once the local buffer
///   has reached the version named in the response.
///
/// Resolves to an empty list when the buffer has no `File`, when no language
/// server is available for it, or when the project is neither local nor has
/// a remote id.
pub fn completions<T: ToPointUtf16>(
    &self,
    source_buffer_handle: &ModelHandle<Buffer>,
    position: T,
    cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<Completion>>> {
    let source_buffer_handle = source_buffer_handle.clone();
    let source_buffer = source_buffer_handle.read(cx);
    let buffer_id = source_buffer.remote_id();
    let language = source_buffer.language().cloned();
    let worktree;
    let buffer_abs_path;
    if let Some(file) = File::from_dyn(source_buffer.file()) {
        worktree = file.worktree.clone();
        // `None` when the file is not a local file.
        buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
    } else {
        return Task::ready(Ok(Default::default()));
    };

    // The position is kept unclipped here; it is compared against its clipped
    // form later to detect positions that fall outside the buffer.
    let position = Unclipped(position.to_point_utf16(source_buffer));
    let anchor = source_buffer.anchor_after(position);

    if worktree.read(cx).as_local().is_some() {
        // NOTE(review): assumes a file that belongs to a local worktree always
        // yields `Some` from `as_local()`; otherwise this unwrap panics.
        let buffer_abs_path = buffer_abs_path.unwrap();
        let lang_server =
            if let Some((_, server)) = self.language_server_for_buffer(source_buffer, cx) {
                server.clone()
            } else {
                return Task::ready(Ok(Default::default()));
            };

        cx.spawn(|_, cx| async move {
            let completions = lang_server
                .request::<lsp::request::Completion>(lsp::CompletionParams {
                    text_document_position: lsp::TextDocumentPositionParams::new(
                        lsp::TextDocumentIdentifier::new(
                            // Panics if the path cannot be converted to a file URI.
                            lsp::Url::from_file_path(buffer_abs_path).unwrap(),
                        ),
                        point_to_lsp(position.0),
                    ),
                    context: Default::default(),
                    work_done_progress_params: Default::default(),
                    partial_result_params: Default::default(),
                })
                .await
                .context("lsp completion request failed")?;

            // Both response shapes are flattened to a plain list of items; a
            // missing response is treated as an empty list.
            let completions = if let Some(completions) = completions {
                match completions {
                    lsp::CompletionResponse::Array(completions) => completions,
                    lsp::CompletionResponse::List(list) => list.items,
                }
            } else {
                Default::default()
            };

            let completions = source_buffer_handle.read_with(&cx, |this, _| {
                let snapshot = this.snapshot();
                let clipped_position = this.clip_point_utf16(position, Bias::Left);
                // Lazily computed once and shared by every completion that
                // does not carry its own edit range.
                let mut range_for_token = None;
                completions
                    .into_iter()
                    .filter_map(move |mut lsp_completion| {
                        // For now, we can only handle additional edits if they are returned
                        // when resolving the completion, not if they are present initially.
                        if lsp_completion
                            .additional_text_edits
                            .as_ref()
                            .map_or(false, |edits| !edits.is_empty())
                        {
                            return None;
                        }

                        let (old_range, mut new_text) = match lsp_completion.text_edit.as_ref()
                        {
                            // If the language server provides a range to overwrite, then
                            // check that the range is valid.
                            Some(lsp::CompletionTextEdit::Edit(edit)) => {
                                let range = range_from_lsp(edit.range);
                                let start = snapshot.clip_point_utf16(range.start, Bias::Left);
                                let end = snapshot.clip_point_utf16(range.end, Bias::Left);
                                // Clipping changed an endpoint, so the range does
                                // not fit this snapshot; drop the item.
                                if start != range.start.0 || end != range.end.0 {
                                    log::info!("completion out of expected range");
                                    return None;
                                }
                                (
                                    snapshot.anchor_before(start)..snapshot.anchor_after(end),
                                    edit.new_text.clone(),
                                )
                            }
                            // If the language server does not provide a range, then infer
                            // the range based on the syntax tree.
                            None => {
                                if position.0 != clipped_position {
                                    log::info!("completion out of expected range");
                                    return None;
                                }
                                let Range { start, end } = range_for_token
                                    .get_or_insert_with(|| {
                                        let offset = position.to_offset(&snapshot);
                                        let (range, kind) = snapshot.surrounding_word(offset);
                                        // Replace the surrounding word if there is
                                        // one; otherwise insert at the cursor.
                                        if kind == Some(CharKind::Word) {
                                            range
                                        } else {
                                            offset..offset
                                        }
                                    })
                                    .clone();
                                // Fall back to the label when no insert text is given.
                                let text = lsp_completion
                                    .insert_text
                                    .as_ref()
                                    .unwrap_or(&lsp_completion.label)
                                    .clone();
                                (
                                    snapshot.anchor_before(start)..snapshot.anchor_after(end),
                                    text,
                                )
                            }
                            Some(lsp::CompletionTextEdit::InsertAndReplace(_)) => {
                                log::info!("unsupported insert/replace completion");
                                return None;
                            }
                        };

                        LineEnding::normalize(&mut new_text);
                        let language = language.clone();
                        // Label computation is async, so each surviving item
                        // becomes a future that is joined below.
                        Some(async move {
                            let mut label = None;
                            if let Some(language) = language {
                                language.process_completion(&mut lsp_completion).await;
                                label = language.label_for_completion(&lsp_completion).await;
                            }
                            Completion {
                                old_range,
                                new_text,
                                label: label.unwrap_or_else(|| {
                                    CodeLabel::plain(
                                        lsp_completion.label.clone(),
                                        lsp_completion.filter_text.as_deref(),
                                    )
                                }),
                                lsp_completion,
                            }
                        })
                    })
            });

            Ok(futures::future::join_all(completions).await)
        })
    } else if let Some(project_id) = self.remote_id() {
        let rpc = self.client.clone();
        let message = proto::GetCompletions {
            project_id,
            buffer_id,
            position: Some(language::proto::serialize_anchor(&anchor)),
            version: serialize_version(&source_buffer.version()),
        };
        cx.spawn_weak(|this, mut cx| async move {
            let response = rpc.request(message).await?;

            // The project is held weakly, so it may have been dropped or
            // become read-only while the request was in flight.
            if this
                .upgrade(&cx)
                .ok_or_else(|| anyhow!("project was dropped"))?
                .read_with(&cx, |this, _| this.is_read_only())
            {
                return Err(anyhow!(
                    "failed to get completions: project was disconnected"
                ));
            } else {
                // Wait until the local buffer has reached the version the
                // response refers to before deserializing the completions.
                source_buffer_handle
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_version(deserialize_version(response.version))
                    })
                    .await;

                let completions = response.completions.into_iter().map(|completion| {
                    language::proto::deserialize_completion(completion, language.clone())
                });
                futures::future::try_join_all(completions).await
            }
        })
    } else {
        Task::ready(Ok(Default::default()))
    }
}
3474
/// Applies the "additional text edits" attached to a completion.
///
/// * Local project: the completion item is resolved with the buffer's
///   language server (`ResolveCompletionItem`); any `additional_text_edits`
///   on the resolved item are converted with `edits_from_lsp` and applied to
///   the buffer inside a single transaction.
/// * Remote project: a `proto::ApplyCompletionAdditionalEdits` request is
///   sent and the returned transaction's edits are awaited on the local
///   buffer.
///
/// Resolves to `Ok(None)` when there is nothing to apply (no language server,
/// no additional edits, or no transaction produced). When `push_to_history`
/// is `false`, the local transaction is forgotten after being captured; in
/// the remote case it is simply not pushed onto the buffer's history.
pub fn apply_additional_edits_for_completion(
    &self,
    buffer_handle: ModelHandle<Buffer>,
    completion: Completion,
    push_to_history: bool,
    cx: &mut ModelContext<Self>,
) -> Task<Result<Option<Transaction>>> {
    let buffer = buffer_handle.read(cx);
    let buffer_id = buffer.remote_id();

    if self.is_local() {
        let lang_server = match self.language_server_for_buffer(buffer, cx) {
            Some((_, server)) => server.clone(),
            _ => return Task::ready(Ok(Default::default())),
        };

        cx.spawn(|this, mut cx| async move {
            let resolved_completion = lang_server
                .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
                .await?;

            if let Some(edits) = resolved_completion.additional_text_edits {
                let edits = this
                    .update(&mut cx, |this, cx| {
                        this.edits_from_lsp(&buffer_handle, edits, None, cx)
                    })
                    .await?;

                buffer_handle.update(&mut cx, |buffer, cx| {
                    // Close off any in-progress transaction so that these
                    // edits end up in a transaction of their own.
                    buffer.finalize_last_transaction();
                    buffer.start_transaction();

                    for (range, text) in edits {
                        let primary = &completion.old_range;
                        // The edit's start lies within the primary range.
                        let start_within = primary.start.cmp(&range.start, buffer).is_le()
                            && primary.end.cmp(&range.start, buffer).is_ge();
                        // The primary range's end lies within the edit.
                        // NOTE(review): an edit that starts before the primary
                        // range and ends strictly inside it sets neither flag
                        // and is still applied; confirm this is intended.
                        let end_within = range.start.cmp(&primary.end, buffer).is_le()
                            && range.end.cmp(&primary.end, buffer).is_ge();

                        // Skip additional edits which overlap with the primary completion edit
                        // https://github.com/zed-industries/zed/pull/1871
                        if !start_within && !end_within {
                            buffer.edit([(range, text)], None, cx);
                        }
                    }

                    // `end_transaction` returns `None` when no transaction was
                    // recorded, in which case there is nothing to report.
                    let transaction = if buffer.end_transaction(cx).is_some() {
                        let transaction = buffer.finalize_last_transaction().unwrap().clone();
                        if !push_to_history {
                            buffer.forget_transaction(transaction.id);
                        }
                        Some(transaction)
                    } else {
                        None
                    };
                    Ok(transaction)
                })
            } else {
                Ok(None)
            }
        })
    } else if let Some(project_id) = self.remote_id() {
        let client = self.client.clone();
        cx.spawn(|_, mut cx| async move {
            let response = client
                .request(proto::ApplyCompletionAdditionalEdits {
                    project_id,
                    buffer_id,
                    completion: Some(language::proto::serialize_completion(&completion)),
                })
                .await?;

            if let Some(transaction) = response.transaction {
                let transaction = language::proto::deserialize_transaction(transaction)?;
                // Wait for the edits named by the transaction to arrive in
                // the local buffer before touching its history.
                buffer_handle
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                    })
                    .await;
                if push_to_history {
                    buffer_handle.update(&mut cx, |buffer, _| {
                        buffer.push_transaction(transaction.clone(), Instant::now());
                    });
                }
                Ok(Some(transaction))
            } else {
                Ok(None)
            }
        })
    } else {
        Task::ready(Err(anyhow!("project does not have a remote id")))
    }
}
3568
/// Requests the code actions available for `range` in the given buffer.
///
/// * If the buffer's worktree is local, an LSP `CodeActionRequest` is sent to
///   the buffer's language server, carrying the diagnostics that intersect
///   `range`. Only `CodeAction` entries of the response are kept; bare
///   commands are discarded.
/// * Otherwise, if the project has a remote id, a `proto::GetCodeActions`
///   request is sent and the response is deserialized once the local buffer
///   has reached the version named in the response.
///
/// Resolves to an empty list when the buffer has no `File`, when no language
/// server is available, when the server advertises no code action provider,
/// or when the project is neither local nor has a remote id.
pub fn code_actions<T: Clone + ToOffset>(
    &self,
    buffer_handle: &ModelHandle<Buffer>,
    range: Range<T>,
    cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<CodeAction>>> {
    let buffer_handle = buffer_handle.clone();
    let buffer = buffer_handle.read(cx);
    let snapshot = buffer.snapshot();
    // Diagnostics inside the requested range, converted to LSP form so they
    // can be sent as the request's context.
    let relevant_diagnostics = snapshot
        .diagnostics_in_range::<usize, usize>(range.to_offset(&snapshot), false)
        .map(|entry| entry.to_lsp_diagnostic_stub())
        .collect();
    let buffer_id = buffer.remote_id();
    let worktree;
    let buffer_abs_path;
    if let Some(file) = File::from_dyn(buffer.file()) {
        worktree = file.worktree.clone();
        // `None` when the file is not a local file.
        buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
    } else {
        return Task::ready(Ok(Default::default()));
    };
    // From here on the range is expressed as anchors in the buffer.
    let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);

    if worktree.read(cx).as_local().is_some() {
        // NOTE(review): assumes a file that belongs to a local worktree always
        // yields `Some` from `as_local()`; otherwise this unwrap panics.
        let buffer_abs_path = buffer_abs_path.unwrap();
        let lang_server = if let Some((_, server)) = self.language_server_for_buffer(buffer, cx)
        {
            server.clone()
        } else {
            return Task::ready(Ok(Default::default()));
        };

        let lsp_range = range_to_lsp(range.to_point_utf16(buffer));
        cx.foreground().spawn(async move {
            if lang_server.capabilities().code_action_provider.is_none() {
                return Ok(Default::default());
            }

            Ok(lang_server
                .request::<lsp::request::CodeActionRequest>(lsp::CodeActionParams {
                    text_document: lsp::TextDocumentIdentifier::new(
                        // Panics if the path cannot be converted to a file URI.
                        lsp::Url::from_file_path(buffer_abs_path).unwrap(),
                    ),
                    range: lsp_range,
                    work_done_progress_params: Default::default(),
                    partial_result_params: Default::default(),
                    context: lsp::CodeActionContext {
                        diagnostics: relevant_diagnostics,
                        // Restrict the response to these action kinds.
                        only: Some(vec![
                            lsp::CodeActionKind::EMPTY,
                            lsp::CodeActionKind::QUICKFIX,
                            lsp::CodeActionKind::REFACTOR,
                            lsp::CodeActionKind::REFACTOR_EXTRACT,
                            lsp::CodeActionKind::SOURCE,
                        ]),
                    },
                })
                .await?
                .unwrap_or_default()
                .into_iter()
                .filter_map(|entry| {
                    // Keep full code actions only; plain commands are dropped.
                    if let lsp::CodeActionOrCommand::CodeAction(lsp_action) = entry {
                        Some(CodeAction {
                            range: range.clone(),
                            lsp_action,
                        })
                    } else {
                        None
                    }
                })
                .collect())
        })
    } else if let Some(project_id) = self.remote_id() {
        let rpc = self.client.clone();
        let version = buffer.version();
        cx.spawn_weak(|this, mut cx| async move {
            let response = rpc
                .request(proto::GetCodeActions {
                    project_id,
                    buffer_id,
                    start: Some(language::proto::serialize_anchor(&range.start)),
                    end: Some(language::proto::serialize_anchor(&range.end)),
                    version: serialize_version(&version),
                })
                .await?;

            // The project is held weakly, so it may have been dropped or
            // become read-only while the request was in flight.
            if this
                .upgrade(&cx)
                .ok_or_else(|| anyhow!("project was dropped"))?
                .read_with(&cx, |this, _| this.is_read_only())
            {
                return Err(anyhow!(
                    "failed to get code actions: project was disconnected"
                ));
            } else {
                // Wait until the local buffer has reached the version the
                // response refers to before deserializing the actions.
                buffer_handle
                    .update(&mut cx, |buffer, _| {
                        buffer.wait_for_version(deserialize_version(response.version))
                    })
                    .await;

                response
                    .actions
                    .into_iter()
                    .map(language::proto::deserialize_code_action)
                    .collect()
            }
        })
    } else {
        Task::ready(Ok(Default::default()))
    }
}
3682
/// Applies a previously fetched code action and returns the resulting
/// project-wide transaction.
///
/// Local project:
/// 1. The action is refreshed. If its `data` holds a
///    `codeActionParams.range` entry, that entry is overwritten with the
///    action's current range and the action is resolved through
///    `CodeActionResolveRequest`. Otherwise the code actions for the range
///    are requested again and the one with the same title is used; if none
///    matches, the task fails with "code action is outdated".
/// 2. If the refreshed action carries a workspace edit with `changes` or
///    `document_changes`, that edit is applied via
///    `deserialize_workspace_edit` and its transaction is returned.
/// 3. Otherwise, if it carries a command, the command is executed on the
///    server and whatever is stored for that server in
///    `last_workspace_edits_by_language_server` afterwards is returned.
/// 4. Otherwise an empty transaction is returned.
///
/// Remote project: a `proto::ApplyCodeAction` request is sent and the
/// returned transaction is deserialized.
pub fn apply_code_action(
    &self,
    buffer_handle: ModelHandle<Buffer>,
    mut action: CodeAction,
    push_to_history: bool,
    cx: &mut ModelContext<Self>,
) -> Task<Result<ProjectTransaction>> {
    if self.is_local() {
        let buffer = buffer_handle.read(cx);
        let (lsp_adapter, lang_server) =
            if let Some((adapter, server)) = self.language_server_for_buffer(buffer, cx) {
                (adapter.clone(), server.clone())
            } else {
                return Task::ready(Ok(Default::default()));
            };
        // Resolve the anchored range against the buffer's current contents.
        let range = action.range.to_point_utf16(buffer);

        cx.spawn(|this, mut cx| async move {
            if let Some(lsp_range) = action
                .lsp_action
                .data
                .as_mut()
                .and_then(|d| d.get_mut("codeActionParams"))
                .and_then(|d| d.get_mut("range"))
            {
                // Patch the range stored in the action's data before asking
                // the server to resolve it.
                *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
                action.lsp_action = lang_server
                    .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
                    .await?;
            } else {
                // No range to patch: fetch the actions again and pick the one
                // with the same title.
                let actions = this
                    .update(&mut cx, |this, cx| {
                        this.code_actions(&buffer_handle, action.range, cx)
                    })
                    .await?;
                action.lsp_action = actions
                    .into_iter()
                    .find(|a| a.lsp_action.title == action.lsp_action.title)
                    .ok_or_else(|| anyhow!("code action is outdated"))?
                    .lsp_action;
            }

            if let Some(edit) = action.lsp_action.edit {
                if edit.changes.is_some() || edit.document_changes.is_some() {
                    return Self::deserialize_workspace_edit(
                        this,
                        edit,
                        push_to_history,
                        lsp_adapter.clone(),
                        lang_server.clone(),
                        &mut cx,
                    )
                    .await;
                }
            }

            if let Some(command) = action.lsp_action.command {
                // Clear any stale entry for this server before running the
                // command.
                this.update(&mut cx, |this, _| {
                    this.last_workspace_edits_by_language_server
                        .remove(&lang_server.server_id());
                });
                lang_server
                    .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
                        command: command.command,
                        arguments: command.arguments.unwrap_or_default(),
                        ..Default::default()
                    })
                    .await?;
                // NOTE(review): presumably the entry is populated elsewhere
                // when the server sends workspace edits while the command
                // runs; verify against the code that writes this map.
                return Ok(this.update(&mut cx, |this, _| {
                    this.last_workspace_edits_by_language_server
                        .remove(&lang_server.server_id())
                        .unwrap_or_default()
                }));
            }

            Ok(ProjectTransaction::default())
        })
    } else if let Some(project_id) = self.remote_id() {
        let client = self.client.clone();
        let request = proto::ApplyCodeAction {
            project_id,
            buffer_id: buffer_handle.read(cx).remote_id(),
            action: Some(language::proto::serialize_code_action(&action)),
        };
        cx.spawn(|this, mut cx| async move {
            let response = client
                .request(request)
                .await?
                .transaction
                .ok_or_else(|| anyhow!("missing transaction"))?;
            this.update(&mut cx, |this, cx| {
                this.deserialize_project_transaction(response, push_to_history, cx)
            })
            .await
        })
    } else {
        Task::ready(Err(anyhow!("project does not have a remote id")))
    }
}
3782
3783 async fn deserialize_workspace_edit(
3784 this: ModelHandle<Self>,
3785 edit: lsp::WorkspaceEdit,
3786 push_to_history: bool,
3787 lsp_adapter: Arc<CachedLspAdapter>,
3788 language_server: Arc<LanguageServer>,
3789 cx: &mut AsyncAppContext,
3790 ) -> Result<ProjectTransaction> {
3791 let fs = this.read_with(cx, |this, _| this.fs.clone());
3792 let mut operations = Vec::new();
3793 if let Some(document_changes) = edit.document_changes {
3794 match document_changes {
3795 lsp::DocumentChanges::Edits(edits) => {
3796 operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
3797 }
3798 lsp::DocumentChanges::Operations(ops) => operations = ops,
3799 }
3800 } else if let Some(changes) = edit.changes {
3801 operations.extend(changes.into_iter().map(|(uri, edits)| {
3802 lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
3803 text_document: lsp::OptionalVersionedTextDocumentIdentifier {
3804 uri,
3805 version: None,
3806 },
3807 edits: edits.into_iter().map(lsp::OneOf::Left).collect(),
3808 })
3809 }));
3810 }
3811
3812 let mut project_transaction = ProjectTransaction::default();
3813 for operation in operations {
3814 match operation {
3815 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
3816 let abs_path = op
3817 .uri
3818 .to_file_path()
3819 .map_err(|_| anyhow!("can't convert URI to path"))?;
3820
3821 if let Some(parent_path) = abs_path.parent() {
3822 fs.create_dir(parent_path).await?;
3823 }
3824 if abs_path.ends_with("/") {
3825 fs.create_dir(&abs_path).await?;
3826 } else {
3827 fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
3828 .await?;
3829 }
3830 }
3831 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
3832 let source_abs_path = op
3833 .old_uri
3834 .to_file_path()
3835 .map_err(|_| anyhow!("can't convert URI to path"))?;
3836 let target_abs_path = op
3837 .new_uri
3838 .to_file_path()
3839 .map_err(|_| anyhow!("can't convert URI to path"))?;
3840 fs.rename(
3841 &source_abs_path,
3842 &target_abs_path,
3843 op.options.map(Into::into).unwrap_or_default(),
3844 )
3845 .await?;
3846 }
3847 lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
3848 let abs_path = op
3849 .uri
3850 .to_file_path()
3851 .map_err(|_| anyhow!("can't convert URI to path"))?;
3852 let options = op.options.map(Into::into).unwrap_or_default();
3853 if abs_path.ends_with("/") {
3854 fs.remove_dir(&abs_path, options).await?;
3855 } else {
3856 fs.remove_file(&abs_path, options).await?;
3857 }
3858 }
3859 lsp::DocumentChangeOperation::Edit(op) => {
3860 let buffer_to_edit = this
3861 .update(cx, |this, cx| {
3862 this.open_local_buffer_via_lsp(
3863 op.text_document.uri,
3864 language_server.server_id(),
3865 lsp_adapter.name.clone(),
3866 cx,
3867 )
3868 })
3869 .await?;
3870
3871 let edits = this
3872 .update(cx, |this, cx| {
3873 let edits = op.edits.into_iter().map(|edit| match edit {
3874 lsp::OneOf::Left(edit) => edit,
3875 lsp::OneOf::Right(edit) => edit.text_edit,
3876 });
3877 this.edits_from_lsp(
3878 &buffer_to_edit,
3879 edits,
3880 op.text_document.version,
3881 cx,
3882 )
3883 })
3884 .await?;
3885
3886 let transaction = buffer_to_edit.update(cx, |buffer, cx| {
3887 buffer.finalize_last_transaction();
3888 buffer.start_transaction();
3889 for (range, text) in edits {
3890 buffer.edit([(range, text)], None, cx);
3891 }
3892 let transaction = if buffer.end_transaction(cx).is_some() {
3893 let transaction = buffer.finalize_last_transaction().unwrap().clone();
3894 if !push_to_history {
3895 buffer.forget_transaction(transaction.id);
3896 }
3897 Some(transaction)
3898 } else {
3899 None
3900 };
3901
3902 transaction
3903 });
3904 if let Some(transaction) = transaction {
3905 project_transaction.0.insert(buffer_to_edit, transaction);
3906 }
3907 }
3908 }
3909 }
3910
3911 Ok(project_transaction)
3912 }
3913
3914 pub fn prepare_rename<T: ToPointUtf16>(
3915 &self,
3916 buffer: ModelHandle<Buffer>,
3917 position: T,
3918 cx: &mut ModelContext<Self>,
3919 ) -> Task<Result<Option<Range<Anchor>>>> {
3920 let position = position.to_point_utf16(buffer.read(cx));
3921 self.request_lsp(buffer, PrepareRename { position }, cx)
3922 }
3923
3924 pub fn perform_rename<T: ToPointUtf16>(
3925 &self,
3926 buffer: ModelHandle<Buffer>,
3927 position: T,
3928 new_name: String,
3929 push_to_history: bool,
3930 cx: &mut ModelContext<Self>,
3931 ) -> Task<Result<ProjectTransaction>> {
3932 let position = position.to_point_utf16(buffer.read(cx));
3933 self.request_lsp(
3934 buffer,
3935 PerformRename {
3936 position,
3937 new_name,
3938 push_to_history,
3939 },
3940 cx,
3941 )
3942 }
3943
/// Searches the project for `query` and returns, per buffer, the anchor
/// ranges of every match.
///
/// Local project, three cooperating stages connected by bounded channels:
/// 1. Background workers split the visible files of all local worktrees
///    between them, open each file through `fs`, and forward the paths for
///    which `query.detect` reports a match.
/// 2. A foreground task first forwards snapshots of all currently open
///    buffers, then opens a buffer for every path received from stage 1 and
///    forwards a snapshot of each buffer it has not already forwarded.
/// 3. Background workers run `query.search` on every snapshot received and
///    collect the matches as anchor ranges.
///
/// Remote project: the query is sent to the host and each returned location
/// is resolved to a buffer and an anchor range.
///
/// Resolves to an empty map when a local project has no visible files or
/// when the project is neither local nor has a remote id.
#[allow(clippy::type_complexity)]
pub fn search(
    &self,
    query: SearchQuery,
    cx: &mut ModelContext<Self>,
) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
    if self.is_local() {
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx).as_local()?;
                Some(tree.snapshot())
            })
            .collect::<Vec<_>>();

        let background = cx.background().clone();
        let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
        // Returning early also keeps `workers` non-zero for the division below.
        if path_count == 0 {
            return Task::ready(Ok(Default::default()));
        }
        let workers = background.num_cpus().min(path_count);
        let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
        // Stage 1: scan files and emit the paths that contain a match.
        cx.background()
            .spawn({
                let fs = self.fs.clone();
                let background = cx.background().clone();
                let query = query.clone();
                async move {
                    let fs = &fs;
                    let query = &query;
                    let matching_paths_tx = &matching_paths_tx;
                    // Ceiling division, so that every path is assigned to a worker.
                    let paths_per_worker = (path_count + workers - 1) / workers;
                    let snapshots = &snapshots;
                    background
                        .scoped(|scope| {
                            for worker_ix in 0..workers {
                                // Each worker owns the half-open slice
                                // [worker_start_ix, worker_end_ix) of the files
                                // of all snapshots laid end to end.
                                let worker_start_ix = worker_ix * paths_per_worker;
                                let worker_end_ix = worker_start_ix + paths_per_worker;
                                scope.spawn(async move {
                                    let mut snapshot_start_ix = 0;
                                    // Reused across files instead of
                                    // allocating a new path for each one.
                                    let mut abs_path = PathBuf::new();
                                    for snapshot in snapshots {
                                        let snapshot_end_ix =
                                            snapshot_start_ix + snapshot.visible_file_count();
                                        if worker_end_ix <= snapshot_start_ix {
                                            // This and all later snapshots lie
                                            // past the worker's slice.
                                            break;
                                        } else if worker_start_ix > snapshot_end_ix {
                                            // The worker's slice starts after
                                            // this snapshot; skip it.
                                            snapshot_start_ix = snapshot_end_ix;
                                            continue;
                                        } else {
                                            // Intersect the worker's slice with
                                            // this snapshot's file range.
                                            let start_in_snapshot = worker_start_ix
                                                .saturating_sub(snapshot_start_ix);
                                            let end_in_snapshot =
                                                cmp::min(worker_end_ix, snapshot_end_ix)
                                                    - snapshot_start_ix;

                                            for entry in snapshot
                                                .files(false, start_in_snapshot)
                                                .take(end_in_snapshot - start_in_snapshot)
                                            {
                                                // Stop early once the receiving
                                                // side has gone away.
                                                if matching_paths_tx.is_closed() {
                                                    break;
                                                }

                                                abs_path.clear();
                                                abs_path.push(&snapshot.abs_path());
                                                abs_path.push(&entry.path);
                                                // Files that fail to open or to
                                                // be scanned count as non-matching.
                                                let matches = if let Some(file) =
                                                    fs.open_sync(&abs_path).await.log_err()
                                                {
                                                    query.detect(file).unwrap_or(false)
                                                } else {
                                                    false
                                                };

                                                if matches {
                                                    let project_path =
                                                        (snapshot.id(), entry.path.clone());
                                                    if matching_paths_tx
                                                        .send(project_path)
                                                        .await
                                                        .is_err()
                                                    {
                                                        break;
                                                    }
                                                }
                                            }

                                            snapshot_start_ix = snapshot_end_ix;
                                        }
                                    }
                                });
                            }
                        })
                        .await;
                }
            })
            .detach();

        // Stage 2: turn open buffers and matching paths into buffer snapshots.
        let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
        let open_buffers = self
            .opened_buffers
            .values()
            .filter_map(|b| b.upgrade(cx))
            .collect::<HashSet<_>>();
        cx.spawn(|this, cx| async move {
            // Buffers that are already open are searched in their current
            // in-memory state.
            for buffer in &open_buffers {
                let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                buffers_tx.send((buffer.clone(), snapshot)).await?;
            }

            // Shared with the per-path tasks below so that each buffer is
            // forwarded at most once.
            let open_buffers = Rc::new(RefCell::new(open_buffers));
            while let Some(project_path) = matching_paths_rx.next().await {
                if buffers_tx.is_closed() {
                    break;
                }

                let this = this.clone();
                let open_buffers = open_buffers.clone();
                let buffers_tx = buffers_tx.clone();
                cx.spawn(|mut cx| async move {
                    if let Some(buffer) = this
                        .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
                        .await
                        .log_err()
                    {
                        // `insert` returns false if the buffer was already
                        // forwarded.
                        if open_buffers.borrow_mut().insert(buffer.clone()) {
                            let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                            buffers_tx.send((buffer, snapshot)).await?;
                        }
                    }

                    Ok::<_, anyhow::Error>(())
                })
                .detach();
            }

            Ok::<_, anyhow::Error>(())
        })
        .detach_and_log_err(cx);

        // Stage 3: search every snapshot and collect the match ranges.
        let background = cx.background().clone();
        cx.background().spawn(async move {
            let query = &query;
            // One result map per worker, merged after all workers finish.
            let mut matched_buffers = Vec::new();
            for _ in 0..workers {
                matched_buffers.push(HashMap::default());
            }
            background
                .scoped(|scope| {
                    for worker_matched_buffers in matched_buffers.iter_mut() {
                        let mut buffers_rx = buffers_rx.clone();
                        scope.spawn(async move {
                            while let Some((buffer, snapshot)) = buffers_rx.next().await {
                                let buffer_matches = query
                                    .search(snapshot.as_rope())
                                    .await
                                    .iter()
                                    .map(|range| {
                                        snapshot.anchor_before(range.start)
                                            ..snapshot.anchor_after(range.end)
                                    })
                                    .collect::<Vec<_>>();
                                if !buffer_matches.is_empty() {
                                    worker_matched_buffers
                                        .insert(buffer.clone(), buffer_matches);
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(matched_buffers.into_iter().flatten().collect())
        })
    } else if let Some(project_id) = self.remote_id() {
        let request = self.client.request(query.to_proto(project_id));
        cx.spawn(|this, mut cx| async move {
            let response = request.await?;
            let mut result = HashMap::default();
            for location in response.locations {
                let target_buffer = this
                    .update(&mut cx, |this, cx| {
                        this.wait_for_buffer(location.buffer_id, cx)
                    })
                    .await?;
                let start = location
                    .start
                    .and_then(deserialize_anchor)
                    .ok_or_else(|| anyhow!("missing target start"))?;
                let end = location
                    .end
                    .and_then(deserialize_anchor)
                    .ok_or_else(|| anyhow!("missing target end"))?;
                // Group the ranges by the buffer they belong to.
                result
                    .entry(target_buffer)
                    .or_insert(Vec::new())
                    .push(start..end)
            }
            Ok(result)
        })
    } else {
        Task::ready(Ok(Default::default()))
    }
}
4148
4149 fn request_lsp<R: LspCommand>(
4150 &self,
4151 buffer_handle: ModelHandle<Buffer>,
4152 request: R,
4153 cx: &mut ModelContext<Self>,
4154 ) -> Task<Result<R::Response>>
4155 where
4156 <R::LspRequest as lsp::request::Request>::Result: Send,
4157 {
4158 let buffer = buffer_handle.read(cx);
4159 if self.is_local() {
4160 let file = File::from_dyn(buffer.file()).and_then(File::as_local);
4161 if let Some((file, language_server)) = file.zip(
4162 self.language_server_for_buffer(buffer, cx)
4163 .map(|(_, server)| server.clone()),
4164 ) {
4165 let lsp_params = request.to_lsp(&file.abs_path(cx), cx);
4166 return cx.spawn(|this, cx| async move {
4167 if !request.check_capabilities(language_server.capabilities()) {
4168 return Ok(Default::default());
4169 }
4170
4171 let response = language_server
4172 .request::<R::LspRequest>(lsp_params)
4173 .await
4174 .context("lsp request failed")?;
4175 request
4176 .response_from_lsp(response, this, buffer_handle, cx)
4177 .await
4178 });
4179 }
4180 } else if let Some(project_id) = self.remote_id() {
4181 let rpc = self.client.clone();
4182 let message = request.to_proto(project_id, buffer);
4183 return cx.spawn(|this, cx| async move {
4184 let response = rpc.request(message).await?;
4185 if this.read_with(&cx, |this, _| this.is_read_only()) {
4186 Err(anyhow!("disconnected before completing request"))
4187 } else {
4188 request
4189 .response_from_proto(response, this, buffer_handle, cx)
4190 .await
4191 }
4192 });
4193 }
4194 Task::ready(Ok(Default::default()))
4195 }
4196
4197 pub fn find_or_create_local_worktree(
4198 &mut self,
4199 abs_path: impl AsRef<Path>,
4200 visible: bool,
4201 cx: &mut ModelContext<Self>,
4202 ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
4203 let abs_path = abs_path.as_ref();
4204 if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
4205 Task::ready(Ok((tree, relative_path)))
4206 } else {
4207 let worktree = self.create_local_worktree(abs_path, visible, cx);
4208 cx.foreground()
4209 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
4210 }
4211 }
4212
4213 pub fn find_local_worktree(
4214 &self,
4215 abs_path: &Path,
4216 cx: &AppContext,
4217 ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
4218 for tree in &self.worktrees {
4219 if let Some(tree) = tree.upgrade(cx) {
4220 if let Some(relative_path) = tree
4221 .read(cx)
4222 .as_local()
4223 .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
4224 {
4225 return Some((tree.clone(), relative_path.into()));
4226 }
4227 }
4228 }
4229 None
4230 }
4231
4232 pub fn is_shared(&self) -> bool {
4233 match &self.client_state {
4234 Some(ProjectClientState::Local { .. }) => true,
4235 _ => false,
4236 }
4237 }
4238
/// Creates a local worktree rooted at `abs_path` and adds it to the project.
///
/// Loading is tracked in `loading_local_worktrees`, keyed by path: if a load
/// for the same path is already in flight, its shared task is reused instead
/// of starting another one. Once loaded, the worktree is added via
/// `add_worktree` and, if the project has a remote id, shared under that id
/// (a failure to share is logged, not returned).
fn create_local_worktree(
    &mut self,
    abs_path: impl AsRef<Path>,
    visible: bool,
    cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<Worktree>>> {
    let fs = self.fs.clone();
    let client = self.client.clone();
    let next_entry_id = self.next_entry_id.clone();
    let path: Arc<Path> = abs_path.as_ref().into();
    let task = self
        .loading_local_worktrees
        .entry(path.clone())
        .or_insert_with(|| {
            cx.spawn(|project, mut cx| {
                async move {
                    let worktree = Worktree::local(
                        client.clone(),
                        path.clone(),
                        visible,
                        fs,
                        next_entry_id,
                        &mut cx,
                    )
                    .await;
                    // Remove the in-flight entry before inspecting the
                    // result, so it is cleared on failure as well.
                    project.update(&mut cx, |project, _| {
                        project.loading_local_worktrees.remove(&path);
                    });
                    let worktree = worktree?;

                    project
                        .update(&mut cx, |project, cx| project.add_worktree(&worktree, cx))
                        .await;

                    if let Some(project_id) =
                        project.read_with(&cx, |project, _| project.remote_id())
                    {
                        worktree
                            .update(&mut cx, |worktree, cx| {
                                worktree.as_local_mut().unwrap().share(project_id, cx)
                            })
                            .await
                            .log_err();
                    }

                    Ok(worktree)
                }
                // NOTE(review): presumably the error is wrapped in an `Arc`
                // so the shared future's output can be cloned; confirm.
                .map_err(Arc::new)
            })
            .shared()
        })
        .clone();
    cx.foreground().spawn(async move {
        match task.await {
            Ok(worktree) => Ok(worktree),
            // Re-create an owned error from the shared one's message.
            Err(err) => Err(anyhow!("{}", err)),
        }
    })
}
4298
4299 pub fn remove_worktree(
4300 &mut self,
4301 id_to_remove: WorktreeId,
4302 cx: &mut ModelContext<Self>,
4303 ) -> impl Future<Output = ()> {
4304 self.worktrees.retain(|worktree| {
4305 if let Some(worktree) = worktree.upgrade(cx) {
4306 let id = worktree.read(cx).id();
4307 if id == id_to_remove {
4308 cx.emit(Event::WorktreeRemoved(id));
4309 false
4310 } else {
4311 true
4312 }
4313 } else {
4314 false
4315 }
4316 });
4317 self.metadata_changed(cx)
4318 }
4319
4320 fn add_worktree(
4321 &mut self,
4322 worktree: &ModelHandle<Worktree>,
4323 cx: &mut ModelContext<Self>,
4324 ) -> impl Future<Output = ()> {
4325 cx.observe(worktree, |_, _, cx| cx.notify()).detach();
4326 if worktree.read(cx).is_local() {
4327 cx.subscribe(worktree, |this, worktree, event, cx| match event {
4328 worktree::Event::UpdatedEntries => this.update_local_worktree_buffers(worktree, cx),
4329 worktree::Event::UpdatedGitRepositories(updated_repos) => {
4330 this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
4331 }
4332 })
4333 .detach();
4334 }
4335
4336 let push_strong_handle = {
4337 let worktree = worktree.read(cx);
4338 self.is_shared() || worktree.is_visible() || worktree.is_remote()
4339 };
4340 if push_strong_handle {
4341 self.worktrees
4342 .push(WorktreeHandle::Strong(worktree.clone()));
4343 } else {
4344 self.worktrees
4345 .push(WorktreeHandle::Weak(worktree.downgrade()));
4346 }
4347
4348 cx.observe_release(worktree, |this, worktree, cx| {
4349 let _ = this.remove_worktree(worktree.id(), cx);
4350 })
4351 .detach();
4352
4353 cx.emit(Event::WorktreeAdded);
4354 self.metadata_changed(cx)
4355 }
4356
    /// Refreshes the `File` attached to every open buffer that belongs to
    /// `worktree_handle`, using the worktree's current snapshot.
    ///
    /// For each such buffer the new file is looked up by the old entry id,
    /// then by the old path; if neither is found the file is marked as
    /// deleted. When the project has a remote id, the new file is also sent
    /// as a `proto::UpdateBufferFile` message. Buffers whose absolute path
    /// changed are re-registered with their language server, and entries in
    /// `opened_buffers` whose buffer can no longer be upgraded are removed.
    fn update_local_worktree_buffers(
        &mut self,
        worktree_handle: ModelHandle<Worktree>,
        cx: &mut ModelContext<Self>,
    ) {
        let snapshot = worktree_handle.read(cx).snapshot();
        let mut buffers_to_delete = Vec::new();
        let mut renamed_buffers = Vec::new();
        for (buffer_id, buffer) in &self.opened_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    if let Some(old_file) = File::from_dyn(buffer.file()) {
                        // Only buffers backed by this worktree are affected.
                        if old_file.worktree != worktree_handle {
                            return;
                        }

                        // Prefer the entry with the same id, then the entry at
                        // the same path; otherwise treat the file as deleted
                        // and keep its previous id, path and mtime.
                        let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id)
                        {
                            File {
                                is_local: true,
                                entry_id: entry.id,
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                                is_deleted: false,
                            }
                        } else if let Some(entry) =
                            snapshot.entry_for_path(old_file.path().as_ref())
                        {
                            File {
                                is_local: true,
                                entry_id: entry.id,
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                                is_deleted: false,
                            }
                        } else {
                            File {
                                is_local: true,
                                entry_id: old_file.entry_id,
                                path: old_file.path().clone(),
                                mtime: old_file.mtime(),
                                worktree: worktree_handle.clone(),
                                is_deleted: true,
                            }
                        };

                        // Remember buffers whose absolute path changed so they
                        // can be re-registered once this loop has finished.
                        let old_path = old_file.abs_path(cx);
                        if new_file.abs_path(cx) != old_path {
                            renamed_buffers.push((cx.handle(), old_path));
                        }

                        if let Some(project_id) = self.remote_id() {
                            self.client
                                .send(proto::UpdateBufferFile {
                                    project_id,
                                    buffer_id: *buffer_id as u64,
                                    file: Some(new_file.to_proto()),
                                })
                                .log_err();
                        }
                        buffer.file_updated(Arc::new(new_file), cx).detach();
                    }
                });
            } else {
                // The buffer can no longer be upgraded; forget it below, once
                // `opened_buffers` is no longer being iterated.
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            self.opened_buffers.remove(&buffer_id);
        }

        // Unregister under the old path, then re-detect the language and
        // register again for the buffer's new file.
        for (buffer, old_path) in renamed_buffers {
            self.unregister_buffer_from_language_server(&buffer, old_path, cx);
            self.assign_language_to_buffer(&buffer, cx);
            self.register_buffer_with_language_server(&buffer, cx);
        }
    }
4437
4438 fn update_local_worktree_buffers_git_repos(
4439 &mut self,
4440 worktree: ModelHandle<Worktree>,
4441 repos: &[GitRepositoryEntry],
4442 cx: &mut ModelContext<Self>,
4443 ) {
4444 for (_, buffer) in &self.opened_buffers {
4445 if let Some(buffer) = buffer.upgrade(cx) {
4446 let file = match File::from_dyn(buffer.read(cx).file()) {
4447 Some(file) => file,
4448 None => continue,
4449 };
4450 if file.worktree != worktree {
4451 continue;
4452 }
4453
4454 let path = file.path().clone();
4455
4456 let repo = match repos.iter().find(|repo| repo.manages(&path)) {
4457 Some(repo) => repo.clone(),
4458 None => return,
4459 };
4460
4461 let relative_repo = match path.strip_prefix(repo.content_path) {
4462 Ok(relative_repo) => relative_repo.to_owned(),
4463 Err(_) => return,
4464 };
4465
4466 let remote_id = self.remote_id();
4467 let client = self.client.clone();
4468
4469 cx.spawn(|_, mut cx| async move {
4470 let diff_base = cx
4471 .background()
4472 .spawn(async move { repo.repo.lock().load_index_text(&relative_repo) })
4473 .await;
4474
4475 let buffer_id = buffer.update(&mut cx, |buffer, cx| {
4476 buffer.set_diff_base(diff_base.clone(), cx);
4477 buffer.remote_id()
4478 });
4479
4480 if let Some(project_id) = remote_id {
4481 client
4482 .send(proto::UpdateDiffBase {
4483 project_id,
4484 buffer_id: buffer_id as u64,
4485 diff_base,
4486 })
4487 .log_err();
4488 }
4489 })
4490 .detach();
4491 }
4492 }
4493 }
4494
4495 pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
4496 let new_active_entry = entry.and_then(|project_path| {
4497 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
4498 let entry = worktree.read(cx).entry_for_path(project_path.path)?;
4499 Some(entry.id)
4500 });
4501 if new_active_entry != self.active_entry {
4502 self.active_entry = new_active_entry;
4503 cx.emit(Event::ActiveEntryChanged(new_active_entry));
4504 }
4505 }
4506
4507 pub fn language_servers_running_disk_based_diagnostics(
4508 &self,
4509 ) -> impl Iterator<Item = usize> + '_ {
4510 self.language_server_statuses
4511 .iter()
4512 .filter_map(|(id, status)| {
4513 if status.has_pending_diagnostic_updates {
4514 Some(*id)
4515 } else {
4516 None
4517 }
4518 })
4519 }
4520
4521 pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
4522 let mut summary = DiagnosticSummary::default();
4523 for (_, path_summary) in self.diagnostic_summaries(cx) {
4524 summary.error_count += path_summary.error_count;
4525 summary.warning_count += path_summary.warning_count;
4526 }
4527 summary
4528 }
4529
4530 pub fn diagnostic_summaries<'a>(
4531 &'a self,
4532 cx: &'a AppContext,
4533 ) -> impl Iterator<Item = (ProjectPath, DiagnosticSummary)> + 'a {
4534 self.visible_worktrees(cx).flat_map(move |worktree| {
4535 let worktree = worktree.read(cx);
4536 let worktree_id = worktree.id();
4537 worktree
4538 .diagnostic_summaries()
4539 .map(move |(path, summary)| (ProjectPath { worktree_id, path }, summary))
4540 })
4541 }
4542
4543 pub fn disk_based_diagnostics_started(
4544 &mut self,
4545 language_server_id: usize,
4546 cx: &mut ModelContext<Self>,
4547 ) {
4548 cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
4549 }
4550
4551 pub fn disk_based_diagnostics_finished(
4552 &mut self,
4553 language_server_id: usize,
4554 cx: &mut ModelContext<Self>,
4555 ) {
4556 cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
4557 }
4558
    /// Returns the id of the project's currently active entry, if any.
    pub fn active_entry(&self) -> Option<ProjectEntryId> {
        self.active_entry
    }
4562
4563 pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
4564 self.worktree_for_id(path.worktree_id, cx)?
4565 .read(cx)
4566 .entry_for_path(&path.path)
4567 .cloned()
4568 }
4569
4570 pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
4571 let worktree = self.worktree_for_entry(entry_id, cx)?;
4572 let worktree = worktree.read(cx);
4573 let worktree_id = worktree.id();
4574 let path = worktree.entry_for_id(entry_id)?.path.clone();
4575 Some(ProjectPath { worktree_id, path })
4576 }
4577
4578 // RPC message handlers
4579
4580 async fn handle_unshare_project(
4581 this: ModelHandle<Self>,
4582 _: TypedEnvelope<proto::UnshareProject>,
4583 _: Arc<Client>,
4584 mut cx: AsyncAppContext,
4585 ) -> Result<()> {
4586 this.update(&mut cx, |this, cx| {
4587 if this.is_local() {
4588 this.unshare(cx)?;
4589 } else {
4590 this.disconnected_from_host(cx);
4591 }
4592 Ok(())
4593 })
4594 }
4595
4596 async fn handle_add_collaborator(
4597 this: ModelHandle<Self>,
4598 mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
4599 _: Arc<Client>,
4600 mut cx: AsyncAppContext,
4601 ) -> Result<()> {
4602 let collaborator = envelope
4603 .payload
4604 .collaborator
4605 .take()
4606 .ok_or_else(|| anyhow!("empty collaborator"))?;
4607
4608 let collaborator = Collaborator::from_proto(collaborator)?;
4609 this.update(&mut cx, |this, cx| {
4610 this.collaborators
4611 .insert(collaborator.peer_id, collaborator);
4612 cx.notify();
4613 });
4614
4615 Ok(())
4616 }
4617
4618 async fn handle_remove_collaborator(
4619 this: ModelHandle<Self>,
4620 envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
4621 _: Arc<Client>,
4622 mut cx: AsyncAppContext,
4623 ) -> Result<()> {
4624 this.update(&mut cx, |this, cx| {
4625 let peer_id = envelope
4626 .payload
4627 .peer_id
4628 .ok_or_else(|| anyhow!("invalid peer id"))?;
4629 let replica_id = this
4630 .collaborators
4631 .remove(&peer_id)
4632 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
4633 .replica_id;
4634 for buffer in this.opened_buffers.values() {
4635 if let Some(buffer) = buffer.upgrade(cx) {
4636 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
4637 }
4638 }
4639 this.shared_buffers.remove(&peer_id);
4640
4641 cx.emit(Event::CollaboratorLeft(peer_id));
4642 cx.notify();
4643 Ok(())
4644 })
4645 }
4646
4647 async fn handle_update_project(
4648 this: ModelHandle<Self>,
4649 envelope: TypedEnvelope<proto::UpdateProject>,
4650 client: Arc<Client>,
4651 mut cx: AsyncAppContext,
4652 ) -> Result<()> {
4653 this.update(&mut cx, |this, cx| {
4654 let replica_id = this.replica_id();
4655 let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
4656
4657 let mut old_worktrees_by_id = this
4658 .worktrees
4659 .drain(..)
4660 .filter_map(|worktree| {
4661 let worktree = worktree.upgrade(cx)?;
4662 Some((worktree.read(cx).id(), worktree))
4663 })
4664 .collect::<HashMap<_, _>>();
4665
4666 for worktree in envelope.payload.worktrees {
4667 if let Some(old_worktree) =
4668 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
4669 {
4670 this.worktrees.push(WorktreeHandle::Strong(old_worktree));
4671 } else {
4672 let worktree =
4673 Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
4674 let _ = this.add_worktree(&worktree, cx);
4675 }
4676 }
4677
4678 let _ = this.metadata_changed(cx);
4679 for (id, _) in old_worktrees_by_id {
4680 cx.emit(Event::WorktreeRemoved(id));
4681 }
4682
4683 Ok(())
4684 })
4685 }
4686
4687 async fn handle_update_worktree(
4688 this: ModelHandle<Self>,
4689 envelope: TypedEnvelope<proto::UpdateWorktree>,
4690 _: Arc<Client>,
4691 mut cx: AsyncAppContext,
4692 ) -> Result<()> {
4693 this.update(&mut cx, |this, cx| {
4694 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4695 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4696 worktree.update(cx, |worktree, _| {
4697 let worktree = worktree.as_remote_mut().unwrap();
4698 worktree.update_from_remote(envelope.payload);
4699 });
4700 }
4701 Ok(())
4702 })
4703 }
4704
4705 async fn handle_create_project_entry(
4706 this: ModelHandle<Self>,
4707 envelope: TypedEnvelope<proto::CreateProjectEntry>,
4708 _: Arc<Client>,
4709 mut cx: AsyncAppContext,
4710 ) -> Result<proto::ProjectEntryResponse> {
4711 let worktree = this.update(&mut cx, |this, cx| {
4712 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4713 this.worktree_for_id(worktree_id, cx)
4714 .ok_or_else(|| anyhow!("worktree not found"))
4715 })?;
4716 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4717 let entry = worktree
4718 .update(&mut cx, |worktree, cx| {
4719 let worktree = worktree.as_local_mut().unwrap();
4720 let path = PathBuf::from(envelope.payload.path);
4721 worktree.create_entry(path, envelope.payload.is_directory, cx)
4722 })
4723 .await?;
4724 Ok(proto::ProjectEntryResponse {
4725 entry: Some((&entry).into()),
4726 worktree_scan_id: worktree_scan_id as u64,
4727 })
4728 }
4729
4730 async fn handle_rename_project_entry(
4731 this: ModelHandle<Self>,
4732 envelope: TypedEnvelope<proto::RenameProjectEntry>,
4733 _: Arc<Client>,
4734 mut cx: AsyncAppContext,
4735 ) -> Result<proto::ProjectEntryResponse> {
4736 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4737 let worktree = this.read_with(&cx, |this, cx| {
4738 this.worktree_for_entry(entry_id, cx)
4739 .ok_or_else(|| anyhow!("worktree not found"))
4740 })?;
4741 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4742 let entry = worktree
4743 .update(&mut cx, |worktree, cx| {
4744 let new_path = PathBuf::from(envelope.payload.new_path);
4745 worktree
4746 .as_local_mut()
4747 .unwrap()
4748 .rename_entry(entry_id, new_path, cx)
4749 .ok_or_else(|| anyhow!("invalid entry"))
4750 })?
4751 .await?;
4752 Ok(proto::ProjectEntryResponse {
4753 entry: Some((&entry).into()),
4754 worktree_scan_id: worktree_scan_id as u64,
4755 })
4756 }
4757
4758 async fn handle_copy_project_entry(
4759 this: ModelHandle<Self>,
4760 envelope: TypedEnvelope<proto::CopyProjectEntry>,
4761 _: Arc<Client>,
4762 mut cx: AsyncAppContext,
4763 ) -> Result<proto::ProjectEntryResponse> {
4764 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4765 let worktree = this.read_with(&cx, |this, cx| {
4766 this.worktree_for_entry(entry_id, cx)
4767 .ok_or_else(|| anyhow!("worktree not found"))
4768 })?;
4769 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4770 let entry = worktree
4771 .update(&mut cx, |worktree, cx| {
4772 let new_path = PathBuf::from(envelope.payload.new_path);
4773 worktree
4774 .as_local_mut()
4775 .unwrap()
4776 .copy_entry(entry_id, new_path, cx)
4777 .ok_or_else(|| anyhow!("invalid entry"))
4778 })?
4779 .await?;
4780 Ok(proto::ProjectEntryResponse {
4781 entry: Some((&entry).into()),
4782 worktree_scan_id: worktree_scan_id as u64,
4783 })
4784 }
4785
4786 async fn handle_delete_project_entry(
4787 this: ModelHandle<Self>,
4788 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
4789 _: Arc<Client>,
4790 mut cx: AsyncAppContext,
4791 ) -> Result<proto::ProjectEntryResponse> {
4792 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4793 let worktree = this.read_with(&cx, |this, cx| {
4794 this.worktree_for_entry(entry_id, cx)
4795 .ok_or_else(|| anyhow!("worktree not found"))
4796 })?;
4797 let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4798 worktree
4799 .update(&mut cx, |worktree, cx| {
4800 worktree
4801 .as_local_mut()
4802 .unwrap()
4803 .delete_entry(entry_id, cx)
4804 .ok_or_else(|| anyhow!("invalid entry"))
4805 })?
4806 .await?;
4807 Ok(proto::ProjectEntryResponse {
4808 entry: None,
4809 worktree_scan_id: worktree_scan_id as u64,
4810 })
4811 }
4812
4813 async fn handle_update_diagnostic_summary(
4814 this: ModelHandle<Self>,
4815 envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
4816 _: Arc<Client>,
4817 mut cx: AsyncAppContext,
4818 ) -> Result<()> {
4819 this.update(&mut cx, |this, cx| {
4820 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4821 if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4822 if let Some(summary) = envelope.payload.summary {
4823 let project_path = ProjectPath {
4824 worktree_id,
4825 path: Path::new(&summary.path).into(),
4826 };
4827 worktree.update(cx, |worktree, _| {
4828 worktree
4829 .as_remote_mut()
4830 .unwrap()
4831 .update_diagnostic_summary(project_path.path.clone(), &summary);
4832 });
4833 cx.emit(Event::DiagnosticsUpdated {
4834 language_server_id: summary.language_server_id as usize,
4835 path: project_path,
4836 });
4837 }
4838 }
4839 Ok(())
4840 })
4841 }
4842
4843 async fn handle_start_language_server(
4844 this: ModelHandle<Self>,
4845 envelope: TypedEnvelope<proto::StartLanguageServer>,
4846 _: Arc<Client>,
4847 mut cx: AsyncAppContext,
4848 ) -> Result<()> {
4849 let server = envelope
4850 .payload
4851 .server
4852 .ok_or_else(|| anyhow!("invalid server"))?;
4853 this.update(&mut cx, |this, cx| {
4854 this.language_server_statuses.insert(
4855 server.id as usize,
4856 LanguageServerStatus {
4857 name: server.name,
4858 pending_work: Default::default(),
4859 has_pending_diagnostic_updates: false,
4860 progress_tokens: Default::default(),
4861 },
4862 );
4863 cx.notify();
4864 });
4865 Ok(())
4866 }
4867
4868 async fn handle_update_language_server(
4869 this: ModelHandle<Self>,
4870 envelope: TypedEnvelope<proto::UpdateLanguageServer>,
4871 _: Arc<Client>,
4872 mut cx: AsyncAppContext,
4873 ) -> Result<()> {
4874 let language_server_id = envelope.payload.language_server_id as usize;
4875 match envelope
4876 .payload
4877 .variant
4878 .ok_or_else(|| anyhow!("invalid variant"))?
4879 {
4880 proto::update_language_server::Variant::WorkStart(payload) => {
4881 this.update(&mut cx, |this, cx| {
4882 this.on_lsp_work_start(
4883 language_server_id,
4884 payload.token,
4885 LanguageServerProgress {
4886 message: payload.message,
4887 percentage: payload.percentage.map(|p| p as usize),
4888 last_update_at: Instant::now(),
4889 },
4890 cx,
4891 );
4892 })
4893 }
4894 proto::update_language_server::Variant::WorkProgress(payload) => {
4895 this.update(&mut cx, |this, cx| {
4896 this.on_lsp_work_progress(
4897 language_server_id,
4898 payload.token,
4899 LanguageServerProgress {
4900 message: payload.message,
4901 percentage: payload.percentage.map(|p| p as usize),
4902 last_update_at: Instant::now(),
4903 },
4904 cx,
4905 );
4906 })
4907 }
4908 proto::update_language_server::Variant::WorkEnd(payload) => {
4909 this.update(&mut cx, |this, cx| {
4910 this.on_lsp_work_end(language_server_id, payload.token, cx);
4911 })
4912 }
4913 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
4914 this.update(&mut cx, |this, cx| {
4915 this.disk_based_diagnostics_started(language_server_id, cx);
4916 })
4917 }
4918 proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
4919 this.update(&mut cx, |this, cx| {
4920 this.disk_based_diagnostics_finished(language_server_id, cx)
4921 });
4922 }
4923 }
4924
4925 Ok(())
4926 }
4927
4928 async fn handle_update_buffer(
4929 this: ModelHandle<Self>,
4930 envelope: TypedEnvelope<proto::UpdateBuffer>,
4931 _: Arc<Client>,
4932 mut cx: AsyncAppContext,
4933 ) -> Result<()> {
4934 this.update(&mut cx, |this, cx| {
4935 let payload = envelope.payload.clone();
4936 let buffer_id = payload.buffer_id;
4937 let ops = payload
4938 .operations
4939 .into_iter()
4940 .map(language::proto::deserialize_operation)
4941 .collect::<Result<Vec<_>, _>>()?;
4942 let is_remote = this.is_remote();
4943 match this.opened_buffers.entry(buffer_id) {
4944 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
4945 OpenBuffer::Strong(buffer) => {
4946 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
4947 }
4948 OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
4949 OpenBuffer::Weak(_) => {}
4950 },
4951 hash_map::Entry::Vacant(e) => {
4952 assert!(
4953 is_remote,
4954 "received buffer update from {:?}",
4955 envelope.original_sender_id
4956 );
4957 e.insert(OpenBuffer::Operations(ops));
4958 }
4959 }
4960 Ok(())
4961 })
4962 }
4963
4964 async fn handle_create_buffer_for_peer(
4965 this: ModelHandle<Self>,
4966 envelope: TypedEnvelope<proto::CreateBufferForPeer>,
4967 _: Arc<Client>,
4968 mut cx: AsyncAppContext,
4969 ) -> Result<()> {
4970 this.update(&mut cx, |this, cx| {
4971 match envelope
4972 .payload
4973 .variant
4974 .ok_or_else(|| anyhow!("missing variant"))?
4975 {
4976 proto::create_buffer_for_peer::Variant::State(mut state) => {
4977 let mut buffer_file = None;
4978 if let Some(file) = state.file.take() {
4979 let worktree_id = WorktreeId::from_proto(file.worktree_id);
4980 let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
4981 anyhow!("no worktree found for id {}", file.worktree_id)
4982 })?;
4983 buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
4984 as Arc<dyn language::File>);
4985 }
4986
4987 let buffer_id = state.id;
4988 let buffer = cx.add_model(|_| {
4989 Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
4990 });
4991 this.incomplete_buffers.insert(buffer_id, buffer);
4992 }
4993 proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
4994 let buffer = this
4995 .incomplete_buffers
4996 .get(&chunk.buffer_id)
4997 .ok_or_else(|| {
4998 anyhow!(
4999 "received chunk for buffer {} without initial state",
5000 chunk.buffer_id
5001 )
5002 })?
5003 .clone();
5004 let operations = chunk
5005 .operations
5006 .into_iter()
5007 .map(language::proto::deserialize_operation)
5008 .collect::<Result<Vec<_>>>()?;
5009 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
5010
5011 if chunk.is_last {
5012 this.incomplete_buffers.remove(&chunk.buffer_id);
5013 this.register_buffer(&buffer, cx)?;
5014 }
5015 }
5016 }
5017
5018 Ok(())
5019 })
5020 }
5021
5022 async fn handle_update_diff_base(
5023 this: ModelHandle<Self>,
5024 envelope: TypedEnvelope<proto::UpdateDiffBase>,
5025 _: Arc<Client>,
5026 mut cx: AsyncAppContext,
5027 ) -> Result<()> {
5028 this.update(&mut cx, |this, cx| {
5029 let buffer_id = envelope.payload.buffer_id;
5030 let diff_base = envelope.payload.diff_base;
5031 let buffer = this
5032 .opened_buffers
5033 .get_mut(&buffer_id)
5034 .and_then(|b| b.upgrade(cx))
5035 .ok_or_else(|| anyhow!("No such buffer {}", buffer_id))?;
5036
5037 buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
5038
5039 Ok(())
5040 })
5041 }
5042
5043 async fn handle_update_buffer_file(
5044 this: ModelHandle<Self>,
5045 envelope: TypedEnvelope<proto::UpdateBufferFile>,
5046 _: Arc<Client>,
5047 mut cx: AsyncAppContext,
5048 ) -> Result<()> {
5049 this.update(&mut cx, |this, cx| {
5050 let payload = envelope.payload.clone();
5051 let buffer_id = payload.buffer_id;
5052 let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
5053 let worktree = this
5054 .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
5055 .ok_or_else(|| anyhow!("no such worktree"))?;
5056 let file = File::from_proto(file, worktree, cx)?;
5057 let buffer = this
5058 .opened_buffers
5059 .get_mut(&buffer_id)
5060 .and_then(|b| b.upgrade(cx))
5061 .ok_or_else(|| anyhow!("no such buffer"))?;
5062 buffer.update(cx, |buffer, cx| {
5063 buffer.file_updated(Arc::new(file), cx).detach();
5064 });
5065 this.assign_language_to_buffer(&buffer, cx);
5066 Ok(())
5067 })
5068 }
5069
5070 async fn handle_save_buffer(
5071 this: ModelHandle<Self>,
5072 envelope: TypedEnvelope<proto::SaveBuffer>,
5073 _: Arc<Client>,
5074 mut cx: AsyncAppContext,
5075 ) -> Result<proto::BufferSaved> {
5076 let buffer_id = envelope.payload.buffer_id;
5077 let requested_version = deserialize_version(envelope.payload.version);
5078
5079 let (project_id, buffer) = this.update(&mut cx, |this, cx| {
5080 let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
5081 let buffer = this
5082 .opened_buffers
5083 .get(&buffer_id)
5084 .and_then(|buffer| buffer.upgrade(cx))
5085 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
5086 Ok::<_, anyhow::Error>((project_id, buffer))
5087 })?;
5088 buffer
5089 .update(&mut cx, |buffer, _| {
5090 buffer.wait_for_version(requested_version)
5091 })
5092 .await;
5093
5094 let (saved_version, fingerprint, mtime) =
5095 buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?;
5096 Ok(proto::BufferSaved {
5097 project_id,
5098 buffer_id,
5099 version: serialize_version(&saved_version),
5100 mtime: Some(mtime.into()),
5101 fingerprint,
5102 })
5103 }
5104
5105 async fn handle_reload_buffers(
5106 this: ModelHandle<Self>,
5107 envelope: TypedEnvelope<proto::ReloadBuffers>,
5108 _: Arc<Client>,
5109 mut cx: AsyncAppContext,
5110 ) -> Result<proto::ReloadBuffersResponse> {
5111 let sender_id = envelope.original_sender_id()?;
5112 let reload = this.update(&mut cx, |this, cx| {
5113 let mut buffers = HashSet::default();
5114 for buffer_id in &envelope.payload.buffer_ids {
5115 buffers.insert(
5116 this.opened_buffers
5117 .get(buffer_id)
5118 .and_then(|buffer| buffer.upgrade(cx))
5119 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5120 );
5121 }
5122 Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
5123 })?;
5124
5125 let project_transaction = reload.await?;
5126 let project_transaction = this.update(&mut cx, |this, cx| {
5127 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5128 });
5129 Ok(proto::ReloadBuffersResponse {
5130 transaction: Some(project_transaction),
5131 })
5132 }
5133
5134 async fn handle_format_buffers(
5135 this: ModelHandle<Self>,
5136 envelope: TypedEnvelope<proto::FormatBuffers>,
5137 _: Arc<Client>,
5138 mut cx: AsyncAppContext,
5139 ) -> Result<proto::FormatBuffersResponse> {
5140 let sender_id = envelope.original_sender_id()?;
5141 let format = this.update(&mut cx, |this, cx| {
5142 let mut buffers = HashSet::default();
5143 for buffer_id in &envelope.payload.buffer_ids {
5144 buffers.insert(
5145 this.opened_buffers
5146 .get(buffer_id)
5147 .and_then(|buffer| buffer.upgrade(cx))
5148 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5149 );
5150 }
5151 let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
5152 Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
5153 })?;
5154
5155 let project_transaction = format.await?;
5156 let project_transaction = this.update(&mut cx, |this, cx| {
5157 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5158 });
5159 Ok(proto::FormatBuffersResponse {
5160 transaction: Some(project_transaction),
5161 })
5162 }
5163
5164 async fn handle_get_completions(
5165 this: ModelHandle<Self>,
5166 envelope: TypedEnvelope<proto::GetCompletions>,
5167 _: Arc<Client>,
5168 mut cx: AsyncAppContext,
5169 ) -> Result<proto::GetCompletionsResponse> {
5170 let buffer = this.read_with(&cx, |this, cx| {
5171 this.opened_buffers
5172 .get(&envelope.payload.buffer_id)
5173 .and_then(|buffer| buffer.upgrade(cx))
5174 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
5175 })?;
5176
5177 let position = envelope
5178 .payload
5179 .position
5180 .and_then(language::proto::deserialize_anchor)
5181 .map(|p| {
5182 buffer.read_with(&cx, |buffer, _| {
5183 buffer.clip_point_utf16(Unclipped(p.to_point_utf16(buffer)), Bias::Left)
5184 })
5185 })
5186 .ok_or_else(|| anyhow!("invalid position"))?;
5187
5188 let version = deserialize_version(envelope.payload.version);
5189 buffer
5190 .update(&mut cx, |buffer, _| buffer.wait_for_version(version))
5191 .await;
5192 let version = buffer.read_with(&cx, |buffer, _| buffer.version());
5193
5194 let completions = this
5195 .update(&mut cx, |this, cx| this.completions(&buffer, position, cx))
5196 .await?;
5197
5198 Ok(proto::GetCompletionsResponse {
5199 completions: completions
5200 .iter()
5201 .map(language::proto::serialize_completion)
5202 .collect(),
5203 version: serialize_version(&version),
5204 })
5205 }
5206
5207 async fn handle_apply_additional_edits_for_completion(
5208 this: ModelHandle<Self>,
5209 envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
5210 _: Arc<Client>,
5211 mut cx: AsyncAppContext,
5212 ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
5213 let (buffer, completion) = this.update(&mut cx, |this, cx| {
5214 let buffer = this
5215 .opened_buffers
5216 .get(&envelope.payload.buffer_id)
5217 .and_then(|buffer| buffer.upgrade(cx))
5218 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5219 let language = buffer.read(cx).language();
5220 let completion = language::proto::deserialize_completion(
5221 envelope
5222 .payload
5223 .completion
5224 .ok_or_else(|| anyhow!("invalid completion"))?,
5225 language.cloned(),
5226 );
5227 Ok::<_, anyhow::Error>((buffer, completion))
5228 })?;
5229
5230 let completion = completion.await?;
5231
5232 let apply_additional_edits = this.update(&mut cx, |this, cx| {
5233 this.apply_additional_edits_for_completion(buffer, completion, false, cx)
5234 });
5235
5236 Ok(proto::ApplyCompletionAdditionalEditsResponse {
5237 transaction: apply_additional_edits
5238 .await?
5239 .as_ref()
5240 .map(language::proto::serialize_transaction),
5241 })
5242 }
5243
5244 async fn handle_get_code_actions(
5245 this: ModelHandle<Self>,
5246 envelope: TypedEnvelope<proto::GetCodeActions>,
5247 _: Arc<Client>,
5248 mut cx: AsyncAppContext,
5249 ) -> Result<proto::GetCodeActionsResponse> {
5250 let start = envelope
5251 .payload
5252 .start
5253 .and_then(language::proto::deserialize_anchor)
5254 .ok_or_else(|| anyhow!("invalid start"))?;
5255 let end = envelope
5256 .payload
5257 .end
5258 .and_then(language::proto::deserialize_anchor)
5259 .ok_or_else(|| anyhow!("invalid end"))?;
5260 let buffer = this.update(&mut cx, |this, cx| {
5261 this.opened_buffers
5262 .get(&envelope.payload.buffer_id)
5263 .and_then(|buffer| buffer.upgrade(cx))
5264 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
5265 })?;
5266 buffer
5267 .update(&mut cx, |buffer, _| {
5268 buffer.wait_for_version(deserialize_version(envelope.payload.version))
5269 })
5270 .await;
5271
5272 let version = buffer.read_with(&cx, |buffer, _| buffer.version());
5273 let code_actions = this.update(&mut cx, |this, cx| {
5274 Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx))
5275 })?;
5276
5277 Ok(proto::GetCodeActionsResponse {
5278 actions: code_actions
5279 .await?
5280 .iter()
5281 .map(language::proto::serialize_code_action)
5282 .collect(),
5283 version: serialize_version(&version),
5284 })
5285 }
5286
5287 async fn handle_apply_code_action(
5288 this: ModelHandle<Self>,
5289 envelope: TypedEnvelope<proto::ApplyCodeAction>,
5290 _: Arc<Client>,
5291 mut cx: AsyncAppContext,
5292 ) -> Result<proto::ApplyCodeActionResponse> {
5293 let sender_id = envelope.original_sender_id()?;
5294 let action = language::proto::deserialize_code_action(
5295 envelope
5296 .payload
5297 .action
5298 .ok_or_else(|| anyhow!("invalid action"))?,
5299 )?;
5300 let apply_code_action = this.update(&mut cx, |this, cx| {
5301 let buffer = this
5302 .opened_buffers
5303 .get(&envelope.payload.buffer_id)
5304 .and_then(|buffer| buffer.upgrade(cx))
5305 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5306 Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
5307 })?;
5308
5309 let project_transaction = apply_code_action.await?;
5310 let project_transaction = this.update(&mut cx, |this, cx| {
5311 this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5312 });
5313 Ok(proto::ApplyCodeActionResponse {
5314 transaction: Some(project_transaction),
5315 })
5316 }
5317
5318 async fn handle_lsp_command<T: LspCommand>(
5319 this: ModelHandle<Self>,
5320 envelope: TypedEnvelope<T::ProtoRequest>,
5321 _: Arc<Client>,
5322 mut cx: AsyncAppContext,
5323 ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
5324 where
5325 <T::LspRequest as lsp::request::Request>::Result: Send,
5326 {
5327 let sender_id = envelope.original_sender_id()?;
5328 let buffer_id = T::buffer_id_from_proto(&envelope.payload);
5329 let buffer_handle = this.read_with(&cx, |this, _| {
5330 this.opened_buffers
5331 .get(&buffer_id)
5332 .and_then(|buffer| buffer.upgrade(&cx))
5333 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
5334 })?;
5335 let request = T::from_proto(
5336 envelope.payload,
5337 this.clone(),
5338 buffer_handle.clone(),
5339 cx.clone(),
5340 )
5341 .await?;
5342 let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
5343 let response = this
5344 .update(&mut cx, |this, cx| {
5345 this.request_lsp(buffer_handle, request, cx)
5346 })
5347 .await?;
5348 this.update(&mut cx, |this, cx| {
5349 Ok(T::response_to_proto(
5350 response,
5351 this,
5352 sender_id,
5353 &buffer_version,
5354 cx,
5355 ))
5356 })
5357 }
5358
5359 async fn handle_get_project_symbols(
5360 this: ModelHandle<Self>,
5361 envelope: TypedEnvelope<proto::GetProjectSymbols>,
5362 _: Arc<Client>,
5363 mut cx: AsyncAppContext,
5364 ) -> Result<proto::GetProjectSymbolsResponse> {
5365 let symbols = this
5366 .update(&mut cx, |this, cx| {
5367 this.symbols(&envelope.payload.query, cx)
5368 })
5369 .await?;
5370
5371 Ok(proto::GetProjectSymbolsResponse {
5372 symbols: symbols.iter().map(serialize_symbol).collect(),
5373 })
5374 }
5375
5376 async fn handle_search_project(
5377 this: ModelHandle<Self>,
5378 envelope: TypedEnvelope<proto::SearchProject>,
5379 _: Arc<Client>,
5380 mut cx: AsyncAppContext,
5381 ) -> Result<proto::SearchProjectResponse> {
5382 let peer_id = envelope.original_sender_id()?;
5383 let query = SearchQuery::from_proto(envelope.payload)?;
5384 let result = this
5385 .update(&mut cx, |this, cx| this.search(query, cx))
5386 .await?;
5387
5388 this.update(&mut cx, |this, cx| {
5389 let mut locations = Vec::new();
5390 for (buffer, ranges) in result {
5391 for range in ranges {
5392 let start = serialize_anchor(&range.start);
5393 let end = serialize_anchor(&range.end);
5394 let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
5395 locations.push(proto::Location {
5396 buffer_id,
5397 start: Some(start),
5398 end: Some(end),
5399 });
5400 }
5401 }
5402 Ok(proto::SearchProjectResponse { locations })
5403 })
5404 }
5405
5406 async fn handle_open_buffer_for_symbol(
5407 this: ModelHandle<Self>,
5408 envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
5409 _: Arc<Client>,
5410 mut cx: AsyncAppContext,
5411 ) -> Result<proto::OpenBufferForSymbolResponse> {
5412 let peer_id = envelope.original_sender_id()?;
5413 let symbol = envelope
5414 .payload
5415 .symbol
5416 .ok_or_else(|| anyhow!("invalid symbol"))?;
5417 let symbol = this
5418 .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
5419 .await?;
5420 let symbol = this.read_with(&cx, |this, _| {
5421 let signature = this.symbol_signature(&symbol.path);
5422 if signature == symbol.signature {
5423 Ok(symbol)
5424 } else {
5425 Err(anyhow!("invalid symbol signature"))
5426 }
5427 })?;
5428 let buffer = this
5429 .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
5430 .await?;
5431
5432 Ok(proto::OpenBufferForSymbolResponse {
5433 buffer_id: this.update(&mut cx, |this, cx| {
5434 this.create_buffer_for_peer(&buffer, peer_id, cx)
5435 }),
5436 })
5437 }
5438
5439 fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
5440 let mut hasher = Sha256::new();
5441 hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
5442 hasher.update(project_path.path.to_string_lossy().as_bytes());
5443 hasher.update(self.nonce.to_be_bytes());
5444 hasher.finalize().as_slice().try_into().unwrap()
5445 }
5446
5447 async fn handle_open_buffer_by_id(
5448 this: ModelHandle<Self>,
5449 envelope: TypedEnvelope<proto::OpenBufferById>,
5450 _: Arc<Client>,
5451 mut cx: AsyncAppContext,
5452 ) -> Result<proto::OpenBufferResponse> {
5453 let peer_id = envelope.original_sender_id()?;
5454 let buffer = this
5455 .update(&mut cx, |this, cx| {
5456 this.open_buffer_by_id(envelope.payload.id, cx)
5457 })
5458 .await?;
5459 this.update(&mut cx, |this, cx| {
5460 Ok(proto::OpenBufferResponse {
5461 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5462 })
5463 })
5464 }
5465
5466 async fn handle_open_buffer_by_path(
5467 this: ModelHandle<Self>,
5468 envelope: TypedEnvelope<proto::OpenBufferByPath>,
5469 _: Arc<Client>,
5470 mut cx: AsyncAppContext,
5471 ) -> Result<proto::OpenBufferResponse> {
5472 let peer_id = envelope.original_sender_id()?;
5473 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5474 let open_buffer = this.update(&mut cx, |this, cx| {
5475 this.open_buffer(
5476 ProjectPath {
5477 worktree_id,
5478 path: PathBuf::from(envelope.payload.path).into(),
5479 },
5480 cx,
5481 )
5482 });
5483
5484 let buffer = open_buffer.await?;
5485 this.update(&mut cx, |this, cx| {
5486 Ok(proto::OpenBufferResponse {
5487 buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5488 })
5489 })
5490 }
5491
5492 fn serialize_project_transaction_for_peer(
5493 &mut self,
5494 project_transaction: ProjectTransaction,
5495 peer_id: proto::PeerId,
5496 cx: &AppContext,
5497 ) -> proto::ProjectTransaction {
5498 let mut serialized_transaction = proto::ProjectTransaction {
5499 buffer_ids: Default::default(),
5500 transactions: Default::default(),
5501 };
5502 for (buffer, transaction) in project_transaction.0 {
5503 serialized_transaction
5504 .buffer_ids
5505 .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
5506 serialized_transaction
5507 .transactions
5508 .push(language::proto::serialize_transaction(&transaction));
5509 }
5510 serialized_transaction
5511 }
5512
5513 fn deserialize_project_transaction(
5514 &mut self,
5515 message: proto::ProjectTransaction,
5516 push_to_history: bool,
5517 cx: &mut ModelContext<Self>,
5518 ) -> Task<Result<ProjectTransaction>> {
5519 cx.spawn(|this, mut cx| async move {
5520 let mut project_transaction = ProjectTransaction::default();
5521 for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
5522 {
5523 let buffer = this
5524 .update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
5525 .await?;
5526 let transaction = language::proto::deserialize_transaction(transaction)?;
5527 project_transaction.0.insert(buffer, transaction);
5528 }
5529
5530 for (buffer, transaction) in &project_transaction.0 {
5531 buffer
5532 .update(&mut cx, |buffer, _| {
5533 buffer.wait_for_edits(transaction.edit_ids.iter().copied())
5534 })
5535 .await;
5536
5537 if push_to_history {
5538 buffer.update(&mut cx, |buffer, _| {
5539 buffer.push_transaction(transaction.clone(), Instant::now());
5540 });
5541 }
5542 }
5543
5544 Ok(project_transaction)
5545 })
5546 }
5547
5548 fn create_buffer_for_peer(
5549 &mut self,
5550 buffer: &ModelHandle<Buffer>,
5551 peer_id: proto::PeerId,
5552 cx: &AppContext,
5553 ) -> u64 {
5554 let buffer_id = buffer.read(cx).remote_id();
5555 if let Some(project_id) = self.remote_id() {
5556 let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
5557 if shared_buffers.insert(buffer_id) {
5558 let buffer = buffer.read(cx);
5559 let state = buffer.to_proto();
5560 let operations = buffer.serialize_ops(cx);
5561 let client = self.client.clone();
5562 cx.background()
5563 .spawn(
5564 async move {
5565 let mut operations = operations.await;
5566
5567 client.send(proto::CreateBufferForPeer {
5568 project_id,
5569 peer_id: Some(peer_id),
5570 variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
5571 })?;
5572
5573 loop {
5574 #[cfg(any(test, feature = "test-support"))]
5575 const CHUNK_SIZE: usize = 5;
5576
5577 #[cfg(not(any(test, feature = "test-support")))]
5578 const CHUNK_SIZE: usize = 100;
5579
5580 let chunk = operations
5581 .drain(..cmp::min(CHUNK_SIZE, operations.len()))
5582 .collect();
5583 let is_last = operations.is_empty();
5584 client.send(proto::CreateBufferForPeer {
5585 project_id,
5586 peer_id: Some(peer_id),
5587 variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
5588 proto::BufferChunk {
5589 buffer_id,
5590 operations: chunk,
5591 is_last,
5592 },
5593 )),
5594 })?;
5595
5596 if is_last {
5597 break;
5598 }
5599 }
5600
5601 Ok(())
5602 }
5603 .log_err(),
5604 )
5605 .detach();
5606 }
5607 }
5608
5609 buffer_id
5610 }
5611
5612 fn wait_for_buffer(
5613 &self,
5614 id: u64,
5615 cx: &mut ModelContext<Self>,
5616 ) -> Task<Result<ModelHandle<Buffer>>> {
5617 let mut opened_buffer_rx = self.opened_buffer.1.clone();
5618 cx.spawn(|this, mut cx| async move {
5619 let buffer = loop {
5620 let buffer = this.read_with(&cx, |this, cx| {
5621 this.opened_buffers
5622 .get(&id)
5623 .and_then(|buffer| buffer.upgrade(cx))
5624 });
5625 if let Some(buffer) = buffer {
5626 break buffer;
5627 } else if this.read_with(&cx, |this, _| this.is_read_only()) {
5628 return Err(anyhow!("disconnected before buffer {} could be opened", id));
5629 }
5630
5631 opened_buffer_rx
5632 .next()
5633 .await
5634 .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
5635 };
5636 buffer.update(&mut cx, |buffer, cx| buffer.git_diff_recalc(cx));
5637 Ok(buffer)
5638 })
5639 }
5640
5641 fn set_collaborators_from_proto(
5642 &mut self,
5643 messages: Vec<proto::Collaborator>,
5644 cx: &mut ModelContext<Self>,
5645 ) -> Result<()> {
5646 let mut collaborators = HashMap::default();
5647 for message in messages {
5648 let collaborator = Collaborator::from_proto(message)?;
5649 collaborators.insert(collaborator.peer_id, collaborator);
5650 }
5651 for old_peer_id in self.collaborators.keys() {
5652 if !collaborators.contains_key(old_peer_id) {
5653 cx.emit(Event::CollaboratorLeft(*old_peer_id));
5654 }
5655 }
5656 self.collaborators = collaborators;
5657 Ok(())
5658 }
5659
5660 fn deserialize_symbol(
5661 &self,
5662 serialized_symbol: proto::Symbol,
5663 ) -> impl Future<Output = Result<Symbol>> {
5664 let languages = self.languages.clone();
5665 async move {
5666 let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
5667 let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
5668 let start = serialized_symbol
5669 .start
5670 .ok_or_else(|| anyhow!("invalid start"))?;
5671 let end = serialized_symbol
5672 .end
5673 .ok_or_else(|| anyhow!("invalid end"))?;
5674 let kind = unsafe { mem::transmute(serialized_symbol.kind) };
5675 let path = ProjectPath {
5676 worktree_id,
5677 path: PathBuf::from(serialized_symbol.path).into(),
5678 };
5679 let language = languages.select_language(&path.path);
5680 Ok(Symbol {
5681 language_server_name: LanguageServerName(
5682 serialized_symbol.language_server_name.into(),
5683 ),
5684 source_worktree_id,
5685 path,
5686 label: {
5687 match language {
5688 Some(language) => {
5689 language
5690 .label_for_symbol(&serialized_symbol.name, kind)
5691 .await
5692 }
5693 None => None,
5694 }
5695 .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
5696 },
5697
5698 name: serialized_symbol.name,
5699 range: Unclipped(PointUtf16::new(start.row, start.column))
5700 ..Unclipped(PointUtf16::new(end.row, end.column)),
5701 kind,
5702 signature: serialized_symbol
5703 .signature
5704 .try_into()
5705 .map_err(|_| anyhow!("invalid signature"))?,
5706 })
5707 }
5708 }
5709
5710 async fn handle_buffer_saved(
5711 this: ModelHandle<Self>,
5712 envelope: TypedEnvelope<proto::BufferSaved>,
5713 _: Arc<Client>,
5714 mut cx: AsyncAppContext,
5715 ) -> Result<()> {
5716 let version = deserialize_version(envelope.payload.version);
5717 let mtime = envelope
5718 .payload
5719 .mtime
5720 .ok_or_else(|| anyhow!("missing mtime"))?
5721 .into();
5722
5723 this.update(&mut cx, |this, cx| {
5724 let buffer = this
5725 .opened_buffers
5726 .get(&envelope.payload.buffer_id)
5727 .and_then(|buffer| buffer.upgrade(cx));
5728 if let Some(buffer) = buffer {
5729 buffer.update(cx, |buffer, cx| {
5730 buffer.did_save(version, envelope.payload.fingerprint, mtime, None, cx);
5731 });
5732 }
5733 Ok(())
5734 })
5735 }
5736
5737 async fn handle_buffer_reloaded(
5738 this: ModelHandle<Self>,
5739 envelope: TypedEnvelope<proto::BufferReloaded>,
5740 _: Arc<Client>,
5741 mut cx: AsyncAppContext,
5742 ) -> Result<()> {
5743 let payload = envelope.payload;
5744 let version = deserialize_version(payload.version);
5745 let line_ending = deserialize_line_ending(
5746 proto::LineEnding::from_i32(payload.line_ending)
5747 .ok_or_else(|| anyhow!("missing line ending"))?,
5748 );
5749 let mtime = payload
5750 .mtime
5751 .ok_or_else(|| anyhow!("missing mtime"))?
5752 .into();
5753 this.update(&mut cx, |this, cx| {
5754 let buffer = this
5755 .opened_buffers
5756 .get(&payload.buffer_id)
5757 .and_then(|buffer| buffer.upgrade(cx));
5758 if let Some(buffer) = buffer {
5759 buffer.update(cx, |buffer, cx| {
5760 buffer.did_reload(version, payload.fingerprint, line_ending, mtime, cx);
5761 });
5762 }
5763 Ok(())
5764 })
5765 }
5766
5767 #[allow(clippy::type_complexity)]
5768 fn edits_from_lsp(
5769 &mut self,
5770 buffer: &ModelHandle<Buffer>,
5771 lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
5772 version: Option<i32>,
5773 cx: &mut ModelContext<Self>,
5774 ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
5775 let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx);
5776 cx.background().spawn(async move {
5777 let snapshot = snapshot?;
5778 let mut lsp_edits = lsp_edits
5779 .into_iter()
5780 .map(|edit| (range_from_lsp(edit.range), edit.new_text))
5781 .collect::<Vec<_>>();
5782 lsp_edits.sort_by_key(|(range, _)| range.start);
5783
5784 let mut lsp_edits = lsp_edits.into_iter().peekable();
5785 let mut edits = Vec::new();
5786 while let Some((range, mut new_text)) = lsp_edits.next() {
5787 // Clip invalid ranges provided by the language server.
5788 let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
5789 ..snapshot.clip_point_utf16(range.end, Bias::Left);
5790
5791 // Combine any LSP edits that are adjacent.
5792 //
5793 // Also, combine LSP edits that are separated from each other by only
5794 // a newline. This is important because for some code actions,
5795 // Rust-analyzer rewrites the entire buffer via a series of edits that
5796 // are separated by unchanged newline characters.
5797 //
5798 // In order for the diffing logic below to work properly, any edits that
5799 // cancel each other out must be combined into one.
5800 while let Some((next_range, next_text)) = lsp_edits.peek() {
5801 if next_range.start.0 > range.end {
5802 if next_range.start.0.row > range.end.row + 1
5803 || next_range.start.0.column > 0
5804 || snapshot.clip_point_utf16(
5805 Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
5806 Bias::Left,
5807 ) > range.end
5808 {
5809 break;
5810 }
5811 new_text.push('\n');
5812 }
5813 range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
5814 new_text.push_str(next_text);
5815 lsp_edits.next();
5816 }
5817
5818 // For multiline edits, perform a diff of the old and new text so that
5819 // we can identify the changes more precisely, preserving the locations
5820 // of any anchors positioned in the unchanged regions.
5821 if range.end.row > range.start.row {
5822 let mut offset = range.start.to_offset(&snapshot);
5823 let old_text = snapshot.text_for_range(range).collect::<String>();
5824
5825 let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
5826 let mut moved_since_edit = true;
5827 for change in diff.iter_all_changes() {
5828 let tag = change.tag();
5829 let value = change.value();
5830 match tag {
5831 ChangeTag::Equal => {
5832 offset += value.len();
5833 moved_since_edit = true;
5834 }
5835 ChangeTag::Delete => {
5836 let start = snapshot.anchor_after(offset);
5837 let end = snapshot.anchor_before(offset + value.len());
5838 if moved_since_edit {
5839 edits.push((start..end, String::new()));
5840 } else {
5841 edits.last_mut().unwrap().0.end = end;
5842 }
5843 offset += value.len();
5844 moved_since_edit = false;
5845 }
5846 ChangeTag::Insert => {
5847 if moved_since_edit {
5848 let anchor = snapshot.anchor_after(offset);
5849 edits.push((anchor..anchor, value.to_string()));
5850 } else {
5851 edits.last_mut().unwrap().1.push_str(value);
5852 }
5853 moved_since_edit = false;
5854 }
5855 }
5856 }
5857 } else if range.end == range.start {
5858 let anchor = snapshot.anchor_after(range.start);
5859 edits.push((anchor..anchor, new_text));
5860 } else {
5861 let edit_start = snapshot.anchor_after(range.start);
5862 let edit_end = snapshot.anchor_before(range.end);
5863 edits.push((edit_start..edit_end, new_text));
5864 }
5865 }
5866
5867 Ok(edits)
5868 })
5869 }
5870
5871 fn buffer_snapshot_for_lsp_version(
5872 &mut self,
5873 buffer: &ModelHandle<Buffer>,
5874 version: Option<i32>,
5875 cx: &AppContext,
5876 ) -> Result<TextBufferSnapshot> {
5877 const OLD_VERSIONS_TO_RETAIN: i32 = 10;
5878
5879 if let Some(version) = version {
5880 let buffer_id = buffer.read(cx).remote_id();
5881 let snapshots = self
5882 .buffer_snapshots
5883 .get_mut(&buffer_id)
5884 .ok_or_else(|| anyhow!("no snapshot found for buffer {}", buffer_id))?;
5885 let mut found_snapshot = None;
5886 snapshots.retain(|(snapshot_version, snapshot)| {
5887 if snapshot_version + OLD_VERSIONS_TO_RETAIN < version {
5888 false
5889 } else {
5890 if *snapshot_version == version {
5891 found_snapshot = Some(snapshot.clone());
5892 }
5893 true
5894 }
5895 });
5896
5897 found_snapshot.ok_or_else(|| {
5898 anyhow!(
5899 "snapshot not found for buffer {} at version {}",
5900 buffer_id,
5901 version
5902 )
5903 })
5904 } else {
5905 Ok((buffer.read(cx)).text_snapshot())
5906 }
5907 }
5908
5909 fn language_server_for_buffer(
5910 &self,
5911 buffer: &Buffer,
5912 cx: &AppContext,
5913 ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
5914 if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
5915 let name = language.lsp_adapter()?.name.clone();
5916 let worktree_id = file.worktree_id(cx);
5917 let key = (worktree_id, name);
5918
5919 if let Some(server_id) = self.language_server_ids.get(&key) {
5920 if let Some(LanguageServerState::Running {
5921 adapter, server, ..
5922 }) = self.language_servers.get(server_id)
5923 {
5924 return Some((adapter, server));
5925 }
5926 }
5927 }
5928
5929 None
5930 }
5931}
5932
5933impl WorktreeHandle {
5934 pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
5935 match self {
5936 WorktreeHandle::Strong(handle) => Some(handle.clone()),
5937 WorktreeHandle::Weak(handle) => handle.upgrade(cx),
5938 }
5939 }
5940}
5941
5942impl OpenBuffer {
5943 pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
5944 match self {
5945 OpenBuffer::Strong(handle) => Some(handle.clone()),
5946 OpenBuffer::Weak(handle) => handle.upgrade(cx),
5947 OpenBuffer::Operations(_) => None,
5948 }
5949 }
5950}
5951
5952pub struct PathMatchCandidateSet {
5953 pub snapshot: Snapshot,
5954 pub include_ignored: bool,
5955 pub include_root_name: bool,
5956}
5957
5958impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
5959 type Candidates = PathMatchCandidateSetIter<'a>;
5960
5961 fn id(&self) -> usize {
5962 self.snapshot.id().to_usize()
5963 }
5964
5965 fn len(&self) -> usize {
5966 if self.include_ignored {
5967 self.snapshot.file_count()
5968 } else {
5969 self.snapshot.visible_file_count()
5970 }
5971 }
5972
5973 fn prefix(&self) -> Arc<str> {
5974 if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
5975 self.snapshot.root_name().into()
5976 } else if self.include_root_name {
5977 format!("{}/", self.snapshot.root_name()).into()
5978 } else {
5979 "".into()
5980 }
5981 }
5982
5983 fn candidates(&'a self, start: usize) -> Self::Candidates {
5984 PathMatchCandidateSetIter {
5985 traversal: self.snapshot.files(self.include_ignored, start),
5986 }
5987 }
5988}
5989
5990pub struct PathMatchCandidateSetIter<'a> {
5991 traversal: Traversal<'a>,
5992}
5993
5994impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
5995 type Item = fuzzy::PathMatchCandidate<'a>;
5996
5997 fn next(&mut self) -> Option<Self::Item> {
5998 self.traversal.next().map(|entry| {
5999 if let EntryKind::File(char_bag) = entry.kind {
6000 fuzzy::PathMatchCandidate {
6001 path: &entry.path,
6002 char_bag,
6003 }
6004 } else {
6005 unreachable!()
6006 }
6007 })
6008 }
6009}
6010
6011impl Entity for Project {
6012 type Event = Event;
6013
6014 fn release(&mut self, _: &mut gpui::MutableAppContext) {
6015 match &self.client_state {
6016 Some(ProjectClientState::Local { remote_id, .. }) => {
6017 self.client
6018 .send(proto::UnshareProject {
6019 project_id: *remote_id,
6020 })
6021 .log_err();
6022 }
6023 Some(ProjectClientState::Remote { remote_id, .. }) => {
6024 self.client
6025 .send(proto::LeaveProject {
6026 project_id: *remote_id,
6027 })
6028 .log_err();
6029 }
6030 _ => {}
6031 }
6032 }
6033
6034 fn app_will_quit(
6035 &mut self,
6036 _: &mut MutableAppContext,
6037 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
6038 let shutdown_futures = self
6039 .language_servers
6040 .drain()
6041 .map(|(_, server_state)| async {
6042 match server_state {
6043 LanguageServerState::Running { server, .. } => server.shutdown()?.await,
6044 LanguageServerState::Starting(starting_server) => {
6045 starting_server.await?.shutdown()?.await
6046 }
6047 }
6048 })
6049 .collect::<Vec<_>>();
6050
6051 Some(
6052 async move {
6053 futures::future::join_all(shutdown_futures).await;
6054 }
6055 .boxed(),
6056 )
6057 }
6058}
6059
6060impl Collaborator {
6061 fn from_proto(message: proto::Collaborator) -> Result<Self> {
6062 Ok(Self {
6063 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
6064 replica_id: message.replica_id as ReplicaId,
6065 })
6066 }
6067}
6068
6069impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
6070 fn from((worktree_id, path): (WorktreeId, P)) -> Self {
6071 Self {
6072 worktree_id,
6073 path: path.as_ref().into(),
6074 }
6075 }
6076}
6077
6078fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
6079 proto::Symbol {
6080 language_server_name: symbol.language_server_name.0.to_string(),
6081 source_worktree_id: symbol.source_worktree_id.to_proto(),
6082 worktree_id: symbol.path.worktree_id.to_proto(),
6083 path: symbol.path.path.to_string_lossy().to_string(),
6084 name: symbol.name.clone(),
6085 kind: unsafe { mem::transmute(symbol.kind) },
6086 start: Some(proto::PointUtf16 {
6087 row: symbol.range.start.0.row,
6088 column: symbol.range.start.0.column,
6089 }),
6090 end: Some(proto::PointUtf16 {
6091 row: symbol.range.end.0.row,
6092 column: symbol.range.end.0.column,
6093 }),
6094 signature: symbol.signature.to_vec(),
6095 }
6096}
6097
/// Expresses `path` relative to `base`, comparing the two component by component.
///
/// Leading components that are equal are dropped. Once the paths diverge, one `..` is
/// emitted for every remaining component of `base`, followed by the remaining components
/// of `path`. Equal paths produce an empty `PathBuf`.
fn relativize_path(base: &Path, path: &Path) -> PathBuf {
    let mut remaining_path = path.components();
    let mut remaining_base = base.components();
    let mut relative: Vec<Component> = Vec::new();

    loop {
        let step = (remaining_path.next(), remaining_base.next());
        match step {
            (None, None) => break,
            // `path` is exhausted but `base` is not: climb out of one more directory.
            (None, Some(_)) => relative.push(Component::ParentDir),
            // `base` is exhausted: everything left of `path` is the answer.
            (Some(target), None) => {
                relative.push(target);
                relative.extend(remaining_path.by_ref());
                break;
            }
            (Some(target), Some(origin)) => {
                // Still inside the shared prefix.
                if relative.is_empty() && target == origin {
                    continue;
                }
                if origin == Component::CurDir {
                    relative.push(target);
                    continue;
                }
                // The paths diverge here: climb out of what is left of `base`, then
                // descend into what is left of `path`.
                relative.push(Component::ParentDir);
                relative.extend(remaining_base.by_ref().map(|_| Component::ParentDir));
                relative.push(target);
                relative.extend(remaining_path.by_ref());
                break;
            }
        }
    }

    let mut result = PathBuf::new();
    for component in &relative {
        result.push(component.as_os_str());
    }
    result
}
6126
6127impl Item for Buffer {
6128 fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
6129 File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
6130 }
6131}