project.rs

   1mod ignore;
   2mod lsp_command;
   3mod project_settings;
   4pub mod search;
   5pub mod terminals;
   6pub mod worktree;
   7
   8#[cfg(test)]
   9mod project_tests;
  10
  11use anyhow::{anyhow, Context, Result};
  12use client::{proto, Client, TypedEnvelope, UserStore};
  13use clock::ReplicaId;
  14use collections::{hash_map, BTreeMap, HashMap, HashSet};
  15use copilot::Copilot;
  16use futures::{
  17    channel::mpsc::{self, UnboundedReceiver},
  18    future::{try_join_all, Shared},
  19    stream::FuturesUnordered,
  20    AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
  21};
  22use globset::{Glob, GlobSet, GlobSetBuilder};
  23use gpui::{
  24    AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, ModelContext,
  25    ModelHandle, Task, WeakModelHandle,
  26};
  27use language::{
  28    language_settings::{all_language_settings, language_settings, FormatOnSave, Formatter},
  29    point_to_lsp,
  30    proto::{
  31        deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version,
  32        serialize_anchor, serialize_version,
  33    },
  34    range_from_lsp, range_to_lsp, Anchor, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel,
  35    Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _,
  36    Language, LanguageRegistry, LanguageServerName, LocalFile, OffsetRangeExt, Operation, Patch,
  37    PendingLanguageServer, PointUtf16, RopeFingerprint, TextBufferSnapshot, ToOffset, ToPointUtf16,
  38    Transaction, Unclipped,
  39};
  40use lsp::{
  41    DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions,
  42    DocumentHighlightKind, LanguageServer, LanguageServerId,
  43};
  44use lsp_command::*;
  45use postage::watch;
  46use project_settings::ProjectSettings;
  47use rand::prelude::*;
  48use search::SearchQuery;
  49use serde::Serialize;
  50use settings::SettingsStore;
  51use sha2::{Digest, Sha256};
  52use similar::{ChangeTag, TextDiff};
  53use std::{
  54    cell::RefCell,
  55    cmp::{self, Ordering},
  56    convert::TryInto,
  57    hash::Hash,
  58    mem,
  59    num::NonZeroU32,
  60    ops::Range,
  61    path::{Component, Path, PathBuf},
  62    rc::Rc,
  63    str,
  64    sync::{
  65        atomic::{AtomicUsize, Ordering::SeqCst},
  66        Arc,
  67    },
  68    time::{Duration, Instant, SystemTime},
  69};
  70use terminals::Terminals;
  71use util::{debug_panic, defer, merge_json_value_into, post_inc, ResultExt, TryFutureExt as _};
  72
  73pub use fs::*;
  74pub use worktree::*;
  75
/// Something that may correspond to an entry in a project.
///
/// Both accessors return `Option`, so an implementor is allowed to have no
/// associated entry or path.
pub trait Item {
    /// Returns the id of the project entry backing this item, if any.
    fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
    /// Returns the worktree-relative location of this item, if any.
    fn project_path(&self, cx: &AppContext) -> Option<ProjectPath>;
}
  80
  81// Language server state is stored across 3 collections:
  82//     language_servers =>
  83//         a mapping from unique server id to LanguageServerState which can either be a task for a
  84//         server in the process of starting, or a running server with adapter and language server arcs
  85//     language_server_ids => a mapping from worktreeId and server name to the unique server id
  86//     language_server_statuses => a mapping from unique server id to the current server status
  87//
  88// Multiple worktrees can map to the same language server for example when you jump to the definition
  89// of a file in the standard library. So language_server_ids is used to look up which server is active
  90// for a given worktree and language server name
  91//
  92// When starting a language server, first the id map is checked to make sure a server isn't already available
// for that worktree. If there is one, it finishes early. Otherwise, a new id is allocated and
  94// the Starting variant of LanguageServerState is stored in the language_servers map.
/// The state of a single project: its worktrees, open buffers, language
/// servers, collaborators, and the client used to talk to peers.
///
/// Instances are built by `Project::local` or `Project::remote`.
pub struct Project {
    /// Handles to the project's worktrees, each held strongly or weakly
    /// (see `WorktreeHandle`).
    worktrees: Vec<WorktreeHandle>,
    active_entry: Option<ProjectEntryId>,
    /// Sending half of the channel consumed by `send_buffer_ordered_messages`.
    buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
    languages: Arc<LanguageRegistry>,
    language_servers: HashMap<LanguageServerId, LanguageServerState>,
    language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
    language_server_statuses: BTreeMap<LanguageServerId, LanguageServerStatus>,
    last_workspace_edits_by_language_server: HashMap<LanguageServerId, ProjectTransaction>,
    client: Arc<client::Client>,
    next_entry_id: Arc<AtomicUsize>,
    /// `0` for projects built by `Project::local`; the message id of the
    /// `JoinProject` response for projects built by `Project::remote`.
    join_project_response_message_id: u32,
    next_diagnostic_group_id: usize,
    user_store: ModelHandle<UserStore>,
    fs: Arc<dyn Fs>,
    /// `None` as constructed by `Project::local`; `Some(Remote { .. })` as
    /// constructed by `Project::remote`.
    client_state: Option<ProjectClientState>,
    collaborators: HashMap<proto::PeerId, Collaborator>,
    client_subscriptions: Vec<client::Subscription>,
    /// Holds the settings observer registered in `Project::local`
    /// (left empty by `Project::remote`).
    _subscriptions: Vec<gpui::Subscription>,
    next_buffer_id: u64,
    opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
    shared_buffers: HashMap<proto::PeerId, HashSet<u64>>,
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
    >,
    #[allow(clippy::type_complexity)]
    loading_local_worktrees:
        HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
    /// Buffers keyed by buffer id; see `OpenBuffer` for the possible states.
    opened_buffers: HashMap<u64, OpenBuffer>,
    local_buffer_ids_by_path: HashMap<ProjectPath, u64>,
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, u64>,
    /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
    /// Used for re-issuing buffer requests when peers temporarily disconnect
    incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
    buffer_snapshots: HashMap<u64, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
    buffers_being_formatted: HashSet<u64>,
    buffers_needing_diff: HashSet<WeakModelHandle<Buffer>>,
    /// Random value generated once when the project is constructed.
    nonce: u128,
    _maintain_buffer_languages: Task<()>,
    _maintain_workspace_config: Task<()>,
    terminals: Terminals,
    /// Whether a global `Copilot` was present at construction time or at a
    /// later settings change (see `on_settings_changed`).
    copilot_enabled: bool,
}
 140
/// A text snapshot of a buffer paired with a version number.
///
/// Stored per buffer and per language server in `Project::buffer_snapshots`.
// NOTE(review): `version` is presumably the document version reported to the
// language server — confirm against the code that creates these.
struct LspBufferSnapshot {
    version: i32,
    snapshot: TextBufferSnapshot,
}
 145
/// Message ordered with respect to buffer operations
///
/// Values of this type travel over `Project::buffer_ordered_messages_tx`.
enum BufferOrderedMessage {
    /// A serialized operation for the buffer identified by `buffer_id`.
    Operation {
        buffer_id: u64,
        operation: proto::Operation,
    },
    /// An update concerning the language server identified by
    /// `language_server_id`.
    LanguageServerUpdate {
        language_server_id: LanguageServerId,
        message: proto::update_language_server::Variant,
    },
    // NOTE(review): presumably requests a buffer resynchronization — confirm
    // against `send_buffer_ordered_messages`.
    Resync,
}
 158
/// Updates sent over `ProjectClientState::Local::updates_tx`.
enum LocalProjectUpdate {
    /// Sent by `Project::metadata_changed`.
    WorktreesChanged,
    /// Names a buffer (`buffer_id`) and the peer (`peer_id`) it concerns.
    // NOTE(review): presumably asks for the buffer to be created on that peer — confirm.
    CreateBufferForPeer {
        peer_id: proto::PeerId,
        buffer_id: u64,
    },
}
 166
/// The state recorded for a buffer id in `Project::opened_buffers`.
enum OpenBuffer {
    /// The buffer is held through a strong model handle.
    Strong(ModelHandle<Buffer>),
    /// The buffer is held through a weak model handle.
    Weak(WeakModelHandle<Buffer>),
    /// Only a list of operations is stored, with no buffer handle.
    // NOTE(review): presumably operations received before the buffer itself — confirm.
    Operations(Vec<Operation>),
}
 172
/// A worktree reference stored in `Project::worktrees`, held either strongly
/// or weakly.
enum WorktreeHandle {
    /// The worktree is held through a strong model handle.
    Strong(ModelHandle<Worktree>),
    /// The worktree is held through a weak model handle.
    Weak(WeakModelHandle<Worktree>),
}
 177
/// Collaboration state of a project. Both variants carry a `remote_id`,
/// which `Project::remote_id` returns.
enum ProjectClientState {
    /// State for a project that is not a remote replica.
    Local {
        remote_id: u64,
        /// Channel used by `Project::metadata_changed` to publish updates.
        updates_tx: mpsc::UnboundedSender<LocalProjectUpdate>,
        _send_updates: Task<()>,
    },
    /// State for a project created by `Project::remote`.
    Remote {
        /// Initialized to `false` by `Project::remote`.
        sharing_has_stopped: bool,
        remote_id: u64,
        /// Replica id taken from the `JoinProject` response.
        replica_id: ReplicaId,
    },
}
 190
/// A peer participating in a project, identified by its peer id and the
/// replica id assigned to it.
#[derive(Clone, Debug)]
pub struct Collaborator {
    pub peer_id: proto::PeerId,
    pub replica_id: ReplicaId,
}
 196
/// Project-level events.
// NOTE(review): presumably emitted through the gpui `Entity` impl for
// `Project` — that impl is not visible in this part of the file; confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    LanguageServerAdded(LanguageServerId),
    LanguageServerRemoved(LanguageServerId),
    /// Carries the new active entry, or `None`.
    ActiveEntryChanged(Option<ProjectEntryId>),
    WorktreeAdded,
    WorktreeRemoved(WorktreeId),
    DiskBasedDiagnosticsStarted {
        language_server_id: LanguageServerId,
    },
    DiskBasedDiagnosticsFinished {
        language_server_id: LanguageServerId,
    },
    /// Identifies the path and the language server the diagnostics belong to.
    DiagnosticsUpdated {
        path: ProjectPath,
        language_server_id: LanguageServerId,
    },
    /// Carries the new remote id, or `None`.
    RemoteIdChanged(Option<u64>),
    DisconnectedFromHost,
    Closed,
    DeletedEntry(ProjectEntryId),
    /// A collaborator's peer id changed from `old_peer_id` to `new_peer_id`.
    CollaboratorUpdated {
        old_peer_id: proto::PeerId,
        new_peer_id: proto::PeerId,
    },
    CollaboratorLeft(proto::PeerId),
}
 224
/// The state stored for each language server id in
/// `Project::language_servers`: either a server that is still starting or a
/// running one.
pub enum LanguageServerState {
    /// A task for a server in the process of starting; it yields the server,
    /// or `None`.
    Starting(Task<Option<Arc<LanguageServer>>>),
    /// A running server together with its language and adapter.
    Running {
        language: Arc<Language>,
        adapter: Arc<CachedLspAdapter>,
        server: Arc<LanguageServer>,
        /// Glob sets keyed by worktree id.
        // NOTE(review): presumably the paths the server asked to watch via
        // `DidChangeWatchedFilesRegistrationOptions` — confirm.
        watched_paths: HashMap<WorktreeId, GlobSet>,
        simulate_disk_based_diagnostics_completion: Option<Task<()>>,
    },
}
 235
/// Status tracked per language server id in
/// `Project::language_server_statuses`.
#[derive(Serialize)]
pub struct LanguageServerStatus {
    /// The server's name.
    pub name: String,
    /// Progress entries keyed by a string.
    // NOTE(review): the key is presumably the LSP progress token — confirm.
    pub pending_work: BTreeMap<String, LanguageServerProgress>,
    pub has_pending_diagnostic_updates: bool,
    progress_tokens: HashSet<String>,
}
 243
/// One unit of work reported by a language server, stored in
/// `LanguageServerStatus::pending_work`.
#[derive(Clone, Debug, Serialize)]
pub struct LanguageServerProgress {
    /// Optional human-readable message.
    pub message: Option<String>,
    /// Optional completion percentage.
    pub percentage: Option<usize>,
    /// Time of the most recent update; excluded from serialization.
    #[serde(skip_serializing)]
    pub last_update_at: Instant,
}
 251
/// A path qualified by the id of the worktree it belongs to.
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub struct ProjectPath {
    pub worktree_id: WorktreeId,
    // NOTE(review): presumably relative to the worktree root — confirm.
    pub path: Arc<Path>,
}
 257
/// Counts of error and warning diagnostics.
///
/// `DiagnosticSummary::new` only counts diagnostics marked as primary.
#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize)]
pub struct DiagnosticSummary {
    pub error_count: usize,
    pub warning_count: usize,
}
 263
/// An anchor range inside a specific buffer.
#[derive(Debug, Clone)]
pub struct Location {
    pub buffer: ModelHandle<Buffer>,
    pub range: Range<language::Anchor>,
}
 269
/// A link to a target location, with an optional origin location.
#[derive(Debug, Clone)]
pub struct LocationLink {
    pub origin: Option<Location>,
    pub target: Location,
}
 275
/// A highlighted anchor range together with its LSP highlight kind.
#[derive(Debug)]
pub struct DocumentHighlight {
    pub range: Range<language::Anchor>,
    pub kind: DocumentHighlightKind,
}
 281
/// A symbol reported by a language server.
#[derive(Clone, Debug)]
pub struct Symbol {
    /// Name of the language server that produced the symbol.
    pub language_server_name: LanguageServerName,
    pub source_worktree_id: WorktreeId,
    /// Where the symbol lives.
    pub path: ProjectPath,
    pub label: CodeLabel,
    pub name: String,
    pub kind: lsp::SymbolKind,
    /// Range expressed in UTF-16 points that have not been clipped.
    pub range: Range<Unclipped<PointUtf16>>,
    // NOTE(review): 32 bytes — presumably a SHA-256 digest (the file imports
    // `sha2::Sha256`); confirm what is hashed.
    pub signature: [u8; 32],
}
 293
/// One block of hover content: its text and how that text should be
/// interpreted (see `HoverBlockKind`).
#[derive(Clone, Debug, PartialEq)]
pub struct HoverBlock {
    pub text: String,
    pub kind: HoverBlockKind,
}
 299
/// The kind of content held by a `HoverBlock`.
#[derive(Clone, Debug, PartialEq)]
pub enum HoverBlockKind {
    PlainText,
    Markdown,
    /// Source code, tagged with the name of its language.
    Code { language: String },
}
 306
/// Hover information: a list of content blocks and an optional anchor range.
#[derive(Debug)]
pub struct Hover {
    pub contents: Vec<HoverBlock>,
    pub range: Option<Range<language::Anchor>>,
}
 312
/// A set of per-buffer transactions, keyed by the buffer each one applies to.
#[derive(Default)]
pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
 315
 316impl DiagnosticSummary {
 317    fn new<'a, T: 'a>(diagnostics: impl IntoIterator<Item = &'a DiagnosticEntry<T>>) -> Self {
 318        let mut this = Self {
 319            error_count: 0,
 320            warning_count: 0,
 321        };
 322
 323        for entry in diagnostics {
 324            if entry.diagnostic.is_primary {
 325                match entry.diagnostic.severity {
 326                    DiagnosticSeverity::ERROR => this.error_count += 1,
 327                    DiagnosticSeverity::WARNING => this.warning_count += 1,
 328                    _ => {}
 329                }
 330            }
 331        }
 332
 333        this
 334    }
 335
 336    pub fn is_empty(&self) -> bool {
 337        self.error_count == 0 && self.warning_count == 0
 338    }
 339
 340    pub fn to_proto(
 341        &self,
 342        language_server_id: LanguageServerId,
 343        path: &Path,
 344    ) -> proto::DiagnosticSummary {
 345        proto::DiagnosticSummary {
 346            path: path.to_string_lossy().to_string(),
 347            language_server_id: language_server_id.0 as u64,
 348            error_count: self.error_count as u32,
 349            warning_count: self.warning_count as u32,
 350        }
 351    }
 352}
 353
 354#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
 355pub struct ProjectEntryId(usize);
 356
 357impl ProjectEntryId {
 358    pub const MAX: Self = Self(usize::MAX);
 359
 360    pub fn new(counter: &AtomicUsize) -> Self {
 361        Self(counter.fetch_add(1, SeqCst))
 362    }
 363
 364    pub fn from_proto(id: u64) -> Self {
 365        Self(id as usize)
 366    }
 367
 368    pub fn to_proto(&self) -> u64 {
 369        self.0 as u64
 370    }
 371
 372    pub fn to_usize(&self) -> usize {
 373        self.0
 374    }
 375}
 376
 377#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 378pub enum FormatTrigger {
 379    Save,
 380    Manual,
 381}
 382
 383impl FormatTrigger {
 384    fn from_proto(value: i32) -> FormatTrigger {
 385        match value {
 386            0 => FormatTrigger::Save,
 387            1 => FormatTrigger::Manual,
 388            _ => FormatTrigger::Save,
 389        }
 390    }
 391}
 392
 393impl Project {
    /// Registers `ProjectSettings` with the settings system.
    pub fn init_settings(cx: &mut AppContext) {
        settings::register::<ProjectSettings>(cx);
    }
 397
    /// Registers the project settings and installs all of `Project`'s message
    /// and request handlers on `client`.
    ///
    /// Handlers added with `add_model_message_handler` and
    /// `add_model_request_handler` are registered in the order listed below.
    pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
        Self::init_settings(cx);

        client.add_model_message_handler(Self::handle_add_collaborator);
        client.add_model_message_handler(Self::handle_update_project_collaborator);
        client.add_model_message_handler(Self::handle_remove_collaborator);
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_start_language_server);
        client.add_model_message_handler(Self::handle_update_language_server);
        client.add_model_message_handler(Self::handle_update_project);
        client.add_model_message_handler(Self::handle_unshare_project);
        client.add_model_message_handler(Self::handle_create_buffer_for_peer);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_update_buffer);
        client.add_model_message_handler(Self::handle_update_diagnostic_summary);
        client.add_model_message_handler(Self::handle_update_worktree);
        client.add_model_request_handler(Self::handle_create_project_entry);
        client.add_model_request_handler(Self::handle_rename_project_entry);
        client.add_model_request_handler(Self::handle_copy_project_entry);
        client.add_model_request_handler(Self::handle_delete_project_entry);
        client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
        client.add_model_request_handler(Self::handle_apply_code_action);
        client.add_model_request_handler(Self::handle_on_type_formatting);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_synchronize_buffers);
        client.add_model_request_handler(Self::handle_format_buffers);
        // One generic handler, instantiated once per LSP command type.
        client.add_model_request_handler(Self::handle_lsp_command::<GetCodeActions>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetCompletions>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetHover>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetDefinition>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetTypeDefinition>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetDocumentHighlights>);
        client.add_model_request_handler(Self::handle_lsp_command::<GetReferences>);
        client.add_model_request_handler(Self::handle_lsp_command::<PrepareRename>);
        client.add_model_request_handler(Self::handle_lsp_command::<PerformRename>);
        client.add_model_request_handler(Self::handle_search_project);
        client.add_model_request_handler(Self::handle_get_project_symbols);
        client.add_model_request_handler(Self::handle_open_buffer_for_symbol);
        client.add_model_request_handler(Self::handle_open_buffer_by_id);
        client.add_model_request_handler(Self::handle_open_buffer_by_path);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
 442
 443    pub fn local(
 444        client: Arc<Client>,
 445        user_store: ModelHandle<UserStore>,
 446        languages: Arc<LanguageRegistry>,
 447        fs: Arc<dyn Fs>,
 448        cx: &mut AppContext,
 449    ) -> ModelHandle<Self> {
 450        cx.add_model(|cx: &mut ModelContext<Self>| {
 451            let (tx, rx) = mpsc::unbounded();
 452            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
 453                .detach();
 454            Self {
 455                worktrees: Default::default(),
 456                buffer_ordered_messages_tx: tx,
 457                collaborators: Default::default(),
 458                next_buffer_id: 0,
 459                opened_buffers: Default::default(),
 460                shared_buffers: Default::default(),
 461                incomplete_remote_buffers: Default::default(),
 462                loading_buffers_by_path: Default::default(),
 463                loading_local_worktrees: Default::default(),
 464                local_buffer_ids_by_path: Default::default(),
 465                local_buffer_ids_by_entry_id: Default::default(),
 466                buffer_snapshots: Default::default(),
 467                join_project_response_message_id: 0,
 468                client_state: None,
 469                opened_buffer: watch::channel(),
 470                client_subscriptions: Vec::new(),
 471                _subscriptions: vec![
 472                    cx.observe_global::<SettingsStore, _>(Self::on_settings_changed)
 473                ],
 474                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
 475                _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
 476                active_entry: None,
 477                languages,
 478                client,
 479                user_store,
 480                fs,
 481                next_entry_id: Default::default(),
 482                next_diagnostic_group_id: Default::default(),
 483                language_servers: Default::default(),
 484                language_server_ids: Default::default(),
 485                language_server_statuses: Default::default(),
 486                last_workspace_edits_by_language_server: Default::default(),
 487                buffers_being_formatted: Default::default(),
 488                buffers_needing_diff: Default::default(),
 489                nonce: StdRng::from_entropy().gen(),
 490                terminals: Terminals {
 491                    local_handles: Vec::new(),
 492                },
 493                copilot_enabled: Copilot::global(cx).is_some(),
 494            }
 495        })
 496    }
 497
    /// Joins the project identified by `remote_id` and returns a model for it.
    ///
    /// Steps, in order: authenticate and connect the client, subscribe to the
    /// remote entity, send a `JoinProject` request, build the project model
    /// from the response (worktrees, language server statuses, replica id),
    /// attach the subscription to the model, load the collaborators' users
    /// into the user store, and finally record the collaborators.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting, subscribing, the `JoinProject`
    /// request, loading users, or setting the collaborators fails.
    pub async fn remote(
        remote_id: u64,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        mut cx: AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        client.authenticate_and_connect(true, &cx).await?;

        // The subscription is created before the join request is sent and is
        // only bound to the model once the model exists (see `set_model` below).
        let subscription = client.subscribe_to_entity(remote_id)?;
        let response = client
            .request_envelope(proto::JoinProject {
                project_id: remote_id,
            })
            .await?;
        let this = cx.add_model(|cx| {
            let replica_id = response.payload.replica_id as ReplicaId;

            // Create a remote worktree for every worktree in the response.
            let mut worktrees = Vec::new();
            for worktree in response.payload.worktrees {
                let worktree = cx.update(|cx| {
                    Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)
                });
                worktrees.push(worktree);
            }

            let (tx, rx) = mpsc::unbounded();
            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
                .detach();
            let mut this = Self {
                // Starts empty; the worktrees built above are added through
                // `add_worktree` once the struct exists.
                worktrees: Vec::new(),
                buffer_ordered_messages_tx: tx,
                loading_buffers_by_path: Default::default(),
                next_buffer_id: 0,
                opened_buffer: watch::channel(),
                shared_buffers: Default::default(),
                incomplete_remote_buffers: Default::default(),
                loading_local_worktrees: Default::default(),
                local_buffer_ids_by_path: Default::default(),
                local_buffer_ids_by_entry_id: Default::default(),
                active_entry: None,
                collaborators: Default::default(),
                join_project_response_message_id: response.message_id,
                _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
                _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx),
                languages,
                user_store: user_store.clone(),
                fs,
                next_entry_id: Default::default(),
                next_diagnostic_group_id: Default::default(),
                client_subscriptions: Default::default(),
                // No settings observer is registered here, unlike `Project::local`.
                _subscriptions: Default::default(),
                client: client.clone(),
                client_state: Some(ProjectClientState::Remote {
                    sharing_has_stopped: false,
                    remote_id,
                    replica_id,
                }),
                language_servers: Default::default(),
                language_server_ids: Default::default(),
                // Seed one status entry per language server listed in the response.
                language_server_statuses: response
                    .payload
                    .language_servers
                    .into_iter()
                    .map(|server| {
                        (
                            LanguageServerId(server.id as usize),
                            LanguageServerStatus {
                                name: server.name,
                                pending_work: Default::default(),
                                has_pending_diagnostic_updates: false,
                                progress_tokens: Default::default(),
                            },
                        )
                    })
                    .collect(),
                last_workspace_edits_by_language_server: Default::default(),
                opened_buffers: Default::default(),
                buffers_being_formatted: Default::default(),
                buffers_needing_diff: Default::default(),
                buffer_snapshots: Default::default(),
                nonce: StdRng::from_entropy().gen(),
                terminals: Terminals {
                    local_handles: Vec::new(),
                },
                copilot_enabled: Copilot::global(cx).is_some(),
            };
            for worktree in worktrees {
                // The value returned by `add_worktree` is intentionally discarded.
                let _ = this.add_worktree(&worktree, cx);
            }
            this
        });
        let subscription = subscription.set_model(&this, &mut cx);

        // Load the users behind all collaborators before recording them.
        let user_ids = response
            .payload
            .collaborators
            .iter()
            .map(|peer| peer.user_id)
            .collect();
        user_store
            .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
            .await?;

        this.update(&mut cx, |this, cx| {
            this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
            // Store the subscription on the project alongside its other state.
            this.client_subscriptions.push(subscription);
            anyhow::Ok(())
        })?;

        Ok(this)
    }
 611
 612    #[cfg(any(test, feature = "test-support"))]
 613    pub async fn test(
 614        fs: Arc<dyn Fs>,
 615        root_paths: impl IntoIterator<Item = &Path>,
 616        cx: &mut gpui::TestAppContext,
 617    ) -> ModelHandle<Project> {
 618        let mut languages = LanguageRegistry::test();
 619        languages.set_executor(cx.background());
 620        let http_client = util::http::FakeHttpClient::with_404_response();
 621        let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
 622        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
 623        let project =
 624            cx.update(|cx| Project::local(client, user_store, Arc::new(languages), fs, cx));
 625        for path in root_paths {
 626            let (tree, _) = project
 627                .update(cx, |project, cx| {
 628                    project.find_or_create_local_worktree(path, true, cx)
 629                })
 630                .await
 631                .unwrap();
 632            tree.read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
 633                .await;
 634        }
 635        project
 636    }
 637
 638    fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
 639        let settings = all_language_settings(cx);
 640
 641        let mut language_servers_to_start = Vec::new();
 642        for buffer in self.opened_buffers.values() {
 643            if let Some(buffer) = buffer.upgrade(cx) {
 644                let buffer = buffer.read(cx);
 645                if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language())
 646                {
 647                    if settings
 648                        .language(Some(&language.name()))
 649                        .enable_language_server
 650                    {
 651                        let worktree = file.worktree.read(cx);
 652                        language_servers_to_start.push((
 653                            worktree.id(),
 654                            worktree.as_local().unwrap().abs_path().clone(),
 655                            language.clone(),
 656                        ));
 657                    }
 658                }
 659            }
 660        }
 661
 662        let mut language_servers_to_stop = Vec::new();
 663        for language in self.languages.to_vec() {
 664            for lsp_adapter in language.lsp_adapters() {
 665                if !settings
 666                    .language(Some(&language.name()))
 667                    .enable_language_server
 668                {
 669                    let lsp_name = &lsp_adapter.name;
 670                    for (worktree_id, started_lsp_name) in self.language_server_ids.keys() {
 671                        if lsp_name == started_lsp_name {
 672                            language_servers_to_stop.push((*worktree_id, started_lsp_name.clone()));
 673                        }
 674                    }
 675                }
 676            }
 677        }
 678
 679        // Stop all newly-disabled language servers.
 680        for (worktree_id, adapter_name) in language_servers_to_stop {
 681            self.stop_language_server(worktree_id, adapter_name, cx)
 682                .detach();
 683        }
 684
 685        // Start all the newly-enabled language servers.
 686        for (worktree_id, worktree_path, language) in language_servers_to_start {
 687            self.start_language_servers(worktree_id, worktree_path, language, cx);
 688        }
 689
 690        if !self.copilot_enabled && Copilot::global(cx).is_some() {
 691            self.copilot_enabled = true;
 692            for buffer in self.opened_buffers.values() {
 693                if let Some(buffer) = buffer.upgrade(cx) {
 694                    self.register_buffer_with_copilot(&buffer, cx);
 695                }
 696            }
 697        }
 698
 699        cx.notify();
 700    }
 701
 702    pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
 703        self.opened_buffers
 704            .get(&remote_id)
 705            .and_then(|buffer| buffer.upgrade(cx))
 706    }
 707
    /// Returns the language registry used by this project.
    pub fn languages(&self) -> &Arc<LanguageRegistry> {
        &self.languages
    }
 711
 712    pub fn client(&self) -> Arc<Client> {
 713        self.client.clone()
 714    }
 715
    /// Returns a clone of the handle to this project's user store.
    pub fn user_store(&self) -> ModelHandle<UserStore> {
        self.user_store.clone()
    }
 719
 720    #[cfg(any(test, feature = "test-support"))]
 721    pub fn opened_buffers(&self, cx: &AppContext) -> Vec<ModelHandle<Buffer>> {
 722        self.opened_buffers
 723            .values()
 724            .filter_map(|b| b.upgrade(cx))
 725            .collect()
 726    }
 727
 728    #[cfg(any(test, feature = "test-support"))]
 729    pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
 730        let path = path.into();
 731        if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
 732            self.opened_buffers.iter().any(|(_, buffer)| {
 733                if let Some(buffer) = buffer.upgrade(cx) {
 734                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 735                        if file.worktree == worktree && file.path() == &path.path {
 736                            return true;
 737                        }
 738                    }
 739                }
 740                false
 741            })
 742        } else {
 743            false
 744        }
 745    }
 746
    /// Returns the filesystem implementation used by this project.
    pub fn fs(&self) -> &Arc<dyn Fs> {
        &self.fs
    }
 750
 751    pub fn remote_id(&self) -> Option<u64> {
 752        match self.client_state.as_ref()? {
 753            ProjectClientState::Local { remote_id, .. }
 754            | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
 755        }
 756    }
 757
 758    pub fn replica_id(&self) -> ReplicaId {
 759        match &self.client_state {
 760            Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
 761            _ => 0,
 762        }
 763    }
 764
 765    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
 766        if let Some(ProjectClientState::Local { updates_tx, .. }) = &mut self.client_state {
 767            updates_tx
 768                .unbounded_send(LocalProjectUpdate::WorktreesChanged)
 769                .ok();
 770        }
 771        cx.notify();
 772    }
 773
    /// Returns the project's collaborators, keyed by peer id.
    pub fn collaborators(&self) -> &HashMap<proto::PeerId, Collaborator> {
        &self.collaborators
    }
 777
    /// Collect all worktrees, including ones that don't appear in the project panel
    ///
    /// Worktree handles that can no longer be upgraded are skipped.
    pub fn worktrees<'a>(
        &'a self,
        cx: &'a AppContext,
    ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
        self.worktrees
            .iter()
            .filter_map(move |worktree| worktree.upgrade(cx))
    }
 787
 788    /// Collect all user-visible worktrees, the ones that appear in the project panel
 789    pub fn visible_worktrees<'a>(
 790        &'a self,
 791        cx: &'a AppContext,
 792    ) -> impl 'a + DoubleEndedIterator<Item = ModelHandle<Worktree>> {
 793        self.worktrees.iter().filter_map(|worktree| {
 794            worktree.upgrade(cx).and_then(|worktree| {
 795                if worktree.read(cx).is_visible() {
 796                    Some(worktree)
 797                } else {
 798                    None
 799                }
 800            })
 801        })
 802    }
 803
 804    pub fn worktree_root_names<'a>(&'a self, cx: &'a AppContext) -> impl Iterator<Item = &'a str> {
 805        self.visible_worktrees(cx)
 806            .map(|tree| tree.read(cx).root_name())
 807    }
 808
 809    pub fn worktree_for_id(
 810        &self,
 811        id: WorktreeId,
 812        cx: &AppContext,
 813    ) -> Option<ModelHandle<Worktree>> {
 814        self.worktrees(cx)
 815            .find(|worktree| worktree.read(cx).id() == id)
 816    }
 817
 818    pub fn worktree_for_entry(
 819        &self,
 820        entry_id: ProjectEntryId,
 821        cx: &AppContext,
 822    ) -> Option<ModelHandle<Worktree>> {
 823        self.worktrees(cx)
 824            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
 825    }
 826
 827    pub fn worktree_id_for_entry(
 828        &self,
 829        entry_id: ProjectEntryId,
 830        cx: &AppContext,
 831    ) -> Option<WorktreeId> {
 832        self.worktree_for_entry(entry_id, cx)
 833            .map(|worktree| worktree.read(cx).id())
 834    }
 835
 836    pub fn contains_paths(&self, paths: &[PathBuf], cx: &AppContext) -> bool {
 837        paths.iter().all(|path| self.contains_path(path, cx))
 838    }
 839
 840    pub fn contains_path(&self, path: &Path, cx: &AppContext) -> bool {
 841        for worktree in self.worktrees(cx) {
 842            let worktree = worktree.read(cx).as_local();
 843            if worktree.map_or(false, |w| w.contains_abs_path(path)) {
 844                return true;
 845            }
 846        }
 847        false
 848    }
 849
 850    pub fn create_entry(
 851        &mut self,
 852        project_path: impl Into<ProjectPath>,
 853        is_directory: bool,
 854        cx: &mut ModelContext<Self>,
 855    ) -> Option<Task<Result<Entry>>> {
 856        let project_path = project_path.into();
 857        let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
 858        if self.is_local() {
 859            Some(worktree.update(cx, |worktree, cx| {
 860                worktree
 861                    .as_local_mut()
 862                    .unwrap()
 863                    .create_entry(project_path.path, is_directory, cx)
 864            }))
 865        } else {
 866            let client = self.client.clone();
 867            let project_id = self.remote_id().unwrap();
 868            Some(cx.spawn_weak(|_, mut cx| async move {
 869                let response = client
 870                    .request(proto::CreateProjectEntry {
 871                        worktree_id: project_path.worktree_id.to_proto(),
 872                        project_id,
 873                        path: project_path.path.to_string_lossy().into(),
 874                        is_directory,
 875                    })
 876                    .await?;
 877                let entry = response
 878                    .entry
 879                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 880                worktree
 881                    .update(&mut cx, |worktree, cx| {
 882                        worktree.as_remote_mut().unwrap().insert_entry(
 883                            entry,
 884                            response.worktree_scan_id as usize,
 885                            cx,
 886                        )
 887                    })
 888                    .await
 889            }))
 890        }
 891    }
 892
 893    pub fn copy_entry(
 894        &mut self,
 895        entry_id: ProjectEntryId,
 896        new_path: impl Into<Arc<Path>>,
 897        cx: &mut ModelContext<Self>,
 898    ) -> Option<Task<Result<Entry>>> {
 899        let worktree = self.worktree_for_entry(entry_id, cx)?;
 900        let new_path = new_path.into();
 901        if self.is_local() {
 902            worktree.update(cx, |worktree, cx| {
 903                worktree
 904                    .as_local_mut()
 905                    .unwrap()
 906                    .copy_entry(entry_id, new_path, cx)
 907            })
 908        } else {
 909            let client = self.client.clone();
 910            let project_id = self.remote_id().unwrap();
 911
 912            Some(cx.spawn_weak(|_, mut cx| async move {
 913                let response = client
 914                    .request(proto::CopyProjectEntry {
 915                        project_id,
 916                        entry_id: entry_id.to_proto(),
 917                        new_path: new_path.to_string_lossy().into(),
 918                    })
 919                    .await?;
 920                let entry = response
 921                    .entry
 922                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 923                worktree
 924                    .update(&mut cx, |worktree, cx| {
 925                        worktree.as_remote_mut().unwrap().insert_entry(
 926                            entry,
 927                            response.worktree_scan_id as usize,
 928                            cx,
 929                        )
 930                    })
 931                    .await
 932            }))
 933        }
 934    }
 935
 936    pub fn rename_entry(
 937        &mut self,
 938        entry_id: ProjectEntryId,
 939        new_path: impl Into<Arc<Path>>,
 940        cx: &mut ModelContext<Self>,
 941    ) -> Option<Task<Result<Entry>>> {
 942        let worktree = self.worktree_for_entry(entry_id, cx)?;
 943        let new_path = new_path.into();
 944        if self.is_local() {
 945            worktree.update(cx, |worktree, cx| {
 946                worktree
 947                    .as_local_mut()
 948                    .unwrap()
 949                    .rename_entry(entry_id, new_path, cx)
 950            })
 951        } else {
 952            let client = self.client.clone();
 953            let project_id = self.remote_id().unwrap();
 954
 955            Some(cx.spawn_weak(|_, mut cx| async move {
 956                let response = client
 957                    .request(proto::RenameProjectEntry {
 958                        project_id,
 959                        entry_id: entry_id.to_proto(),
 960                        new_path: new_path.to_string_lossy().into(),
 961                    })
 962                    .await?;
 963                let entry = response
 964                    .entry
 965                    .ok_or_else(|| anyhow!("missing entry in response"))?;
 966                worktree
 967                    .update(&mut cx, |worktree, cx| {
 968                        worktree.as_remote_mut().unwrap().insert_entry(
 969                            entry,
 970                            response.worktree_scan_id as usize,
 971                            cx,
 972                        )
 973                    })
 974                    .await
 975            }))
 976        }
 977    }
 978
 979    pub fn delete_entry(
 980        &mut self,
 981        entry_id: ProjectEntryId,
 982        cx: &mut ModelContext<Self>,
 983    ) -> Option<Task<Result<()>>> {
 984        let worktree = self.worktree_for_entry(entry_id, cx)?;
 985
 986        cx.emit(Event::DeletedEntry(entry_id));
 987
 988        if self.is_local() {
 989            worktree.update(cx, |worktree, cx| {
 990                worktree.as_local_mut().unwrap().delete_entry(entry_id, cx)
 991            })
 992        } else {
 993            let client = self.client.clone();
 994            let project_id = self.remote_id().unwrap();
 995            Some(cx.spawn_weak(|_, mut cx| async move {
 996                let response = client
 997                    .request(proto::DeleteProjectEntry {
 998                        project_id,
 999                        entry_id: entry_id.to_proto(),
1000                    })
1001                    .await?;
1002                worktree
1003                    .update(&mut cx, move |worktree, cx| {
1004                        worktree.as_remote_mut().unwrap().delete_entry(
1005                            entry_id,
1006                            response.worktree_scan_id as usize,
1007                            cx,
1008                        )
1009                    })
1010                    .await
1011            }))
1012        }
1013    }
1014
    /// Puts this project into the `Local` (shared) client state under `project_id`.
    ///
    /// Steps, in order:
    /// 1. Fails with "project was already shared" if any client state is set.
    /// 2. Subscribes this model to entity messages for `project_id`.
    /// 3. Upgrades weak open-buffer and worktree handles to strong ones where
    ///    the upgrade still succeeds.
    /// 4. Sends a `StartLanguageServer` message for every known language server
    ///    status (send errors are only logged).
    /// 5. Installs `ProjectClientState::Local` together with a spawned task that
    ///    drains `LocalProjectUpdate` messages from an unbounded channel.
    /// 6. Calls `metadata_changed`, emits `Event::RemoteIdChanged` and notifies.
    ///
    /// # Errors
    /// Returns an error if the project already has client state, or if
    /// `subscribe_to_entity` fails.
    pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Result<()> {
        if self.client_state.is_some() {
            return Err(anyhow!("project was already shared"));
        }
        self.client_subscriptions.push(
            self.client
                .subscribe_to_entity(project_id)?
                .set_model(&cx.handle(), &mut cx.to_async()),
        );

        // Turn weak buffer handles into strong ones; handles that can no longer
        // be upgraded are left as they are.
        for open_buffer in self.opened_buffers.values_mut() {
            match open_buffer {
                OpenBuffer::Strong(_) => {}
                OpenBuffer::Weak(buffer) => {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        *open_buffer = OpenBuffer::Strong(buffer);
                    }
                }
                OpenBuffer::Operations(_) => unreachable!(),
            }
        }

        // Same treatment for worktree handles.
        for worktree_handle in self.worktrees.iter_mut() {
            match worktree_handle {
                WorktreeHandle::Strong(_) => {}
                WorktreeHandle::Weak(worktree) => {
                    if let Some(worktree) = worktree.upgrade(cx) {
                        *worktree_handle = WorktreeHandle::Strong(worktree);
                    }
                }
            }
        }

        // Announce every language server we currently track a status for.
        for (server_id, status) in &self.language_server_statuses {
            self.client
                .send(proto::StartLanguageServer {
                    project_id,
                    server: Some(proto::LanguageServer {
                        id: server_id.0 as u64,
                        name: status.name.clone(),
                    }),
                })
                .log_err();
        }

        let (updates_tx, mut updates_rx) = mpsc::unbounded();
        let client = self.client.clone();
        self.client_state = Some(ProjectClientState::Local {
            remote_id: project_id,
            updates_tx,
            // This task is stored inside the client state it serves.
            _send_updates: cx.spawn_weak(move |this, mut cx| async move {
                while let Some(update) = updates_rx.next().await {
                    // Stop processing once the project model can no longer be upgraded.
                    let Some(this) = this.upgrade(&cx) else { break };

                    match update {
                        LocalProjectUpdate::WorktreesChanged => {
                            let worktrees = this
                                .read_with(&cx, |this, cx| this.worktrees(cx).collect::<Vec<_>>());
                            let update_project = this
                                .read_with(&cx, |this, cx| {
                                    this.client.request(proto::UpdateProject {
                                        project_id,
                                        worktrees: this.worktree_metadata_protos(cx),
                                    })
                                })
                                .await;
                            // Only share the worktrees if the project update request succeeded.
                            if update_project.is_ok() {
                                for worktree in worktrees {
                                    worktree.update(&mut cx, |worktree, cx| {
                                        let worktree = worktree.as_local_mut().unwrap();
                                        worktree.share(project_id, cx).detach_and_log_err(cx)
                                    });
                                }
                            }
                        }
                        LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
                            // Yields the buffer only when it was not already recorded as
                            // shared with this peer and its handle is strong.
                            // NOTE(review): the `unwrap` panics if `buffer_id` is missing
                            // from `opened_buffers` — confirm callers guarantee presence.
                            let buffer = this.update(&mut cx, |this, _| {
                                let buffer = this.opened_buffers.get(&buffer_id).unwrap();
                                let shared_buffers =
                                    this.shared_buffers.entry(peer_id).or_default();
                                if shared_buffers.insert(buffer_id) {
                                    if let OpenBuffer::Strong(buffer) = buffer {
                                        Some(buffer.clone())
                                    } else {
                                        None
                                    }
                                } else {
                                    None
                                }
                            });

                            let Some(buffer) = buffer else { continue };
                            let operations =
                                buffer.read_with(&cx, |b, cx| b.serialize_ops(None, cx));
                            let operations = operations.await;
                            let state = buffer.read_with(&cx, |buffer, _| buffer.to_proto());

                            // Send the buffer state first, then the serialized
                            // operations in chunks.
                            let initial_state = proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
                            };
                            if client.send(initial_state).log_err().is_some() {
                                let client = client.clone();
                                cx.background()
                                    .spawn(async move {
                                        let mut chunks = split_operations(operations).peekable();
                                        while let Some(chunk) = chunks.next() {
                                            // `is_last` is set on the final chunk.
                                            let is_last = chunks.peek().is_none();
                                            client.send(proto::CreateBufferForPeer {
                                                project_id,
                                                peer_id: Some(peer_id),
                                                variant: Some(
                                                    proto::create_buffer_for_peer::Variant::Chunk(
                                                        proto::BufferChunk {
                                                            buffer_id,
                                                            operations: chunk,
                                                            is_last,
                                                        },
                                                    ),
                                                ),
                                            })?;
                                        }
                                        anyhow::Ok(())
                                    })
                                    .await
                                    .log_err();
                            }
                        }
                    }
                }
            }),
        });

        // `metadata_changed` queues a `WorktreesChanged` update for the task above
        // and already calls `cx.notify()`.
        self.metadata_changed(cx);
        cx.emit(Event::RemoteIdChanged(Some(project_id)));
        cx.notify();
        Ok(())
    }
