1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::agent_server_store::{AgentServerCommand, GEMINI_ID};
13use project::{AgentId, Project};
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::path_list::PathList;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::rc::Rc;
24use std::{any::Any, cell::RefCell};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
34#[derive(Debug, Error)]
35#[error("Unsupported version")]
36pub struct UnsupportedVersion;
37
38pub struct AcpConnection {
39 id: AgentId,
40 display_name: SharedString,
41 telemetry_id: SharedString,
42 connection: Rc<acp::ClientSideConnection>,
43 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
44 auth_methods: Vec<acp::AuthMethod>,
45 agent_capabilities: acp::AgentCapabilities,
46 default_mode: Option<acp::SessionModeId>,
47 default_model: Option<acp::ModelId>,
48 default_config_options: HashMap<String, String>,
49 child: Child,
50 session_list: Option<Rc<AcpSessionList>>,
51 _io_task: Task<Result<(), acp::Error>>,
52 _wait_task: Task<Result<()>>,
53 _stderr_task: Task<Result<()>>,
54}
55
56struct ConfigOptions {
57 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
58 tx: Rc<RefCell<watch::Sender<()>>>,
59 rx: watch::Receiver<()>,
60}
61
62impl ConfigOptions {
63 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
64 let (tx, rx) = watch::channel(());
65 Self {
66 config_options,
67 tx: Rc::new(RefCell::new(tx)),
68 rx,
69 }
70 }
71}
72
73pub struct AcpSession {
74 thread: WeakEntity<AcpThread>,
75 suppress_abort_err: bool,
76 models: Option<Rc<RefCell<acp::SessionModelState>>>,
77 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
78 config_options: Option<ConfigOptions>,
79}
80
81pub struct AcpSessionList {
82 connection: Rc<acp::ClientSideConnection>,
83 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
84 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
85}
86
87impl AcpSessionList {
88 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
89 let (tx, rx) = smol::channel::unbounded();
90 Self {
91 connection,
92 updates_tx: tx,
93 updates_rx: rx,
94 }
95 }
96
97 fn notify_update(&self) {
98 self.updates_tx
99 .try_send(acp_thread::SessionListUpdate::Refresh)
100 .log_err();
101 }
102
103 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
104 self.updates_tx
105 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
106 .log_err();
107 }
108}
109
110impl AgentSessionList for AcpSessionList {
111 fn list_sessions(
112 &self,
113 request: AgentSessionListRequest,
114 cx: &mut App,
115 ) -> Task<Result<AgentSessionListResponse>> {
116 let conn = self.connection.clone();
117 cx.foreground_executor().spawn(async move {
118 let acp_request = acp::ListSessionsRequest::new()
119 .cwd(request.cwd)
120 .cursor(request.cursor);
121 let response = conn.list_sessions(acp_request).await?;
122 Ok(AgentSessionListResponse {
123 sessions: response
124 .sessions
125 .into_iter()
126 .map(|s| AgentSessionInfo {
127 session_id: s.session_id,
128 work_dirs: Some(PathList::new(&[s.cwd])),
129 title: s.title.map(Into::into),
130 updated_at: s.updated_at.and_then(|date_str| {
131 chrono::DateTime::parse_from_rfc3339(&date_str)
132 .ok()
133 .map(|dt| dt.with_timezone(&chrono::Utc))
134 }),
135 created_at: None,
136 meta: s.meta,
137 })
138 .collect(),
139 next_cursor: response.next_cursor,
140 meta: response.meta,
141 })
142 })
143 }
144
145 fn watch(
146 &self,
147 _cx: &mut App,
148 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
149 Some(self.updates_rx.clone())
150 }
151
152 fn notify_refresh(&self) {
153 self.notify_update();
154 }
155
156 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
157 self
158 }
159}
160
161pub async fn connect(
162 agent_id: AgentId,
163 display_name: SharedString,
164 command: AgentServerCommand,
165 default_mode: Option<acp::SessionModeId>,
166 default_model: Option<acp::ModelId>,
167 default_config_options: HashMap<String, String>,
168 cx: &mut AsyncApp,
169) -> Result<Rc<dyn AgentConnection>> {
170 let conn = AcpConnection::stdio(
171 agent_id,
172 display_name,
173 command.clone(),
174 default_mode,
175 default_model,
176 default_config_options,
177 cx,
178 )
179 .await?;
180 Ok(Rc::new(conn) as _)
181}
182
183const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
184
185impl AcpConnection {
186 pub async fn stdio(
187 agent_id: AgentId,
188 display_name: SharedString,
189 command: AgentServerCommand,
190 default_mode: Option<acp::SessionModeId>,
191 default_model: Option<acp::ModelId>,
192 default_config_options: HashMap<String, String>,
193 cx: &mut AsyncApp,
194 ) -> Result<Self> {
195 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
196 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
197 let mut child =
198 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
199 child.envs(command.env.iter().flatten());
200 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
201
202 let stdout = child.stdout.take().context("Failed to take stdout")?;
203 let stdin = child.stdin.take().context("Failed to take stdin")?;
204 let stderr = child.stderr.take().context("Failed to take stderr")?;
205 log::debug!(
206 "Spawning external agent server: {:?}, {:?}",
207 command.path,
208 command.args
209 );
210 log::trace!("Spawned (pid: {})", child.id());
211
212 let sessions = Rc::new(RefCell::new(HashMap::default()));
213
214 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
215 (
216 release_channel::ReleaseChannel::try_global(cx)
217 .map(|release_channel| release_channel.display_name()),
218 release_channel::AppVersion::global(cx).to_string(),
219 )
220 });
221
222 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
223 Rc::new(RefCell::new(None));
224
225 let client = ClientDelegate {
226 sessions: sessions.clone(),
227 session_list: client_session_list.clone(),
228 cx: cx.clone(),
229 };
230 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
231 let foreground_executor = cx.foreground_executor().clone();
232 move |fut| {
233 foreground_executor.spawn(fut).detach();
234 }
235 });
236
237 let io_task = cx.background_spawn(io_task);
238
239 let stderr_task = cx.background_spawn(async move {
240 let mut stderr = BufReader::new(stderr);
241 let mut line = String::new();
242 while let Ok(n) = stderr.read_line(&mut line).await
243 && n > 0
244 {
245 log::warn!("agent stderr: {}", line.trim());
246 line.clear();
247 }
248 Ok(())
249 });
250
251 let wait_task = cx.spawn({
252 let sessions = sessions.clone();
253 let status_fut = child.status();
254 async move |cx| {
255 let status = status_fut.await?;
256
257 for session in sessions.borrow().values() {
258 session
259 .thread
260 .update(cx, |thread, cx| {
261 thread.emit_load_error(LoadError::Exited { status }, cx)
262 })
263 .ok();
264 }
265
266 anyhow::Ok(())
267 }
268 });
269
270 let connection = Rc::new(connection);
271
272 cx.update(|cx| {
273 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
274 registry.set_active_connection(agent_id.clone(), &connection, cx)
275 });
276 });
277
278 let response = connection
279 .initialize(
280 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
281 .client_capabilities(
282 acp::ClientCapabilities::new()
283 .fs(acp::FileSystemCapabilities::new()
284 .read_text_file(true)
285 .write_text_file(true))
286 .terminal(true)
287 // Experimental: Allow for rendering terminal output from the agents
288 .meta(acp::Meta::from_iter([
289 ("terminal_output".into(), true.into()),
290 ("terminal-auth".into(), true.into()),
291 ])),
292 )
293 .client_info(
294 acp::Implementation::new("zed", version)
295 .title(release_channel.map(ToOwned::to_owned)),
296 ),
297 )
298 .await?;
299
300 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
301 return Err(UnsupportedVersion.into());
302 }
303
304 let telemetry_id = response
305 .agent_info
306 // Use the one the agent provides if we have one
307 .map(|info| info.name.into())
308 // Otherwise, just use the name
309 .unwrap_or_else(|| agent_id.0.to_string().into());
310
311 let session_list = if response
312 .agent_capabilities
313 .session_capabilities
314 .list
315 .is_some()
316 {
317 let list = Rc::new(AcpSessionList::new(connection.clone()));
318 *client_session_list.borrow_mut() = Some(list.clone());
319 Some(list)
320 } else {
321 None
322 };
323
324 // TODO: Remove this override once Google team releases their official auth methods
325 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
326 let mut args = command.args.clone();
327 args.retain(|a| a != "--experimental-acp" && a != "--acp");
328 let value = serde_json::json!({
329 "label": "gemini /auth",
330 "command": command.path.to_string_lossy().into_owned(),
331 "args": args,
332 "env": command.env.clone().unwrap_or_default(),
333 });
334 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
335 vec![acp::AuthMethod::Agent(
336 acp::AuthMethodAgent::new("spawn-gemini-cli", "Login")
337 .description("Login with your Google or Vertex AI account")
338 .meta(meta),
339 )]
340 } else {
341 response.auth_methods
342 };
343 Ok(Self {
344 id: agent_id,
345 auth_methods,
346 connection,
347 display_name,
348 telemetry_id,
349 sessions,
350 agent_capabilities: response.agent_capabilities,
351 default_mode,
352 default_model,
353 default_config_options,
354 session_list,
355 _io_task: io_task,
356 _wait_task: wait_task,
357 _stderr_task: stderr_task,
358 child,
359 })
360 }
361
362 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
363 &self.agent_capabilities.prompt_capabilities
364 }
365
366 fn apply_default_config_options(
367 &self,
368 session_id: &acp::SessionId,
369 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
370 cx: &mut AsyncApp,
371 ) {
372 let id = self.id.clone();
373 let defaults_to_apply: Vec<_> = {
374 let config_opts_ref = config_options.borrow();
375 config_opts_ref
376 .iter()
377 .filter_map(|config_option| {
378 let default_value = self.default_config_options.get(&*config_option.id.0)?;
379
380 let is_valid = match &config_option.kind {
381 acp::SessionConfigKind::Select(select) => match &select.options {
382 acp::SessionConfigSelectOptions::Ungrouped(options) => options
383 .iter()
384 .any(|opt| &*opt.value.0 == default_value.as_str()),
385 acp::SessionConfigSelectOptions::Grouped(groups) => {
386 groups.iter().any(|g| {
387 g.options
388 .iter()
389 .any(|opt| &*opt.value.0 == default_value.as_str())
390 })
391 }
392 _ => false,
393 },
394 _ => false,
395 };
396
397 if is_valid {
398 let initial_value = match &config_option.kind {
399 acp::SessionConfigKind::Select(select) => {
400 Some(select.current_value.clone())
401 }
402 _ => None,
403 };
404 Some((
405 config_option.id.clone(),
406 default_value.clone(),
407 initial_value,
408 ))
409 } else {
410 log::warn!(
411 "`{}` is not a valid value for config option `{}` in {}",
412 default_value,
413 config_option.id.0,
414 id
415 );
416 None
417 }
418 })
419 .collect()
420 };
421
422 for (config_id, default_value, initial_value) in defaults_to_apply {
423 cx.spawn({
424 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
425 let session_id = session_id.clone();
426 let config_id_clone = config_id.clone();
427 let config_opts = config_options.clone();
428 let conn = self.connection.clone();
429 async move |_| {
430 let result = conn
431 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
432 session_id,
433 config_id_clone.clone(),
434 default_value_id,
435 ))
436 .await
437 .log_err();
438
439 if result.is_none() {
440 if let Some(initial) = initial_value {
441 let mut opts = config_opts.borrow_mut();
442 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
443 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
444 select.current_value = initial;
445 }
446 }
447 }
448 }
449 }
450 })
451 .detach();
452
453 let mut opts = config_options.borrow_mut();
454 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
455 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
456 select.current_value = acp::SessionConfigValueId::new(default_value);
457 }
458 }
459 }
460 }
461}
462
463impl Drop for AcpConnection {
464 fn drop(&mut self) {
465 self.child.kill().log_err();
466 }
467}
468
469impl AgentConnection for AcpConnection {
470 fn agent_id(&self) -> AgentId {
471 self.id.clone()
472 }
473
474 fn telemetry_id(&self) -> SharedString {
475 self.telemetry_id.clone()
476 }
477
478 fn new_session(
479 self: Rc<Self>,
480 project: Entity<Project>,
481 work_dirs: PathList,
482 cx: &mut App,
483 ) -> Task<Result<Entity<AcpThread>>> {
484 // TODO: remove this once ACP supports multiple working directories
485 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
486 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
487 };
488 let name = self.id.0.clone();
489 let mcp_servers = mcp_servers_for_project(&project, cx);
490
491 cx.spawn(async move |cx| {
492 let response = self.connection
493 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
494 .await
495 .map_err(map_acp_error)?;
496
497 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
498
499 if let Some(default_mode) = self.default_mode.clone() {
500 if let Some(modes) = modes.as_ref() {
501 let mut modes_ref = modes.borrow_mut();
502 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
503
504 if has_mode {
505 let initial_mode_id = modes_ref.current_mode_id.clone();
506
507 cx.spawn({
508 let default_mode = default_mode.clone();
509 let session_id = response.session_id.clone();
510 let modes = modes.clone();
511 let conn = self.connection.clone();
512 async move |_| {
513 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
514 .await.log_err();
515
516 if result.is_none() {
517 modes.borrow_mut().current_mode_id = initial_mode_id;
518 }
519 }
520 }).detach();
521
522 modes_ref.current_mode_id = default_mode;
523 } else {
524 let available_modes = modes_ref
525 .available_modes
526 .iter()
527 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
528 .collect::<Vec<_>>()
529 .join("\n");
530
531 log::warn!(
532 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
533 );
534 }
535 }
536 }
537
538 if let Some(default_model) = self.default_model.clone() {
539 if let Some(models) = models.as_ref() {
540 let mut models_ref = models.borrow_mut();
541 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
542
543 if has_model {
544 let initial_model_id = models_ref.current_model_id.clone();
545
546 cx.spawn({
547 let default_model = default_model.clone();
548 let session_id = response.session_id.clone();
549 let models = models.clone();
550 let conn = self.connection.clone();
551 async move |_| {
552 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
553 .await.log_err();
554
555 if result.is_none() {
556 models.borrow_mut().current_model_id = initial_model_id;
557 }
558 }
559 }).detach();
560
561 models_ref.current_model_id = default_model;
562 } else {
563 let available_models = models_ref
564 .available_models
565 .iter()
566 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
567 .collect::<Vec<_>>()
568 .join("\n");
569
570 log::warn!(
571 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
572 );
573 }
574 }
575 }
576
577 if let Some(config_opts) = config_options.as_ref() {
578 self.apply_default_config_options(&response.session_id, config_opts, cx);
579 }
580
581 let action_log = cx.new(|_| ActionLog::new(project.clone()));
582 let thread: Entity<AcpThread> = cx.new(|cx| {
583 AcpThread::new(
584 None,
585 self.display_name.clone(),
586 Some(work_dirs),
587 self.clone(),
588 project,
589 action_log,
590 response.session_id.clone(),
591 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
592 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
593 cx,
594 )
595 });
596
597 self.sessions.borrow_mut().insert(
598 response.session_id,
599 AcpSession {
600 thread: thread.downgrade(),
601 suppress_abort_err: false,
602 session_modes: modes,
603 models,
604 config_options: config_options.map(ConfigOptions::new),
605 },
606 );
607
608 Ok(thread)
609 })
610 }
611
612 fn supports_load_session(&self) -> bool {
613 self.agent_capabilities.load_session
614 }
615
616 fn supports_resume_session(&self) -> bool {
617 self.agent_capabilities
618 .session_capabilities
619 .resume
620 .is_some()
621 }
622
623 fn load_session(
624 self: Rc<Self>,
625 session_id: acp::SessionId,
626 project: Entity<Project>,
627 work_dirs: PathList,
628 title: Option<SharedString>,
629 cx: &mut App,
630 ) -> Task<Result<Entity<AcpThread>>> {
631 if !self.agent_capabilities.load_session {
632 return Task::ready(Err(anyhow!(LoadError::Other(
633 "Loading sessions is not supported by this agent.".into()
634 ))));
635 }
636 // TODO: remove this once ACP supports multiple working directories
637 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
638 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
639 };
640
641 let mcp_servers = mcp_servers_for_project(&project, cx);
642 let action_log = cx.new(|_| ActionLog::new(project.clone()));
643 let title = title.unwrap_or_else(|| self.display_name.clone());
644 let thread: Entity<AcpThread> = cx.new(|cx| {
645 AcpThread::new(
646 None,
647 title,
648 Some(work_dirs.clone()),
649 self.clone(),
650 project,
651 action_log,
652 session_id.clone(),
653 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
654 cx,
655 )
656 });
657
658 self.sessions.borrow_mut().insert(
659 session_id.clone(),
660 AcpSession {
661 thread: thread.downgrade(),
662 suppress_abort_err: false,
663 session_modes: None,
664 models: None,
665 config_options: None,
666 },
667 );
668
669 cx.spawn(async move |cx| {
670 let response = match self
671 .connection
672 .load_session(
673 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
674 )
675 .await
676 {
677 Ok(response) => response,
678 Err(err) => {
679 self.sessions.borrow_mut().remove(&session_id);
680 return Err(map_acp_error(err));
681 }
682 };
683
684 let (modes, models, config_options) =
685 config_state(response.modes, response.models, response.config_options);
686
687 if let Some(config_opts) = config_options.as_ref() {
688 self.apply_default_config_options(&session_id, config_opts, cx);
689 }
690
691 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
692 session.session_modes = modes;
693 session.models = models;
694 session.config_options = config_options.map(ConfigOptions::new);
695 }
696
697 Ok(thread)
698 })
699 }
700
701 fn resume_session(
702 self: Rc<Self>,
703 session_id: acp::SessionId,
704 project: Entity<Project>,
705 work_dirs: PathList,
706 title: Option<SharedString>,
707 cx: &mut App,
708 ) -> Task<Result<Entity<AcpThread>>> {
709 if self
710 .agent_capabilities
711 .session_capabilities
712 .resume
713 .is_none()
714 {
715 return Task::ready(Err(anyhow!(LoadError::Other(
716 "Resuming sessions is not supported by this agent.".into()
717 ))));
718 }
719 // TODO: remove this once ACP supports multiple working directories
720 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
721 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
722 };
723
724 let mcp_servers = mcp_servers_for_project(&project, cx);
725 let action_log = cx.new(|_| ActionLog::new(project.clone()));
726 let title = title.unwrap_or_else(|| self.display_name.clone());
727 let thread: Entity<AcpThread> = cx.new(|cx| {
728 AcpThread::new(
729 None,
730 title,
731 Some(work_dirs),
732 self.clone(),
733 project,
734 action_log,
735 session_id.clone(),
736 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
737 cx,
738 )
739 });
740
741 self.sessions.borrow_mut().insert(
742 session_id.clone(),
743 AcpSession {
744 thread: thread.downgrade(),
745 suppress_abort_err: false,
746 session_modes: None,
747 models: None,
748 config_options: None,
749 },
750 );
751
752 cx.spawn(async move |cx| {
753 let response = match self
754 .connection
755 .resume_session(
756 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
757 .mcp_servers(mcp_servers),
758 )
759 .await
760 {
761 Ok(response) => response,
762 Err(err) => {
763 self.sessions.borrow_mut().remove(&session_id);
764 return Err(map_acp_error(err));
765 }
766 };
767
768 let (modes, models, config_options) =
769 config_state(response.modes, response.models, response.config_options);
770
771 if let Some(config_opts) = config_options.as_ref() {
772 self.apply_default_config_options(&session_id, config_opts, cx);
773 }
774
775 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
776 session.session_modes = modes;
777 session.models = models;
778 session.config_options = config_options.map(ConfigOptions::new);
779 }
780
781 Ok(thread)
782 })
783 }
784
785 fn supports_close_session(&self) -> bool {
786 self.agent_capabilities.session_capabilities.close.is_some()
787 }
788
789 fn close_session(
790 self: Rc<Self>,
791 session_id: &acp::SessionId,
792 cx: &mut App,
793 ) -> Task<Result<()>> {
794 if !self.supports_close_session() {
795 return Task::ready(Err(anyhow!(LoadError::Other(
796 "Closing sessions is not supported by this agent.".into()
797 ))));
798 }
799
800 let conn = self.connection.clone();
801 let session_id = session_id.clone();
802 cx.foreground_executor().spawn(async move {
803 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
804 .await?;
805 self.sessions.borrow_mut().remove(&session_id);
806 Ok(())
807 })
808 }
809
810 fn auth_methods(&self) -> &[acp::AuthMethod] {
811 &self.auth_methods
812 }
813
814 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
815 let conn = self.connection.clone();
816 cx.foreground_executor().spawn(async move {
817 conn.authenticate(acp::AuthenticateRequest::new(method_id))
818 .await?;
819 Ok(())
820 })
821 }
822
823 fn prompt(
824 &self,
825 _id: Option<acp_thread::UserMessageId>,
826 params: acp::PromptRequest,
827 cx: &mut App,
828 ) -> Task<Result<acp::PromptResponse>> {
829 let conn = self.connection.clone();
830 let sessions = self.sessions.clone();
831 let session_id = params.session_id.clone();
832 cx.foreground_executor().spawn(async move {
833 let result = conn.prompt(params).await;
834
835 let mut suppress_abort_err = false;
836
837 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
838 suppress_abort_err = session.suppress_abort_err;
839 session.suppress_abort_err = false;
840 }
841
842 match result {
843 Ok(response) => Ok(response),
844 Err(err) => {
845 if err.code == acp::ErrorCode::AuthRequired {
846 return Err(anyhow!(acp::Error::auth_required()));
847 }
848
849 if err.code != ErrorCode::InternalError {
850 anyhow::bail!(err)
851 }
852
853 let Some(data) = &err.data else {
854 anyhow::bail!(err)
855 };
856
857 // Temporary workaround until the following PR is generally available:
858 // https://github.com/google-gemini/gemini-cli/pull/6656
859
860 #[derive(Deserialize)]
861 #[serde(deny_unknown_fields)]
862 struct ErrorDetails {
863 details: Box<str>,
864 }
865
866 match serde_json::from_value(data.clone()) {
867 Ok(ErrorDetails { details }) => {
868 if suppress_abort_err
869 && (details.contains("This operation was aborted")
870 || details.contains("The user aborted a request"))
871 {
872 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
873 } else {
874 Err(anyhow!(details))
875 }
876 }
877 Err(_) => Err(anyhow!(err)),
878 }
879 }
880 }
881 })
882 }
883
884 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
885 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
886 session.suppress_abort_err = true;
887 }
888 let conn = self.connection.clone();
889 let params = acp::CancelNotification::new(session_id.clone());
890 cx.foreground_executor()
891 .spawn(async move { conn.cancel(params).await })
892 .detach();
893 }
894
895 fn session_modes(
896 &self,
897 session_id: &acp::SessionId,
898 _cx: &App,
899 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
900 let sessions = self.sessions.clone();
901 let sessions_ref = sessions.borrow();
902 let Some(session) = sessions_ref.get(session_id) else {
903 return None;
904 };
905
906 if let Some(modes) = session.session_modes.as_ref() {
907 Some(Rc::new(AcpSessionModes {
908 connection: self.connection.clone(),
909 session_id: session_id.clone(),
910 state: modes.clone(),
911 }) as _)
912 } else {
913 None
914 }
915 }
916
917 fn model_selector(
918 &self,
919 session_id: &acp::SessionId,
920 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
921 let sessions = self.sessions.clone();
922 let sessions_ref = sessions.borrow();
923 let Some(session) = sessions_ref.get(session_id) else {
924 return None;
925 };
926
927 if let Some(models) = session.models.as_ref() {
928 Some(Rc::new(AcpModelSelector::new(
929 session_id.clone(),
930 self.connection.clone(),
931 models.clone(),
932 )) as _)
933 } else {
934 None
935 }
936 }
937
938 fn session_config_options(
939 &self,
940 session_id: &acp::SessionId,
941 _cx: &App,
942 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
943 let sessions = self.sessions.borrow();
944 let session = sessions.get(session_id)?;
945
946 let config_opts = session.config_options.as_ref()?;
947
948 Some(Rc::new(AcpSessionConfigOptions {
949 session_id: session_id.clone(),
950 connection: self.connection.clone(),
951 state: config_opts.config_options.clone(),
952 watch_tx: config_opts.tx.clone(),
953 watch_rx: config_opts.rx.clone(),
954 }) as _)
955 }
956
957 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
958 self.session_list.clone().map(|s| s as _)
959 }
960
961 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
962 self
963 }
964}
965
966fn map_acp_error(err: acp::Error) -> anyhow::Error {
967 if err.code == acp::ErrorCode::AuthRequired {
968 let mut error = AuthRequired::new();
969
970 if err.message != acp::ErrorCode::AuthRequired.to_string() {
971 error = error.with_description(err.message);
972 }
973
974 anyhow!(error)
975 } else {
976 anyhow!(err)
977 }
978}
979
980fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
981 let context_server_store = project.read(cx).context_server_store().read(cx);
982 let is_local = project.read(cx).is_local();
983 context_server_store
984 .configured_server_ids()
985 .iter()
986 .filter_map(|id| {
987 let configuration = context_server_store.configuration_for_server(id)?;
988 match &*configuration {
989 project::context_server_store::ContextServerConfiguration::Custom {
990 command,
991 remote,
992 ..
993 }
994 | project::context_server_store::ContextServerConfiguration::Extension {
995 command,
996 remote,
997 ..
998 } if is_local || *remote => Some(acp::McpServer::Stdio(
999 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1000 .args(command.args.clone())
1001 .env(if let Some(env) = command.env.as_ref() {
1002 env.iter()
1003 .map(|(name, value)| acp::EnvVariable::new(name, value))
1004 .collect()
1005 } else {
1006 vec![]
1007 }),
1008 )),
1009 project::context_server_store::ContextServerConfiguration::Http {
1010 url,
1011 headers,
1012 timeout: _,
1013 } => Some(acp::McpServer::Http(
1014 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1015 headers
1016 .iter()
1017 .map(|(name, value)| acp::HttpHeader::new(name, value))
1018 .collect(),
1019 ),
1020 )),
1021 _ => None,
1022 }
1023 })
1024 .collect()
1025}
1026
1027fn config_state(
1028 modes: Option<acp::SessionModeState>,
1029 models: Option<acp::SessionModelState>,
1030 config_options: Option<Vec<acp::SessionConfigOption>>,
1031) -> (
1032 Option<Rc<RefCell<acp::SessionModeState>>>,
1033 Option<Rc<RefCell<acp::SessionModelState>>>,
1034 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1035) {
1036 if let Some(opts) = config_options {
1037 return (None, None, Some(Rc::new(RefCell::new(opts))));
1038 }
1039
1040 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1041 let models = models.map(|models| Rc::new(RefCell::new(models)));
1042 (modes, models, None)
1043}
1044
1045struct AcpSessionModes {
1046 session_id: acp::SessionId,
1047 connection: Rc<acp::ClientSideConnection>,
1048 state: Rc<RefCell<acp::SessionModeState>>,
1049}
1050
1051impl acp_thread::AgentSessionModes for AcpSessionModes {
1052 fn current_mode(&self) -> acp::SessionModeId {
1053 self.state.borrow().current_mode_id.clone()
1054 }
1055
1056 fn all_modes(&self) -> Vec<acp::SessionMode> {
1057 self.state.borrow().available_modes.clone()
1058 }
1059
1060 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1061 let connection = self.connection.clone();
1062 let session_id = self.session_id.clone();
1063 let old_mode_id;
1064 {
1065 let mut state = self.state.borrow_mut();
1066 old_mode_id = state.current_mode_id.clone();
1067 state.current_mode_id = mode_id.clone();
1068 };
1069 let state = self.state.clone();
1070 cx.foreground_executor().spawn(async move {
1071 let result = connection
1072 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1073 .await;
1074
1075 if result.is_err() {
1076 state.borrow_mut().current_mode_id = old_mode_id;
1077 }
1078
1079 result?;
1080
1081 Ok(())
1082 })
1083 }
1084}
1085
1086struct AcpModelSelector {
1087 session_id: acp::SessionId,
1088 connection: Rc<acp::ClientSideConnection>,
1089 state: Rc<RefCell<acp::SessionModelState>>,
1090}
1091
1092impl AcpModelSelector {
1093 fn new(
1094 session_id: acp::SessionId,
1095 connection: Rc<acp::ClientSideConnection>,
1096 state: Rc<RefCell<acp::SessionModelState>>,
1097 ) -> Self {
1098 Self {
1099 session_id,
1100 connection,
1101 state,
1102 }
1103 }
1104}
1105
1106impl acp_thread::AgentModelSelector for AcpModelSelector {
1107 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1108 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1109 self.state
1110 .borrow()
1111 .available_models
1112 .clone()
1113 .into_iter()
1114 .map(acp_thread::AgentModelInfo::from)
1115 .collect(),
1116 )))
1117 }
1118
1119 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1120 let connection = self.connection.clone();
1121 let session_id = self.session_id.clone();
1122 let old_model_id;
1123 {
1124 let mut state = self.state.borrow_mut();
1125 old_model_id = state.current_model_id.clone();
1126 state.current_model_id = model_id.clone();
1127 };
1128 let state = self.state.clone();
1129 cx.foreground_executor().spawn(async move {
1130 let result = connection
1131 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1132 .await;
1133
1134 if result.is_err() {
1135 state.borrow_mut().current_model_id = old_model_id;
1136 }
1137
1138 result?;
1139
1140 Ok(())
1141 })
1142 }
1143
1144 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1145 let state = self.state.borrow();
1146 Task::ready(
1147 state
1148 .available_models
1149 .iter()
1150 .find(|m| m.model_id == state.current_model_id)
1151 .cloned()
1152 .map(acp_thread::AgentModelInfo::from)
1153 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1154 )
1155 }
1156}
1157
1158struct AcpSessionConfigOptions {
1159 session_id: acp::SessionId,
1160 connection: Rc<acp::ClientSideConnection>,
1161 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1162 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1163 watch_rx: watch::Receiver<()>,
1164}
1165
1166impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1167 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1168 self.state.borrow().clone()
1169 }
1170
1171 fn set_config_option(
1172 &self,
1173 config_id: acp::SessionConfigId,
1174 value: acp::SessionConfigValueId,
1175 cx: &mut App,
1176 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1177 let connection = self.connection.clone();
1178 let session_id = self.session_id.clone();
1179 let state = self.state.clone();
1180
1181 let watch_tx = self.watch_tx.clone();
1182
1183 cx.foreground_executor().spawn(async move {
1184 let response = connection
1185 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1186 session_id, config_id, value,
1187 ))
1188 .await?;
1189
1190 *state.borrow_mut() = response.config_options.clone();
1191 watch_tx.borrow_mut().send(()).ok();
1192 Ok(response.config_options)
1193 })
1194 }
1195
1196 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1197 Some(self.watch_rx.clone())
1198 }
1199}
1200
1201struct ClientDelegate {
1202 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1203 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1204 cx: AsyncApp,
1205}
1206
1207#[async_trait::async_trait(?Send)]
1208impl acp::Client for ClientDelegate {
1209 async fn request_permission(
1210 &self,
1211 arguments: acp::RequestPermissionRequest,
1212 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1213 let thread;
1214 {
1215 let sessions_ref = self.sessions.borrow();
1216 let session = sessions_ref
1217 .get(&arguments.session_id)
1218 .context("Failed to get session")?;
1219 thread = session.thread.clone();
1220 }
1221
1222 let cx = &mut self.cx.clone();
1223
1224 let task = thread.update(cx, |thread, cx| {
1225 thread.request_tool_call_authorization(
1226 arguments.tool_call,
1227 acp_thread::PermissionOptions::Flat(arguments.options),
1228 cx,
1229 )
1230 })??;
1231
1232 let outcome = task.await;
1233
1234 Ok(acp::RequestPermissionResponse::new(outcome))
1235 }
1236
1237 async fn write_text_file(
1238 &self,
1239 arguments: acp::WriteTextFileRequest,
1240 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1241 let cx = &mut self.cx.clone();
1242 let task = self
1243 .session_thread(&arguments.session_id)?
1244 .update(cx, |thread, cx| {
1245 thread.write_text_file(arguments.path, arguments.content, cx)
1246 })?;
1247
1248 task.await?;
1249
1250 Ok(Default::default())
1251 }
1252
1253 async fn read_text_file(
1254 &self,
1255 arguments: acp::ReadTextFileRequest,
1256 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1257 let task = self.session_thread(&arguments.session_id)?.update(
1258 &mut self.cx.clone(),
1259 |thread, cx| {
1260 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1261 },
1262 )?;
1263
1264 let content = task.await?;
1265
1266 Ok(acp::ReadTextFileResponse::new(content))
1267 }
1268
1269 async fn session_notification(
1270 &self,
1271 notification: acp::SessionNotification,
1272 ) -> Result<(), acp::Error> {
1273 let sessions = self.sessions.borrow();
1274 let session = sessions
1275 .get(¬ification.session_id)
1276 .context("Failed to get session")?;
1277
1278 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1279 current_mode_id,
1280 ..
1281 }) = ¬ification.update
1282 {
1283 if let Some(session_modes) = &session.session_modes {
1284 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1285 }
1286 }
1287
1288 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1289 config_options,
1290 ..
1291 }) = ¬ification.update
1292 {
1293 if let Some(opts) = &session.config_options {
1294 *opts.config_options.borrow_mut() = config_options.clone();
1295 opts.tx.borrow_mut().send(()).ok();
1296 }
1297 }
1298
1299 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1300 && let Some(session_list) = self.session_list.borrow().as_ref()
1301 {
1302 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1303 }
1304
1305 // Clone so we can inspect meta both before and after handing off to the thread
1306 let update_clone = notification.update.clone();
1307
1308 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1309 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1310 if let Some(meta) = &tc.meta {
1311 if let Some(terminal_info) = meta.get("terminal_info") {
1312 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1313 {
1314 let terminal_id = acp::TerminalId::new(id_str);
1315 let cwd = terminal_info
1316 .get("cwd")
1317 .and_then(|v| v.as_str().map(PathBuf::from));
1318
1319 // Create a minimal display-only lower-level terminal and register it.
1320 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1321 let builder = TerminalBuilder::new_display_only(
1322 CursorShape::default(),
1323 AlternateScroll::On,
1324 None,
1325 0,
1326 cx.background_executor(),
1327 thread.project().read(cx).path_style(cx),
1328 )?;
1329 let lower = cx.new(|cx| builder.subscribe(cx));
1330 thread.on_terminal_provider_event(
1331 TerminalProviderEvent::Created {
1332 terminal_id,
1333 label: tc.title.clone(),
1334 cwd,
1335 output_byte_limit: None,
1336 terminal: lower,
1337 },
1338 cx,
1339 );
1340 anyhow::Ok(())
1341 });
1342 }
1343 }
1344 }
1345 }
1346
1347 // Forward the update to the acp_thread as usual.
1348 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1349 thread.handle_session_update(notification.update.clone(), cx)
1350 })??;
1351
1352 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1353 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1354 if let Some(meta) = &tcu.meta {
1355 if let Some(term_out) = meta.get("terminal_output") {
1356 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1357 let terminal_id = acp::TerminalId::new(id_str);
1358 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1359 let data = s.as_bytes().to_vec();
1360 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1361 thread.on_terminal_provider_event(
1362 TerminalProviderEvent::Output { terminal_id, data },
1363 cx,
1364 );
1365 });
1366 }
1367 }
1368 }
1369
1370 // terminal_exit
1371 if let Some(term_exit) = meta.get("terminal_exit") {
1372 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1373 let terminal_id = acp::TerminalId::new(id_str);
1374 let status = acp::TerminalExitStatus::new()
1375 .exit_code(
1376 term_exit
1377 .get("exit_code")
1378 .and_then(|v| v.as_u64())
1379 .map(|i| i as u32),
1380 )
1381 .signal(
1382 term_exit
1383 .get("signal")
1384 .and_then(|v| v.as_str().map(|s| s.to_string())),
1385 );
1386
1387 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1388 thread.on_terminal_provider_event(
1389 TerminalProviderEvent::Exit {
1390 terminal_id,
1391 status,
1392 },
1393 cx,
1394 );
1395 });
1396 }
1397 }
1398 }
1399 }
1400
1401 Ok(())
1402 }
1403
1404 async fn create_terminal(
1405 &self,
1406 args: acp::CreateTerminalRequest,
1407 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1408 let thread = self.session_thread(&args.session_id)?;
1409 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1410
1411 let terminal_entity = acp_thread::create_terminal_entity(
1412 args.command.clone(),
1413 &args.args,
1414 args.env
1415 .into_iter()
1416 .map(|env| (env.name, env.value))
1417 .collect(),
1418 args.cwd.clone(),
1419 &project,
1420 &mut self.cx.clone(),
1421 )
1422 .await?;
1423
1424 // Register with renderer
1425 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1426 thread.register_terminal_created(
1427 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1428 format!("{} {}", args.command, args.args.join(" ")),
1429 args.cwd.clone(),
1430 args.output_byte_limit,
1431 terminal_entity,
1432 cx,
1433 )
1434 })?;
1435 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1436 Ok(acp::CreateTerminalResponse::new(terminal_id))
1437 }
1438
1439 async fn kill_terminal(
1440 &self,
1441 args: acp::KillTerminalRequest,
1442 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1443 self.session_thread(&args.session_id)?
1444 .update(&mut self.cx.clone(), |thread, cx| {
1445 thread.kill_terminal(args.terminal_id, cx)
1446 })??;
1447
1448 Ok(Default::default())
1449 }
1450
1451 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1452 Err(acp::Error::method_not_found())
1453 }
1454
1455 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1456 Err(acp::Error::method_not_found())
1457 }
1458
1459 async fn release_terminal(
1460 &self,
1461 args: acp::ReleaseTerminalRequest,
1462 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1463 self.session_thread(&args.session_id)?
1464 .update(&mut self.cx.clone(), |thread, cx| {
1465 thread.release_terminal(args.terminal_id, cx)
1466 })??;
1467
1468 Ok(Default::default())
1469 }
1470
1471 async fn terminal_output(
1472 &self,
1473 args: acp::TerminalOutputRequest,
1474 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1475 self.session_thread(&args.session_id)?
1476 .read_with(&mut self.cx.clone(), |thread, cx| {
1477 let out = thread
1478 .terminal(args.terminal_id)?
1479 .read(cx)
1480 .current_output(cx);
1481
1482 Ok(out)
1483 })?
1484 }
1485
1486 async fn wait_for_terminal_exit(
1487 &self,
1488 args: acp::WaitForTerminalExitRequest,
1489 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1490 let exit_status = self
1491 .session_thread(&args.session_id)?
1492 .update(&mut self.cx.clone(), |thread, cx| {
1493 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1494 })??
1495 .await;
1496
1497 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1498 }
1499}
1500
1501impl ClientDelegate {
1502 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1503 let sessions = self.sessions.borrow();
1504 sessions
1505 .get(session_id)
1506 .context("Failed to get session")
1507 .map(|session| session.thread.clone())
1508 }
1509}