project.rs

   1mod ignore;
   2mod lsp_command;
   3mod lsp_glob_set;
   4pub mod search;
   5pub mod terminals;
   6pub mod worktree;
   7
   8#[cfg(test)]
   9mod project_tests;
  10
  11use anyhow::{anyhow, Context, Result};
  12use client::{proto, Client, TypedEnvelope, UserStore};
  13use clock::ReplicaId;
  14use collections::{hash_map, BTreeMap, HashMap, HashSet};
  15use futures::{
  16    channel::mpsc::{self, UnboundedReceiver},
  17    future::{try_join_all, Shared},
  18    AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
  19};
  20use gpui::{
  21    AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task,
  22    UpgradeModelHandle, WeakModelHandle,
  23};
  24use language::{
  25    point_to_lsp,
  26    proto::{
  27        deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
  28        serialize_anchor, serialize_version,
  29    },
  30    range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
  31    Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
  32    Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt, Operation, Patch,
  33    PointUtf16, RopeFingerprint, TextBufferSnapshot, ToOffset, ToPointUtf16, Transaction,
  34    Unclipped,
  35};
  36use lsp::{
  37    DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions,
  38    DocumentHighlightKind, LanguageServer, LanguageString, MarkedString,
  39};
  40use lsp_command::*;
  41use lsp_glob_set::LspGlobSet;
  42use postage::watch;
  43use rand::prelude::*;
  44use search::SearchQuery;
  45use serde::Serialize;
  46use settings::{FormatOnSave, Formatter, Settings};
  47use sha2::{Digest, Sha256};
  48use similar::{ChangeTag, TextDiff};
  49use std::{
  50    cell::RefCell,
  51    cmp::{self, Ordering},
  52    convert::TryInto,
  53    hash::Hash,
  54    mem,
  55    num::NonZeroU32,
  56    ops::Range,
  57    path::{Component, Path, PathBuf},
  58    rc::Rc,
  59    str,
  60    sync::{
  61        atomic::{AtomicUsize, Ordering::SeqCst},
  62        Arc,
  63    },
  64    time::{Duration, Instant, SystemTime},
  65};
  66use terminals::Terminals;
  67
  68use util::{debug_panic, defer, merge_json_value_into, post_inc, ResultExt, TryFutureExt as _};
  69
  70pub use fs::*;
  71pub use worktree::*;
  72
  73pub trait Item {
  74    fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
  75    fn project_path(&self, cx: &AppContext) -> Option<ProjectPath>;
  76}
  77
  78// Language server state is stored across 3 collections:
  79//     language_servers =>
  80//         a mapping from unique server id to LanguageServerState which can either be a task for a
  81//         server in the process of starting, or a running server with adapter and language server arcs
  82//     language_server_ids => a mapping from worktreeId and server name to the unique server id
  83//     language_server_statuses => a mapping from unique server id to the current server status
  84//
  85// Multiple worktrees can map to the same language server for example when you jump to the definition
  86// of a file in the standard library. So language_server_ids is used to look up which server is active
  87// for a given worktree and language server name
  88//
  89// When starting a language server, first the id map is checked to make sure a server isn't already available
// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and
  91// the Starting variant of LanguageServerState is stored in the language_servers map.
  92pub struct Project {
  93    worktrees: Vec<WorktreeHandle>,
  94    active_entry: Option<ProjectEntryId>,
  95    buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
  96    languages: Arc<LanguageRegistry>,
  97    language_servers: HashMap<usize, LanguageServerState>,
  98    language_server_ids: HashMap<(WorktreeId, LanguageServerName), usize>,
  99    language_server_statuses: BTreeMap<usize, LanguageServerStatus>,
 100    last_workspace_edits_by_language_server: HashMap<usize, ProjectTransaction>,
 101    next_language_server_id: usize,
 102    client: Arc<client::Client>,
 103    next_entry_id: Arc<AtomicUsize>,
 104    join_project_response_message_id: u32,
 105    next_diagnostic_group_id: usize,
 106    user_store: ModelHandle<UserStore>,
 107    fs: Arc<dyn Fs>,
 108    client_state: Option<ProjectClientState>,
 109    collaborators: HashMap<proto::PeerId, Collaborator>,
 110    client_subscriptions: Vec<client::Subscription>,
 111    _subscriptions: Vec<gpui::Subscription>,
 112    opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
 113    shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
 114    #[allow(clippy::type_complexity)]
 115    loading_buffers_by_path: HashMap<
 116        ProjectPath,
 117        postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
 118    >,
 119    #[allow(clippy::type_complexity)]
 120    loading_local_worktrees:
 121        HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
 122    opened_buffers: HashMap<u64, OpenBuffer>,
 123    /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
 124    /// Used for re-issuing buffer requests when peers temporarily disconnect
 125    incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
 126    buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
 127    buffers_being_formatted: HashSet<usize>,
 128    nonce: u128,
 129    _maintain_buffer_languages: Task<()>,
 130    _maintain_workspace_config: Task<()>,
 131    terminals: Terminals,
 132}
 133
 134/// Message ordered with respect to buffer operations
 135enum BufferOrderedMessage {
 136    Operation {
 137        buffer_id: u64,
 138        operation: proto::Operation,
 139    },
 140    LanguageServerUpdate {
 141        language_server_id: usize,
 142        message: proto::update_language_server::Variant,
 143    },
 144    Resync,
 145}
 146
 147enum LocalProjectUpdate {
 148    WorktreesChanged,
 149    CreateBufferForPeer {
 150        peer_id: proto::PeerId,
 151        buffer_id: u64,
 152    },
 153}
 154
 155enum OpenBuffer {
 156    Strong(ModelHandle<Buffer>),
 157    Weak(WeakModelHandle<Buffer>),
 158    Operations(Vec<Operation>),
 159}
 160
 161enum WorktreeHandle {
 162    Strong(ModelHandle<Worktree>),
 163    Weak(WeakModelHandle<Worktree>),
 164}
 165
 166enum ProjectClientState {
 167    Local {
 168        remote_id: u64,
 169        updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
 170        _send_updates: Task<()>,
 171    },
 172    Remote {
 173        sharing_has_stopped: bool,
 174        remote_id: u64,
 175        replica_id: ReplicaId,
 176    },
 177}
 178
 179#[derive(Clone, Debug)]
 180pub struct Collaborator {
 181    pub peer_id: proto::PeerId,
 182    pub replica_id: ReplicaId,
 183}
 184
 185#[derive(Clone, Debug, PartialEq, Eq)]
 186pub enum Event {
 187    ActiveEntryChanged(Option<ProjectEntryId>),
 188    WorktreeAdded,
 189    WorktreeRemoved(WorktreeId),
 190    DiskBasedDiagnosticsStarted {
 191        language_server_id: usize,
 192    },
 193    DiskBasedDiagnosticsFinished {
 194        language_server_id: usize,
 195    },
 196    DiagnosticsUpdated {
 197        path: ProjectPath,
 198        language_server_id: usize,
 199    },
 200    RemoteIdChanged(Option<u64>),
 201    DisconnectedFromHost,
 202    Closed,
 203    CollaboratorUpdated {
 204        old_peer_id: proto::PeerId,
 205        new_peer_id: proto::PeerId,
 206    },
 207    CollaboratorLeft(proto::PeerId),
 208}
 209
 210pub enum LanguageServerState {
 211    Starting(Task<Option<Arc<LanguageServer>>>),
 212    Running {
 213        language: Arc<Language>,
 214        adapter: Arc<CachedLspAdapter>,
 215        server: Arc<LanguageServer>,
 216        watched_paths: LspGlobSet,
 217        simulate_disk_based_diagnostics_completion: Option<Task<()>>,
 218    },
 219}
 220
 221#[derive(Serialize)]
 222pub struct LanguageServerStatus {
 223    pub name: String,
 224    pub pending_work: BTreeMap<String, LanguageServerProgress>,
 225    pub has_pending_diagnostic_updates: bool,
 226    progress_tokens: HashSet<String>,
 227}
 228
 229#[derive(Clone, Debug, Serialize)]
 230pub struct LanguageServerProgress {
 231    pub message: Option<String>,
 232    pub percentage: Option<usize>,
 233    #[serde(skip_serializing)]
 234    pub last_update_at: Instant,
 235}
 236
 237#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
 238pub struct ProjectPath {
 239    pub worktree_id: WorktreeId,
 240    pub path: Arc<Path>,
 241}
 242
 243#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
 244pub struct DiagnosticSummary {
 245    pub language_server_id: usize,
 246    pub error_count: usize,
 247    pub warning_count: usize,
 248}
 249
 250#[derive(Debug, Clone)]
 251pub struct Location {
 252    pub buffer: ModelHandle<Buffer>,
 253    pub range: Range<language::Anchor>,
 254}
 255
 256#[derive(Debug, Clone)]
 257pub struct LocationLink {
 258    pub origin: Option<Location>,
 259    pub target: Location,
 260}
 261
 262#[derive(Debug)]
 263pub struct DocumentHighlight {
 264    pub range: Range<language::Anchor>,
 265    pub kind: DocumentHighlightKind,
 266}
 267
 268#[derive(Clone, Debug)]
 269pub struct Symbol {
 270    pub language_server_name: LanguageServerName,
 271    pub source_worktree_id: WorktreeId,
 272    pub path: ProjectPath,
 273    pub label: CodeLabel,
 274    pub name: String,
 275    pub kind: lsp::SymbolKind,
 276    pub range: Range<Unclipped<PointUtf16>>,
 277    pub signature: [u8; 32],
 278}
 279
 280#[derive(Clone, Debug, PartialEq)]
 281pub struct HoverBlock {
 282    pub text: String,
 283    pub language: Option<String>,
 284}
 285
 286impl HoverBlock {
 287    fn try_new(marked_string: MarkedString) -> Option<Self> {
 288        let result = match marked_string {
 289            MarkedString::LanguageString(LanguageString { language, value }) => HoverBlock {
 290                text: value,
 291                language: Some(language),
 292            },
 293            MarkedString::String(text) => HoverBlock {
 294                text,
 295                language: None,
 296            },
 297        };
 298        if result.text.is_empty() {
 299            None
 300        } else {
 301            Some(result)
 302        }
 303    }
 304}
 305
 306#[derive(Debug)]
 307pub struct Hover {
 308    pub contents: Vec<HoverBlock>,
 309    pub range: Option<Range<language::Anchor>>,
 310}
 311
 312#[derive(Default)]
 313pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
 314
 315impl DiagnosticSummary {
 316    fn new<'a, T: 'a>(
 317        language_server_id: usize,
 318        diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>,
 319    ) -> Self {
 320        let mut this = Self {
 321            language_server_id,
 322            error_count: 0,
 323            warning_count: 0,
 324        };
 325
 326        for entry in diagnostics {
 327            if entry.diagnostic.is_primary {
 328                match entry.diagnostic.severity {
 329                    DiagnosticSeverity::ERROR => this.error_count += 1,
 330                    DiagnosticSeverity::WARNING => this.warning_count += 1,
 331                    _ => {}
 332                }
 333            }
 334        }
 335
 336        this
 337    }
 338
 339    pub fn is_empty(&self) -> bool {
 340        self.error_count == 0 && self.warning_count == 0
 341    }
 342
 343    pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
 344        proto::DiagnosticSummary {
 345            path: path.to_string_lossy().to_string(),
 346            language_server_id: self.language_server_id as u64,
 347            error_count: self.error_count as u32,
 348            warning_count: self.warning_count as u32,
 349        }
 350    }
 351}
 352
 353#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
 354pub struct ProjectEntryId(usize);
 355
 356impl ProjectEntryId {
 357    pub const MAX: Self = Self(usize::MAX);
 358
 359    pub fn new(counter: &AtomicUsize) -> Self {
 360        Self(counter.fetch_add(1, SeqCst))
 361    }
 362
 363    pub fn from_proto(id: u64) -> Self {
 364        Self(id as usize)
 365    }
 366
 367    pub fn to_proto(&self) -> u64 {
 368        self.0 as u64
 369    }
 370
 371    pub fn to_usize(&self) -> usize {
 372        self.0
 373    }
 374}
 375
 376#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 377pub enum FormatTrigger {
 378    Save,
 379    Manual,
 380}
 381
 382impl FormatTrigger {
 383    fn from_proto(value: i32) -> FormatTrigger {
 384        match value {
 385            0 => FormatTrigger::Save,
 386            1 => FormatTrigger::Manual,
 387            _ => FormatTrigger::Save,
 388        }
 389    }
 390}
 391
 392impl Project {
 393    pub fn init(client: &Arc<Client>) {
 394        client.add_model_message_handler(Self::handle_add_collaborator);
 395        client.add_model_message_handler(Self::handle_update_project_collaborator);
 396        client.add_model_message_handler(Self::handle_remove_collaborator);
 397        client.add_model_message_handler(Self::handle_buffer_reloaded);
 398        client.add_model_message_handler(Self::handle_buffer_saved);
 399        client.add_model_message_handler(Self::handle_start_language_server);
 400        client.add_model_message_handler(Self::handle_update_language_server);
 401        client.add_model_message_handler(Self::handle_update_project);
 402        client.add_model_message_handler(Self::handle_unshare_project);
 403        client.add_model_message_handler(Self::handle_create_buffer_for_peer);
 404        client.add_model_message_handler(Self::handle_update_buffer_file);
 405        client.add_model_request_handler(Self::handle_update_buffer);
 406        client.add_model_message_handler(Self::handle_update_diagnostic_summary);
 407        client.add_model_message_handler(Self::handle_update_worktree);
 408        client.add_model_request_handler(Self::handle_create_project_entry);
 409        client.add_model_request_handler(Self::handle_rename_project_entry);
 410        client.add_model_request_handler(Self::handle_copy_project_entry);
 411        client.add_model_request_handler(Self::handle_delete_project_entry);
 412        client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
 413        client.add_model_request_handler(Self::handle_apply_code_action);
 414        client.add_model_request_handler(Self::handle_reload_buffers);
 415        client.add_model_request_handler(Self::handle_synchronize_buffers);
 416        client.add_model_request_handler(Self::handle_format_buffers);
 417        client.add_model_request_handler(Self::handle_lsp_command::<GetCodeActions>);
 418        client.add_model_request_handler(Self::handle_lsp_command::<GetCompletions>);
 419        client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
 420        client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
 421        client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
 422        client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
 423        client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
 424        client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
 425        client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
 426        client.add_model_request_handler(Self::handle_search_project);
 427        client.add_model_request_handler(Self::handle_get_project_symbols);
 428        client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
 429        client.add_model_request_handler(Self::handle_open_buffer_by_id);
 430        client.add_model_request_handler(Self::handle_open_buffer_by_path);
 431        client.add_model_request_handler(Self::handle_save_buffer);
 432        client.add_model_message_handler(Self::handle_update_diff_base);
 433    }
 434
 435    pub fn local(
 436        client: Arc<Client>,
 437        user_store: ModelHandle<UserStore>,
 438        languages: Arc<LanguageRegistry>,
 439        fs: Arc<dyn Fs>,
 440        cx: &mut AppContext,
 441    ) -> ModelHandle<Self> {
 442        cx.add_model(|cx: &mut ModelContext<Self>| {
 443            let (tx, rx) = mpsc::unbounded();
 444            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
 445                .detach();
 446            Self {
 447                worktrees: Default::default(),
 448                buffer_ordered_messages_tx: tx,
 449                collaborators: Default::default(),
 450                opened_buffers: Default::default(),
 451                shared_buffers: Default::default(),
 452                incomplete_remote_buffers: Default::default(),
 453                loading_buffers_by_path: Default::default(),
 454                loading_local_worktrees: Default::default(),
 455                buffer_snapshots: Default::default(),
 456                join_project_response_message_id: 0,
 457                client_state: None,
 458                opened_buffer: watch::channel(),
 459                client_subscriptions: Vec::new(),
 460                _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
 461                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
 462                _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
 463                active_entry: None,
 464                languages,
 465                client,
 466                user_store,
 467                fs,
 468                next_entry_id: Default::default(),
 469                next_diagnostic_group_id: Default::default(),
 470                language_servers: Default::default(),
 471                language_server_ids: Default::default(),
 472                language_server_statuses: Default::default(),
 473                last_workspace_edits_by_language_server: Default::default(),
 474                buffers_being_formatted: Default::default(),
 475                next_language_server_id: 0,
 476                nonce: StdRng::from_entropy().gen(),
 477                terminals: Terminals {
 478                    local_handles: Vec::new(),
 479                },
 480            }
 481        })
 482    }
 483
 484    pub async fn remote(
 485        remote_id: u64,
 486        client: Arc<Client>,
 487        user_store: ModelHandle<UserStore>,
 488        languages: Arc<LanguageRegistry>,
 489        fs: Arc<dyn Fs>,
 490        mut cx: AsyncAppContext,
 491    ) -> Result<ModelHandle<Self>> {
 492        client.authenticate_and_connect(true, &cx).await?;
 493
 494        let subscription = client.subscribe_to_entity(remote_id)?;
 495        let response = client
 496            .request_envelope(proto::JoinProject {
 497                project_id: remote_id,
 498            })
 499            .await?;
 500        let this = cx.add_model(|cx| {
 501            let replica_id = response.payload.replica_id as ReplicaId;
 502
 503            let mut worktrees = Vec::new();
 504            for worktree in response.payload.worktrees {
 505                let worktree = cx.update(|cx| {
 506                    Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
 507                });
 508                worktrees.push(worktree);
 509            }
 510
 511            let (tx, rx) = mpsc::unbounded();
 512            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
 513                .detach();
 514            let mut this = Self {
 515                worktrees: Vec::new(),
 516                buffer_ordered_messages_tx: tx,
 517                loading_buffers_by_path: Default::default(),
 518                opened_buffer: watch::channel(),
 519                shared_buffers: Default::default(),
 520                incomplete_remote_buffers: Default::default(),
 521                loading_local_worktrees: Default::default(),
 522                active_entry: None,
 523                collaborators: Default::default(),
 524                join_project_response_message_id: response.message_id,
 525                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
 526                _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
 527                languages,
 528                user_store: user_store.clone(),
 529                fs,
 530                next_entry_id: Default::default(),
 531                next_diagnostic_group_id: Default::default(),
 532                client_subscriptions: Default::default(),
 533                _subscriptions: Default::default(),
 534                client: client.clone(),
 535                client_state: Some(ProjectClientState::Remote {
 536                    sharing_has_stopped: false,
 537                    remote_id,
 538                    replica_id,
 539                }),
 540                language_servers: Default::default(),
 541                language_server_ids: Default::default(),
 542                language_server_statuses: response
 543                    .payload
 544                    .language_servers
 545                    .into_iter()
 546                    .map(|server| {
 547                        (
 548                            server.id as usize,
 549                            LanguageServerStatus {
 550                                name: server.name,
 551                                pending_work: Default::default(),
 552                                has_pending_diagnostic_updates: false,
 553                                progress_tokens: Default::default(),
 554                            },
 555                        )
 556                    })
 557                    .collect(),
 558                last_workspace_edits_by_language_server: Default::default(),
 559                next_language_server_id: 0,
 560                opened_buffers: Default::default(),
 561                buffers_being_formatted: Default::default(),
 562                buffer_snapshots: Default::default(),
 563                nonce: StdRng::from_entropy().gen(),
 564                terminals: Terminals {
 565                    local_handles: Vec::new(),
 566                },
 567            };
 568            for worktree in worktrees {
 569                let _ = this.add_worktree(&worktree, cx);
 570            }
 571            this
 572        });
 573        let subscription = subscription.set_model(&this, &mut cx);
 574
 575        let user_ids = response
 576            .payload
 577            .collaborators
 578            .iter()
 579            .map(|peer| peer.user_id)
 580            .collect();
 581        user_store
 582            .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 583            .await?;
 584
 585        this.update(&mut cx, |this, cx| {
 586            this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
 587            this.client_subscriptions.push(subscription);
 588            anyhow::Ok(())
 589        })?;
 590
 591        Ok(this)
 592    }
 593
 594    #[cfg(any(test, feature = "test-support"))]
 595    pub async fn test(
 596        fs: Arc<dyn Fs>,
 597        root_paths: impl IntoIterator<Item = &Path>,
 598        cx: &mut gpui::TestAppContext,
 599    ) -> ModelHandle<Project> {
 600        if !cx.read(|cx| cx.has_global::<Settings>()) {
 601            cx.update(|cx| {
 602                cx.set_global(Settings::test(cx));
 603            });
 604        }
 605
 606        let mut languages = LanguageRegistry::test();
 607        languages.set_executor(cx.background());
 608        let http_client = util::http::FakeHttpClient::with_404_response();
 609        let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
 610        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
 611        let project =
 612            cx.update(|cx| Project::local(client, user_store, Arc::new(languages), fs, cx));
 613        for path in root_paths {
 614            let (tree, _) = project
 615                .update(cx, |project, cx| {
 616                    project.find_or_create_local_worktree(path, true, cx)
 617                })
 618                .await
 619                .unwrap();
 620            tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
 621                .await;
 622        }
 623        project
 624    }
 625
 626    fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
 627        let settings = cx.global::<Settings>();
 628
 629        let mut language_servers_to_start = Vec::new();
 630        for buffer in self.opened_buffers.values() {
 631            if let Some(buffer) = buffer.upgrade(cx) {
 632                let buffer = buffer.read(cx);
 633                if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language())
 634                {
 635                    if settings.enable_language_server(Some(&language.name())) {
 636                        let worktree = file.worktree.read(cx);
 637                        language_servers_to_start.push((
 638                            worktree.id(),
 639                            worktree.as_local().unwrap().abs_path().clone(),
 640                            language.clone(),
 641                        ));
 642                    }
 643                }
 644            }
 645        }
 646
 647        let mut language_servers_to_stop = Vec::new();
 648        for language in self.languages.to_vec() {
 649            if let Some(lsp_adapter) = language.lsp_adapter() {
 650                if !settings.enable_language_server(Some(&language.name())) {
 651                    let lsp_name = &lsp_adapter.name;
 652                    for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
 653                        if lsp_name == started_lsp_name {
 654                            language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
 655                        }
 656                    }
 657                }
 658            }
 659        }
 660
 661        // Stop all newly-disabled language servers.
 662        for (worktree_id, adapter_name) in language_servers_to_stop {
 663            self.stop_language_server(worktree_id, adapter_name, cx)
 664                .detach();
 665        }
 666
 667        // Start all the newly-enabled language servers.
 668        for (worktree_id, worktree_path, language) in language_servers_to_start {
 669            self.start_language_server(worktree_id, worktree_path, language, cx);
 670        }
 671
 672        cx.notify();
 673    }
 674
 675    pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
 676        self.opened_buffers
 677            .get(&remote_id)
 678            .and_then(|buffer| buffer.upgrade(cx))
 679    }
 680
 681    pub fn languages(&self) -> &Arc<LanguageRegistry> {
 682        &self.languages
 683    }
 684
 685    pub fn client(&self) -> Arc<Client> {
 686        self.client.clone()
 687    }
 688
 689    pub fn user_store(&self) -> ModelHandle<UserStore> {
 690        self.user_store.clone()
 691    }
 692
 693    #[cfg(any(test, feature = "test-support"))]
 694    pub fn opened_buffers(&self, cx: &AppContext) -> Vec<ModelHandle<Buffer>> {
 695        self.opened_buffers
 696            .values()
 697            .filter_map(|b| b.upgrade(cx))
 698            .collect()
 699    }
 700
 701    #[cfg(any(test, feature = "test-support"))]
 702    pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
 703        let path = path.into();
 704        if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
 705            self.opened_buffers.iter().any(|(_, buffer)| {
 706                if let Some(buffer) = buffer.upgrade(cx) {
 707                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 708                        if file.worktree == worktree && file.path() == &path.path {
 709                            return true;
 710                        }
 711                    }
 712                }
 713                false
 714            })
 715        } else {
 716            false
 717        }
 718    }
 719
 720    pub fn fs(&self) -> &Arc<dyn Fs> {
 721        &self.fs
 722    }
 723
 724    pub fn remote_id(&self) -> Option<u64> {
 725        match self.client_state.as_ref()? {
 726            ProjectClientState::Local { remote_id, .. }
 727            | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
 728        }
 729    }
 730
 731    pub fn replica_id(&self) -> ReplicaId {
 732        match &self.client_state {
 733            Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
 734            _ => 0,
 735        }
 736    }
 737
 738    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
 739        if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
 740            updates_tx
 741                .unbounded_send(LocalProjectUpdate::WorktreesChanged)
 742                .ok();
 743        }
 744        cx.notify();
 745    }
 746
 747    pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
 748        &self.collaborators
 749    }
 750
 751    /// Collect all worktrees, including ones that don't appear in the project panel
 752    pub fn worktrees<'a>(
 753        &'a self,
 754        cx: &'a AppContext,
 755    ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
 756        self.worktrees
 757            .iter()
 758            .filter_map(move |worktree| worktree.upgrade(cx))
 759    }
 760
 761    /// Collect all user-visible worktrees, the ones that appear in the project panel
 762    pub fn visible_worktrees<'a>(
 763        &'a self,
 764        cx: &'a AppContext,
 765    ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
 766        self.worktrees.iter().filter_map(|worktree| {
 767            worktree.upgrade(cx).and_then(|worktree| {
 768                if worktree.read(cx).is_visible() {
 769                    Some(worktree)
 770                } else {
 771                    None
 772                }
 773            })
 774        })
 775    }
 776
 777    pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
 778        self.visible_worktrees(cx)
 779            .map(|tree| tree.read(cx).root_name())
 780    }
 781
 782    pub fn worktree_for_id(
 783        &self,
 784        id: WorktreeId,
 785        cx: &AppContext,
 786    ) -> Option<ModelHandle<Worktree>> {
 787        self.worktrees(cx)
 788            .find(|worktree| worktree.read(cx).id() == id)
 789    }
 790
 791    pub fn worktree_for_entry(
 792        &self,
 793        entry_id: ProjectEntryId,
 794        cx: &AppContext,
 795    ) -> Option<ModelHandle<Worktree>> {
 796        self.worktrees(cx)
 797            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 798    }
 799
 800    pub fn worktree_id_for_entry(
 801        &self,
 802        entry_id: ProjectEntryId,
 803        cx: &AppContext,
 804    ) -> Option<WorktreeId> {
 805        self.worktree_for_entry(entry_id, cx)
 806            .map(|worktree| worktree.read(cx).id())
 807    }
 808
 809    pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
 810        paths.iter().all(|path| self.contains_path(path, cx))
 811    }
 812
 813    pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
 814        for worktree in self.worktrees(cx) {
 815            let worktree = worktree.read(cx).as_local();
 816            if worktree.map_or(false, |w| w.contains_abs_path(path)) {
 817                return true;
 818            }
 819        }
 820        false
 821    }
 822
 823    pub fn create_entry(
 824        &mut self,
 825        project_path: impl Into<ProjectPath>,
 826        is_directory: bool,
 827        cx: &mut ModelContext<Self>,
 828    ) -> Option<Task<Result<Entry>>> {
 829        let project_path = project_path.into();
 830        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 831        if self.is_local() {
 832            Some(worktree.update(cx, |worktree, cx| {
 833                worktree
 834                    .as_local_mut()
 835                    .unwrap()
 836                    .create_entry(project_path.path, is_directory, cx)
 837            }))
 838        } else {
 839            let client = self.client.clone();
 840            let project_id = self.remote_id().unwrap();
 841            Some(cx.spawn_weak(|_, mut cx| async move {
 842                let response = client
 843                    .request(proto::CreateProjectEntry {
 844                        worktree_id: project_path.worktree_id.to_proto(),
 845                        project_id,
 846                        path: project_path.path.to_string_lossy().into(),
 847                        is_directory,
 848                    })
 849                    .await?;
 850                let entry = response
 851                    .entry
 852                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 853                worktree
 854                    .update(&mut cx, |worktree, cx| {
 855                        worktree.as_remote_mut().unwrap().insert_entry(
 856                            entry,
 857                            response.worktree_scan_id as usize,
 858                            cx,
 859                        )
 860                    })
 861                    .await
 862            }))
 863        }
 864    }
 865
 866    pub fn copy_entry(
 867        &mut self,
 868        entry_id: ProjectEntryId,
 869        new_path: impl Into<Arc<Path>>,
 870        cx: &mut ModelContext<Self>,
 871    ) -> Option<Task<Result<Entry>>> {
 872        let worktree = self.worktree_for_entry(entry_id, cx)?;
 873        let new_path = new_path.into();
 874        if self.is_local() {
 875            worktree.update(cx, |worktree, cx| {
 876                worktree
 877                    .as_local_mut()
 878                    .unwrap()
 879                    .copy_entry(entry_id, new_path, cx)
 880            })
 881        } else {
 882            let client = self.client.clone();
 883            let project_id = self.remote_id().unwrap();
 884
 885            Some(cx.spawn_weak(|_, mut cx| async move {
 886                let response = client
 887                    .request(proto::CopyProjectEntry {
 888                        project_id,
 889                        entry_id: entry_id.to_proto(),
 890                        new_path: new_path.to_string_lossy().into(),
 891                    })
 892                    .await?;
 893                let entry = response
 894                    .entry
 895                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 896                worktree
 897                    .update(&mut cx, |worktree, cx| {
 898                        worktree.as_remote_mut().unwrap().insert_entry(
 899                            entry,
 900                            response.worktree_scan_id as usize,
 901                            cx,
 902                        )
 903                    })
 904                    .await
 905            }))
 906        }
 907    }
 908
 909    pub fn rename_entry(
 910        &mut self,
 911        entry_id: ProjectEntryId,
 912        new_path: impl Into<Arc<Path>>,
 913        cx: &mut ModelContext<Self>,
 914    ) -> Option<Task<Result<Entry>>> {
 915        let worktree = self.worktree_for_entry(entry_id, cx)?;
 916        let new_path = new_path.into();
 917        if self.is_local() {
 918            worktree.update(cx, |worktree, cx| {
 919                worktree
 920                    .as_local_mut()
 921                    .unwrap()
 922                    .rename_entry(entry_id, new_path, cx)
 923            })
 924        } else {
 925            let client = self.client.clone();
 926            let project_id = self.remote_id().unwrap();
 927
 928            Some(cx.spawn_weak(|_, mut cx| async move {
 929                let response = client
 930                    .request(proto::RenameProjectEntry {
 931                        project_id,
 932                        entry_id: entry_id.to_proto(),
 933                        new_path: new_path.to_string_lossy().into(),
 934                    })
 935                    .await?;
 936                let entry = response
 937                    .entry
 938                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 939                worktree
 940                    .update(&mut cx, |worktree, cx| {
 941                        worktree.as_remote_mut().unwrap().insert_entry(
 942                            entry,
 943                            response.worktree_scan_id as usize,
 944                            cx,
 945                        )
 946                    })
 947                    .await
 948            }))
 949        }
 950    }
 951
 952    pub fn delete_entry(
 953        &mut self,
 954        entry_id: ProjectEntryId,
 955        cx: &mut ModelContext<Self>,
 956    ) -> Option<Task<Result<()>>> {
 957        let worktree = self.worktree_for_entry(entry_id, cx)?;
 958        if self.is_local() {
 959            worktree.update(cx, |worktree, cx| {
 960                worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
 961            })
 962        } else {
 963            let client = self.client.clone();
 964            let project_id = self.remote_id().unwrap();
 965            Some(cx.spawn_weak(|_, mut cx| async move {
 966                let response = client
 967                    .request(proto::DeleteProjectEntry {
 968                        project_id,
 969                        entry_id: entry_id.to_proto(),
 970                    })
 971                    .await?;
 972                worktree
 973                    .update(&mut cx, move |worktree, cx| {
 974                        worktree.as_remote_mut().unwrap().delete_entry(
 975                            entry_id,
 976                            response.worktree_scan_id as usize,
 977                            cx,
 978                        )
 979                    })
 980                    .await
 981            }))
 982        }
 983    }
 984
    /// Marks this project as shared under `project_id`.
    ///
    /// Subscribes to entity messages for `project_id`, upgrades weak buffer and
    /// worktree handles to strong ones, announces the known language servers, and
    /// installs a `ProjectClientState::Local` whose background task processes
    /// queued `LocalProjectUpdate`s. Finally emits `Event::RemoteIdChanged`.
    ///
    /// # Errors
    ///
    /// Fails with "project was already shared" when `client_state` is already
    /// set, or when subscribing to the entity fails.
    pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Result<()> {
        if self.client_state.is_some() {
            return Err(anyhow!("project was already shared"));
        }
        self.client_subscriptions.push(
            self.client
                .subscribe_to_entity(project_id)?
                .set_model(&cx.handle(), &mut cx.to_async()),
        );

        // Replace weak buffer handles with strong ones where the buffer can still
        // be upgraded.
        for open_buffer in self.opened_buffers.values_mut() {
            match open_buffer {
                OpenBuffer::Strong(_) => {}
                OpenBuffer::Weak(buffer) => {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
                OpenBuffer::Operations(_) => unreachable!(),
            }
        }

        // Likewise, replace weak worktree handles with strong ones where possible.
        for worktree_handle in self.worktrees.iter_mut() {
            match worktree_handle {
                WorktreeHandle::Strong(_) => {}
                WorktreeHandle::Weak(worktree) => {
                    if let Some(worktree) = worktree.upgrade(cx) {
                        *worktree_handle = WorktreeHandle::Strong(worktree);
                    }
                }
            }
        }

        // Announce every known language server; a failed send is only logged.
        for (server_id, status) in &self.language_server_statuses {
            self.client
                .send(proto::StartLanguageServer {
                    project_id,
                    server: Some(proto::LanguageServer {
                        id: *server_id as u64,
                        name: status.name.clone(),
                    }),
                })
                .log_err();
        }

        // Updates sent through `updates_tx` are handled one at a time by the task
        // stored in `_send_updates`. The task ends when the channel yields `None`
        // or when the project handle can no longer be upgraded.
        let (updates_tx, mut updates_rx) = mpsc::unbounded();
        let client = self.client.clone();
        self.client_state = Some(ProjectClientState::Local {
            remote_id: project_id,
            updates_tx,
            _send_updates: cx.spawn_weak(move |this, mut cx| async move {
                while let Some(update) = updates_rx.next().await {
                    let Some(this) = this.upgrade(&cx) else { break };

                    match update {
                        LocalProjectUpdate::WorktreesChanged => {
                            // Snapshot the worktrees before awaiting the request.
                            let worktrees = this
                                .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
                            let update_project = this
                                .read_with(&cx, |this, cx| {
                                    this.client.request(proto::UpdateProject {
                                        project_id,
                                        worktrees: this.worktree_metadata_protos(cx),
                                    })
                                })
                                .await;
                            // Only share the individual worktrees if the metadata
                            // update succeeded.
                            if update_project.is_ok() {
                                for worktree in worktrees {
                                    worktree.update(&mut cx, |worktree, cx| {
                                        let worktree = worktree.as_local_mut().unwrap();
                                        worktree.share(project_id, cx).detach_and_log_err(cx)
                                    });
                                }
                            }
                        }
                        LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
                            // Yields the buffer only when `buffer_id` was newly
                            // inserted for this peer and the handle is strong;
                            // otherwise nothing is sent.
                            let buffer = this.update(&mut cx, |this, _| {
                                let buffer = this.opened_buffers.get(&buffer_id).unwrap();
                                let shared_buffers =
                                    this.shared_buffers.entry(peer_id).or_default();
                                if shared_buffers.insert(buffer_id) {
                                    if let OpenBuffer::Strong(buffer) = buffer {
                                        Some(buffer.clone())
                                    } else {
                                        None
                                    }
                                } else {
                                    None
                                }
                            });

                            let Some(buffer) = buffer else { continue };
                            let operations =
                                buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
                            let operations = operations.await;
                            let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());

                            // Send the buffer state first; the serialized operations
                            // follow in chunks only if that send succeeded.
                            let initial_state = proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
                            };
                            if client.send(initial_state).log_err().is_some() {
                                let client = client.clone();
                                cx.background()
                                    .spawn(async move {
                                        let mut chunks = split_operations(operations).peekable();
                                        while let Some(chunk) = chunks.next() {
                                            // Flag the final chunk via `is_last`.
                                            let is_last = chunks.peek().is_none();
                                            client.send(proto::CreateBufferForPeer {
                                                project_id,
                                                peer_id: Some(peer_id),
                                                variant: Some(
                                                    proto::create_buffer_for_peer::Variant::Chunk(
                                                        proto::BufferChunk {
                                                            buffer_id,
                                                            operations: chunk,
                                                            is_last,
                                                        },
                                                    ),
                                                ),
                                            })?;
                                        }
                                        anyhow::Ok(())
                                    })
                                    .await
                                    .log_err();
                            }
                        }
                    }
                }
            }),
        });

        self.metadata_changed(cx);
        cx.emit(Event::RemoteIdChanged(Some(project_id)));
        cx.notify();
        Ok(())
    }