1154
1155    pub fn reshared(
1156        &mut self,
1157        message: proto::ResharedProject,
1158        cx: &mut ModelContext<Self>,
1159    ) -> Result<()> {
1160        self.shared_buffers.clear();
1161        self.set_collaborators_from_proto(message.collaborators, cx)?;
1162        self.metadata_changed(cx);
1163        Ok(())
1164    }
1165
1166    pub fn rejoined(
1167        &mut self,
1168        message: proto::RejoinedProject,
1169        message_id: u32,
1170        cx: &mut ModelContext<Self>,
1171    ) -> Result<()> {
1172        self.join_project_response_message_id = message_id;
1173        self.set_worktrees_from_proto(message.worktrees, cx)?;
1174        self.set_collaborators_from_proto(message.collaborators, cx)?;
1175        self.language_server_statuses = message
1176            .language_servers
1177            .into_iter()
1178            .map(|server| {
1179                (
1180                    LanguageServerId(server.id as usize),
1181                    LanguageServerStatus {
1182                        name: server.name,
1183                        pending_work: Default::default(),
1184                        has_pending_diagnostic_updates: false,
1185                        progress_tokens: Default::default(),
1186                    },
1187                )
1188            })
1189            .collect();
1190        self.buffer_ordered_messages_tx
1191            .unbounded_send(BufferOrderedMessage::Resync)
1192            .unwrap();
1193        cx.notify();
1194        Ok(())
1195    }
1196
1197    pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1198        self.unshare_internal(cx)?;
1199        self.metadata_changed(cx);
1200        cx.notify();
1201        Ok(())
1202    }
1203
1204    fn unshare_internal(&mut self, cx: &mut AppContext) -> Result<()> {
1205        if self.is_remote() {
1206            return Err(anyhow!("attempted to unshare a remote project"));
1207        }
1208
1209        if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
1210            self.collaborators.clear();
1211            self.shared_buffers.clear();
1212            self.client_subscriptions.clear();
1213
1214            for worktree_handle in self.worktrees.iter_mut() {
1215                if let WorktreeHandle::Strong(worktree) = worktree_handle {
1216                    let is_visible = worktree.update(cx, |worktree, _| {
1217                        worktree.as_local_mut().unwrap().unshare();
1218                        worktree.is_visible()
1219                    });
1220                    if !is_visible {
1221                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
1222                    }
1223                }
1224            }
1225
1226            for open_buffer in self.opened_buffers.values_mut() {
1227                // Wake up any tasks waiting for peers' edits to this buffer.
1228                if let Some(buffer) = open_buffer.upgrade(cx) {
1229                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1230                }
1231
1232                if let OpenBuffer::Strong(buffer) = open_buffer {
1233                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1234                }
1235            }
1236
1237            self.client.send(proto::UnshareProject {
1238                project_id: remote_id,
1239            })?;
1240
1241            Ok(())
1242        } else {
1243            Err(anyhow!("attempted to unshare an unshared project"))
1244        }
1245    }
1246
    /// Runs the internal disconnect handling, then emits
    /// `Event::DisconnectedFromHost` and notifies observers.
    ///
    /// The event and the notification are issued unconditionally, even when
    /// the internal step finds no `Remote` client state to update.
    pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
        self.disconnected_from_host_internal(cx);
        cx.emit(Event::DisconnectedFromHost);
        cx.notify();
    }
1252
1253    fn disconnected_from_host_internal(&mut self, cx: &mut AppContext) {
1254        if let Some(ProjectClientState::Remote {
1255            sharing_has_stopped,
1256            ..
1257        }) = &mut self.client_state
1258        {
1259            *sharing_has_stopped = true;
1260
1261            self.collaborators.clear();
1262
1263            for worktree in &self.worktrees {
1264                if let Some(worktree) = worktree.upgrade(cx) {
1265                    worktree.update(cx, |worktree, _| {
1266                        if let Some(worktree) = worktree.as_remote_mut() {
1267                            worktree.disconnected_from_host();
1268                        }
1269                    });
1270                }
1271            }
1272
1273            for open_buffer in self.opened_buffers.values_mut() {
1274                // Wake up any tasks waiting for peers' edits to this buffer.
1275                if let Some(buffer) = open_buffer.upgrade(cx) {
1276                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1277                }
1278
1279                if let OpenBuffer::Strong(buffer) = open_buffer {
1280                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
1281                }
1282            }
1283
1284            // Wake up all futures currently waiting on a buffer to get opened,
1285            // to give them a chance to fail now that we've disconnected.
1286            *self.opened_buffer.0.borrow_mut() = ();
1287        }
1288    }
1289
    /// Emits `Event::Closed`. No project state is changed here.
    pub fn close(&mut self, cx: &mut ModelContext<Self>) {
        cx.emit(Event::Closed);
    }
1293
1294    pub fn is_read_only(&self) -> bool {
1295        match &self.client_state {
1296            Some(ProjectClientState::Remote {
1297                sharing_has_stopped,
1298                ..
1299            }) => *sharing_has_stopped,
1300            _ => false,
1301        }
1302    }
1303
1304    pub fn is_local(&self) -> bool {
1305        match &self.client_state {
1306            Some(ProjectClientState::Remote { .. }) => false,
1307            _ => true,
1308        }
1309    }
1310
1311    pub fn is_remote(&self) -> bool {
1312        !self.is_local()
1313    }
1314
1315    pub fn create_buffer(
1316        &mut self,
1317        text: &str,
1318        language: Option<Arc<Language>>,
1319        cx: &mut ModelContext<Self>,
1320    ) -> Result<ModelHandle<Buffer>> {
1321        if self.is_remote() {
1322            return Err(anyhow!("creating buffers as a guest is not supported yet"));
1323        }
1324
1325        let buffer = cx.add_model(|cx| {
1326            Buffer::new(self.replica_id(), text, cx)
1327                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1328        });
1329        self.register_buffer(&buffer, cx)?;
1330        Ok(buffer)
1331    }
1332
1333    pub fn open_path(
1334        &mut self,
1335        path: impl Into<ProjectPath>,
1336        cx: &mut ModelContext<Self>,
1337    ) -> Task<Result<(ProjectEntryId, AnyModelHandle)>> {
1338        let task = self.open_buffer(path, cx);
1339        cx.spawn_weak(|_, cx| async move {
1340            let buffer = task.await?;
1341            let project_entry_id = buffer
1342                .read_with(&cx, |buffer, cx| {
1343                    File::from_dyn(buffer.file()).and_then(|file| file.project_entry_id(cx))
1344                })
1345                .ok_or_else(|| anyhow!("no project entry"))?;
1346
1347            let buffer: &AnyModelHandle = &buffer;
1348            Ok((project_entry_id, buffer.clone()))
1349        })
1350    }
1351
1352    pub fn open_local_buffer(
1353        &mut self,
1354        abs_path: impl AsRef<Path>,
1355        cx: &mut ModelContext<Self>,
1356    ) -> Task<Result<ModelHandle<Buffer>>> {
1357        if let Some((worktree, relative_path)) = self.find_local_worktree(abs_path.as_ref(), cx) {
1358            self.open_buffer((worktree.read(cx).id(), relative_path), cx)
1359        } else {
1360            Task::ready(Err(anyhow!("no such path")))
1361        }
1362    }
1363
    /// Opens the buffer for `path`, sharing work between concurrent requests.
    ///
    /// Resolution order:
    /// 1. Fails immediately with "no such worktree" if the path's worktree id
    ///    matches no live worktree.
    /// 2. Returns an already-open buffer for the path if there is one.
    /// 3. If a load for the same path is already in flight, waits on that load.
    /// 4. Otherwise starts a local or remote load (depending on the worktree)
    ///    and records it in `loading_buffers_by_path` until it finishes.
    ///
    /// The returned task resolves once the watch channel for the load yields a
    /// result; load errors are re-wrapped via their `Display` output.
    pub fn open_buffer(
        &mut self,
        path: impl Into<ProjectPath>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let project_path = path.into();
        let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
            worktree
        } else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        // If there is already a buffer for the given path, then return it.
        let existing_buffer = self.get_open_buffer(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                // The receiver is stored in the map so later callers can clone it;
                // the sender is moved into the task that publishes the result.
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let load_buffer = if worktree.read(cx).is_local() {
                    self.open_local_buffer_internal(&project_path.path, &worktree, cx)
                } else {
                    self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    // Publish the outcome (success or `Arc`-wrapped error) to all watchers.
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    }));
                })
                .detach();
                rx
            }
        };

        // Wait on the watch channel and convert the shared error back into an
        // owned `anyhow` error built from its display text.
        cx.foreground().spawn(async move {
            pump_loading_buffer_reciever(loading_watch)
                .await
                .map_err(|error| anyhow!("{}", error))
        })
    }
