1use anyhow::{Result, anyhow};
2use futures::channel::mpsc;
3use futures::{FutureExt as _, StreamExt as _};
4use gpui::{AsyncApp, Entity, Task};
5use language::{Buffer, LanguageId, LanguageNotFound, LanguageServerId, ParseStatus};
6use project::lsp_store::OpenLspBufferHandle;
7use project::{Project, ProjectPath, Worktree};
8use std::collections::HashSet;
9use std::sync::Arc;
10use std::time::Duration;
11use util::rel_path::RelPath;
12
13pub fn open_buffer(
14 project: Entity<Project>,
15 worktree: Entity<Worktree>,
16 path: Arc<RelPath>,
17 cx: &AsyncApp,
18) -> Task<Result<Entity<Buffer>>> {
19 cx.spawn(async move |cx| {
20 let project_path = worktree.read_with(cx, |worktree, _cx| ProjectPath {
21 worktree_id: worktree.id(),
22 path,
23 })?;
24
25 let buffer = project
26 .update(cx, |project, cx| project.open_buffer(project_path, cx))?
27 .await?;
28
29 let mut parse_status = buffer.read_with(cx, |buffer, _cx| buffer.parse_status())?;
30 while *parse_status.borrow() != ParseStatus::Idle {
31 parse_status.changed().await?;
32 }
33
34 Ok(buffer)
35 })
36}
37
38pub async fn open_buffer_with_language_server(
39 project: Entity<Project>,
40 worktree: Entity<Worktree>,
41 path: Arc<RelPath>,
42 ready_languages: &mut HashSet<LanguageId>,
43 cx: &mut AsyncApp,
44) -> Result<(OpenLspBufferHandle, LanguageServerId, Entity<Buffer>)> {
45 let buffer = open_buffer(project.clone(), worktree, path.clone(), cx).await?;
46
47 let (lsp_open_handle, path_style) = project.update(cx, |project, cx| {
48 (
49 project.register_buffer_with_language_servers(&buffer, cx),
50 project.path_style(cx),
51 )
52 })?;
53
54 let language_registry = project.read_with(cx, |project, _| project.languages().clone())?;
55 let result = language_registry
56 .load_language_for_file_path(path.as_std_path())
57 .await;
58
59 if let Err(error) = result
60 && !error.is::<LanguageNotFound>()
61 {
62 anyhow::bail!(error);
63 }
64
65 let Some(language_id) = buffer.read_with(cx, |buffer, _cx| {
66 buffer.language().map(|language| language.id())
67 })?
68 else {
69 return Err(anyhow!("No language for {}", path.display(path_style)));
70 };
71
72 let log_prefix = format!("{} | ", path.display(path_style));
73 if !ready_languages.contains(&language_id) {
74 wait_for_lang_server(&project, &buffer, log_prefix, cx).await?;
75 ready_languages.insert(language_id);
76 }
77
78 let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?;
79
80 // hacky wait for buffer to be registered with the language server
81 for _ in 0..100 {
82 let Some(language_server_id) = lsp_store.update(cx, |lsp_store, cx| {
83 buffer.update(cx, |buffer, cx| {
84 lsp_store
85 .language_servers_for_local_buffer(&buffer, cx)
86 .next()
87 .map(|(_, language_server)| language_server.server_id())
88 })
89 })?
90 else {
91 cx.background_executor()
92 .timer(Duration::from_millis(10))
93 .await;
94 continue;
95 };
96
97 return Ok((lsp_open_handle, language_server_id, buffer));
98 }
99
100 return Err(anyhow!("No language server found for buffer"));
101}
102
103// TODO: Dedupe with similar function in crates/eval/src/instance.rs
104pub fn wait_for_lang_server(
105 project: &Entity<Project>,
106 buffer: &Entity<Buffer>,
107 log_prefix: String,
108 cx: &mut AsyncApp,
109) -> Task<Result<()>> {
110 eprintln!("{}⏵ Waiting for language server", log_prefix);
111
112 let (mut tx, mut rx) = mpsc::channel(1);
113
114 let lsp_store = project
115 .read_with(cx, |project, _| project.lsp_store())
116 .unwrap();
117
118 let has_lang_server = buffer
119 .update(cx, |buffer, cx| {
120 lsp_store.update(cx, |lsp_store, cx| {
121 lsp_store
122 .language_servers_for_local_buffer(buffer, cx)
123 .next()
124 .is_some()
125 })
126 })
127 .unwrap_or(false);
128
129 if has_lang_server {
130 project
131 .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx))
132 .unwrap()
133 .detach();
134 }
135 let (mut added_tx, mut added_rx) = mpsc::channel(1);
136
137 let subscriptions = [
138 cx.subscribe(&lsp_store, {
139 let log_prefix = log_prefix.clone();
140 move |_, event, _| {
141 if let project::LspStoreEvent::LanguageServerUpdate {
142 message:
143 client::proto::update_language_server::Variant::WorkProgress(
144 client::proto::LspWorkProgress {
145 message: Some(message),
146 ..
147 },
148 ),
149 ..
150 } = event
151 {
152 eprintln!("{}⟲ {message}", log_prefix)
153 }
154 }
155 }),
156 cx.subscribe(project, {
157 let buffer = buffer.clone();
158 move |project, event, cx| match event {
159 project::Event::LanguageServerAdded(_, _, _) => {
160 let buffer = buffer.clone();
161 project
162 .update(cx, |project, cx| project.save_buffer(buffer, cx))
163 .detach();
164 added_tx.try_send(()).ok();
165 }
166 project::Event::DiskBasedDiagnosticsFinished { .. } => {
167 tx.try_send(()).ok();
168 }
169 _ => {}
170 }
171 }),
172 ];
173
174 cx.spawn(async move |cx| {
175 if !has_lang_server {
176 // some buffers never have a language server, so this aborts quickly in that case.
177 let timeout = cx.background_executor().timer(Duration::from_secs(500));
178 futures::select! {
179 _ = added_rx.next() => {},
180 _ = timeout.fuse() => {
181 anyhow::bail!("Waiting for language server add timed out after 5 seconds");
182 }
183 };
184 }
185 let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5));
186 let result = futures::select! {
187 _ = rx.next() => {
188 eprintln!("{}⚑ Language server idle", log_prefix);
189 anyhow::Ok(())
190 },
191 _ = timeout.fuse() => {
192 anyhow::bail!("LSP wait timed out after 5 minutes");
193 }
194 };
195 drop(subscriptions);
196 result
197 })
198}