1124
1125    pub fn reshared(
1126        &mut self,
1127        message: proto::ResharedProject,
1128        cx: &mut ModelContext<Self>,
1129    ) -> Result<()> {
1130        self.shared_buffers.clear();
1131        self.set_collaborators_from_proto(message.collaborators, cx)?;
1132        self.metadata_changed(cx);
1133        Ok(())
1134    }
1135
1136    pub fn rejoined(
1137        &mut self,
1138        message: proto::RejoinedProject,
1139        message_id: u32,
1140        cx: &mut ModelContext<Self>,
1141    ) -> Result<()> {
1142        self.join_project_response_message_id = message_id;
1143        self.set_worktrees_from_proto(message.worktrees, cx)?;
1144        self.set_collaborators_from_proto(message.collaborators, cx)?;
1145        self.language_server_statuses = message
1146            .language_servers
1147            .into_iter()
1148            .map(|server| {
1149                (
1150                    server.id as usize,
1151                    LanguageServerStatus {
1152                        name: server.name,
1153                        pending_work: Default::default(),
1154                        has_pending_diagnostic_updates: false,
1155                        progress_tokens: Default::default(),
1156                    },
1157                )
1158            })
1159            .collect();
1160        self.buffer_ordered_messages_tx
1161            .unbounded_send(BufferOrderedMessage::Resync)
1162            .unwrap();
1163        cx.notify();
1164        Ok(())
1165    }
1166
1167    pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1168        self.unshare_internal(cx)?;
1169        self.metadata_changed(cx);
1170        cx.notify();
1171        Ok(())
1172    }
1173
1174    fn unshare_internal(&mut self, cx: &mut AppContext) -> Result<()> {
1175        if self.is_remote() {
1176            return Err(anyhow!("attempted to unshare a remote project"));
1177        }
1178
1179        if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1180            self.collaborators.clear();
1181            self.shared_buffers.clear();
1182            self.client_subscriptions.clear();
1183
1184            for worktree_handle in self.worktrees.iter_mut() {
1185                if let WorktreeHandle::Strong(worktree) = worktree_handle {
1186                    let is_visible = worktree.update(cx, |worktree, _| {
1187                        worktree.as_local_mut().unwrap().unshare();
1188                        worktree.is_visible()
1189                    });
1190                    if !is_visible {
1191                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1192                    }
1193                }
1194            }
1195
1196            for open_buffer in self.opened_buffers.values_mut() {
1197                // Wake up any tasks waiting for peers' edits to this buffer.
1198                if let Some(buffer) = open_buffer.upgrade(cx) {
1199                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1200                }
1201
1202                if let OpenBuffer::Strong(buffer) = open_buffer {
1203                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1204                }
1205            }
1206
1207            self.client.send(proto::UnshareProject {
1208                project_id: remote_id,
1209            })?;
1210
1211            Ok(())
1212        } else {
1213            Err(anyhow!("attempted to unshare an unshared project"))
1214        }
1215    }
1216
    /// Runs `disconnected_from_host_internal`, then emits
    /// `Event::DisconnectedFromHost` and notifies observers.
    pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
        self.disconnected_from_host_internal(cx);
        cx.emit(Event::DisconnectedFromHost);
        cx.notify();
    }
1222
1223    fn disconnected_from_host_internal(&mut self, cx: &mut AppContext) {
1224        if let Some(ProjectClientState::Remote {
1225            sharing_has_stopped,
1226            ..
1227        }) = &mut self.client_state
1228        {
1229            *sharing_has_stopped = true;
1230
1231            self.collaborators.clear();
1232
1233            for worktree in &self.worktrees {
1234                if let Some(worktree) = worktree.upgrade(cx) {
1235                    worktree.update(cx, |worktree, _| {
1236                        if let Some(worktree) = worktree.as_remote_mut() {
1237                            worktree.disconnected_from_host();
1238                        }
1239                    });
1240                }
1241            }
1242
1243            for open_buffer in self.opened_buffers.values_mut() {
1244                // Wake up any tasks waiting for peers' edits to this buffer.
1245                if let Some(buffer) = open_buffer.upgrade(cx) {
1246                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1247                }
1248
1249                if let OpenBuffer::Strong(buffer) = open_buffer {
1250                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1251                }
1252            }
1253
1254            // Wake up all futures currently waiting on a buffer to get opened,
1255            // to give them a chance to fail now that we've disconnected.
1256            *self.opened_buffer.0.borrow_mut() = ();
1257        }
1258    }
1259
    /// Emits `Event::Closed`. No other state is changed here.
    pub fn close(&mut self, cx: &mut ModelContext<Self>) {
        cx.emit(Event::Closed);
    }