1418
1419    fn open_local_buffer_internal(
1420        &mut self,
1421        path: &Arc<Path>,
1422        worktree: &ModelHandle<Worktree>,
1423        cx: &mut ModelContext<Self>,
1424    ) -> Task<Result<ModelHandle<Buffer>>> {
1425        let buffer_id = post_inc(&mut self.next_buffer_id);
1426        let load_buffer = worktree.update(cx, |worktree, cx| {
1427            let worktree = worktree.as_local_mut().unwrap();
1428            worktree.load_buffer(buffer_id, path, cx)
1429        });
1430        cx.spawn(|this, mut cx| async move {
1431            let buffer = load_buffer.await?;
1432            this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
1433            Ok(buffer)
1434        })
1435    }
1436
1437    fn open_remote_buffer_internal(
1438        &mut self,
1439        path: &Arc<Path>,
1440        worktree: &ModelHandle<Worktree>,
1441        cx: &mut ModelContext<Self>,
1442    ) -> Task<Result<ModelHandle<Buffer>>> {
1443        let rpc = self.client.clone();
1444        let project_id = self.remote_id().unwrap();
1445        let remote_worktree_id = worktree.read(cx).id();
1446        let path = path.clone();
1447        let path_string = path.to_string_lossy().to_string();
1448        cx.spawn(|this, mut cx| async move {
1449            let response = rpc
1450                .request(proto::OpenBufferByPath {
1451                    project_id,
1452                    worktree_id: remote_worktree_id.to_proto(),
1453                    path: path_string,
1454                })
1455                .await?;
1456            this.update(&mut cx, |this, cx| {
1457                this.wait_for_remote_buffer(response.buffer_id, cx)
1458            })
1459            .await
1460        })
1461    }
1462
1463    /// LanguageServerName is owned, because it is inserted into a map
1464    fn open_local_buffer_via_lsp(
1465        &mut self,
1466        abs_path: lsp::Url,
1467        language_server_id: LanguageServerId,
1468        language_server_name: LanguageServerName,
1469        cx: &mut ModelContext<Self>,
1470    ) -> Task<Result<ModelHandle<Buffer>>> {
1471        cx.spawn(|this, mut cx| async move {
1472            let abs_path = abs_path
1473                .to_file_path()
1474                .map_err(|_| anyhow!("can't convert URI to path"))?;
1475            let (worktree, relative_path) = if let Some(result) =
1476                this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
1477            {
1478                result
1479            } else {
1480                let worktree = this
1481                    .update(&mut cx, |this, cx| {
1482                        this.create_local_worktree(&abs_path, false, cx)
1483                    })
1484                    .await?;
1485                this.update(&mut cx, |this, cx| {
1486                    this.language_server_ids.insert(
1487                        (worktree.read(cx).id(), language_server_name),
1488                        language_server_id,
1489                    );
1490                });
1491                (worktree, PathBuf::new())
1492            };
1493
1494            let project_path = ProjectPath {
1495                worktree_id: worktree.read_with(&cx, |worktree, _| worktree.id()),
1496                path: relative_path.into(),
1497            };
1498            this.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
1499                .await
1500        })
1501    }
1502
1503    pub fn open_buffer_by_id(
1504        &mut self,
1505        id: u64,
1506        cx: &mut ModelContext<Self>,
1507    ) -> Task<Result<ModelHandle<Buffer>>> {
1508        if let Some(buffer) = self.buffer_for_id(id, cx) {
1509            Task::ready(Ok(buffer))
1510        } else if self.is_local() {
1511            Task::ready(Err(anyhow!("buffer {} does not exist", id)))
1512        } else if let Some(project_id) = self.remote_id() {
1513            let request = self
1514                .client
1515                .request(proto::OpenBufferById { project_id, id });
1516            cx.spawn(|this, mut cx| async move {
1517                let buffer_id = request.await?.buffer_id;
1518                this.update(&mut cx, |this, cx| {
1519                    this.wait_for_remote_buffer(buffer_id, cx)
1520                })
1521                .await
1522            })
1523        } else {
1524            Task::ready(Err(anyhow!("cannot open buffer while disconnected")))
1525        }
1526    }
1527
1528    pub fn save_buffers(
1529        &self,
1530        buffers: HashSet<ModelHandle<Buffer>>,
1531        cx: &mut ModelContext<Self>,
1532    ) -> Task<Result<()>> {
1533        cx.spawn(|this, mut cx| async move {
1534            let save_tasks = buffers
1535                .into_iter()
1536                .map(|buffer| this.update(&mut cx, |this, cx| this.save_buffer(buffer, cx)));
1537            try_join_all(save_tasks).await?;
1538            Ok(())
1539        })
1540    }
1541
1542    pub fn save_buffer(
1543        &self,
1544        buffer: ModelHandle<Buffer>,
1545        cx: &mut ModelContext<Self>,
1546    ) -> Task<Result<(clock::Global, RopeFingerprint, SystemTime)>> {
1547        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1548            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1549        };
1550        let worktree = file.worktree.clone();
1551        let path = file.path.clone();
1552        worktree.update(cx, |worktree, cx| match worktree {
1553            Worktree::Local(worktree) => worktree.save_buffer(buffer, path, false, cx),
1554            Worktree::Remote(worktree) => worktree.save_buffer(buffer, cx),
1555        })
1556    }
1557
1558    pub fn save_buffer_as(
1559        &mut self,
1560        buffer: ModelHandle<Buffer>,
1561        abs_path: PathBuf,
1562        cx: &mut ModelContext<Self>,
1563    ) -> Task<Result<()>> {
1564        let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
1565        let old_file = File::from_dyn(buffer.read(cx).file())
1566            .filter(|f| f.is_local())
1567            .cloned();
1568        cx.spawn(|this, mut cx| async move {
1569            if let Some(old_file) = &old_file {
1570                this.update(&mut cx, |this, cx| {
1571                    this.unregister_buffer_from_language_servers(&buffer, old_file, cx);
1572                });
1573            }
1574            let (worktree, path) = worktree_task.await?;
1575            worktree
1576                .update(&mut cx, |worktree, cx| match worktree {
1577                    Worktree::Local(worktree) => {
1578                        worktree.save_buffer(buffer.clone(), path.into(), true, cx)
1579                    }
1580                    Worktree::Remote(_) => panic!("cannot remote buffers as new files"),
1581                })
1582                .await?;
1583            this.update(&mut cx, |this, cx| {
1584                this.detect_language_for_buffer(&buffer, cx);
1585                this.register_buffer_with_language_servers(&buffer, cx);
1586            });
1587            Ok(())
1588        })
1589    }
1590
1591    pub fn get_open_buffer(
1592        &mut self,
1593        path: &ProjectPath,
1594        cx: &mut ModelContext<Self>,
1595    ) -> Option<ModelHandle<Buffer>> {
1596        let worktree = self.worktree_for_id(path.worktree_id, cx)?;
1597        self.opened_buffers.values().find_map(|buffer| {
1598            let buffer = buffer.upgrade(cx)?;
1599            let file = File::from_dyn(buffer.read(cx).file())?;
1600            if file.worktree == worktree && file.path() == &path.path {
1601                Some(buffer)
1602            } else {
1603                None
1604            }
1605        })
1606    }
1607
1608    fn register_buffer(
1609        &mut self,
1610        buffer: &ModelHandle<Buffer>,
1611        cx: &mut ModelContext<Self>,
1612    ) -> Result<()> {
1613        self.request_buffer_diff_recalculation(buffer, cx);
1614        buffer.update(cx, |buffer, _| {
1615            buffer.set_language_registry(self.languages.clone())
1616        });
1617
1618        let remote_id = buffer.read(cx).remote_id();
1619        let is_remote = self.is_remote();
1620        let open_buffer = if is_remote || self.is_shared() {
1621            OpenBuffer::Strong(buffer.clone())
1622        } else {
1623            OpenBuffer::Weak(buffer.downgrade())
1624        };
1625
1626        match self.opened_buffers.entry(remote_id) {
1627            hash_map::Entry::Vacant(entry) => {
1628                entry.insert(open_buffer);
1629            }
1630            hash_map::Entry::Occupied(mut entry) => {
1631                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1632                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
1633                } else if entry.get().upgrade(cx).is_some() {
1634                    if is_remote {
1635                        return Ok(());
1636                    } else {
1637                        debug_panic!("buffer {} was already registered", remote_id);
1638                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1639                    }
1640                }
1641                entry.insert(open_buffer);
1642            }
1643        }
1644        cx.subscribe(buffer, |this, buffer, event, cx| {
1645            this.on_buffer_event(buffer, event, cx);
1646        })
1647        .detach();
1648
1649        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1650            if file.is_local {
1651                self.local_buffer_ids_by_path.insert(
1652                    ProjectPath {
1653                        worktree_id: file.worktree_id(cx),
1654                        path: file.path.clone(),
1655                    },
1656                    remote_id,
1657                );
1658
1659                self.local_buffer_ids_by_entry_id
1660                    .insert(file.entry_id, remote_id);
1661            }
1662        }
1663
1664        self.detect_language_for_buffer(buffer, cx);
1665        self.register_buffer_with_language_servers(buffer, cx);
1666        self.register_buffer_with_copilot(buffer, cx);
1667        cx.observe_release(buffer, |this, buffer, cx| {
1668            if let Some(file) = File::from_dyn(buffer.file()) {
1669                if file.is_local() {
1670                    let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1671                    for server in this.language_servers_for_buffer(buffer, cx) {
1672                        server
1673                            .1
1674                            .notify::<lsp::notification::DidCloseTextDocument>(
1675                                lsp::DidCloseTextDocumentParams {
1676                                    text_document: lsp::TextDocumentIdentifier::new(uri.clone()),
1677                                },
1678                            )
1679                            .log_err();
1680                    }
1681                }
1682            }
1683        })
1684        .detach();
1685
1686        *self.opened_buffer.0.borrow_mut() = ();
1687        Ok(())
1688    }
1689
1690    fn register_buffer_with_language_servers(
1691        &mut self,
1692        buffer_handle: &ModelHandle<Buffer>,
1693        cx: &mut ModelContext<Self>,
1694    ) {
1695        let buffer = buffer_handle.read(cx);
1696        let buffer_id = buffer.remote_id();
1697
1698        if let Some(file) = File::from_dyn(buffer.file()) {
1699            if !file.is_local() {
1700                return;
1701            }
1702
1703            let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
1704            let initial_snapshot = buffer.text_snapshot();
1705            let language = buffer.language().cloned();
1706            let worktree_id = file.worktree_id(cx);
1707
1708            if let Some(local_worktree) = file.worktree.read(cx).as_local() {
1709                for (server_id, diagnostics) in local_worktree.diagnostics_for_path(file.path()) {
1710                    self.update_buffer_diagnostics(buffer_handle, server_id, None, diagnostics, cx)
1711                        .log_err();
1712                }
1713            }
1714
1715            if let Some(language) = language {
1716                for adapter in language.lsp_adapters() {
1717                    let language_id = adapter.language_ids.get(language.name().as_ref()).cloned();
1718                    let server = self
1719                        .language_server_ids
1720                        .get(&(worktree_id, adapter.name.clone()))
1721                        .and_then(|id| self.language_servers.get(id))
1722                        .and_then(|server_state| {
1723                            if let LanguageServerState::Running { server, .. } = server_state {
1724                                Some(server.clone())
1725                            } else {
1726                                None
1727                            }
1728                        });
1729                    let server = match server {
1730                        Some(server) => server,
1731                        None => continue,
1732                    };
1733
1734                    server
1735                        .notify::<lsp::notification::DidOpenTextDocument>(
1736                            lsp::DidOpenTextDocumentParams {
1737                                text_document: lsp::TextDocumentItem::new(
1738                                    uri.clone(),
1739                                    language_id.unwrap_or_default(),
1740                                    0,
1741                                    initial_snapshot.text(),
1742                                ),
1743                            },
1744                        )
1745                        .log_err();
1746
1747                    buffer_handle.update(cx, |buffer, cx| {
1748                        buffer.set_completion_triggers(
1749                            server
1750                                .capabilities()
1751                                .completion_provider
1752                                .as_ref()
1753                                .and_then(|provider| provider.trigger_characters.clone())
1754                                .unwrap_or_default(),
1755                            cx,
1756                        );
1757                    });
1758
1759                    let snapshot = LspBufferSnapshot {
1760                        version: 0,
1761                        snapshot: initial_snapshot.clone(),
1762                    };
1763                    self.buffer_snapshots
1764                        .entry(buffer_id)
1765                        .or_default()
1766                        .insert(server.server_id(), vec![snapshot]);
1767                }
1768            }
1769        }
1770    }
1771
1772    fn unregister_buffer_from_language_servers(
1773        &mut self,
1774        buffer: &ModelHandle<Buffer>,
1775        old_file: &File,
1776        cx: &mut ModelContext<Self>,
1777    ) {
1778        let old_path = match old_file.as_local() {
1779            Some(local) => local.abs_path(cx),
1780            None => return,
1781        };
1782
1783        buffer.update(cx, |buffer, cx| {
1784            let worktree_id = old_file.worktree_id(cx);
1785            let ids = &self.language_server_ids;
1786
1787            let language = buffer.language().cloned();
1788            let adapters = language.iter().flat_map(|language| language.lsp_adapters());
1789            for &server_id in adapters.flat_map(|a| ids.get(&(worktree_id, a.name.clone()))) {
1790                buffer.update_diagnostics(server_id, Default::default(), cx);
1791            }
1792
1793            self.buffer_snapshots.remove(&buffer.remote_id());
1794            let file_url = lsp::Url::from_file_path(old_path).unwrap();
1795            for (_, language_server) in self.language_servers_for_buffer(buffer, cx) {
1796                language_server
1797                    .notify::<lsp::notification::DidCloseTextDocument>(
1798                        lsp::DidCloseTextDocumentParams {
1799                            text_document: lsp::TextDocumentIdentifier::new(file_url.clone()),
1800                        },
1801                    )
1802                    .log_err();
1803            }
1804        });
1805    }
1806
1807    fn register_buffer_with_copilot(
1808        &self,
1809        buffer_handle: &ModelHandle<Buffer>,
1810        cx: &mut ModelContext<Self>,
1811    ) {
1812        if let Some(copilot) = Copilot::global(cx) {
1813            copilot.update(cx, |copilot, cx| copilot.register_buffer(buffer_handle, cx));
1814        }
1815    }
1816
1817    async fn send_buffer_ordered_messages(
1818        this: WeakModelHandle<Self>,
1819        rx: UnboundedReceiver<BufferOrderedMessage>,
1820        mut cx: AsyncAppContext,
1821    ) -> Option<()> {
1822        const MAX_BATCH_SIZE: usize = 128;
1823
1824        let mut operations_by_buffer_id = HashMap::default();
1825        async fn flush_operations(
1826            this: &ModelHandle<Project>,
1827            operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
1828            needs_resync_with_host: &mut bool,
1829            is_local: bool,
1830            cx: &AsyncAppContext,
1831        ) {
1832            for (buffer_id, operations) in operations_by_buffer_id.drain() {
1833                let request = this.read_with(cx, |this, _| {
1834                    let project_id = this.remote_id()?;
1835                    Some(this.client.request(proto::UpdateBuffer {
1836                        buffer_id,
1837                        project_id,
1838                        operations,
1839                    }))
1840                });
1841                if let Some(request) = request {
1842                    if request.await.is_err() && !is_local {
1843                        *needs_resync_with_host = true;
1844                        break;
1845                    }
1846                }
1847            }
1848        }
1849
1850        let mut needs_resync_with_host = false;
1851        let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
1852
1853        while let Some(changes) = changes.next().await {
1854            let this = this.upgrade(&mut cx)?;
1855            let is_local = this.read_with(&cx, |this, _| this.is_local());
1856
1857            for change in changes {
1858                match change {
1859                    BufferOrderedMessage::Operation {
1860                        buffer_id,
1861                        operation,
1862                    } => {
1863                        if needs_resync_with_host {
1864                            continue;
1865                        }
1866
1867                        operations_by_buffer_id
1868                            .entry(buffer_id)
1869                            .or_insert(Vec::new())
1870                            .push(operation);
1871                    }
1872
1873                    BufferOrderedMessage::Resync => {
1874                        operations_by_buffer_id.clear();
1875                        if this
1876                            .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
1877                            .await
1878                            .is_ok()
1879                        {
1880                            needs_resync_with_host = false;
1881                        }
1882                    }
1883
1884                    BufferOrderedMessage::LanguageServerUpdate {
1885                        language_server_id,
1886                        message,
1887                    } => {
1888                        flush_operations(
1889                            &this,
1890                            &mut operations_by_buffer_id,
1891                            &mut needs_resync_with_host,
1892                            is_local,
1893                            &cx,
1894                        )
1895                        .await;
1896
1897                        this.read_with(&cx, |this, _| {
1898                            if let Some(project_id) = this.remote_id() {
1899                                this.client
1900                                    .send(proto::UpdateLanguageServer {
1901                                        project_id,
1902                                        language_server_id: language_server_id.0 as u64,
1903                                        variant: Some(message),
1904                                    })
1905                                    .log_err();
1906                            }
1907                        });
1908                    }
1909                }
1910            }
1911
1912            flush_operations(
1913                &this,
1914                &mut operations_by_buffer_id,
1915                &mut needs_resync_with_host,
1916                is_local,
1917                &cx,
1918            )
1919            .await;
1920        }
1921
1922        None
1923    }
1924
1925    fn on_buffer_event(
1926        &mut self,
1927        buffer: ModelHandle<Buffer>,
1928        event: &BufferEvent,
1929        cx: &mut ModelContext<Self>,
1930    ) -> Option<()> {
1931        if matches!(event, BufferEvent::Edited { .. } | BufferEvent::Reloaded) {
1932            self.request_buffer_diff_recalculation(&buffer, cx);
1933        }
1934
1935        match event {
1936            BufferEvent::Operation(operation) => {
1937                self.buffer_ordered_messages_tx
1938                    .unbounded_send(BufferOrderedMessage::Operation {
1939                        buffer_id: buffer.read(cx).remote_id(),
1940                        operation: language::proto::serialize_operation(operation),
1941                    })
1942                    .ok();
1943            }
1944
1945            BufferEvent::Edited { .. } => {
1946                let buffer = buffer.read(cx);
1947                let file = File::from_dyn(buffer.file())?;
1948                let abs_path = file.as_local()?.abs_path(cx);
1949                let uri = lsp::Url::from_file_path(abs_path).unwrap();
1950                let next_snapshot = buffer.text_snapshot();
1951
1952                let language_servers: Vec<_> = self
1953                    .language_servers_for_buffer(buffer, cx)
1954                    .map(|i| i.1.clone())
1955                    .collect();
1956
1957                for language_server in language_servers {
1958                    let language_server = language_server.clone();
1959
1960                    let buffer_snapshots = self
1961                        .buffer_snapshots
1962                        .get_mut(&buffer.remote_id())
1963                        .and_then(|m| m.get_mut(&language_server.server_id()))?;
1964                    let previous_snapshot = buffer_snapshots.last()?;
1965                    let next_version = previous_snapshot.version + 1;
1966
1967                    let content_changes = buffer
1968                        .edits_since::<(PointUtf16, usize)>(previous_snapshot.snapshot.version())
1969                        .map(|edit| {
1970                            let edit_start = edit.new.start.0;
1971                            let edit_end = edit_start + (edit.old.end.0 - edit.old.start.0);
1972                            let new_text = next_snapshot
1973                                .text_for_range(edit.new.start.1..edit.new.end.1)
1974                                .collect();
1975                            lsp::TextDocumentContentChangeEvent {
1976                                range: Some(lsp::Range::new(
1977                                    point_to_lsp(edit_start),
1978                                    point_to_lsp(edit_end),
1979                                )),
1980                                range_length: None,
1981                                text: new_text,
1982                            }
1983                        })
1984                        .collect();
1985
1986                    buffer_snapshots.push(LspBufferSnapshot {
1987                        version: next_version,
1988                        snapshot: next_snapshot.clone(),
1989                    });
1990
1991                    language_server
1992                        .notify::<lsp::notification::DidChangeTextDocument>(
1993                            lsp::DidChangeTextDocumentParams {
1994                                text_document: lsp::VersionedTextDocumentIdentifier::new(
1995                                    uri.clone(),
1996                                    next_version,
1997                                ),
1998                                content_changes,
1999                            },
2000                        )
2001                        .log_err();
2002                }
2003            }
2004
2005            BufferEvent::Saved => {
2006                let file = File::from_dyn(buffer.read(cx).file())?;
2007                let worktree_id = file.worktree_id(cx);
2008                let abs_path = file.as_local()?.abs_path(cx);
2009                let text_document = lsp::TextDocumentIdentifier {
2010                    uri: lsp::Url::from_file_path(abs_path).unwrap(),
2011                };
2012
2013                for (_, _, server) in self.language_servers_for_worktree(worktree_id) {
2014                    server
2015                        .notify::<lsp::notification::DidSaveTextDocument>(
2016                            lsp::DidSaveTextDocumentParams {
2017                                text_document: text_document.clone(),
2018                                text: None,
2019                            },
2020                        )
2021                        .log_err();
2022                }
2023
2024                let language_server_ids = self.language_server_ids_for_buffer(buffer.read(cx), cx);
2025                for language_server_id in language_server_ids {
2026                    if let Some(LanguageServerState::Running {
2027                        adapter,
2028                        simulate_disk_based_diagnostics_completion,
2029                        ..
2030                    }) = self.language_servers.get_mut(&language_server_id)
2031                    {
2032                        // After saving a buffer using a language server that doesn't provide
2033                        // a disk-based progress token, kick off a timer that will reset every
2034                        // time the buffer is saved. If the timer eventually fires, simulate
2035                        // disk-based diagnostics being finished so that other pieces of UI
2036                        // (e.g., project diagnostics view, diagnostic status bar) can update.
2037                        // We don't emit an event right away because the language server might take
2038                        // some time to publish diagnostics.
2039                        if adapter.disk_based_diagnostics_progress_token.is_none() {
2040                            const DISK_BASED_DIAGNOSTICS_DEBOUNCE: Duration =
2041                                Duration::from_secs(1);
2042
2043                            let task = cx.spawn_weak(|this, mut cx| async move {
2044                                cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
2045                                if let Some(this) = this.upgrade(&cx) {
2046                                    this.update(&mut cx, |this, cx| {
2047                                        this.disk_based_diagnostics_finished(
2048                                            language_server_id,
2049                                            cx,
2050                                        );
2051                                        this.buffer_ordered_messages_tx
2052                                            .unbounded_send(
2053                                                BufferOrderedMessage::LanguageServerUpdate {
2054                                                    language_server_id,
2055                                                    message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
2056                                                },
2057                                            )
2058                                            .ok();
2059                                    });
2060                                }
2061                            });
2062                            *simulate_disk_based_diagnostics_completion = Some(task);
2063                        }
2064                    }
2065                }
2066            }
2067
2068            _ => {}
2069        }
2070
2071        None
2072    }
2073
2074    fn request_buffer_diff_recalculation(
2075        &mut self,
2076        buffer: &ModelHandle<Buffer>,
2077        cx: &mut ModelContext<Self>,
2078    ) {
2079        self.buffers_needing_diff.insert(buffer.downgrade());
2080        if self.buffers_needing_diff.len() == 1 {
2081            let this = cx.weak_handle();
2082            cx.defer(move |cx| {
2083                if let Some(this) = this.upgrade(cx) {
2084                    this.update(cx, |this, cx| {
2085                        this.recalculate_buffer_diffs(cx);
2086                    });
2087                }
2088            });
2089        }
2090    }
2091
2092    fn recalculate_buffer_diffs(&mut self, cx: &mut ModelContext<Self>) {
2093        cx.spawn(|this, mut cx| async move {
2094            let buffers: Vec<_> = this.update(&mut cx, |this, _| {
2095                this.buffers_needing_diff.drain().collect()
2096            });
2097
2098            let tasks: Vec<_> = this.update(&mut cx, |_, cx| {
2099                buffers
2100                    .iter()
2101                    .filter_map(|buffer| {
2102                        let buffer = buffer.upgrade(cx)?;
2103                        buffer.update(cx, |buffer, cx| buffer.git_diff_recalc_2(cx))
2104                    })
2105                    .collect()
2106            });
2107
2108            futures::future::join_all(tasks).await;
2109
2110            this.update(&mut cx, |this, cx| {
2111                if !this.buffers_needing_diff.is_empty() {
2112                    this.recalculate_buffer_diffs(cx);
2113                } else {
2114                    // TODO: Would a `ModelContext<Project>.notify()` suffice here?
2115                    for buffer in buffers {
2116                        if let Some(buffer) = buffer.upgrade(cx) {
2117                            buffer.update(cx, |_, cx| cx.notify());
2118                        }
2119                    }
2120                }
2121            });
2122        })
2123        .detach();
2124    }
2125
2126    fn language_servers_for_worktree(
2127        &self,
2128        worktree_id: WorktreeId,
2129    ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<Language>, &Arc<LanguageServer>)> {
2130        self.language_server_ids
2131            .iter()
2132            .filter_map(move |((language_server_worktree_id, _), id)| {
2133                if *language_server_worktree_id == worktree_id {
2134                    if let Some(LanguageServerState::Running {
2135                        adapter,
2136                        language,
2137                        server,
2138                        ..
2139                    }) = self.language_servers.get(id)
2140                    {
2141                        return Some((adapter, language, server));
2142                    }
2143                }
2144                None
2145            })
2146    }
2147
2148    fn maintain_buffer_languages(
2149        languages: &LanguageRegistry,
2150        cx: &mut ModelContext<Project>,
2151    ) -> Task<()> {
2152        let mut subscription = languages.subscribe();
2153        cx.spawn_weak(|project, mut cx| async move {
2154            while let Some(()) = subscription.next().await {
2155                if let Some(project) = project.upgrade(&cx) {
2156                    project.update(&mut cx, |project, cx| {
2157                        let mut plain_text_buffers = Vec::new();
2158                        let mut buffers_with_unknown_injections = Vec::new();
2159                        for buffer in project.opened_buffers.values() {
2160                            if let Some(handle) = buffer.upgrade(cx) {
2161                                let buffer = &handle.read(cx);
2162                                if buffer.language().is_none()
2163                                    || buffer.language() == Some(&*language::PLAIN_TEXT)
2164                                {
2165                                    plain_text_buffers.push(handle);
2166                                } else if buffer.contains_unknown_injections() {
2167                                    buffers_with_unknown_injections.push(handle);
2168                                }
2169                            }
2170                        }
2171
2172                        for buffer in plain_text_buffers {
2173                            project.detect_language_for_buffer(&buffer, cx);
2174                            project.register_buffer_with_language_servers(&buffer, cx);
2175                        }
2176
2177                        for buffer in buffers_with_unknown_injections {
2178                            buffer.update(cx, |buffer, cx| buffer.reparse(cx));
2179                        }
2180                    });
2181                }
2182            }
2183        })
2184    }
2185
2186    fn maintain_workspace_config(
2187        languages: Arc<LanguageRegistry>,
2188        cx: &mut ModelContext<Project>,
2189    ) -> Task<()> {
2190        let (mut settings_changed_tx, mut settings_changed_rx) = watch::channel();
2191        let _ = postage::stream::Stream::try_recv(&mut settings_changed_rx);
2192
2193        let settings_observation = cx.observe_global::<SettingsStore, _>(move |_, _| {
2194            *settings_changed_tx.borrow_mut() = ();
2195        });
2196        cx.spawn_weak(|this, mut cx| async move {
2197            while let Some(_) = settings_changed_rx.next().await {
2198                let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
2199                if let Some(this) = this.upgrade(&cx) {
2200                    this.read_with(&cx, |this, _| {
2201                        for server_state in this.language_servers.values() {
2202                            if let LanguageServerState::Running { server, .. } = server_state {
2203                                server
2204                                    .notify::<lsp::notification::DidChangeConfiguration>(
2205                                        lsp::DidChangeConfigurationParams {
2206                                            settings: workspace_config.clone(),
2207                                        },
2208                                    )
2209                                    .ok();
2210                            }
2211                        }
2212                    })
2213                } else {
2214                    break;
2215                }
2216            }
2217
2218            drop(settings_observation);
2219        })
2220    }
2221
2222    fn detect_language_for_buffer(
2223        &mut self,
2224        buffer_handle: &ModelHandle<Buffer>,
2225        cx: &mut ModelContext<Self>,
2226    ) -> Option<()> {
2227        // If the buffer has a language, set it and start the language server if we haven't already.
2228        let buffer = buffer_handle.read(cx);
2229        let full_path = buffer.file()?.full_path(cx);
2230        let content = buffer.as_rope();
2231        let new_language = self
2232            .languages
2233            .language_for_file(&full_path, Some(content))
2234            .now_or_never()?
2235            .ok()?;
2236        self.set_language_for_buffer(buffer_handle, new_language, cx);
2237        None
2238    }
2239
2240    pub fn set_language_for_buffer(
2241        &mut self,
2242        buffer: &ModelHandle<Buffer>,
2243        new_language: Arc<Language>,
2244        cx: &mut ModelContext<Self>,
2245    ) {
2246        buffer.update(cx, |buffer, cx| {
2247            if buffer.language().map_or(true, |old_language| {
2248                !Arc::ptr_eq(old_language, &new_language)
2249            }) {
2250                buffer.set_language(Some(new_language.clone()), cx);
2251            }
2252        });
2253
2254        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2255            if let Some(worktree) = file.worktree.read(cx).as_local() {
2256                let worktree_id = worktree.id();
2257                let worktree_abs_path = worktree.abs_path().clone();
2258                self.start_language_servers(worktree_id, worktree_abs_path, new_language, cx);
2259            }
2260        }
2261    }
2262
2263    fn start_language_servers(
2264        &mut self,
2265        worktree_id: WorktreeId,
2266        worktree_path: Arc<Path>,
2267        language: Arc<Language>,
2268        cx: &mut ModelContext<Self>,
2269    ) {
2270        if !language_settings(Some(&language.name()), cx).enable_language_server {
2271            return;
2272        }
2273
2274        for adapter in language.lsp_adapters() {
2275            let key = (worktree_id, adapter.name.clone());
2276            if self.language_server_ids.contains_key(&key) {
2277                continue;
2278            }
2279
2280            let pending_server = match self.languages.start_language_server(
2281                language.clone(),
2282                adapter.clone(),
2283                worktree_path.clone(),
2284                self.client.http_client(),
2285                cx,
2286            ) {
2287                Some(pending_server) => pending_server,
2288                None => continue,
2289            };
2290
2291            let lsp = settings::get::<ProjectSettings>(cx)
2292                .lsp
2293                .get(&adapter.name.0);
2294            let override_options = lsp.map(|s| s.initialization_options.clone()).flatten();
2295
2296            let mut initialization_options = adapter.initialization_options.clone();
2297            match (&mut initialization_options, override_options) {
2298                (Some(initialization_options), Some(override_options)) => {
2299                    merge_json_value_into(override_options, initialization_options);
2300                }
2301                (None, override_options) => initialization_options = override_options,
2302                _ => {}
2303            }
2304
2305            let server_id = pending_server.server_id;
2306            let state = self.setup_pending_language_server(
2307                initialization_options,
2308                pending_server,
2309                adapter.clone(),
2310                language.clone(),
2311                key.clone(),
2312                cx,
2313            );
2314            self.language_servers.insert(server_id, state);
2315            self.language_server_ids.insert(key.clone(), server_id);
2316        }
2317    }
2318
    /// Builds the `LanguageServerState::Starting` state for a language server that
    /// is still launching.
    ///
    /// The returned state wraps a task that:
    /// 1. awaits the pending server and sends it `initialization_options`,
    /// 2. registers the notification/request handlers below on the server,
    /// 3. sends the current workspace configuration,
    /// 4. records the server as `Running` on the project and sends
    ///    `DidOpenTextDocument` for every already-open buffer that belongs to the
    ///    worktree in `key.0` and whose language uses the adapter named `key.1`.
    ///
    /// The task resolves to `None` when launching or initializing fails (the error
    /// is logged), when the project has been dropped, or when `key` has since been
    /// mapped to a different server id.
    fn setup_pending_language_server(
        &mut self,
        initialization_options: Option<serde_json::Value>,
        pending_server: PendingLanguageServer,
        adapter: Arc<CachedLspAdapter>,
        language: Arc<Language>,
        key: (WorktreeId, LanguageServerName),
        cx: &mut ModelContext<Project>,
    ) -> LanguageServerState {
        let server_id = pending_server.server_id;
        let languages = self.languages.clone();

        LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move {
            let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await;
            // Each `?` below ends the task with `None`; failures are logged by `log_err`.
            let language_server = pending_server.task.await.log_err()?;
            let language_server = language_server
                .initialize(initialization_options)
                .await
                .log_err()?;
            let this = this.upgrade(&cx)?;

            // Published diagnostics: let the adapter post-process them, then hand
            // them to the project (if it still exists).
            language_server
                .on_notification::<lsp::notification::PublishDiagnostics, _>({
                    let this = this.downgrade();
                    let adapter = adapter.clone();
                    move |mut params, cx| {
                        let this = this;
                        let adapter = adapter.clone();
                        cx.spawn(|mut cx| async move {
                            adapter.process_diagnostics(&mut params).await;
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, cx| {
                                    this.update_diagnostics(
                                        server_id,
                                        params,
                                        &adapter.disk_based_diagnostic_sources,
                                        cx,
                                    )
                                    .log_err();
                                });
                            }
                        })
                        .detach();
                    }
                })
                .detach();

            // Configuration requests: reply with one value per requested item. An
            // item with a section gets that section of the workspace configuration
            // (or JSON null when absent); an item without one gets the whole
            // configuration.
            language_server
                .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
                    let languages = languages.clone();
                    move |params, mut cx| {
                        let languages = languages.clone();
                        async move {
                            let workspace_config =
                                cx.update(|cx| languages.workspace_configuration(cx)).await;
                            Ok(params
                                .items
                                .into_iter()
                                .map(|item| {
                                    if let Some(section) = &item.section {
                                        workspace_config
                                            .get(section)
                                            .cloned()
                                            .unwrap_or(serde_json::Value::Null)
                                    } else {
                                        workspace_config.clone()
                                    }
                                })
                                .collect())
                        }
                    }
                })
                .detach();

            // Even though we don't have handling for these requests, respond to them to
            // avoid stalling any language server like `gopls` which waits for a response
            // to these requests when initializing.
            //
            // String progress tokens are remembered in the server's status entry (when
            // one exists); numeric tokens are ignored.
            language_server
                .on_request::<lsp::request::WorkDoneProgressCreate, _, _>({
                    let this = this.downgrade();
                    move |params, mut cx| async move {
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |this, _| {
                                if let Some(status) =
                                    this.language_server_statuses.get_mut(&server_id)
                                {
                                    if let lsp::NumberOrString::String(token) = params.token {
                                        status.progress_tokens.insert(token);
                                    }
                                }
                            });
                        }
                        Ok(())
                    }
                })
                .detach();
            // Capability registration: only `workspace/didChangeWatchedFiles`
            // registrations that carry options are acted upon; everything else is
            // acknowledged without effect.
            language_server
                .on_request::<lsp::request::RegisterCapability, _, _>({
                    let this = this.downgrade();
                    move |params, mut cx| async move {
                        let this = this
                            .upgrade(&cx)
                            .ok_or_else(|| anyhow!("project dropped"))?;
                        for reg in params.registrations {
                            if reg.method == "workspace/didChangeWatchedFiles" {
                                if let Some(options) = reg.register_options {
                                    let options = serde_json::from_value(options)?;
                                    this.update(&mut cx, |this, cx| {
                                        this.on_lsp_did_change_watched_files(
                                            server_id, options, cx,
                                        );
                                    });
                                }
                            }
                        }
                        Ok(())
                    }
                })
                .detach();

            // Workspace edits requested by the server are delegated to
            // `on_lsp_workspace_edit`.
            language_server
                .on_request::<lsp::request::ApplyWorkspaceEdit, _, _>({
                    let this = this.downgrade();
                    let adapter = adapter.clone();
                    let language_server = language_server.clone();
                    move |params, cx| {
                        Self::on_lsp_workspace_edit(
                            this,
                            params,
                            server_id,
                            adapter.clone(),
                            language_server.clone(),
                            cx,
                        )
                    }
                })
                .detach();

            let disk_based_diagnostics_progress_token =
                adapter.disk_based_diagnostics_progress_token.clone();

            // Progress notifications are forwarded to `on_lsp_progress` together with
            // the adapter's disk-based-diagnostics progress token.
            language_server
                .on_notification::<lsp::notification::Progress, _>({
                    let this = this.downgrade();
                    move |params, mut cx| {
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |this, cx| {
                                this.on_lsp_progress(
                                    params,
                                    server_id,
                                    disk_based_diagnostics_progress_token.clone(),
                                    cx,
                                );
                            });
                        }
                    }
                })
                .detach();

            // Send the workspace configuration fetched at the start of the task; a
            // failure to send is ignored.
            language_server
                .notify::<lsp::notification::DidChangeConfiguration>(
                    lsp::DidChangeConfigurationParams {
                        settings: workspace_config,
                    },
                )
                .ok();

            this.update(&mut cx, |this, cx| {
                // If the language server for this key doesn't match the server id, don't store the
                // server. Which will cause it to be dropped, killing the process
                if this
                    .language_server_ids
                    .get(&key)
                    .map(|id| id != &server_id)
                    .unwrap_or(false)
                {
                    return None;
                }

                // Update language_servers collection with Running variant of LanguageServerState
                // indicating that the server is up and running and ready
                this.language_servers.insert(
                    server_id,
                    LanguageServerState::Running {
                        adapter: adapter.clone(),
                        language: language.clone(),
                        watched_paths: Default::default(),
                        server: language_server.clone(),
                        simulate_disk_based_diagnostics_completion: None,
                    },
                );
                this.language_server_statuses.insert(
                    server_id,
                    LanguageServerStatus {
                        name: language_server.name().to_string(),
                        pending_work: Default::default(),
                        has_pending_diagnostic_updates: false,
                        progress_tokens: Default::default(),
                    },
                );

                // When the project has a remote id, announce the new server through the client.
                if let Some(project_id) = this.remote_id() {
                    this.client
                        .send(proto::StartLanguageServer {
                            project_id,
                            server: Some(proto::LanguageServer {
                                id: server_id.0 as u64,
                                name: language_server.name().to_string(),
                            }),
                        })
                        .log_err();
                }

                // Tell the language server about every open buffer in the worktree that matches the language.
                for buffer in this.opened_buffers.values() {
                    if let Some(buffer_handle) = buffer.upgrade(cx) {
                        let buffer = buffer_handle.read(cx);
                        let file = match File::from_dyn(buffer.file()) {
                            Some(file) => file,
                            None => continue,
                        };
                        let language = match buffer.language() {
                            Some(language) => language,
                            None => continue,
                        };

                        // Skip buffers from other worktrees and buffers whose language
                        // does not use the adapter this server was started for.
                        if file.worktree.read(cx).id() != key.0
                            || !language.lsp_adapters().iter().any(|a| a.name == key.1)
                        {
                            continue;
                        }

                        // NOTE(review): this `?` (and the one after `notify` below) returns
                        // `None` from the whole closure rather than skipping just this
                        // buffer — confirm that is intended.
                        let file = file.as_local()?;
                        // Reuse the snapshot history already recorded for this buffer and
                        // server, or seed it with the current text at version 0.
                        let versions = this
                            .buffer_snapshots
                            .entry(buffer.remote_id())
                            .or_default()
                            .entry(server_id)
                            .or_insert_with(|| {
                                vec![LspBufferSnapshot {
                                    version: 0,
                                    snapshot: buffer.text_snapshot(),
                                }]
                            });

                        let snapshot = versions.last().unwrap();
                        let version = snapshot.version;
                        let initial_snapshot = &snapshot.snapshot;
                        let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap();
                        language_server
                            .notify::<lsp::notification::DidOpenTextDocument>(
                                lsp::DidOpenTextDocumentParams {
                                    text_document: lsp::TextDocumentItem::new(
                                        uri,
                                        adapter
                                            .language_ids
                                            .get(language.name().as_ref())
                                            .cloned()
                                            .unwrap_or_default(),
                                        version,
                                        initial_snapshot.text(),
                                    ),
                                },
                            )
                            .log_err()?;
                        // Use the server's advertised completion trigger characters for
                        // this buffer (empty when the server advertises none).
                        buffer_handle.update(cx, |buffer, cx| {
                            buffer.set_completion_triggers(
                                language_server
                                    .capabilities()
                                    .completion_provider
                                    .as_ref()
                                    .and_then(|provider| provider.trigger_characters.clone())
                                    .unwrap_or_default(),
                                cx,
                            )
                        });
                    }
                }

                cx.notify();
                Some(language_server)
            })
        }))
    }
