1use anyhow::{Result, anyhow};
2use futures::channel::mpsc;
3use futures::{FutureExt as _, StreamExt as _};
4use gpui::{AsyncApp, Entity, Task};
5use language::{Buffer, LanguageId, LanguageServerId, ParseStatus};
6use project::{Project, ProjectPath, Worktree};
7use std::collections::HashSet;
8use std::sync::Arc;
9use std::time::Duration;
10use util::rel_path::RelPath;
11
12pub fn open_buffer(
13 project: Entity<Project>,
14 worktree: Entity<Worktree>,
15 path: Arc<RelPath>,
16 cx: &AsyncApp,
17) -> Task<Result<Entity<Buffer>>> {
18 cx.spawn(async move |cx| {
19 let project_path = worktree.read_with(cx, |worktree, _cx| ProjectPath {
20 worktree_id: worktree.id(),
21 path,
22 })?;
23
24 let buffer = project
25 .update(cx, |project, cx| project.open_buffer(project_path, cx))?
26 .await?;
27
28 let mut parse_status = buffer.read_with(cx, |buffer, _cx| buffer.parse_status())?;
29 while *parse_status.borrow() != ParseStatus::Idle {
30 parse_status.changed().await?;
31 }
32
33 Ok(buffer)
34 })
35}
36
37pub async fn open_buffer_with_language_server(
38 project: Entity<Project>,
39 worktree: Entity<Worktree>,
40 path: Arc<RelPath>,
41 ready_languages: &mut HashSet<LanguageId>,
42 cx: &mut AsyncApp,
43) -> Result<(Entity<Entity<Buffer>>, LanguageServerId, Entity<Buffer>)> {
44 let buffer = open_buffer(project.clone(), worktree, path.clone(), cx).await?;
45
46 let (lsp_open_handle, path_style) = project.update(cx, |project, cx| {
47 (
48 project.register_buffer_with_language_servers(&buffer, cx),
49 project.path_style(cx),
50 )
51 })?;
52
53 let Some(language_id) = buffer.read_with(cx, |buffer, _cx| {
54 buffer.language().map(|language| language.id())
55 })?
56 else {
57 return Err(anyhow!("No language for {}", path.display(path_style)));
58 };
59
60 let log_prefix = path.display(path_style);
61 if !ready_languages.contains(&language_id) {
62 wait_for_lang_server(&project, &buffer, log_prefix.into_owned(), cx).await?;
63 ready_languages.insert(language_id);
64 }
65
66 let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?;
67
68 // hacky wait for buffer to be registered with the language server
69 for _ in 0..100 {
70 let Some(language_server_id) = lsp_store.update(cx, |lsp_store, cx| {
71 buffer.update(cx, |buffer, cx| {
72 lsp_store
73 .language_servers_for_local_buffer(&buffer, cx)
74 .next()
75 .map(|(_, language_server)| language_server.server_id())
76 })
77 })?
78 else {
79 cx.background_executor()
80 .timer(Duration::from_millis(10))
81 .await;
82 continue;
83 };
84
85 return Ok((lsp_open_handle, language_server_id, buffer));
86 }
87
88 return Err(anyhow!("No language server found for buffer"));
89}
90
91// TODO: Dedupe with similar function in crates/eval/src/instance.rs
92pub fn wait_for_lang_server(
93 project: &Entity<Project>,
94 buffer: &Entity<Buffer>,
95 log_prefix: String,
96 cx: &mut AsyncApp,
97) -> Task<Result<()>> {
98 println!("{}⏵ Waiting for language server", log_prefix);
99
100 let (mut tx, mut rx) = mpsc::channel(1);
101
102 let lsp_store = project
103 .read_with(cx, |project, _| project.lsp_store())
104 .unwrap();
105
106 let has_lang_server = buffer
107 .update(cx, |buffer, cx| {
108 lsp_store.update(cx, |lsp_store, cx| {
109 lsp_store
110 .language_servers_for_local_buffer(buffer, cx)
111 .next()
112 .is_some()
113 })
114 })
115 .unwrap_or(false);
116
117 if has_lang_server {
118 project
119 .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx))
120 .unwrap()
121 .detach();
122 }
123 let (mut added_tx, mut added_rx) = mpsc::channel(1);
124
125 let subscriptions = [
126 cx.subscribe(&lsp_store, {
127 let log_prefix = log_prefix.clone();
128 move |_, event, _| {
129 if let project::LspStoreEvent::LanguageServerUpdate {
130 message:
131 client::proto::update_language_server::Variant::WorkProgress(
132 client::proto::LspWorkProgress {
133 message: Some(message),
134 ..
135 },
136 ),
137 ..
138 } = event
139 {
140 println!("{}⟲ {message}", log_prefix)
141 }
142 }
143 }),
144 cx.subscribe(project, {
145 let buffer = buffer.clone();
146 move |project, event, cx| match event {
147 project::Event::LanguageServerAdded(_, _, _) => {
148 let buffer = buffer.clone();
149 project
150 .update(cx, |project, cx| project.save_buffer(buffer, cx))
151 .detach();
152 added_tx.try_send(()).ok();
153 }
154 project::Event::DiskBasedDiagnosticsFinished { .. } => {
155 tx.try_send(()).ok();
156 }
157 _ => {}
158 }
159 }),
160 ];
161
162 cx.spawn(async move |cx| {
163 if !has_lang_server {
164 // some buffers never have a language server, so this aborts quickly in that case.
165 let timeout = cx.background_executor().timer(Duration::from_secs(5));
166 futures::select! {
167 _ = added_rx.next() => {},
168 _ = timeout.fuse() => {
169 anyhow::bail!("Waiting for language server add timed out after 5 seconds");
170 }
171 };
172 }
173 let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5));
174 let result = futures::select! {
175 _ = rx.next() => {
176 println!("{}⚑ Language server idle", log_prefix);
177 anyhow::Ok(())
178 },
179 _ = timeout.fuse() => {
180 anyhow::bail!("LSP wait timed out after 5 minutes");
181 }
182 };
183 drop(subscriptions);
184 result
185 })
186}