1263
1264    pub fn is_read_only(&self) -> bool {
1265        match &self.client_state {
1266            Some(ProjectClientState::Remote {
1267                sharing_has_stopped,
1268                ..
1269            }) => *sharing_has_stopped,
1270            _ => false,
1271        }
1272    }
1273
1274    pub fn is_local(&self) -> bool {
1275        match &self.client_state {
1276            Some(ProjectClientState::Remote { .. }) => false,
1277            _ => true,
1278        }
1279    }
1280
1281    pub fn is_remote(&self) -> bool {
1282        !self.is_local()
1283    }
1284
1285    pub fn create_buffer(
1286        &mut self,
1287        text: &str,
1288        language: Option<Arc<Language>>,
1289        cx: &mut ModelContext<Self>,
1290    ) -> Result<ModelHandle<Buffer>> {
1291        if self.is_remote() {
1292            return Err(anyhow!("creating buffers as a guest is not supported yet"));
1293        }
1294
1295        let buffer = cx.add_model(|cx| {
1296            Buffer::new(self.replica_id(), text, cx)
1297                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1298        });
1299        self.register_buffer(&buffer, cx)?;
1300        Ok(buffer)
1301    }
1302
1303    pub fn open_path(
1304        &mut self,
1305        path: impl Into<ProjectPath>,
1306        cx: &mut ModelContext<Self>,
1307    ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1308        let task = self.open_buffer(path, cx);
1309        cx.spawn_weak(|_, cx| async move {
1310            let buffer = task.await?;
1311            let project_entry_id = buffer
1312                .read_with(&cx, |buffer, cx| {
1313                    File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1314                })
1315                .ok_or_else(|| anyhow!("no project entry"))?;
1316
1317            let buffer: &AnyModelHandle = &buffer;
1318            Ok((project_entry_id, buffer.clone()))
1319        })
1320    }
1321
1322    pub fn open_local_buffer(
1323        &mut self,
1324        abs_path: impl AsRef<Path>,
1325        cx: &mut ModelContext<Self>,
1326    ) -> Task<Result<ModelHandle<Buffer>>> {
1327        if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1328            self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1329        } else {
1330            Task::ready(Err(anyhow!("no such path")))
1331        }
1332    }
1333
1334    pub fn open_buffer(
1335        &mut self,
1336        path: impl Into<ProjectPath>,
1337        cx: &mut ModelContext<Self>,
1338    ) -> Task<Result<ModelHandle<Buffer>>> {
1339        let project_path = path.into();
1340        let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
1341            worktree
1342        } else {
1343            return Task::ready(Err(anyhow!("no such worktree")));
1344        };
1345
1346        // If there is already a buffer for the given path, then return it.
1347        let existing_buffer = self.get_open_buffer(&project_path, cx);
1348        if let Some(existing_buffer) = existing_buffer {
1349            return Task::ready(Ok(existing_buffer));
1350        }
1351
1352        let mut loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
1353            // If the given path is already being loaded, then wait for that existing
1354            // task to complete and return the same buffer.
1355            hash_map::Entry::Occupied(e) => e.get().clone(),
1356
1357            // Otherwise, record the fact that this path is now being loaded.
1358            hash_map::Entry::Vacant(entry) => {
1359                let (mut tx, rx) = postage::watch::channel();
1360                entry.insert(rx.clone());
1361
1362                let load_buffer = if worktree.read(cx).is_local() {
1363                    self.open_local_buffer_internal(&project_path.path, &worktree, cx)
1364                } else {
1365                    self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
1366                };
1367
1368                cx.spawn(move |this, mut cx| async move {
1369                    let load_result = load_buffer.await;
1370                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
1371                        // Record the fact that the buffer is no longer loading.
1372                        this.loading_buffers_by_path.remove(&project_path);
1373                        let buffer = load_result.map_err(Arc::new)?;
1374                        Ok(buffer)
1375                    }));
1376                })
1377                .detach();
1378                rx
1379            }
1380        };
1381
1382        cx.foreground().spawn(async move {
1383            loop {
1384                if let Some(result) = loading_watch.borrow().as_ref() {
1385                    match result {
1386                        Ok(buffer) => return Ok(buffer.clone()),
1387                        Err(error) => return Err(anyhow!("{}", error)),
1388                    }
1389                }
1390                loading_watch.next().await;
1391            }
1392        })
1393    }
1394
1395    fn open_local_buffer_internal(
1396        &mut self,
1397        path: &Arc<Path>,
1398        worktree: &ModelHandle<Worktree>,
1399        cx: &mut ModelContext<Self>,
1400    ) -> Task<Result<ModelHandle<Buffer>>> {
1401        let load_buffer = worktree.update(cx, |worktree, cx| {
1402            let worktree = worktree.as_local_mut().unwrap();
1403            worktree.load_buffer(path, cx)
1404        });
1405        cx.spawn(|this, mut cx| async move {
1406            let buffer = load_buffer.await?;
1407            this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1408            Ok(buffer)
1409        })
1410    }
1411
1412    fn open_remote_buffer_internal(
1413        &mut self,
1414        path: &Arc<Path>,
1415        worktree: &ModelHandle<Worktree>,
1416        cx: &mut ModelContext<Self>,
1417    ) -> Task<Result<ModelHandle<Buffer>>> {
1418        let rpc = self.client.clone();
1419        let project_id = self.remote_id().unwrap();
1420        let remote_worktree_id = worktree.read(cx).id();
1421        let path = path.clone();
1422        let path_string = path.to_string_lossy().to_string();
1423        cx.spawn(|this, mut cx| async move {
1424            let response = rpc
1425                .request(proto::OpenBufferByPath {
1426                    project_id,
1427                    worktree_id: remote_worktree_id.to_proto(),
1428                    path: path_string,
1429                })
1430                .await?;
1431            this.update(&mut cx, |this, cx| {
1432                this.wait_for_remote_buffer(response.buffer_id, cx)
1433            })
1434            .await
1435        })
1436    }
1437
1438    /// LanguageServerName is owned, because it is inserted into a map
1439    fn open_local_buffer_via_lsp(
1440        &mut self,
1441        abs_path: lsp::Url,
1442        language_server_id: usize,
1443        language_server_name: LanguageServerName,
1444        cx: &mut ModelContext<Self>,
1445    ) -> Task<Result<ModelHandle<Buffer>>> {
1446        cx.spawn(|this, mut cx| async move {
1447            let abs_path = abs_path
1448                .to_file_path()
1449                .map_err(|_| anyhow!("can't convert URI to path"))?;
1450            let (worktree, relative_path) = if let Some(result) =
1451                this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1452            {
1453                result
1454            } else {
1455                let worktree = this
1456                    .update(&mut cx, |this, cx| {
1457                        this.create_local_worktree(&abs_path, false, cx)
1458                    })
1459                    .await?;
1460                this.update(&mut cx, |this, cx| {
1461                    this.language_server_ids.insert(
1462                        (worktree.read(cx).id(), language_server_name),
1463                        language_server_id,
1464                    );
1465                });
1466                (worktree, PathBuf::new())
1467            };
1468
1469            let project_path = ProjectPath {
1470                worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1471                path: relative_path.into(),
1472            };
1473            this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1474                .await
1475        })
1476    }
1477
1478    pub fn open_buffer_by_id(
1479        &mut self,
1480        id: u64,
1481        cx: &mut ModelContext<Self>,
1482    ) -> Task<Result<ModelHandle<Buffer>>> {
1483        if let Some(buffer) = self.buffer_for_id(id, cx) {
1484            Task::ready(Ok(buffer))
1485        } else if self.is_local() {
1486            Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1487        } else if let Some(project_id) = self.remote_id() {
1488            let request = self
1489                .client
1490                .request(proto::OpenBufferById { project_id, id });
1491            cx.spawn(|this, mut cx| async move {
1492                let buffer_id = request.await?.buffer_id;
1493                this.update(&mut cx, |this, cx| {
1494                    this.wait_for_remote_buffer(buffer_id, cx)
1495                })
1496                .await
1497            })
1498        } else {
1499            Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1500        }
1501    }
1502
1503    pub fn save_buffers(
1504        &self,
1505        buffers: HashSet<ModelHandle<Buffer>>,
1506        cx: &mut ModelContext<Self>,
1507    ) -> Task<Result<()>> {
1508        cx.spawn(|this, mut cx| async move {
1509            let save_tasks = buffers
1510                .into_iter()
1511                .map(|buffer| this.update(&mut cx, |this, cx| this.save_buffer(buffer, cx)));
1512            try_join_all(save_tasks).await?;
1513            Ok(())
1514        })
1515    }
1516
1517    pub fn save_buffer(
1518        &self,
1519        buffer: ModelHandle<Buffer>,
1520        cx: &mut ModelContext<Self>,
1521    ) -> Task<Result<(clock::Global, RopeFingerprint, SystemTime)>> {
1522        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1523            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1524        };
1525        let worktree = file.worktree.clone();
1526        let path = file.path.clone();
1527        worktree.update(cx, |worktree, cx| match worktree {
1528            Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx),
1529            Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx),
1530        })
1531    }
1532
1533    pub fn save_buffer_as(
1534        &mut self,
1535        buffer: ModelHandle<Buffer>,
1536        abs_path: PathBuf,
1537        cx: &mut ModelContext<Self>,
1538    ) -> Task<Result<()>> {
1539        let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1540        let old_path =
1541            File::from_dyn(buffer.read(cx).file()).and_then(|f| Some(f.as_local()?.abs_path(cx)));
1542        cx.spawn(|this, mut cx| async move {
1543            if let Some(old_path) = old_path {
1544                this.update(&mut cx, |this, cx| {
1545                    this.unregister_buffer_from_language_server(&buffer, old_path, cx);
1546                });
1547            }
1548            let (worktree, path) = worktree_task.await?;
1549            worktree
1550                .update(&mut cx, |worktree, cx| match worktree {
1551                    Worktree::Local(worktree) => {
1552                        worktree.save_buffer(buffer.clone(), path.into(), true, cx)
1553                    }
1554                    Worktree::Remote(_) => panic!("cannot remote buffers as new files"),
1555                })
1556                .await?;
1557            this.update(&mut cx, |this, cx| {
1558                this.detect_language_for_buffer(&buffer, cx);
1559                this.register_buffer_with_language_server(&buffer, cx);
1560            });
1561            Ok(())
1562        })
1563    }
1564
1565    pub fn get_open_buffer(
1566        &mut self,
1567        path: &ProjectPath,
1568        cx: &mut ModelContext<Self>,
1569    ) -> Option<ModelHandle<Buffer>> {
1570        let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1571        self.opened_buffers.values().find_map(|buffer| {
1572            let buffer = buffer.upgrade(cx)?;
1573            let file = File::from_dyn(buffer.read(cx).file())?;
1574            if file.worktree == worktree && file.path() == &path.path {
1575                Some(buffer)
1576            } else {
1577                None
1578            }
1579        })
1580    }
1581
1582    fn register_buffer(
1583        &mut self,
1584        buffer: &ModelHandle<Buffer>,
1585        cx: &mut ModelContext<Self>,
1586    ) -> Result<()> {
1587        buffer.update(cx, |buffer, _| {
1588            buffer.set_language_registry(self.languages.clone())
1589        });
1590
1591        let remote_id = buffer.read(cx).remote_id();
1592        let is_remote = self.is_remote();
1593        let open_buffer = if is_remote || self.is_shared() {
1594            OpenBuffer::Strong(buffer.clone())
1595        } else {
1596            OpenBuffer::Weak(buffer.downgrade())
1597        };
1598
1599        match self.opened_buffers.entry(remote_id) {
1600            hash_map::Entry::Vacant(entry) => {
1601                entry.insert(open_buffer);
1602            }
1603            hash_map::Entry::Occupied(mut entry) => {
1604                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1605                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
1606                } else if entry.get().upgrade(cx).is_some() {
1607                    if is_remote {
1608                        return Ok(());
1609                    } else {
1610                        debug_panic!("buffer {} was already registered", remote_id);
1611                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1612                    }
1613                }
1614                entry.insert(open_buffer);
1615            }
1616        }
1617        cx.subscribe(buffer, |this, buffer, event, cx| {
1618            this.on_buffer_event(buffer, event, cx);
1619        })
1620        .detach();
1621
1622        self.detect_language_for_buffer(buffer, cx);
1623        self.register_buffer_with_language_server(buffer, cx);
1624        cx.observe_release(buffer, |this, buffer, cx| {
1625            if let Some(file) = File::from_dyn(buffer.file()) {
1626                if file.is_local() {
1627                    let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1628                    if let Some((_, server)) = this.language_server_for_buffer(buffer, cx) {
1629                        server
1630                            .notify::<lsp::notification::DidCloseTextDocument>(
1631                                lsp::DidCloseTextDocumentParams {
1632                                    text_document: lsp::TextDocumentIdentifier::new(uri),
1633                                },
1634                            )
1635                            .log_err();
1636                    }
1637                }
1638            }
1639        })
1640        .detach();
1641
1642        *self.opened_buffer.0.borrow_mut() = ();
1643        Ok(())
1644    }
1645
1646    fn register_buffer_with_language_server(
1647        &mut self,
1648        buffer_handle: &ModelHandle<Buffer>,
1649        cx: &mut ModelContext<Self>,
1650    ) {
1651        let buffer = buffer_handle.read(cx);
1652        let buffer_id = buffer.remote_id();
1653        if let Some(file) = File::from_dyn(buffer.file()) {
1654            if file.is_local() {
1655                let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1656                let initial_snapshot = buffer.text_snapshot();
1657
1658                let mut language_server = None;
1659                let mut language_id = None;
1660                if let Some(language) = buffer.language() {
1661                    let worktree_id = file.worktree_id(cx);
1662                    if let Some(adapter) = language.lsp_adapter() {
1663                        language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
1664                        language_server = self
1665                            .language_server_ids
1666                            .get(&(worktree_id, adapter.name.clone()))
1667                            .and_then(|id| self.language_servers.get(id))
1668                            .and_then(|server_state| {
1669                                if let LanguageServerState::Running { server, .. } = server_state {
1670                                    Some(server.clone())
1671                                } else {
1672                                    None
1673                                }
1674                            });
1675                    }
1676                }
1677
1678                if let Some(local_worktree) = file.worktree.read(cx).as_local() {
1679                    if let Some(diagnostics) = local_worktree.diagnostics_for_path(file.path()) {
1680                        self.update_buffer_diagnostics(buffer_handle, diagnostics, None, cx)
1681                            .log_err();
1682                    }
1683                }
1684
1685                if let Some(server) = language_server {
1686                    server
1687                        .notify::<lsp::notification::DidOpenTextDocument>(
1688                            lsp::DidOpenTextDocumentParams {
1689                                text_document: lsp::TextDocumentItem::new(
1690                                    uri,
1691                                    language_id.unwrap_or_default(),
1692                                    0,
1693                                    initial_snapshot.text(),
1694                                ),
1695                            },
1696                        )
1697                        .log_err();
1698                    buffer_handle.update(cx, |buffer, cx| {
1699                        buffer.set_completion_triggers(
1700                            server
1701                                .capabilities()
1702                                .completion_provider
1703                                .as_ref()
1704                                .and_then(|provider| provider.trigger_characters.clone())
1705                                .unwrap_or_default(),
1706                            cx,
1707                        )
1708                    });
1709                    self.buffer_snapshots
1710                        .insert(buffer_id, vec![(0, initial_snapshot)]);
1711                }
1712            }
1713        }
1714    }
1715
1716    fn unregister_buffer_from_language_server(
1717        &mut self,
1718        buffer: &ModelHandle<Buffer>,
1719        old_path: PathBuf,
1720        cx: &mut ModelContext<Self>,
1721    ) {
1722        buffer.update(cx, |buffer, cx| {
1723            buffer.update_diagnostics(Default::default(), cx);
1724            self.buffer_snapshots.remove(&buffer.remote_id());
1725            if let Some((_, language_server)) = self.language_server_for_buffer(buffer, cx) {
1726                language_server
1727                    .notify::<lsp::notification::DidCloseTextDocument>(
1728                        lsp::DidCloseTextDocumentParams {
1729                            text_document: lsp::TextDocumentIdentifier::new(
1730                                lsp::Url::from_file_path(old_path).unwrap(),
1731                            ),
1732                        },
1733                    )
1734                    .log_err();
1735            }
1736        });
1737    }
1738
1739    async fn send_buffer_ordered_messages(
1740        this: WeakModelHandle<Self>,
1741        rx: UnboundedReceiver<BufferOrderedMessage>,
1742        mut cx: AsyncAppContext,
1743    ) -> Option<()> {
1744        const MAX_BATCH_SIZE: usize = 128;
1745
1746        let mut operations_by_buffer_id = HashMap::default();
1747        async fn flush_operations(
1748            this: &ModelHandle<Project>,
1749            operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
1750            needs_resync_with_host: &mut bool,
1751            is_local: bool,
1752            cx: &AsyncAppContext,
1753        ) {
1754            for (buffer_id, operations) in operations_by_buffer_id.drain() {
1755                let request = this.read_with(cx, |this, _| {
1756                    let project_id = this.remote_id()?;
1757                    Some(this.client.request(proto::UpdateBuffer {
1758                        buffer_id,
1759                        project_id,
1760                        operations,
1761                    }))
1762                });
1763                if let Some(request) = request {
1764                    if request.await.is_err() && !is_local {
1765                        *needs_resync_with_host = true;
1766                        break;
1767                    }
1768                }
1769            }
1770        }
1771
1772        let mut needs_resync_with_host = false;
1773        let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
1774
1775        while let Some(changes) = changes.next().await {
1776            let this = this.upgrade(&mut cx)?;
1777            let is_local = this.read_with(&cx, |this, _| this.is_local());
1778
1779            for change in changes {
1780                match change {
1781                    BufferOrderedMessage::Operation {
1782                        buffer_id,
1783                        operation,
1784                    } => {
1785                        if needs_resync_with_host {
1786                            continue;
1787                        }
1788
1789                        operations_by_buffer_id
1790                            .entry(buffer_id)
1791                            .or_insert(Vec::new())
1792                            .push(operation);
1793                    }
1794
1795                    BufferOrderedMessage::Resync => {
1796                        operations_by_buffer_id.clear();
1797                        if this
1798                            .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
1799                            .await
1800                            .is_ok()
1801                        {
1802                            needs_resync_with_host = false;
1803                        }
1804                    }
1805
1806                    BufferOrderedMessage::LanguageServerUpdate {
1807                        language_server_id,
1808                        message,
1809                    } => {
1810                        flush_operations(
1811                            &this,
1812                            &mut operations_by_buffer_id,
1813                            &mut needs_resync_with_host,
1814                            is_local,
1815                            &cx,
1816                        )
1817                        .await;
1818
1819                        this.read_with(&cx, |this, _| {
1820                            if let Some(project_id) = this.remote_id() {
1821                                this.client
1822                                    .send(proto::UpdateLanguageServer {
1823                                        project_id,
1824                                        language_server_id: language_server_id as u64,
1825                                        variant: Some(message),
1826                                    })
1827                                    .log_err();
1828                            }
1829                        });
1830                    }
1831                }
1832            }
1833
1834            flush_operations(
1835                &this,
1836                &mut operations_by_buffer_id,
1837                &mut needs_resync_with_host,
1838                is_local,
1839                &cx,
1840            )
1841            .await;
1842        }
1843
1844        None
1845    }
1846
1847    fn on_buffer_event(
1848        &mut self,
1849        buffer: ModelHandle<Buffer>,
1850        event: &BufferEvent,
1851        cx: &mut ModelContext<Self>,
1852    ) -> Option<()> {
1853        match event {
1854            BufferEvent::Operation(operation) => {
1855                self.buffer_ordered_messages_tx
1856                    .unbounded_send(BufferOrderedMessage::Operation {
1857                        buffer_id: buffer.read(cx).remote_id(),
1858                        operation: language::proto::serialize_operation(operation),
1859                    })
1860                    .ok();
1861            }
1862            BufferEvent::Edited { .. } => {
1863                let language_server = self
1864                    .language_server_for_buffer(buffer.read(cx), cx)
1865                    .map(|(_, server)| server.clone())?;
1866                let buffer = buffer.read(cx);
1867                let file = File::from_dyn(buffer.file())?;
1868                let abs_path = file.as_local()?.abs_path(cx);
1869                let uri = lsp::Url::from_file_path(abs_path).unwrap();
1870                let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?;
1871                let (version, prev_snapshot) = buffer_snapshots.last()?;
1872                let next_snapshot = buffer.text_snapshot();
1873                let next_version = version + 1;
1874
1875                let content_changes = buffer
1876                    .edits_since::<(PointUtf16, usize)>(prev_snapshot.version())
1877                    .map(|edit| {
1878                        let edit_start = edit.new.start.0;
1879                        let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
1880                        let new_text = next_snapshot
1881                            .text_for_range(edit.new.start.1..edit.new.end.1)
1882                            .collect();
1883                        lsp::TextDocumentContentChangeEvent {
1884                            range: Some(lsp::Range::new(
1885                                point_to_lsp(edit_start),
1886                                point_to_lsp(edit_end),
1887                            )),
1888                            range_length: None,
1889                            text: new_text,
1890                        }
1891                    })
1892                    .collect();
1893
1894                buffer_snapshots.push((next_version, next_snapshot));
1895
1896                language_server
1897                    .notify::<lsp::notification::DidChangeTextDocument>(
1898                        lsp::DidChangeTextDocumentParams {
1899                            text_document: lsp::VersionedTextDocumentIdentifier::new(
1900                                uri,
1901                                next_version,
1902                            ),
1903                            content_changes,
1904                        },
1905                    )
1906                    .log_err();
1907            }
1908            BufferEvent::Saved => {
1909                let file = File::from_dyn(buffer.read(cx).file())?;
1910                let worktree_id = file.worktree_id(cx);
1911                let abs_path = file.as_local()?.abs_path(cx);
1912                let text_document = lsp::TextDocumentIdentifier {
1913                    uri: lsp::Url::from_file_path(abs_path).unwrap(),
1914                };
1915
1916                for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
1917                    server
1918                        .notify::<lsp::notification::DidSaveTextDocument>(
1919                            lsp::DidSaveTextDocumentParams {
1920                                text_document: text_document.clone(),
1921                                text: None,
1922                            },
1923                        )
1924                        .log_err();
1925                }
1926
1927                let language_server_id = self.language_server_id_for_buffer(buffer.read(cx), cx)?;
1928                if let Some(LanguageServerState::Running {
1929                    adapter,
1930                    simulate_disk_based_diagnostics_completion,
1931                    ..
1932                }) = self.language_servers.get_mut(&language_server_id)
1933                {
1934                    // After saving a buffer using a language server that doesn't provide
1935                    // a disk-based progress token, kick off a timer that will reset every
1936                    // time the buffer is saved. If the timer eventually fires, simulate
1937                    // disk-based diagnostics being finished so that other pieces of UI
1938                    // (e.g., project diagnostics view, diagnostic status bar) can update.
1939                    // We don't emit an event right away because the language server might take
1940                    // some time to publish diagnostics.
1941                    if adapter.disk_based_diagnostics_progress_token.is_none() {
1942                        const DISK_BASED_DIAGNOSTICS_DEBOUNCE: Duration = Duration::from_secs(1);
1943
1944                        let task = cx.spawn_weak(|this, mut cx| async move {
1945                            cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
1946                            if let Some(this) = this.upgrade(&cx) {
1947                                this.update(&mut cx, |this, cx| {
1948                                    this.disk_based_diagnostics_finished(
1949                                        language_server_id,
1950                                        cx,
1951                                    );
1952                                    this.buffer_ordered_messages_tx
1953                                        .unbounded_send(
1954                                            BufferOrderedMessage::LanguageServerUpdate {
1955                                                language_server_id,
1956                                                message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
1957                                            },
1958                                        )
1959                                        .ok();
1960                                });
1961                            }
1962                        });
1963                        *simulate_disk_based_diagnostics_completion = Some(task);
1964                    }
1965                }
1966            }
1967            _ => {}
1968        }
1969
1970        None
1971    }
1972
1973    fn language_servers_for_worktree(
1974        &self,
1975        worktree_id: WorktreeId,
1976    ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
1977        self.language_server_ids
1978            .iter()
1979            .filter_map(move |((language_server_worktree_id, _), id)| {
1980                if *language_server_worktree_id == worktree_id {
1981                    if let Some(LanguageServerState::Running {
1982                        adapter,
1983                        language,
1984                        server,
1985                        ..
1986                    }) = self.language_servers.get(id)
1987                    {
1988                        return Some((adapter, language, server));
1989                    }
1990                }
1991                None
1992            })
1993    }
1994
1995    fn maintain_buffer_languages(
1996        languages: &LanguageRegistry,
1997        cx: &mut ModelContext<Project>,
1998    ) -> Task<()> {
1999        let mut subscription = languages.subscribe();
2000        cx.spawn_weak(|project, mut cx| async move {
2001            while let Some(()) = subscription.next().await {
2002                if let Some(project) = project.upgrade(&cx) {
2003                    project.update(&mut cx, |project, cx| {
2004                        let mut plain_text_buffers = Vec::new();
2005                        let mut buffers_with_unknown_injections = Vec::new();
2006                        for buffer in project.opened_buffers.values() {
2007                            if let Some(handle) = buffer.upgrade(cx) {
2008                                let buffer = &handle.read(cx);
2009                                if buffer.language().is_none()
2010                                    || buffer.language() == Some(&*language::PLAIN_TEXT)
2011                                {
2012                                    plain_text_buffers.push(handle);
2013                                } else if buffer.contains_unknown_injections() {
2014                                    buffers_with_unknown_injections.push(handle);
2015                                }
2016                            }
2017                        }
2018
2019                        for buffer in plain_text_buffers {
2020                            project.detect_language_for_buffer(&buffer, cx);
2021                            project.register_buffer_with_language_server(&buffer, cx);
2022                        }
2023
2024                        for buffer in buffers_with_unknown_injections {
2025                            buffer.update(cx, |buffer, cx| buffer.reparse(cx));
2026                        }
2027                    });
2028                }
2029            }
2030        })
2031    }
2032
2033    fn maintain_workspace_config(
2034        languages: Arc<LanguageRegistry>,
2035        cx: &mut ModelContext<Project>,
2036    ) -> Task<()> {
2037        let (mut settings_changed_tx, mut settings_changed_rx) = watch::channel();
2038        let _ = postage::stream::Stream::try_recv(&mut settings_changed_rx);
2039
2040        let settings_observation = cx.observe_global::<Settings, _>(move |_, _| {
2041            *settings_changed_tx.borrow_mut() = ();
2042        });
2043        cx.spawn_weak(|this, mut cx| async move {
2044            while let Some(_) = settings_changed_rx.next().await {
2045                let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
2046                if let Some(this) = this.upgrade(&cx) {
2047                    this.read_with(&cx, |this, _| {
2048                        for server_state in this.language_servers.values() {
2049                            if let LanguageServerState::Running { server, .. } = server_state {
2050                                server
2051                                    .notify::<lsp::notification::DidChangeConfiguration>(
2052                                        lsp::DidChangeConfigurationParams {
2053                                            settings: workspace_config.clone(),
2054                                        },
2055                                    )
2056                                    .ok();
2057                            }
2058                        }
2059                    })
2060                } else {
2061                    break;
2062                }
2063            }
2064
2065            drop(settings_observation);
2066        })
2067    }
2068
2069    fn detect_language_for_buffer(
2070        &mut self,
2071        buffer: &ModelHandle<Buffer>,
2072        cx: &mut ModelContext<Self>,
2073    ) -> Option<()> {
2074        // If the buffer has a language, set it and start the language server if we haven't already.
2075        let full_path = buffer.read(cx).file()?.full_path(cx);
2076        let new_language = self
2077            .languages
2078            .language_for_path(&full_path)
2079            .now_or_never()?
2080            .ok()?;
2081        self.set_language_for_buffer(buffer, new_language, cx);
2082        None
2083    }
2084
2085    pub fn set_language_for_buffer(
2086        &mut self,
2087        buffer: &ModelHandle<Buffer>,
2088        new_language: Arc<Language>,
2089        cx: &mut ModelContext<Self>,
2090    ) {
2091        buffer.update(cx, |buffer, cx| {
2092            if buffer.language().map_or(true, |old_language| {
2093                !Arc::ptr_eq(old_language, &new_language)
2094            }) {
2095                buffer.set_language(Some(new_language.clone()), cx);
2096            }
2097        });
2098
2099        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2100            if let Some(worktree) = file.worktree.read(cx).as_local() {
2101                let worktree_id = worktree.id();
2102                let worktree_abs_path = worktree.abs_path().clone();
2103                self.start_language_server(worktree_id, worktree_abs_path, new_language, cx);
2104            }
2105        }
2106    }
2107
2108    fn start_language_server(
2109        &mut self,
2110        worktree_id: WorktreeId,
2111        worktree_path: Arc<Path>,
2112        language: Arc<Language>,
2113        cx: &mut ModelContext<Self>,
2114    ) {
2115        if !cx
2116            .global::<Settings>()
2117            .enable_language_server(Some(&language.name()))
2118        {
2119            return;
2120        }
2121
2122        let adapter = if let Some(adapter) = language.lsp_adapter() {
2123            adapter
2124        } else {
2125            return;
2126        };
2127        let key = (worktree_id, adapter.name.clone());
2128
2129        let mut initialization_options = adapter.initialization_options.clone();
2130
2131        let lsp = &cx.global::<Settings>().lsp.get(&adapter.name.0);
2132        let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
2133        match (&mut initialization_options, override_options) {
2134            (Some(initialization_options), Some(override_options)) => {
2135                merge_json_value_into(override_options, initialization_options);
2136            }
2137            (None, override_options) => initialization_options = override_options,
2138            _ => {}
2139        }
2140
2141        self.language_server_ids
2142            .entry(key.clone())
2143            .or_insert_with(|| {
2144                let languages = self.languages.clone();
2145                let server_id = post_inc(&mut self.next_language_server_id);
2146                let language_server = self.languages.start_language_server(
2147                    server_id,
2148                    language.clone(),
2149                    worktree_path,
2150                    self.client.http_client(),
2151                    cx,
2152                );
2153                self.language_servers.insert(
2154                    server_id,
2155                    LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
2156                        let workspace_config =
2157                            cx.update(|cx| languages.workspace_configuration(cx)).await;
2158                        let language_server = language_server?.await.log_err()?;
2159                        let language_server = language_server
2160                            .initialize(initialization_options)
2161                            .await
2162                            .log_err()?;
2163                        let this = this.upgrade(&cx)?;
2164
2165                        language_server
2166                            .on_notification::<lsp::notification::PublishDiagnostics, _>({
2167                                let this = this.downgrade();
2168                                let adapter = adapter.clone();
2169                                move |mut params, cx| {
2170                                    let this = this;
2171                                    let adapter = adapter.clone();
2172                                    cx.spawn(|mut cx| async move {
2173                                        adapter.process_diagnostics(&mut params).await;
2174                                        if let Some(this) = this.upgrade(&cx) {
2175                                            this.update(&mut cx, |this, cx| {
2176                                                this.update_diagnostics(
2177                                                    server_id,
2178                                                    params,
2179                                                    &adapter.disk_based_diagnostic_sources,
2180                                                    cx,
2181                                                )
2182                                                .log_err();
2183                                            });
2184                                        }
2185                                    })
2186                                    .detach();
2187                                }
2188                            })
2189                            .detach();
2190
2191                        language_server
2192                            .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
2193                                let languages = languages.clone();
2194                                move |params, mut cx| {
2195                                    let languages = languages.clone();
2196                                    async move {
2197                                        let workspace_config = cx
2198                                            .update(|cx| languages.workspace_configuration(cx))
2199                                            .await;
2200                                        Ok(params
2201                                            .items
2202                                            .into_iter()
2203                                            .map(|item| {
2204                                                if let Some(section) = &item.section {
2205                                                    workspace_config
2206                                                        .get(section)
2207                                                        .cloned()
2208                                                        .unwrap_or(serde_json::Value::Null)
2209                                                } else {
2210                                                    workspace_config.clone()
2211                                                }
2212                                            })
2213                                            .collect())
2214                                    }
2215                                }
2216                            })
2217                            .detach();
2218
2219                        // Even though we don't have handling for these requests, respond to them to
2220                        // avoid stalling any language server like `gopls` which waits for a response
2221                        // to these requests when initializing.
2222                        language_server
2223                            .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
2224                                let this = this.downgrade();
2225                                move |params, mut cx| async move {
2226                                    if let Some(this) = this.upgrade(&cx) {
2227                                        this.update(&mut cx, |this, _| {
2228                                            if let Some(status) =
2229                                                this.language_server_statuses.get_mut(&server_id)
2230                                            {
2231                                                if let lsp::NumberOrString::String(token) =
2232                                                    params.token
2233                                                {
2234                                                    status.progress_tokens.insert(token);
2235                                                }
2236                                            }
2237                                        });
2238                                    }
2239                                    Ok(())
2240                                }
2241                            })
2242                            .detach();
2243                        language_server
2244                            .on_request::<lsp::request::RegisterCapability, _, _>({
2245                                let this = this.downgrade();
2246                                move |params, mut cx| async move {
2247                                    let this = this
2248                                        .upgrade(&cx)
2249                                        .ok_or_else(|| anyhow!("project dropped"))?;
2250                                    for reg in params.registrations {
2251                                        if reg.method == "workspace/didChangeWatchedFiles" {
2252                                            if let Some(options) = reg.register_options {
2253                                                let options = serde_json::from_value(options)?;
2254                                                this.update(&mut cx, |this, cx| {
2255                                                    this.on_lsp_did_change_watched_files(
2256                                                        server_id, options, cx,
2257                                                    );
2258                                                });
2259                                            }
2260                                        }
2261                                    }
2262                                    Ok(())
2263                                }
2264                            })
2265                            .detach();
2266
2267                        language_server
2268                            .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
2269                                let this = this.downgrade();
2270                                let adapter = adapter.clone();
2271                                let language_server = language_server.clone();
2272                                move |params, cx| {
2273                                    Self::on_lsp_workspace_edit(
2274                                        this,
2275                                        params,
2276                                        server_id,
2277                                        adapter.clone(),
2278                                        language_server.clone(),
2279                                        cx,
2280                                    )
2281                                }
2282                            })
2283                            .detach();
2284
2285                        let disk_based_diagnostics_progress_token =
2286                            adapter.disk_based_diagnostics_progress_token.clone();
2287
2288                        language_server
2289                            .on_notification::<lsp::notification::Progress, _>({
2290                                let this = this.downgrade();
2291                                move |params, mut cx| {
2292                                    if let Some(this) = this.upgrade(&cx) {
2293                                        this.update(&mut cx, |this, cx| {
2294                                            this.on_lsp_progress(
2295                                                params,
2296                                                server_id,
2297                                                disk_based_diagnostics_progress_token.clone(),
2298                                                cx,
2299                                            );
2300                                        });
2301                                    }
2302                                }
2303                            })
2304                            .detach();
2305
2306                        language_server
2307                            .notify::<lsp::notification::DidChangeConfiguration>(
2308                                lsp::DidChangeConfigurationParams {
2309                                    settings: workspace_config,
2310                                },
2311                            )
2312                            .ok();
2313
2314                        this.update(&mut cx, |this, cx| {
2315                            // If the language server for this key doesn't match the server id, don't store the
2316                            // server. Which will cause it to be dropped, killing the process
2317                            if this
2318                                .language_server_ids
2319                                .get(&key)
2320                                .map(|id| id != &server_id)
2321                                .unwrap_or(false)
2322                            {
2323                                return None;
2324                            }
2325
2326                            // Update language_servers collection with Running variant of LanguageServerState
2327                            // indicating that the server is up and running and ready
2328                            this.language_servers.insert(
2329                                server_id,
2330                                LanguageServerState::Running {
2331                                    adapter: adapter.clone(),
2332                                    language,
2333                                    watched_paths: Default::default(),
2334                                    server: language_server.clone(),
2335                                    simulate_disk_based_diagnostics_completion: None,
2336                                },
2337                            );
2338                            this.language_server_statuses.insert(
2339                                server_id,
2340                                LanguageServerStatus {
2341                                    name: language_server.name().to_string(),
2342                                    pending_work: Default::default(),
2343                                    has_pending_diagnostic_updates: false,
2344                                    progress_tokens: Default::default(),
2345                                },
2346                            );
2347
2348                            if let Some(project_id) = this.remote_id() {
2349                                this.client
2350                                    .send(proto::StartLanguageServer {
2351                                        project_id,
2352                                        server: Some(proto::LanguageServer {
2353                                            id: server_id as u64,
2354                                            name: language_server.name().to_string(),
2355                                        }),
2356                                    })
2357                                    .log_err();
2358                            }
2359
2360                            // Tell the language server about every open buffer in the worktree that matches the language.
2361                            for buffer in this.opened_buffers.values() {
2362                                if let Some(buffer_handle) = buffer.upgrade(cx) {
2363                                    let buffer = buffer_handle.read(cx);
2364                                    let file = if let Some(file) = File::from_dyn(buffer.file()) {
2365                                        file
2366                                    } else {
2367                                        continue;
2368                                    };
2369                                    let language = if let Some(language) = buffer.language() {
2370                                        language
2371                                    } else {
2372                                        continue;
2373                                    };
2374                                    if file.worktree.read(cx).id() != key.0
2375                                        || language.lsp_adapter().map(|a| a.name.clone())
2376                                            != Some(key.1.clone())
2377                                    {
2378                                        continue;
2379                                    }
2380
2381                                    let file = file.as_local()?;
2382                                    let versions = this
2383                                        .buffer_snapshots
2384                                        .entry(buffer.remote_id())
2385                                        .or_insert_with(|| vec![(0, buffer.text_snapshot())]);
2386
2387                                    let (version, initial_snapshot) = versions.last().unwrap();
2388                                    let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
2389                                    language_server
2390                                        .notify::<lsp::notification::DidOpenTextDocument>(
2391                                            lsp::DidOpenTextDocumentParams {
2392                                                text_document: lsp::TextDocumentItem::new(
2393                                                    uri,
2394                                                    adapter
2395                                                        .language_ids
2396                                                        .get(language.name().as_ref())
2397                                                        .cloned()
2398                                                        .unwrap_or_default(),
2399                                                    *version,
2400                                                    initial_snapshot.text(),
2401                                                ),
2402                                            },
2403                                        )
2404                                        .log_err()?;
2405                                    buffer_handle.update(cx, |buffer, cx| {
2406                                        buffer.set_completion_triggers(
2407                                            language_server
2408                                                .capabilities()
2409                                                .completion_provider
2410                                                .as_ref()
2411                                                .and_then(|provider| {
2412                                                    provider.trigger_characters.clone()
2413                                                })
2414                                                .unwrap_or_default(),
2415                                            cx,
2416                                        )
2417                                    });
2418                                }
2419                            }
2420
2421                            cx.notify();
2422                            Some(language_server)
2423                        })
2424                    })),
2425                );
2426
2427                server_id
2428            });
2429    }
2430
2431    // Returns a list of all of the worktrees which no longer have a language server and the root path
2432    // for the stopped server
2433    fn stop_language_server(
2434        &mut self,
2435        worktree_id: WorktreeId,
2436        adapter_name: LanguageServerName,
2437        cx: &mut ModelContext<Self>,
2438    ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
2439        let key = (worktree_id, adapter_name);
2440        if let Some(server_id) = self.language_server_ids.remove(&key) {
2441            // Remove other entries for this language server as well
2442            let mut orphaned_worktrees = vec![worktree_id];
2443            let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
2444            for other_key in other_keys {
2445                if self.language_server_ids.get(&other_key) == Some(&server_id) {
2446                    self.language_server_ids.remove(&other_key);
2447                    orphaned_worktrees.push(other_key.0);
2448                }
2449            }
2450
2451            self.language_server_statuses.remove(&server_id);
2452            cx.notify();
2453
2454            let server_state = self.language_servers.remove(&server_id);
2455            cx.spawn_weak(|this, mut cx| async move {
2456                let mut root_path = None;
2457
2458                let server = match server_state {
2459                    Some(LanguageServerState::Starting(started_language_server)) => {
2460                        started_language_server.await
2461                    }
2462                    Some(LanguageServerState::Running { server, .. }) => Some(server),
2463                    None => None,
2464                };
2465
2466                if let Some(server) = server {
2467                    root_path = Some(server.root_path().clone());
2468                    if let Some(shutdown) = server.shutdown() {
2469                        shutdown.await;
2470                    }
2471                }
2472
2473                if let Some(this) = this.upgrade(&cx) {
2474                    this.update(&mut cx, |this, cx| {
2475                        this.language_server_statuses.remove(&server_id);
2476                        cx.notify();
2477                    });
2478                }
2479
2480                (root_path, orphaned_worktrees)
2481            })
2482        } else {
2483            Task::ready((None, Vec::new()))
2484        }
2485    }
2486
2487    pub fn restart_language_servers_for_buffers(
2488        &mut self,
2489        buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
2490        cx: &mut ModelContext<Self>,
2491    ) -> Option<()> {
2492        let language_server_lookup_info: HashSet<(WorktreeId, Arc<Path>, PathBuf)> = buffers
2493            .into_iter()
2494            .filter_map(|buffer| {
2495                let file = File::from_dyn(buffer.read(cx).file())?;
2496                let worktree = file.worktree.read(cx).as_local()?;
2497                let worktree_id = worktree.id();
2498                let worktree_abs_path = worktree.abs_path().clone();
2499                let full_path = file.full_path(cx);
2500                Some((worktree_id, worktree_abs_path, full_path))
2501            })
2502            .collect();
2503        for (worktree_id, worktree_abs_path, full_path) in language_server_lookup_info {
2504            if let Some(language) = self
2505                .languages
2506                .language_for_path(&full_path)
2507                .now_or_never()
2508                .and_then(|language| language.ok())
2509            {
2510                self.restart_language_server(worktree_id, worktree_abs_path, language, cx);
2511            }
2512        }
2513
2514        None
2515    }
2516
2517    fn restart_language_server(
2518        &mut self,
2519        worktree_id: WorktreeId,
2520        fallback_path: Arc<Path>,
2521        language: Arc<Language>,
2522        cx: &mut ModelContext<Self>,
2523    ) {
2524        let adapter = if let Some(adapter) = language.lsp_adapter() {
2525            adapter
2526        } else {
2527            return;
2528        };
2529
2530        let server_name = adapter.name.clone();
2531        let stop = self.stop_language_server(worktree_id, server_name.clone(), cx);
2532        cx.spawn_weak(|this, mut cx| async move {
2533            let (original_root_path, orphaned_worktrees) = stop.await;
2534            if let Some(this) = this.upgrade(&cx) {
2535                this.update(&mut cx, |this, cx| {
2536                    // Attempt to restart using original server path. Fallback to passed in
2537                    // path if we could not retrieve the root path
2538                    let root_path = original_root_path
2539                        .map(|path_buf| Arc::from(path_buf.as_path()))
2540                        .unwrap_or(fallback_path);
2541
2542                    this.start_language_server(worktree_id, root_path, language, cx);
2543
2544                    // Lookup new server id and set it for each of the orphaned worktrees
2545                    if let Some(new_server_id) = this
2546                        .language_server_ids
2547                        .get(&(worktree_id, server_name.clone()))
2548                        .cloned()
2549                    {
2550                        for orphaned_worktree in orphaned_worktrees {
2551                            this.language_server_ids
2552                                .insert((orphaned_worktree, server_name.clone()), new_server_id);
2553                        }
2554                    }
2555                });
2556            }
2557        })
2558        .detach();
2559    }
2560
2561    fn on_lsp_progress(
2562        &mut self,
2563        progress: lsp::ProgressParams,
2564        server_id: usize,
2565        disk_based_diagnostics_progress_token: Option<String>,
2566        cx: &mut ModelContext<Self>,
2567    ) {
2568        let token = match progress.token {
2569            lsp::NumberOrString::String(token) => token,
2570            lsp::NumberOrString::Number(token) => {
2571                log::info!("skipping numeric progress token {}", token);
2572                return;
2573            }
2574        };
2575        let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
2576        let language_server_status =
2577            if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
2578                status
2579            } else {
2580                return;
2581            };
2582
2583        if !language_server_status.progress_tokens.contains(&token) {
2584            return;
2585        }
2586
2587        let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
2588            .as_ref()
2589            .map_or(false, |disk_based_token| {
2590                token.starts_with(disk_based_token)
2591            });
2592
2593        match progress {
2594            lsp::WorkDoneProgress::Begin(report) => {
2595                if is_disk_based_diagnostics_progress {
2596                    language_server_status.has_pending_diagnostic_updates = true;
2597                    self.disk_based_diagnostics_started(server_id, cx);
2598                    self.buffer_ordered_messages_tx
2599                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2600                            language_server_id: server_id,
2601                            message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
2602                        })
2603                        .ok();
2604                } else {
2605                    self.on_lsp_work_start(
2606                        server_id,
2607                        token.clone(),
2608                        LanguageServerProgress {
2609                            message: report.message.clone(),
2610                            percentage: report.percentage.map(|p| p as usize),
2611                            last_update_at: Instant::now(),
2612                        },
2613                        cx,
2614                    );
2615                    self.buffer_ordered_messages_tx
2616                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2617                            language_server_id: server_id,
2618                            message: proto::update_language_server::Variant::WorkStart(
2619                                proto::LspWorkStart {
2620                                    token,
2621                                    message: report.message,
2622                                    percentage: report.percentage.map(|p| p as u32),
2623                                },
2624                            ),
2625                        })
2626                        .ok();
2627                }
2628            }
2629            lsp::WorkDoneProgress::Report(report) => {
2630                if !is_disk_based_diagnostics_progress {
2631                    self.on_lsp_work_progress(
2632                        server_id,
2633                        token.clone(),
2634                        LanguageServerProgress {
2635                            message: report.message.clone(),
2636                            percentage: report.percentage.map(|p| p as usize),
2637                            last_update_at: Instant::now(),
2638                        },
2639                        cx,
2640                    );
2641                    self.buffer_ordered_messages_tx
2642                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2643                            language_server_id: server_id,
2644                            message: proto::update_language_server::Variant::WorkProgress(
2645                                proto::LspWorkProgress {
2646                                    token,
2647                                    message: report.message,
2648                                    percentage: report.percentage.map(|p| p as u32),
2649                                },
2650                            ),
2651                        })
2652                        .ok();
2653                }
2654            }
2655            lsp::WorkDoneProgress::End(_) => {
2656                language_server_status.progress_tokens.remove(&token);
2657
2658                if is_disk_based_diagnostics_progress {
2659                    language_server_status.has_pending_diagnostic_updates = false;
2660                    self.disk_based_diagnostics_finished(server_id, cx);
2661                    self.buffer_ordered_messages_tx
2662                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2663                            language_server_id: server_id,
2664                            message:
2665                                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
2666                                    Default::default(),
2667                                ),
2668                        })
2669                        .ok();
2670                } else {
2671                    self.on_lsp_work_end(server_id, token.clone(), cx);
2672                    self.buffer_ordered_messages_tx
2673                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2674                            language_server_id: server_id,
2675                            message: proto::update_language_server::Variant::WorkEnd(
2676                                proto::LspWorkEnd { token },
2677                            ),
2678                        })
2679                        .ok();
2680                }
2681            }
2682        }
2683    }
2684
2685    fn on_lsp_work_start(
2686        &mut self,
2687        language_server_id: usize,
2688        token: String,
2689        progress: LanguageServerProgress,
2690        cx: &mut ModelContext<Self>,
2691    ) {
2692        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2693            status.pending_work.insert(token, progress);
2694            cx.notify();
2695        }
2696    }
2697
2698    fn on_lsp_work_progress(
2699        &mut self,
2700        language_server_id: usize,
2701        token: String,
2702        progress: LanguageServerProgress,
2703        cx: &mut ModelContext<Self>,
2704    ) {
2705        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2706            let entry = status
2707                .pending_work
2708                .entry(token)
2709                .or_insert(LanguageServerProgress {
2710                    message: Default::default(),
2711                    percentage: Default::default(),
2712                    last_update_at: progress.last_update_at,
2713                });
2714            if progress.message.is_some() {
2715                entry.message = progress.message;
2716            }
2717            if progress.percentage.is_some() {
2718                entry.percentage = progress.percentage;
2719            }
2720            entry.last_update_at = progress.last_update_at;
2721            cx.notify();
2722        }
2723    }
2724
2725    fn on_lsp_work_end(
2726        &mut self,
2727        language_server_id: usize,
2728        token: String,
2729        cx: &mut ModelContext<Self>,
2730    ) {
2731        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2732            status.pending_work.remove(&token);
2733            cx.notify();
2734        }
2735    }
2736
2737    fn on_lsp_did_change_watched_files(
2738        &mut self,
2739        language_server_id: usize,
2740        params: DidChangeWatchedFilesRegistrationOptions,
2741        cx: &mut ModelContext<Self>,
2742    ) {
2743        if let Some(LanguageServerState::Running { watched_paths, .. }) =
2744            self.language_servers.get_mut(&language_server_id)
2745        {
2746            watched_paths.clear();
2747            for watcher in params.watchers {
2748                watched_paths.add_pattern(&watcher.glob_pattern).log_err();
2749            }
2750            cx.notify();
2751        }
2752    }
2753
2754    async fn on_lsp_workspace_edit(
2755        this: WeakModelHandle<Self>,
2756        params: lsp::ApplyWorkspaceEditParams,
2757        server_id: usize,
2758        adapter: Arc<CachedLspAdapter>,
2759        language_server: Arc<LanguageServer>,
2760        mut cx: AsyncAppContext,
2761    ) -> Result<lsp::ApplyWorkspaceEditResponse> {
2762        let this = this
2763            .upgrade(&cx)
2764            .ok_or_else(|| anyhow!("project project closed"))?;
2765        let transaction = Self::deserialize_workspace_edit(
2766            this.clone(),
2767            params.edit,
2768            true,
2769            adapter.clone(),
2770            language_server.clone(),
2771            &mut cx,
2772        )
2773        .await
2774        .log_err();
2775        this.update(&mut cx, |this, _| {
2776            if let Some(transaction) = transaction {
2777                this.last_workspace_edits_by_language_server
2778                    .insert(server_id, transaction);
2779            }
2780        });
2781        Ok(lsp::ApplyWorkspaceEditResponse {
2782            applied: true,
2783            failed_change: None,
2784            failure_reason: None,
2785        })
2786    }
2787
2788    pub fn language_server_statuses(
2789        &self,
2790    ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
2791        self.language_server_statuses.values()
2792    }
2793
2794    pub fn update_diagnostics(
2795        &mut self,
2796        language_server_id: usize,
2797        mut params: lsp::PublishDiagnosticsParams,
2798        disk_based_sources: &[String],
2799        cx: &mut ModelContext<Self>,
2800    ) -> Result<()> {
2801        let abs_path = params
2802            .uri
2803            .to_file_path()
2804            .map_err(|_| anyhow!("URI is not a file"))?;
2805        let mut diagnostics = Vec::default();
2806        let mut primary_diagnostic_group_ids = HashMap::default();
2807        let mut sources_by_group_id = HashMap::default();
2808        let mut supporting_diagnostics = HashMap::default();
2809
2810        // Ensure that primary diagnostics are always the most severe
2811        params.diagnostics.sort_by_key(|item| item.severity);
2812
2813        for diagnostic in &params.diagnostics {
2814            let source = diagnostic.source.as_ref();
2815            let code = diagnostic.code.as_ref().map(|code| match code {
2816                lsp::NumberOrString::Number(code) => code.to_string(),
2817                lsp::NumberOrString::String(code) => code.clone(),
2818            });
2819            let range = range_from_lsp(diagnostic.range);
2820            let is_supporting = diagnostic
2821                .related_information
2822                .as_ref()
2823                .map_or(false, |infos| {
2824                    infos.iter().any(|info| {
2825                        primary_diagnostic_group_ids.contains_key(&(
2826                            source,
2827                            code.clone(),
2828                            range_from_lsp(info.location.range),
2829                        ))
2830                    })
2831                });
2832
2833            let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
2834                tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
2835            });
2836
2837            if is_supporting {
2838                supporting_diagnostics.insert(
2839                    (source, code.clone(), range),
2840                    (diagnostic.severity, is_unnecessary),
2841                );
2842            } else {
2843                let group_id = post_inc(&mut self.next_diagnostic_group_id);
2844                let is_disk_based =
2845                    source.map_or(false, |source| disk_based_sources.contains(source));
2846
2847                sources_by_group_id.insert(group_id, source);
2848                primary_diagnostic_group_ids
2849                    .insert((source, code.clone(), range.clone()), group_id);
2850
2851                diagnostics.push(DiagnosticEntry {
2852                    range,
2853                    diagnostic: Diagnostic {
2854                        code: code.clone(),
2855                        severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
2856                        message: diagnostic.message.clone(),
2857                        group_id,
2858                        is_primary: true,
2859                        is_valid: true,
2860                        is_disk_based,
2861                        is_unnecessary,
2862                    },
2863                });
2864                if let Some(infos) = &diagnostic.related_information {
2865                    for info in infos {
2866                        if info.location.uri == params.uri && !info.message.is_empty() {
2867                            let range = range_from_lsp(info.location.range);
2868                            diagnostics.push(DiagnosticEntry {
2869                                range,
2870                                diagnostic: Diagnostic {
2871                                    code: code.clone(),
2872                                    severity: DiagnosticSeverity::INFORMATION,
2873                                    message: info.message.clone(),
2874                                    group_id,
2875                                    is_primary: false,
2876                                    is_valid: true,
2877                                    is_disk_based,
2878                                    is_unnecessary: false,
2879                                },
2880                            });
2881                        }
2882                    }
2883                }
2884            }
2885        }
2886
2887        for entry in &mut diagnostics {
2888            let diagnostic = &mut entry.diagnostic;
2889            if !diagnostic.is_primary {
2890                let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
2891                if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
2892                    source,
2893                    diagnostic.code.clone(),
2894                    entry.range.clone(),
2895                )) {
2896                    if let Some(severity) = severity {
2897                        diagnostic.severity = severity;
2898                    }
2899                    diagnostic.is_unnecessary = is_unnecessary;
2900                }
2901            }
2902        }
2903
2904        self.update_diagnostic_entries(
2905            language_server_id,
2906            abs_path,
2907            params.version,
2908            diagnostics,
2909            cx,
2910        )?;
2911        Ok(())
2912    }
2913
2914    pub fn update_diagnostic_entries(
2915        &mut self,
2916        language_server_id: usize,
2917        abs_path: PathBuf,
2918        version: Option<i32>,
2919        diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2920        cx: &mut ModelContext<Project>,
2921    ) -> Result<(), anyhow::Error> {
2922        let (worktree, relative_path) = self
2923            .find_local_worktree(&abs_path, cx)
2924            .ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
2925
2926        let project_path = ProjectPath {
2927            worktree_id: worktree.read(cx).id(),
2928            path: relative_path.into(),
2929        };
2930
2931        if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
2932            self.update_buffer_diagnostics(&buffer, diagnostics.clone(), version, cx)?;
2933        }
2934
2935        let updated = worktree.update(cx, |worktree, cx| {
2936            worktree
2937                .as_local_mut()
2938                .ok_or_else(|| anyhow!("not a local worktree"))?
2939                .update_diagnostics(
2940                    language_server_id,
2941                    project_path.path.clone(),
2942                    diagnostics,
2943                    cx,
2944                )
2945        })?;
2946        if updated {
2947            cx.emit(Event::DiagnosticsUpdated {
2948                language_server_id,
2949                path: project_path,
2950            });
2951        }
2952        Ok(())
2953    }
2954
2955    fn update_buffer_diagnostics(
2956        &mut self,
2957        buffer: &ModelHandle<Buffer>,
2958        mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
2959        version: Option<i32>,
2960        cx: &mut ModelContext<Self>,
2961    ) -> Result<()> {
2962        fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
2963            Ordering::Equal
2964                .then_with(|| b.is_primary.cmp(&a.is_primary))
2965                .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
2966                .then_with(|| a.severity.cmp(&b.severity))
2967                .then_with(|| a.message.cmp(&b.message))
2968        }
2969
2970        let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx)?;
2971
2972        diagnostics.sort_unstable_by(|a, b| {
2973            Ordering::Equal
2974                .then_with(|| a.range.start.cmp(&b.range.start))
2975                .then_with(|| b.range.end.cmp(&a.range.end))
2976                .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
2977        });
2978
2979        let mut sanitized_diagnostics = Vec::new();
2980        let edits_since_save = Patch::new(
2981            snapshot
2982                .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
2983                .collect(),
2984        );
2985        for entry in diagnostics {
2986            let start;
2987            let end;
2988            if entry.diagnostic.is_disk_based {
2989                // Some diagnostics are based on files on disk instead of buffers'
2990                // current contents. Adjust these diagnostics' ranges to reflect
2991                // any unsaved edits.
2992                start = edits_since_save.old_to_new(entry.range.start);
2993                end = edits_since_save.old_to_new(entry.range.end);
2994            } else {
2995                start = entry.range.start;
2996                end = entry.range.end;
2997            }
2998
2999            let mut range = snapshot.clip_point_utf16(start, Bias::Left)
3000                ..snapshot.clip_point_utf16(end, Bias::Right);
3001
3002            // Expand empty ranges by one codepoint
3003            if range.start == range.end {
3004                // This will be go to the next boundary when being clipped
3005                range.end.column += 1;
3006                range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
3007                if range.start == range.end && range.end.column > 0 {
3008                    range.start.column -= 1;
3009                    range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
3010                }
3011            }
3012
3013            sanitized_diagnostics.push(DiagnosticEntry {
3014                range,
3015                diagnostic: entry.diagnostic,
3016            });
3017        }
3018        drop(edits_since_save);
3019
3020        let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
3021        buffer.update(cx, |buffer, cx| buffer.update_diagnostics(set, cx));
3022        Ok(())
3023    }
3024
3025    pub fn reload_buffers(
3026        &self,
3027        buffers: HashSet<ModelHandle<Buffer>>,
3028        push_to_history: bool,
3029        cx: &mut ModelContext<Self>,
3030    ) -> Task<Result<ProjectTransaction>> {
3031        let mut local_buffers = Vec::new();
3032        let mut remote_buffers = None;
3033        for buffer_handle in buffers {
3034            let buffer = buffer_handle.read(cx);
3035            if buffer.is_dirty() {
3036                if let Some(file) = File::from_dyn(buffer.file()) {
3037                    if file.is_local() {
3038                        local_buffers.push(buffer_handle);
3039                    } else {
3040                        remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
3041                    }
3042                }
3043            }
3044        }
3045
3046        let remote_buffers = self.remote_id().zip(remote_buffers);
3047        let client = self.client.clone();
3048
3049        cx.spawn(|this, mut cx| async move {
3050            let mut project_transaction = ProjectTransaction::default();
3051
3052            if let Some((project_id, remote_buffers)) = remote_buffers {
3053                let response = client
3054                    .request(proto::ReloadBuffers {
3055                        project_id,
3056                        buffer_ids: remote_buffers
3057                            .iter()
3058                            .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3059                            .collect(),
3060                    })
3061                    .await?
3062                    .transaction
3063                    .ok_or_else(|| anyhow!("missing transaction"))?;
3064                project_transaction = this
3065                    .update(&mut cx, |this, cx| {
3066                        this.deserialize_project_transaction(response, push_to_history, cx)
3067                    })
3068                    .await?;
3069            }
3070
3071            for buffer in local_buffers {
3072                let transaction = buffer
3073                    .update(&mut cx, |buffer, cx| buffer.reload(cx))
3074                    .await?;
3075                buffer.update(&mut cx, |buffer, cx| {
3076                    if let Some(transaction) = transaction {
3077                        if !push_to_history {
3078                            buffer.forget_transaction(transaction.id);
3079                        }
3080                        project_transaction.0.insert(cx.handle(), transaction);
3081                    }
3082                });
3083            }
3084
3085            Ok(project_transaction)
3086        })
3087    }
3088
3089    pub fn format(
3090        &self,
3091        buffers: HashSet<ModelHandle<Buffer>>,
3092        push_to_history: bool,
3093        trigger: FormatTrigger,
3094        cx: &mut ModelContext<Project>,
3095    ) -> Task<Result<ProjectTransaction>> {
3096        if self.is_local() {
3097            let mut buffers_with_paths_and_servers = buffers
3098                .into_iter()
3099                .filter_map(|buffer_handle| {
3100                    let buffer = buffer_handle.read(cx);
3101                    let file = File::from_dyn(buffer.file())?;
3102                    let buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
3103                    let server = self
3104                        .language_server_for_buffer(buffer, cx)
3105                        .map(|s| s.1.clone());
3106                    Some((buffer_handle, buffer_abs_path, server))
3107                })
3108                .collect::<Vec<_>>();
3109
3110            cx.spawn(|this, mut cx| async move {
3111                // Do not allow multiple concurrent formatting requests for the
3112                // same buffer.
3113                this.update(&mut cx, |this, _| {
3114                    buffers_with_paths_and_servers
3115                        .retain(|(buffer, _, _)| this.buffers_being_formatted.insert(buffer.id()));
3116                });
3117
3118                let _cleanup = defer({
3119                    let this = this.clone();
3120                    let mut cx = cx.clone();
3121                    let buffers = &buffers_with_paths_and_servers;
3122                    move || {
3123                        this.update(&mut cx, |this, _| {
3124                            for (buffer, _, _) in buffers {
3125                                this.buffers_being_formatted.remove(&buffer.id());
3126                            }
3127                        });
3128                    }
3129                });
3130
3131                let mut project_transaction = ProjectTransaction::default();
3132                for (buffer, buffer_abs_path, language_server) in &buffers_with_paths_and_servers {
3133                    let (
3134                        format_on_save,
3135                        remove_trailing_whitespace,
3136                        ensure_final_newline,
3137                        formatter,
3138                        tab_size,
3139                    ) = buffer.read_with(&cx, |buffer, cx| {
3140                        let settings = cx.global::<Settings>();
3141                        let language_name = buffer.language().map(|language| language.name());
3142                        (
3143                            settings.format_on_save(language_name.as_deref()),
3144                            settings.remove_trailing_whitespace_on_save(language_name.as_deref()),
3145                            settings.ensure_final_newline_on_save(language_name.as_deref()),
3146                            settings.formatter(language_name.as_deref()),
3147                            settings.tab_size(language_name.as_deref()),
3148                        )
3149                    });
3150
3151                    // First, format buffer's whitespace according to the settings.
3152                    let trailing_whitespace_diff = if remove_trailing_whitespace {
3153                        Some(
3154                            buffer
3155                                .read_with(&cx, |b, cx| b.remove_trailing_whitespace(cx))
3156                                .await,
3157                        )
3158                    } else {
3159                        None
3160                    };
3161                    let whitespace_transaction_id = buffer.update(&mut cx, |buffer, cx| {
3162                        buffer.finalize_last_transaction();
3163                        buffer.start_transaction();
3164                        if let Some(diff) = trailing_whitespace_diff {
3165                            buffer.apply_diff(diff, cx);
3166                        }
3167                        if ensure_final_newline {
3168                            buffer.ensure_final_newline(cx);
3169                        }
3170                        buffer.end_transaction(cx)
3171                    });
3172
3173                    // Currently, formatting operations are represented differently depending on
3174                    // whether they come from a language server or an external command.
3175                    enum FormatOperation {
3176                        Lsp(Vec<(Range<Anchor>, String)>),
3177                        External(Diff),
3178                    }
3179
3180                    // Apply language-specific formatting using either a language server
3181                    // or external command.
3182                    let mut format_operation = None;
3183                    match (formatter, format_on_save) {
3184                        (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => {}
3185
3186                        (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
3187                        | (_, FormatOnSave::LanguageServer) => {
3188                            if let Some((language_server, buffer_abs_path)) =
3189                                language_server.as_ref().zip(buffer_abs_path.as_ref())
3190                            {
3191                                format_operation = Some(FormatOperation::Lsp(
3192                                    Self::format_via_lsp(
3193                                        &this,
3194                                        &buffer,
3195                                        buffer_abs_path,
3196                                        &language_server,
3197                                        tab_size,
3198                                        &mut cx,
3199                                    )
3200                                    .await
3201                                    .context("failed to format via language server")?,
3202                                ));
3203                            }
3204                        }
3205
3206                        (
3207                            Formatter::External { command, arguments },
3208                            FormatOnSave::On | FormatOnSave::Off,
3209                        )
3210                        | (_, FormatOnSave::External { command, arguments }) => {
3211                            if let Some(buffer_abs_path) = buffer_abs_path {
3212                                format_operation = Self::format_via_external_command(
3213                                    &buffer,
3214                                    &buffer_abs_path,
3215                                    &command,
3216                                    &arguments,
3217                                    &mut cx,
3218                                )
3219                                .await
3220                                .context(format!(
3221                                    "failed to format via external command {:?}",
3222                                    command
3223                                ))?
3224                                .map(FormatOperation::External);
3225                            }
3226                        }
3227                    };
3228
3229                    buffer.update(&mut cx, |b, cx| {
3230                        // If the buffer had its whitespace formatted and was edited while the language-specific
3231                        // formatting was being computed, avoid applying the language-specific formatting, because
3232                        // it can't be grouped with the whitespace formatting in the undo history.
3233                        if let Some(transaction_id) = whitespace_transaction_id {
3234                            if b.peek_undo_stack()
3235                                .map_or(true, |e| e.transaction_id() != transaction_id)
3236                            {
3237                                format_operation.take();
3238                            }
3239                        }
3240
3241                        // Apply any language-specific formatting, and group the two formatting operations
3242                        // in the buffer's undo history.
3243                        if let Some(operation) = format_operation {
3244                            match operation {
3245                                FormatOperation::Lsp(edits) => {
3246                                    b.edit(edits, None, cx);
3247                                }
3248                                FormatOperation::External(diff) => {
3249                                    b.apply_diff(diff, cx);
3250                                }
3251                            }
3252
3253                            if let Some(transaction_id) = whitespace_transaction_id {
3254                                b.group_until_transaction(transaction_id);
3255                            }
3256                        }
3257
3258                        if let Some(transaction) = b.finalize_last_transaction().cloned() {
3259                            if !push_to_history {
3260                                b.forget_transaction(transaction.id);
3261                            }
3262                            project_transaction.0.insert(buffer.clone(), transaction);
3263                        }
3264                    });
3265                }
3266
3267                Ok(project_transaction)
3268            })
3269        } else {
3270            let remote_id = self.remote_id();
3271            let client = self.client.clone();
3272            cx.spawn(|this, mut cx| async move {
3273                let mut project_transaction = ProjectTransaction::default();
3274                if let Some(project_id) = remote_id {
3275                    let response = client
3276                        .request(proto::FormatBuffers {
3277                            project_id,
3278                            trigger: trigger as i32,
3279                            buffer_ids: buffers
3280                                .iter()
3281                                .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3282                                .collect(),
3283                        })
3284                        .await?
3285                        .transaction
3286                        .ok_or_else(|| anyhow!("missing transaction"))?;
3287                    project_transaction = this
3288                        .update(&mut cx, |this, cx| {
3289                            this.deserialize_project_transaction(response, push_to_history, cx)
3290                        })
3291                        .await?;
3292                }
3293                Ok(project_transaction)
3294            })
3295        }
3296    }
3297
3298    async fn format_via_lsp(
3299        this: &ModelHandle<Self>,
3300        buffer: &ModelHandle<Buffer>,
3301        abs_path: &Path,
3302        language_server: &Arc<LanguageServer>,
3303        tab_size: NonZeroU32,
3304        cx: &mut AsyncAppContext,
3305    ) -> Result<Vec<(Range<Anchor>, String)>> {
3306        let text_document =
3307            lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(abs_path).unwrap());
3308        let capabilities = &language_server.capabilities();
3309        let lsp_edits = if capabilities
3310            .document_formatting_provider
3311            .as_ref()
3312            .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3313        {
3314            language_server
3315                .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
3316                    text_document,
3317                    options: lsp::FormattingOptions {
3318                        tab_size: tab_size.into(),
3319                        insert_spaces: true,
3320                        insert_final_newline: Some(true),
3321                        ..Default::default()
3322                    },
3323                    work_done_progress_params: Default::default(),
3324                })
3325                .await?
3326        } else if capabilities
3327            .document_range_formatting_provider
3328            .as_ref()
3329            .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3330        {
3331            let buffer_start = lsp::Position::new(0, 0);
3332            let buffer_end =
3333                buffer.read_with(cx, |buffer, _| point_to_lsp(buffer.max_point_utf16()));
3334            language_server
3335                .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
3336                    text_document,
3337                    range: lsp::Range::new(buffer_start, buffer_end),
3338                    options: lsp::FormattingOptions {
3339                        tab_size: tab_size.into(),
3340                        insert_spaces: true,
3341                        insert_final_newline: Some(true),
3342                        ..Default::default()
3343                    },
3344                    work_done_progress_params: Default::default(),
3345                })
3346                .await?
3347        } else {
3348            None
3349        };
3350
3351        if let Some(lsp_edits) = lsp_edits {
3352            this.update(cx, |this, cx| {
3353                this.edits_from_lsp(buffer, lsp_edits, None, cx)
3354            })
3355            .await
3356        } else {
3357            Ok(Default::default())
3358        }
3359    }
3360
3361    async fn format_via_external_command(
3362        buffer: &ModelHandle<Buffer>,
3363        buffer_abs_path: &Path,
3364        command: &str,
3365        arguments: &[String],
3366        cx: &mut AsyncAppContext,
3367    ) -> Result<Option<Diff>> {
3368        let working_dir_path = buffer.read_with(cx, |buffer, cx| {
3369            let file = File::from_dyn(buffer.file())?;
3370            let worktree = file.worktree.read(cx).as_local()?;
3371            let mut worktree_path = worktree.abs_path().to_path_buf();
3372            if worktree.root_entry()?.is_file() {
3373                worktree_path.pop();
3374            }
3375            Some(worktree_path)
3376        });
3377
3378        if let Some(working_dir_path) = working_dir_path {
3379            let mut child =
3380                smol::process::Command::new(command)
3381                    .args(arguments.iter().map(|arg| {
3382                        arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
3383                    }))
3384                    .current_dir(&working_dir_path)
3385                    .stdin(smol::process::Stdio::piped())
3386                    .stdout(smol::process::Stdio::piped())
3387                    .stderr(smol::process::Stdio::piped())
3388                    .spawn()?;
3389            let stdin = child
3390                .stdin
3391                .as_mut()
3392                .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
3393            let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
3394            for chunk in text.chunks() {
3395                stdin.write_all(chunk.as_bytes()).await?;
3396            }
3397            stdin.flush().await?;
3398
3399            let output = child.output().await?;
3400            if !output.status.success() {
3401                return Err(anyhow!(
3402                    "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
3403                    output.status.code(),
3404                    String::from_utf8_lossy(&output.stdout),
3405                    String::from_utf8_lossy(&output.stderr),
3406                ));
3407            }
3408
3409            let stdout = String::from_utf8(output.stdout)?;
3410            Ok(Some(
3411                buffer
3412                    .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
3413                    .await,
3414            ))
3415        } else {
3416            Ok(None)
3417        }
3418    }
3419
3420    pub fn definition<T: ToPointUtf16>(
3421        &self,
3422        buffer: &ModelHandle<Buffer>,
3423        position: T,
3424        cx: &mut ModelContext<Self>,
3425    ) -> Task<Result<Vec<LocationLink>>> {
3426        let position = position.to_point_utf16(buffer.read(cx));
3427        self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
3428    }
3429
3430    pub fn type_definition<T: ToPointUtf16>(
3431        &self,
3432        buffer: &ModelHandle<Buffer>,
3433        position: T,
3434        cx: &mut ModelContext<Self>,
3435    ) -> Task<Result<Vec<LocationLink>>> {
3436        let position = position.to_point_utf16(buffer.read(cx));
3437        self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
3438    }
3439
3440    pub fn references<T: ToPointUtf16>(
3441        &self,
3442        buffer: &ModelHandle<Buffer>,
3443        position: T,
3444        cx: &mut ModelContext<Self>,
3445    ) -> Task<Result<Vec<Location>>> {
3446        let position = position.to_point_utf16(buffer.read(cx));
3447        self.request_lsp(buffer.clone(), GetReferences { position }, cx)
3448    }
3449
3450    pub fn document_highlights<T: ToPointUtf16>(
3451        &self,
3452        buffer: &ModelHandle<Buffer>,
3453        position: T,
3454        cx: &mut ModelContext<Self>,
3455    ) -> Task<Result<Vec<DocumentHighlight>>> {
3456        let position = position.to_point_utf16(buffer.read(cx));
3457        self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
3458    }
3459
3460    pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
3461        if self.is_local() {
3462            let mut requests = Vec::new();
3463            for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
3464                let worktree_id = *worktree_id;
3465                if let Some(worktree) = self
3466                    .worktree_for_id(worktree_id, cx)
3467                    .and_then(|worktree| worktree.read(cx).as_local())
3468                {
3469                    if let Some(LanguageServerState::Running {
3470                        adapter,
3471                        language,
3472                        server,
3473                        ..
3474                    }) = self.language_servers.get(server_id)
3475                    {
3476                        let adapter = adapter.clone();
3477                        let language = language.clone();
3478                        let worktree_abs_path = worktree.abs_path().clone();
3479                        requests.push(
3480                            server
3481                                .request::<lsp::request::WorkspaceSymbol>(
3482                                    lsp::WorkspaceSymbolParams {
3483                                        query: query.to_string(),
3484                                        ..Default::default()
3485                                    },
3486                                )
3487                                .log_err()
3488                                .map(move |response| {
3489                                    (
3490                                        adapter,
3491                                        language,
3492                                        worktree_id,
3493                                        worktree_abs_path,
3494                                        response.unwrap_or_default(),
3495                                    )
3496                                }),
3497                        );
3498                    }
3499                }
3500            }
3501
3502            cx.spawn_weak(|this, cx| async move {
3503                let responses = futures::future::join_all(requests).await;
3504                let this = if let Some(this) = this.upgrade(&cx) {
3505                    this
3506                } else {
3507                    return Ok(Default::default());
3508                };
3509                let symbols = this.read_with(&cx, |this, cx| {
3510                    let mut symbols = Vec::new();
3511                    for (
3512                        adapter,
3513                        adapter_language,
3514                        source_worktree_id,
3515                        worktree_abs_path,
3516                        response,
3517                    ) in responses
3518                    {
3519                        symbols.extend(response.into_iter().flatten().filter_map(|lsp_symbol| {
3520                            let abs_path = lsp_symbol.location.uri.to_file_path().ok()?;
3521                            let mut worktree_id = source_worktree_id;
3522                            let path;
3523                            if let Some((worktree, rel_path)) =
3524                                this.find_local_worktree(&abs_path, cx)
3525                            {
3526                                worktree_id = worktree.read(cx).id();
3527                                path = rel_path;
3528                            } else {
3529                                path = relativize_path(&worktree_abs_path, &abs_path);
3530                            }
3531
3532                            let project_path = ProjectPath {
3533                                worktree_id,
3534                                path: path.into(),
3535                            };
3536                            let signature = this.symbol_signature(&project_path);
3537                            let adapter_language = adapter_language.clone();
3538                            let language = this
3539                                .languages
3540                                .language_for_path(&project_path.path)
3541                                .unwrap_or_else(move |_| adapter_language);
3542                            let language_server_name = adapter.name.clone();
3543                            Some(async move {
3544                                let language = language.await;
3545                                let label = language
3546                                    .label_for_symbol(&lsp_symbol.name, lsp_symbol.kind)
3547                                    .await;
3548
3549                                Symbol {
3550                                    language_server_name,
3551                                    source_worktree_id,
3552                                    path: project_path,
3553                                    label: label.unwrap_or_else(|| {
3554                                        CodeLabel::plain(lsp_symbol.name.clone(), None)
3555                                    }),
3556                                    kind: lsp_symbol.kind,
3557                                    name: lsp_symbol.name,
3558                                    range: range_from_lsp(lsp_symbol.location.range),
3559                                    signature,
3560                                }
3561                            })
3562                        }));
3563                    }
3564                    symbols
3565                });
3566                Ok(futures::future::join_all(symbols).await)
3567            })
3568        } else if let Some(project_id) = self.remote_id() {
3569            let request = self.client.request(proto::GetProjectSymbols {
3570                project_id,
3571                query: query.to_string(),
3572            });
3573            cx.spawn_weak(|this, cx| async move {
3574                let response = request.await?;
3575                let mut symbols = Vec::new();
3576                if let Some(this) = this.upgrade(&cx) {
3577                    let new_symbols = this.read_with(&cx, |this, _| {
3578                        response
3579                            .symbols
3580                            .into_iter()
3581                            .map(|symbol| this.deserialize_symbol(symbol))
3582                            .collect::<Vec<_>>()
3583                    });
3584                    symbols = futures::future::join_all(new_symbols)
3585                        .await
3586                        .into_iter()
3587                        .filter_map(|symbol| symbol.log_err())
3588                        .collect::<Vec<_>>();
3589                }
3590                Ok(symbols)
3591            })
3592        } else {
3593            Task::ready(Ok(Default::default()))
3594        }
3595    }
3596
3597    pub fn open_buffer_for_symbol(
3598        &mut self,
3599        symbol: &Symbol,
3600        cx: &mut ModelContext<Self>,
3601    ) -> Task<Result<ModelHandle<Buffer>>> {
3602        if self.is_local() {
3603            let language_server_id = if let Some(id) = self.language_server_ids.get(&(
3604                symbol.source_worktree_id,
3605                symbol.language_server_name.clone(),
3606            )) {
3607                *id
3608            } else {
3609                return Task::ready(Err(anyhow!(
3610                    "language server for worktree and language not found"
3611                )));
3612            };
3613
3614            let worktree_abs_path = if let Some(worktree_abs_path) = self
3615                .worktree_for_id(symbol.path.worktree_id, cx)
3616                .and_then(|worktree| worktree.read(cx).as_local())
3617                .map(|local_worktree| local_worktree.abs_path())
3618            {
3619                worktree_abs_path
3620            } else {
3621                return Task::ready(Err(anyhow!("worktree not found for symbol")));
3622            };
3623            let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
3624            let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
3625                uri
3626            } else {
3627                return Task::ready(Err(anyhow!("invalid symbol path")));
3628            };
3629
3630            self.open_local_buffer_via_lsp(
3631                symbol_uri,
3632                language_server_id,
3633                symbol.language_server_name.clone(),
3634                cx,
3635            )
3636        } else if let Some(project_id) = self.remote_id() {
3637            let request = self.client.request(proto::OpenBufferForSymbol {
3638                project_id,
3639                symbol: Some(serialize_symbol(symbol)),
3640            });
3641            cx.spawn(|this, mut cx| async move {
3642                let response = request.await?;
3643                this.update(&mut cx, |this, cx| {
3644                    this.wait_for_remote_buffer(response.buffer_id, cx)
3645                })
3646                .await
3647            })
3648        } else {
3649            Task::ready(Err(anyhow!("project does not have a remote id")))
3650        }
3651    }
3652
3653    pub fn hover<T: ToPointUtf16>(
3654        &self,
3655        buffer: &ModelHandle<Buffer>,
3656        position: T,
3657        cx: &mut ModelContext<Self>,
3658    ) -> Task<Result<Option<Hover>>> {
3659        let position = position.to_point_utf16(buffer.read(cx));
3660        self.request_lsp(buffer.clone(), GetHover { position }, cx)
3661    }
3662
3663    pub fn completions<T: ToPointUtf16>(
3664        &self,
3665        buffer: &ModelHandle<Buffer>,
3666        position: T,
3667        cx: &mut ModelContext<Self>,
3668    ) -> Task<Result<Vec<Completion>>> {
3669        let position = position.to_point_utf16(buffer.read(cx));
3670        self.request_lsp(buffer.clone(), GetCompletions { position }, cx)
3671    }
3672
3673    pub fn apply_additional_edits_for_completion(
3674        &self,
3675        buffer_handle: ModelHandle<Buffer>,
3676        completion: Completion,
3677        push_to_history: bool,
3678        cx: &mut ModelContext<Self>,
3679    ) -> Task<Result<Option<Transaction>>> {
3680        let buffer = buffer_handle.read(cx);
3681        let buffer_id = buffer.remote_id();
3682
3683        if self.is_local() {
3684            let lang_server = match self.language_server_for_buffer(buffer, cx) {
3685                Some((_, server)) => server.clone(),
3686                _ => return Task::ready(Ok(Default::default())),
3687            };
3688
3689            cx.spawn(|this, mut cx| async move {
3690                let resolved_completion = lang_server
3691                    .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
3692                    .await?;
3693
3694                if let Some(edits) = resolved_completion.additional_text_edits {
3695                    let edits = this
3696                        .update(&mut cx, |this, cx| {
3697                            this.edits_from_lsp(&buffer_handle, edits, None, cx)
3698                        })
3699                        .await?;
3700
3701                    buffer_handle.update(&mut cx, |buffer, cx| {
3702                        buffer.finalize_last_transaction();
3703                        buffer.start_transaction();
3704
3705                        for (range, text) in edits {
3706                            let primary = &completion.old_range;
3707                            let start_within = primary.start.cmp(&range.start, buffer).is_le()
3708                                && primary.end.cmp(&range.start, buffer).is_ge();
3709                            let end_within = range.start.cmp(&primary.end, buffer).is_le()
3710                                && range.end.cmp(&primary.end, buffer).is_ge();
3711
3712                            //Skip addtional edits which overlap with the primary completion edit
3713                            //https://github.com/zed-industries/zed/pull/1871
3714                            if !start_within && !end_within {
3715                                buffer.edit([(range, text)], None, cx);
3716                            }
3717                        }
3718
3719                        let transaction = if buffer.end_transaction(cx).is_some() {
3720                            let transaction = buffer.finalize_last_transaction().unwrap().clone();
3721                            if !push_to_history {
3722                                buffer.forget_transaction(transaction.id);
3723                            }
3724                            Some(transaction)
3725                        } else {
3726                            None
3727                        };
3728                        Ok(transaction)
3729                    })
3730                } else {
3731                    Ok(None)
3732                }
3733            })
3734        } else if let Some(project_id) = self.remote_id() {
3735            let client = self.client.clone();
3736            cx.spawn(|_, mut cx| async move {
3737                let response = client
3738                    .request(proto::ApplyCompletionAdditionalEdits {
3739                        project_id,
3740                        buffer_id,
3741                        completion: Some(language::proto::serialize_completion(&completion)),
3742                    })
3743                    .await?;
3744
3745                if let Some(transaction) = response.transaction {
3746                    let transaction = language::proto::deserialize_transaction(transaction)?;
3747                    buffer_handle
3748                        .update(&mut cx, |buffer, _| {
3749                            buffer.wait_for_edits(transaction.edit_ids.iter().copied())
3750                        })
3751                        .await?;
3752                    if push_to_history {
3753                        buffer_handle.update(&mut cx, |buffer, _| {
3754                            buffer.push_transaction(transaction.clone(), Instant::now());
3755                        });
3756                    }
3757                    Ok(Some(transaction))
3758                } else {
3759                    Ok(None)
3760                }
3761            })
3762        } else {
3763            Task::ready(Err(anyhow!("project does not have a remote id")))
3764        }
3765    }
3766
3767    pub fn code_actions<T: Clone + ToOffset>(
3768        &self,
3769        buffer_handle: &ModelHandle<Buffer>,
3770        range: Range<T>,
3771        cx: &mut ModelContext<Self>,
3772    ) -> Task<Result<Vec<CodeAction>>> {
3773        let buffer = buffer_handle.read(cx);
3774        let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
3775        self.request_lsp(buffer_handle.clone(), GetCodeActions { range }, cx)
3776    }
3777
3778    pub fn apply_code_action(
3779        &self,
3780        buffer_handle: ModelHandle<Buffer>,
3781        mut action: CodeAction,
3782        push_to_history: bool,
3783        cx: &mut ModelContext<Self>,
3784    ) -> Task<Result<ProjectTransaction>> {
3785        if self.is_local() {
3786            let buffer = buffer_handle.read(cx);
3787            let (lsp_adapter, lang_server) =
3788                if let Some((adapter, server)) = self.language_server_for_buffer(buffer, cx) {
3789                    (adapter.clone(), server.clone())
3790                } else {
3791                    return Task::ready(Ok(Default::default()));
3792                };
3793            let range = action.range.to_point_utf16(buffer);
3794
3795            cx.spawn(|this, mut cx| async move {
3796                if let Some(lsp_range) = action
3797                    .lsp_action
3798                    .data
3799                    .as_mut()
3800                    .and_then(|d| d.get_mut("codeActionParams"))
3801                    .and_then(|d| d.get_mut("range"))
3802                {
3803                    *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
3804                    action.lsp_action = lang_server
3805                        .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
3806                        .await?;
3807                } else {
3808                    let actions = this
3809                        .update(&mut cx, |this, cx| {
3810                            this.code_actions(&buffer_handle, action.range, cx)
3811                        })
3812                        .await?;
3813                    action.lsp_action = actions
3814                        .into_iter()
3815                        .find(|a| a.lsp_action.title == action.lsp_action.title)
3816                        .ok_or_else(|| anyhow!("code action is outdated"))?
3817                        .lsp_action;
3818                }
3819
3820                if let Some(edit) = action.lsp_action.edit {
3821                    if edit.changes.is_some() || edit.document_changes.is_some() {
3822                        return Self::deserialize_workspace_edit(
3823                            this,
3824                            edit,
3825                            push_to_history,
3826                            lsp_adapter.clone(),
3827                            lang_server.clone(),
3828                            &mut cx,
3829                        )
3830                        .await;
3831                    }
3832                }
3833
3834                if let Some(command) = action.lsp_action.command {
3835                    this.update(&mut cx, |this, _| {
3836                        this.last_workspace_edits_by_language_server
3837                            .remove(&lang_server.server_id());
3838                    });
3839                    lang_server
3840                        .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
3841                            command: command.command,
3842                            arguments: command.arguments.unwrap_or_default(),
3843                            ..Default::default()
3844                        })
3845                        .await?;
3846                    return Ok(this.update(&mut cx, |this, _| {
3847                        this.last_workspace_edits_by_language_server
3848                            .remove(&lang_server.server_id())
3849                            .unwrap_or_default()
3850                    }));
3851                }
3852
3853                Ok(ProjectTransaction::default())
3854            })
3855        } else if let Some(project_id) = self.remote_id() {
3856            let client = self.client.clone();
3857            let request = proto::ApplyCodeAction {
3858                project_id,
3859                buffer_id: buffer_handle.read(cx).remote_id(),
3860                action: Some(language::proto::serialize_code_action(&action)),
3861            };
3862            cx.spawn(|this, mut cx| async move {
3863                let response = client
3864                    .request(request)
3865                    .await?
3866                    .transaction
3867                    .ok_or_else(|| anyhow!("missing transaction"))?;
3868                this.update(&mut cx, |this, cx| {
3869                    this.deserialize_project_transaction(response, push_to_history, cx)
3870                })
3871                .await
3872            })
3873        } else {
3874            Task::ready(Err(anyhow!("project does not have a remote id")))
3875        }
3876    }
3877
3878    async fn deserialize_workspace_edit(
3879        this: ModelHandle<Self>,
3880        edit: lsp::WorkspaceEdit,
3881        push_to_history: bool,
3882        lsp_adapter: Arc<CachedLspAdapter>,
3883        language_server: Arc<LanguageServer>,
3884        cx: &mut AsyncAppContext,
3885    ) -> Result<ProjectTransaction> {
3886        let fs = this.read_with(cx, |this, _| this.fs.clone());
3887        let mut operations = Vec::new();
3888        if let Some(document_changes) = edit.document_changes {
3889            match document_changes {
3890                lsp::DocumentChanges::Edits(edits) => {
3891                    operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
3892                }
3893                lsp::DocumentChanges::Operations(ops) => operations = ops,
3894            }
3895        } else if let Some(changes) = edit.changes {
3896            operations.extend(changes.into_iter().map(|(uri, edits)| {
3897                lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
3898                    text_document: lsp::OptionalVersionedTextDocumentIdentifier {
3899                        uri,
3900                        version: None,
3901                    },
3902                    edits: edits.into_iter().map(lsp::OneOf::Left).collect(),
3903                })
3904            }));
3905        }
3906
3907        let mut project_transaction = ProjectTransaction::default();
3908        for operation in operations {
3909            match operation {
3910                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
3911                    let abs_path = op
3912                        .uri
3913                        .to_file_path()
3914                        .map_err(|_| anyhow!("can't convert URI to path"))?;
3915
3916                    if let Some(parent_path) = abs_path.parent() {
3917                        fs.create_dir(parent_path).await?;
3918                    }
3919                    if abs_path.ends_with("/") {
3920                        fs.create_dir(&abs_path).await?;
3921                    } else {
3922                        fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
3923                            .await?;
3924                    }
3925                }
3926                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
3927                    let source_abs_path = op
3928                        .old_uri
3929                        .to_file_path()
3930                        .map_err(|_| anyhow!("can't convert URI to path"))?;
3931                    let target_abs_path = op
3932                        .new_uri
3933                        .to_file_path()
3934                        .map_err(|_| anyhow!("can't convert URI to path"))?;
3935                    fs.rename(
3936                        &source_abs_path,
3937                        &target_abs_path,
3938                        op.options.map(Into::into).unwrap_or_default(),
3939                    )
3940                    .await?;
3941                }
3942                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
3943                    let abs_path = op
3944                        .uri
3945                        .to_file_path()
3946                        .map_err(|_| anyhow!("can't convert URI to path"))?;
3947                    let options = op.options.map(Into::into).unwrap_or_default();
3948                    if abs_path.ends_with("/") {
3949                        fs.remove_dir(&abs_path, options).await?;
3950                    } else {
3951                        fs.remove_file(&abs_path, options).await?;
3952                    }
3953                }
3954                lsp::DocumentChangeOperation::Edit(op) => {
3955                    let buffer_to_edit = this
3956                        .update(cx, |this, cx| {
3957                            this.open_local_buffer_via_lsp(
3958                                op.text_document.uri,
3959                                language_server.server_id(),
3960                                lsp_adapter.name.clone(),
3961                                cx,
3962                            )
3963                        })
3964                        .await?;
3965
3966                    let edits = this
3967                        .update(cx, |this, cx| {
3968                            let edits = op.edits.into_iter().map(|edit| match edit {
3969                                lsp::OneOf::Left(edit) => edit,
3970                                lsp::OneOf::Right(edit) => edit.text_edit,
3971                            });
3972                            this.edits_from_lsp(
3973                                &buffer_to_edit,
3974                                edits,
3975                                op.text_document.version,
3976                                cx,
3977                            )
3978                        })
3979                        .await?;
3980
3981                    let transaction = buffer_to_edit.update(cx, |buffer, cx| {
3982                        buffer.finalize_last_transaction();
3983                        buffer.start_transaction();
3984                        for (range, text) in edits {
3985                            buffer.edit([(range, text)], None, cx);
3986                        }
3987                        let transaction = if buffer.end_transaction(cx).is_some() {
3988                            let transaction = buffer.finalize_last_transaction().unwrap().clone();
3989                            if !push_to_history {
3990                                buffer.forget_transaction(transaction.id);
3991                            }
3992                            Some(transaction)
3993                        } else {
3994                            None
3995                        };
3996
3997                        transaction
3998                    });
3999                    if let Some(transaction) = transaction {
4000                        project_transaction.0.insert(buffer_to_edit, transaction);
4001                    }
4002                }
4003            }
4004        }
4005
4006        Ok(project_transaction)
4007    }
4008
4009    pub fn prepare_rename<T: ToPointUtf16>(
4010        &self,
4011        buffer: ModelHandle<Buffer>,
4012        position: T,
4013        cx: &mut ModelContext<Self>,
4014    ) -> Task<Result<Option<Range<Anchor>>>> {
4015        let position = position.to_point_utf16(buffer.read(cx));
4016        self.request_lsp(buffer, PrepareRename { position }, cx)
4017    }
4018
4019    pub fn perform_rename<T: ToPointUtf16>(
4020        &self,
4021        buffer: ModelHandle<Buffer>,
4022        position: T,
4023        new_name: String,
4024        push_to_history: bool,
4025        cx: &mut ModelContext<Self>,
4026    ) -> Task<Result<ProjectTransaction>> {
4027        let position = position.to_point_utf16(buffer.read(cx));
4028        self.request_lsp(
4029            buffer,
4030            PerformRename {
4031                position,
4032                new_name,
4033                push_to_history,
4034            },
4035            cx,
4036        )
4037    }
4038
    /// Searches the project for `query` and returns, per buffer, the anchor
    /// ranges of every match.
    ///
    /// Local projects run a three-stage pipeline connected by bounded
    /// channels:
    ///
    /// 1. Background workers scan the files of all visible local worktree
    ///    snapshots and send the `(worktree id, path)` of each file for which
    ///    `query.detect` returns `Ok(true)`.
    /// 2. A foreground task first forwards a snapshot of every buffer in
    ///    `opened_buffers` that can still be upgraded, then opens a buffer for
    ///    each candidate path and forwards it too, unless that buffer was
    ///    already forwarded.
    /// 3. Background workers run `query.search` over each forwarded snapshot
    ///    and collect the non-empty results.
    ///
    /// Remote projects send the query to the host and convert the returned
    /// locations into anchor ranges. A project that is neither local nor has a
    /// remote id yields an empty map.
    // The nested return type trips clippy's `type_complexity` lint.
    #[allow(clippy::type_complexity)]
    pub fn search(
        &self,
        query: SearchQuery,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
        if self.is_local() {
            // Only worktrees that are both visible and local contribute
            // snapshots; anything else is filtered out by `as_local()?`.
            let snapshots = self
                .visible_worktrees(cx)
                .filter_map(|tree| {
                    let tree = tree.read(cx).as_local()?;
                    Some(tree.snapshot())
                })
                .collect::<Vec<_>>();

            let background = cx.background().clone();
            let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
            // Nothing to scan; this also keeps `workers` below from being
            // zero, which would otherwise divide by zero when computing
            // `paths_per_worker`.
            if path_count == 0 {
                return Task::ready(Ok(Default::default()));
            }
            let workers = background.num_cpus().min(path_count);
            let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
            // Stage 1: detect candidate files on the background executor.
            cx.background()
                .spawn({
                    let fs = self.fs.clone();
                    let background = cx.background().clone();
                    let query = query.clone();
                    async move {
                        // Shared references so every scoped worker can borrow
                        // the same values instead of cloning them.
                        let fs = &fs;
                        let query = &query;
                        let matching_paths_tx = &matching_paths_tx;
                        // Ceiling division: each worker gets a contiguous slice
                        // of the global file index space.
                        let paths_per_worker = (path_count + workers - 1) / workers;
                        let snapshots = &snapshots;
                        background
                            .scoped(|scope| {
                                for worker_ix in 0..workers {
                                    // Half-open range of global file indices
                                    // owned by this worker.
                                    let worker_start_ix = worker_ix * paths_per_worker;
                                    let worker_end_ix = worker_start_ix + paths_per_worker;
                                    scope.spawn(async move {
                                        // Global index of the first file of the
                                        // snapshot currently being visited.
                                        let mut snapshot_start_ix = 0;
                                        let mut abs_path = PathBuf::new();
                                        for snapshot in snapshots {
                                            let snapshot_end_ix =
                                                snapshot_start_ix + snapshot.visible_file_count();
                                            if worker_end_ix <= snapshot_start_ix {
                                                // This and all later snapshots lie
                                                // past the worker's range.
                                                break;
                                            } else if worker_start_ix > snapshot_end_ix {
                                                // The worker's range starts after
                                                // this snapshot; skip it. When
                                                // `worker_start_ix == snapshot_end_ix`
                                                // the branch below runs with
                                                // `take(0)` and scans nothing.
                                                snapshot_start_ix = snapshot_end_ix;
                                                continue;
                                            } else {
                                                // Translate the worker's global
                                                // range into indices local to
                                                // this snapshot.
                                                let start_in_snapshot = worker_start_ix
                                                    .saturating_sub(snapshot_start_ix);
                                                let end_in_snapshot =
                                                    cmp::min(worker_end_ix, snapshot_end_ix)
                                                        - snapshot_start_ix;

                                                // NOTE(review): the `false` flag
                                                // presumably excludes ignored
                                                // files — confirm against
                                                // `Snapshot::files`.
                                                for entry in snapshot
                                                    .files(false, start_in_snapshot)
                                                    .take(end_in_snapshot - start_in_snapshot)
                                                {
                                                    // Stop early once the
                                                    // receiving side is gone.
                                                    if matching_paths_tx.is_closed() {
                                                        break;
                                                    }

                                                    // Reuse one `PathBuf` across
                                                    // entries.
                                                    abs_path.clear();
                                                    abs_path.push(&snapshot.abs_path());
                                                    abs_path.push(&entry.path);
                                                    // A file that cannot be opened
                                                    // is logged and treated as a
                                                    // non-match; a `detect` error
                                                    // is also a non-match.
                                                    let matches = if let Some(file) =
                                                        fs.open_sync(&abs_path).await.log_err()
                                                    {
                                                        query.detect(file).unwrap_or(false)
                                                    } else {
                                                        false
                                                    };

                                                    if matches {
                                                        let project_path =
                                                            (snapshot.id(), entry.path.clone());
                                                        if matching_paths_tx
                                                            .send(project_path)
                                                            .await
                                                            .is_err()
                                                        {
                                                            break;
                                                        }
                                                    }
                                                }

                                                snapshot_start_ix = snapshot_end_ix;
                                            }
                                        }
                                    });
                                }
                            })
                            .await;
                    }
                })
                .detach();

            // Stage 2: turn candidate paths (and already-open buffers) into
            // buffer snapshots for the search workers.
            let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
            let open_buffers = self
                .opened_buffers
                .values()
                .filter_map(|b| b.upgrade(cx))
                .collect::<HashSet<_>>();
            cx.spawn(|this, cx| async move {
                // Buffers that are already open are always searched, using
                // their current in-memory contents.
                for buffer in &open_buffers {
                    let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                    buffers_tx.send((buffer.clone(), snapshot)).await?;
                }

                // Shared with the per-path tasks below so each buffer is
                // forwarded at most once.
                let open_buffers = Rc::new(RefCell::new(open_buffers));
                while let Some(project_path) = matching_paths_rx.next().await {
                    if buffers_tx.is_closed() {
                        break;
                    }

                    let this = this.clone();
                    let open_buffers = open_buffers.clone();
                    let buffers_tx = buffers_tx.clone();
                    // Each candidate is opened in its own detached task, so a
                    // failure to open one buffer is logged without stopping
                    // the loop.
                    cx.spawn(|mut cx| async move {
                        if let Some(buffer) = this
                            .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
                            .await
                            .log_err()
                        {
                            // `insert` returns false for buffers that were
                            // already forwarded.
                            if open_buffers.borrow_mut().insert(buffer.clone()) {
                                let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                                buffers_tx.send((buffer, snapshot)).await?;
                            }
                        }

                        Ok::<_, anyhow::Error>(())
                    })
                    .detach();
                }

                Ok::<_, anyhow::Error>(())
            })
            .detach_and_log_err(cx);

            // Stage 3: search the forwarded snapshots on the background
            // executor; this is the task handed back to the caller.
            let background = cx.background().clone();
            cx.background().spawn(async move {
                let query = &query;
                // One result map per worker so workers never share mutable
                // state; the maps are merged after all workers finish.
                let mut matched_buffers = Vec::new();
                for _ in 0..workers {
                    matched_buffers.push(HashMap::default());
                }
                background
                    .scoped(|scope| {
                        for worker_matched_buffers in matched_buffers.iter_mut() {
                            let mut buffers_rx = buffers_rx.clone();
                            scope.spawn(async move {
                                // Runs until the channel yields `None`.
                                while let Some((buffer, snapshot)) = buffers_rx.next().await {
                                    // Each match range is stored as anchors:
                                    // `anchor_before` the start and
                                    // `anchor_after` the end.
                                    let buffer_matches = query
                                        .search(snapshot.as_rope())
                                        .await
                                        .iter()
                                        .map(|range| {
                                            snapshot.anchor_before(range.start)
                                                ..snapshot.anchor_after(range.end)
                                        })
                                        .collect::<Vec<_>>();
                                    if !buffer_matches.is_empty() {
                                        worker_matched_buffers
                                            .insert(buffer.clone(), buffer_matches);
                                    }
                                }
                            });
                        }
                    })
                    .await;
                Ok(matched_buffers.into_iter().flatten().collect())
            })
        } else if let Some(project_id) = self.remote_id() {
            let request = self.client.request(query.to_proto(project_id));
            cx.spawn(|this, mut cx| async move {
                let response = request.await?;
                let mut result = HashMap::default();
                for location in response.locations {
                    // Each location refers to a buffer by id; wait for that
                    // buffer to be available locally before using it as a key.
                    let target_buffer = this
                        .update(&mut cx, |this, cx| {
                            this.wait_for_remote_buffer(location.buffer_id, cx)
                        })
                        .await?;
                    let start = location
                        .start
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target start"))?;
                    let end = location
                        .end
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target end"))?;
                    result
                        .entry(target_buffer)
                        .or_insert(Vec::new())
                        .push(start..end)
                }
                Ok(result)
            })
        } else {
            Task::ready(Ok(Default::default()))
        }
    }