2603
2604    // Returns a list of all of the worktrees which no longer have a language server and the root path
2605    // for the stopped server
2606    fn stop_language_server(
2607        &mut self,
2608        worktree_id: WorktreeId,
2609        adapter_name: LanguageServerName,
2610        cx: &mut ModelContext<Self>,
2611    ) -> Task<(Option<PathBuf>, Vec<WorktreeId>)> {
2612        let key = (worktree_id, adapter_name);
2613        if let Some(server_id) = self.language_server_ids.remove(&key) {
2614            // Remove other entries for this language server as well
2615            let mut orphaned_worktrees = vec![worktree_id];
2616            let other_keys = self.language_server_ids.keys().cloned().collect::<Vec<_>>();
2617            for other_key in other_keys {
2618                if self.language_server_ids.get(&other_key) == Some(&server_id) {
2619                    self.language_server_ids.remove(&other_key);
2620                    orphaned_worktrees.push(other_key.0);
2621                }
2622            }
2623
2624            for buffer in self.opened_buffers.values() {
2625                if let Some(buffer) = buffer.upgrade(cx) {
2626                    buffer.update(cx, |buffer, cx| {
2627                        buffer.update_diagnostics(server_id, Default::default(), cx);
2628                    });
2629                }
2630            }
2631            for worktree in &self.worktrees {
2632                if let Some(worktree) = worktree.upgrade(cx) {
2633                    worktree.update(cx, |worktree, cx| {
2634                        if let Some(worktree) = worktree.as_local_mut() {
2635                            worktree.clear_diagnostics_for_language_server(server_id, cx);
2636                        }
2637                    });
2638                }
2639            }
2640
2641            self.language_server_statuses.remove(&server_id);
2642            cx.notify();
2643
2644            let server_state = self.language_servers.remove(&server_id);
2645            cx.spawn_weak(|this, mut cx| async move {
2646                let mut root_path = None;
2647
2648                let server = match server_state {
2649                    Some(LanguageServerState::Starting(started_language_server)) => {
2650                        started_language_server.await
2651                    }
2652                    Some(LanguageServerState::Running { server, .. }) => Some(server),
2653                    None => None,
2654                };
2655
2656                if let Some(server) = server {
2657                    root_path = Some(server.root_path().clone());
2658                    if let Some(shutdown) = server.shutdown() {
2659                        shutdown.await;
2660                    }
2661                }
2662
2663                if let Some(this) = this.upgrade(&cx) {
2664                    this.update(&mut cx, |this, cx| {
2665                        this.language_server_statuses.remove(&server_id);
2666                        cx.notify();
2667                    });
2668                }
2669
2670                (root_path, orphaned_worktrees)
2671            })
2672        } else {
2673            Task::ready((None, Vec::new()))
2674        }
2675    }
2676
2677    pub fn restart_language_servers_for_buffers(
2678        &mut self,
2679        buffers: impl IntoIterator<Item = ModelHandle<Buffer>>,
2680        cx: &mut ModelContext<Self>,
2681    ) -> Option<()> {
2682        let language_server_lookup_info: HashSet<(WorktreeId, Arc<Path>, Arc<Language>)> = buffers
2683            .into_iter()
2684            .filter_map(|buffer| {
2685                let buffer = buffer.read(cx);
2686                let file = File::from_dyn(buffer.file())?;
2687                let worktree = file.worktree.read(cx).as_local()?;
2688                let full_path = file.full_path(cx);
2689                let language = self
2690                    .languages
2691                    .language_for_file(&full_path, Some(buffer.as_rope()))
2692                    .now_or_never()?
2693                    .ok()?;
2694                Some((worktree.id(), worktree.abs_path().clone(), language))
2695            })
2696            .collect();
2697        for (worktree_id, worktree_abs_path, language) in language_server_lookup_info {
2698            self.restart_language_servers(worktree_id, worktree_abs_path, language, cx);
2699        }
2700
2701        None
2702    }
2703
2704    // TODO This will break in the case where the adapter's root paths and worktrees are not equal
2705    fn restart_language_servers(
2706        &mut self,
2707        worktree_id: WorktreeId,
2708        fallback_path: Arc<Path>,
2709        language: Arc<Language>,
2710        cx: &mut ModelContext<Self>,
2711    ) {
2712        let mut stops = Vec::new();
2713        for adapter in language.lsp_adapters() {
2714            stops.push(self.stop_language_server(worktree_id, adapter.name.clone(), cx));
2715        }
2716
2717        if stops.is_empty() {
2718            return;
2719        }
2720        let mut stops = stops.into_iter();
2721
2722        cx.spawn_weak(|this, mut cx| async move {
2723            let (original_root_path, mut orphaned_worktrees) = stops.next().unwrap().await;
2724            for stop in stops {
2725                let (_, worktrees) = stop.await;
2726                orphaned_worktrees.extend_from_slice(&worktrees);
2727            }
2728
2729            let this = match this.upgrade(&cx) {
2730                Some(this) => this,
2731                None => return,
2732            };
2733
2734            this.update(&mut cx, |this, cx| {
2735                // Attempt to restart using original server path. Fallback to passed in
2736                // path if we could not retrieve the root path
2737                let root_path = original_root_path
2738                    .map(|path_buf| Arc::from(path_buf.as_path()))
2739                    .unwrap_or(fallback_path);
2740
2741                this.start_language_servers(worktree_id, root_path, language.clone(), cx);
2742
2743                // Lookup new server ids and set them for each of the orphaned worktrees
2744                for adapter in language.lsp_adapters() {
2745                    if let Some(new_server_id) = this
2746                        .language_server_ids
2747                        .get(&(worktree_id, adapter.name.clone()))
2748                        .cloned()
2749                    {
2750                        for &orphaned_worktree in &orphaned_worktrees {
2751                            this.language_server_ids
2752                                .insert((orphaned_worktree, adapter.name.clone()), new_server_id);
2753                        }
2754                    }
2755                }
2756            });
2757        })
2758        .detach();
2759    }
2760
2761    fn on_lsp_progress(
2762        &mut self,
2763        progress: lsp::ProgressParams,
2764        language_server_id: LanguageServerId,
2765        disk_based_diagnostics_progress_token: Option<String>,
2766        cx: &mut ModelContext<Self>,
2767    ) {
2768        let token = match progress.token {
2769            lsp::NumberOrString::String(token) => token,
2770            lsp::NumberOrString::Number(token) => {
2771                log::info!("skipping numeric progress token {}", token);
2772                return;
2773            }
2774        };
2775        let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
2776        let language_server_status =
2777            if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2778                status
2779            } else {
2780                return;
2781            };
2782
2783        if !language_server_status.progress_tokens.contains(&token) {
2784            return;
2785        }
2786
2787        let is_disk_based_diagnostics_progress = disk_based_diagnostics_progress_token
2788            .as_ref()
2789            .map_or(false, |disk_based_token| {
2790                token.starts_with(disk_based_token)
2791            });
2792
2793        match progress {
2794            lsp::WorkDoneProgress::Begin(report) => {
2795                if is_disk_based_diagnostics_progress {
2796                    language_server_status.has_pending_diagnostic_updates = true;
2797                    self.disk_based_diagnostics_started(language_server_id, cx);
2798                    self.buffer_ordered_messages_tx
2799                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2800                            language_server_id,
2801                            message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
2802                        })
2803                        .ok();
2804                } else {
2805                    self.on_lsp_work_start(
2806                        language_server_id,
2807                        token.clone(),
2808                        LanguageServerProgress {
2809                            message: report.message.clone(),
2810                            percentage: report.percentage.map(|p| p as usize),
2811                            last_update_at: Instant::now(),
2812                        },
2813                        cx,
2814                    );
2815                    self.buffer_ordered_messages_tx
2816                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2817                            language_server_id,
2818                            message: proto::update_language_server::Variant::WorkStart(
2819                                proto::LspWorkStart {
2820                                    token,
2821                                    message: report.message,
2822                                    percentage: report.percentage.map(|p| p as u32),
2823                                },
2824                            ),
2825                        })
2826                        .ok();
2827                }
2828            }
2829            lsp::WorkDoneProgress::Report(report) => {
2830                if !is_disk_based_diagnostics_progress {
2831                    self.on_lsp_work_progress(
2832                        language_server_id,
2833                        token.clone(),
2834                        LanguageServerProgress {
2835                            message: report.message.clone(),
2836                            percentage: report.percentage.map(|p| p as usize),
2837                            last_update_at: Instant::now(),
2838                        },
2839                        cx,
2840                    );
2841                    self.buffer_ordered_messages_tx
2842                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2843                            language_server_id,
2844                            message: proto::update_language_server::Variant::WorkProgress(
2845                                proto::LspWorkProgress {
2846                                    token,
2847                                    message: report.message,
2848                                    percentage: report.percentage.map(|p| p as u32),
2849                                },
2850                            ),
2851                        })
2852                        .ok();
2853                }
2854            }
2855            lsp::WorkDoneProgress::End(_) => {
2856                language_server_status.progress_tokens.remove(&token);
2857
2858                if is_disk_based_diagnostics_progress {
2859                    language_server_status.has_pending_diagnostic_updates = false;
2860                    self.disk_based_diagnostics_finished(language_server_id, cx);
2861                    self.buffer_ordered_messages_tx
2862                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2863                            language_server_id,
2864                            message:
2865                                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
2866                                    Default::default(),
2867                                ),
2868                        })
2869                        .ok();
2870                } else {
2871                    self.on_lsp_work_end(language_server_id, token.clone(), cx);
2872                    self.buffer_ordered_messages_tx
2873                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
2874                            language_server_id,
2875                            message: proto::update_language_server::Variant::WorkEnd(
2876                                proto::LspWorkEnd { token },
2877                            ),
2878                        })
2879                        .ok();
2880                }
2881            }
2882        }
2883    }
2884
2885    fn on_lsp_work_start(
2886        &mut self,
2887        language_server_id: LanguageServerId,
2888        token: String,
2889        progress: LanguageServerProgress,
2890        cx: &mut ModelContext<Self>,
2891    ) {
2892        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2893            status.pending_work.insert(token, progress);
2894            cx.notify();
2895        }
2896    }
2897
2898    fn on_lsp_work_progress(
2899        &mut self,
2900        language_server_id: LanguageServerId,
2901        token: String,
2902        progress: LanguageServerProgress,
2903        cx: &mut ModelContext<Self>,
2904    ) {
2905        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2906            let entry = status
2907                .pending_work
2908                .entry(token)
2909                .or_insert(LanguageServerProgress {
2910                    message: Default::default(),
2911                    percentage: Default::default(),
2912                    last_update_at: progress.last_update_at,
2913                });
2914            if progress.message.is_some() {
2915                entry.message = progress.message;
2916            }
2917            if progress.percentage.is_some() {
2918                entry.percentage = progress.percentage;
2919            }
2920            entry.last_update_at = progress.last_update_at;
2921            cx.notify();
2922        }
2923    }
2924
2925    fn on_lsp_work_end(
2926        &mut self,
2927        language_server_id: LanguageServerId,
2928        token: String,
2929        cx: &mut ModelContext<Self>,
2930    ) {
2931        if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
2932            status.pending_work.remove(&token);
2933            cx.notify();
2934        }
2935    }
2936
2937    fn on_lsp_did_change_watched_files(
2938        &mut self,
2939        language_server_id: LanguageServerId,
2940        params: DidChangeWatchedFilesRegistrationOptions,
2941        cx: &mut ModelContext<Self>,
2942    ) {
2943        if let Some(LanguageServerState::Running { watched_paths, .. }) =
2944            self.language_servers.get_mut(&language_server_id)
2945        {
2946            let mut builders = HashMap::default();
2947            for watcher in params.watchers {
2948                for worktree in &self.worktrees {
2949                    if let Some(worktree) = worktree.upgrade(cx) {
2950                        let worktree = worktree.read(cx);
2951                        if let Some(abs_path) = worktree.abs_path().to_str() {
2952                            if let Some(suffix) = watcher
2953                                .glob_pattern
2954                                .strip_prefix(abs_path)
2955                                .and_then(|s| s.strip_prefix(std::path::MAIN_SEPARATOR))
2956                            {
2957                                if let Some(glob) = Glob::new(suffix).log_err() {
2958                                    builders
2959                                        .entry(worktree.id())
2960                                        .or_insert_with(|| GlobSetBuilder::new())
2961                                        .add(glob);
2962                                }
2963                                break;
2964                            }
2965                        }
2966                    }
2967                }
2968            }
2969
2970            watched_paths.clear();
2971            for (worktree_id, builder) in builders {
2972                if let Ok(globset) = builder.build() {
2973                    watched_paths.insert(worktree_id, globset);
2974                }
2975            }
2976
2977            cx.notify();
2978        }
2979    }
2980
2981    async fn on_lsp_workspace_edit(
2982        this: WeakModelHandle<Self>,
2983        params: lsp::ApplyWorkspaceEditParams,
2984        server_id: LanguageServerId,
2985        adapter: Arc<CachedLspAdapter>,
2986        language_server: Arc<LanguageServer>,
2987        mut cx: AsyncAppContext,
2988    ) -> Result<lsp::ApplyWorkspaceEditResponse> {
2989        let this = this
2990            .upgrade(&cx)
2991            .ok_or_else(|| anyhow!("project project closed"))?;
2992        let transaction = Self::deserialize_workspace_edit(
2993            this.clone(),
2994            params.edit,
2995            true,
2996            adapter.clone(),
2997            language_server.clone(),
2998            &mut cx,
2999        )
3000        .await
3001        .log_err();
3002        this.update(&mut cx, |this, _| {
3003            if let Some(transaction) = transaction {
3004                this.last_workspace_edits_by_language_server
3005                    .insert(server_id, transaction);
3006            }
3007        });
3008        Ok(lsp::ApplyWorkspaceEditResponse {
3009            applied: true,
3010            failed_change: None,
3011            failure_reason: None,
3012        })
3013    }
3014
3015    pub fn language_server_statuses(
3016        &self,
3017    ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
3018        self.language_server_statuses.values()
3019    }
3020
3021    pub fn update_diagnostics(
3022        &mut self,
3023        language_server_id: LanguageServerId,
3024        mut params: lsp::PublishDiagnosticsParams,
3025        disk_based_sources: &[String],
3026        cx: &mut ModelContext<Self>,
3027    ) -> Result<()> {
3028        let abs_path = params
3029            .uri
3030            .to_file_path()
3031            .map_err(|_| anyhow!("URI is not a file"))?;
3032        let mut diagnostics = Vec::default();
3033        let mut primary_diagnostic_group_ids = HashMap::default();
3034        let mut sources_by_group_id = HashMap::default();
3035        let mut supporting_diagnostics = HashMap::default();
3036
3037        // Ensure that primary diagnostics are always the most severe
3038        params.diagnostics.sort_by_key(|item| item.severity);
3039
3040        for diagnostic in &params.diagnostics {
3041            let source = diagnostic.source.as_ref();
3042            let code = diagnostic.code.as_ref().map(|code| match code {
3043                lsp::NumberOrString::Number(code) => code.to_string(),
3044                lsp::NumberOrString::String(code) => code.clone(),
3045            });
3046            let range = range_from_lsp(diagnostic.range);
3047            let is_supporting = diagnostic
3048                .related_information
3049                .as_ref()
3050                .map_or(false, |infos| {
3051                    infos.iter().any(|info| {
3052                        primary_diagnostic_group_ids.contains_key(&(
3053                            source,
3054                            code.clone(),
3055                            range_from_lsp(info.location.range),
3056                        ))
3057                    })
3058                });
3059
3060            let is_unnecessary = diagnostic.tags.as_ref().map_or(false, |tags| {
3061                tags.iter().any(|tag| *tag == DiagnosticTag::UNNECESSARY)
3062            });
3063
3064            if is_supporting {
3065                supporting_diagnostics.insert(
3066                    (source, code.clone(), range),
3067                    (diagnostic.severity, is_unnecessary),
3068                );
3069            } else {
3070                let group_id = post_inc(&mut self.next_diagnostic_group_id);
3071                let is_disk_based =
3072                    source.map_or(false, |source| disk_based_sources.contains(source));
3073
3074                sources_by_group_id.insert(group_id, source);
3075                primary_diagnostic_group_ids
3076                    .insert((source, code.clone(), range.clone()), group_id);
3077
3078                diagnostics.push(DiagnosticEntry {
3079                    range,
3080                    diagnostic: Diagnostic {
3081                        source: diagnostic.source.clone(),
3082                        code: code.clone(),
3083                        severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
3084                        message: diagnostic.message.clone(),
3085                        group_id,
3086                        is_primary: true,
3087                        is_valid: true,
3088                        is_disk_based,
3089                        is_unnecessary,
3090                    },
3091                });
3092                if let Some(infos) = &diagnostic.related_information {
3093                    for info in infos {
3094                        if info.location.uri == params.uri && !info.message.is_empty() {
3095                            let range = range_from_lsp(info.location.range);
3096                            diagnostics.push(DiagnosticEntry {
3097                                range,
3098                                diagnostic: Diagnostic {
3099                                    source: diagnostic.source.clone(),
3100                                    code: code.clone(),
3101                                    severity: DiagnosticSeverity::INFORMATION,
3102                                    message: info.message.clone(),
3103                                    group_id,
3104                                    is_primary: false,
3105                                    is_valid: true,
3106                                    is_disk_based,
3107                                    is_unnecessary: false,
3108                                },
3109                            });
3110                        }
3111                    }
3112                }
3113            }
3114        }
3115
3116        for entry in &mut diagnostics {
3117            let diagnostic = &mut entry.diagnostic;
3118            if !diagnostic.is_primary {
3119                let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
3120                if let Some(&(severity, is_unnecessary)) = supporting_diagnostics.get(&(
3121                    source,
3122                    diagnostic.code.clone(),
3123                    entry.range.clone(),
3124                )) {
3125                    if let Some(severity) = severity {
3126                        diagnostic.severity = severity;
3127                    }
3128                    diagnostic.is_unnecessary = is_unnecessary;
3129                }
3130            }
3131        }
3132
3133        self.update_diagnostic_entries(
3134            language_server_id,
3135            abs_path,
3136            params.version,
3137            diagnostics,
3138            cx,
3139        )?;
3140        Ok(())
3141    }
3142
3143    pub fn update_diagnostic_entries(
3144        &mut self,
3145        server_id: LanguageServerId,
3146        abs_path: PathBuf,
3147        version: Option<i32>,
3148        diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3149        cx: &mut ModelContext<Project>,
3150    ) -> Result<(), anyhow::Error> {
3151        let (worktree, relative_path) = self
3152            .find_local_worktree(&abs_path, cx)
3153            .ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
3154
3155        let project_path = ProjectPath {
3156            worktree_id: worktree.read(cx).id(),
3157            path: relative_path.into(),
3158        };
3159
3160        if let Some(buffer) = self.get_open_buffer(&project_path, cx) {
3161            self.update_buffer_diagnostics(&buffer, server_id, version, diagnostics.clone(), cx)?;
3162        }
3163
3164        let updated = worktree.update(cx, |worktree, cx| {
3165            worktree
3166                .as_local_mut()
3167                .ok_or_else(|| anyhow!("not a local worktree"))?
3168                .update_diagnostics(server_id, project_path.path.clone(), diagnostics, cx)
3169        })?;
3170        if updated {
3171            cx.emit(Event::DiagnosticsUpdated {
3172                language_server_id: server_id,
3173                path: project_path,
3174            });
3175        }
3176        Ok(())
3177    }
3178
3179    fn update_buffer_diagnostics(
3180        &mut self,
3181        buffer: &ModelHandle<Buffer>,
3182        server_id: LanguageServerId,
3183        version: Option<i32>,
3184        mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
3185        cx: &mut ModelContext<Self>,
3186    ) -> Result<()> {
3187        fn compare_diagnostics(a: &Diagnostic, b: &Diagnostic) -> Ordering {
3188            Ordering::Equal
3189                .then_with(|| b.is_primary.cmp(&a.is_primary))
3190                .then_with(|| a.is_disk_based.cmp(&b.is_disk_based))
3191                .then_with(|| a.severity.cmp(&b.severity))
3192                .then_with(|| a.message.cmp(&b.message))
3193        }
3194
3195        let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx)?;
3196
3197        diagnostics.sort_unstable_by(|a, b| {
3198            Ordering::Equal
3199                .then_with(|| a.range.start.cmp(&b.range.start))
3200                .then_with(|| b.range.end.cmp(&a.range.end))
3201                .then_with(|| compare_diagnostics(&a.diagnostic, &b.diagnostic))
3202        });
3203
3204        let mut sanitized_diagnostics = Vec::new();
3205        let edits_since_save = Patch::new(
3206            snapshot
3207                .edits_since::<Unclipped<PointUtf16>>(buffer.read(cx).saved_version())
3208                .collect(),
3209        );
3210        for entry in diagnostics {
3211            let start;
3212            let end;
3213            if entry.diagnostic.is_disk_based {
3214                // Some diagnostics are based on files on disk instead of buffers'
3215                // current contents. Adjust these diagnostics' ranges to reflect
3216                // any unsaved edits.
3217                start = edits_since_save.old_to_new(entry.range.start);
3218                end = edits_since_save.old_to_new(entry.range.end);
3219            } else {
3220                start = entry.range.start;
3221                end = entry.range.end;
3222            }
3223
3224            let mut range = snapshot.clip_point_utf16(start, Bias::Left)
3225                ..snapshot.clip_point_utf16(end, Bias::Right);
3226
3227            // Expand empty ranges by one codepoint
3228            if range.start == range.end {
3229                // This will be go to the next boundary when being clipped
3230                range.end.column += 1;
3231                range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Right);
3232                if range.start == range.end && range.end.column > 0 {
3233                    range.start.column -= 1;
3234                    range.end = snapshot.clip_point_utf16(Unclipped(range.end), Bias::Left);
3235                }
3236            }
3237
3238            sanitized_diagnostics.push(DiagnosticEntry {
3239                range,
3240                diagnostic: entry.diagnostic,
3241            });
3242        }
3243        drop(edits_since_save);
3244
3245        let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
3246        buffer.update(cx, |buffer, cx| {
3247            buffer.update_diagnostics(server_id, set, cx)
3248        });
3249        Ok(())
3250    }
3251
3252    pub fn reload_buffers(
3253        &self,
3254        buffers: HashSet<ModelHandle<Buffer>>,
3255        push_to_history: bool,
3256        cx: &mut ModelContext<Self>,
3257    ) -> Task<Result<ProjectTransaction>> {
3258        let mut local_buffers = Vec::new();
3259        let mut remote_buffers = None;
3260        for buffer_handle in buffers {
3261            let buffer = buffer_handle.read(cx);
3262            if buffer.is_dirty() {
3263                if let Some(file) = File::from_dyn(buffer.file()) {
3264                    if file.is_local() {
3265                        local_buffers.push(buffer_handle);
3266                    } else {
3267                        remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
3268                    }
3269                }
3270            }
3271        }
3272
3273        let remote_buffers = self.remote_id().zip(remote_buffers);
3274        let client = self.client.clone();
3275
3276        cx.spawn(|this, mut cx| async move {
3277            let mut project_transaction = ProjectTransaction::default();
3278
3279            if let Some((project_id, remote_buffers)) = remote_buffers {
3280                let response = client
3281                    .request(proto::ReloadBuffers {
3282                        project_id,
3283                        buffer_ids: remote_buffers
3284                            .iter()
3285                            .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
3286                            .collect(),
3287                    })
3288                    .await?
3289                    .transaction
3290                    .ok_or_else(|| anyhow!("missing transaction"))?;
3291                project_transaction = this
3292                    .update(&mut cx, |this, cx| {
3293                        this.deserialize_project_transaction(response, push_to_history, cx)
3294                    })
3295                    .await?;
3296            }
3297
3298            for buffer in local_buffers {
3299                let transaction = buffer
3300                    .update(&mut cx, |buffer, cx| buffer.reload(cx))
3301                    .await?;
3302                buffer.update(&mut cx, |buffer, cx| {
3303                    if let Some(transaction) = transaction {
3304                        if !push_to_history {
3305                            buffer.forget_transaction(transaction.id);
3306                        }
3307                        project_transaction.0.insert(cx.handle(), transaction);
3308                    }
3309                });
3310            }
3311
3312            Ok(project_transaction)
3313        })
3314    }
3315
    /// Formats the given buffers and returns the resulting transactions in a
    /// `ProjectTransaction`.
    ///
    /// In a local project each buffer is processed in turn:
    /// 1. Whitespace is normalized according to the buffer's language settings
    ///    (trailing-whitespace removal and a final newline) inside one transaction.
    /// 2. Language-specific formatting is computed through the buffer's primary language
    ///    server or an external command, chosen from the `formatter` and `format_on_save`
    ///    settings. Nothing is computed when format-on-save is off and `trigger` is
    ///    `FormatTrigger::Save`.
    /// 3. The language-specific result is applied and grouped with the whitespace
    ///    transaction, unless the buffer was edited in the meantime.
    ///
    /// In a non-local project, a `proto::FormatBuffers` request is sent when the project
    /// has a remote id, and the transaction from the response is deserialized.
    ///
    /// When `push_to_history` is false, each resulting transaction is forgotten by its
    /// buffer (local case) or the flag is forwarded to
    /// `deserialize_project_transaction` (remote case).
    ///
    /// The returned task fails when language-server or external-command formatting
    /// fails, or when the remote request fails or carries no transaction.
    pub fn format(
        &self,
        buffers: HashSet<ModelHandle<Buffer>>,
        push_to_history: bool,
        trigger: FormatTrigger,
        cx: &mut ModelContext<Project>,
    ) -> Task<Result<ProjectTransaction>> {
        if self.is_local() {
            // Resolve each buffer's absolute path and primary language server up front;
            // buffers without a `File` are dropped.
            let mut buffers_with_paths_and_servers = buffers
                .into_iter()
                .filter_map(|buffer_handle| {
                    let buffer = buffer_handle.read(cx);
                    let file = File::from_dyn(buffer.file())?;
                    let buffer_abs_path = file.as_local().map(|f| f.abs_path(cx));
                    let server = self
                        .primary_language_servers_for_buffer(buffer, cx)
                        .map(|s| s.1.clone());
                    Some((buffer_handle, buffer_abs_path, server))
                })
                .collect::<Vec<_>>();

            cx.spawn(|this, mut cx| async move {
                // Do not allow multiple concurrent formatting requests for the
                // same buffer.
                this.update(&mut cx, |this, cx| {
                    buffers_with_paths_and_servers.retain(|(buffer, _, _)| {
                        this.buffers_being_formatted
                            .insert(buffer.read(cx).remote_id())
                    });
                });

                // Removes the retained buffers from `buffers_being_formatted` again.
                // NOTE(review): presumably `defer` runs the closure when `_cleanup` is
                // dropped, i.e. on every exit path of this task — confirm against `defer`.
                let _cleanup = defer({
                    let this = this.clone();
                    let mut cx = cx.clone();
                    let buffers = &buffers_with_paths_and_servers;
                    move || {
                        this.update(&mut cx, |this, cx| {
                            for (buffer, _, _) in buffers {
                                this.buffers_being_formatted
                                    .remove(&buffer.read(cx).remote_id());
                            }
                        });
                    }
                });

                let mut project_transaction = ProjectTransaction::default();
                for (buffer, buffer_abs_path, language_server) in &buffers_with_paths_and_servers {
                    let settings = buffer.read_with(&cx, |buffer, cx| {
                        let language_name = buffer.language().map(|language| language.name());
                        language_settings(language_name.as_deref(), cx).clone()
                    });

                    let remove_trailing_whitespace = settings.remove_trailing_whitespace_on_save;
                    let ensure_final_newline = settings.ensure_final_newline_on_save;
                    let format_on_save = settings.format_on_save.clone();
                    let formatter = settings.formatter.clone();
                    let tab_size = settings.tab_size;

                    // First, format buffer's whitespace according to the settings.
                    let trailing_whitespace_diff = if remove_trailing_whitespace {
                        Some(
                            buffer
                                .read_with(&cx, |b, cx| b.remove_trailing_whitespace(cx))
                                .await,
                        )
                    } else {
                        None
                    };
                    // Apply the whitespace changes in a transaction of their own, so that
                    // the language-specific formatting can later be grouped with it.
                    let whitespace_transaction_id = buffer.update(&mut cx, |buffer, cx| {
                        buffer.finalize_last_transaction();
                        buffer.start_transaction();
                        if let Some(diff) = trailing_whitespace_diff {
                            buffer.apply_diff(diff, cx);
                        }
                        if ensure_final_newline {
                            buffer.ensure_final_newline(cx);
                        }
                        buffer.end_transaction(cx)
                    });

                    // Currently, formatting operations are represented differently depending on
                    // whether they come from a language server or an external command.
                    enum FormatOperation {
                        Lsp(Vec<(Range<Anchor>, String)>),
                        External(Diff),
                    }

                    // Apply language-specific formatting using either a language server
                    // or external command.
                    let mut format_operation = None;
                    match (formatter, format_on_save) {
                        // Saving with format-on-save turned off: whitespace handling only.
                        (_, FormatOnSave::Off) if trigger == FormatTrigger::Save => {}

                        (Formatter::LanguageServer, FormatOnSave::On | FormatOnSave::Off)
                        | (_, FormatOnSave::LanguageServer) => {
                            // Requires both a language server and an absolute path;
                            // otherwise no language-specific formatting is computed.
                            if let Some((language_server, buffer_abs_path)) =
                                language_server.as_ref().zip(buffer_abs_path.as_ref())
                            {
                                format_operation = Some(FormatOperation::Lsp(
                                    Self::format_via_lsp(
                                        &this,
                                        &buffer,
                                        buffer_abs_path,
                                        &language_server,
                                        tab_size,
                                        &mut cx,
                                    )
                                    .await
                                    .context("failed to format via language server")?,
                                ));
                            }
                        }

                        (
                            Formatter::External { command, arguments },
                            FormatOnSave::On | FormatOnSave::Off,
                        )
                        | (_, FormatOnSave::External { command, arguments }) => {
                            if let Some(buffer_abs_path) = buffer_abs_path {
                                format_operation = Self::format_via_external_command(
                                    &buffer,
                                    &buffer_abs_path,
                                    &command,
                                    &arguments,
                                    &mut cx,
                                )
                                .await
                                .context(format!(
                                    "failed to format via external command {:?}",
                                    command
                                ))?
                                .map(FormatOperation::External);
                            }
                        }
                    };

                    buffer.update(&mut cx, |b, cx| {
                        // If the buffer had its whitespace formatted and was edited while the language-specific
                        // formatting was being computed, avoid applying the language-specific formatting, because
                        // it can't be grouped with the whitespace formatting in the undo history.
                        if let Some(transaction_id) = whitespace_transaction_id {
                            if b.peek_undo_stack()
                                .map_or(true, |e| e.transaction_id() != transaction_id)
                            {
                                format_operation.take();
                            }
                        }

                        // Apply any language-specific formatting, and group the two formatting operations
                        // in the buffer's undo history.
                        if let Some(operation) = format_operation {
                            match operation {
                                FormatOperation::Lsp(edits) => {
                                    b.edit(edits, None, cx);
                                }
                                FormatOperation::External(diff) => {
                                    b.apply_diff(diff, cx);
                                }
                            }

                            if let Some(transaction_id) = whitespace_transaction_id {
                                b.group_until_transaction(transaction_id);
                            }
                        }

                        // Record the buffer's last transaction in the result, and drop it
                        // from the buffer's history when the caller asked for that.
                        if let Some(transaction) = b.finalize_last_transaction().cloned() {
                            if !push_to_history {
                                b.forget_transaction(transaction.id);
                            }
                            project_transaction.0.insert(buffer.clone(), transaction);
                        }
                    });
                }

                Ok(project_transaction)
            })
        } else {
            // Non-local project: ask the remote side to format, provided the project has
            // a remote id; otherwise an empty transaction is returned.
            let remote_id = self.remote_id();
            let client = self.client.clone();
            cx.spawn(|this, mut cx| async move {
                let mut project_transaction = ProjectTransaction::default();
                if let Some(project_id) = remote_id {
                    let response = client
                        .request(proto::FormatBuffers {
                            project_id,
                            trigger: trigger as i32,
                            buffer_ids: buffers
                                .iter()
                                .map(|buffer| buffer.read_with(&cx, |buffer, _| buffer.remote_id()))
                                .collect(),
                        })
                        .await?
                        .transaction
                        .ok_or_else(|| anyhow!("missing transaction"))?;
                    project_transaction = this
                        .update(&mut cx, |this, cx| {
                            this.deserialize_project_transaction(response, push_to_history, cx)
                        })
                        .await?;
                }
                Ok(project_transaction)
            })
        }
    }
