Broadcast completion triggers to remote participants

Antonio Scandurra created

Change summary

crates/editor/src/multi_buffer.rs |  21 ---
crates/language/src/buffer.rs     | 170 ++++++++++++++++++++------------
crates/language/src/proto.rs      |  12 ++
crates/lsp/src/lsp.rs             |  23 ++-
crates/rpc/proto/zed.proto        |   6 +
5 files changed, 144 insertions(+), 88 deletions(-)

Detailed changes

crates/editor/src/multi_buffer.rs 🔗

@@ -911,22 +911,11 @@ impl MultiBuffer {
         let snapshot = self.snapshot(cx);
         let anchor = snapshot.anchor_before(position);
         let buffer = self.buffers.borrow()[&anchor.buffer_id].buffer.clone();
-        if let Some(language_server) = buffer.read(cx).language_server() {
-            language_server
-                .capabilities()
-                .completion_provider
-                .as_ref()
-                .map_or(false, |provider| {
-                    provider
-                        .trigger_characters
-                        .as_ref()
-                        .map_or(false, |characters| {
-                            characters.iter().any(|string| string == text)
-                        })
-                })
-        } else {
-            false
-        }
+        buffer
+            .read(cx)
+            .completion_triggers()
+            .iter()
+            .any(|string| string == text)
     }
 
     pub fn apply_additional_edits_for_completion(

crates/language/src/buffer.rs 🔗

@@ -74,6 +74,7 @@ pub struct Buffer {
     selections_update_count: usize,
     diagnostics_update_count: usize,
     language_server: Option<LanguageServerState>,
+    completion_triggers: Vec<String>,
     deferred_ops: OperationQueue<Operation>,
     #[cfg(test)]
     pub(crate) operations: Vec<Operation>,
@@ -126,7 +127,7 @@ struct LanguageServerState {
     latest_snapshot: watch::Sender<Option<LanguageServerSnapshot>>,
     pending_snapshots: BTreeMap<usize, LanguageServerSnapshot>,
     next_version: usize,
-    _maintain_server: Task<Option<()>>,
+    _maintain_server: Task<()>,
 }
 
 #[derive(Clone)]
@@ -148,6 +149,9 @@ pub enum Operation {
         selections: Arc<[Selection<Anchor>]>,
         lamport_timestamp: clock::Lamport,
     },
+    UpdateCompletionTriggers {
+        triggers: Vec<String>,
+    },
 }
 
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -448,6 +452,8 @@ impl Buffer {
             cx,
         );
 
+        this.completion_triggers = message.completion_triggers;
+
         let deferred_ops = message
             .deferred_operations
             .into_iter()
@@ -496,6 +502,7 @@ impl Buffer {
                         .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))),
                 )
                 .collect(),
+            completion_triggers: self.completion_triggers.clone(),
         }
     }
 