4243
4244    fn request_lsp<R: LspCommand>(
4245        &self,
4246        buffer_handle: ModelHandle<Buffer>,
4247        request: R,
4248        cx: &mut ModelContext<Self>,
4249    ) -> Task<Result<R::Response>>
4250    where
4251        <R::LspRequest as lsp::request::Request>::Result: Send,
4252    {
4253        let buffer = buffer_handle.read(cx);
4254        if self.is_local() {
4255            let file = File::from_dyn(buffer.file()).and_then(File::as_local);
4256            if let Some((file, language_server)) = file.zip(
4257                self.language_server_for_buffer(buffer, cx)
4258                    .map(|(_, server)| server.clone()),
4259            ) {
4260                let lsp_params = request.to_lsp(&file.abs_path(cx), buffer, &language_server, cx);
4261                return cx.spawn(|this, cx| async move {
4262                    if !request.check_capabilities(language_server.capabilities()) {
4263                        return Ok(Default::default());
4264                    }
4265
4266                    let response = language_server
4267                        .request::<R::LspRequest>(lsp_params)
4268                        .await
4269                        .context("lsp request failed")?;
4270                    request
4271                        .response_from_lsp(response, this, buffer_handle, cx)
4272                        .await
4273                });
4274            }
4275        } else if let Some(project_id) = self.remote_id() {
4276            let rpc = self.client.clone();
4277            let message = request.to_proto(project_id, buffer);
4278            return cx.spawn_weak(|this, cx| async move {
4279                // Ensure the project is still alive by the time the task
4280                // is scheduled.
4281                this.upgrade(&cx)
4282                    .ok_or_else(|| anyhow!("project dropped"))?;
4283
4284                let response = rpc.request(message).await?;
4285
4286                let this = this
4287                    .upgrade(&cx)
4288                    .ok_or_else(|| anyhow!("project dropped"))?;
4289                if this.read_with(&cx, |this, _| this.is_read_only()) {
4290                    Err(anyhow!("disconnected before completing request"))
4291                } else {
4292                    request
4293                        .response_from_proto(response, this, buffer_handle, cx)
4294                        .await
4295                }
4296            });
4297        }
4298        Task::ready(Ok(Default::default()))
4299    }
4300
4301    pub fn find_or_create_local_worktree(
4302        &mut self,
4303        abs_path: impl AsRef<Path>,
4304        visible: bool,
4305        cx: &mut ModelContext<Self>,
4306    ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
4307        let abs_path = abs_path.as_ref();
4308        if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
4309            Task::ready(Ok((tree, relative_path)))
4310        } else {
4311            let worktree = self.create_local_worktree(abs_path, visible, cx);
4312            cx.foreground()
4313                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
4314        }
4315    }
4316
4317    pub fn find_local_worktree(
4318        &self,
4319        abs_path: &Path,
4320        cx: &AppContext,
4321    ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
4322        for tree in &self.worktrees {
4323            if let Some(tree) = tree.upgrade(cx) {
4324                if let Some(relative_path) = tree
4325                    .read(cx)
4326                    .as_local()
4327                    .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
4328                {
4329                    return Some((tree.clone(), relative_path.into()));
4330                }
4331            }
4332        }
4333        None
4334    }
4335
4336    pub fn is_shared(&self) -> bool {
4337        match &self.client_state {
4338            Some(ProjectClientState::Local { .. }) => true,
4339            _ => false,
4340        }
4341    }
4342
4343    fn create_local_worktree(
4344        &mut self,
4345        abs_path: impl AsRef<Path>,
4346        visible: bool,
4347        cx: &mut ModelContext<Self>,
4348    ) -> Task<Result<ModelHandle<Worktree>>> {
4349        let fs = self.fs.clone();
4350        let client = self.client.clone();
4351        let next_entry_id = self.next_entry_id.clone();
4352        let path: Arc<Path> = abs_path.as_ref().into();
4353        let task = self
4354            .loading_local_worktrees
4355            .entry(path.clone())
4356            .or_insert_with(|| {
4357                cx.spawn(|project, mut cx| {
4358                    async move {
4359                        let worktree = Worktree::local(
4360                            client.clone(),
4361                            path.clone(),
4362                            visible,
4363                            fs,
4364                            next_entry_id,
4365                            &mut cx,
4366                        )
4367                        .await;
4368
4369                        project.update(&mut cx, |project, _| {
4370                            project.loading_local_worktrees.remove(&path);
4371                        });
4372
4373                        let worktree = worktree?;
4374                        project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
4375                        Ok(worktree)
4376                    }
4377                    .map_err(Arc::new)
4378                })
4379                .shared()
4380            })
4381            .clone();
4382        cx.foreground().spawn(async move {
4383            match task.await {
4384                Ok(worktree) => Ok(worktree),
4385                Err(err) => Err(anyhow!("{}", err)),
4386            }
4387        })
4388    }
4389
4390    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
4391        self.worktrees.retain(|worktree| {
4392            if let Some(worktree) = worktree.upgrade(cx) {
4393                let id = worktree.read(cx).id();
4394                if id == id_to_remove {
4395                    cx.emit(Event::WorktreeRemoved(id));
4396                    false
4397                } else {
4398                    true
4399                }
4400            } else {
4401                false
4402            }
4403        });
4404        self.metadata_changed(cx);
4405    }
4406
4407    fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
4408        cx.observe(worktree, |_, _, cx| cx.notify()).detach();
4409        if worktree.read(cx).is_local() {
4410            cx.subscribe(worktree, |this, worktree, event, cx| match event {
4411                worktree::Event::UpdatedEntries(changes) => {
4412                    this.update_local_worktree_buffers(&worktree, cx);
4413                    this.update_local_worktree_language_servers(&worktree, changes, cx);
4414                }
4415                worktree::Event::UpdatedGitRepositories(updated_repos) => {
4416                    this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
4417                }
4418            })
4419            .detach();
4420        }
4421
4422        let push_strong_handle = {
4423            let worktree = worktree.read(cx);
4424            self.is_shared() || worktree.is_visible() || worktree.is_remote()
4425        };
4426        if push_strong_handle {
4427            self.worktrees
4428                .push(WorktreeHandle::Strong(worktree.clone()));
4429        } else {
4430            self.worktrees
4431                .push(WorktreeHandle::Weak(worktree.downgrade()));
4432        }
4433
4434        cx.observe_release(worktree, |this, worktree, cx| {
4435            let _ = this.remove_worktree(worktree.id(), cx);
4436        })
4437        .detach();
4438
4439        cx.emit(Event::WorktreeAdded);
4440        self.metadata_changed(cx);
4441    }
4442
4443    fn update_local_worktree_buffers(
4444        &mut self,
4445        worktree_handle: &ModelHandle<Worktree>,
4446        cx: &mut ModelContext<Self>,
4447    ) {
4448        let snapshot = worktree_handle.read(cx).snapshot();
4449        let mut buffers_to_delete = Vec::new();
4450        let mut renamed_buffers = Vec::new();
4451        for (buffer_id, buffer) in &self.opened_buffers {
4452            if let Some(buffer) = buffer.upgrade(cx) {
4453                buffer.update(cx, |buffer, cx| {
4454                    if let Some(old_file) = File::from_dyn(buffer.file()) {
4455                        if old_file.worktree != *worktree_handle {
4456                            return;
4457                        }
4458
4459                        let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id)
4460                        {
4461                            File {
4462                                is_local: true,
4463                                entry_id: entry.id,
4464                                mtime: entry.mtime,
4465                                path: entry.path.clone(),
4466                                worktree: worktree_handle.clone(),
4467                                is_deleted: false,
4468                            }
4469                        } else if let Some(entry) =
4470                            snapshot.entry_for_path(old_file.path().as_ref())
4471                        {
4472                            File {
4473                                is_local: true,
4474                                entry_id: entry.id,
4475                                mtime: entry.mtime,
4476                                path: entry.path.clone(),
4477                                worktree: worktree_handle.clone(),
4478                                is_deleted: false,
4479                            }
4480                        } else {
4481                            File {
4482                                is_local: true,
4483                                entry_id: old_file.entry_id,
4484                                path: old_file.path().clone(),
4485                                mtime: old_file.mtime(),
4486                                worktree: worktree_handle.clone(),
4487                                is_deleted: true,
4488                            }
4489                        };
4490
4491                        let old_path = old_file.abs_path(cx);
4492                        if new_file.abs_path(cx) != old_path {
4493                            renamed_buffers.push((cx.handle(), old_path));
4494                        }
4495
4496                        if new_file != *old_file {
4497                            if let Some(project_id) = self.remote_id() {
4498                                self.client
4499                                    .send(proto::UpdateBufferFile {
4500                                        project_id,
4501                                        buffer_id: *buffer_id as u64,
4502                                        file: Some(new_file.to_proto()),
4503                                    })
4504                                    .log_err();
4505                            }
4506
4507                            buffer.file_updated(Arc::new(new_file), cx).detach();
4508                        }
4509                    }
4510                });
4511            } else {
4512                buffers_to_delete.push(*buffer_id);
4513            }
4514        }
4515
4516        for buffer_id in buffers_to_delete {
4517            self.opened_buffers.remove(&buffer_id);
4518        }
4519
4520        for (buffer, old_path) in renamed_buffers {
4521            self.unregister_buffer_from_language_server(&buffer, old_path, cx);
4522            self.detect_language_for_buffer(&buffer, cx);
4523            self.register_buffer_with_language_server(&buffer, cx);
4524        }
4525    }
4526
4527    fn update_local_worktree_language_servers(
4528        &mut self,
4529        worktree_handle: &ModelHandle<Worktree>,
4530        changes: &HashMap<Arc<Path>, PathChange>,
4531        cx: &mut ModelContext<Self>,
4532    ) {
4533        let worktree_id = worktree_handle.read(cx).id();
4534        let abs_path = worktree_handle.read(cx).abs_path();
4535        for ((server_worktree_id, _), server_id) in &self.language_server_ids {
4536            if *server_worktree_id == worktree_id {
4537                if let Some(server) = self.language_servers.get(server_id) {
4538                    if let LanguageServerState::Running {
4539                        server,
4540                        watched_paths,
4541                        ..
4542                    } = server
4543                    {
4544                        let params = lsp::DidChangeWatchedFilesParams {
4545                            changes: changes
4546                                .iter()
4547                                .filter_map(|(path, change)| {
4548                                    let path = abs_path.join(path);
4549                                    if watched_paths.matches(&path) {
4550                                        Some(lsp::FileEvent {
4551                                            uri: lsp::Url::from_file_path(path).unwrap(),
4552                                            typ: match change {
4553                                                PathChange::Added => lsp::FileChangeType::CREATED,
4554                                                PathChange::Removed => lsp::FileChangeType::DELETED,
4555                                                PathChange::Updated
4556                                                | PathChange::AddedOrUpdated => {
4557                                                    lsp::FileChangeType::CHANGED
4558                                                }
4559                                            },
4560                                        })
4561                                    } else {
4562                                        None
4563                                    }
4564                                })
4565                                .collect(),
4566                        };
4567
4568                        if !params.changes.is_empty() {
4569                            server
4570                                .notify::<lsp::notification::DidChangeWatchedFiles>(params)
4571                                .log_err();
4572                        }
4573                    }
4574                }
4575            }
4576        }
4577    }
4578
4579    fn update_local_worktree_buffers_git_repos(
4580        &mut self,
4581        worktree: ModelHandle<Worktree>,
4582        repos: &[GitRepositoryEntry],
4583        cx: &mut ModelContext<Self>,
4584    ) {
4585        for (_, buffer) in &self.opened_buffers {
4586            if let Some(buffer) = buffer.upgrade(cx) {
4587                let file = match File::from_dyn(buffer.read(cx).file()) {
4588                    Some(file) => file,
4589                    None => continue,
4590                };
4591                if file.worktree != worktree {
4592                    continue;
4593                }
4594
4595                let path = file.path().clone();
4596
4597                let repo = match repos.iter().find(|repo| repo.manages(&path)) {
4598                    Some(repo) => repo.clone(),
4599                    None => return,
4600                };
4601
4602                let relative_repo = match path.strip_prefix(repo.content_path) {
4603                    Ok(relative_repo) => relative_repo.to_owned(),
4604                    Err(_) => return,
4605                };
4606
4607                let remote_id = self.remote_id();
4608                let client = self.client.clone();
4609
4610                cx.spawn(|_, mut cx| async move {
4611                    let diff_base = cx
4612                        .background()
4613                        .spawn(async move { repo.repo.lock().load_index_text(&relative_repo) })
4614                        .await;
4615
4616                    let buffer_id = buffer.update(&mut cx, |buffer, cx| {
4617                        buffer.set_diff_base(diff_base.clone(), cx);
4618                        buffer.remote_id()
4619                    });
4620
4621                    if let Some(project_id) = remote_id {
4622                        client
4623                            .send(proto::UpdateDiffBase {
4624                                project_id,
4625                                buffer_id: buffer_id as u64,
4626                                diff_base,
4627                            })
4628                            .log_err();
4629                    }
4630                })
4631                .detach();
4632            }
4633        }
4634    }
4635
4636    pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
4637        let new_active_entry = entry.and_then(|project_path| {
4638            let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
4639            let entry = worktree.read(cx).entry_for_path(project_path.path)?;
4640            Some(entry.id)
4641        });
4642        if new_active_entry != self.active_entry {
4643            self.active_entry = new_active_entry;
4644            cx.emit(Event::ActiveEntryChanged(new_active_entry));
4645        }
4646    }
4647
4648    pub fn language_servers_running_disk_based_diagnostics(
4649        &self,
4650    ) -> impl Iterator<Item = usize> + '_ {
4651        self.language_server_statuses
4652            .iter()
4653            .filter_map(|(id, status)| {
4654                if status.has_pending_diagnostic_updates {
4655                    Some(*id)
4656                } else {
4657                    None
4658                }
4659            })
4660    }
4661
4662    pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
4663        let mut summary = DiagnosticSummary::default();
4664        for (_, path_summary) in self.diagnostic_summaries(cx) {
4665            summary.error_count += path_summary.error_count;
4666            summary.warning_count += path_summary.warning_count;
4667        }
4668        summary
4669    }
4670
4671    pub fn diagnostic_summaries<'a>(
4672        &'a self,
4673        cx: &'a AppContext,
4674    ) -> impl Iterator<Item = (ProjectPath, DiagnosticSummary)> + 'a {
4675        self.visible_worktrees(cx).flat_map(move |worktree| {
4676            let worktree = worktree.read(cx);
4677            let worktree_id = worktree.id();
4678            worktree
4679                .diagnostic_summaries()
4680                .map(move |(path, summary)| (ProjectPath { worktree_id, path }, summary))
4681        })
4682    }
4683
4684    pub fn disk_based_diagnostics_started(
4685        &mut self,
4686        language_server_id: usize,
4687        cx: &mut ModelContext<Self>,
4688    ) {
4689        cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
4690    }
4691
4692    pub fn disk_based_diagnostics_finished(
4693        &mut self,
4694        language_server_id: usize,
4695        cx: &mut ModelContext<Self>,
4696    ) {
4697        cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
4698    }
4699
4700    pub fn active_entry(&self) -> Option<ProjectEntryId> {
4701        self.active_entry
4702    }
4703
4704    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
4705        self.worktree_for_id(path.worktree_id, cx)?
4706            .read(cx)
4707            .entry_for_path(&path.path)
4708            .cloned()
4709    }
4710
4711    pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
4712        let worktree = self.worktree_for_entry(entry_id, cx)?;
4713        let worktree = worktree.read(cx);
4714        let worktree_id = worktree.id();
4715        let path = worktree.entry_for_id(entry_id)?.path.clone();
4716        Some(ProjectPath { worktree_id, path })
4717    }
4718
4719    // RPC message handlers
4720
4721    async fn handle_unshare_project(
4722        this: ModelHandle<Self>,
4723        _: TypedEnvelope<proto::UnshareProject>,
4724        _: Arc<Client>,
4725        mut cx: AsyncAppContext,
4726    ) -> Result<()> {
4727        this.update(&mut cx, |this, cx| {
4728            if this.is_local() {
4729                this.unshare(cx)?;
4730            } else {
4731                this.disconnected_from_host(cx);
4732            }
4733            Ok(())
4734        })
4735    }
4736
4737    async fn handle_add_collaborator(
4738        this: ModelHandle<Self>,
4739        mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
4740        _: Arc<Client>,
4741        mut cx: AsyncAppContext,
4742    ) -> Result<()> {
4743        let collaborator = envelope
4744            .payload
4745            .collaborator
4746            .take()
4747            .ok_or_else(|| anyhow!("empty collaborator"))?;
4748
4749        let collaborator = Collaborator::from_proto(collaborator)?;
4750        this.update(&mut cx, |this, cx| {
4751            this.shared_buffers.remove(&collaborator.peer_id);
4752            this.collaborators
4753                .insert(collaborator.peer_id, collaborator);
4754            cx.notify();
4755        });
4756
4757        Ok(())
4758    }
4759
4760    async fn handle_update_project_collaborator(
4761        this: ModelHandle<Self>,
4762        envelope: TypedEnvelope<proto::UpdateProjectCollaborator>,
4763        _: Arc<Client>,
4764        mut cx: AsyncAppContext,
4765    ) -> Result<()> {
4766        let old_peer_id = envelope
4767            .payload
4768            .old_peer_id
4769            .ok_or_else(|| anyhow!("missing old peer id"))?;
4770        let new_peer_id = envelope
4771            .payload
4772            .new_peer_id
4773            .ok_or_else(|| anyhow!("missing new peer id"))?;
4774        this.update(&mut cx, |this, cx| {
4775            let collaborator = this
4776                .collaborators
4777                .remove(&old_peer_id)
4778                .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?;
4779            let is_host = collaborator.replica_id == 0;
4780            this.collaborators.insert(new_peer_id, collaborator);
4781
4782            let buffers = this.shared_buffers.remove(&old_peer_id);
4783            log::info!(
4784                "peer {} became {}. moving buffers {:?}",
4785                old_peer_id,
4786                new_peer_id,
4787                &buffers
4788            );
4789            if let Some(buffers) = buffers {
4790                this.shared_buffers.insert(new_peer_id, buffers);
4791            }
4792
4793            if is_host {
4794                this.opened_buffers
4795                    .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
4796                this.buffer_ordered_messages_tx
4797                    .unbounded_send(BufferOrderedMessage::Resync)
4798                    .unwrap();
4799            }
4800
4801            cx.emit(Event::CollaboratorUpdated {
4802                old_peer_id,
4803                new_peer_id,
4804            });
4805            cx.notify();
4806            Ok(())
4807        })
4808    }
4809
4810    async fn handle_remove_collaborator(
4811        this: ModelHandle<Self>,
4812        envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
4813        _: Arc<Client>,
4814        mut cx: AsyncAppContext,
4815    ) -> Result<()> {
4816        this.update(&mut cx, |this, cx| {
4817            let peer_id = envelope
4818                .payload
4819                .peer_id
4820                .ok_or_else(|| anyhow!("invalid peer id"))?;
4821            let replica_id = this
4822                .collaborators
4823                .remove(&peer_id)
4824                .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
4825                .replica_id;
4826            for buffer in this.opened_buffers.values() {
4827                if let Some(buffer) = buffer.upgrade(cx) {
4828                    buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
4829                }
4830            }
4831            this.shared_buffers.remove(&peer_id);
4832
4833            cx.emit(Event::CollaboratorLeft(peer_id));
4834            cx.notify();
4835            Ok(())
4836        })
4837    }
4838
4839    async fn handle_update_project(
4840        this: ModelHandle<Self>,
4841        envelope: TypedEnvelope<proto::UpdateProject>,
4842        _: Arc<Client>,
4843        mut cx: AsyncAppContext,
4844    ) -> Result<()> {
4845        this.update(&mut cx, |this, cx| {
4846            // Don't handle messages that were sent before the response to us joining the project
4847            if envelope.message_id > this.join_project_response_message_id {
4848                this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
4849            }
4850            Ok(())
4851        })
4852    }
4853
4854    async fn handle_update_worktree(
4855        this: ModelHandle<Self>,
4856        envelope: TypedEnvelope<proto::UpdateWorktree>,
4857        _: Arc<Client>,
4858        mut cx: AsyncAppContext,
4859    ) -> Result<()> {
4860        this.update(&mut cx, |this, cx| {
4861            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4862            if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4863                worktree.update(cx, |worktree, _| {
4864                    let worktree = worktree.as_remote_mut().unwrap();
4865                    worktree.update_from_remote(envelope.payload);
4866                });
4867            }
4868            Ok(())
4869        })
4870    }
4871
4872    async fn handle_create_project_entry(
4873        this: ModelHandle<Self>,
4874        envelope: TypedEnvelope<proto::CreateProjectEntry>,
4875        _: Arc<Client>,
4876        mut cx: AsyncAppContext,
4877    ) -> Result<proto::ProjectEntryResponse> {
4878        let worktree = this.update(&mut cx, |this, cx| {
4879            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4880            this.worktree_for_id(worktree_id, cx)
4881                .ok_or_else(|| anyhow!("worktree not found"))
4882        })?;
4883        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4884        let entry = worktree
4885            .update(&mut cx, |worktree, cx| {
4886                let worktree = worktree.as_local_mut().unwrap();
4887                let path = PathBuf::from(envelope.payload.path);
4888                worktree.create_entry(path, envelope.payload.is_directory, cx)
4889            })
4890            .await?;
4891        Ok(proto::ProjectEntryResponse {
4892            entry: Some((&entry).into()),
4893            worktree_scan_id: worktree_scan_id as u64,
4894        })
4895    }
4896
4897    async fn handle_rename_project_entry(
4898        this: ModelHandle<Self>,
4899        envelope: TypedEnvelope<proto::RenameProjectEntry>,
4900        _: Arc<Client>,
4901        mut cx: AsyncAppContext,
4902    ) -> Result<proto::ProjectEntryResponse> {
4903        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4904        let worktree = this.read_with(&cx, |this, cx| {
4905            this.worktree_for_entry(entry_id, cx)
4906                .ok_or_else(|| anyhow!("worktree not found"))
4907        })?;
4908        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4909        let entry = worktree
4910            .update(&mut cx, |worktree, cx| {
4911                let new_path = PathBuf::from(envelope.payload.new_path);
4912                worktree
4913                    .as_local_mut()
4914                    .unwrap()
4915                    .rename_entry(entry_id, new_path, cx)
4916                    .ok_or_else(|| anyhow!("invalid entry"))
4917            })?
4918            .await?;
4919        Ok(proto::ProjectEntryResponse {
4920            entry: Some((&entry).into()),
4921            worktree_scan_id: worktree_scan_id as u64,
4922        })
4923    }
4924
4925    async fn handle_copy_project_entry(
4926        this: ModelHandle<Self>,
4927        envelope: TypedEnvelope<proto::CopyProjectEntry>,
4928        _: Arc<Client>,
4929        mut cx: AsyncAppContext,
4930    ) -> Result<proto::ProjectEntryResponse> {
4931        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4932        let worktree = this.read_with(&cx, |this, cx| {
4933            this.worktree_for_entry(entry_id, cx)
4934                .ok_or_else(|| anyhow!("worktree not found"))
4935        })?;
4936        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4937        let entry = worktree
4938            .update(&mut cx, |worktree, cx| {
4939                let new_path = PathBuf::from(envelope.payload.new_path);
4940                worktree
4941                    .as_local_mut()
4942                    .unwrap()
4943                    .copy_entry(entry_id, new_path, cx)
4944                    .ok_or_else(|| anyhow!("invalid entry"))
4945            })?
4946            .await?;
4947        Ok(proto::ProjectEntryResponse {
4948            entry: Some((&entry).into()),
4949            worktree_scan_id: worktree_scan_id as u64,
4950        })
4951    }
4952
4953    async fn handle_delete_project_entry(
4954        this: ModelHandle<Self>,
4955        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
4956        _: Arc<Client>,
4957        mut cx: AsyncAppContext,
4958    ) -> Result<proto::ProjectEntryResponse> {
4959        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
4960        let worktree = this.read_with(&cx, |this, cx| {
4961            this.worktree_for_entry(entry_id, cx)
4962                .ok_or_else(|| anyhow!("worktree not found"))
4963        })?;
4964        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
4965        worktree
4966            .update(&mut cx, |worktree, cx| {
4967                worktree
4968                    .as_local_mut()
4969                    .unwrap()
4970                    .delete_entry(entry_id, cx)
4971                    .ok_or_else(|| anyhow!("invalid entry"))
4972            })?
4973            .await?;
4974        Ok(proto::ProjectEntryResponse {
4975            entry: None,
4976            worktree_scan_id: worktree_scan_id as u64,
4977        })
4978    }
4979
4980    async fn handle_update_diagnostic_summary(
4981        this: ModelHandle<Self>,
4982        envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
4983        _: Arc<Client>,
4984        mut cx: AsyncAppContext,
4985    ) -> Result<()> {
4986        this.update(&mut cx, |this, cx| {
4987            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
4988            if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
4989                if let Some(summary) = envelope.payload.summary {
4990                    let project_path = ProjectPath {
4991                        worktree_id,
4992                        path: Path::new(&summary.path).into(),
4993                    };
4994                    worktree.update(cx, |worktree, _| {
4995                        worktree
4996                            .as_remote_mut()
4997                            .unwrap()
4998                            .update_diagnostic_summary(project_path.path.clone(), &summary);
4999                    });
5000                    cx.emit(Event::DiagnosticsUpdated {
5001                        language_server_id: summary.language_server_id as usize,
5002                        path: project_path,
5003                    });
5004                }
5005            }
5006            Ok(())
5007        })
5008    }
5009
5010    async fn handle_start_language_server(
5011        this: ModelHandle<Self>,
5012        envelope: TypedEnvelope<proto::StartLanguageServer>,
5013        _: Arc<Client>,
5014        mut cx: AsyncAppContext,
5015    ) -> Result<()> {
5016        let server = envelope
5017            .payload
5018            .server
5019            .ok_or_else(|| anyhow!("invalid server"))?;
5020        this.update(&mut cx, |this, cx| {
5021            this.language_server_statuses.insert(
5022                server.id as usize,
5023                LanguageServerStatus {
5024                    name: server.name,
5025                    pending_work: Default::default(),
5026                    has_pending_diagnostic_updates: false,
5027                    progress_tokens: Default::default(),
5028                },
5029            );
5030            cx.notify();
5031        });
5032        Ok(())
5033    }
5034
5035    async fn handle_update_language_server(
5036        this: ModelHandle<Self>,
5037        envelope: TypedEnvelope<proto::UpdateLanguageServer>,
5038        _: Arc<Client>,
5039        mut cx: AsyncAppContext,
5040    ) -> Result<()> {
5041        this.update(&mut cx, |this, cx| {
5042            let language_server_id = envelope.payload.language_server_id as usize;
5043
5044            match envelope
5045                .payload
5046                .variant
5047                .ok_or_else(|| anyhow!("invalid variant"))?
5048            {
5049                proto::update_language_server::Variant::WorkStart(payload) => {
5050                    this.on_lsp_work_start(
5051                        language_server_id,
5052                        payload.token,
5053                        LanguageServerProgress {
5054                            message: payload.message,
5055                            percentage: payload.percentage.map(|p| p as usize),
5056                            last_update_at: Instant::now(),
5057                        },
5058                        cx,
5059                    );
5060                }
5061
5062                proto::update_language_server::Variant::WorkProgress(payload) => {
5063                    this.on_lsp_work_progress(
5064                        language_server_id,
5065                        payload.token,
5066                        LanguageServerProgress {
5067                            message: payload.message,
5068                            percentage: payload.percentage.map(|p| p as usize),
5069                            last_update_at: Instant::now(),
5070                        },
5071                        cx,
5072                    );
5073                }
5074
5075                proto::update_language_server::Variant::WorkEnd(payload) => {
5076                    this.on_lsp_work_end(language_server_id, payload.token, cx);
5077                }
5078
5079                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
5080                    this.disk_based_diagnostics_started(language_server_id, cx);
5081                }
5082
5083                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
5084                    this.disk_based_diagnostics_finished(language_server_id, cx)
5085                }
5086            }
5087
5088            Ok(())
5089        })
5090    }
5091
5092    async fn handle_update_buffer(
5093        this: ModelHandle<Self>,
5094        envelope: TypedEnvelope<proto::UpdateBuffer>,
5095        _: Arc<Client>,
5096        mut cx: AsyncAppContext,
5097    ) -> Result<proto::Ack> {
5098        this.update(&mut cx, |this, cx| {
5099            let payload = envelope.payload.clone();
5100            let buffer_id = payload.buffer_id;
5101            let ops = payload
5102                .operations
5103                .into_iter()
5104                .map(language::proto::deserialize_operation)
5105                .collect::<Result<Vec<_>, _>>()?;
5106            let is_remote = this.is_remote();
5107            match this.opened_buffers.entry(buffer_id) {
5108                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
5109                    OpenBuffer::Strong(buffer) => {
5110                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
5111                    }
5112                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
5113                    OpenBuffer::Weak(_) => {}
5114                },
5115                hash_map::Entry::Vacant(e) => {
5116                    assert!(
5117                        is_remote,
5118                        "received buffer update from {:?}",
5119                        envelope.original_sender_id
5120                    );
5121                    e.insert(OpenBuffer::Operations(ops));
5122                }
5123            }
5124            Ok(proto::Ack {})
5125        })
5126    }
5127
5128    async fn handle_create_buffer_for_peer(
5129        this: ModelHandle<Self>,
5130        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
5131        _: Arc<Client>,
5132        mut cx: AsyncAppContext,
5133    ) -> Result<()> {
5134        this.update(&mut cx, |this, cx| {
5135            match envelope
5136                .payload
5137                .variant
5138                .ok_or_else(|| anyhow!("missing variant"))?
5139            {
5140                proto::create_buffer_for_peer::Variant::State(mut state) => {
5141                    let mut buffer_file = None;
5142                    if let Some(file) = state.file.take() {
5143                        let worktree_id = WorktreeId::from_proto(file.worktree_id);
5144                        let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
5145                            anyhow!("no worktree found for id {}", file.worktree_id)
5146                        })?;
5147                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
5148                            as Arc<dyn language::File>);
5149                    }
5150
5151                    let buffer_id = state.id;
5152                    let buffer = cx.add_model(|_| {
5153                        Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
5154                    });
5155                    this.incomplete_remote_buffers
5156                        .insert(buffer_id, Some(buffer));
5157                }
5158                proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
5159                    let buffer = this
5160                        .incomplete_remote_buffers
5161                        .get(&chunk.buffer_id)
5162                        .cloned()
5163                        .flatten()
5164                        .ok_or_else(|| {
5165                            anyhow!(
5166                                "received chunk for buffer {} without initial state",
5167                                chunk.buffer_id
5168                            )
5169                        })?;
5170                    let operations = chunk
5171                        .operations
5172                        .into_iter()
5173                        .map(language::proto::deserialize_operation)
5174                        .collect::<Result<Vec<_>>>()?;
5175                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
5176
5177                    if chunk.is_last {
5178                        this.incomplete_remote_buffers.remove(&chunk.buffer_id);
5179                        this.register_buffer(&buffer, cx)?;
5180                    }
5181                }
5182            }
5183
5184            Ok(())
5185        })
5186    }
5187
5188    async fn handle_update_diff_base(
5189        this: ModelHandle<Self>,
5190        envelope: TypedEnvelope<proto::UpdateDiffBase>,
5191        _: Arc<Client>,
5192        mut cx: AsyncAppContext,
5193    ) -> Result<()> {
5194        this.update(&mut cx, |this, cx| {
5195            let buffer_id = envelope.payload.buffer_id;
5196            let diff_base = envelope.payload.diff_base;
5197            if let Some(buffer) = this
5198                .opened_buffers
5199                .get_mut(&buffer_id)
5200                .and_then(|b| b.upgrade(cx))
5201                .or_else(|| {
5202                    this.incomplete_remote_buffers
5203                        .get(&buffer_id)
5204                        .cloned()
5205                        .flatten()
5206                })
5207            {
5208                buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
5209            }
5210            Ok(())
5211        })
5212    }
5213
5214    async fn handle_update_buffer_file(
5215        this: ModelHandle<Self>,
5216        envelope: TypedEnvelope<proto::UpdateBufferFile>,
5217        _: Arc<Client>,
5218        mut cx: AsyncAppContext,
5219    ) -> Result<()> {
5220        let buffer_id = envelope.payload.buffer_id;
5221
5222        this.update(&mut cx, |this, cx| {
5223            let payload = envelope.payload.clone();
5224            if let Some(buffer) = this
5225                .opened_buffers
5226                .get(&buffer_id)
5227                .and_then(|b| b.upgrade(cx))
5228                .or_else(|| {
5229                    this.incomplete_remote_buffers
5230                        .get(&buffer_id)
5231                        .cloned()
5232                        .flatten()
5233                })
5234            {
5235                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
5236                let worktree = this
5237                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
5238                    .ok_or_else(|| anyhow!("no such worktree"))?;
5239                let file = File::from_proto(file, worktree, cx)?;
5240                buffer.update(cx, |buffer, cx| {
5241                    buffer.file_updated(Arc::new(file), cx).detach();
5242                });
5243                this.detect_language_for_buffer(&buffer, cx);
5244            }
5245            Ok(())
5246        })
5247    }
5248
5249    async fn handle_save_buffer(
5250        this: ModelHandle<Self>,
5251        envelope: TypedEnvelope<proto::SaveBuffer>,
5252        _: Arc<Client>,
5253        mut cx: AsyncAppContext,
5254    ) -> Result<proto::BufferSaved> {
5255        let buffer_id = envelope.payload.buffer_id;
5256        let (project_id, buffer) = this.update(&mut cx, |this, cx| {
5257            let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
5258            let buffer = this
5259                .opened_buffers
5260                .get(&buffer_id)
5261                .and_then(|buffer| buffer.upgrade(cx))
5262                .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
5263            anyhow::Ok((project_id, buffer))
5264        })?;
5265        buffer
5266            .update(&mut cx, |buffer, _| {
5267                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
5268            })
5269            .await?;
5270        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
5271
5272        let (saved_version, fingerprint, mtime) = this
5273            .update(&mut cx, |this, cx| this.save_buffer(buffer, cx))
5274            .await?;
5275        Ok(proto::BufferSaved {
5276            project_id,
5277            buffer_id,
5278            version: serialize_version(&saved_version),
5279            mtime: Some(mtime.into()),
5280            fingerprint: language::proto::serialize_fingerprint(fingerprint),
5281        })
5282    }
5283
5284    async fn handle_reload_buffers(
5285        this: ModelHandle<Self>,
5286        envelope: TypedEnvelope<proto::ReloadBuffers>,
5287        _: Arc<Client>,
5288        mut cx: AsyncAppContext,
5289    ) -> Result<proto::ReloadBuffersResponse> {
5290        let sender_id = envelope.original_sender_id()?;
5291        let reload = this.update(&mut cx, |this, cx| {
5292            let mut buffers = HashSet::default();
5293            for buffer_id in &envelope.payload.buffer_ids {
5294                buffers.insert(
5295                    this.opened_buffers
5296                        .get(buffer_id)
5297                        .and_then(|buffer| buffer.upgrade(cx))
5298                        .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5299                );
5300            }
5301            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
5302        })?;
5303
5304        let project_transaction = reload.await?;
5305        let project_transaction = this.update(&mut cx, |this, cx| {
5306            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5307        });
5308        Ok(proto::ReloadBuffersResponse {
5309            transaction: Some(project_transaction),
5310        })
5311    }
5312
5313    async fn handle_synchronize_buffers(
5314        this: ModelHandle<Self>,
5315        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
5316        _: Arc<Client>,
5317        mut cx: AsyncAppContext,
5318    ) -> Result<proto::SynchronizeBuffersResponse> {
5319        let project_id = envelope.payload.project_id;
5320        let mut response = proto::SynchronizeBuffersResponse {
5321            buffers: Default::default(),
5322        };
5323
5324        this.update(&mut cx, |this, cx| {
5325            let Some(guest_id) = envelope.original_sender_id else {
5326                log::error!("missing original_sender_id on SynchronizeBuffers request");
5327                return;
5328            };
5329
5330            this.shared_buffers.entry(guest_id).or_default().clear();
5331            for buffer in envelope.payload.buffers {
5332                let buffer_id = buffer.id;
5333                let remote_version = language::proto::deserialize_version(&buffer.version);
5334                if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
5335                    this.shared_buffers
5336                        .entry(guest_id)
5337                        .or_default()
5338                        .insert(buffer_id);
5339
5340                    let buffer = buffer.read(cx);
5341                    response.buffers.push(proto::BufferVersion {
5342                        id: buffer_id,
5343                        version: language::proto::serialize_version(&buffer.version),
5344                    });
5345
5346                    let operations = buffer.serialize_ops(Some(remote_version), cx);
5347                    let client = this.client.clone();
5348                    if let Some(file) = buffer.file() {
5349                        client
5350                            .send(proto::UpdateBufferFile {
5351                                project_id,
5352                                buffer_id: buffer_id as u64,
5353                                file: Some(file.to_proto()),
5354                            })
5355                            .log_err();
5356                    }
5357
5358                    client
5359                        .send(proto::UpdateDiffBase {
5360                            project_id,
5361                            buffer_id: buffer_id as u64,
5362                            diff_base: buffer.diff_base().map(Into::into),
5363                        })
5364                        .log_err();
5365
5366                    client
5367                        .send(proto::BufferReloaded {
5368                            project_id,
5369                            buffer_id,
5370                            version: language::proto::serialize_version(buffer.saved_version()),
5371                            mtime: Some(buffer.saved_mtime().into()),
5372                            fingerprint: language::proto::serialize_fingerprint(
5373                                buffer.saved_version_fingerprint(),
5374                            ),
5375                            line_ending: language::proto::serialize_line_ending(
5376                                buffer.line_ending(),
5377                            ) as i32,
5378                        })
5379                        .log_err();
5380
5381                    cx.background()
5382                        .spawn(
5383                            async move {
5384                                let operations = operations.await;
5385                                for chunk in split_operations(operations) {
5386                                    client
5387                                        .request(proto::UpdateBuffer {
5388                                            project_id,
5389                                            buffer_id,
5390                                            operations: chunk,
5391                                        })
5392                                        .await?;
5393                                }
5394                                anyhow::Ok(())
5395                            }
5396                            .log_err(),
5397                        )
5398                        .detach();
5399                }
5400            }
5401        });
5402
5403        Ok(response)
5404    }
5405
5406    async fn handle_format_buffers(
5407        this: ModelHandle<Self>,
5408        envelope: TypedEnvelope<proto::FormatBuffers>,
5409        _: Arc<Client>,
5410        mut cx: AsyncAppContext,
5411    ) -> Result<proto::FormatBuffersResponse> {
5412        let sender_id = envelope.original_sender_id()?;
5413        let format = this.update(&mut cx, |this, cx| {
5414            let mut buffers = HashSet::default();
5415            for buffer_id in &envelope.payload.buffer_ids {
5416                buffers.insert(
5417                    this.opened_buffers
5418                        .get(buffer_id)
5419                        .and_then(|buffer| buffer.upgrade(cx))
5420                        .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5421                );
5422            }
5423            let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
5424            Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
5425        })?;
5426
5427        let project_transaction = format.await?;
5428        let project_transaction = this.update(&mut cx, |this, cx| {
5429            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5430        });
5431        Ok(proto::FormatBuffersResponse {
5432            transaction: Some(project_transaction),
5433        })
5434    }
5435
5436    async fn handle_apply_additional_edits_for_completion(
5437        this: ModelHandle<Self>,
5438        envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
5439        _: Arc<Client>,
5440        mut cx: AsyncAppContext,
5441    ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
5442        let (buffer, completion) = this.update(&mut cx, |this, cx| {
5443            let buffer = this
5444                .opened_buffers
5445                .get(&envelope.payload.buffer_id)
5446                .and_then(|buffer| buffer.upgrade(cx))
5447                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5448            let language = buffer.read(cx).language();
5449            let completion = language::proto::deserialize_completion(
5450                envelope
5451                    .payload
5452                    .completion
5453                    .ok_or_else(|| anyhow!("invalid completion"))?,
5454                language.cloned(),
5455            );
5456            Ok::<_, anyhow::Error>((buffer, completion))
5457        })?;
5458
5459        let completion = completion.await?;
5460
5461        let apply_additional_edits = this.update(&mut cx, |this, cx| {
5462            this.apply_additional_edits_for_completion(buffer, completion, false, cx)
5463        });
5464
5465        Ok(proto::ApplyCompletionAdditionalEditsResponse {
5466            transaction: apply_additional_edits
5467                .await?
5468                .as_ref()
5469                .map(language::proto::serialize_transaction),
5470        })
5471    }
5472
5473    async fn handle_apply_code_action(
5474        this: ModelHandle<Self>,
5475        envelope: TypedEnvelope<proto::ApplyCodeAction>,
5476        _: Arc<Client>,
5477        mut cx: AsyncAppContext,
5478    ) -> Result<proto::ApplyCodeActionResponse> {
5479        let sender_id = envelope.original_sender_id()?;
5480        let action = language::proto::deserialize_code_action(
5481            envelope
5482                .payload
5483                .action
5484                .ok_or_else(|| anyhow!("invalid action"))?,
5485        )?;
5486        let apply_code_action = this.update(&mut cx, |this, cx| {
5487            let buffer = this
5488                .opened_buffers
5489                .get(&envelope.payload.buffer_id)
5490                .and_then(|buffer| buffer.upgrade(cx))
5491                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5492            Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
5493        })?;
5494
5495        let project_transaction = apply_code_action.await?;
5496        let project_transaction = this.update(&mut cx, |this, cx| {
5497            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5498        });
5499        Ok(proto::ApplyCodeActionResponse {
5500            transaction: Some(project_transaction),
5501        })
5502    }
5503
5504    async fn handle_lsp_command<T: LspCommand>(
5505        this: ModelHandle<Self>,
5506        envelope: TypedEnvelope<T::ProtoRequest>,
5507        _: Arc<Client>,
5508        mut cx: AsyncAppContext,
5509    ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
5510    where
5511        <T::LspRequest as lsp::request::Request>::Result: Send,
5512    {
5513        let sender_id = envelope.original_sender_id()?;
5514        let buffer_id = T::buffer_id_from_proto(&envelope.payload);
5515        let buffer_handle = this.read_with(&cx, |this, _| {
5516            this.opened_buffers
5517                .get(&buffer_id)
5518                .and_then(|buffer| buffer.upgrade(&cx))
5519                .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
5520        })?;
5521        let request = T::from_proto(
5522            envelope.payload,
5523            this.clone(),
5524            buffer_handle.clone(),
5525            cx.clone(),
5526        )
5527        .await?;
5528        let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
5529        let response = this
5530            .update(&mut cx, |this, cx| {
5531                this.request_lsp(buffer_handle, request, cx)
5532            })
5533            .await?;
5534        this.update(&mut cx, |this, cx| {
5535            Ok(T::response_to_proto(
5536                response,
5537                this,
5538                sender_id,
5539                &buffer_version,
5540                cx,
5541            ))
5542        })
5543    }
5544
5545    async fn handle_get_project_symbols(
5546        this: ModelHandle<Self>,
5547        envelope: TypedEnvelope<proto::GetProjectSymbols>,
5548        _: Arc<Client>,
5549        mut cx: AsyncAppContext,
5550    ) -> Result<proto::GetProjectSymbolsResponse> {
5551        let symbols = this
5552            .update(&mut cx, |this, cx| {
5553                this.symbols(&envelope.payload.query, cx)
5554            })
5555            .await?;
5556
5557        Ok(proto::GetProjectSymbolsResponse {
5558            symbols: symbols.iter().map(serialize_symbol).collect(),
5559        })
5560    }
5561
5562    async fn handle_search_project(
5563        this: ModelHandle<Self>,
5564        envelope: TypedEnvelope<proto::SearchProject>,
5565        _: Arc<Client>,
5566        mut cx: AsyncAppContext,
5567    ) -> Result<proto::SearchProjectResponse> {
5568        let peer_id = envelope.original_sender_id()?;
5569        let query = SearchQuery::from_proto(envelope.payload)?;
5570        let result = this
5571            .update(&mut cx, |this, cx| this.search(query, cx))
5572            .await?;
5573
5574        this.update(&mut cx, |this, cx| {
5575            let mut locations = Vec::new();
5576            for (buffer, ranges) in result {
5577                for range in ranges {
5578                    let start = serialize_anchor(&range.start);
5579                    let end = serialize_anchor(&range.end);
5580                    let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
5581                    locations.push(proto::Location {
5582                        buffer_id,
5583                        start: Some(start),
5584                        end: Some(end),
5585                    });
5586                }
5587            }
5588            Ok(proto::SearchProjectResponse { locations })
5589        })
5590    }
5591
5592    async fn handle_open_buffer_for_symbol(
5593        this: ModelHandle<Self>,
5594        envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
5595        _: Arc<Client>,
5596        mut cx: AsyncAppContext,
5597    ) -> Result<proto::OpenBufferForSymbolResponse> {
5598        let peer_id = envelope.original_sender_id()?;
5599        let symbol = envelope
5600            .payload
5601            .symbol
5602            .ok_or_else(|| anyhow!("invalid symbol"))?;
5603        let symbol = this
5604            .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
5605            .await?;
5606        let symbol = this.read_with(&cx, |this, _| {
5607            let signature = this.symbol_signature(&symbol.path);
5608            if signature == symbol.signature {
5609                Ok(symbol)
5610            } else {
5611                Err(anyhow!("invalid symbol signature"))
5612            }
5613        })?;
5614        let buffer = this
5615            .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
5616            .await?;
5617
5618        Ok(proto::OpenBufferForSymbolResponse {
5619            buffer_id: this.update(&mut cx, |this, cx| {
5620                this.create_buffer_for_peer(&buffer, peer_id, cx)
5621            }),
5622        })
5623    }
5624
5625    fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
5626        let mut hasher = Sha256::new();
5627        hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
5628        hasher.update(project_path.path.to_string_lossy().as_bytes());
5629        hasher.update(self.nonce.to_be_bytes());
5630        hasher.finalize().as_slice().try_into().unwrap()
5631    }
5632
5633    async fn handle_open_buffer_by_id(
5634        this: ModelHandle<Self>,
5635        envelope: TypedEnvelope<proto::OpenBufferById>,
5636        _: Arc<Client>,
5637        mut cx: AsyncAppContext,
5638    ) -> Result<proto::OpenBufferResponse> {
5639        let peer_id = envelope.original_sender_id()?;
5640        let buffer = this
5641            .update(&mut cx, |this, cx| {
5642                this.open_buffer_by_id(envelope.payload.id, cx)
5643            })
5644            .await?;
5645        this.update(&mut cx, |this, cx| {
5646            Ok(proto::OpenBufferResponse {
5647                buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5648            })
5649        })
5650    }
5651
5652    async fn handle_open_buffer_by_path(
5653        this: ModelHandle<Self>,
5654        envelope: TypedEnvelope<proto::OpenBufferByPath>,
5655        _: Arc<Client>,
5656        mut cx: AsyncAppContext,
5657    ) -> Result<proto::OpenBufferResponse> {
5658        let peer_id = envelope.original_sender_id()?;
5659        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5660        let open_buffer = this.update(&mut cx, |this, cx| {
5661            this.open_buffer(
5662                ProjectPath {
5663                    worktree_id,
5664                    path: PathBuf::from(envelope.payload.path).into(),
5665                },
5666                cx,
5667            )
5668        });
5669
5670        let buffer = open_buffer.await?;
5671        this.update(&mut cx, |this, cx| {
5672            Ok(proto::OpenBufferResponse {
5673                buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
5674            })
5675        })
5676    }
5677
5678    fn serialize_project_transaction_for_peer(
5679        &mut self,
5680        project_transaction: ProjectTransaction,
5681        peer_id: proto::PeerId,
5682        cx: &mut AppContext,
5683    ) -> proto::ProjectTransaction {
5684        let mut serialized_transaction = proto::ProjectTransaction {
5685            buffer_ids: Default::default(),
5686            transactions: Default::default(),
5687        };
5688        for (buffer, transaction) in project_transaction.0 {
5689            serialized_transaction
5690                .buffer_ids
5691                .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
5692            serialized_transaction
5693                .transactions
5694                .push(language::proto::serialize_transaction(&transaction));
5695        }
5696        serialized_transaction
5697    }
5698
5699    fn deserialize_project_transaction(
5700        &mut self,
5701        message: proto::ProjectTransaction,
5702        push_to_history: bool,
5703        cx: &mut ModelContext<Self>,
5704    ) -> Task<Result<ProjectTransaction>> {
5705        cx.spawn(|this, mut cx| async move {
5706            let mut project_transaction = ProjectTransaction::default();
5707            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
5708            {
5709                let buffer = this
5710                    .update(&mut cx, |this, cx| {
5711                        this.wait_for_remote_buffer(buffer_id, cx)
5712                    })
5713                    .await?;
5714                let transaction = language::proto::deserialize_transaction(transaction)?;
5715                project_transaction.0.insert(buffer, transaction);
5716            }
5717
5718            for (buffer, transaction) in &project_transaction.0 {
5719                buffer
5720                    .update(&mut cx, |buffer, _| {
5721                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
5722                    })
5723                    .await?;
5724
5725                if push_to_history {
5726                    buffer.update(&mut cx, |buffer, _| {
5727                        buffer.push_transaction(transaction.clone(), Instant::now());
5728                    });
5729                }
5730            }
5731
5732            Ok(project_transaction)
5733        })
5734    }
5735
5736    fn create_buffer_for_peer(
5737        &mut self,
5738        buffer: &ModelHandle<Buffer>,
5739        peer_id: proto::PeerId,
5740        cx: &mut AppContext,
5741    ) -> u64 {
5742        let buffer_id = buffer.read(cx).remote_id();
5743        if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
5744            updates_tx
5745                .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
5746                .ok();
5747        }
5748        buffer_id
5749    }
5750
5751    fn wait_for_remote_buffer(
5752        &mut self,
5753        id: u64,
5754        cx: &mut ModelContext<Self>,
5755    ) -> Task<Result<ModelHandle<Buffer>>> {
5756        let mut opened_buffer_rx = self.opened_buffer.1.clone();
5757
5758        cx.spawn_weak(|this, mut cx| async move {
5759            let buffer = loop {
5760                let Some(this) = this.upgrade(&cx) else {
5761                    return Err(anyhow!("project dropped"));
5762                };
5763                let buffer = this.read_with(&cx, |this, cx| {
5764                    this.opened_buffers
5765                        .get(&id)
5766                        .and_then(|buffer| buffer.upgrade(cx))
5767                });
5768                if let Some(buffer) = buffer {
5769                    break buffer;
5770                } else if this.read_with(&cx, |this, _| this.is_read_only()) {
5771                    return Err(anyhow!("disconnected before buffer {} could be opened", id));
5772                }
5773
5774                this.update(&mut cx, |this, _| {
5775                    this.incomplete_remote_buffers.entry(id).or_default();
5776                });
5777                drop(this);
5778                opened_buffer_rx
5779                    .next()
5780                    .await
5781                    .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
5782            };
5783            buffer.update(&mut cx, |buffer, cx| buffer.git_diff_recalc(cx));
5784            Ok(buffer)
5785        })
5786    }
5787
5788    fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
5789        let project_id = match self.client_state.as_ref() {
5790            Some(ProjectClientState::Remote {
5791                sharing_has_stopped,
5792                remote_id,
5793                ..
5794            }) => {
5795                if *sharing_has_stopped {
5796                    return Task::ready(Err(anyhow!(
5797                        "can't synchronize remote buffers on a readonly project"
5798                    )));
5799                } else {
5800                    *remote_id
5801                }
5802            }
5803            Some(ProjectClientState::Local { .. }) | None => {
5804                return Task::ready(Err(anyhow!(
5805                    "can't synchronize remote buffers on a local project"
5806                )))
5807            }
5808        };
5809
5810        let client = self.client.clone();
5811        cx.spawn(|this, cx| async move {
5812            let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| {
5813                let buffers = this
5814                    .opened_buffers
5815                    .iter()
5816                    .filter_map(|(id, buffer)| {
5817                        let buffer = buffer.upgrade(cx)?;
5818                        Some(proto::BufferVersion {
5819                            id: *id,
5820                            version: language::proto::serialize_version(&buffer.read(cx).version),
5821                        })
5822                    })
5823                    .collect();
5824                let incomplete_buffer_ids = this
5825                    .incomplete_remote_buffers
5826                    .keys()
5827                    .copied()
5828                    .collect::<Vec<_>>();
5829
5830                (buffers, incomplete_buffer_ids)
5831            });
5832            let response = client
5833                .request(proto::SynchronizeBuffers {
5834                    project_id,
5835                    buffers,
5836                })
5837                .await?;
5838
5839            let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| {
5840                let client = client.clone();
5841                let buffer_id = buffer.id;
5842                let remote_version = language::proto::deserialize_version(&buffer.version);
5843                this.read_with(&cx, |this, cx| {
5844                    if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
5845                        let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx);
5846                        cx.background().spawn(async move {
5847                            let operations = operations.await;
5848                            for chunk in split_operations(operations) {
5849                                client
5850                                    .request(proto::UpdateBuffer {
5851                                        project_id,
5852                                        buffer_id,
5853                                        operations: chunk,
5854                                    })
5855                                    .await?;
5856                            }
5857                            anyhow::Ok(())
5858                        })
5859                    } else {
5860                        Task::ready(Ok(()))
5861                    }
5862                })
5863            });
5864
5865            // Any incomplete buffers have open requests waiting. Request that the host sends
5866            // creates these buffers for us again to unblock any waiting futures.
5867            for id in incomplete_buffer_ids {
5868                cx.background()
5869                    .spawn(client.request(proto::OpenBufferById { project_id, id }))
5870                    .detach();
5871            }
5872
5873            futures::future::join_all(send_updates_for_buffers)
5874                .await
5875                .into_iter()
5876                .collect()
5877        })
5878    }
5879
5880    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
5881        self.worktrees(cx)
5882            .map(|worktree| {
5883                let worktree = worktree.read(cx);
5884                proto::WorktreeMetadata {
5885                    id: worktree.id().to_proto(),
5886                    root_name: worktree.root_name().into(),
5887                    visible: worktree.is_visible(),
5888                    abs_path: worktree.abs_path().to_string_lossy().into(),
5889                }
5890            })
5891            .collect()
5892    }
5893
5894    fn set_worktrees_from_proto(
5895        &mut self,
5896        worktrees: Vec<proto::WorktreeMetadata>,
5897        cx: &mut ModelContext<Project>,
5898    ) -> Result<()> {
5899        let replica_id = self.replica_id();
5900        let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
5901
5902        let mut old_worktrees_by_id = self
5903            .worktrees
5904            .drain(..)
5905            .filter_map(|worktree| {
5906                let worktree = worktree.upgrade(cx)?;
5907                Some((worktree.read(cx).id(), worktree))
5908            })
5909            .collect::<HashMap<_, _>>();
5910
5911        for worktree in worktrees {
5912            if let Some(old_worktree) =
5913                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
5914            {
5915                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
5916            } else {
5917                let worktree =
5918                    Worktree::remote(remote_id, replica_id, worktree, self.client.clone(), cx);
5919                let _ = self.add_worktree(&worktree, cx);
5920            }
5921        }
5922
5923        self.metadata_changed(cx);
5924        for (id, _) in old_worktrees_by_id {
5925            cx.emit(Event::WorktreeRemoved(id));
5926        }
5927
5928        Ok(())
5929    }
5930
5931    fn set_collaborators_from_proto(
5932        &mut self,
5933        messages: Vec<proto::Collaborator>,
5934        cx: &mut ModelContext<Self>,
5935    ) -> Result<()> {
5936        let mut collaborators = HashMap::default();
5937        for message in messages {
5938            let collaborator = Collaborator::from_proto(message)?;
5939            collaborators.insert(collaborator.peer_id, collaborator);
5940        }
5941        for old_peer_id in self.collaborators.keys() {
5942            if !collaborators.contains_key(old_peer_id) {
5943                cx.emit(Event::CollaboratorLeft(*old_peer_id));
5944            }
5945        }
5946        self.collaborators = collaborators;
5947        Ok(())
5948    }
5949
5950    fn deserialize_symbol(
5951        &self,
5952        serialized_symbol: proto::Symbol,
5953    ) -> impl Future<Output = Result<Symbol>> {
5954        let languages = self.languages.clone();
5955        async move {
5956            let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
5957            let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
5958            let start = serialized_symbol
5959                .start
5960                .ok_or_else(|| anyhow!("invalid start"))?;
5961            let end = serialized_symbol
5962                .end
5963                .ok_or_else(|| anyhow!("invalid end"))?;
5964            let kind = unsafe { mem::transmute(serialized_symbol.kind) };
5965            let path = ProjectPath {
5966                worktree_id,
5967                path: PathBuf::from(serialized_symbol.path).into(),
5968            };
5969            let language = languages.language_for_path(&path.path).await.log_err();
5970            Ok(Symbol {
5971                language_server_name: LanguageServerName(
5972                    serialized_symbol.language_server_name.into(),
5973                ),
5974                source_worktree_id,
5975                path,
5976                label: {
5977                    match language {
5978                        Some(language) => {
5979                            language
5980                                .label_for_symbol(&serialized_symbol.name, kind)
5981                                .await
5982                        }
5983                        None => None,
5984                    }
5985                    .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
5986                },
5987
5988                name: serialized_symbol.name,
5989                range: Unclipped(PointUtf16::new(start.row, start.column))
5990                    ..Unclipped(PointUtf16::new(end.row, end.column)),
5991                kind,
5992                signature: serialized_symbol
5993                    .signature
5994                    .try_into()
5995                    .map_err(|_| anyhow!("invalid signature"))?,
5996            })
5997        }
5998    }
5999
6000    async fn handle_buffer_saved(
6001        this: ModelHandle<Self>,
6002        envelope: TypedEnvelope<proto::BufferSaved>,
6003        _: Arc<Client>,
6004        mut cx: AsyncAppContext,
6005    ) -> Result<()> {
6006        let fingerprint = deserialize_fingerprint(&envelope.payload.fingerprint)?;
6007        let version = deserialize_version(&envelope.payload.version);
6008        let mtime = envelope
6009            .payload
6010            .mtime
6011            .ok_or_else(|| anyhow!("missing mtime"))?
6012            .into();
6013
6014        this.update(&mut cx, |this, cx| {
6015            let buffer = this
6016                .opened_buffers
6017                .get(&envelope.payload.buffer_id)
6018                .and_then(|buffer| buffer.upgrade(cx))
6019                .or_else(|| {
6020                    this.incomplete_remote_buffers
6021                        .get(&envelope.payload.buffer_id)
6022                        .and_then(|b| b.clone())
6023                });
6024            if let Some(buffer) = buffer {
6025                buffer.update(cx, |buffer, cx| {
6026                    buffer.did_save(version, fingerprint, mtime, cx);
6027                });
6028            }
6029            Ok(())
6030        })
6031    }
6032
6033    async fn handle_buffer_reloaded(
6034        this: ModelHandle<Self>,
6035        envelope: TypedEnvelope<proto::BufferReloaded>,
6036        _: Arc<Client>,
6037        mut cx: AsyncAppContext,
6038    ) -> Result<()> {
6039        let payload = envelope.payload;
6040        let version = deserialize_version(&payload.version);
6041        let fingerprint = deserialize_fingerprint(&payload.fingerprint)?;
6042        let line_ending = deserialize_line_ending(
6043            proto::LineEnding::from_i32(payload.line_ending)
6044                .ok_or_else(|| anyhow!("missing line ending"))?,
6045        );
6046        let mtime = payload
6047            .mtime
6048            .ok_or_else(|| anyhow!("missing mtime"))?
6049            .into();
6050        this.update(&mut cx, |this, cx| {
6051            let buffer = this
6052                .opened_buffers
6053                .get(&payload.buffer_id)
6054                .and_then(|buffer| buffer.upgrade(cx))
6055                .or_else(|| {
6056                    this.incomplete_remote_buffers
6057                        .get(&payload.buffer_id)
6058                        .cloned()
6059                        .flatten()
6060                });
6061            if let Some(buffer) = buffer {
6062                buffer.update(cx, |buffer, cx| {
6063                    buffer.did_reload(version, fingerprint, line_ending, mtime, cx);
6064                });
6065            }
6066            Ok(())
6067        })
6068    }
6069
    /// Converts text edits received from a language server into anchor-based edits
    /// for `buffer`.
    ///
    /// The edits are interpreted against the snapshot returned by
    /// `buffer_snapshot_for_lsp_version` for `version`. On a background task they are
    /// sorted by start position, clipped to the snapshot, merged when adjacent (or
    /// separated only by a newline), and — for multi-line ranges — reduced to the
    /// lines that actually changed via a line diff.
    ///
    /// The task resolves to an error when no snapshot is available for `version`.
    #[allow(clippy::type_complexity)]
    fn edits_from_lsp(
        &mut self,
        buffer: &ModelHandle<Buffer>,
        lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
        version: Option<i32>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
        // The snapshot lookup runs here; its result (including any error) is only
        // inspected inside the background task.
        let snapshot = self.buffer_snapshot_for_lsp_version(buffer, version, cx);
        cx.background().spawn(async move {
            let snapshot = snapshot?;
            let mut lsp_edits = lsp_edits
                .into_iter()
                .map(|edit| (range_from_lsp(edit.range), edit.new_text))
                .collect::<Vec<_>>();
            lsp_edits.sort_by_key(|(range, _)| range.start);

            let mut lsp_edits = lsp_edits.into_iter().peekable();
            let mut edits = Vec::new();
            while let Some((range, mut new_text)) = lsp_edits.next() {
                // Clip invalid ranges provided by the language server.
                let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
                    ..snapshot.clip_point_utf16(range.end, Bias::Left);

                // Combine any LSP edits that are adjacent.
                //
                // Also, combine LSP edits that are separated from each other by only
                // a newline. This is important because for some code actions,
                // Rust-analyzer rewrites the entire buffer via a series of edits that
                // are separated by unchanged newline characters.
                //
                // In order for the diffing logic below to work properly, any edits that
                // cancel each other out must be combined into one.
                while let Some((next_range, next_text)) = lsp_edits.peek() {
                    if next_range.start.0 > range.end {
                        // Stop merging unless the next edit starts at column 0 of the
                        // following row and the current range already ends at the end
                        // of its row.
                        if next_range.start.0.row > range.end.row + 1
                            || next_range.start.0.column > 0
                            || snapshot.clip_point_utf16(
                                Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
                                Bias::Left,
                            ) > range.end
                        {
                            break;
                        }
                        // Re-insert the unchanged newline that separated the two edits.
                        new_text.push('\n');
                    }
                    range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
                    new_text.push_str(next_text);
                    lsp_edits.next();
                }

                // For multiline edits, perform a diff of the old and new text so that
                // we can identify the changes more precisely, preserving the locations
                // of any anchors positioned in the unchanged regions.
                if range.end.row > range.start.row {
                    let mut offset = range.start.to_offset(&snapshot);
                    let old_text = snapshot.text_for_range(range).collect::<String>();

                    let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
                    // True when an unchanged line was seen since the last pushed edit
                    // (or no edit has been pushed yet for this range); when false, a
                    // delete/insert extends the previous edit instead of starting one.
                    let mut moved_since_edit = true;
                    for change in diff.iter_all_changes() {
                        let tag = change.tag();
                        let value = change.value();
                        match tag {
                            ChangeTag::Equal => {
                                offset += value.len();
                                moved_since_edit = true;
                            }
                            ChangeTag::Delete => {
                                let start = snapshot.anchor_after(offset);
                                let end = snapshot.anchor_before(offset + value.len());
                                if moved_since_edit {
                                    edits.push((start..end, String::new()));
                                } else {
                                    edits.last_mut().unwrap().0.end = end;
                                }
                                offset += value.len();
                                moved_since_edit = false;
                            }
                            ChangeTag::Insert => {
                                // Inserted text does not advance `offset`, which
                                // tracks the position in the old text.
                                if moved_since_edit {
                                    let anchor = snapshot.anchor_after(offset);
                                    edits.push((anchor..anchor, value.to_string()));
                                } else {
                                    edits.last_mut().unwrap().1.push_str(value);
                                }
                                moved_since_edit = false;
                            }
                        }
                    }
                } else if range.end == range.start {
                    // Pure insertion: both ends of the edit use the same anchor.
                    let anchor = snapshot.anchor_after(range.start);
                    edits.push((anchor..anchor, new_text));
                } else {
                    // Single-row replacement: emitted as one edit without diffing.
                    let edit_start = snapshot.anchor_after(range.start);
                    let edit_end = snapshot.anchor_before(range.end);
                    edits.push((edit_start..edit_end, new_text));
                }
            }

            Ok(edits)
        })
    }