3520
3521    async fn format_via_lsp(
3522        this: &ModelHandle<Self>,
3523        buffer: &ModelHandle<Buffer>,
3524        abs_path: &Path,
3525        language_server: &Arc<LanguageServer>,
3526        tab_size: NonZeroU32,
3527        cx: &mut AsyncAppContext,
3528    ) -> Result<Vec<(Range<Anchor>, String)>> {
3529        let text_document =
3530            lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(abs_path).unwrap());
3531        let capabilities = &language_server.capabilities();
3532        let lsp_edits = if capabilities
3533            .document_formatting_provider
3534            .as_ref()
3535            .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3536        {
3537            language_server
3538                .request::<lsp::request::Formatting>(lsp::DocumentFormattingParams {
3539                    text_document,
3540                    options: lsp_command::lsp_formatting_options(tab_size.get()),
3541                    work_done_progress_params: Default::default(),
3542                })
3543                .await?
3544        } else if capabilities
3545            .document_range_formatting_provider
3546            .as_ref()
3547            .map_or(false, |provider| *provider != lsp::OneOf::Left(false))
3548        {
3549            let buffer_start = lsp::Position::new(0, 0);
3550            let buffer_end =
3551                buffer.read_with(cx, |buffer, _| point_to_lsp(buffer.max_point_utf16()));
3552            language_server
3553                .request::<lsp::request::RangeFormatting>(lsp::DocumentRangeFormattingParams {
3554                    text_document,
3555                    range: lsp::Range::new(buffer_start, buffer_end),
3556                    options: lsp_command::lsp_formatting_options(tab_size.get()),
3557                    work_done_progress_params: Default::default(),
3558                })
3559                .await?
3560        } else {
3561            None
3562        };
3563
3564        if let Some(lsp_edits) = lsp_edits {
3565            this.update(cx, |this, cx| {
3566                this.edits_from_lsp(buffer, lsp_edits, language_server.server_id(), None, cx)
3567            })
3568            .await
3569        } else {
3570            Ok(Default::default())
3571        }
3572    }
3573
3574    async fn format_via_external_command(
3575        buffer: &ModelHandle<Buffer>,
3576        buffer_abs_path: &Path,
3577        command: &str,
3578        arguments: &[String],
3579        cx: &mut AsyncAppContext,
3580    ) -> Result<Option<Diff>> {
3581        let working_dir_path = buffer.read_with(cx, |buffer, cx| {
3582            let file = File::from_dyn(buffer.file())?;
3583            let worktree = file.worktree.read(cx).as_local()?;
3584            let mut worktree_path = worktree.abs_path().to_path_buf();
3585            if worktree.root_entry()?.is_file() {
3586                worktree_path.pop();
3587            }
3588            Some(worktree_path)
3589        });
3590
3591        if let Some(working_dir_path) = working_dir_path {
3592            let mut child =
3593                smol::process::Command::new(command)
3594                    .args(arguments.iter().map(|arg| {
3595                        arg.replace("{buffer_path}", &buffer_abs_path.to_string_lossy())
3596                    }))
3597                    .current_dir(&working_dir_path)
3598                    .stdin(smol::process::Stdio::piped())
3599                    .stdout(smol::process::Stdio::piped())
3600                    .stderr(smol::process::Stdio::piped())
3601                    .spawn()?;
3602            let stdin = child
3603                .stdin
3604                .as_mut()
3605                .ok_or_else(|| anyhow!("failed to acquire stdin"))?;
3606            let text = buffer.read_with(cx, |buffer, _| buffer.as_rope().clone());
3607            for chunk in text.chunks() {
3608                stdin.write_all(chunk.as_bytes()).await?;
3609            }
3610            stdin.flush().await?;
3611
3612            let output = child.output().await?;
3613            if !output.status.success() {
3614                return Err(anyhow!(
3615                    "command failed with exit code {:?}:\nstdout: {}\nstderr: {}",
3616                    output.status.code(),
3617                    String::from_utf8_lossy(&output.stdout),
3618                    String::from_utf8_lossy(&output.stderr),
3619                ));
3620            }
3621
3622            let stdout = String::from_utf8(output.stdout)?;
3623            Ok(Some(
3624                buffer
3625                    .read_with(cx, |buffer, cx| buffer.diff(stdout, cx))
3626                    .await,
3627            ))
3628        } else {
3629            Ok(None)
3630        }
3631    }
3632
3633    pub fn definition<T: ToPointUtf16>(
3634        &self,
3635        buffer: &ModelHandle<Buffer>,
3636        position: T,
3637        cx: &mut ModelContext<Self>,
3638    ) -> Task<Result<Vec<LocationLink>>> {
3639        let position = position.to_point_utf16(buffer.read(cx));
3640        self.request_lsp(buffer.clone(), GetDefinition { position }, cx)
3641    }
3642
3643    pub fn type_definition<T: ToPointUtf16>(
3644        &self,
3645        buffer: &ModelHandle<Buffer>,
3646        position: T,
3647        cx: &mut ModelContext<Self>,
3648    ) -> Task<Result<Vec<LocationLink>>> {
3649        let position = position.to_point_utf16(buffer.read(cx));
3650        self.request_lsp(buffer.clone(), GetTypeDefinition { position }, cx)
3651    }
3652
3653    pub fn references<T: ToPointUtf16>(
3654        &self,
3655        buffer: &ModelHandle<Buffer>,
3656        position: T,
3657        cx: &mut ModelContext<Self>,
3658    ) -> Task<Result<Vec<Location>>> {
3659        let position = position.to_point_utf16(buffer.read(cx));
3660        self.request_lsp(buffer.clone(), GetReferences { position }, cx)
3661    }
3662
3663    pub fn document_highlights<T: ToPointUtf16>(
3664        &self,
3665        buffer: &ModelHandle<Buffer>,
3666        position: T,
3667        cx: &mut ModelContext<Self>,
3668    ) -> Task<Result<Vec<DocumentHighlight>>> {
3669        let position = position.to_point_utf16(buffer.read(cx));
3670        self.request_lsp(buffer.clone(), GetDocumentHighlights { position }, cx)
3671    }
3672
3673    pub fn symbols(&self, query: &str, cx: &mut ModelContext<Self>) -> Task<Result<Vec<Symbol>>> {
3674        if self.is_local() {
3675            let mut requests = Vec::new();
3676            for ((worktree_id, _), server_id) in self.language_server_ids.iter() {
3677                let worktree_id = *worktree_id;
3678                if let Some(worktree) = self
3679                    .worktree_for_id(worktree_id, cx)
3680                    .and_then(|worktree| worktree.read(cx).as_local())
3681                {
3682                    if let Some(LanguageServerState::Running {
3683                        adapter,
3684                        language,
3685                        server,
3686                        ..
3687                    }) = self.language_servers.get(server_id)
3688                    {
3689                        let adapter = adapter.clone();
3690                        let language = language.clone();
3691                        let worktree_abs_path = worktree.abs_path().clone();
3692                        requests.push(
3693                            server
3694                                .request::<lsp::request::WorkspaceSymbol>(
3695                                    lsp::WorkspaceSymbolParams {
3696                                        query: query.to_string(),
3697                                        ..Default::default()
3698                                    },
3699                                )
3700                                .log_err()
3701                                .map(move |response| {
3702                                    (
3703                                        adapter,
3704                                        language,
3705                                        worktree_id,
3706                                        worktree_abs_path,
3707                                        response.unwrap_or_default(),
3708                                    )
3709                                }),
3710                        );
3711                    }
3712                }
3713            }
3714
3715            cx.spawn_weak(|this, cx| async move {
3716                let responses = futures::future::join_all(requests).await;
3717                let this = if let Some(this) = this.upgrade(&cx) {
3718                    this
3719                } else {
3720                    return Ok(Default::default());
3721                };
3722                let symbols = this.read_with(&cx, |this, cx| {
3723                    let mut symbols = Vec::new();
3724                    for (
3725                        adapter,
3726                        adapter_language,
3727                        source_worktree_id,
3728                        worktree_abs_path,
3729                        response,
3730                    ) in responses
3731                    {
3732                        symbols.extend(response.into_iter().flatten().filter_map(|lsp_symbol| {
3733                            let abs_path = lsp_symbol.location.uri.to_file_path().ok()?;
3734                            let mut worktree_id = source_worktree_id;
3735                            let path;
3736                            if let Some((worktree, rel_path)) =
3737                                this.find_local_worktree(&abs_path, cx)
3738                            {
3739                                worktree_id = worktree.read(cx).id();
3740                                path = rel_path;
3741                            } else {
3742                                path = relativize_path(&worktree_abs_path, &abs_path);
3743                            }
3744
3745                            let project_path = ProjectPath {
3746                                worktree_id,
3747                                path: path.into(),
3748                            };
3749                            let signature = this.symbol_signature(&project_path);
3750                            let adapter_language = adapter_language.clone();
3751                            let language = this
3752                                .languages
3753                                .language_for_file(&project_path.path, None)
3754                                .unwrap_or_else(move |_| adapter_language);
3755                            let language_server_name = adapter.name.clone();
3756                            Some(async move {
3757                                let language = language.await;
3758                                let label = language
3759                                    .label_for_symbol(&lsp_symbol.name, lsp_symbol.kind)
3760                                    .await;
3761
3762                                Symbol {
3763                                    language_server_name,
3764                                    source_worktree_id,
3765                                    path: project_path,
3766                                    label: label.unwrap_or_else(|| {
3767                                        CodeLabel::plain(lsp_symbol.name.clone(), None)
3768                                    }),
3769                                    kind: lsp_symbol.kind,
3770                                    name: lsp_symbol.name,
3771                                    range: range_from_lsp(lsp_symbol.location.range),
3772                                    signature,
3773                                }
3774                            })
3775                        }));
3776                    }
3777                    symbols
3778                });
3779                Ok(futures::future::join_all(symbols).await)
3780            })
3781        } else if let Some(project_id) = self.remote_id() {
3782            let request = self.client.request(proto::GetProjectSymbols {
3783                project_id,
3784                query: query.to_string(),
3785            });
3786            cx.spawn_weak(|this, cx| async move {
3787                let response = request.await?;
3788                let mut symbols = Vec::new();
3789                if let Some(this) = this.upgrade(&cx) {
3790                    let new_symbols = this.read_with(&cx, |this, _| {
3791                        response
3792                            .symbols
3793                            .into_iter()
3794                            .map(|symbol| this.deserialize_symbol(symbol))
3795                            .collect::<Vec<_>>()
3796                    });
3797                    symbols = futures::future::join_all(new_symbols)
3798                        .await
3799                        .into_iter()
3800                        .filter_map(|symbol| symbol.log_err())
3801                        .collect::<Vec<_>>();
3802                }
3803                Ok(symbols)
3804            })
3805        } else {
3806            Task::ready(Ok(Default::default()))
3807        }
3808    }
3809
3810    pub fn open_buffer_for_symbol(
3811        &mut self,
3812        symbol: &Symbol,
3813        cx: &mut ModelContext<Self>,
3814    ) -> Task<Result<ModelHandle<Buffer>>> {
3815        if self.is_local() {
3816            let language_server_id = if let Some(id) = self.language_server_ids.get(&(
3817                symbol.source_worktree_id,
3818                symbol.language_server_name.clone(),
3819            )) {
3820                *id
3821            } else {
3822                return Task::ready(Err(anyhow!(
3823                    "language server for worktree and language not found"
3824                )));
3825            };
3826
3827            let worktree_abs_path = if let Some(worktree_abs_path) = self
3828                .worktree_for_id(symbol.path.worktree_id, cx)
3829                .and_then(|worktree| worktree.read(cx).as_local())
3830                .map(|local_worktree| local_worktree.abs_path())
3831            {
3832                worktree_abs_path
3833            } else {
3834                return Task::ready(Err(anyhow!("worktree not found for symbol")));
3835            };
3836            let symbol_abs_path = worktree_abs_path.join(&symbol.path.path);
3837            let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) {
3838                uri
3839            } else {
3840                return Task::ready(Err(anyhow!("invalid symbol path")));
3841            };
3842
3843            self.open_local_buffer_via_lsp(
3844                symbol_uri,
3845                language_server_id,
3846                symbol.language_server_name.clone(),
3847                cx,
3848            )
3849        } else if let Some(project_id) = self.remote_id() {
3850            let request = self.client.request(proto::OpenBufferForSymbol {
3851                project_id,
3852                symbol: Some(serialize_symbol(symbol)),
3853            });
3854            cx.spawn(|this, mut cx| async move {
3855                let response = request.await?;
3856                this.update(&mut cx, |this, cx| {
3857                    this.wait_for_remote_buffer(response.buffer_id, cx)
3858                })
3859                .await
3860            })
3861        } else {
3862            Task::ready(Err(anyhow!("project does not have a remote id")))
3863        }
3864    }
3865
3866    pub fn hover<T: ToPointUtf16>(
3867        &self,
3868        buffer: &ModelHandle<Buffer>,
3869        position: T,
3870        cx: &mut ModelContext<Self>,
3871    ) -> Task<Result<Option<Hover>>> {
3872        let position = position.to_point_utf16(buffer.read(cx));
3873        self.request_lsp(buffer.clone(), GetHover { position }, cx)
3874    }
3875
3876    pub fn completions<T: ToPointUtf16>(
3877        &self,
3878        buffer: &ModelHandle<Buffer>,
3879        position: T,
3880        cx: &mut ModelContext<Self>,
3881    ) -> Task<Result<Vec<Completion>>> {
3882        let position = position.to_point_utf16(buffer.read(cx));
3883        self.request_lsp(buffer.clone(), GetCompletions { position }, cx)
3884    }
3885
3886    pub fn apply_additional_edits_for_completion(
3887        &self,
3888        buffer_handle: ModelHandle<Buffer>,
3889        completion: Completion,
3890        push_to_history: bool,
3891        cx: &mut ModelContext<Self>,
3892    ) -> Task<Result<Option<Transaction>>> {
3893        let buffer = buffer_handle.read(cx);
3894        let buffer_id = buffer.remote_id();
3895
3896        if self.is_local() {
3897            let lang_server = match self.primary_language_servers_for_buffer(buffer, cx) {
3898                Some((_, server)) => server.clone(),
3899                _ => return Task::ready(Ok(Default::default())),
3900            };
3901
3902            cx.spawn(|this, mut cx| async move {
3903                let resolved_completion = lang_server
3904                    .request::<lsp::request::ResolveCompletionItem>(completion.lsp_completion)
3905                    .await?;
3906
3907                if let Some(edits) = resolved_completion.additional_text_edits {
3908                    let edits = this
3909                        .update(&mut cx, |this, cx| {
3910                            this.edits_from_lsp(
3911                                &buffer_handle,
3912                                edits,
3913                                lang_server.server_id(),
3914                                None,
3915                                cx,
3916                            )
3917                        })
3918                        .await?;
3919
3920                    buffer_handle.update(&mut cx, |buffer, cx| {
3921                        buffer.finalize_last_transaction();
3922                        buffer.start_transaction();
3923
3924                        for (range, text) in edits {
3925                            let primary = &completion.old_range;
3926                            let start_within = primary.start.cmp(&range.start, buffer).is_le()
3927                                && primary.end.cmp(&range.start, buffer).is_ge();
3928                            let end_within = range.start.cmp(&primary.end, buffer).is_le()
3929                                && range.end.cmp(&primary.end, buffer).is_ge();
3930
3931                            //Skip addtional edits which overlap with the primary completion edit
3932                            //https://github.com/zed-industries/zed/pull/1871
3933                            if !start_within && !end_within {
3934                                buffer.edit([(range, text)], None, cx);
3935                            }
3936                        }
3937
3938                        let transaction = if buffer.end_transaction(cx).is_some() {
3939                            let transaction = buffer.finalize_last_transaction().unwrap().clone();
3940                            if !push_to_history {
3941                                buffer.forget_transaction(transaction.id);
3942                            }
3943                            Some(transaction)
3944                        } else {
3945                            None
3946                        };
3947                        Ok(transaction)
3948                    })
3949                } else {
3950                    Ok(None)
3951                }
3952            })
3953        } else if let Some(project_id) = self.remote_id() {
3954            let client = self.client.clone();
3955            cx.spawn(|_, mut cx| async move {
3956                let response = client
3957                    .request(proto::ApplyCompletionAdditionalEdits {
3958                        project_id,
3959                        buffer_id,
3960                        completion: Some(language::proto::serialize_completion(&completion)),
3961                    })
3962                    .await?;
3963
3964                if let Some(transaction) = response.transaction {
3965                    let transaction = language::proto::deserialize_transaction(transaction)?;
3966                    buffer_handle
3967                        .update(&mut cx, |buffer, _| {
3968                            buffer.wait_for_edits(transaction.edit_ids.iter().copied())
3969                        })
3970                        .await?;
3971                    if push_to_history {
3972                        buffer_handle.update(&mut cx, |buffer, _| {
3973                            buffer.push_transaction(transaction.clone(), Instant::now());
3974                        });
3975                    }
3976                    Ok(Some(transaction))
3977                } else {
3978                    Ok(None)
3979                }
3980            })
3981        } else {
3982            Task::ready(Err(anyhow!("project does not have a remote id")))
3983        }
3984    }
3985
3986    pub fn code_actions<T: Clone + ToOffset>(
3987        &self,
3988        buffer_handle: &ModelHandle<Buffer>,
3989        range: Range<T>,
3990        cx: &mut ModelContext<Self>,
3991    ) -> Task<Result<Vec<CodeAction>>> {
3992        let buffer = buffer_handle.read(cx);
3993        let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end);
3994        self.request_lsp(buffer_handle.clone(), GetCodeActions { range }, cx)
3995    }
3996
3997    pub fn apply_code_action(
3998        &self,
3999        buffer_handle: ModelHandle<Buffer>,
4000        mut action: CodeAction,
4001        push_to_history: bool,
4002        cx: &mut ModelContext<Self>,
4003    ) -> Task<Result<ProjectTransaction>> {
4004        if self.is_local() {
4005            let buffer = buffer_handle.read(cx);
4006            let (lsp_adapter, lang_server) = if let Some((adapter, server)) =
4007                self.language_server_for_buffer(buffer, action.server_id, cx)
4008            {
4009                (adapter.clone(), server.clone())
4010            } else {
4011                return Task::ready(Ok(Default::default()));
4012            };
4013            let range = action.range.to_point_utf16(buffer);
4014
4015            cx.spawn(|this, mut cx| async move {
4016                if let Some(lsp_range) = action
4017                    .lsp_action
4018                    .data
4019                    .as_mut()
4020                    .and_then(|d| d.get_mut("codeActionParams"))
4021                    .and_then(|d| d.get_mut("range"))
4022                {
4023                    *lsp_range = serde_json::to_value(&range_to_lsp(range)).unwrap();
4024                    action.lsp_action = lang_server
4025                        .request::<lsp::request::CodeActionResolveRequest>(action.lsp_action)
4026                        .await?;
4027                } else {
4028                    let actions = this
4029                        .update(&mut cx, |this, cx| {
4030                            this.code_actions(&buffer_handle, action.range, cx)
4031                        })
4032                        .await?;
4033                    action.lsp_action = actions
4034                        .into_iter()
4035                        .find(|a| a.lsp_action.title == action.lsp_action.title)
4036                        .ok_or_else(|| anyhow!("code action is outdated"))?
4037                        .lsp_action;
4038                }
4039
4040                if let Some(edit) = action.lsp_action.edit {
4041                    if edit.changes.is_some() || edit.document_changes.is_some() {
4042                        return Self::deserialize_workspace_edit(
4043                            this,
4044                            edit,
4045                            push_to_history,
4046                            lsp_adapter.clone(),
4047                            lang_server.clone(),
4048                            &mut cx,
4049                        )
4050                        .await;
4051                    }
4052                }
4053
4054                if let Some(command) = action.lsp_action.command {
4055                    this.update(&mut cx, |this, _| {
4056                        this.last_workspace_edits_by_language_server
4057                            .remove(&lang_server.server_id());
4058                    });
4059                    lang_server
4060                        .request::<lsp::request::ExecuteCommand>(lsp::ExecuteCommandParams {
4061                            command: command.command,
4062                            arguments: command.arguments.unwrap_or_default(),
4063                            ..Default::default()
4064                        })
4065                        .await?;
4066                    return Ok(this.update(&mut cx, |this, _| {
4067                        this.last_workspace_edits_by_language_server
4068                            .remove(&lang_server.server_id())
4069                            .unwrap_or_default()
4070                    }));
4071                }
4072
4073                Ok(ProjectTransaction::default())
4074            })
4075        } else if let Some(project_id) = self.remote_id() {
4076            let client = self.client.clone();
4077            let request = proto::ApplyCodeAction {
4078                project_id,
4079                buffer_id: buffer_handle.read(cx).remote_id(),
4080                action: Some(language::proto::serialize_code_action(&action)),
4081            };
4082            cx.spawn(|this, mut cx| async move {
4083                let response = client
4084                    .request(request)
4085                    .await?
4086                    .transaction
4087                    .ok_or_else(|| anyhow!("missing transaction"))?;
4088                this.update(&mut cx, |this, cx| {
4089                    this.deserialize_project_transaction(response, push_to_history, cx)
4090                })
4091                .await
4092            })
4093        } else {
4094            Task::ready(Err(anyhow!("project does not have a remote id")))
4095        }
4096    }
4097
4098    fn apply_on_type_formatting(
4099        &self,
4100        buffer: ModelHandle<Buffer>,
4101        position: Anchor,
4102        trigger: String,
4103        cx: &mut ModelContext<Self>,
4104    ) -> Task<Result<Option<Transaction>>> {
4105        if self.is_local() {
4106            cx.spawn(|this, mut cx| async move {
4107                // Do not allow multiple concurrent formatting requests for the
4108                // same buffer.
4109                this.update(&mut cx, |this, cx| {
4110                    this.buffers_being_formatted
4111                        .insert(buffer.read(cx).remote_id())
4112                });
4113
4114                let _cleanup = defer({
4115                    let this = this.clone();
4116                    let mut cx = cx.clone();
4117                    let closure_buffer = buffer.clone();
4118                    move || {
4119                        this.update(&mut cx, |this, cx| {
4120                            this.buffers_being_formatted
4121                                .remove(&closure_buffer.read(cx).remote_id());
4122                        });
4123                    }
4124                });
4125
4126                buffer
4127                    .update(&mut cx, |buffer, _| {
4128                        buffer.wait_for_edits(Some(position.timestamp))
4129                    })
4130                    .await?;
4131                this.update(&mut cx, |this, cx| {
4132                    let position = position.to_point_utf16(buffer.read(cx));
4133                    this.on_type_format(buffer, position, trigger, false, cx)
4134                })
4135                .await
4136            })
4137        } else if let Some(project_id) = self.remote_id() {
4138            let client = self.client.clone();
4139            let request = proto::OnTypeFormatting {
4140                project_id,
4141                buffer_id: buffer.read(cx).remote_id(),
4142                position: Some(serialize_anchor(&position)),
4143                trigger,
4144                version: serialize_version(&buffer.read(cx).version()),
4145            };
4146            cx.spawn(|_, _| async move {
4147                client
4148                    .request(request)
4149                    .await?
4150                    .transaction
4151                    .map(language::proto::deserialize_transaction)
4152                    .transpose()
4153            })
4154        } else {
4155            Task::ready(Err(anyhow!("project does not have a remote id")))
4156        }
4157    }
4158
4159    async fn deserialize_edits(
4160        this: ModelHandle<Self>,
4161        buffer_to_edit: ModelHandle<Buffer>,
4162        edits: Vec<lsp::TextEdit>,
4163        push_to_history: bool,
4164        _: Arc<CachedLspAdapter>,
4165        language_server: Arc<LanguageServer>,
4166        cx: &mut AsyncAppContext,
4167    ) -> Result<Option<Transaction>> {
4168        let edits = this
4169            .update(cx, |this, cx| {
4170                this.edits_from_lsp(
4171                    &buffer_to_edit,
4172                    edits,
4173                    language_server.server_id(),
4174                    None,
4175                    cx,
4176                )
4177            })
4178            .await?;
4179
4180        let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4181            buffer.finalize_last_transaction();
4182            buffer.start_transaction();
4183            for (range, text) in edits {
4184                buffer.edit([(range, text)], None, cx);
4185            }
4186
4187            if buffer.end_transaction(cx).is_some() {
4188                let transaction = buffer.finalize_last_transaction().unwrap().clone();
4189                if !push_to_history {
4190                    buffer.forget_transaction(transaction.id);
4191                }
4192                Some(transaction)
4193            } else {
4194                None
4195            }
4196        });
4197
4198        Ok(transaction)
4199    }
4200
4201    async fn deserialize_workspace_edit(
4202        this: ModelHandle<Self>,
4203        edit: lsp::WorkspaceEdit,
4204        push_to_history: bool,
4205        lsp_adapter: Arc<CachedLspAdapter>,
4206        language_server: Arc<LanguageServer>,
4207        cx: &mut AsyncAppContext,
4208    ) -> Result<ProjectTransaction> {
4209        let fs = this.read_with(cx, |this, _| this.fs.clone());
4210        let mut operations = Vec::new();
4211        if let Some(document_changes) = edit.document_changes {
4212            match document_changes {
4213                lsp::DocumentChanges::Edits(edits) => {
4214                    operations.extend(edits.into_iter().map(lsp::DocumentChangeOperation::Edit))
4215                }
4216                lsp::DocumentChanges::Operations(ops) => operations = ops,
4217            }
4218        } else if let Some(changes) = edit.changes {
4219            operations.extend(changes.into_iter().map(|(uri, edits)| {
4220                lsp::DocumentChangeOperation::Edit(lsp::TextDocumentEdit {
4221                    text_document: lsp::OptionalVersionedTextDocumentIdentifier {
4222                        uri,
4223                        version: None,
4224                    },
4225                    edits: edits.into_iter().map(lsp::OneOf::Left).collect(),
4226                })
4227            }));
4228        }
4229
4230        let mut project_transaction = ProjectTransaction::default();
4231        for operation in operations {
4232            match operation {
4233                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Create(op)) => {
4234                    let abs_path = op
4235                        .uri
4236                        .to_file_path()
4237                        .map_err(|_| anyhow!("can't convert URI to path"))?;
4238
4239                    if let Some(parent_path) = abs_path.parent() {
4240                        fs.create_dir(parent_path).await?;
4241                    }
4242                    if abs_path.ends_with("/") {
4243                        fs.create_dir(&abs_path).await?;
4244                    } else {
4245                        fs.create_file(&abs_path, op.options.map(Into::into).unwrap_or_default())
4246                            .await?;
4247                    }
4248                }
4249
4250                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Rename(op)) => {
4251                    let source_abs_path = op
4252                        .old_uri
4253                        .to_file_path()
4254                        .map_err(|_| anyhow!("can't convert URI to path"))?;
4255                    let target_abs_path = op
4256                        .new_uri
4257                        .to_file_path()
4258                        .map_err(|_| anyhow!("can't convert URI to path"))?;
4259                    fs.rename(
4260                        &source_abs_path,
4261                        &target_abs_path,
4262                        op.options.map(Into::into).unwrap_or_default(),
4263                    )
4264                    .await?;
4265                }
4266
4267                lsp::DocumentChangeOperation::Op(lsp::ResourceOp::Delete(op)) => {
4268                    let abs_path = op
4269                        .uri
4270                        .to_file_path()
4271                        .map_err(|_| anyhow!("can't convert URI to path"))?;
4272                    let options = op.options.map(Into::into).unwrap_or_default();
4273                    if abs_path.ends_with("/") {
4274                        fs.remove_dir(&abs_path, options).await?;
4275                    } else {
4276                        fs.remove_file(&abs_path, options).await?;
4277                    }
4278                }
4279
4280                lsp::DocumentChangeOperation::Edit(op) => {
4281                    let buffer_to_edit = this
4282                        .update(cx, |this, cx| {
4283                            this.open_local_buffer_via_lsp(
4284                                op.text_document.uri,
4285                                language_server.server_id(),
4286                                lsp_adapter.name.clone(),
4287                                cx,
4288                            )
4289                        })
4290                        .await?;
4291
4292                    let edits = this
4293                        .update(cx, |this, cx| {
4294                            let edits = op.edits.into_iter().map(|edit| match edit {
4295                                lsp::OneOf::Left(edit) => edit,
4296                                lsp::OneOf::Right(edit) => edit.text_edit,
4297                            });
4298                            this.edits_from_lsp(
4299                                &buffer_to_edit,
4300                                edits,
4301                                language_server.server_id(),
4302                                op.text_document.version,
4303                                cx,
4304                            )
4305                        })
4306                        .await?;
4307
4308                    let transaction = buffer_to_edit.update(cx, |buffer, cx| {
4309                        buffer.finalize_last_transaction();
4310                        buffer.start_transaction();
4311                        for (range, text) in edits {
4312                            buffer.edit([(range, text)], None, cx);
4313                        }
4314                        let transaction = if buffer.end_transaction(cx).is_some() {
4315                            let transaction = buffer.finalize_last_transaction().unwrap().clone();
4316                            if !push_to_history {
4317                                buffer.forget_transaction(transaction.id);
4318                            }
4319                            Some(transaction)
4320                        } else {
4321                            None
4322                        };
4323
4324                        transaction
4325                    });
4326                    if let Some(transaction) = transaction {
4327                        project_transaction.0.insert(buffer_to_edit, transaction);
4328                    }
4329                }
4330            }
4331        }
4332
4333        Ok(project_transaction)
4334    }
4335
4336    pub fn prepare_rename<T: ToPointUtf16>(
4337        &self,
4338        buffer: ModelHandle<Buffer>,
4339        position: T,
4340        cx: &mut ModelContext<Self>,
4341    ) -> Task<Result<Option<Range<Anchor>>>> {
4342        let position = position.to_point_utf16(buffer.read(cx));
4343        self.request_lsp(buffer, PrepareRename { position }, cx)
4344    }
4345
4346    pub fn perform_rename<T: ToPointUtf16>(
4347        &self,
4348        buffer: ModelHandle<Buffer>,
4349        position: T,
4350        new_name: String,
4351        push_to_history: bool,
4352        cx: &mut ModelContext<Self>,
4353    ) -> Task<Result<ProjectTransaction>> {
4354        let position = position.to_point_utf16(buffer.read(cx));
4355        self.request_lsp(
4356            buffer,
4357            PerformRename {
4358                position,
4359                new_name,
4360                push_to_history,
4361            },
4362            cx,
4363        )
4364    }
4365
4366    pub fn on_type_format<T: ToPointUtf16>(
4367        &self,
4368        buffer: ModelHandle<Buffer>,
4369        position: T,
4370        trigger: String,
4371        push_to_history: bool,
4372        cx: &mut ModelContext<Self>,
4373    ) -> Task<Result<Option<Transaction>>> {
4374        let tab_size = buffer.read_with(cx, |buffer, cx| {
4375            let language_name = buffer.language().map(|language| language.name());
4376            language_settings(language_name.as_deref(), cx).tab_size
4377        });
4378        let position = position.to_point_utf16(buffer.read(cx));
4379        self.request_lsp(
4380            buffer.clone(),
4381            OnTypeFormatting {
4382                position,
4383                trigger,
4384                options: lsp_command::lsp_formatting_options(tab_size.get()).into(),
4385                push_to_history,
4386            },
4387            cx,
4388        )
4389    }
4390
    /// Searches the project for `query`, resolving to the matching ranges
    /// grouped by buffer.
    ///
    /// For a local project the work is split into three stages connected by
    /// bounded channels:
    /// 1. background workers scan the files of every visible local worktree
    ///    and emit the paths that pass `query.file_matches` and `query.detect`;
    /// 2. a context-bound task forwards every currently open buffer, then
    ///    opens a buffer for each emitted path, sending each buffer together
    ///    with a snapshot of it;
    /// 3. background workers run `query.search` on each snapshot and collect
    ///    the matches as anchor ranges.
    ///
    /// For a remote project the query is sent to the host and the returned
    /// locations are resolved against remote buffers. A project that is
    /// neither local nor has a remote id yields an empty result.
    #[allow(clippy::type_complexity)]
    pub fn search(
        &self,
        query: SearchQuery,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
        if self.is_local() {
            // Snapshot every visible worktree that is local; non-local trees
            // are skipped by `as_local()` returning `None`.
            let snapshots = self
                .visible_worktrees(cx)
                .filter_map(|tree| {
                    let tree = tree.read(cx).as_local()?;
                    Some(tree.snapshot())
                })
                .collect::<Vec<_>>();

            let background = cx.background().clone();
            let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum();
            // NOTE(review): this early return also skips the open buffers that
            // the later stages would otherwise search — confirm that is intended.
            if path_count == 0 {
                return Task::ready(Ok(Default::default()));
            }
            // Never use more workers than there are files to scan.
            let workers = background.num_cpus().min(path_count);
            let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024);
            // Stage 1: find candidate paths on the background executor.
            cx.background()
                .spawn({
                    let fs = self.fs.clone();
                    let background = cx.background().clone();
                    let query = query.clone();
                    async move {
                        let fs = &fs;
                        let query = &query;
                        let matching_paths_tx = &matching_paths_tx;
                        // Ceiling division, so that the workers together cover
                        // all `path_count` files.
                        let paths_per_worker = (path_count + workers - 1) / workers;
                        let snapshots = &snapshots;
                        background
                            .scoped(|scope| {
                                for worker_ix in 0..workers {
                                    // Each worker owns the index range
                                    // [worker_start_ix, worker_end_ix) over the
                                    // concatenation of all snapshots' visible files.
                                    let worker_start_ix = worker_ix * paths_per_worker;
                                    let worker_end_ix = worker_start_ix + paths_per_worker;
                                    scope.spawn(async move {
                                        let mut snapshot_start_ix = 0;
                                        // Reused for every file to avoid a
                                        // fresh allocation per path.
                                        let mut abs_path = PathBuf::new();
                                        for snapshot in snapshots {
                                            let snapshot_end_ix =
                                                snapshot_start_ix + snapshot.visible_file_count();
                                            if worker_end_ix <= snapshot_start_ix {
                                                // This and all later snapshots
                                                // lie past the worker's range.
                                                break;
                                            } else if worker_start_ix > snapshot_end_ix {
                                                // The worker's range starts
                                                // after this snapshot.
                                                snapshot_start_ix = snapshot_end_ix;
                                                continue;
                                            } else {
                                                // Translate the worker's global
                                                // range into offsets within
                                                // this snapshot.
                                                let start_in_snapshot = worker_start_ix
                                                    .saturating_sub(snapshot_start_ix);
                                                let end_in_snapshot =
                                                    cmp::min(worker_end_ix, snapshot_end_ix)
                                                        - snapshot_start_ix;

                                                // NOTE(review): presumably `false`
                                                // excludes ignored files and the
                                                // second argument is the starting
                                                // offset — confirm against
                                                // `Snapshot::files`.
                                                for entry in snapshot
                                                    .files(false, start_in_snapshot)
                                                    .take(end_in_snapshot - start_in_snapshot)
                                                {
                                                    // Stop early once the
                                                    // receiving side is gone.
                                                    if matching_paths_tx.is_closed() {
                                                        break;
                                                    }
                                                    // A file is a candidate only
                                                    // if its path passes the
                                                    // query's path filter and
                                                    // `detect` reports `Ok(true)`
                                                    // for its contents; open or
                                                    // detect failures count as
                                                    // "no match".
                                                    let matches = if query
                                                        .file_matches(Some(&entry.path))
                                                    {
                                                        abs_path.clear();
                                                        abs_path.push(&snapshot.abs_path());
                                                        abs_path.push(&entry.path);
                                                        if let Some(file) =
                                                            fs.open_sync(&abs_path).await.log_err()
                                                        {
                                                            query.detect(file).unwrap_or(false)
                                                        } else {
                                                            false
                                                        }
                                                    } else {
                                                        false
                                                    };

                                                    if matches {
                                                        let project_path =
                                                            (snapshot.id(), entry.path.clone());
                                                        if matching_paths_tx
                                                            .send(project_path)
                                                            .await
                                                            .is_err()
                                                        {
                                                            break;
                                                        }
                                                    }
                                                }

                                                snapshot_start_ix = snapshot_end_ix;
                                            }
                                        }
                                    });
                                }
                            })
                            .await;
                    }
                })
                .detach();

            // Stage 2: turn candidate paths (and already-open buffers) into
            // `(buffer, snapshot)` pairs.
            let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
            let open_buffers = self
                .opened_buffers
                .values()
                .filter_map(|b| b.upgrade(cx))
                .collect::<HashSet<_>>();
            cx.spawn(|this, cx| async move {
                // Every buffer that is already open is searched, regardless of
                // whether stage 1 reported its path.
                for buffer in &open_buffers {
                    let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                    buffers_tx.send((buffer.clone(), snapshot)).await?;
                }

                // Shared with the per-path tasks below so that each buffer is
                // sent at most once.
                let open_buffers = Rc::new(RefCell::new(open_buffers));
                while let Some(project_path) = matching_paths_rx.next().await {
                    if buffers_tx.is_closed() {
                        break;
                    }

                    let this = this.clone();
                    let open_buffers = open_buffers.clone();
                    let buffers_tx = buffers_tx.clone();
                    cx.spawn(|mut cx| async move {
                        if let Some(buffer) = this
                            .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))
                            .await
                            .log_err()
                        {
                            // `insert` returns false when the buffer was
                            // already forwarded.
                            if open_buffers.borrow_mut().insert(buffer.clone()) {
                                let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
                                buffers_tx.send((buffer, snapshot)).await?;
                            }
                        }

                        Ok::<_, anyhow::Error>(())
                    })
                    .detach();
                }

                Ok::<_, anyhow::Error>(())
            })
            .detach_and_log_err(cx);

            // Stage 3: search the snapshots on the background executor.
            let background = cx.background().clone();
            cx.background().spawn(async move {
                let query = &query;
                // One result map per worker, so workers never share mutable state.
                let mut matched_buffers = Vec::new();
                for _ in 0..workers {
                    matched_buffers.push(HashMap::default());
                }
                background
                    .scoped(|scope| {
                        for worker_matched_buffers in matched_buffers.iter_mut() {
                            let mut buffers_rx = buffers_rx.clone();
                            scope.spawn(async move {
                                while let Some((buffer, snapshot)) = buffers_rx.next().await {
                                    // Open buffers were forwarded unfiltered,
                                    // so the path filter is applied here too.
                                    let buffer_matches = if query.file_matches(
                                        snapshot.file().map(|file| file.path().as_ref()),
                                    ) {
                                        query
                                            .search(snapshot.as_rope())
                                            .await
                                            .iter()
                                            .map(|range| {
                                                snapshot.anchor_before(range.start)
                                                    ..snapshot.anchor_after(range.end)
                                            })
                                            .collect()
                                    } else {
                                        Vec::new()
                                    };
                                    if !buffer_matches.is_empty() {
                                        worker_matched_buffers
                                            .insert(buffer.clone(), buffer_matches);
                                    }
                                }
                            });
                        }
                    })
                    .await;
                // Merge the per-worker maps into the final result.
                Ok(matched_buffers.into_iter().flatten().collect())
            })
        } else if let Some(project_id) = self.remote_id() {
            // Remote project: the host runs the search and replies with
            // locations.
            let request = self.client.request(query.to_proto(project_id));
            cx.spawn(|this, mut cx| async move {
                let response = request.await?;
                let mut result = HashMap::default();
                for location in response.locations {
                    let target_buffer = this
                        .update(&mut cx, |this, cx| {
                            this.wait_for_remote_buffer(location.buffer_id, cx)
                        })
                        .await?;
                    let start = location
                        .start
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target start"))?;
                    let end = location
                        .end
                        .and_then(deserialize_anchor)
                        .ok_or_else(|| anyhow!("missing target end"))?;
                    result
                        .entry(target_buffer)
                        .or_insert(Vec::new())
                        .push(start..end)
                }
                Ok(result)
            })
        } else {
            Task::ready(Ok(Default::default()))
        }
    }
