1use acp_thread::{
2 AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
3 AgentSessionListResponse,
4};
5use acp_tools::AcpConnectionRegistry;
6use action_log::ActionLog;
7use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
8use anyhow::anyhow;
9use collections::HashMap;
10use futures::AsyncBufReadExt as _;
11use futures::io::BufReader;
12use project::agent_server_store::AgentServerCommand;
13use project::{AgentId, Project};
14use serde::Deserialize;
15use settings::Settings as _;
16use task::ShellBuilder;
17use util::ResultExt as _;
18use util::path_list::PathList;
19use util::process::Child;
20
21use std::path::PathBuf;
22use std::process::Stdio;
23use std::rc::Rc;
24use std::{any::Any, cell::RefCell};
25use thiserror::Error;
26
27use anyhow::{Context as _, Result};
28use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
29
30use acp_thread::{AcpThread, AuthRequired, LoadError, TerminalProviderEvent};
31use terminal::TerminalBuilder;
32use terminal::terminal_settings::{AlternateScroll, CursorShape, TerminalSettings};
33
34use crate::GEMINI_ID;
35
36#[derive(Debug, Error)]
37#[error("Unsupported version")]
38pub struct UnsupportedVersion;
39
40pub struct AcpConnection {
41 id: AgentId,
42 display_name: SharedString,
43 telemetry_id: SharedString,
44 connection: Rc<acp::ClientSideConnection>,
45 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
46 auth_methods: Vec<acp::AuthMethod>,
47 agent_capabilities: acp::AgentCapabilities,
48 default_mode: Option<acp::SessionModeId>,
49 default_model: Option<acp::ModelId>,
50 default_config_options: HashMap<String, String>,
51 child: Child,
52 session_list: Option<Rc<AcpSessionList>>,
53 _io_task: Task<Result<(), acp::Error>>,
54 _wait_task: Task<Result<()>>,
55 _stderr_task: Task<Result<()>>,
56}
57
58struct ConfigOptions {
59 config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
60 tx: Rc<RefCell<watch::Sender<()>>>,
61 rx: watch::Receiver<()>,
62}
63
64impl ConfigOptions {
65 fn new(config_options: Rc<RefCell<Vec<acp::SessionConfigOption>>>) -> Self {
66 let (tx, rx) = watch::channel(());
67 Self {
68 config_options,
69 tx: Rc::new(RefCell::new(tx)),
70 rx,
71 }
72 }
73}
74
75pub struct AcpSession {
76 thread: WeakEntity<AcpThread>,
77 suppress_abort_err: bool,
78 models: Option<Rc<RefCell<acp::SessionModelState>>>,
79 session_modes: Option<Rc<RefCell<acp::SessionModeState>>>,
80 config_options: Option<ConfigOptions>,
81}
82
83pub struct AcpSessionList {
84 connection: Rc<acp::ClientSideConnection>,
85 updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
86 updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
87}
88
89impl AcpSessionList {
90 fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
91 let (tx, rx) = smol::channel::unbounded();
92 Self {
93 connection,
94 updates_tx: tx,
95 updates_rx: rx,
96 }
97 }
98
99 fn notify_update(&self) {
100 self.updates_tx
101 .try_send(acp_thread::SessionListUpdate::Refresh)
102 .log_err();
103 }
104
105 fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
106 self.updates_tx
107 .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
108 .log_err();
109 }
110}
111
112impl AgentSessionList for AcpSessionList {
113 fn list_sessions(
114 &self,
115 request: AgentSessionListRequest,
116 cx: &mut App,
117 ) -> Task<Result<AgentSessionListResponse>> {
118 let conn = self.connection.clone();
119 cx.foreground_executor().spawn(async move {
120 let acp_request = acp::ListSessionsRequest::new()
121 .cwd(request.cwd)
122 .cursor(request.cursor);
123 let response = conn.list_sessions(acp_request).await?;
124 Ok(AgentSessionListResponse {
125 sessions: response
126 .sessions
127 .into_iter()
128 .map(|s| AgentSessionInfo {
129 session_id: s.session_id,
130 work_dirs: Some(PathList::new(&[s.cwd])),
131 title: s.title.map(Into::into),
132 updated_at: s.updated_at.and_then(|date_str| {
133 chrono::DateTime::parse_from_rfc3339(&date_str)
134 .ok()
135 .map(|dt| dt.with_timezone(&chrono::Utc))
136 }),
137 created_at: None,
138 meta: s.meta,
139 })
140 .collect(),
141 next_cursor: response.next_cursor,
142 meta: response.meta,
143 })
144 })
145 }
146
147 fn watch(
148 &self,
149 _cx: &mut App,
150 ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
151 Some(self.updates_rx.clone())
152 }
153
154 fn notify_refresh(&self) {
155 self.notify_update();
156 }
157
158 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
159 self
160 }
161}
162
163pub async fn connect(
164 agent_id: AgentId,
165 display_name: SharedString,
166 command: AgentServerCommand,
167 default_mode: Option<acp::SessionModeId>,
168 default_model: Option<acp::ModelId>,
169 default_config_options: HashMap<String, String>,
170 cx: &mut AsyncApp,
171) -> Result<Rc<dyn AgentConnection>> {
172 let conn = AcpConnection::stdio(
173 agent_id,
174 display_name,
175 command.clone(),
176 default_mode,
177 default_model,
178 default_config_options,
179 cx,
180 )
181 .await?;
182 Ok(Rc::new(conn) as _)
183}
184
185const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
186
187impl AcpConnection {
188 pub async fn stdio(
189 agent_id: AgentId,
190 display_name: SharedString,
191 command: AgentServerCommand,
192 default_mode: Option<acp::SessionModeId>,
193 default_model: Option<acp::ModelId>,
194 default_config_options: HashMap<String, String>,
195 cx: &mut AsyncApp,
196 ) -> Result<Self> {
197 let shell = cx.update(|cx| TerminalSettings::get(None, cx).shell.clone());
198 let builder = ShellBuilder::new(&shell, cfg!(windows)).non_interactive();
199 let mut child =
200 builder.build_std_command(Some(command.path.display().to_string()), &command.args);
201 child.envs(command.env.iter().flatten());
202 let mut child = Child::spawn(child, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
203
204 let stdout = child.stdout.take().context("Failed to take stdout")?;
205 let stdin = child.stdin.take().context("Failed to take stdin")?;
206 let stderr = child.stderr.take().context("Failed to take stderr")?;
207 log::debug!(
208 "Spawning external agent server: {:?}, {:?}",
209 command.path,
210 command.args
211 );
212 log::trace!("Spawned (pid: {})", child.id());
213
214 let sessions = Rc::new(RefCell::new(HashMap::default()));
215
216 let (release_channel, version): (Option<&str>, String) = cx.update(|cx| {
217 (
218 release_channel::ReleaseChannel::try_global(cx)
219 .map(|release_channel| release_channel.display_name()),
220 release_channel::AppVersion::global(cx).to_string(),
221 )
222 });
223
224 let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
225 Rc::new(RefCell::new(None));
226
227 let client = ClientDelegate {
228 sessions: sessions.clone(),
229 session_list: client_session_list.clone(),
230 cx: cx.clone(),
231 };
232 let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
233 let foreground_executor = cx.foreground_executor().clone();
234 move |fut| {
235 foreground_executor.spawn(fut).detach();
236 }
237 });
238
239 let io_task = cx.background_spawn(io_task);
240
241 let stderr_task = cx.background_spawn(async move {
242 let mut stderr = BufReader::new(stderr);
243 let mut line = String::new();
244 while let Ok(n) = stderr.read_line(&mut line).await
245 && n > 0
246 {
247 log::warn!("agent stderr: {}", line.trim());
248 line.clear();
249 }
250 Ok(())
251 });
252
253 let wait_task = cx.spawn({
254 let sessions = sessions.clone();
255 let status_fut = child.status();
256 async move |cx| {
257 let status = status_fut.await?;
258
259 for session in sessions.borrow().values() {
260 session
261 .thread
262 .update(cx, |thread, cx| {
263 thread.emit_load_error(LoadError::Exited { status }, cx)
264 })
265 .ok();
266 }
267
268 anyhow::Ok(())
269 }
270 });
271
272 let connection = Rc::new(connection);
273
274 cx.update(|cx| {
275 AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
276 registry.set_active_connection(agent_id.clone(), &connection, cx)
277 });
278 });
279
280 let response = connection
281 .initialize(
282 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
283 .client_capabilities(
284 acp::ClientCapabilities::new()
285 .fs(acp::FileSystemCapabilities::new()
286 .read_text_file(true)
287 .write_text_file(true))
288 .terminal(true)
289 // Experimental: Allow for rendering terminal output from the agents
290 .meta(acp::Meta::from_iter([
291 ("terminal_output".into(), true.into()),
292 ("terminal-auth".into(), true.into()),
293 ])),
294 )
295 .client_info(
296 acp::Implementation::new("zed", version)
297 .title(release_channel.map(ToOwned::to_owned)),
298 ),
299 )
300 .await?;
301
302 if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
303 return Err(UnsupportedVersion.into());
304 }
305
306 let telemetry_id = response
307 .agent_info
308 // Use the one the agent provides if we have one
309 .map(|info| info.name.into())
310 // Otherwise, just use the name
311 .unwrap_or_else(|| agent_id.0.to_string().into());
312
313 let session_list = if response
314 .agent_capabilities
315 .session_capabilities
316 .list
317 .is_some()
318 {
319 let list = Rc::new(AcpSessionList::new(connection.clone()));
320 *client_session_list.borrow_mut() = Some(list.clone());
321 Some(list)
322 } else {
323 None
324 };
325
326 // TODO: Remove this override once Google team releases their official auth methods
327 let auth_methods = if agent_id.0.as_ref() == GEMINI_ID {
328 let mut args = command.args.clone();
329 args.retain(|a| a != "--experimental-acp" && a != "--acp");
330 let value = serde_json::json!({
331 "label": "gemini /auth",
332 "command": command.path.to_string_lossy().into_owned(),
333 "args": args,
334 "env": command.env.clone().unwrap_or_default(),
335 });
336 let meta = acp::Meta::from_iter([("terminal-auth".to_string(), value)]);
337 vec![acp::AuthMethod::Agent(
338 acp::AuthMethodAgent::new("spawn-gemini-cli", "Login")
339 .description("Login with your Google or Vertex AI account")
340 .meta(meta),
341 )]
342 } else {
343 response.auth_methods
344 };
345 Ok(Self {
346 id: agent_id,
347 auth_methods,
348 connection,
349 display_name,
350 telemetry_id,
351 sessions,
352 agent_capabilities: response.agent_capabilities,
353 default_mode,
354 default_model,
355 default_config_options,
356 session_list,
357 _io_task: io_task,
358 _wait_task: wait_task,
359 _stderr_task: stderr_task,
360 child,
361 })
362 }
363
364 pub fn prompt_capabilities(&self) -> &acp::PromptCapabilities {
365 &self.agent_capabilities.prompt_capabilities
366 }
367
368 fn apply_default_config_options(
369 &self,
370 session_id: &acp::SessionId,
371 config_options: &Rc<RefCell<Vec<acp::SessionConfigOption>>>,
372 cx: &mut AsyncApp,
373 ) {
374 let id = self.id.clone();
375 let defaults_to_apply: Vec<_> = {
376 let config_opts_ref = config_options.borrow();
377 config_opts_ref
378 .iter()
379 .filter_map(|config_option| {
380 let default_value = self.default_config_options.get(&*config_option.id.0)?;
381
382 let is_valid = match &config_option.kind {
383 acp::SessionConfigKind::Select(select) => match &select.options {
384 acp::SessionConfigSelectOptions::Ungrouped(options) => options
385 .iter()
386 .any(|opt| &*opt.value.0 == default_value.as_str()),
387 acp::SessionConfigSelectOptions::Grouped(groups) => {
388 groups.iter().any(|g| {
389 g.options
390 .iter()
391 .any(|opt| &*opt.value.0 == default_value.as_str())
392 })
393 }
394 _ => false,
395 },
396 _ => false,
397 };
398
399 if is_valid {
400 let initial_value = match &config_option.kind {
401 acp::SessionConfigKind::Select(select) => {
402 Some(select.current_value.clone())
403 }
404 _ => None,
405 };
406 Some((
407 config_option.id.clone(),
408 default_value.clone(),
409 initial_value,
410 ))
411 } else {
412 log::warn!(
413 "`{}` is not a valid value for config option `{}` in {}",
414 default_value,
415 config_option.id.0,
416 id
417 );
418 None
419 }
420 })
421 .collect()
422 };
423
424 for (config_id, default_value, initial_value) in defaults_to_apply {
425 cx.spawn({
426 let default_value_id = acp::SessionConfigValueId::new(default_value.clone());
427 let session_id = session_id.clone();
428 let config_id_clone = config_id.clone();
429 let config_opts = config_options.clone();
430 let conn = self.connection.clone();
431 async move |_| {
432 let result = conn
433 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
434 session_id,
435 config_id_clone.clone(),
436 default_value_id,
437 ))
438 .await
439 .log_err();
440
441 if result.is_none() {
442 if let Some(initial) = initial_value {
443 let mut opts = config_opts.borrow_mut();
444 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id_clone) {
445 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
446 select.current_value = initial;
447 }
448 }
449 }
450 }
451 }
452 })
453 .detach();
454
455 let mut opts = config_options.borrow_mut();
456 if let Some(opt) = opts.iter_mut().find(|o| o.id == config_id) {
457 if let acp::SessionConfigKind::Select(select) = &mut opt.kind {
458 select.current_value = acp::SessionConfigValueId::new(default_value);
459 }
460 }
461 }
462 }
463}
464
465impl Drop for AcpConnection {
466 fn drop(&mut self) {
467 self.child.kill().log_err();
468 }
469}
470
471impl AgentConnection for AcpConnection {
472 fn agent_id(&self) -> AgentId {
473 self.id.clone()
474 }
475
476 fn telemetry_id(&self) -> SharedString {
477 self.telemetry_id.clone()
478 }
479
480 fn new_session(
481 self: Rc<Self>,
482 project: Entity<Project>,
483 work_dirs: PathList,
484 cx: &mut App,
485 ) -> Task<Result<Entity<AcpThread>>> {
486 // TODO: remove this once ACP supports multiple working directories
487 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
488 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
489 };
490 let name = self.id.0.clone();
491 let mcp_servers = mcp_servers_for_project(&project, cx);
492
493 cx.spawn(async move |cx| {
494 let response = self.connection
495 .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
496 .await
497 .map_err(map_acp_error)?;
498
499 let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
500
501 if let Some(default_mode) = self.default_mode.clone() {
502 if let Some(modes) = modes.as_ref() {
503 let mut modes_ref = modes.borrow_mut();
504 let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
505
506 if has_mode {
507 let initial_mode_id = modes_ref.current_mode_id.clone();
508
509 cx.spawn({
510 let default_mode = default_mode.clone();
511 let session_id = response.session_id.clone();
512 let modes = modes.clone();
513 let conn = self.connection.clone();
514 async move |_| {
515 let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
516 .await.log_err();
517
518 if result.is_none() {
519 modes.borrow_mut().current_mode_id = initial_mode_id;
520 }
521 }
522 }).detach();
523
524 modes_ref.current_mode_id = default_mode;
525 } else {
526 let available_modes = modes_ref
527 .available_modes
528 .iter()
529 .map(|mode| format!("- `{}`: {}", mode.id, mode.name))
530 .collect::<Vec<_>>()
531 .join("\n");
532
533 log::warn!(
534 "`{default_mode}` is not valid {name} mode. Available options:\n{available_modes}",
535 );
536 }
537 }
538 }
539
540 if let Some(default_model) = self.default_model.clone() {
541 if let Some(models) = models.as_ref() {
542 let mut models_ref = models.borrow_mut();
543 let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
544
545 if has_model {
546 let initial_model_id = models_ref.current_model_id.clone();
547
548 cx.spawn({
549 let default_model = default_model.clone();
550 let session_id = response.session_id.clone();
551 let models = models.clone();
552 let conn = self.connection.clone();
553 async move |_| {
554 let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
555 .await.log_err();
556
557 if result.is_none() {
558 models.borrow_mut().current_model_id = initial_model_id;
559 }
560 }
561 }).detach();
562
563 models_ref.current_model_id = default_model;
564 } else {
565 let available_models = models_ref
566 .available_models
567 .iter()
568 .map(|model| format!("- `{}`: {}", model.model_id, model.name))
569 .collect::<Vec<_>>()
570 .join("\n");
571
572 log::warn!(
573 "`{default_model}` is not a valid {name} model. Available options:\n{available_models}",
574 );
575 }
576 }
577 }
578
579 if let Some(config_opts) = config_options.as_ref() {
580 self.apply_default_config_options(&response.session_id, config_opts, cx);
581 }
582
583 let action_log = cx.new(|_| ActionLog::new(project.clone()));
584 let thread: Entity<AcpThread> = cx.new(|cx| {
585 AcpThread::new(
586 None,
587 self.display_name.clone(),
588 Some(work_dirs),
589 self.clone(),
590 project,
591 action_log,
592 response.session_id.clone(),
593 // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
594 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
595 cx,
596 )
597 });
598
599 self.sessions.borrow_mut().insert(
600 response.session_id,
601 AcpSession {
602 thread: thread.downgrade(),
603 suppress_abort_err: false,
604 session_modes: modes,
605 models,
606 config_options: config_options.map(ConfigOptions::new),
607 },
608 );
609
610 Ok(thread)
611 })
612 }
613
614 fn supports_load_session(&self) -> bool {
615 self.agent_capabilities.load_session
616 }
617
618 fn supports_resume_session(&self) -> bool {
619 self.agent_capabilities
620 .session_capabilities
621 .resume
622 .is_some()
623 }
624
625 fn load_session(
626 self: Rc<Self>,
627 session_id: acp::SessionId,
628 project: Entity<Project>,
629 work_dirs: PathList,
630 title: Option<SharedString>,
631 cx: &mut App,
632 ) -> Task<Result<Entity<AcpThread>>> {
633 if !self.agent_capabilities.load_session {
634 return Task::ready(Err(anyhow!(LoadError::Other(
635 "Loading sessions is not supported by this agent.".into()
636 ))));
637 }
638 // TODO: remove this once ACP supports multiple working directories
639 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
640 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
641 };
642
643 let mcp_servers = mcp_servers_for_project(&project, cx);
644 let action_log = cx.new(|_| ActionLog::new(project.clone()));
645 let title = title.unwrap_or_else(|| self.display_name.clone());
646 let thread: Entity<AcpThread> = cx.new(|cx| {
647 AcpThread::new(
648 None,
649 title,
650 Some(work_dirs.clone()),
651 self.clone(),
652 project,
653 action_log,
654 session_id.clone(),
655 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
656 cx,
657 )
658 });
659
660 self.sessions.borrow_mut().insert(
661 session_id.clone(),
662 AcpSession {
663 thread: thread.downgrade(),
664 suppress_abort_err: false,
665 session_modes: None,
666 models: None,
667 config_options: None,
668 },
669 );
670
671 cx.spawn(async move |cx| {
672 let response = match self
673 .connection
674 .load_session(
675 acp::LoadSessionRequest::new(session_id.clone(), cwd).mcp_servers(mcp_servers),
676 )
677 .await
678 {
679 Ok(response) => response,
680 Err(err) => {
681 self.sessions.borrow_mut().remove(&session_id);
682 return Err(map_acp_error(err));
683 }
684 };
685
686 let (modes, models, config_options) =
687 config_state(response.modes, response.models, response.config_options);
688
689 if let Some(config_opts) = config_options.as_ref() {
690 self.apply_default_config_options(&session_id, config_opts, cx);
691 }
692
693 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
694 session.session_modes = modes;
695 session.models = models;
696 session.config_options = config_options.map(ConfigOptions::new);
697 }
698
699 Ok(thread)
700 })
701 }
702
703 fn resume_session(
704 self: Rc<Self>,
705 session_id: acp::SessionId,
706 project: Entity<Project>,
707 work_dirs: PathList,
708 title: Option<SharedString>,
709 cx: &mut App,
710 ) -> Task<Result<Entity<AcpThread>>> {
711 if self
712 .agent_capabilities
713 .session_capabilities
714 .resume
715 .is_none()
716 {
717 return Task::ready(Err(anyhow!(LoadError::Other(
718 "Resuming sessions is not supported by this agent.".into()
719 ))));
720 }
721 // TODO: remove this once ACP supports multiple working directories
722 let Some(cwd) = work_dirs.ordered_paths().next().cloned() else {
723 return Task::ready(Err(anyhow!("Working directory cannot be empty")));
724 };
725
726 let mcp_servers = mcp_servers_for_project(&project, cx);
727 let action_log = cx.new(|_| ActionLog::new(project.clone()));
728 let title = title.unwrap_or_else(|| self.display_name.clone());
729 let thread: Entity<AcpThread> = cx.new(|cx| {
730 AcpThread::new(
731 None,
732 title,
733 Some(work_dirs),
734 self.clone(),
735 project,
736 action_log,
737 session_id.clone(),
738 watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
739 cx,
740 )
741 });
742
743 self.sessions.borrow_mut().insert(
744 session_id.clone(),
745 AcpSession {
746 thread: thread.downgrade(),
747 suppress_abort_err: false,
748 session_modes: None,
749 models: None,
750 config_options: None,
751 },
752 );
753
754 cx.spawn(async move |cx| {
755 let response = match self
756 .connection
757 .resume_session(
758 acp::ResumeSessionRequest::new(session_id.clone(), cwd)
759 .mcp_servers(mcp_servers),
760 )
761 .await
762 {
763 Ok(response) => response,
764 Err(err) => {
765 self.sessions.borrow_mut().remove(&session_id);
766 return Err(map_acp_error(err));
767 }
768 };
769
770 let (modes, models, config_options) =
771 config_state(response.modes, response.models, response.config_options);
772
773 if let Some(config_opts) = config_options.as_ref() {
774 self.apply_default_config_options(&session_id, config_opts, cx);
775 }
776
777 if let Some(session) = self.sessions.borrow_mut().get_mut(&session_id) {
778 session.session_modes = modes;
779 session.models = models;
780 session.config_options = config_options.map(ConfigOptions::new);
781 }
782
783 Ok(thread)
784 })
785 }
786
787 fn supports_close_session(&self) -> bool {
788 self.agent_capabilities.session_capabilities.close.is_some()
789 }
790
791 fn close_session(
792 self: Rc<Self>,
793 session_id: &acp::SessionId,
794 cx: &mut App,
795 ) -> Task<Result<()>> {
796 if !self.supports_close_session() {
797 return Task::ready(Err(anyhow!(LoadError::Other(
798 "Closing sessions is not supported by this agent.".into()
799 ))));
800 }
801
802 let conn = self.connection.clone();
803 let session_id = session_id.clone();
804 cx.foreground_executor().spawn(async move {
805 conn.close_session(acp::CloseSessionRequest::new(session_id.clone()))
806 .await?;
807 self.sessions.borrow_mut().remove(&session_id);
808 Ok(())
809 })
810 }
811
812 fn auth_methods(&self) -> &[acp::AuthMethod] {
813 &self.auth_methods
814 }
815
816 fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
817 let conn = self.connection.clone();
818 cx.foreground_executor().spawn(async move {
819 conn.authenticate(acp::AuthenticateRequest::new(method_id))
820 .await?;
821 Ok(())
822 })
823 }
824
825 fn prompt(
826 &self,
827 _id: Option<acp_thread::UserMessageId>,
828 params: acp::PromptRequest,
829 cx: &mut App,
830 ) -> Task<Result<acp::PromptResponse>> {
831 let conn = self.connection.clone();
832 let sessions = self.sessions.clone();
833 let session_id = params.session_id.clone();
834 cx.foreground_executor().spawn(async move {
835 let result = conn.prompt(params).await;
836
837 let mut suppress_abort_err = false;
838
839 if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
840 suppress_abort_err = session.suppress_abort_err;
841 session.suppress_abort_err = false;
842 }
843
844 match result {
845 Ok(response) => Ok(response),
846 Err(err) => {
847 if err.code == acp::ErrorCode::AuthRequired {
848 return Err(anyhow!(acp::Error::auth_required()));
849 }
850
851 if err.code != ErrorCode::InternalError {
852 anyhow::bail!(err)
853 }
854
855 let Some(data) = &err.data else {
856 anyhow::bail!(err)
857 };
858
859 // Temporary workaround until the following PR is generally available:
860 // https://github.com/google-gemini/gemini-cli/pull/6656
861
862 #[derive(Deserialize)]
863 #[serde(deny_unknown_fields)]
864 struct ErrorDetails {
865 details: Box<str>,
866 }
867
868 match serde_json::from_value(data.clone()) {
869 Ok(ErrorDetails { details }) => {
870 if suppress_abort_err
871 && (details.contains("This operation was aborted")
872 || details.contains("The user aborted a request"))
873 {
874 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
875 } else {
876 Err(anyhow!(details))
877 }
878 }
879 Err(_) => Err(anyhow!(err)),
880 }
881 }
882 }
883 })
884 }
885
886 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
887 if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
888 session.suppress_abort_err = true;
889 }
890 let conn = self.connection.clone();
891 let params = acp::CancelNotification::new(session_id.clone());
892 cx.foreground_executor()
893 .spawn(async move { conn.cancel(params).await })
894 .detach();
895 }
896
897 fn session_modes(
898 &self,
899 session_id: &acp::SessionId,
900 _cx: &App,
901 ) -> Option<Rc<dyn acp_thread::AgentSessionModes>> {
902 let sessions = self.sessions.clone();
903 let sessions_ref = sessions.borrow();
904 let Some(session) = sessions_ref.get(session_id) else {
905 return None;
906 };
907
908 if let Some(modes) = session.session_modes.as_ref() {
909 Some(Rc::new(AcpSessionModes {
910 connection: self.connection.clone(),
911 session_id: session_id.clone(),
912 state: modes.clone(),
913 }) as _)
914 } else {
915 None
916 }
917 }
918
919 fn model_selector(
920 &self,
921 session_id: &acp::SessionId,
922 ) -> Option<Rc<dyn acp_thread::AgentModelSelector>> {
923 let sessions = self.sessions.clone();
924 let sessions_ref = sessions.borrow();
925 let Some(session) = sessions_ref.get(session_id) else {
926 return None;
927 };
928
929 if let Some(models) = session.models.as_ref() {
930 Some(Rc::new(AcpModelSelector::new(
931 session_id.clone(),
932 self.connection.clone(),
933 models.clone(),
934 )) as _)
935 } else {
936 None
937 }
938 }
939
940 fn session_config_options(
941 &self,
942 session_id: &acp::SessionId,
943 _cx: &App,
944 ) -> Option<Rc<dyn acp_thread::AgentSessionConfigOptions>> {
945 let sessions = self.sessions.borrow();
946 let session = sessions.get(session_id)?;
947
948 let config_opts = session.config_options.as_ref()?;
949
950 Some(Rc::new(AcpSessionConfigOptions {
951 session_id: session_id.clone(),
952 connection: self.connection.clone(),
953 state: config_opts.config_options.clone(),
954 watch_tx: config_opts.tx.clone(),
955 watch_rx: config_opts.rx.clone(),
956 }) as _)
957 }
958
959 fn session_list(&self, _cx: &mut App) -> Option<Rc<dyn AgentSessionList>> {
960 self.session_list.clone().map(|s| s as _)
961 }
962
963 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
964 self
965 }
966}
967
968fn map_acp_error(err: acp::Error) -> anyhow::Error {
969 if err.code == acp::ErrorCode::AuthRequired {
970 let mut error = AuthRequired::new();
971
972 if err.message != acp::ErrorCode::AuthRequired.to_string() {
973 error = error.with_description(err.message);
974 }
975
976 anyhow!(error)
977 } else {
978 anyhow!(err)
979 }
980}
981
982fn mcp_servers_for_project(project: &Entity<Project>, cx: &App) -> Vec<acp::McpServer> {
983 let context_server_store = project.read(cx).context_server_store().read(cx);
984 let is_local = project.read(cx).is_local();
985 context_server_store
986 .configured_server_ids()
987 .iter()
988 .filter_map(|id| {
989 let configuration = context_server_store.configuration_for_server(id)?;
990 match &*configuration {
991 project::context_server_store::ContextServerConfiguration::Custom {
992 command,
993 remote,
994 ..
995 }
996 | project::context_server_store::ContextServerConfiguration::Extension {
997 command,
998 remote,
999 ..
1000 } if is_local || *remote => Some(acp::McpServer::Stdio(
1001 acp::McpServerStdio::new(id.0.to_string(), &command.path)
1002 .args(command.args.clone())
1003 .env(if let Some(env) = command.env.as_ref() {
1004 env.iter()
1005 .map(|(name, value)| acp::EnvVariable::new(name, value))
1006 .collect()
1007 } else {
1008 vec![]
1009 }),
1010 )),
1011 project::context_server_store::ContextServerConfiguration::Http {
1012 url,
1013 headers,
1014 timeout: _,
1015 } => Some(acp::McpServer::Http(
1016 acp::McpServerHttp::new(id.0.to_string(), url.to_string()).headers(
1017 headers
1018 .iter()
1019 .map(|(name, value)| acp::HttpHeader::new(name, value))
1020 .collect(),
1021 ),
1022 )),
1023 _ => None,
1024 }
1025 })
1026 .collect()
1027}
1028
1029fn config_state(
1030 modes: Option<acp::SessionModeState>,
1031 models: Option<acp::SessionModelState>,
1032 config_options: Option<Vec<acp::SessionConfigOption>>,
1033) -> (
1034 Option<Rc<RefCell<acp::SessionModeState>>>,
1035 Option<Rc<RefCell<acp::SessionModelState>>>,
1036 Option<Rc<RefCell<Vec<acp::SessionConfigOption>>>>,
1037) {
1038 if let Some(opts) = config_options {
1039 return (None, None, Some(Rc::new(RefCell::new(opts))));
1040 }
1041
1042 let modes = modes.map(|modes| Rc::new(RefCell::new(modes)));
1043 let models = models.map(|models| Rc::new(RefCell::new(models)));
1044 (modes, models, None)
1045}
1046
1047struct AcpSessionModes {
1048 session_id: acp::SessionId,
1049 connection: Rc<acp::ClientSideConnection>,
1050 state: Rc<RefCell<acp::SessionModeState>>,
1051}
1052
1053impl acp_thread::AgentSessionModes for AcpSessionModes {
1054 fn current_mode(&self) -> acp::SessionModeId {
1055 self.state.borrow().current_mode_id.clone()
1056 }
1057
1058 fn all_modes(&self) -> Vec<acp::SessionMode> {
1059 self.state.borrow().available_modes.clone()
1060 }
1061
1062 fn set_mode(&self, mode_id: acp::SessionModeId, cx: &mut App) -> Task<Result<()>> {
1063 let connection = self.connection.clone();
1064 let session_id = self.session_id.clone();
1065 let old_mode_id;
1066 {
1067 let mut state = self.state.borrow_mut();
1068 old_mode_id = state.current_mode_id.clone();
1069 state.current_mode_id = mode_id.clone();
1070 };
1071 let state = self.state.clone();
1072 cx.foreground_executor().spawn(async move {
1073 let result = connection
1074 .set_session_mode(acp::SetSessionModeRequest::new(session_id, mode_id))
1075 .await;
1076
1077 if result.is_err() {
1078 state.borrow_mut().current_mode_id = old_mode_id;
1079 }
1080
1081 result?;
1082
1083 Ok(())
1084 })
1085 }
1086}
1087
1088struct AcpModelSelector {
1089 session_id: acp::SessionId,
1090 connection: Rc<acp::ClientSideConnection>,
1091 state: Rc<RefCell<acp::SessionModelState>>,
1092}
1093
1094impl AcpModelSelector {
1095 fn new(
1096 session_id: acp::SessionId,
1097 connection: Rc<acp::ClientSideConnection>,
1098 state: Rc<RefCell<acp::SessionModelState>>,
1099 ) -> Self {
1100 Self {
1101 session_id,
1102 connection,
1103 state,
1104 }
1105 }
1106}
1107
1108impl acp_thread::AgentModelSelector for AcpModelSelector {
1109 fn list_models(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelList>> {
1110 Task::ready(Ok(acp_thread::AgentModelList::Flat(
1111 self.state
1112 .borrow()
1113 .available_models
1114 .clone()
1115 .into_iter()
1116 .map(acp_thread::AgentModelInfo::from)
1117 .collect(),
1118 )))
1119 }
1120
1121 fn select_model(&self, model_id: acp::ModelId, cx: &mut App) -> Task<Result<()>> {
1122 let connection = self.connection.clone();
1123 let session_id = self.session_id.clone();
1124 let old_model_id;
1125 {
1126 let mut state = self.state.borrow_mut();
1127 old_model_id = state.current_model_id.clone();
1128 state.current_model_id = model_id.clone();
1129 };
1130 let state = self.state.clone();
1131 cx.foreground_executor().spawn(async move {
1132 let result = connection
1133 .set_session_model(acp::SetSessionModelRequest::new(session_id, model_id))
1134 .await;
1135
1136 if result.is_err() {
1137 state.borrow_mut().current_model_id = old_model_id;
1138 }
1139
1140 result?;
1141
1142 Ok(())
1143 })
1144 }
1145
1146 fn selected_model(&self, _cx: &mut App) -> Task<Result<acp_thread::AgentModelInfo>> {
1147 let state = self.state.borrow();
1148 Task::ready(
1149 state
1150 .available_models
1151 .iter()
1152 .find(|m| m.model_id == state.current_model_id)
1153 .cloned()
1154 .map(acp_thread::AgentModelInfo::from)
1155 .ok_or_else(|| anyhow::anyhow!("Model not found")),
1156 )
1157 }
1158}
1159
1160struct AcpSessionConfigOptions {
1161 session_id: acp::SessionId,
1162 connection: Rc<acp::ClientSideConnection>,
1163 state: Rc<RefCell<Vec<acp::SessionConfigOption>>>,
1164 watch_tx: Rc<RefCell<watch::Sender<()>>>,
1165 watch_rx: watch::Receiver<()>,
1166}
1167
1168impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
1169 fn config_options(&self) -> Vec<acp::SessionConfigOption> {
1170 self.state.borrow().clone()
1171 }
1172
1173 fn set_config_option(
1174 &self,
1175 config_id: acp::SessionConfigId,
1176 value: acp::SessionConfigValueId,
1177 cx: &mut App,
1178 ) -> Task<Result<Vec<acp::SessionConfigOption>>> {
1179 let connection = self.connection.clone();
1180 let session_id = self.session_id.clone();
1181 let state = self.state.clone();
1182
1183 let watch_tx = self.watch_tx.clone();
1184
1185 cx.foreground_executor().spawn(async move {
1186 let response = connection
1187 .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
1188 session_id, config_id, value,
1189 ))
1190 .await?;
1191
1192 *state.borrow_mut() = response.config_options.clone();
1193 watch_tx.borrow_mut().send(()).ok();
1194 Ok(response.config_options)
1195 })
1196 }
1197
1198 fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
1199 Some(self.watch_rx.clone())
1200 }
1201}
1202
1203struct ClientDelegate {
1204 sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
1205 session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
1206 cx: AsyncApp,
1207}
1208
1209#[async_trait::async_trait(?Send)]
1210impl acp::Client for ClientDelegate {
1211 async fn request_permission(
1212 &self,
1213 arguments: acp::RequestPermissionRequest,
1214 ) -> Result<acp::RequestPermissionResponse, acp::Error> {
1215 let thread;
1216 {
1217 let sessions_ref = self.sessions.borrow();
1218 let session = sessions_ref
1219 .get(&arguments.session_id)
1220 .context("Failed to get session")?;
1221 thread = session.thread.clone();
1222 }
1223
1224 let cx = &mut self.cx.clone();
1225
1226 let task = thread.update(cx, |thread, cx| {
1227 thread.request_tool_call_authorization(
1228 arguments.tool_call,
1229 acp_thread::PermissionOptions::Flat(arguments.options),
1230 cx,
1231 )
1232 })??;
1233
1234 let outcome = task.await;
1235
1236 Ok(acp::RequestPermissionResponse::new(outcome))
1237 }
1238
1239 async fn write_text_file(
1240 &self,
1241 arguments: acp::WriteTextFileRequest,
1242 ) -> Result<acp::WriteTextFileResponse, acp::Error> {
1243 let cx = &mut self.cx.clone();
1244 let task = self
1245 .session_thread(&arguments.session_id)?
1246 .update(cx, |thread, cx| {
1247 thread.write_text_file(arguments.path, arguments.content, cx)
1248 })?;
1249
1250 task.await?;
1251
1252 Ok(Default::default())
1253 }
1254
1255 async fn read_text_file(
1256 &self,
1257 arguments: acp::ReadTextFileRequest,
1258 ) -> Result<acp::ReadTextFileResponse, acp::Error> {
1259 let task = self.session_thread(&arguments.session_id)?.update(
1260 &mut self.cx.clone(),
1261 |thread, cx| {
1262 thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
1263 },
1264 )?;
1265
1266 let content = task.await?;
1267
1268 Ok(acp::ReadTextFileResponse::new(content))
1269 }
1270
1271 async fn session_notification(
1272 &self,
1273 notification: acp::SessionNotification,
1274 ) -> Result<(), acp::Error> {
1275 let sessions = self.sessions.borrow();
1276 let session = sessions
1277 .get(¬ification.session_id)
1278 .context("Failed to get session")?;
1279
1280 if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1281 current_mode_id,
1282 ..
1283 }) = ¬ification.update
1284 {
1285 if let Some(session_modes) = &session.session_modes {
1286 session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
1287 }
1288 }
1289
1290 if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1291 config_options,
1292 ..
1293 }) = ¬ification.update
1294 {
1295 if let Some(opts) = &session.config_options {
1296 *opts.config_options.borrow_mut() = config_options.clone();
1297 opts.tx.borrow_mut().send(()).ok();
1298 }
1299 }
1300
1301 if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update
1302 && let Some(session_list) = self.session_list.borrow().as_ref()
1303 {
1304 session_list.send_info_update(notification.session_id.clone(), info_update.clone());
1305 }
1306
1307 // Clone so we can inspect meta both before and after handing off to the thread
1308 let update_clone = notification.update.clone();
1309
1310 // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
1311 if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
1312 if let Some(meta) = &tc.meta {
1313 if let Some(terminal_info) = meta.get("terminal_info") {
1314 if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
1315 {
1316 let terminal_id = acp::TerminalId::new(id_str);
1317 let cwd = terminal_info
1318 .get("cwd")
1319 .and_then(|v| v.as_str().map(PathBuf::from));
1320
1321 // Create a minimal display-only lower-level terminal and register it.
1322 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1323 let builder = TerminalBuilder::new_display_only(
1324 CursorShape::default(),
1325 AlternateScroll::On,
1326 None,
1327 0,
1328 cx.background_executor(),
1329 thread.project().read(cx).path_style(cx),
1330 )?;
1331 let lower = cx.new(|cx| builder.subscribe(cx));
1332 thread.on_terminal_provider_event(
1333 TerminalProviderEvent::Created {
1334 terminal_id,
1335 label: tc.title.clone(),
1336 cwd,
1337 output_byte_limit: None,
1338 terminal: lower,
1339 },
1340 cx,
1341 );
1342 anyhow::Ok(())
1343 });
1344 }
1345 }
1346 }
1347 }
1348
1349 // Forward the update to the acp_thread as usual.
1350 session.thread.update(&mut self.cx.clone(), |thread, cx| {
1351 thread.handle_session_update(notification.update.clone(), cx)
1352 })??;
1353
1354 // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
1355 if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
1356 if let Some(meta) = &tcu.meta {
1357 if let Some(term_out) = meta.get("terminal_output") {
1358 if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {
1359 let terminal_id = acp::TerminalId::new(id_str);
1360 if let Some(s) = term_out.get("data").and_then(|v| v.as_str()) {
1361 let data = s.as_bytes().to_vec();
1362 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1363 thread.on_terminal_provider_event(
1364 TerminalProviderEvent::Output { terminal_id, data },
1365 cx,
1366 );
1367 });
1368 }
1369 }
1370 }
1371
1372 // terminal_exit
1373 if let Some(term_exit) = meta.get("terminal_exit") {
1374 if let Some(id_str) = term_exit.get("terminal_id").and_then(|v| v.as_str()) {
1375 let terminal_id = acp::TerminalId::new(id_str);
1376 let status = acp::TerminalExitStatus::new()
1377 .exit_code(
1378 term_exit
1379 .get("exit_code")
1380 .and_then(|v| v.as_u64())
1381 .map(|i| i as u32),
1382 )
1383 .signal(
1384 term_exit
1385 .get("signal")
1386 .and_then(|v| v.as_str().map(|s| s.to_string())),
1387 );
1388
1389 let _ = session.thread.update(&mut self.cx.clone(), |thread, cx| {
1390 thread.on_terminal_provider_event(
1391 TerminalProviderEvent::Exit {
1392 terminal_id,
1393 status,
1394 },
1395 cx,
1396 );
1397 });
1398 }
1399 }
1400 }
1401 }
1402
1403 Ok(())
1404 }
1405
1406 async fn create_terminal(
1407 &self,
1408 args: acp::CreateTerminalRequest,
1409 ) -> Result<acp::CreateTerminalResponse, acp::Error> {
1410 let thread = self.session_thread(&args.session_id)?;
1411 let project = thread.read_with(&self.cx, |thread, _cx| thread.project().clone())?;
1412
1413 let terminal_entity = acp_thread::create_terminal_entity(
1414 args.command.clone(),
1415 &args.args,
1416 args.env
1417 .into_iter()
1418 .map(|env| (env.name, env.value))
1419 .collect(),
1420 args.cwd.clone(),
1421 &project,
1422 &mut self.cx.clone(),
1423 )
1424 .await?;
1425
1426 // Register with renderer
1427 let terminal_entity = thread.update(&mut self.cx.clone(), |thread, cx| {
1428 thread.register_terminal_created(
1429 acp::TerminalId::new(uuid::Uuid::new_v4().to_string()),
1430 format!("{} {}", args.command, args.args.join(" ")),
1431 args.cwd.clone(),
1432 args.output_byte_limit,
1433 terminal_entity,
1434 cx,
1435 )
1436 })?;
1437 let terminal_id = terminal_entity.read_with(&self.cx, |terminal, _| terminal.id().clone());
1438 Ok(acp::CreateTerminalResponse::new(terminal_id))
1439 }
1440
1441 async fn kill_terminal(
1442 &self,
1443 args: acp::KillTerminalRequest,
1444 ) -> Result<acp::KillTerminalResponse, acp::Error> {
1445 self.session_thread(&args.session_id)?
1446 .update(&mut self.cx.clone(), |thread, cx| {
1447 thread.kill_terminal(args.terminal_id, cx)
1448 })??;
1449
1450 Ok(Default::default())
1451 }
1452
1453 async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
1454 Err(acp::Error::method_not_found())
1455 }
1456
1457 async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
1458 Err(acp::Error::method_not_found())
1459 }
1460
1461 async fn release_terminal(
1462 &self,
1463 args: acp::ReleaseTerminalRequest,
1464 ) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
1465 self.session_thread(&args.session_id)?
1466 .update(&mut self.cx.clone(), |thread, cx| {
1467 thread.release_terminal(args.terminal_id, cx)
1468 })??;
1469
1470 Ok(Default::default())
1471 }
1472
1473 async fn terminal_output(
1474 &self,
1475 args: acp::TerminalOutputRequest,
1476 ) -> Result<acp::TerminalOutputResponse, acp::Error> {
1477 self.session_thread(&args.session_id)?
1478 .read_with(&mut self.cx.clone(), |thread, cx| {
1479 let out = thread
1480 .terminal(args.terminal_id)?
1481 .read(cx)
1482 .current_output(cx);
1483
1484 Ok(out)
1485 })?
1486 }
1487
1488 async fn wait_for_terminal_exit(
1489 &self,
1490 args: acp::WaitForTerminalExitRequest,
1491 ) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
1492 let exit_status = self
1493 .session_thread(&args.session_id)?
1494 .update(&mut self.cx.clone(), |thread, cx| {
1495 anyhow::Ok(thread.terminal(args.terminal_id)?.read(cx).wait_for_exit())
1496 })??
1497 .await;
1498
1499 Ok(acp::WaitForTerminalExitResponse::new(exit_status))
1500 }
1501}
1502
1503impl ClientDelegate {
1504 fn session_thread(&self, session_id: &acp::SessionId) -> Result<WeakEntity<AcpThread>> {
1505 let sessions = self.sessions.borrow();
1506 sessions
1507 .get(session_id)
1508 .context("Failed to get session")
1509 .map(|session| session.thread.clone())
1510 }
1511}