6173
6174    fn buffer_snapshot_for_lsp_version(
6175        &mut self,
6176        buffer: &ModelHandle<Buffer>,
6177        version: Option<i32>,
6178        cx: &AppContext,
6179    ) -> Result<TextBufferSnapshot> {
6180        const OLD_VERSIONS_TO_RETAIN: i32 = 10;
6181
6182        if let Some(version) = version {
6183            let buffer_id = buffer.read(cx).remote_id();
6184            let snapshots = self
6185                .buffer_snapshots
6186                .get_mut(&buffer_id)
6187                .ok_or_else(|| anyhow!("no snapshot found for buffer {}", buffer_id))?;
6188            let found_snapshot = snapshots
6189                .binary_search_by_key(&version, |e| e.0)
6190                .map(|ix| snapshots[ix].1.clone())
6191                .map_err(|_| {
6192                    anyhow!(
6193                        "snapshot not found for buffer {} at version {}",
6194                        buffer_id,
6195                        version
6196                    )
6197                })?;
6198            snapshots.retain(|(snapshot_version, _)| {
6199                snapshot_version + OLD_VERSIONS_TO_RETAIN >= version
6200            });
6201            Ok(found_snapshot)
6202        } else {
6203            Ok((buffer.read(cx)).text_snapshot())
6204        }
6205    }
6206
6207    fn language_server_for_buffer(
6208        &self,
6209        buffer: &Buffer,
6210        cx: &AppContext,
6211    ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6212        let server_id = self.language_server_id_for_buffer(buffer, cx)?;
6213        let server = self.language_servers.get(&server_id)?;
6214        if let LanguageServerState::Running {
6215            adapter, server, ..
6216        } = server
6217        {
6218            Some((adapter, server))
6219        } else {
6220            None
6221        }
6222    }
6223
6224    fn language_server_id_for_buffer(&self, buffer: &Buffer, cx: &AppContext) -> Option<usize> {
6225        if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
6226            let name = language.lsp_adapter()?.name.clone();
6227            let worktree_id = file.worktree_id(cx);
6228            let key = (worktree_id, name);
6229            self.language_server_ids.get(&key).copied()
6230        } else {
6231            None
6232        }
6233    }
6234}
6235
6236impl WorktreeHandle {
6237    pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
6238        match self {
6239            WorktreeHandle::Strong(handle) => Some(handle.clone()),
6240            WorktreeHandle::Weak(handle) => handle.upgrade(cx),
6241        }
6242    }
6243}
6244
6245impl OpenBuffer {
6246    pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
6247        match self {
6248            OpenBuffer::Strong(handle) => Some(handle.clone()),
6249            OpenBuffer::Weak(handle) => handle.upgrade(cx),
6250            OpenBuffer::Operations(_) => None,
6251        }
6252    }
6253}
6254
/// A snapshot's files, exposed as a candidate set for fuzzy path matching.
pub struct PathMatchCandidateSet {
    /// Snapshot whose files are offered as candidates.
    pub snapshot: Snapshot,
    /// When true, ignored files are counted and traversed as well; otherwise only
    /// visible files are.
    pub include_ignored: bool,
    /// When true (and the root entry is not a file), candidates are prefixed with
    /// the snapshot's root name followed by `/`.
    pub include_root_name: bool,
}
6260
6261impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
6262    type Candidates = PathMatchCandidateSetIter<'a>;
6263
6264    fn id(&self) -> usize {
6265        self.snapshot.id().to_usize()
6266    }
6267
6268    fn len(&self) -> usize {
6269        if self.include_ignored {
6270            self.snapshot.file_count()
6271        } else {
6272            self.snapshot.visible_file_count()
6273        }
6274    }
6275
6276    fn prefix(&self) -> Arc<str> {
6277        if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
6278            self.snapshot.root_name().into()
6279        } else if self.include_root_name {
6280            format!("{}/", self.snapshot.root_name()).into()
6281        } else {
6282            "".into()
6283        }
6284    }
6285
6286    fn candidates(&'a self, start: usize) -> Self::Candidates {
6287        PathMatchCandidateSetIter {
6288            traversal: self.snapshot.files(self.include_ignored, start),
6289        }
6290    }
6291}
6292
/// Iterator over the fuzzy-match candidates of a `PathMatchCandidateSet`.
pub struct PathMatchCandidateSetIter<'a> {
    /// File traversal of the underlying snapshot that supplies the entries.
    traversal: Traversal<'a>,
}
6296
6297impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
6298    type Item = fuzzy::PathMatchCandidate<'a>;
6299
6300    fn next(&mut self) -> Option<Self::Item> {
6301        self.traversal.next().map(|entry| {
6302            if let EntryKind::File(char_bag) = entry.kind {
6303                fuzzy::PathMatchCandidate {
6304                    path: &entry.path,
6305                    char_bag,
6306                }
6307            } else {
6308                unreachable!()
6309            }
6310        })
6311    }
6312}
6313
6314impl Entity for Project {
6315    type Event = Event;
6316
6317    fn release(&mut self, cx: &mut gpui::AppContext) {
6318        match &self.client_state {
6319            Some(ProjectClientState::Local { .. }) => {
6320                let _ = self.unshare_internal(cx);
6321            }
6322            Some(ProjectClientState::Remote { remote_id, .. }) => {
6323                let _ = self.client.send(proto::LeaveProject {
6324                    project_id: *remote_id,
6325                });
6326                self.disconnected_from_host_internal(cx);
6327            }
6328            _ => {}
6329        }
6330    }
6331
6332    fn app_will_quit(
6333        &mut self,
6334        _: &mut AppContext,
6335    ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
6336        let shutdown_futures = self
6337            .language_servers
6338            .drain()
6339            .map(|(_, server_state)| async {
6340                match server_state {
6341                    LanguageServerState::Running { server, .. } => server.shutdown()?.await,
6342                    LanguageServerState::Starting(starting_server) => {
6343                        starting_server.await?.shutdown()?.await
6344                    }
6345                }
6346            })
6347            .collect::<Vec<_>>();
6348
6349        Some(
6350            async move {
6351                futures::future::join_all(shutdown_futures).await;
6352            }
6353            .boxed(),
6354        )
6355    }
6356}
6357
6358impl Collaborator {
6359    fn from_proto(message: proto::Collaborator) -> Result<Self> {
6360        Ok(Self {
6361            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
6362            replica_id: message.replica_id as ReplicaId,
6363        })
6364    }
6365}
6366
6367impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
6368    fn from((worktree_id, path): (WorktreeId, P)) -> Self {
6369        Self {
6370            worktree_id,
6371            path: path.as_ref().into(),
6372        }
6373    }
6374}
6375
6376fn split_operations(
6377    mut operations: Vec<proto::Operation>,
6378) -> impl Iterator<Item = Vec<proto::Operation>> {
6379    #[cfg(any(test, feature = "test-support"))]
6380    const CHUNK_SIZE: usize = 5;
6381
6382    #[cfg(not(any(test, feature = "test-support")))]
6383    const CHUNK_SIZE: usize = 100;
6384
6385    let mut done = false;
6386    std::iter::from_fn(move || {
6387        if done {
6388            return None;
6389        }
6390
6391        let operations = operations
6392            .drain(..cmp::min(CHUNK_SIZE, operations.len()))
6393            .collect::<Vec<_>>();
6394        if operations.is_empty() {
6395            done = true;
6396        }
6397        Some(operations)
6398    })
6399}
6400
6401fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
6402    proto::Symbol {
6403        language_server_name: symbol.language_server_name.0.to_string(),
6404        source_worktree_id: symbol.source_worktree_id.to_proto(),
6405        worktree_id: symbol.path.worktree_id.to_proto(),
6406        path: symbol.path.path.to_string_lossy().to_string(),
6407        name: symbol.name.clone(),
6408        kind: unsafe { mem::transmute(symbol.kind) },
6409        start: Some(proto::PointUtf16 {
6410            row: symbol.range.start.0.row,
6411            column: symbol.range.start.0.column,
6412        }),
6413        end: Some(proto::PointUtf16 {
6414            row: symbol.range.end.0.row,
6415            column: symbol.range.end.0.column,
6416        }),
6417        signature: symbol.signature.to_vec(),
6418    }
6419}
6420
/// Computes a path that leads from `base` to `path`.
///
/// Leading components shared by both paths are skipped. Every remaining component of
/// `base` becomes a `..`, followed by the remaining components of `path`. A `.`
/// component in `base` is skipped without producing a `..`. Identical inputs produce
/// an empty path.
fn relativize_path(base: &Path, path: &Path) -> PathBuf {
    let mut target_parts = path.components();
    let mut base_parts = base.components();
    let mut parts: Vec<Component> = Vec::new();

    loop {
        let (target_part, base_part) = (target_parts.next(), base_parts.next());

        let Some(target_part) = target_part else {
            if base_part.is_none() {
                break;
            }
            // `path` is exhausted: climb out of each leftover `base` component.
            parts.push(Component::ParentDir);
            continue;
        };

        match base_part {
            None => {
                // `base` is exhausted: the rest of `path` is the answer.
                parts.push(target_part);
                parts.extend(target_parts.by_ref());
                break;
            }
            // Still inside the common prefix.
            Some(base_part) if parts.is_empty() && target_part == base_part => {}
            Some(Component::CurDir) => parts.push(target_part),
            Some(_) => {
                // The paths diverge: leave this and all remaining `base` components,
                // then descend into what is left of `path`.
                parts.push(Component::ParentDir);
                parts.extend(base_parts.by_ref().map(|_| Component::ParentDir));
                parts.push(target_part);
                parts.extend(target_parts.by_ref());
                break;
            }
        }
    }

    let mut relative = PathBuf::new();
    for part in &parts {
        relative.push(part.as_os_str());
    }
    relative
}
6449
6450impl Item for Buffer {
6451    fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
6452        File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
6453    }
6454
6455    fn project_path(&self, cx: &AppContext) -> Option<ProjectPath> {
6456        File::from_dyn(self.file()).map(|file| ProjectPath {
6457            worktree_id: file.worktree_id(cx),
6458            path: file.path().clone(),
6459        })
6460    }
6461}