@@ -538,6 +545,7 @@ impl Buffer {
             diagnostics: Default::default(),
             diagnostics_update_count: 0,
             language_server: None,
+            completion_triggers: Default::default(),
             deferred_ops: OperationQueue::new(),
             #[cfg(test)]
             operations: Default::default(),
@@ -639,75 +647,102 @@ impl Buffer {
         cx: &mut ModelContext<Self>,
     ) {
         self.language_server = if let Some(server) = language_server {
-            let (latest_snapshot_tx, mut latest_snapshot_rx) = watch::channel();
+            let (latest_snapshot_tx, mut latest_snapshot_rx) =
+                watch::channel::<Option<LanguageServerSnapshot>>();
+
+            let maintain_changes = cx.background().spawn({
+                let server = server.clone();
+                async move {
+                    let mut prev_snapshot: Option<LanguageServerSnapshot> = None;
+                    while let Some(snapshot) = latest_snapshot_rx.recv().await {
+                        if let Some(snapshot) = snapshot {
+                            let uri = lsp::Url::from_file_path(&snapshot.path).unwrap();
+                            if let Some(prev_snapshot) = prev_snapshot {
+                                let changes = lsp::DidChangeTextDocumentParams {
+                                    text_document: lsp::VersionedTextDocumentIdentifier::new(
+                                        uri,
+                                        snapshot.version as i32,
+                                    ),
+                                    content_changes: snapshot
+                                        .buffer_snapshot
+                                        .edits_since::<(PointUtf16, usize)>(
+                                            prev_snapshot.buffer_snapshot.version(),
+                                        )
+                                        .map(|edit| {
+                                            let edit_start = edit.new.start.0;
+                                            let edit_end =
+                                                edit_start + (edit.old.end.0 - edit.old.start.0);
+                                            let new_text = snapshot
+                                                .buffer_snapshot
+                                                .text_for_range(edit.new.start.1..edit.new.end.1)
+                                                .collect();
+                                            lsp::TextDocumentContentChangeEvent {
+                                                range: Some(lsp::Range::new(
+                                                    edit_start.to_lsp_position(),
+                                                    edit_end.to_lsp_position(),
+                                                )),
+                                                range_length: None,
+                                                text: new_text,
+                                            }
+                                        })
+                                        .collect(),
+                                };
+                                server
+                                    .notify::<lsp::notification::DidChangeTextDocument>(changes)
+                                    .await?;
+                            } else {
+                                server
+                                    .notify::<lsp::notification::DidOpenTextDocument>(
+                                        lsp::DidOpenTextDocumentParams {
+                                            text_document: lsp::TextDocumentItem::new(
+                                                uri,
+                                                Default::default(),
+                                                snapshot.version as i32,
+                                                snapshot.buffer_snapshot.text().to_string(),
+                                            ),
+                                        },
+                                    )
+                                    .await?;
+                            }
+
+                            prev_snapshot = Some(snapshot);
+                        }
+                    }
+                    Ok(())
+                }
+            });
+
             Some(LanguageServerState {
                 latest_snapshot: latest_snapshot_tx,
                 pending_snapshots: Default::default(),
                 next_version: 0,
                 server: server.clone(),
-                _maintain_server: cx.background().spawn(
-                    async move {
-                        let mut prev_snapshot: Option<LanguageServerSnapshot> = None;
-                        while let Some(snapshot) = latest_snapshot_rx.recv().await {
-                            if let Some(snapshot) = snapshot {
-                                let uri = lsp::Url::from_file_path(&snapshot.path).unwrap();
-                                if let Some(prev_snapshot) = prev_snapshot {
-                                    let changes = lsp::DidChangeTextDocumentParams {
-                                        text_document: lsp::VersionedTextDocumentIdentifier::new(
-                                            uri,
-                                            snapshot.version as i32,
-                                        ),
-                                        content_changes: snapshot
-                                            .buffer_snapshot
-                                            .edits_since::<(PointUtf16, usize)>(
-                                                prev_snapshot.buffer_snapshot.version(),
-                                            )
-                                            .map(|edit| {
-                                                let edit_start = edit.new.start.0;
-                                                let edit_end = edit_start
-                                                    + (edit.old.end.0 - edit.old.start.0);
-                                                let new_text = snapshot
-                                                    .buffer_snapshot
-                                                    .text_for_range(
-                                                        edit.new.start.1..edit.new.end.1,
-                                                    )
-                                                    .collect();
-                                                lsp::TextDocumentContentChangeEvent {
-                                                    range: Some(lsp::Range::new(
-                                                        edit_start.to_lsp_position(),
-                                                        edit_end.to_lsp_position(),
-                                                    )),
-                                                    range_length: None,
-                                                    text: new_text,
-                                                }
-                                            })
-                                            .collect(),
-                                    };
-                                    server
-                                        .notify::<lsp::notification::DidChangeTextDocument>(changes)
-                                        .await?;
-                                } else {
-                                    server
-                                        .notify::<lsp::notification::DidOpenTextDocument>(
-                                            lsp::DidOpenTextDocumentParams {
-                                                text_document: lsp::TextDocumentItem::new(
-                                                    uri,
-                                                    Default::default(),
-                                                    snapshot.version as i32,
-                                                    snapshot.buffer_snapshot.text().to_string(),
-                                                ),
-                                            },
-                                        )
-                                        .await?;
-                                }
-
-                                prev_snapshot = Some(snapshot);
+                _maintain_server: cx.spawn_weak(|this, mut cx| async move {
+                    let mut capabilities = server.capabilities();
+                    loop {
+                        if let Some(capabilities) = capabilities.recv().await.flatten() {
+                            if let Some(this) = this.upgrade(&cx) {
+                                let triggers = capabilities
+                                    .completion_provider
+                                    .and_then(|c| c.trigger_characters)
+                                    .unwrap_or_default();
+                                this.update(&mut cx, |this, cx| {
+                                    this.completion_triggers = triggers.clone();
+                                    this.send_operation(
+                                        Operation::UpdateCompletionTriggers { triggers },
+                                        cx,
+                                    );
+                                });
+                            } else {
+                                return;
                             }
+
+                            break;
                         }
-                        Ok(())
                     }
-                    .log_err(),
-                ),
+
+                    maintain_changes.log_err().await;
+                }),
             })
         } else {
             None
@@ -1591,6 +1626,7 @@ impl Buffer {
             Operation::UpdateSelections { selections, .. } => selections
                 .iter()
                 .all(|s| self.can_resolve(&s.start) && self.can_resolve(&s.end)),
+            Operation::UpdateCompletionTriggers { .. } => true,
         }
     }
 
@@ -1630,6 +1666,9 @@ impl Buffer {
                 self.text.lamport_clock.observe(lamport_timestamp);
                 self.selections_update_count += 1;
             }
+            Operation::UpdateCompletionTriggers { triggers } => {
+                self.completion_triggers = triggers;
+            }
         }
     }
 
@@ -1812,6 +1851,10 @@ impl Buffer {
             Ok::<_, anyhow::Error>(())
         }))
     }
