util.rs

  1use anyhow::{Result, anyhow};
  2use futures::channel::mpsc;
  3use futures::{FutureExt as _, StreamExt as _};
  4use gpui::{AsyncApp, Entity, Task};
  5use language::{Buffer, LanguageId, LanguageNotFound, LanguageServerId, ParseStatus};
  6use project::lsp_store::OpenLspBufferHandle;
  7use project::{Project, ProjectPath, Worktree};
  8use std::collections::HashSet;
  9use std::sync::Arc;
 10use std::time::Duration;
 11use util::rel_path::RelPath;
 12
 13pub fn open_buffer(
 14    project: Entity<Project>,
 15    worktree: Entity<Worktree>,
 16    path: Arc<RelPath>,
 17    cx: &AsyncApp,
 18) -> Task<Result<Entity<Buffer>>> {
 19    cx.spawn(async move |cx| {
 20        let project_path = worktree.read_with(cx, |worktree, _cx| ProjectPath {
 21            worktree_id: worktree.id(),
 22            path,
 23        })?;
 24
 25        let buffer = project
 26            .update(cx, |project, cx| project.open_buffer(project_path, cx))?
 27            .await?;
 28
 29        let mut parse_status = buffer.read_with(cx, |buffer, _cx| buffer.parse_status())?;
 30        while *parse_status.borrow() != ParseStatus::Idle {
 31            parse_status.changed().await?;
 32        }
 33
 34        Ok(buffer)
 35    })
 36}
 37
 38pub async fn open_buffer_with_language_server(
 39    project: Entity<Project>,
 40    worktree: Entity<Worktree>,
 41    path: Arc<RelPath>,
 42    ready_languages: &mut HashSet<LanguageId>,
 43    cx: &mut AsyncApp,
 44) -> Result<(OpenLspBufferHandle, LanguageServerId, Entity<Buffer>)> {
 45    let buffer = open_buffer(project.clone(), worktree, path.clone(), cx).await?;
 46
 47    let (lsp_open_handle, path_style) = project.update(cx, |project, cx| {
 48        (
 49            project.register_buffer_with_language_servers(&buffer, cx),
 50            project.path_style(cx),
 51        )
 52    })?;
 53
 54    let language_registry = project.read_with(cx, |project, _| project.languages().clone())?;
 55    let result = language_registry
 56        .load_language_for_file_path(path.as_std_path())
 57        .await;
 58
 59    if let Err(error) = result
 60        && !error.is::<LanguageNotFound>()
 61    {
 62        anyhow::bail!(error);
 63    }
 64
 65    let Some(language_id) = buffer.read_with(cx, |buffer, _cx| {
 66        buffer.language().map(|language| language.id())
 67    })?
 68    else {
 69        return Err(anyhow!("No language for {}", path.display(path_style)));
 70    };
 71
 72    let log_prefix = format!("{} | ", path.display(path_style));
 73    if !ready_languages.contains(&language_id) {
 74        wait_for_lang_server(&project, &buffer, log_prefix, cx).await?;
 75        ready_languages.insert(language_id);
 76    }
 77
 78    let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?;
 79
 80    // hacky wait for buffer to be registered with the language server
 81    for _ in 0..100 {
 82        let Some(language_server_id) = lsp_store.update(cx, |lsp_store, cx| {
 83            buffer.update(cx, |buffer, cx| {
 84                lsp_store
 85                    .language_servers_for_local_buffer(&buffer, cx)
 86                    .next()
 87                    .map(|(_, language_server)| language_server.server_id())
 88            })
 89        })?
 90        else {
 91            cx.background_executor()
 92                .timer(Duration::from_millis(10))
 93                .await;
 94            continue;
 95        };
 96
 97        return Ok((lsp_open_handle, language_server_id, buffer));
 98    }
 99
100    return Err(anyhow!("No language server found for buffer"));
101}
102
103// TODO: Dedupe with similar function in crates/eval/src/instance.rs
104pub fn wait_for_lang_server(
105    project: &Entity<Project>,
106    buffer: &Entity<Buffer>,
107    log_prefix: String,
108    cx: &mut AsyncApp,
109) -> Task<Result<()>> {
110    eprintln!("{}⏵ Waiting for language server", log_prefix);
111
112    let (mut tx, mut rx) = mpsc::channel(1);
113
114    let lsp_store = project
115        .read_with(cx, |project, _| project.lsp_store())
116        .unwrap();
117
118    let has_lang_server = buffer
119        .update(cx, |buffer, cx| {
120            lsp_store.update(cx, |lsp_store, cx| {
121                lsp_store
122                    .language_servers_for_local_buffer(buffer, cx)
123                    .next()
124                    .is_some()
125            })
126        })
127        .unwrap_or(false);
128
129    if has_lang_server {
130        project
131            .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx))
132            .unwrap()
133            .detach();
134    }
135    let (mut added_tx, mut added_rx) = mpsc::channel(1);
136
137    let subscriptions = [
138        cx.subscribe(&lsp_store, {
139            let log_prefix = log_prefix.clone();
140            move |_, event, _| {
141                if let project::LspStoreEvent::LanguageServerUpdate {
142                    message:
143                        client::proto::update_language_server::Variant::WorkProgress(
144                            client::proto::LspWorkProgress {
145                                message: Some(message),
146                                ..
147                            },
148                        ),
149                    ..
150                } = event
151                {
152                    eprintln!("{}{message}", log_prefix)
153                }
154            }
155        }),
156        cx.subscribe(project, {
157            let buffer = buffer.clone();
158            move |project, event, cx| match event {
159                project::Event::LanguageServerAdded(_, _, _) => {
160                    let buffer = buffer.clone();
161                    project
162                        .update(cx, |project, cx| project.save_buffer(buffer, cx))
163                        .detach();
164                    added_tx.try_send(()).ok();
165                }
166                project::Event::DiskBasedDiagnosticsFinished { .. } => {
167                    tx.try_send(()).ok();
168                }
169                _ => {}
170            }
171        }),
172    ];
173
174    cx.spawn(async move |cx| {
175        if !has_lang_server {
176            // some buffers never have a language server, so this aborts quickly in that case.
177            let timeout = cx.background_executor().timer(Duration::from_secs(500));
178            futures::select! {
179                _ = added_rx.next() => {},
180                _ = timeout.fuse() => {
181                    anyhow::bail!("Waiting for language server add timed out after 5 seconds");
182                }
183            };
184        }
185        let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5));
186        let result = futures::select! {
187            _ = rx.next() => {
188                eprintln!("{}⚑ Language server idle", log_prefix);
189                anyhow::Ok(())
190            },
191            _ = timeout.fuse() => {
192                anyhow::bail!("LSP wait timed out after 5 minutes");
193            }
194        };
195        drop(subscriptions);
196        result
197    })
198}