util.rs

  1use anyhow::{Result, anyhow};
  2use futures::channel::mpsc;
  3use futures::{FutureExt as _, StreamExt as _};
  4use gpui::{AsyncApp, Entity, Task};
  5use language::{Buffer, LanguageId, LanguageServerId, ParseStatus};
  6use project::{Project, ProjectPath, Worktree};
  7use std::collections::HashSet;
  8use std::sync::Arc;
  9use std::time::Duration;
 10use util::rel_path::RelPath;
 11
 12pub fn open_buffer(
 13    project: Entity<Project>,
 14    worktree: Entity<Worktree>,
 15    path: Arc<RelPath>,
 16    cx: &AsyncApp,
 17) -> Task<Result<Entity<Buffer>>> {
 18    cx.spawn(async move |cx| {
 19        let project_path = worktree.read_with(cx, |worktree, _cx| ProjectPath {
 20            worktree_id: worktree.id(),
 21            path,
 22        })?;
 23
 24        let buffer = project
 25            .update(cx, |project, cx| project.open_buffer(project_path, cx))?
 26            .await?;
 27
 28        let mut parse_status = buffer.read_with(cx, |buffer, _cx| buffer.parse_status())?;
 29        while *parse_status.borrow() != ParseStatus::Idle {
 30            parse_status.changed().await?;
 31        }
 32
 33        Ok(buffer)
 34    })
 35}
 36
 37pub async fn open_buffer_with_language_server(
 38    project: Entity<Project>,
 39    worktree: Entity<Worktree>,
 40    path: Arc<RelPath>,
 41    ready_languages: &mut HashSet<LanguageId>,
 42    cx: &mut AsyncApp,
 43) -> Result<(Entity<Entity<Buffer>>, LanguageServerId, Entity<Buffer>)> {
 44    let buffer = open_buffer(project.clone(), worktree, path.clone(), cx).await?;
 45
 46    let (lsp_open_handle, path_style) = project.update(cx, |project, cx| {
 47        (
 48            project.register_buffer_with_language_servers(&buffer, cx),
 49            project.path_style(cx),
 50        )
 51    })?;
 52
 53    let Some(language_id) = buffer.read_with(cx, |buffer, _cx| {
 54        buffer.language().map(|language| language.id())
 55    })?
 56    else {
 57        return Err(anyhow!("No language for {}", path.display(path_style)));
 58    };
 59
 60    let log_prefix = path.display(path_style);
 61    if !ready_languages.contains(&language_id) {
 62        wait_for_lang_server(&project, &buffer, log_prefix.into_owned(), cx).await?;
 63        ready_languages.insert(language_id);
 64    }
 65
 66    let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?;
 67
 68    // hacky wait for buffer to be registered with the language server
 69    for _ in 0..100 {
 70        let Some(language_server_id) = lsp_store.update(cx, |lsp_store, cx| {
 71            buffer.update(cx, |buffer, cx| {
 72                lsp_store
 73                    .language_servers_for_local_buffer(&buffer, cx)
 74                    .next()
 75                    .map(|(_, language_server)| language_server.server_id())
 76            })
 77        })?
 78        else {
 79            cx.background_executor()
 80                .timer(Duration::from_millis(10))
 81                .await;
 82            continue;
 83        };
 84
 85        return Ok((lsp_open_handle, language_server_id, buffer));
 86    }
 87
 88    return Err(anyhow!("No language server found for buffer"));
 89}
 90
 91// TODO: Dedupe with similar function in crates/eval/src/instance.rs
 92pub fn wait_for_lang_server(
 93    project: &Entity<Project>,
 94    buffer: &Entity<Buffer>,
 95    log_prefix: String,
 96    cx: &mut AsyncApp,
 97) -> Task<Result<()>> {
 98    println!("{}⏵ Waiting for language server", log_prefix);
 99
100    let (mut tx, mut rx) = mpsc::channel(1);
101
102    let lsp_store = project
103        .read_with(cx, |project, _| project.lsp_store())
104        .unwrap();
105
106    let has_lang_server = buffer
107        .update(cx, |buffer, cx| {
108            lsp_store.update(cx, |lsp_store, cx| {
109                lsp_store
110                    .language_servers_for_local_buffer(buffer, cx)
111                    .next()
112                    .is_some()
113            })
114        })
115        .unwrap_or(false);
116
117    if has_lang_server {
118        project
119            .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx))
120            .unwrap()
121            .detach();
122    }
123    let (mut added_tx, mut added_rx) = mpsc::channel(1);
124
125    let subscriptions = [
126        cx.subscribe(&lsp_store, {
127            let log_prefix = log_prefix.clone();
128            move |_, event, _| {
129                if let project::LspStoreEvent::LanguageServerUpdate {
130                    message:
131                        client::proto::update_language_server::Variant::WorkProgress(
132                            client::proto::LspWorkProgress {
133                                message: Some(message),
134                                ..
135                            },
136                        ),
137                    ..
138                } = event
139                {
140                    println!("{}{message}", log_prefix)
141                }
142            }
143        }),
144        cx.subscribe(project, {
145            let buffer = buffer.clone();
146            move |project, event, cx| match event {
147                project::Event::LanguageServerAdded(_, _, _) => {
148                    let buffer = buffer.clone();
149                    project
150                        .update(cx, |project, cx| project.save_buffer(buffer, cx))
151                        .detach();
152                    added_tx.try_send(()).ok();
153                }
154                project::Event::DiskBasedDiagnosticsFinished { .. } => {
155                    tx.try_send(()).ok();
156                }
157                _ => {}
158            }
159        }),
160    ];
161
162    cx.spawn(async move |cx| {
163        if !has_lang_server {
164            // some buffers never have a language server, so this aborts quickly in that case.
165            let timeout = cx.background_executor().timer(Duration::from_secs(5));
166            futures::select! {
167                _ = added_rx.next() => {},
168                _ = timeout.fuse() => {
169                    anyhow::bail!("Waiting for language server add timed out after 5 seconds");
170                }
171            };
172        }
173        let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5));
174        let result = futures::select! {
175            _ = rx.next() => {
176                println!("{}⚑ Language server idle", log_prefix);
177                anyhow::Ok(())
178            },
179            _ = timeout.fuse() => {
180                anyhow::bail!("LSP wait timed out after 5 minutes");
181            }
182        };
183        drop(subscriptions);
184        result
185    })
186}