+
+    pub fn completion_triggers(&self) -> &[String] {
+        &self.completion_triggers
+    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -2529,6 +2572,9 @@ impl operation_queue::Operation for Operation {
             | Operation::UpdateSelections {
                 lamport_timestamp, ..
             } => *lamport_timestamp,
+            Operation::UpdateCompletionTriggers { .. } => {
+                unreachable!("updating completion triggers should never be deferred")
+            }
         }
     }
 }

crates/language/src/proto.rs 🔗

@@ -58,6 +58,13 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
                 lamport_timestamp: lamport_timestamp.value,
                 diagnostics: serialize_diagnostics(diagnostics.iter()),
             }),
+            Operation::UpdateCompletionTriggers { triggers } => {
+                proto::operation::Variant::UpdateCompletionTriggers(
+                    proto::operation::UpdateCompletionTriggers {
+                        triggers: triggers.clone(),
+                    },
+                )
+            }
         }),
     }
 }
@@ -238,6 +245,11 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
                     value: message.lamport_timestamp,
                 },
             },
+            proto::operation::Variant::UpdateCompletionTriggers(message) => {
+                Operation::UpdateCompletionTriggers {
+                    triggers: message.triggers,
+                }
+            }
         },
     )
 }

crates/lsp/src/lsp.rs 🔗

@@ -1,8 +1,8 @@
 use anyhow::{anyhow, Context, Result};
 use futures::{io::BufWriter, AsyncRead, AsyncWrite};
 use gpui::{executor, Task};
-use parking_lot::{Mutex, RwLock, RwLockReadGuard};
-use postage::{barrier, oneshot, prelude::Stream, sink::Sink};
+use parking_lot::{Mutex, RwLock};
+use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, value::RawValue, Value};
 use smol::{
@@ -34,7 +34,7 @@ type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 pub struct LanguageServer {
     next_id: AtomicUsize,
     outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
-    capabilities: RwLock<lsp_types::ServerCapabilities>,
+    capabilities: watch::Receiver<Option<ServerCapabilities>>,
     notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
     response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
     executor: Arc<executor::Background>,
@@ -195,10 +195,11 @@ impl LanguageServer {
         );
 
         let (initialized_tx, initialized_rx) = barrier::channel();
+        let (mut capabilities_tx, capabilities_rx) = watch::channel();
         let this = Arc::new(Self {
             notification_handlers,
             response_handlers,
-            capabilities: Default::default(),
+            capabilities: capabilities_rx,
             next_id: Default::default(),
             outbound_tx: RwLock::new(Some(outbound_tx)),
             executor: executor.clone(),
@@ -212,7 +213,10 @@ impl LanguageServer {
             .spawn({
                 let this = this.clone();
                 async move {
-                    this.init(root_uri).log_err().await;
+                    if let Some(capabilities) = this.init(root_uri).log_err().await {
+                        *capabilities_tx.borrow_mut() = Some(capabilities);
+                    }
+
                     drop(initialized_tx);
                 }
             })
@@ -221,7 +225,7 @@ impl LanguageServer {
         Ok(this)
     }
 
-    async fn init(self: Arc<Self>, root_uri: Url) -> Result<()> {
+    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
         #[allow(deprecated)]
         let params = InitializeParams {
             process_id: Default::default(),
@@ -269,12 +273,11 @@ impl LanguageServer {
             params,
         );
         let response = request.await?;
-        *this.capabilities.write() = response.capabilities;
         Self::notify_internal::<notification::Initialized>(
             this.outbound_tx.read().as_ref(),
             InitializedParams {},
         )?;
-        Ok(())
+        Ok(response.capabilities)
     }
 
     pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
@@ -328,8 +331,8 @@ impl LanguageServer {
         }
     }
 
-    pub fn capabilities(&self) -> RwLockReadGuard<ServerCapabilities> {
-        self.capabilities.read()
+    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
+        self.capabilities.clone()
     }
 
     pub fn request<T: request::Request>(

crates/rpc/proto/zed.proto 🔗

@@ -358,6 +358,7 @@ message BufferState {
     repeated Diagnostic diagnostics = 9;
     uint32 lamport_timestamp = 10;
     repeated Operation deferred_operations = 11;
+    repeated string completion_triggers = 12;
 }
 
 message BufferFragment {
@@ -428,6 +429,7 @@ message Operation {
         Undo undo = 2;
         UpdateSelections update_selections = 3;
         UpdateDiagnostics update_diagnostics = 4;
+        UpdateCompletionTriggers update_completion_triggers = 5;
     }
 
     message Edit {
@@ -453,6 +455,10 @@ message Operation {
         uint32 lamport_timestamp = 2;
         repeated Selection selections = 3;
     }
+
+    message UpdateCompletionTriggers {
+        repeated string triggers = 1;
+    }
 }
 
 message UndoMapEntry {