active_thread.rs

   1use super::*;
   2
/// View-side state for a single ACP thread.
///
/// Bundles the thread entity, the optional selector/editor entities passed to
/// [`AcpThreadView::new`], and the UI flags (expansion state, queued messages,
/// turn timing, error state) that the methods on this type read and update.
pub struct AcpThreadView {
    /// Session id, cloned from `thread` in [`AcpThreadView::new`].
    pub id: acp::SessionId,
    /// The thread that prompts are sent to, cancelled on, retried and rewound.
    pub thread: Entity<AcpThread>,
    /// Workspace handle; used to make the workspace follow `CollaboratorId::Agent`.
    pub workspace: WeakEntity<Workspace>,
    /// Per-entry view state; `cancel_editing` looks up an entry's message editor here.
    pub entry_view_state: Entity<EntryViewState>,
    pub title_editor: Option<Entity<Editor>>,
    pub config_options_view: Option<Entity<ConfigOptionsView>>,
    /// Consulted by `current_mode_id` when the thread is not a native thread.
    pub mode_selector: Option<Entity<ModeSelector>>,
    /// Consulted by `current_model_id` for the active model.
    pub model_selector: Option<Entity<AcpModelSelectorPopover>>,
    pub profile_selector: Option<Entity<ProfileSelector>>,
    pub permission_dropdown_handle: PopoverMenuHandle<ContextMenu>,
    /// Cleared by `cancel_generation`.
    pub thread_retry_status: Option<RetryStatus>,
    /// Cleared whenever a new send, cancel or retry starts.
    pub(super) thread_error: Option<ThreadError>,
    pub thread_error_markdown: Option<Entity<Markdown>>,
    pub token_limit_callout_dismissed: bool,
    /// Cleared by `send_impl` before a new prompt is sent.
    pub(super) thread_feedback: ThreadFeedbackState,
    pub list_state: ListState,
    pub prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
    /// Commands advertised by the agent; `send` checks it for a `logout` command.
    pub available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
    /// Snapshot passed to `MessageEditor::contents_with_cache` when building a prompt.
    pub cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
    /// Snapshot passed to `MessageEditor::contents_with_cache` when building a prompt.
    pub cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
    /// Tracks which tool calls have their content/output expanded.
    /// Used for showing/hiding tool call results, terminal output, etc.
    pub expanded_tool_calls: HashSet<agent_client_protocol::ToolCallId>,
    pub expanded_tool_call_raw_inputs: HashSet<agent_client_protocol::ToolCallId>,
    pub expanded_thinking_blocks: HashSet<(usize, usize)>,
    pub expanded_subagents: HashSet<agent_client_protocol::SessionId>,
    pub subagent_scroll_handles: RefCell<HashMap<agent_client_protocol::SessionId, ScrollHandle>>,
    pub edits_expanded: bool,
    pub plan_expanded: bool,
    /// Initialised to `true` in [`AcpThreadView::new`], unlike the other expansion flags.
    pub queue_expanded: bool,
    /// Set by `set_editor_is_expanded`; toggled by `expand_message_editor`.
    pub editor_expanded: bool,
    /// When `true`, sending a prompt or authorizing a tool call asks the
    /// workspace to follow the agent.
    pub should_be_following: bool,
    /// Index of the entry whose message is being edited, if any.
    pub editing_message: Option<usize>,
    /// Messages queued locally; see `remove_from_queue` and
    /// `send_queued_message_at_index`.
    pub local_queued_messages: Vec<QueuedMessage>,
    pub queued_message_editors: Vec<Entity<MessageEditor>>,
    pub queued_message_editor_subscriptions: Vec<Subscription>,
    pub last_synced_queue_length: usize,
    /// Timing and token bookkeeping for the current/last turn.
    pub turn_fields: TurnFields,
    pub command_load_errors_dismissed: bool,
    pub discarded_partial_edits: HashSet<agent_client_protocol::ToolCallId>,
    /// Set while `send_content` is running; `send`, `interrupt_and_send` and
    /// `regenerate` return early while it is `true`.
    pub is_loading_contents: bool,
    pub new_server_version_available: Option<SharedString>,
    pub resumed_without_history: bool,
    /// Tracks the selected granularity index for each tool call's permission dropdown.
    /// The index corresponds to the position in the allow_options list.
    /// Default is the last option (index pointing to "Only this time").
    pub selected_permission_granularity: HashMap<agent_client_protocol::ToolCallId, usize>,
    pub resume_thread_metadata: Option<AgentSessionInfo>,
    /// Holds the task returned by `thread.cancel` in `cancel_generation`.
    pub _cancel_task: Option<Task<()>>,
    /// Incremented by `send_queued_message_at_index` for a "send now" issued
    /// while the thread is generating; reset to 0 by
    /// `stop_current_and_send_new_message`.
    pub skip_queue_processing_count: usize,
    /// Set to `true` by `cancel_generation` and `stop_current_and_send_new_message`.
    pub user_interrupted_generation: bool,
    /// When set, `send` with an empty editor sends the first queued message
    /// immediately instead of doing nothing.
    pub can_fast_track_queue: bool,
    pub hovered_edited_file_buttons: Option<usize>,
    /// Subscriptions passed to [`AcpThreadView::new`], stored on the view.
    pub _subscriptions: Vec<Subscription>,
}
  59