4606
4607    // TODO: Wire this up to allow selecting a server?
4608    fn request_lsp<R: LspCommand>(
4609        &self,
4610        buffer_handle: ModelHandle<Buffer>,
4611        request: R,
4612        cx: &mut ModelContext<Self>,
4613    ) -> Task<Result<R::Response>>
4614    where
4615        <R::LspRequest as lsp::request::Request>::Result: Send,
4616    {
4617        let buffer = buffer_handle.read(cx);
4618        if self.is_local() {
4619            let file = File::from_dyn(buffer.file()).and_then(File::as_local);
4620            if let Some((file, language_server)) = file.zip(
4621                self.primary_language_servers_for_buffer(buffer, cx)
4622                    .map(|(_, server)| server.clone()),
4623            ) {
4624                let lsp_params = request.to_lsp(&file.abs_path(cx), buffer, &language_server, cx);
4625                return cx.spawn(|this, cx| async move {
4626                    if !request.check_capabilities(language_server.capabilities()) {
4627                        return Ok(Default::default());
4628                    }
4629
4630                    let response = language_server
4631                        .request::<R::LspRequest>(lsp_params)
4632                        .await
4633                        .context("lsp request failed")?;
4634                    request
4635                        .response_from_lsp(
4636                            response,
4637                            this,
4638                            buffer_handle,
4639                            language_server.server_id(),
4640                            cx,
4641                        )
4642                        .await
4643                });
4644            }
4645        } else if let Some(project_id) = self.remote_id() {
4646            let rpc = self.client.clone();
4647            let message = request.to_proto(project_id, buffer);
4648            return cx.spawn_weak(|this, cx| async move {
4649                // Ensure the project is still alive by the time the task
4650                // is scheduled.
4651                this.upgrade(&cx)
4652                    .ok_or_else(|| anyhow!("project dropped"))?;
4653
4654                let response = rpc.request(message).await?;
4655
4656                let this = this
4657                    .upgrade(&cx)
4658                    .ok_or_else(|| anyhow!("project dropped"))?;
4659                if this.read_with(&cx, |this, _| this.is_read_only()) {
4660                    Err(anyhow!("disconnected before completing request"))
4661                } else {
4662                    request
4663                        .response_from_proto(response, this, buffer_handle, cx)
4664                        .await
4665                }
4666            });
4667        }
4668        Task::ready(Ok(Default::default()))
4669    }
4670
4671    pub fn find_or_create_local_worktree(
4672        &mut self,
4673        abs_path: impl AsRef<Path>,
4674        visible: bool,
4675        cx: &mut ModelContext<Self>,
4676    ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
4677        let abs_path = abs_path.as_ref();
4678        if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
4679            Task::ready(Ok((tree, relative_path)))
4680        } else {
4681            let worktree = self.create_local_worktree(abs_path, visible, cx);
4682            cx.foreground()
4683                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
4684        }
4685    }
4686
4687    pub fn find_local_worktree(
4688        &self,
4689        abs_path: &Path,
4690        cx: &AppContext,
4691    ) -> Option<(ModelHandle<Worktree>, PathBuf)> {
4692        for tree in &self.worktrees {
4693            if let Some(tree) = tree.upgrade(cx) {
4694                if let Some(relative_path) = tree
4695                    .read(cx)
4696                    .as_local()
4697                    .and_then(|t| abs_path.strip_prefix(t.abs_path()).ok())
4698                {
4699                    return Some((tree.clone(), relative_path.into()));
4700                }
4701            }
4702        }
4703        None
4704    }
4705
4706    pub fn is_shared(&self) -> bool {
4707        match &self.client_state {
4708            Some(ProjectClientState::Local { .. }) => true,
4709            _ => false,
4710        }
4711    }
4712
4713    fn create_local_worktree(
4714        &mut self,
4715        abs_path: impl AsRef<Path>,
4716        visible: bool,
4717        cx: &mut ModelContext<Self>,
4718    ) -> Task<Result<ModelHandle<Worktree>>> {
4719        let fs = self.fs.clone();
4720        let client = self.client.clone();
4721        let next_entry_id = self.next_entry_id.clone();
4722        let path: Arc<Path> = abs_path.as_ref().into();
4723        let task = self
4724            .loading_local_worktrees
4725            .entry(path.clone())
4726            .or_insert_with(|| {
4727                cx.spawn(|project, mut cx| {
4728                    async move {
4729                        let worktree = Worktree::local(
4730                            client.clone(),
4731                            path.clone(),
4732                            visible,
4733                            fs,
4734                            next_entry_id,
4735                            &mut cx,
4736                        )
4737                        .await;
4738
4739                        project.update(&mut cx, |project, _| {
4740                            project.loading_local_worktrees.remove(&path);
4741                        });
4742
4743                        let worktree = worktree?;
4744                        project.update(&mut cx, |project, cx| project.add_worktree(&worktree, cx));
4745                        Ok(worktree)
4746                    }
4747                    .map_err(Arc::new)
4748                })
4749                .shared()
4750            })
4751            .clone();
4752        cx.foreground().spawn(async move {
4753            match task.await {
4754                Ok(worktree) => Ok(worktree),
4755                Err(err) => Err(anyhow!("{}", err)),
4756            }
4757        })
4758    }
4759
4760    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
4761        self.worktrees.retain(|worktree| {
4762            if let Some(worktree) = worktree.upgrade(cx) {
4763                let id = worktree.read(cx).id();
4764                if id == id_to_remove {
4765                    cx.emit(Event::WorktreeRemoved(id));
4766                    false
4767                } else {
4768                    true
4769                }
4770            } else {
4771                false
4772            }
4773        });
4774        self.metadata_changed(cx);
4775    }
4776
4777    fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
4778        cx.observe(worktree, |_, _, cx| cx.notify()).detach();
4779        if worktree.read(cx).is_local() {
4780            cx.subscribe(worktree, |this, worktree, event, cx| match event {
4781                worktree::Event::UpdatedEntries(changes) => {
4782                    this.update_local_worktree_buffers(&worktree, &changes, cx);
4783                    this.update_local_worktree_language_servers(&worktree, changes, cx);
4784                }
4785                worktree::Event::UpdatedGitRepositories(updated_repos) => {
4786                    this.update_local_worktree_buffers_git_repos(worktree, updated_repos, cx)
4787                }
4788            })
4789            .detach();
4790        }
4791
4792        let push_strong_handle = {
4793            let worktree = worktree.read(cx);
4794            self.is_shared() || worktree.is_visible() || worktree.is_remote()
4795        };
4796        if push_strong_handle {
4797            self.worktrees
4798                .push(WorktreeHandle::Strong(worktree.clone()));
4799        } else {
4800            self.worktrees
4801                .push(WorktreeHandle::Weak(worktree.downgrade()));
4802        }
4803
4804        cx.observe_release(worktree, |this, worktree, cx| {
4805            let _ = this.remove_worktree(worktree.id(), cx);
4806        })
4807        .detach();
4808
4809        cx.emit(Event::WorktreeAdded);
4810        self.metadata_changed(cx);
4811    }
4812
4813    fn update_local_worktree_buffers(
4814        &mut self,
4815        worktree_handle: &ModelHandle<Worktree>,
4816        changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
4817        cx: &mut ModelContext<Self>,
4818    ) {
4819        let snapshot = worktree_handle.read(cx).snapshot();
4820
4821        let mut renamed_buffers = Vec::new();
4822        for (path, entry_id) in changes.keys() {
4823            let worktree_id = worktree_handle.read(cx).id();
4824            let project_path = ProjectPath {
4825                worktree_id,
4826                path: path.clone(),
4827            };
4828
4829            let buffer_id = match self.local_buffer_ids_by_entry_id.get(entry_id) {
4830                Some(&buffer_id) => buffer_id,
4831                None => match self.local_buffer_ids_by_path.get(&project_path) {
4832                    Some(&buffer_id) => buffer_id,
4833                    None => continue,
4834                },
4835            };
4836
4837            let open_buffer = self.opened_buffers.get(&buffer_id);
4838            let buffer = if let Some(buffer) = open_buffer.and_then(|buffer| buffer.upgrade(cx)) {
4839                buffer
4840            } else {
4841                self.opened_buffers.remove(&buffer_id);
4842                self.local_buffer_ids_by_path.remove(&project_path);
4843                self.local_buffer_ids_by_entry_id.remove(entry_id);
4844                continue;
4845            };
4846
4847            buffer.update(cx, |buffer, cx| {
4848                if let Some(old_file) = File::from_dyn(buffer.file()) {
4849                    if old_file.worktree != *worktree_handle {
4850                        return;
4851                    }
4852
4853                    let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id) {
4854                        File {
4855                            is_local: true,
4856                            entry_id: entry.id,
4857                            mtime: entry.mtime,
4858                            path: entry.path.clone(),
4859                            worktree: worktree_handle.clone(),
4860                            is_deleted: false,
4861                        }
4862                    } else if let Some(entry) = snapshot.entry_for_path(old_file.path().as_ref()) {
4863                        File {
4864                            is_local: true,
4865                            entry_id: entry.id,
4866                            mtime: entry.mtime,
4867                            path: entry.path.clone(),
4868                            worktree: worktree_handle.clone(),
4869                            is_deleted: false,
4870                        }
4871                    } else {
4872                        File {
4873                            is_local: true,
4874                            entry_id: old_file.entry_id,
4875                            path: old_file.path().clone(),
4876                            mtime: old_file.mtime(),
4877                            worktree: worktree_handle.clone(),
4878                            is_deleted: true,
4879                        }
4880                    };
4881
4882                    let old_path = old_file.abs_path(cx);
4883                    if new_file.abs_path(cx) != old_path {
4884                        renamed_buffers.push((cx.handle(), old_file.clone()));
4885                        self.local_buffer_ids_by_path.remove(&project_path);
4886                        self.local_buffer_ids_by_path.insert(
4887                            ProjectPath {
4888                                worktree_id,
4889                                path: path.clone(),
4890                            },
4891                            buffer_id,
4892                        );
4893                    }
4894
4895                    if new_file.entry_id != *entry_id {
4896                        self.local_buffer_ids_by_entry_id.remove(entry_id);
4897                        self.local_buffer_ids_by_entry_id
4898                            .insert(new_file.entry_id, buffer_id);
4899                    }
4900
4901                    if new_file != *old_file {
4902                        if let Some(project_id) = self.remote_id() {
4903                            self.client
4904                                .send(proto::UpdateBufferFile {
4905                                    project_id,
4906                                    buffer_id: buffer_id as u64,
4907                                    file: Some(new_file.to_proto()),
4908                                })
4909                                .log_err();
4910                        }
4911
4912                        buffer.file_updated(Arc::new(new_file), cx).detach();
4913                    }
4914                }
4915            });
4916        }
4917
4918        for (buffer, old_file) in renamed_buffers {
4919            self.unregister_buffer_from_language_servers(&buffer, &old_file, cx);
4920            self.detect_language_for_buffer(&buffer, cx);
4921            self.register_buffer_with_language_servers(&buffer, cx);
4922        }
4923    }
4924
4925    fn update_local_worktree_language_servers(
4926        &mut self,
4927        worktree_handle: &ModelHandle<Worktree>,
4928        changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
4929        cx: &mut ModelContext<Self>,
4930    ) {
4931        if changes.is_empty() {
4932            return;
4933        }
4934
4935        let worktree_id = worktree_handle.read(cx).id();
4936        let mut language_server_ids = self
4937            .language_server_ids
4938            .iter()
4939            .filter_map(|((server_worktree_id, _), server_id)| {
4940                (*server_worktree_id == worktree_id).then_some(*server_id)
4941            })
4942            .collect::<Vec<_>>();
4943        language_server_ids.sort();
4944        language_server_ids.dedup();
4945
4946        let abs_path = worktree_handle.read(cx).abs_path();
4947        for server_id in &language_server_ids {
4948            if let Some(server) = self.language_servers.get(server_id) {
4949                if let LanguageServerState::Running {
4950                    server,
4951                    watched_paths,
4952                    ..
4953                } = server
4954                {
4955                    if let Some(watched_paths) = watched_paths.get(&worktree_id) {
4956                        let params = lsp::DidChangeWatchedFilesParams {
4957                            changes: changes
4958                                .iter()
4959                                .filter_map(|((path, _), change)| {
4960                                    if watched_paths.is_match(&path) {
4961                                        Some(lsp::FileEvent {
4962                                            uri: lsp::Url::from_file_path(abs_path.join(path))
4963                                                .unwrap(),
4964                                            typ: match change {
4965                                                PathChange::Added => lsp::FileChangeType::CREATED,
4966                                                PathChange::Removed => lsp::FileChangeType::DELETED,
4967                                                PathChange::Updated
4968                                                | PathChange::AddedOrUpdated => {
4969                                                    lsp::FileChangeType::CHANGED
4970                                                }
4971                                            },
4972                                        })
4973                                    } else {
4974                                        None
4975                                    }
4976                                })
4977                                .collect(),
4978                        };
4979
4980                        if !params.changes.is_empty() {
4981                            server
4982                                .notify::<lsp::notification::DidChangeWatchedFiles>(params)
4983                                .log_err();
4984                        }
4985                    }
4986                }
4987            }
4988        }
4989    }
4990
4991    fn update_local_worktree_buffers_git_repos(
4992        &mut self,
4993        worktree_handle: ModelHandle<Worktree>,
4994        repos: &HashMap<Arc<Path>, LocalRepositoryEntry>,
4995        cx: &mut ModelContext<Self>,
4996    ) {
4997        debug_assert!(worktree_handle.read(cx).is_local());
4998
4999        // Setup the pending buffers
5000        let future_buffers = self
5001            .loading_buffers_by_path
5002            .iter()
5003            .filter_map(|(path, receiver)| {
5004                let path = &path.path;
5005                let (work_directory, repo) = repos
5006                    .iter()
5007                    .find(|(work_directory, _)| path.starts_with(work_directory))?;
5008
5009                let repo_relative_path = path.strip_prefix(work_directory).log_err()?;
5010
5011                let receiver = receiver.clone();
5012                let repo_ptr = repo.repo_ptr.clone();
5013                let repo_relative_path = repo_relative_path.to_owned();
5014                Some(async move {
5015                    pump_loading_buffer_reciever(receiver)
5016                        .await
5017                        .ok()
5018                        .map(|buffer| (buffer, repo_relative_path, repo_ptr))
5019                })
5020            })
5021            .collect::<FuturesUnordered<_>>()
5022            .filter_map(|result| async move {
5023                let (buffer_handle, repo_relative_path, repo_ptr) = result?;
5024
5025                let lock = repo_ptr.lock();
5026                lock.load_index_text(&repo_relative_path)
5027                    .map(|diff_base| (diff_base, buffer_handle))
5028            });
5029
5030        let update_diff_base_fn = update_diff_base(self);
5031        cx.spawn(|_, mut cx| async move {
5032            let diff_base_tasks = cx
5033                .background()
5034                .spawn(future_buffers.collect::<Vec<_>>())
5035                .await;
5036
5037            for (diff_base, buffer) in diff_base_tasks.into_iter() {
5038                update_diff_base_fn(Some(diff_base), buffer, &mut cx);
5039            }
5040        })
5041        .detach();
5042
5043        // And the current buffers
5044        for (_, buffer) in &self.opened_buffers {
5045            if let Some(buffer) = buffer.upgrade(cx) {
5046                let file = match File::from_dyn(buffer.read(cx).file()) {
5047                    Some(file) => file,
5048                    None => continue,
5049                };
5050                if file.worktree != worktree_handle {
5051                    continue;
5052                }
5053
5054                let path = file.path().clone();
5055
5056                let worktree = worktree_handle.read(cx);
5057
5058                let (work_directory, repo) = match repos
5059                    .iter()
5060                    .find(|(work_directory, _)| path.starts_with(work_directory))
5061                {
5062                    Some(repo) => repo.clone(),
5063                    None => continue,
5064                };
5065
5066                let relative_repo = match path.strip_prefix(work_directory).log_err() {
5067                    Some(relative_repo) => relative_repo.to_owned(),
5068                    None => continue,
5069                };
5070
5071                drop(worktree);
5072
5073                let update_diff_base_fn = update_diff_base(self);
5074                let git_ptr = repo.repo_ptr.clone();
5075                let diff_base_task = cx
5076                    .background()
5077                    .spawn(async move { git_ptr.lock().load_index_text(&relative_repo) });
5078
5079                cx.spawn(|_, mut cx| async move {
5080                    let diff_base = diff_base_task.await;
5081                    update_diff_base_fn(diff_base, buffer, &mut cx);
5082                })
5083                .detach();
5084            }
5085        }
5086    }
5087
5088    pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
5089        let new_active_entry = entry.and_then(|project_path| {
5090            let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
5091            let entry = worktree.read(cx).entry_for_path(project_path.path)?;
5092            Some(entry.id)
5093        });
5094        if new_active_entry != self.active_entry {
5095            self.active_entry = new_active_entry;
5096            cx.emit(Event::ActiveEntryChanged(new_active_entry));
5097        }
5098    }
5099
5100    pub fn language_servers_running_disk_based_diagnostics(
5101        &self,
5102    ) -> impl Iterator<Item = LanguageServerId> + '_ {
5103        self.language_server_statuses
5104            .iter()
5105            .filter_map(|(id, status)| {
5106                if status.has_pending_diagnostic_updates {
5107                    Some(*id)
5108                } else {
5109                    None
5110                }
5111            })
5112    }
5113
5114    pub fn diagnostic_summary(&self, cx: &AppContext) -> DiagnosticSummary {
5115        let mut summary = DiagnosticSummary::default();
5116        for (_, _, path_summary) in self.diagnostic_summaries(cx) {
5117            summary.error_count += path_summary.error_count;
5118            summary.warning_count += path_summary.warning_count;
5119        }
5120        summary
5121    }
5122
5123    pub fn diagnostic_summaries<'a>(
5124        &'a self,
5125        cx: &'a AppContext,
5126    ) -> impl Iterator<Item = (ProjectPath, LanguageServerId, DiagnosticSummary)> + 'a {
5127        self.visible_worktrees(cx).flat_map(move |worktree| {
5128            let worktree = worktree.read(cx);
5129            let worktree_id = worktree.id();
5130            worktree
5131                .diagnostic_summaries()
5132                .map(move |(path, server_id, summary)| {
5133                    (ProjectPath { worktree_id, path }, server_id, summary)
5134                })
5135        })
5136    }
5137
5138    pub fn disk_based_diagnostics_started(
5139        &mut self,
5140        language_server_id: LanguageServerId,
5141        cx: &mut ModelContext<Self>,
5142    ) {
5143        cx.emit(Event::DiskBasedDiagnosticsStarted { language_server_id });
5144    }
5145
5146    pub fn disk_based_diagnostics_finished(
5147        &mut self,
5148        language_server_id: LanguageServerId,
5149        cx: &mut ModelContext<Self>,
5150    ) {
5151        cx.emit(Event::DiskBasedDiagnosticsFinished { language_server_id });
5152    }
5153
5154    pub fn active_entry(&self) -> Option<ProjectEntryId> {
5155        self.active_entry
5156    }
5157
5158    pub fn entry_for_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Entry> {
5159        self.worktree_for_id(path.worktree_id, cx)?
5160            .read(cx)
5161            .entry_for_path(&path.path)
5162            .cloned()
5163    }
5164
5165    pub fn path_for_entry(&self, entry_id: ProjectEntryId, cx: &AppContext) -> Option<ProjectPath> {
5166        let worktree = self.worktree_for_entry(entry_id, cx)?;
5167        let worktree = worktree.read(cx);
5168        let worktree_id = worktree.id();
5169        let path = worktree.entry_for_id(entry_id)?.path.clone();
5170        Some(ProjectPath { worktree_id, path })
5171    }
5172
5173    // RPC message handlers
5174
5175    async fn handle_unshare_project(
5176        this: ModelHandle<Self>,
5177        _: TypedEnvelope<proto::UnshareProject>,
5178        _: Arc<Client>,
5179        mut cx: AsyncAppContext,
5180    ) -> Result<()> {
5181        this.update(&mut cx, |this, cx| {
5182            if this.is_local() {
5183                this.unshare(cx)?;
5184            } else {
5185                this.disconnected_from_host(cx);
5186            }
5187            Ok(())
5188        })
5189    }
5190
5191    async fn handle_add_collaborator(
5192        this: ModelHandle<Self>,
5193        mut envelope: TypedEnvelope<proto::AddProjectCollaborator>,
5194        _: Arc<Client>,
5195        mut cx: AsyncAppContext,
5196    ) -> Result<()> {
5197        let collaborator = envelope
5198            .payload
5199            .collaborator
5200            .take()
5201            .ok_or_else(|| anyhow!("empty collaborator"))?;
5202
5203        let collaborator = Collaborator::from_proto(collaborator)?;
5204        this.update(&mut cx, |this, cx| {
5205            this.shared_buffers.remove(&collaborator.peer_id);
5206            this.collaborators
5207                .insert(collaborator.peer_id, collaborator);
5208            cx.notify();
5209        });
5210
5211        Ok(())
5212    }
5213
5214    async fn handle_update_project_collaborator(
5215        this: ModelHandle<Self>,
5216        envelope: TypedEnvelope<proto::UpdateProjectCollaborator>,
5217        _: Arc<Client>,
5218        mut cx: AsyncAppContext,
5219    ) -> Result<()> {
5220        let old_peer_id = envelope
5221            .payload
5222            .old_peer_id
5223            .ok_or_else(|| anyhow!("missing old peer id"))?;
5224        let new_peer_id = envelope
5225            .payload
5226            .new_peer_id
5227            .ok_or_else(|| anyhow!("missing new peer id"))?;
5228        this.update(&mut cx, |this, cx| {
5229            let collaborator = this
5230                .collaborators
5231                .remove(&old_peer_id)
5232                .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?;
5233            let is_host = collaborator.replica_id == 0;
5234            this.collaborators.insert(new_peer_id, collaborator);
5235
5236            let buffers = this.shared_buffers.remove(&old_peer_id);
5237            log::info!(
5238                "peer {} became {}. moving buffers {:?}",
5239                old_peer_id,
5240                new_peer_id,
5241                &buffers
5242            );
5243            if let Some(buffers) = buffers {
5244                this.shared_buffers.insert(new_peer_id, buffers);
5245            }
5246
5247            if is_host {
5248                this.opened_buffers
5249                    .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
5250                this.buffer_ordered_messages_tx
5251                    .unbounded_send(BufferOrderedMessage::Resync)
5252                    .unwrap();
5253            }
5254
5255            cx.emit(Event::CollaboratorUpdated {
5256                old_peer_id,
5257                new_peer_id,
5258            });
5259            cx.notify();
5260            Ok(())
5261        })
5262    }
5263
5264    async fn handle_remove_collaborator(
5265        this: ModelHandle<Self>,
5266        envelope: TypedEnvelope<proto::RemoveProjectCollaborator>,
5267        _: Arc<Client>,
5268        mut cx: AsyncAppContext,
5269    ) -> Result<()> {
5270        this.update(&mut cx, |this, cx| {
5271            let peer_id = envelope
5272                .payload
5273                .peer_id
5274                .ok_or_else(|| anyhow!("invalid peer id"))?;
5275            let replica_id = this
5276                .collaborators
5277                .remove(&peer_id)
5278                .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
5279                .replica_id;
5280            for buffer in this.opened_buffers.values() {
5281                if let Some(buffer) = buffer.upgrade(cx) {
5282                    buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
5283                }
5284            }
5285            this.shared_buffers.remove(&peer_id);
5286
5287            cx.emit(Event::CollaboratorLeft(peer_id));
5288            cx.notify();
5289            Ok(())
5290        })
5291    }
5292
5293    async fn handle_update_project(
5294        this: ModelHandle<Self>,
5295        envelope: TypedEnvelope<proto::UpdateProject>,
5296        _: Arc<Client>,
5297        mut cx: AsyncAppContext,
5298    ) -> Result<()> {
5299        this.update(&mut cx, |this, cx| {
5300            // Don't handle messages that were sent before the response to us joining the project
5301            if envelope.message_id > this.join_project_response_message_id {
5302                this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
5303            }
5304            Ok(())
5305        })
5306    }
5307
5308    async fn handle_update_worktree(
5309        this: ModelHandle<Self>,
5310        envelope: TypedEnvelope<proto::UpdateWorktree>,
5311        _: Arc<Client>,
5312        mut cx: AsyncAppContext,
5313    ) -> Result<()> {
5314        this.update(&mut cx, |this, cx| {
5315            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5316            if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
5317                worktree.update(cx, |worktree, _| {
5318                    let worktree = worktree.as_remote_mut().unwrap();
5319                    worktree.update_from_remote(envelope.payload);
5320                });
5321            }
5322            Ok(())
5323        })
5324    }
5325
5326    async fn handle_create_project_entry(
5327        this: ModelHandle<Self>,
5328        envelope: TypedEnvelope<proto::CreateProjectEntry>,
5329        _: Arc<Client>,
5330        mut cx: AsyncAppContext,
5331    ) -> Result<proto::ProjectEntryResponse> {
5332        let worktree = this.update(&mut cx, |this, cx| {
5333            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5334            this.worktree_for_id(worktree_id, cx)
5335                .ok_or_else(|| anyhow!("worktree not found"))
5336        })?;
5337        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5338        let entry = worktree
5339            .update(&mut cx, |worktree, cx| {
5340                let worktree = worktree.as_local_mut().unwrap();
5341                let path = PathBuf::from(envelope.payload.path);
5342                worktree.create_entry(path, envelope.payload.is_directory, cx)
5343            })
5344            .await?;
5345        Ok(proto::ProjectEntryResponse {
5346            entry: Some((&entry).into()),
5347            worktree_scan_id: worktree_scan_id as u64,
5348        })
5349    }
5350
5351    async fn handle_rename_project_entry(
5352        this: ModelHandle<Self>,
5353        envelope: TypedEnvelope<proto::RenameProjectEntry>,
5354        _: Arc<Client>,
5355        mut cx: AsyncAppContext,
5356    ) -> Result<proto::ProjectEntryResponse> {
5357        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5358        let worktree = this.read_with(&cx, |this, cx| {
5359            this.worktree_for_entry(entry_id, cx)
5360                .ok_or_else(|| anyhow!("worktree not found"))
5361        })?;
5362        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5363        let entry = worktree
5364            .update(&mut cx, |worktree, cx| {
5365                let new_path = PathBuf::from(envelope.payload.new_path);
5366                worktree
5367                    .as_local_mut()
5368                    .unwrap()
5369                    .rename_entry(entry_id, new_path, cx)
5370                    .ok_or_else(|| anyhow!("invalid entry"))
5371            })?
5372            .await?;
5373        Ok(proto::ProjectEntryResponse {
5374            entry: Some((&entry).into()),
5375            worktree_scan_id: worktree_scan_id as u64,
5376        })
5377    }
5378
5379    async fn handle_copy_project_entry(
5380        this: ModelHandle<Self>,
5381        envelope: TypedEnvelope<proto::CopyProjectEntry>,
5382        _: Arc<Client>,
5383        mut cx: AsyncAppContext,
5384    ) -> Result<proto::ProjectEntryResponse> {
5385        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5386        let worktree = this.read_with(&cx, |this, cx| {
5387            this.worktree_for_entry(entry_id, cx)
5388                .ok_or_else(|| anyhow!("worktree not found"))
5389        })?;
5390        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5391        let entry = worktree
5392            .update(&mut cx, |worktree, cx| {
5393                let new_path = PathBuf::from(envelope.payload.new_path);
5394                worktree
5395                    .as_local_mut()
5396                    .unwrap()
5397                    .copy_entry(entry_id, new_path, cx)
5398                    .ok_or_else(|| anyhow!("invalid entry"))
5399            })?
5400            .await?;
5401        Ok(proto::ProjectEntryResponse {
5402            entry: Some((&entry).into()),
5403            worktree_scan_id: worktree_scan_id as u64,
5404        })
5405    }
5406
5407    async fn handle_delete_project_entry(
5408        this: ModelHandle<Self>,
5409        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
5410        _: Arc<Client>,
5411        mut cx: AsyncAppContext,
5412    ) -> Result<proto::ProjectEntryResponse> {
5413        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
5414
5415        this.update(&mut cx, |_, cx| cx.emit(Event::DeletedEntry(entry_id)));
5416
5417        let worktree = this.read_with(&cx, |this, cx| {
5418            this.worktree_for_entry(entry_id, cx)
5419                .ok_or_else(|| anyhow!("worktree not found"))
5420        })?;
5421        let worktree_scan_id = worktree.read_with(&cx, |worktree, _| worktree.scan_id());
5422        worktree
5423            .update(&mut cx, |worktree, cx| {
5424                worktree
5425                    .as_local_mut()
5426                    .unwrap()
5427                    .delete_entry(entry_id, cx)
5428                    .ok_or_else(|| anyhow!("invalid entry"))
5429            })?
5430            .await?;
5431        Ok(proto::ProjectEntryResponse {
5432            entry: None,
5433            worktree_scan_id: worktree_scan_id as u64,
5434        })
5435    }
5436
5437    async fn handle_update_diagnostic_summary(
5438        this: ModelHandle<Self>,
5439        envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
5440        _: Arc<Client>,
5441        mut cx: AsyncAppContext,
5442    ) -> Result<()> {
5443        this.update(&mut cx, |this, cx| {
5444            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
5445            if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
5446                if let Some(summary) = envelope.payload.summary {
5447                    let project_path = ProjectPath {
5448                        worktree_id,
5449                        path: Path::new(&summary.path).into(),
5450                    };
5451                    worktree.update(cx, |worktree, _| {
5452                        worktree
5453                            .as_remote_mut()
5454                            .unwrap()
5455                            .update_diagnostic_summary(project_path.path.clone(), &summary);
5456                    });
5457                    cx.emit(Event::DiagnosticsUpdated {
5458                        language_server_id: LanguageServerId(summary.language_server_id as usize),
5459                        path: project_path,
5460                    });
5461                }
5462            }
5463            Ok(())
5464        })
5465    }
5466
5467    async fn handle_start_language_server(
5468        this: ModelHandle<Self>,
5469        envelope: TypedEnvelope<proto::StartLanguageServer>,
5470        _: Arc<Client>,
5471        mut cx: AsyncAppContext,
5472    ) -> Result<()> {
5473        let server = envelope
5474            .payload
5475            .server
5476            .ok_or_else(|| anyhow!("invalid server"))?;
5477        this.update(&mut cx, |this, cx| {
5478            this.language_server_statuses.insert(
5479                LanguageServerId(server.id as usize),
5480                LanguageServerStatus {
5481                    name: server.name,
5482                    pending_work: Default::default(),
5483                    has_pending_diagnostic_updates: false,
5484                    progress_tokens: Default::default(),
5485                },
5486            );
5487            cx.notify();
5488        });
5489        Ok(())
5490    }
5491
5492    async fn handle_update_language_server(
5493        this: ModelHandle<Self>,
5494        envelope: TypedEnvelope<proto::UpdateLanguageServer>,
5495        _: Arc<Client>,
5496        mut cx: AsyncAppContext,
5497    ) -> Result<()> {
5498        this.update(&mut cx, |this, cx| {
5499            let language_server_id = LanguageServerId(envelope.payload.language_server_id as usize);
5500
5501            match envelope
5502                .payload
5503                .variant
5504                .ok_or_else(|| anyhow!("invalid variant"))?
5505            {
5506                proto::update_language_server::Variant::WorkStart(payload) => {
5507                    this.on_lsp_work_start(
5508                        language_server_id,
5509                        payload.token,
5510                        LanguageServerProgress {
5511                            message: payload.message,
5512                            percentage: payload.percentage.map(|p| p as usize),
5513                            last_update_at: Instant::now(),
5514                        },
5515                        cx,
5516                    );
5517                }
5518
5519                proto::update_language_server::Variant::WorkProgress(payload) => {
5520                    this.on_lsp_work_progress(
5521                        language_server_id,
5522                        payload.token,
5523                        LanguageServerProgress {
5524                            message: payload.message,
5525                            percentage: payload.percentage.map(|p| p as usize),
5526                            last_update_at: Instant::now(),
5527                        },
5528                        cx,
5529                    );
5530                }
5531
5532                proto::update_language_server::Variant::WorkEnd(payload) => {
5533                    this.on_lsp_work_end(language_server_id, payload.token, cx);
5534                }
5535
5536                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
5537                    this.disk_based_diagnostics_started(language_server_id, cx);
5538                }
5539
5540                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
5541                    this.disk_based_diagnostics_finished(language_server_id, cx)
5542                }
5543            }
5544
5545            Ok(())
5546        })
5547    }
5548
5549    async fn handle_update_buffer(
5550        this: ModelHandle<Self>,
5551        envelope: TypedEnvelope<proto::UpdateBuffer>,
5552        _: Arc<Client>,
5553        mut cx: AsyncAppContext,
5554    ) -> Result<proto::Ack> {
5555        this.update(&mut cx, |this, cx| {
5556            let payload = envelope.payload.clone();
5557            let buffer_id = payload.buffer_id;
5558            let ops = payload
5559                .operations
5560                .into_iter()
5561                .map(language::proto::deserialize_operation)
5562                .collect::<Result<Vec<_>, _>>()?;
5563            let is_remote = this.is_remote();
5564            match this.opened_buffers.entry(buffer_id) {
5565                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
5566                    OpenBuffer::Strong(buffer) => {
5567                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
5568                    }
5569                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
5570                    OpenBuffer::Weak(_) => {}
5571                },
5572                hash_map::Entry::Vacant(e) => {
5573                    assert!(
5574                        is_remote,
5575                        "received buffer update from {:?}",
5576                        envelope.original_sender_id
5577                    );
5578                    e.insert(OpenBuffer::Operations(ops));
5579                }
5580            }
5581            Ok(proto::Ack {})
5582        })
5583    }
5584
5585    async fn handle_create_buffer_for_peer(
5586        this: ModelHandle<Self>,
5587        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
5588        _: Arc<Client>,
5589        mut cx: AsyncAppContext,
5590    ) -> Result<()> {
5591        this.update(&mut cx, |this, cx| {
5592            match envelope
5593                .payload
5594                .variant
5595                .ok_or_else(|| anyhow!("missing variant"))?
5596            {
5597                proto::create_buffer_for_peer::Variant::State(mut state) => {
5598                    let mut buffer_file = None;
5599                    if let Some(file) = state.file.take() {
5600                        let worktree_id = WorktreeId::from_proto(file.worktree_id);
5601                        let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
5602                            anyhow!("no worktree found for id {}", file.worktree_id)
5603                        })?;
5604                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
5605                            as Arc<dyn language::File>);
5606                    }
5607
5608                    let buffer_id = state.id;
5609                    let buffer = cx.add_model(|_| {
5610                        Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
5611                    });
5612                    this.incomplete_remote_buffers
5613                        .insert(buffer_id, Some(buffer));
5614                }
5615                proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
5616                    let buffer = this
5617                        .incomplete_remote_buffers
5618                        .get(&chunk.buffer_id)
5619                        .cloned()
5620                        .flatten()
5621                        .ok_or_else(|| {
5622                            anyhow!(
5623                                "received chunk for buffer {} without initial state",
5624                                chunk.buffer_id
5625                            )
5626                        })?;
5627                    let operations = chunk
5628                        .operations
5629                        .into_iter()
5630                        .map(language::proto::deserialize_operation)
5631                        .collect::<Result<Vec<_>>>()?;
5632                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
5633
5634                    if chunk.is_last {
5635                        this.incomplete_remote_buffers.remove(&chunk.buffer_id);
5636                        this.register_buffer(&buffer, cx)?;
5637                    }
5638                }
5639            }
5640
5641            Ok(())
5642        })
5643    }
5644
5645    async fn handle_update_diff_base(
5646        this: ModelHandle<Self>,
5647        envelope: TypedEnvelope<proto::UpdateDiffBase>,
5648        _: Arc<Client>,
5649        mut cx: AsyncAppContext,
5650    ) -> Result<()> {
5651        this.update(&mut cx, |this, cx| {
5652            let buffer_id = envelope.payload.buffer_id;
5653            let diff_base = envelope.payload.diff_base;
5654            if let Some(buffer) = this
5655                .opened_buffers
5656                .get_mut(&buffer_id)
5657                .and_then(|b| b.upgrade(cx))
5658                .or_else(|| {
5659                    this.incomplete_remote_buffers
5660                        .get(&buffer_id)
5661                        .cloned()
5662                        .flatten()
5663                })
5664            {
5665                buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx));
5666            }
5667            Ok(())
5668        })
5669    }
5670
5671    async fn handle_update_buffer_file(
5672        this: ModelHandle<Self>,
5673        envelope: TypedEnvelope<proto::UpdateBufferFile>,
5674        _: Arc<Client>,
5675        mut cx: AsyncAppContext,
5676    ) -> Result<()> {
5677        let buffer_id = envelope.payload.buffer_id;
5678
5679        this.update(&mut cx, |this, cx| {
5680            let payload = envelope.payload.clone();
5681            if let Some(buffer) = this
5682                .opened_buffers
5683                .get(&buffer_id)
5684                .and_then(|b| b.upgrade(cx))
5685                .or_else(|| {
5686                    this.incomplete_remote_buffers
5687                        .get(&buffer_id)
5688                        .cloned()
5689                        .flatten()
5690                })
5691            {
5692                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
5693                let worktree = this
5694                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
5695                    .ok_or_else(|| anyhow!("no such worktree"))?;
5696                let file = File::from_proto(file, worktree, cx)?;
5697                buffer.update(cx, |buffer, cx| {
5698                    buffer.file_updated(Arc::new(file), cx).detach();
5699                });
5700                this.detect_language_for_buffer(&buffer, cx);
5701            }
5702            Ok(())
5703        })
5704    }
5705
5706    async fn handle_save_buffer(
5707        this: ModelHandle<Self>,
5708        envelope: TypedEnvelope<proto::SaveBuffer>,
5709        _: Arc<Client>,
5710        mut cx: AsyncAppContext,
5711    ) -> Result<proto::BufferSaved> {
5712        let buffer_id = envelope.payload.buffer_id;
5713        let (project_id, buffer) = this.update(&mut cx, |this, cx| {
5714            let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
5715            let buffer = this
5716                .opened_buffers
5717                .get(&buffer_id)
5718                .and_then(|buffer| buffer.upgrade(cx))
5719                .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
5720            anyhow::Ok((project_id, buffer))
5721        })?;
5722        buffer
5723            .update(&mut cx, |buffer, _| {
5724                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
5725            })
5726            .await?;
5727        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
5728
5729        let (saved_version, fingerprint, mtime) = this
5730            .update(&mut cx, |this, cx| this.save_buffer(buffer, cx))
5731            .await?;
5732        Ok(proto::BufferSaved {
5733            project_id,
5734            buffer_id,
5735            version: serialize_version(&saved_version),
5736            mtime: Some(mtime.into()),
5737            fingerprint: language::proto::serialize_fingerprint(fingerprint),
5738        })
5739    }
5740
5741    async fn handle_reload_buffers(
5742        this: ModelHandle<Self>,
5743        envelope: TypedEnvelope<proto::ReloadBuffers>,
5744        _: Arc<Client>,
5745        mut cx: AsyncAppContext,
5746    ) -> Result<proto::ReloadBuffersResponse> {
5747        let sender_id = envelope.original_sender_id()?;
5748        let reload = this.update(&mut cx, |this, cx| {
5749            let mut buffers = HashSet::default();
5750            for buffer_id in &envelope.payload.buffer_ids {
5751                buffers.insert(
5752                    this.opened_buffers
5753                        .get(buffer_id)
5754                        .and_then(|buffer| buffer.upgrade(cx))
5755                        .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5756                );
5757            }
5758            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
5759        })?;
5760
5761        let project_transaction = reload.await?;
5762        let project_transaction = this.update(&mut cx, |this, cx| {
5763            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5764        });
5765        Ok(proto::ReloadBuffersResponse {
5766            transaction: Some(project_transaction),
5767        })
5768    }
5769
5770    async fn handle_synchronize_buffers(
5771        this: ModelHandle<Self>,
5772        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
5773        _: Arc<Client>,
5774        mut cx: AsyncAppContext,
5775    ) -> Result<proto::SynchronizeBuffersResponse> {
5776        let project_id = envelope.payload.project_id;
5777        let mut response = proto::SynchronizeBuffersResponse {
5778            buffers: Default::default(),
5779        };
5780
5781        this.update(&mut cx, |this, cx| {
5782            let Some(guest_id) = envelope.original_sender_id else {
5783                log::error!("missing original_sender_id on SynchronizeBuffers request");
5784                return;
5785            };
5786
5787            this.shared_buffers.entry(guest_id).or_default().clear();
5788            for buffer in envelope.payload.buffers {
5789                let buffer_id = buffer.id;
5790                let remote_version = language::proto::deserialize_version(&buffer.version);
5791                if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
5792                    this.shared_buffers
5793                        .entry(guest_id)
5794                        .or_default()
5795                        .insert(buffer_id);
5796
5797                    let buffer = buffer.read(cx);
5798                    response.buffers.push(proto::BufferVersion {
5799                        id: buffer_id,
5800                        version: language::proto::serialize_version(&buffer.version),
5801                    });
5802
5803                    let operations = buffer.serialize_ops(Some(remote_version), cx);
5804                    let client = this.client.clone();
5805                    if let Some(file) = buffer.file() {
5806                        client
5807                            .send(proto::UpdateBufferFile {
5808                                project_id,
5809                                buffer_id: buffer_id as u64,
5810                                file: Some(file.to_proto()),
5811                            })
5812                            .log_err();
5813                    }
5814
5815                    client
5816                        .send(proto::UpdateDiffBase {
5817                            project_id,
5818                            buffer_id: buffer_id as u64,
5819                            diff_base: buffer.diff_base().map(Into::into),
5820                        })
5821                        .log_err();
5822
5823                    client
5824                        .send(proto::BufferReloaded {
5825                            project_id,
5826                            buffer_id,
5827                            version: language::proto::serialize_version(buffer.saved_version()),
5828                            mtime: Some(buffer.saved_mtime().into()),
5829                            fingerprint: language::proto::serialize_fingerprint(
5830                                buffer.saved_version_fingerprint(),
5831                            ),
5832                            line_ending: language::proto::serialize_line_ending(
5833                                buffer.line_ending(),
5834                            ) as i32,
5835                        })
5836                        .log_err();
5837
5838                    cx.background()
5839                        .spawn(
5840                            async move {
5841                                let operations = operations.await;
5842                                for chunk in split_operations(operations) {
5843                                    client
5844                                        .request(proto::UpdateBuffer {
5845                                            project_id,
5846                                            buffer_id,
5847                                            operations: chunk,
5848                                        })
5849                                        .await?;
5850                                }
5851                                anyhow::Ok(())
5852                            }
5853                            .log_err(),
5854                        )
5855                        .detach();
5856                }
5857            }
5858        });
5859
5860        Ok(response)
5861    }
5862
5863    async fn handle_format_buffers(
5864        this: ModelHandle<Self>,
5865        envelope: TypedEnvelope<proto::FormatBuffers>,
5866        _: Arc<Client>,
5867        mut cx: AsyncAppContext,
5868    ) -> Result<proto::FormatBuffersResponse> {
5869        let sender_id = envelope.original_sender_id()?;
5870        let format = this.update(&mut cx, |this, cx| {
5871            let mut buffers = HashSet::default();
5872            for buffer_id in &envelope.payload.buffer_ids {
5873                buffers.insert(
5874                    this.opened_buffers
5875                        .get(buffer_id)
5876                        .and_then(|buffer| buffer.upgrade(cx))
5877                        .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
5878                );
5879            }
5880            let trigger = FormatTrigger::from_proto(envelope.payload.trigger);
5881            Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx))
5882        })?;
5883
5884        let project_transaction = format.await?;
5885        let project_transaction = this.update(&mut cx, |this, cx| {
5886            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5887        });
5888        Ok(proto::FormatBuffersResponse {
5889            transaction: Some(project_transaction),
5890        })
5891    }
5892
5893    async fn handle_apply_additional_edits_for_completion(
5894        this: ModelHandle<Self>,
5895        envelope: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
5896        _: Arc<Client>,
5897        mut cx: AsyncAppContext,
5898    ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
5899        let (buffer, completion) = this.update(&mut cx, |this, cx| {
5900            let buffer = this
5901                .opened_buffers
5902                .get(&envelope.payload.buffer_id)
5903                .and_then(|buffer| buffer.upgrade(cx))
5904                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5905            let language = buffer.read(cx).language();
5906            let completion = language::proto::deserialize_completion(
5907                envelope
5908                    .payload
5909                    .completion
5910                    .ok_or_else(|| anyhow!("invalid completion"))?,
5911                language.cloned(),
5912            );
5913            Ok::<_, anyhow::Error>((buffer, completion))
5914        })?;
5915
5916        let completion = completion.await?;
5917
5918        let apply_additional_edits = this.update(&mut cx, |this, cx| {
5919            this.apply_additional_edits_for_completion(buffer, completion, false, cx)
5920        });
5921
5922        Ok(proto::ApplyCompletionAdditionalEditsResponse {
5923            transaction: apply_additional_edits
5924                .await?
5925                .as_ref()
5926                .map(language::proto::serialize_transaction),
5927        })
5928    }
5929
5930    async fn handle_apply_code_action(
5931        this: ModelHandle<Self>,
5932        envelope: TypedEnvelope<proto::ApplyCodeAction>,
5933        _: Arc<Client>,
5934        mut cx: AsyncAppContext,
5935    ) -> Result<proto::ApplyCodeActionResponse> {
5936        let sender_id = envelope.original_sender_id()?;
5937        let action = language::proto::deserialize_code_action(
5938            envelope
5939                .payload
5940                .action
5941                .ok_or_else(|| anyhow!("invalid action"))?,
5942        )?;
5943        let apply_code_action = this.update(&mut cx, |this, cx| {
5944            let buffer = this
5945                .opened_buffers
5946                .get(&envelope.payload.buffer_id)
5947                .and_then(|buffer| buffer.upgrade(cx))
5948                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5949            Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
5950        })?;
5951
5952        let project_transaction = apply_code_action.await?;
5953        let project_transaction = this.update(&mut cx, |this, cx| {
5954            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
5955        });
5956        Ok(proto::ApplyCodeActionResponse {
5957            transaction: Some(project_transaction),
5958        })
5959    }
5960
5961    async fn handle_on_type_formatting(
5962        this: ModelHandle<Self>,
5963        envelope: TypedEnvelope<proto::OnTypeFormatting>,
5964        _: Arc<Client>,
5965        mut cx: AsyncAppContext,
5966    ) -> Result<proto::OnTypeFormattingResponse> {
5967        let on_type_formatting = this.update(&mut cx, |this, cx| {
5968            let buffer = this
5969                .opened_buffers
5970                .get(&envelope.payload.buffer_id)
5971                .and_then(|buffer| buffer.upgrade(cx))
5972                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
5973            let position = envelope
5974                .payload
5975                .position
5976                .and_then(deserialize_anchor)
5977                .ok_or_else(|| anyhow!("invalid position"))?;
5978            Ok::<_, anyhow::Error>(this.apply_on_type_formatting(
5979                buffer,
5980                position,
5981                envelope.payload.trigger.clone(),
5982                cx,
5983            ))
5984        })?;
5985
5986        let transaction = on_type_formatting
5987            .await?
5988            .as_ref()
5989            .map(language::proto::serialize_transaction);
5990        Ok(proto::OnTypeFormattingResponse { transaction })
5991    }
5992
5993    async fn handle_lsp_command<T: LspCommand>(
5994        this: ModelHandle<Self>,
5995        envelope: TypedEnvelope<T::ProtoRequest>,
5996        _: Arc<Client>,
5997        mut cx: AsyncAppContext,
5998    ) -> Result<<T::ProtoRequest as proto::RequestMessage>::Response>
5999    where
6000        <T::LspRequest as lsp::request::Request>::Result: Send,
6001    {
6002        let sender_id = envelope.original_sender_id()?;
6003        let buffer_id = T::buffer_id_from_proto(&envelope.payload);
6004        let buffer_handle = this.read_with(&cx, |this, _| {
6005            this.opened_buffers
6006                .get(&buffer_id)
6007                .and_then(|buffer| buffer.upgrade(&cx))
6008                .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
6009        })?;
6010        let request = T::from_proto(
6011            envelope.payload,
6012            this.clone(),
6013            buffer_handle.clone(),
6014            cx.clone(),
6015        )
6016        .await?;
6017        let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
6018        let response = this
6019            .update(&mut cx, |this, cx| {
6020                this.request_lsp(buffer_handle, request, cx)
6021            })
6022            .await?;
6023        this.update(&mut cx, |this, cx| {
6024            Ok(T::response_to_proto(
6025                response,
6026                this,
6027                sender_id,
6028                &buffer_version,
6029                cx,
6030            ))
6031        })
6032    }
6033
6034    async fn handle_get_project_symbols(
6035        this: ModelHandle<Self>,
6036        envelope: TypedEnvelope<proto::GetProjectSymbols>,
6037        _: Arc<Client>,
6038        mut cx: AsyncAppContext,
6039    ) -> Result<proto::GetProjectSymbolsResponse> {
6040        let symbols = this
6041            .update(&mut cx, |this, cx| {
6042                this.symbols(&envelope.payload.query, cx)
6043            })
6044            .await?;
6045
6046        Ok(proto::GetProjectSymbolsResponse {
6047            symbols: symbols.iter().map(serialize_symbol).collect(),
6048        })
6049    }
6050
6051    async fn handle_search_project(
6052        this: ModelHandle<Self>,
6053        envelope: TypedEnvelope<proto::SearchProject>,
6054        _: Arc<Client>,
6055        mut cx: AsyncAppContext,
6056    ) -> Result<proto::SearchProjectResponse> {
6057        let peer_id = envelope.original_sender_id()?;
6058        let query = SearchQuery::from_proto(envelope.payload)?;
6059        let result = this
6060            .update(&mut cx, |this, cx| this.search(query, cx))
6061            .await?;
6062
6063        this.update(&mut cx, |this, cx| {
6064            let mut locations = Vec::new();
6065            for (buffer, ranges) in result {
6066                for range in ranges {
6067                    let start = serialize_anchor(&range.start);
6068                    let end = serialize_anchor(&range.end);
6069                    let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
6070                    locations.push(proto::Location {
6071                        buffer_id,
6072                        start: Some(start),
6073                        end: Some(end),
6074                    });
6075                }
6076            }
6077            Ok(proto::SearchProjectResponse { locations })
6078        })
6079    }
6080
6081    async fn handle_open_buffer_for_symbol(
6082        this: ModelHandle<Self>,
6083        envelope: TypedEnvelope<proto::OpenBufferForSymbol>,
6084        _: Arc<Client>,
6085        mut cx: AsyncAppContext,
6086    ) -> Result<proto::OpenBufferForSymbolResponse> {
6087        let peer_id = envelope.original_sender_id()?;
6088        let symbol = envelope
6089            .payload
6090            .symbol
6091            .ok_or_else(|| anyhow!("invalid symbol"))?;
6092        let symbol = this
6093            .read_with(&cx, |this, _| this.deserialize_symbol(symbol))
6094            .await?;
6095        let symbol = this.read_with(&cx, |this, _| {
6096            let signature = this.symbol_signature(&symbol.path);
6097            if signature == symbol.signature {
6098                Ok(symbol)
6099            } else {
6100                Err(anyhow!("invalid symbol signature"))
6101            }
6102        })?;
6103        let buffer = this
6104            .update(&mut cx, |this, cx| this.open_buffer_for_symbol(&symbol, cx))
6105            .await?;
6106
6107        Ok(proto::OpenBufferForSymbolResponse {
6108            buffer_id: this.update(&mut cx, |this, cx| {
6109                this.create_buffer_for_peer(&buffer, peer_id, cx)
6110            }),
6111        })
6112    }
6113
6114    fn symbol_signature(&self, project_path: &ProjectPath) -> [u8; 32] {
6115        let mut hasher = Sha256::new();
6116        hasher.update(project_path.worktree_id.to_proto().to_be_bytes());
6117        hasher.update(project_path.path.to_string_lossy().as_bytes());
6118        hasher.update(self.nonce.to_be_bytes());
6119        hasher.finalize().as_slice().try_into().unwrap()
6120    }
6121
6122    async fn handle_open_buffer_by_id(
6123        this: ModelHandle<Self>,
6124        envelope: TypedEnvelope<proto::OpenBufferById>,
6125        _: Arc<Client>,
6126        mut cx: AsyncAppContext,
6127    ) -> Result<proto::OpenBufferResponse> {
6128        let peer_id = envelope.original_sender_id()?;
6129        let buffer = this
6130            .update(&mut cx, |this, cx| {
6131                this.open_buffer_by_id(envelope.payload.id, cx)
6132            })
6133            .await?;
6134        this.update(&mut cx, |this, cx| {
6135            Ok(proto::OpenBufferResponse {
6136                buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6137            })
6138        })
6139    }
6140
6141    async fn handle_open_buffer_by_path(
6142        this: ModelHandle<Self>,
6143        envelope: TypedEnvelope<proto::OpenBufferByPath>,
6144        _: Arc<Client>,
6145        mut cx: AsyncAppContext,
6146    ) -> Result<proto::OpenBufferResponse> {
6147        let peer_id = envelope.original_sender_id()?;
6148        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
6149        let open_buffer = this.update(&mut cx, |this, cx| {
6150            this.open_buffer(
6151                ProjectPath {
6152                    worktree_id,
6153                    path: PathBuf::from(envelope.payload.path).into(),
6154                },
6155                cx,
6156            )
6157        });
6158
6159        let buffer = open_buffer.await?;
6160        this.update(&mut cx, |this, cx| {
6161            Ok(proto::OpenBufferResponse {
6162                buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
6163            })
6164        })
6165    }
6166
6167    fn serialize_project_transaction_for_peer(
6168        &mut self,
6169        project_transaction: ProjectTransaction,
6170        peer_id: proto::PeerId,
6171        cx: &mut AppContext,
6172    ) -> proto::ProjectTransaction {
6173        let mut serialized_transaction = proto::ProjectTransaction {
6174            buffer_ids: Default::default(),
6175            transactions: Default::default(),
6176        };
6177        for (buffer, transaction) in project_transaction.0 {
6178            serialized_transaction
6179                .buffer_ids
6180                .push(self.create_buffer_for_peer(&buffer, peer_id, cx));
6181            serialized_transaction
6182                .transactions
6183                .push(language::proto::serialize_transaction(&transaction));
6184        }
6185        serialized_transaction
6186    }
6187
6188    fn deserialize_project_transaction(
6189        &mut self,
6190        message: proto::ProjectTransaction,
6191        push_to_history: bool,
6192        cx: &mut ModelContext<Self>,
6193    ) -> Task<Result<ProjectTransaction>> {
6194        cx.spawn(|this, mut cx| async move {
6195            let mut project_transaction = ProjectTransaction::default();
6196            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
6197            {
6198                let buffer = this
6199                    .update(&mut cx, |this, cx| {
6200                        this.wait_for_remote_buffer(buffer_id, cx)
6201                    })
6202                    .await?;
6203                let transaction = language::proto::deserialize_transaction(transaction)?;
6204                project_transaction.0.insert(buffer, transaction);
6205            }
6206
6207            for (buffer, transaction) in &project_transaction.0 {
6208                buffer
6209                    .update(&mut cx, |buffer, _| {
6210                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
6211                    })
6212                    .await?;
6213
6214                if push_to_history {
6215                    buffer.update(&mut cx, |buffer, _| {
6216                        buffer.push_transaction(transaction.clone(), Instant::now());
6217                    });
6218                }
6219            }
6220
6221            Ok(project_transaction)
6222        })
6223    }
6224
6225    fn create_buffer_for_peer(
6226        &mut self,
6227        buffer: &ModelHandle<Buffer>,
6228        peer_id: proto::PeerId,
6229        cx: &mut AppContext,
6230    ) -> u64 {
6231        let buffer_id = buffer.read(cx).remote_id();
6232        if let Some(ProjectClientState::Local { updates_tx, .. }) = &self.client_state {
6233            updates_tx
6234                .unbounded_send(LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id })
6235                .ok();
6236        }
6237        buffer_id
6238    }
6239
6240    fn wait_for_remote_buffer(
6241        &mut self,
6242        id: u64,
6243        cx: &mut ModelContext<Self>,
6244    ) -> Task<Result<ModelHandle<Buffer>>> {
6245        let mut opened_buffer_rx = self.opened_buffer.1.clone();
6246
6247        cx.spawn_weak(|this, mut cx| async move {
6248            let buffer = loop {
6249                let Some(this) = this.upgrade(&cx) else {
6250                    return Err(anyhow!("project dropped"));
6251                };
6252
6253                let buffer = this.read_with(&cx, |this, cx| {
6254                    this.opened_buffers
6255                        .get(&id)
6256                        .and_then(|buffer| buffer.upgrade(cx))
6257                });
6258
6259                if let Some(buffer) = buffer {
6260                    break buffer;
6261                } else if this.read_with(&cx, |this, _| this.is_read_only()) {
6262                    return Err(anyhow!("disconnected before buffer {} could be opened", id));
6263                }
6264
6265                this.update(&mut cx, |this, _| {
6266                    this.incomplete_remote_buffers.entry(id).or_default();
6267                });
6268                drop(this);
6269
6270                opened_buffer_rx
6271                    .next()
6272                    .await
6273                    .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
6274            };
6275
6276            Ok(buffer)
6277        })
6278    }
6279
6280    fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
6281        let project_id = match self.client_state.as_ref() {
6282            Some(ProjectClientState::Remote {
6283                sharing_has_stopped,
6284                remote_id,
6285                ..
6286            }) => {
6287                if *sharing_has_stopped {
6288                    return Task::ready(Err(anyhow!(
6289                        "can't synchronize remote buffers on a readonly project"
6290                    )));
6291                } else {
6292                    *remote_id
6293                }
6294            }
6295            Some(ProjectClientState::Local { .. }) | None => {
6296                return Task::ready(Err(anyhow!(
6297                    "can't synchronize remote buffers on a local project"
6298                )))
6299            }
6300        };
6301
6302        let client = self.client.clone();
6303        cx.spawn(|this, cx| async move {
6304            let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| {
6305                let buffers = this
6306                    .opened_buffers
6307                    .iter()
6308                    .filter_map(|(id, buffer)| {
6309                        let buffer = buffer.upgrade(cx)?;
6310                        Some(proto::BufferVersion {
6311                            id: *id,
6312                            version: language::proto::serialize_version(&buffer.read(cx).version),
6313                        })
6314                    })
6315                    .collect();
6316                let incomplete_buffer_ids = this
6317                    .incomplete_remote_buffers
6318                    .keys()
6319                    .copied()
6320                    .collect::<Vec<_>>();
6321
6322                (buffers, incomplete_buffer_ids)
6323            });
6324            let response = client
6325                .request(proto::SynchronizeBuffers {
6326                    project_id,
6327                    buffers,
6328                })
6329                .await?;
6330
6331            let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| {
6332                let client = client.clone();
6333                let buffer_id = buffer.id;
6334                let remote_version = language::proto::deserialize_version(&buffer.version);
6335                this.read_with(&cx, |this, cx| {
6336                    if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
6337                        let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx);
6338                        cx.background().spawn(async move {
6339                            let operations = operations.await;
6340                            for chunk in split_operations(operations) {
6341                                client
6342                                    .request(proto::UpdateBuffer {
6343                                        project_id,
6344                                        buffer_id,
6345                                        operations: chunk,
6346                                    })
6347                                    .await?;
6348                            }
6349                            anyhow::Ok(())
6350                        })
6351                    } else {
6352                        Task::ready(Ok(()))
6353                    }
6354                })
6355            });
6356
6357            // Any incomplete buffers have open requests waiting. Request that the host sends
6358            // creates these buffers for us again to unblock any waiting futures.
6359            for id in incomplete_buffer_ids {
6360                cx.background()
6361                    .spawn(client.request(proto::OpenBufferById { project_id, id }))
6362                    .detach();
6363            }
6364
6365            futures::future::join_all(send_updates_for_buffers)
6366                .await
6367                .into_iter()
6368                .collect()
6369        })
6370    }
6371
6372    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
6373        self.worktrees(cx)
6374            .map(|worktree| {
6375                let worktree = worktree.read(cx);
6376                proto::WorktreeMetadata {
6377                    id: worktree.id().to_proto(),
6378                    root_name: worktree.root_name().into(),
6379                    visible: worktree.is_visible(),
6380                    abs_path: worktree.abs_path().to_string_lossy().into(),
6381                }
6382            })
6383            .collect()
6384    }
6385
6386    fn set_worktrees_from_proto(
6387        &mut self,
6388        worktrees: Vec<proto::WorktreeMetadata>,
6389        cx: &mut ModelContext<Project>,
6390    ) -> Result<()> {
6391        let replica_id = self.replica_id();
6392        let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
6393
6394        let mut old_worktrees_by_id = self
6395            .worktrees
6396            .drain(..)
6397            .filter_map(|worktree| {
6398                let worktree = worktree.upgrade(cx)?;
6399                Some((worktree.read(cx).id(), worktree))
6400            })
6401            .collect::<HashMap<_, _>>();
6402
6403        for worktree in worktrees {
6404            if let Some(old_worktree) =
6405                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
6406            {
6407                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
6408            } else {
6409                let worktree =
6410                    Worktree::remote(remote_id, replica_id, worktree, self.client.clone(), cx);
6411                let _ = self.add_worktree(&worktree, cx);
6412            }
6413        }
6414
6415        self.metadata_changed(cx);
6416        for (id, _) in old_worktrees_by_id {
6417            cx.emit(Event::WorktreeRemoved(id));
6418        }
6419
6420        Ok(())
6421    }
6422
6423    fn set_collaborators_from_proto(
6424        &mut self,
6425        messages: Vec<proto::Collaborator>,
6426        cx: &mut ModelContext<Self>,
6427    ) -> Result<()> {
6428        let mut collaborators = HashMap::default();
6429        for message in messages {
6430            let collaborator = Collaborator::from_proto(message)?;
6431            collaborators.insert(collaborator.peer_id, collaborator);
6432        }
6433        for old_peer_id in self.collaborators.keys() {
6434            if !collaborators.contains_key(old_peer_id) {
6435                cx.emit(Event::CollaboratorLeft(*old_peer_id));
6436            }
6437        }
6438        self.collaborators = collaborators;
6439        Ok(())
6440    }
6441
6442    fn deserialize_symbol(
6443        &self,
6444        serialized_symbol: proto::Symbol,
6445    ) -> impl Future<Output = Result<Symbol>> {
6446        let languages = self.languages.clone();
6447        async move {
6448            let source_worktree_id = WorktreeId::from_proto(serialized_symbol.source_worktree_id);
6449            let worktree_id = WorktreeId::from_proto(serialized_symbol.worktree_id);
6450            let start = serialized_symbol
6451                .start
6452                .ok_or_else(|| anyhow!("invalid start"))?;
6453            let end = serialized_symbol
6454                .end
6455                .ok_or_else(|| anyhow!("invalid end"))?;
6456            let kind = unsafe { mem::transmute(serialized_symbol.kind) };
6457            let path = ProjectPath {
6458                worktree_id,
6459                path: PathBuf::from(serialized_symbol.path).into(),
6460            };
6461            let language = languages
6462                .language_for_file(&path.path, None)
6463                .await
6464                .log_err();
6465            Ok(Symbol {
6466                language_server_name: LanguageServerName(
6467                    serialized_symbol.language_server_name.into(),
6468                ),
6469                source_worktree_id,
6470                path,
6471                label: {
6472                    match language {
6473                        Some(language) => {
6474                            language
6475                                .label_for_symbol(&serialized_symbol.name, kind)
6476                                .await
6477                        }
6478                        None => None,
6479                    }
6480                    .unwrap_or_else(|| CodeLabel::plain(serialized_symbol.name.clone(), None))
6481                },
6482
6483                name: serialized_symbol.name,
6484                range: Unclipped(PointUtf16::new(start.row, start.column))
6485                    ..Unclipped(PointUtf16::new(end.row, end.column)),
6486                kind,
6487                signature: serialized_symbol
6488                    .signature
6489                    .try_into()
6490                    .map_err(|_| anyhow!("invalid signature"))?,
6491            })
6492        }
6493    }
6494
6495    async fn handle_buffer_saved(
6496        this: ModelHandle<Self>,
6497        envelope: TypedEnvelope<proto::BufferSaved>,
6498        _: Arc<Client>,
6499        mut cx: AsyncAppContext,
6500    ) -> Result<()> {
6501        let fingerprint = deserialize_fingerprint(&envelope.payload.fingerprint)?;
6502        let version = deserialize_version(&envelope.payload.version);
6503        let mtime = envelope
6504            .payload
6505            .mtime
6506            .ok_or_else(|| anyhow!("missing mtime"))?
6507            .into();
6508
6509        this.update(&mut cx, |this, cx| {
6510            let buffer = this
6511                .opened_buffers
6512                .get(&envelope.payload.buffer_id)
6513                .and_then(|buffer| buffer.upgrade(cx))
6514                .or_else(|| {
6515                    this.incomplete_remote_buffers
6516                        .get(&envelope.payload.buffer_id)
6517                        .and_then(|b| b.clone())
6518                });
6519            if let Some(buffer) = buffer {
6520                buffer.update(cx, |buffer, cx| {
6521                    buffer.did_save(version, fingerprint, mtime, cx);
6522                });
6523            }
6524            Ok(())
6525        })
6526    }
6527
6528    async fn handle_buffer_reloaded(
6529        this: ModelHandle<Self>,
6530        envelope: TypedEnvelope<proto::BufferReloaded>,
6531        _: Arc<Client>,
6532        mut cx: AsyncAppContext,
6533    ) -> Result<()> {
6534        let payload = envelope.payload;
6535        let version = deserialize_version(&payload.version);
6536        let fingerprint = deserialize_fingerprint(&payload.fingerprint)?;
6537        let line_ending = deserialize_line_ending(
6538            proto::LineEnding::from_i32(payload.line_ending)
6539                .ok_or_else(|| anyhow!("missing line ending"))?,
6540        );
6541        let mtime = payload
6542            .mtime
6543            .ok_or_else(|| anyhow!("missing mtime"))?
6544            .into();
6545        this.update(&mut cx, |this, cx| {
6546            let buffer = this
6547                .opened_buffers
6548                .get(&payload.buffer_id)
6549                .and_then(|buffer| buffer.upgrade(cx))
6550                .or_else(|| {
6551                    this.incomplete_remote_buffers
6552                        .get(&payload.buffer_id)
6553                        .cloned()
6554                        .flatten()
6555                });
6556            if let Some(buffer) = buffer {
6557                buffer.update(cx, |buffer, cx| {
6558                    buffer.did_reload(version, fingerprint, line_ending, mtime, cx);
6559                });
6560            }
6561            Ok(())
6562        })
6563    }
6564
    /// Converts a batch of LSP text edits into anchor-based edits for `buffer`.
    ///
    /// The edits are interpreted against the snapshot returned by
    /// `buffer_snapshot_for_lsp_version` for `server_id` and `version`. That lookup
    /// happens synchronously; if it fails, the returned task resolves to its error.
    /// The conversion itself is spawned on the background executor.
    ///
    /// Processing steps:
    /// 1. The edits are sorted by the start of their range.
    /// 2. Each range is clipped to a valid position in the snapshot.
    /// 3. Edits that touch or overlap, or that are separated only by a single
    ///    newline, are merged into one edit.
    /// 4. A merged edit that spans multiple rows is diffed line by line against
    ///    the old text, so only the changed lines become edits. Other edits are
    ///    emitted as a single insertion or replacement.
    #[allow(clippy::type_complexity)]
    fn edits_from_lsp(
        &mut self,
        buffer: &ModelHandle<Buffer>,
        lsp_edits: impl 'static + Send + IntoIterator<Item = lsp::TextEdit>,
        server_id: LanguageServerId,
        version: Option<i32>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<(Range<Anchor>, String)>>> {
        // Resolve the snapshot before spawning; the `Result` is moved into the task
        // and unwrapped there.
        let snapshot = self.buffer_snapshot_for_lsp_version(buffer, server_id, version, cx);
        cx.background().spawn(async move {
            let snapshot = snapshot?;
            let mut lsp_edits = lsp_edits
                .into_iter()
                .map(|edit| (range_from_lsp(edit.range), edit.new_text))
                .collect::<Vec<_>>();
            lsp_edits.sort_by_key(|(range, _)| range.start);

            // Peekable so the merge loop below can inspect the next edit before
            // deciding whether to consume it.
            let mut lsp_edits = lsp_edits.into_iter().peekable();
            let mut edits = Vec::new();
            while let Some((range, mut new_text)) = lsp_edits.next() {
                // Clip invalid ranges provided by the language server.
                let mut range = snapshot.clip_point_utf16(range.start, Bias::Left)
                    ..snapshot.clip_point_utf16(range.end, Bias::Left);

                // Combine any LSP edits that are adjacent.
                //
                // Also, combine LSP edits that are separated from each other by only
                // a newline. This is important because for some code actions,
                // Rust-analyzer rewrites the entire buffer via a series of edits that
                // are separated by unchanged newline characters.
                //
                // In order for the diffing logic below to work properly, any edits that
                // cancel each other out must be combined into one.
                while let Some((next_range, next_text)) = lsp_edits.peek() {
                    // The next edit starts past the end of the current one, so there
                    // is a gap between them.
                    if next_range.start.0 > range.end {
                        // Stop merging unless the next edit starts at column 0 of the
                        // row right after the current end, and the current range ends
                        // at the end of its line (clipping `u32::MAX` on that row
                        // yields nothing past `range.end`).
                        if next_range.start.0.row > range.end.row + 1
                            || next_range.start.0.column > 0
                            || snapshot.clip_point_utf16(
                                Unclipped(PointUtf16::new(range.end.row, u32::MAX)),
                                Bias::Left,
                            ) > range.end
                        {
                            break;
                        }
                        // Keep the newline that separated the two edits in the
                        // merged replacement text.
                        new_text.push('\n');
                    }
                    range.end = snapshot.clip_point_utf16(next_range.end, Bias::Left);
                    new_text.push_str(next_text);
                    lsp_edits.next();
                }

                // For multiline edits, perform a diff of the old and new text so that
                // we can identify the changes more precisely, preserving the locations
                // of any anchors positioned in the unchanged regions.
                if range.end.row > range.start.row {
                    // Offset into the old text, advanced past equal and deleted lines.
                    let mut offset = range.start.to_offset(&snapshot);
                    let old_text = snapshot.text_for_range(range).collect::<String>();

                    let diff = TextDiff::from_lines(old_text.as_str(), &new_text);
                    // True when an unchanged line has been passed since the last edit
                    // was pushed (or no edit has been pushed for this range yet).
                    // While false, deletions and insertions extend the last edit
                    // instead of starting a new one.
                    let mut moved_since_edit = true;
                    for change in diff.iter_all_changes() {
                        let tag = change.tag();
                        let value = change.value();
                        match tag {
                            ChangeTag::Equal => {
                                offset += value.len();
                                moved_since_edit = true;
                            }
                            ChangeTag::Delete => {
                                let start = snapshot.anchor_after(offset);
                                let end = snapshot.anchor_before(offset + value.len());
                                if moved_since_edit {
                                    edits.push((start..end, String::new()));
                                } else {
                                    // `moved_since_edit` is false only after an edit
                                    // was pushed in this loop, so `last_mut` is `Some`.
                                    edits.last_mut().unwrap().0.end = end;
                                }
                                offset += value.len();
                                moved_since_edit = false;
                            }
                            ChangeTag::Insert => {
                                if moved_since_edit {
                                    let anchor = snapshot.anchor_after(offset);
                                    edits.push((anchor..anchor, value.to_string()));
                                } else {
                                    edits.last_mut().unwrap().1.push_str(value);
                                }
                                moved_since_edit = false;
                            }
                        }
                    }
                } else if range.end == range.start {
                    // Pure insertion: an empty range at a single anchor.
                    let anchor = snapshot.anchor_after(range.start);
                    edits.push((anchor..anchor, new_text));
                } else {
                    // Replacement within a single row.
                    let edit_start = snapshot.anchor_after(range.start);
                    let edit_end = snapshot.anchor_before(range.end);
                    edits.push((edit_start..edit_end, new_text));
                }
            }

            Ok(edits)
        })
    }
6669
6670    fn buffer_snapshot_for_lsp_version(
6671        &mut self,
6672        buffer: &ModelHandle<Buffer>,
6673        server_id: LanguageServerId,
6674        version: Option<i32>,
6675        cx: &AppContext,
6676    ) -> Result<TextBufferSnapshot> {
6677        const OLD_VERSIONS_TO_RETAIN: i32 = 10;
6678
6679        if let Some(version) = version {
6680            let buffer_id = buffer.read(cx).remote_id();
6681            let snapshots = self
6682                .buffer_snapshots
6683                .get_mut(&buffer_id)
6684                .and_then(|m| m.get_mut(&server_id))
6685                .ok_or_else(|| {
6686                    anyhow!("no snapshots found for buffer {buffer_id} and server {server_id}")
6687                })?;
6688
6689            let found_snapshot = snapshots
6690                .binary_search_by_key(&version, |e| e.version)
6691                .map(|ix| snapshots[ix].snapshot.clone())
6692                .map_err(|_| {
6693                    anyhow!("snapshot not found for buffer {buffer_id} server {server_id} at version {version}")
6694                })?;
6695
6696            snapshots.retain(|snapshot| snapshot.version + OLD_VERSIONS_TO_RETAIN >= version);
6697            Ok(found_snapshot)
6698        } else {
6699            Ok((buffer.read(cx)).text_snapshot())
6700        }
6701    }
6702
6703    pub fn language_servers(
6704        &self,
6705    ) -> impl '_ + Iterator<Item = (LanguageServerId, LanguageServerName, WorktreeId)> {
6706        self.language_server_ids
6707            .iter()
6708            .map(|((worktree_id, server_name), server_id)| {
6709                (*server_id, server_name.clone(), *worktree_id)
6710            })
6711    }
6712
6713    pub fn language_server_for_id(&self, id: LanguageServerId) -> Option<Arc<LanguageServer>> {
6714        if let LanguageServerState::Running { server, .. } = self.language_servers.get(&id)? {
6715            Some(server.clone())
6716        } else {
6717            None
6718        }
6719    }
6720
6721    pub fn language_servers_for_buffer(
6722        &self,
6723        buffer: &Buffer,
6724        cx: &AppContext,
6725    ) -> impl Iterator<Item = (&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6726        self.language_server_ids_for_buffer(buffer, cx)
6727            .into_iter()
6728            .filter_map(|server_id| {
6729                let server = self.language_servers.get(&server_id)?;
6730                if let LanguageServerState::Running {
6731                    adapter, server, ..
6732                } = server
6733                {
6734                    Some((adapter, server))
6735                } else {
6736                    None
6737                }
6738            })
6739    }
6740
6741    fn primary_language_servers_for_buffer(
6742        &self,
6743        buffer: &Buffer,
6744        cx: &AppContext,
6745    ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6746        self.language_servers_for_buffer(buffer, cx).next()
6747    }
6748
6749    fn language_server_for_buffer(
6750        &self,
6751        buffer: &Buffer,
6752        server_id: LanguageServerId,
6753        cx: &AppContext,
6754    ) -> Option<(&Arc<CachedLspAdapter>, &Arc<LanguageServer>)> {
6755        self.language_servers_for_buffer(buffer, cx)
6756            .find(|(_, s)| s.server_id() == server_id)
6757    }
6758
6759    fn language_server_ids_for_buffer(
6760        &self,
6761        buffer: &Buffer,
6762        cx: &AppContext,
6763    ) -> Vec<LanguageServerId> {
6764        if let Some((file, language)) = File::from_dyn(buffer.file()).zip(buffer.language()) {
6765            let worktree_id = file.worktree_id(cx);
6766            language
6767                .lsp_adapters()
6768                .iter()
6769                .flat_map(|adapter| {
6770                    let key = (worktree_id, adapter.name.clone());
6771                    self.language_server_ids.get(&key).copied()
6772                })
6773                .collect()
6774        } else {
6775            Vec::new()
6776        }
6777    }
6778}
6779
6780impl WorktreeHandle {
6781    pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
6782        match self {
6783            WorktreeHandle::Strong(handle) => Some(handle.clone()),
6784            WorktreeHandle::Weak(handle) => handle.upgrade(cx),
6785        }
6786    }
6787}
6788
6789impl OpenBuffer {
6790    pub fn upgrade(&self, cx: &impl BorrowAppContext) -> Option<ModelHandle<Buffer>> {
6791        match self {
6792            OpenBuffer::Strong(handle) => Some(handle.clone()),
6793            OpenBuffer::Weak(handle) => handle.upgrade(cx),
6794            OpenBuffer::Operations(_) => None,
6795        }
6796    }
6797}
6798
/// A worktree snapshot exposed to the fuzzy path matcher as a set of candidate
/// files (see the `fuzzy::PathMatchCandidateSet` implementation for this type).
pub struct PathMatchCandidateSet {
    /// The snapshot whose files are offered as match candidates.
    pub snapshot: Snapshot,
    /// Forwarded to `snapshot.files(..)` when producing candidates. In `len`,
    /// selects `file_count()` (true) over `visible_file_count()` (false).
    pub include_ignored: bool,
    /// When true, and the snapshot's root entry is not a file, `prefix` returns
    /// the root name followed by `/` instead of an empty string.
    pub include_root_name: bool,
}
6804
6805impl<'a> fuzzy::PathMatchCandidateSet<'a> for PathMatchCandidateSet {
6806    type Candidates = PathMatchCandidateSetIter<'a>;
6807
6808    fn id(&self) -> usize {
6809        self.snapshot.id().to_usize()
6810    }
6811
6812    fn len(&self) -> usize {
6813        if self.include_ignored {
6814            self.snapshot.file_count()
6815        } else {
6816            self.snapshot.visible_file_count()
6817        }
6818    }
6819
6820    fn prefix(&self) -> Arc<str> {
6821        if self.snapshot.root_entry().map_or(false, |e| e.is_file()) {
6822            self.snapshot.root_name().into()
6823        } else if self.include_root_name {
6824            format!("{}/", self.snapshot.root_name()).into()
6825        } else {
6826            "".into()
6827        }
6828    }
6829
6830    fn candidates(&'a self, start: usize) -> Self::Candidates {
6831        PathMatchCandidateSetIter {
6832            traversal: self.snapshot.files(self.include_ignored, start),
6833        }
6834    }
6835}
6836
/// Iterator over the match candidates of a `PathMatchCandidateSet`, created by
/// its `candidates` method.
pub struct PathMatchCandidateSetIter<'a> {
    /// File traversal of the underlying snapshot; each entry it yields is
    /// turned into one `fuzzy::PathMatchCandidate`.
    traversal: Traversal<'a>,
}
6840
6841impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
6842    type Item = fuzzy::PathMatchCandidate<'a>;
6843
6844    fn next(&mut self) -> Option<Self::Item> {
6845        self.traversal.next().map(|entry| {
6846            if let EntryKind::File(char_bag) = entry.kind {
6847                fuzzy::PathMatchCandidate {
6848                    path: &entry.path,
6849                    char_bag,
6850                }
6851            } else {
6852                unreachable!()
6853            }
6854        })
6855    }
6856}
6857
6858impl Entity for Project {
6859    type Event = Event;
6860
6861    fn release(&mut self, cx: &mut gpui::AppContext) {
6862        match &self.client_state {
6863            Some(ProjectClientState::Local { .. }) => {
6864                let _ = self.unshare_internal(cx);
6865            }
6866            Some(ProjectClientState::Remote { remote_id, .. }) => {
6867                let _ = self.client.send(proto::LeaveProject {
6868                    project_id: *remote_id,
6869                });
6870                self.disconnected_from_host_internal(cx);
6871            }
6872            _ => {}
6873        }
6874    }
6875
6876    fn app_will_quit(
6877        &mut self,
6878        _: &mut AppContext,
6879    ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
6880        let shutdown_futures = self
6881            .language_servers
6882            .drain()
6883            .map(|(_, server_state)| async {
6884                match server_state {
6885                    LanguageServerState::Running { server, .. } => server.shutdown()?.await,
6886                    LanguageServerState::Starting(starting_server) => {
6887                        starting_server.await?.shutdown()?.await
6888                    }
6889                }
6890            })
6891            .collect::<Vec<_>>();
6892
6893        Some(
6894            async move {
6895                futures::future::join_all(shutdown_futures).await;
6896            }
6897            .boxed(),
6898        )
6899    }
6900}
6901
6902impl Collaborator {
6903    fn from_proto(message: proto::Collaborator) -> Result<Self> {
6904        Ok(Self {
6905            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
6906            replica_id: message.replica_id as ReplicaId,
6907        })
6908    }
6909}
6910
6911impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
6912    fn from((worktree_id, path): (WorktreeId, P)) -> Self {
6913        Self {
6914            worktree_id,
6915            path: path.as_ref().into(),
6916        }
6917    }
6918}
6919
6920fn split_operations(
6921    mut operations: Vec<proto::Operation>,
6922) -> impl Iterator<Item = Vec<proto::Operation>> {
6923    #[cfg(any(test, feature = "test-support"))]
6924    const CHUNK_SIZE: usize = 5;
6925
6926    #[cfg(not(any(test, feature = "test-support")))]
6927    const CHUNK_SIZE: usize = 100;
6928
6929    let mut done = false;
6930    std::iter::from_fn(move || {
6931        if done {
6932            return None;
6933        }
6934
6935        let operations = operations
6936            .drain(..cmp::min(CHUNK_SIZE, operations.len()))
6937            .collect::<Vec<_>>();
6938        if operations.is_empty() {
6939            done = true;
6940        }
6941        Some(operations)
6942    })
6943}
6944
6945fn serialize_symbol(symbol: &Symbol) -> proto::Symbol {
6946    proto::Symbol {
6947        language_server_name: symbol.language_server_name.0.to_string(),
6948        source_worktree_id: symbol.source_worktree_id.to_proto(),
6949        worktree_id: symbol.path.worktree_id.to_proto(),
6950        path: symbol.path.path.to_string_lossy().to_string(),
6951        name: symbol.name.clone(),
6952        kind: unsafe { mem::transmute(symbol.kind) },
6953        start: Some(proto::PointUtf16 {
6954            row: symbol.range.start.0.row,
6955            column: symbol.range.start.0.column,
6956        }),
6957        end: Some(proto::PointUtf16 {
6958            row: symbol.range.end.0.row,
6959            column: symbol.range.end.0.column,
6960        }),
6961        signature: symbol.signature.to_vec(),
6962    }
6963}
6964
/// Computes a relative path that leads from `base` to `path`.
///
/// The two paths are walked component by component:
/// * a shared leading prefix is dropped;
/// * if `path` continues after `base` ends, the remainder of `path` is appended;
/// * if `base` continues after `path` ends, one `..` is emitted per remaining
///   `base` component;
/// * at the first mismatch, one `..` is emitted for the mismatching `base`
///   component and for every `base` component after it, followed by the rest
///   of `path`;
/// * a `.` component in `base` is skipped while the matching `path` component
///   is kept.
///
/// Identical paths produce an empty `PathBuf`. The computation is purely
/// lexical; the file system is not consulted.
fn relativize_path(base: &Path, path: &Path) -> PathBuf {
    let mut origin = base.components();
    let mut target = path.components();
    let mut relative: Vec<Component> = Vec::new();

    loop {
        match (origin.next(), target.next()) {
            (None, None) => break,
            (None, Some(part)) => {
                relative.push(part);
                relative.extend(target.by_ref());
                break;
            }
            (Some(_), None) => relative.push(Component::ParentDir),
            (Some(from), Some(to)) => {
                // Still inside the common prefix: nothing to emit yet.
                if relative.is_empty() && from == to {
                    continue;
                }
                if from == Component::CurDir {
                    relative.push(to);
                    continue;
                }
                // Climb out of what is left of `base`, then descend into `path`.
                relative.push(Component::ParentDir);
                relative.extend(origin.by_ref().map(|_| Component::ParentDir));
                relative.push(to);
                relative.extend(target.by_ref());
                break;
            }
        }
    }

    relative.into_iter().map(|part| part.as_os_str()).collect()
}
6993
6994impl Item for Buffer {
6995    fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId> {
6996        File::from_dyn(self.file()).and_then(|file| file.project_entry_id(cx))
6997    }
6998
6999    fn project_path(&self, cx: &AppContext) -> Option<ProjectPath> {
7000        File::from_dyn(self.file()).map(|file| ProjectPath {
7001            worktree_id: file.worktree_id(cx),
7002            path: file.path().clone(),
7003        })
7004    }
7005}
7006
7007async fn pump_loading_buffer_reciever(
7008    mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
7009) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
7010    loop {
7011        if let Some(result) = receiver.borrow().as_ref() {
7012            match result {
7013                Ok(buffer) => return Ok(buffer.to_owned()),
7014                Err(e) => return Err(e.to_owned()),
7015            }
7016        }
7017        receiver.next().await;
7018    }
7019}
7020
7021fn update_diff_base(
7022    project: &Project,
7023) -> impl Fn(Option<String>, ModelHandle<Buffer>, &mut AsyncAppContext) {
7024    let remote_id = project.remote_id();
7025    let client = project.client().clone();
7026    move |diff_base, buffer, cx| {
7027        let buffer_id = buffer.update(cx, |buffer, cx| {
7028            buffer.set_diff_base(diff_base.clone(), cx);
7029            buffer.remote_id()
7030        });
7031
7032        if let Some(project_id) = remote_id {
7033            client
7034                .send(proto::UpdateDiffBase {
7035                    project_id,
7036                    buffer_id: buffer_id as u64,
7037                    diff_base,
7038                })
7039                .log_err();
7040        }
7041    }
7042}