/// Timing and token bookkeeping for a single agent turn.
///
/// Written by `AcpThreadView::start_turn`, `stop_turn` and `update_turn_tokens`.
#[derive(Default)]
pub struct TurnFields {
    /// Task spawned by `start_turn` that notifies the view once per second;
    /// set back to `None` by `stop_turn`.
    pub _turn_timer_task: Option<Task<()>>,
    /// Elapsed time of the most recently stopped turn; reset to `None` when a
    /// new turn starts.
    pub last_turn_duration: Option<Duration>,
    /// Value of `turn_tokens` at the moment the last turn was stopped.
    pub last_turn_tokens: Option<u64>,
    /// Counter incremented on every `start_turn`; `stop_turn` ignores calls
    /// whose generation does not match the current value.
    pub turn_generation: usize,
    /// When the current turn started; taken by `stop_turn`.
    pub turn_started_at: Option<Instant>,
    /// Token count for the running turn; starts at `Some(0)` and is increased
    /// by `update_turn_tokens`.
    pub turn_tokens: Option<u64>,
}
  69
  70impl AcpThreadView {
  71    pub fn new(
  72        thread: Entity<AcpThread>,
  73        workspace: WeakEntity<Workspace>,
  74        entry_view_state: Entity<EntryViewState>,
  75        title_editor: Option<Entity<Editor>>,
  76        config_options_view: Option<Entity<ConfigOptionsView>>,
  77        mode_selector: Option<Entity<ModeSelector>>,
  78        model_selector: Option<Entity<AcpModelSelectorPopover>>,
  79        profile_selector: Option<Entity<ProfileSelector>>,
  80        list_state: ListState,
  81        prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
  82        available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
  83        cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
  84        cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
  85        resumed_without_history: bool,
  86        resume_thread_metadata: Option<AgentSessionInfo>,
  87        subscriptions: Vec<Subscription>,
  88        cx: &App,
  89    ) -> Self {
  90        let id = thread.read(cx).session_id().clone();
  91        Self {
  92            id,
  93            thread,
  94            workspace,
  95            entry_view_state,
  96            title_editor,
  97            config_options_view,
  98            mode_selector,
  99            model_selector,
 100            profile_selector,
 101            list_state,
 102            prompt_capabilities,
 103            available_commands,
 104            cached_user_commands,
 105            cached_user_command_errors,
 106            resumed_without_history,
 107            resume_thread_metadata,
 108            command_load_errors_dismissed: false,
 109            _subscriptions: subscriptions,
 110            permission_dropdown_handle: PopoverMenuHandle::default(),
 111            thread_retry_status: None,
 112            thread_error: None,
 113            thread_error_markdown: None,
 114            token_limit_callout_dismissed: false,
 115            thread_feedback: Default::default(),
 116            expanded_tool_calls: HashSet::default(),
 117            expanded_tool_call_raw_inputs: HashSet::default(),
 118            expanded_thinking_blocks: HashSet::default(),
 119            expanded_subagents: HashSet::default(),
 120            subagent_scroll_handles: RefCell::new(HashMap::default()),
 121            edits_expanded: false,
 122            plan_expanded: false,
 123            queue_expanded: true,
 124            editor_expanded: false,
 125            should_be_following: false,
 126            editing_message: None,
 127            local_queued_messages: Vec::new(),
 128            queued_message_editors: Vec::new(),
 129            queued_message_editor_subscriptions: Vec::new(),
 130            last_synced_queue_length: 0,
 131            turn_fields: TurnFields::default(),
 132            discarded_partial_edits: HashSet::default(),
 133            is_loading_contents: false,
 134            new_server_version_available: None,
 135            selected_permission_granularity: HashMap::default(),
 136            _cancel_task: None,
 137            skip_queue_processing_count: 0,
 138            user_interrupted_generation: false,
 139            can_fast_track_queue: false,
 140            hovered_edited_file_buttons: None,
 141        }
 142    }
 143
 144    pub(crate) fn as_native_connection(
 145        &self,
 146        cx: &App,
 147    ) -> Option<Rc<agent::NativeAgentConnection>> {
 148        let acp_thread = self.thread.read(cx);
 149        acp_thread.connection().clone().downcast()
 150    }
 151
 152    pub(crate) fn as_native_thread(&self, cx: &App) -> Option<Entity<agent::Thread>> {
 153        let acp_thread = self.thread.read(cx);
 154        self.as_native_connection(cx)?
 155            .thread(acp_thread.session_id(), cx)
 156    }
 157
 158    pub fn current_model_id(&self, cx: &App) -> Option<String> {
 159        let selector = self.model_selector.as_ref()?;
 160        let model = selector.read(cx).active_model(cx)?;
 161        Some(model.id.to_string())
 162    }
 163
 164    pub fn current_mode_id(&self, cx: &App) -> Option<Arc<str>> {
 165        if let Some(thread) = self.as_native_thread(cx) {
 166            Some(thread.read(cx).profile().0.clone())
 167        } else {
 168            let mode_selector = self.mode_selector.as_ref()?;
 169            Some(mode_selector.read(cx).mode().0)
 170        }
 171    }
 172
 173    pub fn has_queued_messages(&self) -> bool {
 174        !self.local_queued_messages.is_empty()
 175    }
 176
 177    pub fn is_imported_thread(&self, cx: &App) -> bool {
 178        let Some(thread) = self.as_native_thread(cx) else {
 179            return false;
 180        };
 181        thread.read(cx).is_imported()
 182    }
 183
 184    // turns
 185
 186    pub fn start_turn(&mut self, cx: &mut Context<AcpServerView>) -> usize {
 187        self.turn_fields.turn_generation += 1;
 188        let generation = self.turn_fields.turn_generation;
 189        self.turn_fields.turn_started_at = Some(Instant::now());
 190        self.turn_fields.last_turn_duration = None;
 191        self.turn_fields.last_turn_tokens = None;
 192        self.turn_fields.turn_tokens = Some(0);
 193        self.turn_fields._turn_timer_task = Some(cx.spawn(async move |this, cx| {
 194            loop {
 195                cx.background_executor().timer(Duration::from_secs(1)).await;
 196                if this.update(cx, |_, cx| cx.notify()).is_err() {
 197                    break;
 198                }
 199            }
 200        }));
 201        generation
 202    }
 203
 204    pub fn stop_turn(&mut self, generation: usize) {
 205        if self.turn_fields.turn_generation != generation {
 206            return;
 207        }
 208        self.turn_fields.last_turn_duration = self
 209            .turn_fields
 210            .turn_started_at
 211            .take()
 212            .map(|started| started.elapsed());
 213        self.turn_fields.last_turn_tokens = self.turn_fields.turn_tokens.take();
 214        self.turn_fields._turn_timer_task = None;
 215    }
 216
 217    pub fn update_turn_tokens(&mut self, cx: &App) {
 218        if let Some(usage) = self.thread.read(cx).token_usage() {
 219            if let Some(tokens) = &mut self.turn_fields.turn_tokens {
 220                *tokens += usage.output_tokens;
 221            }
 222        }
 223    }
 224
 225    // sending
 226
 227    pub fn send(
 228        &mut self,
 229        message_editor: Entity<MessageEditor>,
 230        agent_name: SharedString,
 231        login: Option<task::SpawnInTerminal>,
 232        window: &mut Window,
 233        cx: &mut Context<AcpServerView>,
 234    ) {
 235        let thread = &self.thread;
 236
 237        if self.is_loading_contents {
 238            return;
 239        }
 240
 241        let is_editor_empty = message_editor.read(cx).is_empty(cx);
 242        let is_generating = thread.read(cx).status() != ThreadStatus::Idle;
 243
 244        let has_queued = self.has_queued_messages();
 245        if is_editor_empty && self.can_fast_track_queue && has_queued {
 246            self.can_fast_track_queue = false;
 247            self.send_queued_message_at_index(0, true, window, cx);
 248            return;
 249        }
 250
 251        if is_editor_empty {
 252            return;
 253        }
 254
 255        if is_generating {
 256            self.queue_message(message_editor, window, cx);
 257            return;
 258        }
 259
 260        let text = message_editor.read(cx).text(cx);
 261        let text = text.trim();
 262        if text == "/login" || text == "/logout" {
 263            let connection = thread.read(cx).connection().clone();
 264            let can_login = !connection.auth_methods().is_empty() || login.is_some();
 265            // Does the agent have a specific logout command? Prefer that in case they need to reset internal state.
 266            let logout_supported = text == "/logout"
 267                && self
 268                    .available_commands
 269                    .borrow()
 270                    .iter()
 271                    .any(|command| command.name == "logout");
 272            if can_login && !logout_supported {
 273                message_editor.update(cx, |editor, cx| editor.clear(window, cx));
 274
 275                let this = cx.weak_entity();
 276                window.defer(cx, |window, cx| {
 277                    AcpServerView::handle_auth_required(
 278                        this,
 279                        AuthRequired::new(),
 280                        agent_name,
 281                        window,
 282                        cx,
 283                    );
 284                });
 285                cx.notify();
 286                return;
 287            }
 288        }
 289
 290        self.send_impl(message_editor, window, cx)
 291    }
 292
 293    pub fn send_impl(
 294        &mut self,
 295        message_editor: Entity<MessageEditor>,
 296        window: &mut Window,
 297        cx: &mut Context<AcpServerView>,
 298    ) {
 299        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 300            // Include full contents when using minimal profile
 301            let thread = thread.read(cx);
 302            AgentSettings::get_global(cx)
 303                .profiles
 304                .get(thread.profile())
 305                .is_some_and(|profile| profile.tools.is_empty())
 306        });
 307
 308        let cached_commands = &self.cached_user_commands;
 309        let cached_errors = &self.cached_user_command_errors;
 310        let contents = message_editor.update(cx, |message_editor, cx| {
 311            message_editor.contents_with_cache(
 312                full_mention_content,
 313                Some(cached_commands.borrow().clone()),
 314                Some(cached_errors.borrow().clone()),
 315                cx,
 316            )
 317        });
 318
 319        self.thread_error.take();
 320        self.thread_feedback.clear();
 321        self.editing_message.take();
 322
 323        if self.should_be_following {
 324            self.workspace
 325                .update(cx, |workspace, cx| {
 326                    workspace.follow(CollaboratorId::Agent, window, cx);
 327                })
 328                .ok();
 329        }
 330
 331        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 332            let (contents, tracked_buffers) = contents.await?;
 333
 334            if contents.is_empty() {
 335                return Ok(None);
 336            }
 337
 338            let _ = cx.update(|window, cx| {
 339                message_editor.update(cx, |message_editor, cx| {
 340                    message_editor.clear(window, cx);
 341                });
 342            });
 343
 344            Ok(Some((contents, tracked_buffers)))
 345        });
 346
 347        self.send_content(contents_task, window, cx);
 348    }
 349
    /// Sends already-prepared prompt contents to the thread.
    ///
    /// `contents_task` resolves to the content blocks plus the buffers that
    /// were read to build them, or to `None` when there is nothing to send.
    ///
    /// While this runs, `is_loading_contents` is `true`; it is reset through a
    /// throwaway guard entity whose release handler clears the flag on the
    /// active thread. Errors from the send are routed to
    /// `handle_thread_error`; on success the workspace's "following agent"
    /// state is recorded on the active thread.
    pub fn send_content(
        &mut self,
        contents_task: Task<anyhow::Result<Option<(Vec<acp::ContentBlock>, Vec<Entity<Buffer>>)>>>,
        window: &mut Window,
        cx: &mut Context<AcpServerView>,
    ) {
        let session_id = self.thread.read(cx).session_id().clone();
        let agent_telemetry_id = self.thread.read(cx).connection().telemetry_id();
        let thread = self.thread.downgrade();

        self.is_loading_contents = true;

        let model_id = self.current_model_id(cx);
        let mode_id = self.current_mode_id(cx);
        // The guard is moved into the task below; whenever it is released
        // (explicit `drop(guard)` or the task ending early) the loading flag
        // is cleared.
        let guard = cx.new(|_| ());
        cx.observe_release(&guard, |this, _guard, cx| {
            if let Some(thread) = this.as_active_thread_mut() {
                thread.is_loading_contents = false;
            }
            cx.notify();
        })
        .detach();

        let task = cx.spawn_in(window, async move |this, cx| {
            // Nothing to send: finish without starting a turn.
            let Some((contents, tracked_buffers)) = contents_task.await? else {
                return Ok(());
            };

            let generation = this.update_in(cx, |this, _window, cx| {
                this.in_flight_prompt = Some(contents.clone());
                let generation = this.start_turn(cx);
                this.set_editor_is_expanded(false, cx);
                this.scroll_to_bottom(cx);
                generation
            })?;

            // Ensures `stop_turn` runs for this generation even if the task
            // exits early via `?` below.
            let _stop_turn = defer({
                let this = this.clone();
                let mut cx = cx.clone();
                move || {
                    this.update(&mut cx, |this, cx| {
                        this.stop_turn(generation);
                        cx.notify();
                    })
                    .ok();
                }
            });
            let turn_start_time = Instant::now();
            let send = thread.update(cx, |thread, cx| {
                // Register every buffer that contributed to the prompt as read.
                thread.action_log().update(cx, |action_log, cx| {
                    for buffer in tracked_buffers {
                        action_log.buffer_read(buffer, cx)
                    }
                });
                // Contents are loaded; release the guard to clear the loading flag.
                drop(guard);

                telemetry::event!(
                    "Agent Message Sent",
                    agent = agent_telemetry_id,
                    session = session_id,
                    model = model_id,
                    mode = mode_id
                );

                thread.send(contents, cx)
            })?;
            let res = send.await;
            let turn_time_ms = turn_start_time.elapsed().as_millis();
            // Stop the turn now rather than at the end of the task.
            drop(_stop_turn);
            // The in-flight prompt is only cleared on success; on failure it
            // is left in place.
            let status = if res.is_ok() {
                this.update(cx, |this, _| this.in_flight_prompt.take()).ok();
                "success"
            } else {
                "failure"
            };
            telemetry::event!(
                "Agent Turn Completed",
                agent = agent_telemetry_id,
                session = session_id,
                model = model_id,
                mode = mode_id,
                status,
                turn_time_ms,
            );
            res
        });

        cx.spawn(async move |this, cx| {
            if let Err(err) = task.await {
                this.update(cx, |this, cx| {
                    this.handle_thread_error(err, cx);
                })
                .ok();
            } else {
                // Remember whether the agent is being followed once the turn
                // has finished; a dropped workspace counts as "not following".
                this.update(cx, |this, cx| {
                    let should_be_following = this
                        .workspace
                        .update(cx, |workspace, _| {
                            workspace.is_being_followed(CollaboratorId::Agent)
                        })
                        .unwrap_or_default();
                    if let Some(thread) = this.as_active_thread_mut() {
                        thread.should_be_following = should_be_following;
                    }
                })
                .ok();
            }
        })
        .detach();
    }
 460
 461    pub fn interrupt_and_send(
 462        &mut self,
 463        message_editor: Entity<MessageEditor>,
 464        window: &mut Window,
 465        cx: &mut Context<AcpServerView>,
 466    ) {
 467        let thread = &self.thread;
 468
 469        if self.is_loading_contents {
 470            return;
 471        }
 472
 473        if thread.read(cx).status() == ThreadStatus::Idle {
 474            self.send_impl(message_editor, window, cx);
 475            return;
 476        }
 477
 478        self.stop_current_and_send_new_message(window, cx);
 479    }
 480
 481    pub fn stop_current_and_send_new_message(
 482        &mut self,
 483        window: &mut Window,
 484        cx: &mut Context<AcpServerView>,
 485    ) {
 486        let thread = self.thread.clone();
 487        self.skip_queue_processing_count = 0;
 488        self.user_interrupted_generation = true;
 489
 490        let cancelled = thread.update(cx, |thread, cx| thread.cancel(cx));
 491
 492        cx.spawn_in(window, async move |this, cx| {
 493            cancelled.await;
 494
 495            this.update_in(cx, |this, window, cx| {
 496                this.send_impl(this.message_editor.clone(), window, cx);
 497            })
 498            .ok();
 499        })
 500        .detach();
 501    }
 502
 503    // generation
 504
 505    pub fn cancel_generation(&mut self, cx: &mut Context<AcpServerView>) {
 506        self.thread_retry_status.take();
 507        self.thread_error.take();
 508        self.user_interrupted_generation = true;
 509        self._cancel_task = Some(self.thread.update(cx, |thread, cx| thread.cancel(cx)));
 510    }
 511
 512    pub fn retry_generation(&mut self, cx: &mut Context<AcpServerView>) {
 513        self.thread_error.take();
 514
 515        let thread = &self.thread;
 516        if !thread.read(cx).can_retry(cx) {
 517            return;
 518        }
 519
 520        let task = thread.update(cx, |thread, cx| thread.retry(cx));
 521        cx.spawn(async move |this, cx| {
 522            let result = task.await;
 523
 524            this.update(cx, |this, cx| {
 525                if let Err(err) = result {
 526                    this.handle_thread_error(err, cx);
 527                }
 528            })
 529        })
 530        .detach();
 531    }
 532
 533    pub fn regenerate(
 534        &mut self,
 535        entry_ix: usize,
 536        message_editor: Entity<MessageEditor>,
 537        window: &mut Window,
 538        cx: &mut Context<AcpServerView>,
 539    ) {
 540        if self.is_loading_contents {
 541            return;
 542        }
 543        let thread = self.thread.clone();
 544
 545        let Some(user_message_id) = thread.update(cx, |thread, _| {
 546            thread.entries().get(entry_ix)?.user_message()?.id.clone()
 547        }) else {
 548            return;
 549        };
 550
 551        cx.spawn_in(window, async move |this, cx| {
 552            // Check if there are any edits from prompts before the one being regenerated.
 553            //
 554            // If there are, we keep/accept them since we're not regenerating the prompt that created them.
 555            //
 556            // If editing the prompt that generated the edits, they are auto-rejected
 557            // through the `rewind` function in the `acp_thread`.
 558            let has_earlier_edits = thread.read_with(cx, |thread, _| {
 559                thread
 560                    .entries()
 561                    .iter()
 562                    .take(entry_ix)
 563                    .any(|entry| entry.diffs().next().is_some())
 564            });
 565
 566            if has_earlier_edits {
 567                thread.update(cx, |thread, cx| {
 568                    thread.action_log().update(cx, |action_log, cx| {
 569                        action_log.keep_all_edits(None, cx);
 570                    });
 571                });
 572            }
 573
 574            thread
 575                .update(cx, |thread, cx| thread.rewind(user_message_id, cx))
 576                .await?;
 577            this.update_in(cx, |this, window, cx| {
 578                this.send_impl(message_editor, window, cx);
 579                this.focus_handle(cx).focus(window, cx);
 580            })?;
 581            anyhow::Ok(())
 582        })
 583        .detach_and_log_err(cx);
 584    }
 585
 586    // message queueing
 587
 588    pub fn queue_message(
 589        &mut self,
 590        message_editor: Entity<MessageEditor>,
 591        window: &mut Window,
 592        cx: &mut Context<AcpServerView>,
 593    ) {
 594        let is_idle = self.thread.read(cx).status() == acp_thread::ThreadStatus::Idle;
 595
 596        if is_idle {
 597            self.send_impl(message_editor.clone(), window, cx);
 598            return;
 599        }
 600
 601        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 602            let thread = thread.read(cx);
 603            AgentSettings::get_global(cx)
 604                .profiles
 605                .get(thread.profile())
 606                .is_some_and(|profile| profile.tools.is_empty())
 607        });
 608
 609        let cached_commands = self.cached_user_commands.borrow().clone();
 610        let cached_errors = self.cached_user_command_errors.borrow().clone();
 611        let contents = message_editor.update(cx, |message_editor, cx| {
 612            message_editor.contents_with_cache(
 613                full_mention_content,
 614                Some(cached_commands),
 615                Some(cached_errors),
 616                cx,
 617            )
 618        });
 619
 620        cx.spawn_in(window, async move |this, cx| {
 621            let (content, tracked_buffers) = contents.await?;
 622
 623            if content.is_empty() {
 624                return Ok::<(), anyhow::Error>(());
 625            }
 626
 627            this.update_in(cx, |this, window, cx| {
 628                this.add_to_queue(content, tracked_buffers, cx);
 629                // Enable fast-track: user can press Enter again to send this queued message immediately
 630                this.set_can_fast_track_queue(true);
 631                message_editor.update(cx, |message_editor, cx| {
 632                    message_editor.clear(window, cx);
 633                });
 634                cx.notify();
 635            })?;
 636            Ok(())
 637        })
 638        .detach_and_log_err(cx);
 639    }
 640
 641    pub fn remove_from_queue(
 642        &mut self,
 643        index: usize,
 644        cx: &mut Context<AcpServerView>,
 645    ) -> Option<QueuedMessage> {
 646        if index < self.local_queued_messages.len() {
 647            let removed = self.local_queued_messages.remove(index);
 648            self.sync_queue_flag_to_native_thread(cx);
 649            Some(removed)
 650        } else {
 651            None
 652        }
 653    }
 654
 655    pub fn sync_queue_flag_to_native_thread(&self, cx: &mut Context<AcpServerView>) {
 656        if let Some(native_thread) = self.as_native_thread(cx) {
 657            let has_queued = self.has_queued_messages();
 658            native_thread.update(cx, |thread, _| {
 659                thread.set_has_queued_message(has_queued);
 660            });
 661        }
 662    }
 663
 664    pub fn send_queued_message_at_index(
 665        &mut self,
 666        index: usize,
 667        is_send_now: bool,
 668        window: &mut Window,
 669        cx: &mut Context<AcpServerView>,
 670    ) {
 671        let Some(queued) = self.remove_from_queue(index, cx) else {
 672            return;
 673        };
 674        let content = queued.content;
 675        let tracked_buffers = queued.tracked_buffers;
 676
 677        // Only increment skip count for "Send Now" operations (out-of-order sends)
 678        // Normal auto-processing from the Stopped handler doesn't need to skip.
 679        // We only skip the Stopped event from the cancelled generation, NOT the
 680        // Stopped event from the newly sent message (which should trigger queue processing).
 681        if is_send_now {
 682            let is_generating =
 683                self.thread.read(cx).status() == acp_thread::ThreadStatus::Generating;
 684            self.skip_queue_processing_count += if is_generating { 1 } else { 0 };
 685        }
 686
 687        let cancelled = self.thread.update(cx, |thread, cx| thread.cancel(cx));
 688
 689        let workspace = self.workspace.clone();
 690
 691        let should_be_following = self.should_be_following;
 692        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 693            cancelled.await;
 694            if should_be_following {
 695                workspace
 696                    .update_in(cx, |workspace, window, cx| {
 697                        workspace.follow(CollaboratorId::Agent, window, cx);
 698                    })
 699                    .ok();
 700            }
 701
 702            Ok(Some((content, tracked_buffers)))
 703        });
 704
 705        self.send_content(contents_task, window, cx);
 706    }
 707
 708    // editor methods
 709
 710    pub fn expand_message_editor(
 711        &mut self,
 712        message_editor: Entity<MessageEditor>,
 713        cx: &mut Context<AcpServerView>,
 714    ) {
 715        self.set_editor_is_expanded(!self.editor_expanded, message_editor, cx);
 716        cx.stop_propagation();
 717        cx.notify();
 718    }
 719
 720    pub fn set_editor_is_expanded(
 721        &mut self,
 722        is_expanded: bool,
 723        message_editor: Entity<MessageEditor>,
 724        cx: &mut Context<AcpServerView>,
 725    ) {
 726        self.editor_expanded = is_expanded;
 727        message_editor.update(cx, |editor, cx| {
 728            if is_expanded {
 729                editor.set_mode(
 730                    EditorMode::Full {
 731                        scale_ui_elements_with_buffer_font_size: false,
 732                        show_active_line_background: false,
 733                        sizing_behavior: SizingBehavior::ExcludeOverscrollMargin,
 734                    },
 735                    cx,
 736                )
 737            } else {
 738                let agent_settings = AgentSettings::get_global(cx);
 739                editor.set_mode(
 740                    EditorMode::AutoHeight {
 741                        min_lines: agent_settings.message_editor_min_lines,
 742                        max_lines: Some(agent_settings.set_message_editor_max_lines()),
 743                    },
 744                    cx,
 745                )
 746            }
 747        });
 748        cx.notify();
 749    }
 750
 751    pub fn handle_title_editor_event(
 752        &mut self,
 753        title_editor: &Entity<Editor>,
 754        event: &EditorEvent,
 755        window: &mut Window,
 756        cx: &mut Context<AcpServerView>,
 757    ) {
 758        let thread = &self.thread;
 759
 760        match event {
 761            EditorEvent::BufferEdited => {
 762                let new_title = title_editor.read(cx).text(cx);
 763                thread.update(cx, |thread, cx| {
 764                    thread
 765                        .set_title(new_title.into(), cx)
 766                        .detach_and_log_err(cx);
 767                })
 768            }
 769            EditorEvent::Blurred => {
 770                if title_editor.read(cx).text(cx).is_empty() {
 771                    title_editor.update(cx, |editor, cx| {
 772                        editor.set_text("New Thread", window, cx);
 773                    });
 774                }
 775            }
 776            _ => {}
 777        }
 778    }
 779
 780    pub fn cancel_editing(
 781        &mut self,
 782        focus_handle: FocusHandle,
 783        window: &mut Window,
 784        cx: &mut Context<AcpServerView>,
 785    ) {
 786        if let Some(index) = self.editing_message.take()
 787            && let Some(editor) = &self
 788                .entry_view_state
 789                .read(cx)
 790                .entry(index)
 791                .and_then(|e| e.message_editor())
 792                .cloned()
 793        {
 794            editor.update(cx, |editor, cx| {
 795                if let Some(user_message) = self
 796                    .thread
 797                    .read(cx)
 798                    .entries()
 799                    .get(index)
 800                    .and_then(|e| e.user_message())
 801                {
 802                    editor.set_message(user_message.chunks.clone(), window, cx);
 803                }
 804            })
 805        };
 806        focus_handle.focus(window, cx);
 807        cx.notify();
 808    }
 809
 810    // tool permissions
 811
 812    pub fn authorize_tool_call(
 813        &mut self,
 814        tool_call_id: acp::ToolCallId,
 815        option_id: acp::PermissionOptionId,
 816        option_kind: acp::PermissionOptionKind,
 817        window: &mut Window,
 818        cx: &mut Context<AcpServerView>,
 819    ) {
 820        let thread = &self.thread;
 821        let agent_telemetry_id = thread.read(cx).connection().telemetry_id();
 822
 823        telemetry::event!(
 824            "Agent Tool Call Authorized",
 825            agent = agent_telemetry_id,
 826            session = thread.read(cx).session_id(),
 827            option = option_kind
 828        );
 829
 830        thread.update(cx, |thread, cx| {
 831            thread.authorize_tool_call(tool_call_id, option_id, option_kind, cx);
 832        });
 833        if self.should_be_following {
 834            self.workspace
 835                .update(cx, |workspace, cx| {
 836                    workspace.follow(CollaboratorId::Agent, window, cx);
 837                })
 838                .ok();
 839        }
 840        cx.notify();
 841    }
 842
 843    pub fn authorize_pending_tool_call(
 844        &mut self,
 845        kind: acp::PermissionOptionKind,
 846        window: &mut Window,
 847        cx: &mut Context<AcpServerView>,
 848    ) -> Option<()> {
 849        let thread = self.thread.read(cx);
 850        let tool_call = thread.first_tool_awaiting_confirmation()?;
 851        let ToolCallStatus::WaitingForConfirmation { options, .. } = &tool_call.status else {
 852            return None;
 853        };
 854        let option = options.first_option_of_kind(kind)?;
 855
 856        self.authorize_tool_call(
 857            tool_call.id.clone(),
 858            option.option_id.clone(),
 859            option.kind,
 860            window,
 861            cx,
 862        );
 863
 864        Some(())
 865    }
 866
 867    pub fn handle_select_permission_granularity(
 868        &mut self,
 869        action: &SelectPermissionGranularity,
 870        cx: &mut Context<AcpServerView>,
 871    ) {
 872        let tool_call_id = acp::ToolCallId::new(action.tool_call_id.clone());
 873        self.selected_permission_granularity
 874            .insert(tool_call_id, action.index);
 875
 876        cx.notify();
 877    }
 878
 879    // edits
 880
 881    pub fn keep_all(&mut self, cx: &mut Context<AcpServerView>) {
 882        let thread = &self.thread;
 883        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 884        let action_log = thread.read(cx).action_log().clone();
 885        action_log.update(cx, |action_log, cx| {
 886            action_log.keep_all_edits(Some(telemetry), cx)
 887        });
 888    }
 889
 890    pub fn reject_all(&mut self, cx: &mut Context<AcpServerView>) {
 891        let thread = &self.thread;
 892        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 893        let action_log = thread.read(cx).action_log().clone();
 894        action_log
 895            .update(cx, |action_log, cx| {
 896                action_log.reject_all_edits(Some(telemetry), cx)
 897            })
 898            .detach();
 899    }
 900
 901    pub fn open_edited_buffer(
 902        &mut self,
 903        buffer: &Entity<Buffer>,
 904        window: &mut Window,
 905        cx: &mut Context<AcpServerView>,
 906    ) {
 907        let thread = &self.thread;
 908
 909        let Some(diff) =
 910            AgentDiffPane::deploy(thread.clone(), self.workspace.clone(), window, cx).log_err()
 911        else {
 912            return;
 913        };
 914
 915        diff.update(cx, |diff, cx| {
 916            diff.move_to_path(PathKey::for_buffer(buffer, cx), window, cx)
 917        })
 918    }
 919
 920    // thread stuff
 921
    /// Re-fetches the shared version of this thread and replaces the local copy with it.
    ///
    /// Does nothing unless `is_imported_thread` reports true and the native connection exposes
    /// a `NativeAgentSessionList`. Otherwise a background task is spawned that:
    ///
    /// 1. requests the shared thread for this session id from the project's client,
    /// 2. decodes the response and saves it to the thread store under the same session id,
    /// 3. records new resume metadata on the active thread (if any) and resets the view,
    /// 4. shows a toast confirming the sync.
    ///
    /// Any failure along the way aborts the remaining steps and is logged.
    pub fn sync_thread(
        &mut self,
        project: Entity<Project>,
        window: &mut Window,
        cx: &mut Context<AcpServerView>,
    ) {
        if !self.is_imported_thread(cx) {
            return;
        }

        // The thread store is only reachable through the native agent's session list.
        let Some(session_list) = self
            .as_native_connection(cx)
            .and_then(|connection| connection.session_list(cx))
            .and_then(|list| list.downcast::<NativeAgentSessionList>())
        else {
            return;
        };
        let thread_store = session_list.thread_store().clone();

        let client = project.read(cx).client();
        let session_id = self.thread.read(cx).session_id().clone();

        cx.spawn_in(window, async move |this, cx| {
            // Fetch the shared thread for this session.
            let response = client
                .request(proto::GetSharedAgentThread {
                    session_id: session_id.to_string(),
                })
                .await?;

            let shared_thread = SharedThread::from_bytes(&response.thread_data)?;

            let db_thread = shared_thread.to_db_thread();

            // Save the fetched thread under the same session id.
            thread_store
                .update(&mut cx.clone(), |store, cx| {
                    store.save_thread(session_id.clone(), db_thread, cx)
                })
                .await?;

            // The title from the response is prefixed with a link emoji.
            let thread_metadata = AgentSessionInfo {
                session_id,
                cwd: None,
                title: Some(format!("🔗 {}", response.title).into()),
                updated_at: Some(chrono::Utc::now()),
                meta: None,
            };

            // Store the resume metadata first, then reset the view.
            this.update_in(cx, |this, window, cx| {
                if let Some(thread) = this.as_active_thread_mut() {
                    let resume_thread_metadata = &mut thread.resume_thread_metadata;
                    *resume_thread_metadata = Some(thread_metadata);
                }
                this.reset(window, cx);
            })?;

            // Confirm the sync, provided the workspace is still alive.
            this.update_in(cx, |this, _window, cx| {
                if let Some(workspace) = this.workspace.upgrade() {
                    workspace.update(cx, |workspace, cx| {
                        struct ThreadSyncedToast;
                        workspace.show_toast(
                            Toast::new(
                                NotificationId::unique::<ThreadSyncedToast>(),
                                "Thread synced with latest version",
                            )
                            .autohide(),
                            cx,
                        );
                    });
                }
            })?;

            anyhow::Ok(())
        })
        .detach_and_log_err(cx);
    }
 997
 998    pub fn restore_checkpoint(
 999        &mut self,
1000        message_id: &UserMessageId,
1001        cx: &mut Context<AcpServerView>,
1002    ) {
1003        self.thread
1004            .update(cx, |thread, cx| {
1005                thread.restore_checkpoint(message_id.clone(), cx)
1006            })
1007            .detach_and_log_err(cx);
1008    }
1009
1010    pub fn clear_thread_error(&mut self, cx: &mut Context<AcpServerView>) {
1011        self.thread_error = None;
1012        self.thread_error_markdown = None;
1013        self.token_limit_callout_dismissed = true;
1014        cx.notify();
1015    }
1016
1017    // other
1018
1019    pub fn refresh_cached_user_commands_from_registry(
1020        &mut self,
1021        registry: &Entity<SlashCommandRegistry>,
1022        cx: &App,
1023    ) {
1024        let (mut commands, mut errors) = registry.read_with(cx, |registry, _| {
1025            (registry.commands().clone(), registry.errors().to_vec())
1026        });
1027        let server_command_names = self
1028            .available_commands
1029            .borrow()
1030            .iter()
1031            .map(|command| command.name.clone())
1032            .collect::<HashSet<_>>();
1033        user_slash_command::apply_server_command_conflicts_to_map(
1034            &mut commands,
1035            &mut errors,
1036            &server_command_names,
1037        );
1038
1039        self.command_load_errors_dismissed = false;
1040        *self.cached_user_commands.borrow_mut() = commands;
1041        *self.cached_user_command_errors.borrow_mut() = errors;
1042    }
1043
1044    pub fn render_command_load_errors(
1045        &self,
1046        cx: &mut Context<AcpServerView>,
1047    ) -> Option<impl IntoElement> {
1048        let errors = self.cached_user_command_errors.borrow();
1049
1050        if self.command_load_errors_dismissed || errors.is_empty() {
1051            return None;
1052        }
1053
1054        let workspace = self.workspace.clone();
1055
1056        let error_count = errors.len();
1057        let title = if error_count == 1 {
1058            "Failed to load slash command"
1059        } else {
1060            "Failed to load slash commands"
1061        };
1062
1063        Some(
1064            Callout::new()
1065                .icon(IconName::Warning)
1066                .severity(Severity::Warning)
1067                .title(title)
1068                .actions_slot(
1069                    IconButton::new("dismiss-command-errors", IconName::Close)
1070                        .icon_size(IconSize::Small)
1071                        .icon_color(Color::Muted)
1072                        .tooltip(Tooltip::text("Dismiss Error"))
1073                        .on_click(cx.listener(|this, _, _, cx| {
1074                            this.clear_command_load_errors(cx);
1075                        })),
1076                )
1077                .description_slot(v_flex().children(errors.iter().enumerate().map({
1078                    move |(i, error)| {
1079                        let path = error.path.clone();
1080                        let workspace = workspace.clone();
1081                        let file_name = error
1082                            .path
1083                            .file_name()
1084                            .map(|n| n.to_string_lossy().to_string())
1085                            .unwrap_or_else(|| error.path.display().to_string());
1086                        let id = ElementId::Name(format!("command-error-{i}").into());
1087                        let label = format!("{}: {}", file_name, error.message);
1088
1089                        Button::new(id, label)
1090                            .label_size(LabelSize::Small)
1091                            .truncate(true)
1092                            .tooltip({
1093                                let message: SharedString = error.message.clone().into();
1094                                let path: SharedString = error.path.display().to_string().into();
1095                                move |_, cx| {
1096                                    Tooltip::with_meta(message.clone(), None, path.clone(), cx)
1097                                }
1098                            })
1099                            .on_click({
1100                                move |_, window, cx| {
1101                                    if let Some(workspace) = workspace.upgrade() {
1102                                        workspace.update(cx, |workspace, cx| {
1103                                            workspace
1104                                                .open_abs_path(
1105                                                    path.clone(),
1106                                                    OpenOptions::default(),
1107                                                    window,
1108                                                    cx,
1109                                                )
1110                                                .detach_and_log_err(cx);
1111                                        });
1112                                    }
1113                                }
1114                            })
1115                    }
1116                }))),
1117        )
1118    }
1119
1120    pub fn render_thread_retry_status_callout(&self) -> Option<Callout> {
1121        let state = self.thread_retry_status.as_ref()?;
1122
1123        let next_attempt_in = state
1124            .duration
1125            .saturating_sub(Instant::now().saturating_duration_since(state.started_at));
1126        if next_attempt_in.is_zero() {
1127            return None;
1128        }
1129
1130        let next_attempt_in_secs = next_attempt_in.as_secs() + 1;
1131
1132        let retry_message = if state.max_attempts == 1 {
1133            if next_attempt_in_secs == 1 {
1134                "Retrying. Next attempt in 1 second.".to_string()
1135            } else {
1136                format!("Retrying. Next attempt in {next_attempt_in_secs} seconds.")
1137            }
1138        } else if next_attempt_in_secs == 1 {
1139            format!(
1140                "Retrying. Next attempt in 1 second (Attempt {} of {}).",
1141                state.attempt, state.max_attempts,
1142            )
1143        } else {
1144            format!(
1145                "Retrying. Next attempt in {next_attempt_in_secs} seconds (Attempt {} of {}).",
1146                state.attempt, state.max_attempts,
1147            )
1148        };
1149
1150        Some(
1151            Callout::new()
1152                .icon(IconName::Warning)
1153                .severity(Severity::Warning)
1154                .title(state.last_error.clone())
1155                .description(retry_message),
1156        )
1157    }
1158
1159    pub fn handle_open_rules(&mut self, window: &mut Window, cx: &mut Context<AcpServerView>) {
1160        let Some(thread) = self.as_native_thread(cx) else {
1161            return;
1162        };
1163        let project_context = thread.read(cx).project_context().read(cx);
1164
1165        let project_entry_ids = project_context
1166            .worktrees
1167            .iter()
1168            .flat_map(|worktree| worktree.rules_file.as_ref())
1169            .map(|rules_file| ProjectEntryId::from_usize(rules_file.project_entry_id))
1170            .collect::<Vec<_>>();
1171
1172        self.workspace
1173            .update(cx, move |workspace, cx| {
1174                // TODO: Open a multibuffer instead? In some cases this doesn't make the set of rules
1175                // files clear. For example, if rules file 1 is already open but rules file 2 is not,
1176                // this would open and focus rules file 2 in a tab that is not next to rules file 1.
1177                let project = workspace.project().read(cx);
1178                let project_paths = project_entry_ids
1179                    .into_iter()
1180                    .flat_map(|entry_id| project.path_for_entry(entry_id, cx))
1181                    .collect::<Vec<_>>();
1182                for project_path in project_paths {
1183                    workspace
1184                        .open_path(project_path, None, true, window, cx)
1185                        .detach_and_log_err(cx);
1186                }
1187            })
1188            .ok();
1189    }
1190}