active_thread.rs

   1use super::*;
   2
   3pub struct ActiveThreadState {
   4    pub thread: Entity<AcpThread>,
   5    pub workspace: WeakEntity<Workspace>,
   6    pub entry_view_state: Entity<EntryViewState>,
   7    pub title_editor: Option<Entity<Editor>>,
   8    pub config_options_view: Option<Entity<ConfigOptionsView>>,
   9    pub mode_selector: Option<Entity<ModeSelector>>,
  10    pub model_selector: Option<Entity<AcpModelSelectorPopover>>,
  11    pub profile_selector: Option<Entity<ProfileSelector>>,
  12    pub permission_dropdown_handle: PopoverMenuHandle<ContextMenu>,
  13    pub thread_retry_status: Option<RetryStatus>,
  14    pub(super) thread_error: Option<ThreadError>,
  15    pub thread_error_markdown: Option<Entity<Markdown>>,
  16    pub token_limit_callout_dismissed: bool,
  17    pub(super) thread_feedback: ThreadFeedbackState,
  18    pub list_state: ListState,
  19    pub prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
  20    pub available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
  21    pub cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
  22    pub cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
  23    /// Tracks which tool calls have their content/output expanded.
  24    /// Used for showing/hiding tool call results, terminal output, etc.
  25    pub expanded_tool_calls: HashSet<agent_client_protocol::ToolCallId>,
  26    pub expanded_tool_call_raw_inputs: HashSet<agent_client_protocol::ToolCallId>,
  27    pub expanded_thinking_blocks: HashSet<(usize, usize)>,
  28    pub expanded_subagents: HashSet<agent_client_protocol::SessionId>,
  29    pub subagent_scroll_handles: RefCell<HashMap<agent_client_protocol::SessionId, ScrollHandle>>,
  30    pub edits_expanded: bool,
  31    pub plan_expanded: bool,
  32    pub queue_expanded: bool,
  33    pub editor_expanded: bool,
  34    pub should_be_following: bool,
  35    pub editing_message: Option<usize>,
  36    pub local_queued_messages: Vec<QueuedMessage>,
  37    pub queued_message_editors: Vec<Entity<MessageEditor>>,
  38    pub queued_message_editor_subscriptions: Vec<Subscription>,
  39    pub last_synced_queue_length: usize,
  40    pub turn_fields: TurnFields,
  41    pub command_load_errors_dismissed: bool,
  42    pub discarded_partial_edits: HashSet<agent_client_protocol::ToolCallId>,
  43    pub is_loading_contents: bool,
  44    pub new_server_version_available: Option<SharedString>,
  45    pub resumed_without_history: bool,
  46    /// Tracks the selected granularity index for each tool call's permission dropdown.
  47    /// The index corresponds to the position in the allow_options list.
  48    /// Default is the last option (index pointing to "Only this time").
  49    pub selected_permission_granularity: HashMap<agent_client_protocol::ToolCallId, usize>,
  50    pub resume_thread_metadata: Option<AgentSessionInfo>,
  51    pub _cancel_task: Option<Task<()>>,
  52    pub skip_queue_processing_count: usize,
  53    pub user_interrupted_generation: bool,
  54    pub can_fast_track_queue: bool,
  55    pub hovered_edited_file_buttons: Option<usize>,
  56    pub _subscriptions: Vec<Subscription>,
  57}
  58
  59#[derive(Default)]
  60pub struct TurnFields {
  61    pub _turn_timer_task: Option<Task<()>>,
  62    pub last_turn_duration: Option<Duration>,
  63    pub last_turn_tokens: Option<u64>,
  64    pub turn_generation: usize,
  65    pub turn_started_at: Option<Instant>,
  66    pub turn_tokens: Option<u64>,
  67}
  68
  69impl ActiveThreadState {
  70    pub fn new(
  71        thread: Entity<AcpThread>,
  72        workspace: WeakEntity<Workspace>,
  73        entry_view_state: Entity<EntryViewState>,
  74        title_editor: Option<Entity<Editor>>,
  75        config_options_view: Option<Entity<ConfigOptionsView>>,
  76        mode_selector: Option<Entity<ModeSelector>>,
  77        model_selector: Option<Entity<AcpModelSelectorPopover>>,
  78        profile_selector: Option<Entity<ProfileSelector>>,
  79        list_state: ListState,
  80        prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
  81        available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
  82        cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
  83        cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
  84        resumed_without_history: bool,
  85        resume_thread_metadata: Option<AgentSessionInfo>,
  86        subscriptions: Vec<Subscription>,
  87    ) -> Self {
  88        Self {
  89            thread,
  90            workspace,
  91            entry_view_state,
  92            title_editor,
  93            config_options_view,
  94            mode_selector,
  95            model_selector,
  96            profile_selector,
  97            list_state,
  98            prompt_capabilities,
  99            available_commands,
 100            cached_user_commands,
 101            cached_user_command_errors,
 102            resumed_without_history,
 103            resume_thread_metadata,
 104            command_load_errors_dismissed: false,
 105            _subscriptions: subscriptions,
 106            permission_dropdown_handle: PopoverMenuHandle::default(),
 107            thread_retry_status: None,
 108            thread_error: None,
 109            thread_error_markdown: None,
 110            token_limit_callout_dismissed: false,
 111            thread_feedback: Default::default(),
 112            expanded_tool_calls: HashSet::default(),
 113            expanded_tool_call_raw_inputs: HashSet::default(),
 114            expanded_thinking_blocks: HashSet::default(),
 115            expanded_subagents: HashSet::default(),
 116            subagent_scroll_handles: RefCell::new(HashMap::default()),
 117            edits_expanded: false,
 118            plan_expanded: false,
 119            queue_expanded: true,
 120            editor_expanded: false,
 121            should_be_following: false,
 122            editing_message: None,
 123            local_queued_messages: Vec::new(),
 124            queued_message_editors: Vec::new(),
 125            queued_message_editor_subscriptions: Vec::new(),
 126            last_synced_queue_length: 0,
 127            turn_fields: TurnFields::default(),
 128            discarded_partial_edits: HashSet::default(),
 129            is_loading_contents: false,
 130            new_server_version_available: None,
 131            selected_permission_granularity: HashMap::default(),
 132            _cancel_task: None,
 133            skip_queue_processing_count: 0,
 134            user_interrupted_generation: false,
 135            can_fast_track_queue: false,
 136            hovered_edited_file_buttons: None,
 137        }
 138    }
 139
 140    pub(crate) fn as_native_connection(
 141        &self,
 142        cx: &App,
 143    ) -> Option<Rc<agent::NativeAgentConnection>> {
 144        let acp_thread = self.thread.read(cx);
 145        acp_thread.connection().clone().downcast()
 146    }
 147
 148    pub(crate) fn as_native_thread(&self, cx: &App) -> Option<Entity<agent::Thread>> {
 149        let acp_thread = self.thread.read(cx);
 150        self.as_native_connection(cx)?
 151            .thread(acp_thread.session_id(), cx)
 152    }
 153
 154    pub fn current_model_id(&self, cx: &App) -> Option<String> {
 155        let selector = self.model_selector.as_ref()?;
 156        let model = selector.read(cx).active_model(cx)?;
 157        Some(model.id.to_string())
 158    }
 159
 160    pub fn current_mode_id(&self, cx: &App) -> Option<Arc<str>> {
 161        if let Some(thread) = self.as_native_thread(cx) {
 162            Some(thread.read(cx).profile().0.clone())
 163        } else {
 164            let mode_selector = self.mode_selector.as_ref()?;
 165            Some(mode_selector.read(cx).mode().0)
 166        }
 167    }
 168
 169    pub fn has_queued_messages(&self) -> bool {
 170        !self.local_queued_messages.is_empty()
 171    }
 172
 173    pub fn is_imported_thread(&self, cx: &App) -> bool {
 174        let Some(thread) = self.as_native_thread(cx) else {
 175            return false;
 176        };
 177        thread.read(cx).is_imported()
 178    }
 179
 180    // turns
 181
 182    pub fn start_turn(&mut self, cx: &mut Context<AcpThreadView>) -> usize {
 183        self.turn_fields.turn_generation += 1;
 184        let generation = self.turn_fields.turn_generation;
 185        self.turn_fields.turn_started_at = Some(Instant::now());
 186        self.turn_fields.last_turn_duration = None;
 187        self.turn_fields.last_turn_tokens = None;
 188        self.turn_fields.turn_tokens = Some(0);
 189        self.turn_fields._turn_timer_task = Some(cx.spawn(async move |this, cx| {
 190            loop {
 191                cx.background_executor().timer(Duration::from_secs(1)).await;
 192                if this.update(cx, |_, cx| cx.notify()).is_err() {
 193                    break;
 194                }
 195            }
 196        }));
 197        generation
 198    }
 199
 200    pub fn stop_turn(&mut self, generation: usize) {
 201        if self.turn_fields.turn_generation != generation {
 202            return;
 203        }
 204        self.turn_fields.last_turn_duration = self
 205            .turn_fields
 206            .turn_started_at
 207            .take()
 208            .map(|started| started.elapsed());
 209        self.turn_fields.last_turn_tokens = self.turn_fields.turn_tokens.take();
 210        self.turn_fields._turn_timer_task = None;
 211    }
 212
 213    pub fn update_turn_tokens(&mut self, cx: &App) {
 214        if let Some(usage) = self.thread.read(cx).token_usage() {
 215            if let Some(tokens) = &mut self.turn_fields.turn_tokens {
 216                *tokens += usage.output_tokens;
 217            }
 218        }
 219    }
 220
 221    // sending
 222
 223    pub fn send(
 224        &mut self,
 225        message_editor: Entity<MessageEditor>,
 226        agent: Rc<dyn AgentServer>,
 227        login: Option<task::SpawnInTerminal>,
 228        window: &mut Window,
 229        cx: &mut Context<AcpThreadView>,
 230    ) {
 231        let thread = &self.thread;
 232
 233        if self.is_loading_contents {
 234            return;
 235        }
 236
 237        let is_editor_empty = message_editor.read(cx).is_empty(cx);
 238        let is_generating = thread.read(cx).status() != ThreadStatus::Idle;
 239
 240        let has_queued = self.has_queued_messages();
 241        if is_editor_empty && self.can_fast_track_queue && has_queued {
 242            self.can_fast_track_queue = false;
 243            self.send_queued_message_at_index(0, true, window, cx);
 244            return;
 245        }
 246
 247        if is_editor_empty {
 248            return;
 249        }
 250
 251        if is_generating {
 252            self.queue_message(message_editor, window, cx);
 253            return;
 254        }
 255
 256        let text = message_editor.read(cx).text(cx);
 257        let text = text.trim();
 258        if text == "/login" || text == "/logout" {
 259            let connection = thread.read(cx).connection().clone();
 260            let can_login = !connection.auth_methods().is_empty() || login.is_some();
 261            // Does the agent have a specific logout command? Prefer that in case they need to reset internal state.
 262            let logout_supported = text == "/logout"
 263                && self
 264                    .available_commands
 265                    .borrow()
 266                    .iter()
 267                    .any(|command| command.name == "logout");
 268            if can_login && !logout_supported {
 269                message_editor.update(cx, |editor, cx| editor.clear(window, cx));
 270
 271                let this = cx.weak_entity();
 272                let agent = agent.clone();
 273                window.defer(cx, |window, cx| {
 274                    AcpThreadView::handle_auth_required(
 275                        this,
 276                        AuthRequired::new(),
 277                        agent,
 278                        connection,
 279                        window,
 280                        cx,
 281                    );
 282                });
 283                cx.notify();
 284                return;
 285            }
 286        }
 287
 288        self.send_impl(message_editor, window, cx)
 289    }
 290
 291    pub fn send_impl(
 292        &mut self,
 293        message_editor: Entity<MessageEditor>,
 294        window: &mut Window,
 295        cx: &mut Context<AcpThreadView>,
 296    ) {
 297        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 298            // Include full contents when using minimal profile
 299            let thread = thread.read(cx);
 300            AgentSettings::get_global(cx)
 301                .profiles
 302                .get(thread.profile())
 303                .is_some_and(|profile| profile.tools.is_empty())
 304        });
 305
 306        let cached_commands = &self.cached_user_commands;
 307        let cached_errors = &self.cached_user_command_errors;
 308        let contents = message_editor.update(cx, |message_editor, cx| {
 309            message_editor.contents_with_cache(
 310                full_mention_content,
 311                Some(cached_commands.borrow().clone()),
 312                Some(cached_errors.borrow().clone()),
 313                cx,
 314            )
 315        });
 316
 317        self.thread_error.take();
 318        self.thread_feedback.clear();
 319        self.editing_message.take();
 320
 321        if self.should_be_following {
 322            self.workspace
 323                .update(cx, |workspace, cx| {
 324                    workspace.follow(CollaboratorId::Agent, window, cx);
 325                })
 326                .ok();
 327        }
 328
 329        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 330            let (contents, tracked_buffers) = contents.await?;
 331
 332            if contents.is_empty() {
 333                return Ok(None);
 334            }
 335
 336            let _ = cx.update(|window, cx| {
 337                message_editor.update(cx, |message_editor, cx| {
 338                    message_editor.clear(window, cx);
 339                });
 340            });
 341
 342            Ok(Some((contents, tracked_buffers)))
 343        });
 344
 345        self.send_content(contents_task, window, cx);
 346    }
 347
 348    pub fn send_content(
 349        &mut self,
 350        contents_task: Task<anyhow::Result<Option<(Vec<acp::ContentBlock>, Vec<Entity<Buffer>>)>>>,
 351        window: &mut Window,
 352        cx: &mut Context<AcpThreadView>,
 353    ) {
 354        let session_id = self.thread.read(cx).session_id().clone();
 355        let agent_telemetry_id = self.thread.read(cx).connection().telemetry_id();
 356        let thread = self.thread.downgrade();
 357
 358        self.is_loading_contents = true;
 359
 360        let model_id = self.current_model_id(cx);
 361        let mode_id = self.current_mode_id(cx);
 362        let guard = cx.new(|_| ());
 363        cx.observe_release(&guard, |this, _guard, cx| {
 364            if let ThreadState::Active(ActiveThreadState {
 365                is_loading_contents,
 366                ..
 367            }) = &mut this.thread_state
 368            {
 369                *is_loading_contents = false;
 370            }
 371            cx.notify();
 372        })
 373        .detach();
 374
 375        let task = cx.spawn_in(window, async move |this, cx| {
 376            let Some((contents, tracked_buffers)) = contents_task.await? else {
 377                return Ok(());
 378            };
 379
 380            let generation = this.update_in(cx, |this, _window, cx| {
 381                this.in_flight_prompt = Some(contents.clone());
 382                let generation = this.start_turn(cx);
 383                this.set_editor_is_expanded(false, cx);
 384                this.scroll_to_bottom(cx);
 385                generation
 386            })?;
 387
 388            let _stop_turn = defer({
 389                let this = this.clone();
 390                let mut cx = cx.clone();
 391                move || {
 392                    this.update(&mut cx, |this, cx| {
 393                        this.stop_turn(generation);
 394                        cx.notify();
 395                    })
 396                    .ok();
 397                }
 398            });
 399            let turn_start_time = Instant::now();
 400            let send = thread.update(cx, |thread, cx| {
 401                thread.action_log().update(cx, |action_log, cx| {
 402                    for buffer in tracked_buffers {
 403                        action_log.buffer_read(buffer, cx)
 404                    }
 405                });
 406                drop(guard);
 407
 408                telemetry::event!(
 409                    "Agent Message Sent",
 410                    agent = agent_telemetry_id,
 411                    session = session_id,
 412                    model = model_id,
 413                    mode = mode_id
 414                );
 415
 416                thread.send(contents, cx)
 417            })?;
 418            let res = send.await;
 419            let turn_time_ms = turn_start_time.elapsed().as_millis();
 420            drop(_stop_turn);
 421            let status = if res.is_ok() {
 422                this.update(cx, |this, _| this.in_flight_prompt.take()).ok();
 423                "success"
 424            } else {
 425                "failure"
 426            };
 427            telemetry::event!(
 428                "Agent Turn Completed",
 429                agent = agent_telemetry_id,
 430                session = session_id,
 431                model = model_id,
 432                mode = mode_id,
 433                status,
 434                turn_time_ms,
 435            );
 436            res
 437        });
 438
 439        cx.spawn(async move |this, cx| {
 440            if let Err(err) = task.await {
 441                this.update(cx, |this, cx| {
 442                    this.handle_thread_error(err, cx);
 443                })
 444                .ok();
 445            } else {
 446                this.update(cx, |this, cx| {
 447                    if let ThreadState::Active(ActiveThreadState {
 448                        should_be_following,
 449                        ..
 450                    }) = &mut this.thread_state
 451                    {
 452                        *should_be_following = this
 453                            .workspace
 454                            .update(cx, |workspace, _| {
 455                                workspace.is_being_followed(CollaboratorId::Agent)
 456                            })
 457                            .unwrap_or_default();
 458                    }
 459                })
 460                .ok();
 461            }
 462        })
 463        .detach();
 464    }
 465
 466    pub fn interrupt_and_send(
 467        &mut self,
 468        message_editor: Entity<MessageEditor>,
 469        window: &mut Window,
 470        cx: &mut Context<AcpThreadView>,
 471    ) {
 472        let thread = &self.thread;
 473
 474        if self.is_loading_contents {
 475            return;
 476        }
 477
 478        if thread.read(cx).status() == ThreadStatus::Idle {
 479            self.send_impl(message_editor, window, cx);
 480            return;
 481        }
 482
 483        self.stop_current_and_send_new_message(window, cx);
 484    }
 485
 486    pub fn stop_current_and_send_new_message(
 487        &mut self,
 488        window: &mut Window,
 489        cx: &mut Context<AcpThreadView>,
 490    ) {
 491        let thread = self.thread.clone();
 492        self.skip_queue_processing_count = 0;
 493        self.user_interrupted_generation = true;
 494
 495        let cancelled = thread.update(cx, |thread, cx| thread.cancel(cx));
 496
 497        cx.spawn_in(window, async move |this, cx| {
 498            cancelled.await;
 499
 500            this.update_in(cx, |this, window, cx| {
 501                this.send_impl(this.message_editor.clone(), window, cx);
 502            })
 503            .ok();
 504        })
 505        .detach();
 506    }
 507
 508    // generation
 509
 510    pub fn cancel_generation(&mut self, cx: &mut Context<AcpThreadView>) {
 511        self.thread_retry_status.take();
 512        self.thread_error.take();
 513        self.user_interrupted_generation = true;
 514        self._cancel_task = Some(self.thread.update(cx, |thread, cx| thread.cancel(cx)));
 515    }
 516
 517    pub fn retry_generation(&mut self, cx: &mut Context<AcpThreadView>) {
 518        self.thread_error.take();
 519
 520        let thread = &self.thread;
 521        if !thread.read(cx).can_retry(cx) {
 522            return;
 523        }
 524
 525        let task = thread.update(cx, |thread, cx| thread.retry(cx));
 526        cx.spawn(async move |this, cx| {
 527            let result = task.await;
 528
 529            this.update(cx, |this, cx| {
 530                if let Err(err) = result {
 531                    this.handle_thread_error(err, cx);
 532                }
 533            })
 534        })
 535        .detach();
 536    }
 537
 538    pub fn regenerate(
 539        &mut self,
 540        entry_ix: usize,
 541        message_editor: Entity<MessageEditor>,
 542        window: &mut Window,
 543        cx: &mut Context<AcpThreadView>,
 544    ) {
 545        if self.is_loading_contents {
 546            return;
 547        }
 548        let thread = self.thread.clone();
 549
 550        let Some(user_message_id) = thread.update(cx, |thread, _| {
 551            thread.entries().get(entry_ix)?.user_message()?.id.clone()
 552        }) else {
 553            return;
 554        };
 555
 556        cx.spawn_in(window, async move |this, cx| {
 557            // Check if there are any edits from prompts before the one being regenerated.
 558            //
 559            // If there are, we keep/accept them since we're not regenerating the prompt that created them.
 560            //
 561            // If editing the prompt that generated the edits, they are auto-rejected
 562            // through the `rewind` function in the `acp_thread`.
 563            let has_earlier_edits = thread.read_with(cx, |thread, _| {
 564                thread
 565                    .entries()
 566                    .iter()
 567                    .take(entry_ix)
 568                    .any(|entry| entry.diffs().next().is_some())
 569            });
 570
 571            if has_earlier_edits {
 572                thread.update(cx, |thread, cx| {
 573                    thread.action_log().update(cx, |action_log, cx| {
 574                        action_log.keep_all_edits(None, cx);
 575                    });
 576                });
 577            }
 578
 579            thread
 580                .update(cx, |thread, cx| thread.rewind(user_message_id, cx))
 581                .await?;
 582            this.update_in(cx, |this, window, cx| {
 583                this.send_impl(message_editor, window, cx);
 584                this.focus_handle(cx).focus(window, cx);
 585            })?;
 586            anyhow::Ok(())
 587        })
 588        .detach_and_log_err(cx);
 589    }
 590
 591    // message queueing
 592
 593    pub fn queue_message(
 594        &mut self,
 595        message_editor: Entity<MessageEditor>,
 596        window: &mut Window,
 597        cx: &mut Context<AcpThreadView>,
 598    ) {
 599        let is_idle = self.thread.read(cx).status() == acp_thread::ThreadStatus::Idle;
 600
 601        if is_idle {
 602            self.send_impl(message_editor.clone(), window, cx);
 603            return;
 604        }
 605
 606        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 607            let thread = thread.read(cx);
 608            AgentSettings::get_global(cx)
 609                .profiles
 610                .get(thread.profile())
 611                .is_some_and(|profile| profile.tools.is_empty())
 612        });
 613
 614        let cached_commands = self.cached_user_commands.borrow().clone();
 615        let cached_errors = self.cached_user_command_errors.borrow().clone();
 616        let contents = message_editor.update(cx, |message_editor, cx| {
 617            message_editor.contents_with_cache(
 618                full_mention_content,
 619                Some(cached_commands),
 620                Some(cached_errors),
 621                cx,
 622            )
 623        });
 624
 625        cx.spawn_in(window, async move |this, cx| {
 626            let (content, tracked_buffers) = contents.await?;
 627
 628            if content.is_empty() {
 629                return Ok::<(), anyhow::Error>(());
 630            }
 631
 632            this.update_in(cx, |this, window, cx| {
 633                this.add_to_queue(content, tracked_buffers, cx);
 634                // Enable fast-track: user can press Enter again to send this queued message immediately
 635                this.set_can_fast_track_queue(true);
 636                message_editor.update(cx, |message_editor, cx| {
 637                    message_editor.clear(window, cx);
 638                });
 639                cx.notify();
 640            })?;
 641            Ok(())
 642        })
 643        .detach_and_log_err(cx);
 644    }
 645
 646    pub fn remove_from_queue(
 647        &mut self,
 648        index: usize,
 649        cx: &mut Context<AcpThreadView>,
 650    ) -> Option<QueuedMessage> {
 651        if index < self.local_queued_messages.len() {
 652            let removed = self.local_queued_messages.remove(index);
 653            self.sync_queue_flag_to_native_thread(cx);
 654            Some(removed)
 655        } else {
 656            None
 657        }
 658    }
 659
 660    pub fn sync_queue_flag_to_native_thread(&self, cx: &mut Context<AcpThreadView>) {
 661        if let Some(native_thread) = self.as_native_thread(cx) {
 662            let has_queued = self.has_queued_messages();
 663            native_thread.update(cx, |thread, _| {
 664                thread.set_has_queued_message(has_queued);
 665            });
 666        }
 667    }
 668
 669    pub fn send_queued_message_at_index(
 670        &mut self,
 671        index: usize,
 672        is_send_now: bool,
 673        window: &mut Window,
 674        cx: &mut Context<AcpThreadView>,
 675    ) {
 676        let Some(queued) = self.remove_from_queue(index, cx) else {
 677            return;
 678        };
 679        let content = queued.content;
 680        let tracked_buffers = queued.tracked_buffers;
 681
 682        // Only increment skip count for "Send Now" operations (out-of-order sends)
 683        // Normal auto-processing from the Stopped handler doesn't need to skip.
 684        // We only skip the Stopped event from the cancelled generation, NOT the
 685        // Stopped event from the newly sent message (which should trigger queue processing).
 686        if is_send_now {
 687            let is_generating =
 688                self.thread.read(cx).status() == acp_thread::ThreadStatus::Generating;
 689            self.skip_queue_processing_count += if is_generating { 1 } else { 0 };
 690        }
 691
 692        let cancelled = self.thread.update(cx, |thread, cx| thread.cancel(cx));
 693
 694        let workspace = self.workspace.clone();
 695
 696        let should_be_following = self.should_be_following;
 697        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 698            cancelled.await;
 699            if should_be_following {
 700                workspace
 701                    .update_in(cx, |workspace, window, cx| {
 702                        workspace.follow(CollaboratorId::Agent, window, cx);
 703                    })
 704                    .ok();
 705            }
 706
 707            Ok(Some((content, tracked_buffers)))
 708        });
 709
 710        self.send_content(contents_task, window, cx);
 711    }
 712
 713    // editor methods
 714
 715    pub fn expand_message_editor(
 716        &mut self,
 717        message_editor: Entity<MessageEditor>,
 718        cx: &mut Context<AcpThreadView>,
 719    ) {
 720        self.set_editor_is_expanded(!self.editor_expanded, message_editor, cx);
 721        cx.stop_propagation();
 722        cx.notify();
 723    }
 724
 725    pub fn set_editor_is_expanded(
 726        &mut self,
 727        is_expanded: bool,
 728        message_editor: Entity<MessageEditor>,
 729        cx: &mut Context<AcpThreadView>,
 730    ) {
 731        self.editor_expanded = is_expanded;
 732        message_editor.update(cx, |editor, cx| {
 733            if is_expanded {
 734                editor.set_mode(
 735                    EditorMode::Full {
 736                        scale_ui_elements_with_buffer_font_size: false,
 737                        show_active_line_background: false,
 738                        sizing_behavior: SizingBehavior::ExcludeOverscrollMargin,
 739                    },
 740                    cx,
 741                )
 742            } else {
 743                let agent_settings = AgentSettings::get_global(cx);
 744                editor.set_mode(
 745                    EditorMode::AutoHeight {
 746                        min_lines: agent_settings.message_editor_min_lines,
 747                        max_lines: Some(agent_settings.set_message_editor_max_lines()),
 748                    },
 749                    cx,
 750                )
 751            }
 752        });
 753        cx.notify();
 754    }
 755
 756    pub fn handle_title_editor_event(
 757        &mut self,
 758        title_editor: &Entity<Editor>,
 759        event: &EditorEvent,
 760        window: &mut Window,
 761        cx: &mut Context<AcpThreadView>,
 762    ) {
 763        let thread = &self.thread;
 764
 765        match event {
 766            EditorEvent::BufferEdited => {
 767                let new_title = title_editor.read(cx).text(cx);
 768                thread.update(cx, |thread, cx| {
 769                    thread
 770                        .set_title(new_title.into(), cx)
 771                        .detach_and_log_err(cx);
 772                })
 773            }
 774            EditorEvent::Blurred => {
 775                if title_editor.read(cx).text(cx).is_empty() {
 776                    title_editor.update(cx, |editor, cx| {
 777                        editor.set_text("New Thread", window, cx);
 778                    });
 779                }
 780            }
 781            _ => {}
 782        }
 783    }
 784
 785    pub fn cancel_editing(
 786        &mut self,
 787        focus_handle: FocusHandle,
 788        window: &mut Window,
 789        cx: &mut Context<AcpThreadView>,
 790    ) {
 791        if let Some(index) = self.editing_message.take()
 792            && let Some(editor) = &self
 793                .entry_view_state
 794                .read(cx)
 795                .entry(index)
 796                .and_then(|e| e.message_editor())
 797                .cloned()
 798        {
 799            editor.update(cx, |editor, cx| {
 800                if let Some(user_message) = self
 801                    .thread
 802                    .read(cx)
 803                    .entries()
 804                    .get(index)
 805                    .and_then(|e| e.user_message())
 806                {
 807                    editor.set_message(user_message.chunks.clone(), window, cx);
 808                }
 809            })
 810        };
 811        focus_handle.focus(window, cx);
 812        cx.notify();
 813    }
 814
 815    // tool permissions
 816
 817    pub fn authorize_tool_call(
 818        &mut self,
 819        tool_call_id: acp::ToolCallId,
 820        option_id: acp::PermissionOptionId,
 821        option_kind: acp::PermissionOptionKind,
 822        window: &mut Window,
 823        cx: &mut Context<AcpThreadView>,
 824    ) {
 825        let thread = &self.thread;
 826        let agent_telemetry_id = thread.read(cx).connection().telemetry_id();
 827
 828        telemetry::event!(
 829            "Agent Tool Call Authorized",
 830            agent = agent_telemetry_id,
 831            session = thread.read(cx).session_id(),
 832            option = option_kind
 833        );
 834
 835        thread.update(cx, |thread, cx| {
 836            thread.authorize_tool_call(tool_call_id, option_id, option_kind, cx);
 837        });
 838        if self.should_be_following {
 839            self.workspace
 840                .update(cx, |workspace, cx| {
 841                    workspace.follow(CollaboratorId::Agent, window, cx);
 842                })
 843                .ok();
 844        }
 845        cx.notify();
 846    }
 847
 848    pub fn authorize_pending_tool_call(
 849        &mut self,
 850        kind: acp::PermissionOptionKind,
 851        window: &mut Window,
 852        cx: &mut Context<AcpThreadView>,
 853    ) -> Option<()> {
 854        let thread = self.thread.read(cx);
 855        let tool_call = thread.first_tool_awaiting_confirmation()?;
 856        let ToolCallStatus::WaitingForConfirmation { options, .. } = &tool_call.status else {
 857            return None;
 858        };
 859        let option = options.first_option_of_kind(kind)?;
 860
 861        self.authorize_tool_call(
 862            tool_call.id.clone(),
 863            option.option_id.clone(),
 864            option.kind,
 865            window,
 866            cx,
 867        );
 868
 869        Some(())
 870    }
 871
 872    pub fn handle_select_permission_granularity(
 873        &mut self,
 874        action: &SelectPermissionGranularity,
 875        cx: &mut Context<AcpThreadView>,
 876    ) {
 877        let tool_call_id = acp::ToolCallId::new(action.tool_call_id.clone());
 878        self.selected_permission_granularity
 879            .insert(tool_call_id, action.index);
 880
 881        cx.notify();
 882    }
 883
 884    // edits
 885
 886    pub fn keep_all(&mut self, cx: &mut Context<AcpThreadView>) {
 887        let thread = &self.thread;
 888        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 889        let action_log = thread.read(cx).action_log().clone();
 890        action_log.update(cx, |action_log, cx| {
 891            action_log.keep_all_edits(Some(telemetry), cx)
 892        });
 893    }
 894
 895    pub fn reject_all(&mut self, cx: &mut Context<AcpThreadView>) {
 896        let thread = &self.thread;
 897        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 898        let action_log = thread.read(cx).action_log().clone();
 899        action_log
 900            .update(cx, |action_log, cx| {
 901                action_log.reject_all_edits(Some(telemetry), cx)
 902            })
 903            .detach();
 904    }
 905
 906    pub fn open_edited_buffer(
 907        &mut self,
 908        buffer: &Entity<Buffer>,
 909        window: &mut Window,
 910        cx: &mut Context<AcpThreadView>,
 911    ) {
 912        let thread = &self.thread;
 913
 914        let Some(diff) =
 915            AgentDiffPane::deploy(thread.clone(), self.workspace.clone(), window, cx).log_err()
 916        else {
 917            return;
 918        };
 919
 920        diff.update(cx, |diff, cx| {
 921            diff.move_to_path(PathKey::for_buffer(buffer, cx), window, cx)
 922        })
 923    }
 924
 925    // thread stuff
 926
 927    pub fn sync_thread(
 928        &mut self,
 929        project: Entity<Project>,
 930        window: &mut Window,
 931        cx: &mut Context<AcpThreadView>,
 932    ) {
 933        if !self.is_imported_thread(cx) {
 934            return;
 935        }
 936
 937        let Some(session_list) = self
 938            .as_native_connection(cx)
 939            .and_then(|connection| connection.session_list(cx))
 940            .and_then(|list| list.downcast::<NativeAgentSessionList>())
 941        else {
 942            return;
 943        };
 944        let thread_store = session_list.thread_store().clone();
 945
 946        let client = project.read(cx).client();
 947        let session_id = self.thread.read(cx).session_id().clone();
 948
 949        cx.spawn_in(window, async move |this, cx| {
 950            let response = client
 951                .request(proto::GetSharedAgentThread {
 952                    session_id: session_id.to_string(),
 953                })
 954                .await?;
 955
 956            let shared_thread = SharedThread::from_bytes(&response.thread_data)?;
 957
 958            let db_thread = shared_thread.to_db_thread();
 959
 960            thread_store
 961                .update(&mut cx.clone(), |store, cx| {
 962                    store.save_thread(session_id.clone(), db_thread, cx)
 963                })
 964                .await?;
 965
 966            let thread_metadata = AgentSessionInfo {
 967                session_id,
 968                cwd: None,
 969                title: Some(format!("🔗 {}", response.title).into()),
 970                updated_at: Some(chrono::Utc::now()),
 971                meta: None,
 972            };
 973
 974            this.update_in(cx, |this, window, cx| {
 975                if let ThreadState::Active(ActiveThreadState {
 976                    resume_thread_metadata,
 977                    ..
 978                }) = &mut this.thread_state
 979                {
 980                    *resume_thread_metadata = Some(thread_metadata);
 981                }
 982                this.reset(window, cx);
 983            })?;
 984
 985            this.update_in(cx, |this, _window, cx| {
 986                if let Some(workspace) = this.workspace.upgrade() {
 987                    workspace.update(cx, |workspace, cx| {
 988                        struct ThreadSyncedToast;
 989                        workspace.show_toast(
 990                            Toast::new(
 991                                NotificationId::unique::<ThreadSyncedToast>(),
 992                                "Thread synced with latest version",
 993                            )
 994                            .autohide(),
 995                            cx,
 996                        );
 997                    });
 998                }
 999            })?;
1000
1001            anyhow::Ok(())
1002        })
1003        .detach_and_log_err(cx);
1004    }
1005
1006    pub fn restore_checkpoint(
1007        &mut self,
1008        message_id: &UserMessageId,
1009        cx: &mut Context<AcpThreadView>,
1010    ) {
1011        self.thread
1012            .update(cx, |thread, cx| {
1013                thread.restore_checkpoint(message_id.clone(), cx)
1014            })
1015            .detach_and_log_err(cx);
1016    }
1017
1018    pub fn clear_thread_error(&mut self, cx: &mut Context<AcpThreadView>) {
1019        self.thread_error = None;
1020        self.thread_error_markdown = None;
1021        self.token_limit_callout_dismissed = true;
1022        cx.notify();
1023    }
1024
1025    // other
1026
1027    pub fn refresh_cached_user_commands_from_registry(
1028        &mut self,
1029        registry: &Entity<SlashCommandRegistry>,
1030        cx: &App,
1031    ) {
1032        let (mut commands, mut errors) = registry.read_with(cx, |registry, _| {
1033            (registry.commands().clone(), registry.errors().to_vec())
1034        });
1035        let server_command_names = self
1036            .available_commands
1037            .borrow()
1038            .iter()
1039            .map(|command| command.name.clone())
1040            .collect::<HashSet<_>>();
1041        user_slash_command::apply_server_command_conflicts_to_map(
1042            &mut commands,
1043            &mut errors,
1044            &server_command_names,
1045        );
1046
1047        self.command_load_errors_dismissed = false;
1048        *self.cached_user_commands.borrow_mut() = commands;
1049        *self.cached_user_command_errors.borrow_mut() = errors;
1050    }
1051
1052    pub fn render_command_load_errors(
1053        &self,
1054        cx: &mut Context<AcpThreadView>,
1055    ) -> Option<impl IntoElement> {
1056        let errors = self.cached_user_command_errors.borrow();
1057
1058        if self.command_load_errors_dismissed || errors.is_empty() {
1059            return None;
1060        }
1061
1062        let workspace = self.workspace.clone();
1063
1064        let error_count = errors.len();
1065        let title = if error_count == 1 {
1066            "Failed to load slash command"
1067        } else {
1068            "Failed to load slash commands"
1069        };
1070
1071        Some(
1072            Callout::new()
1073                .icon(IconName::Warning)
1074                .severity(Severity::Warning)
1075                .title(title)
1076                .actions_slot(
1077                    IconButton::new("dismiss-command-errors", IconName::Close)
1078                        .icon_size(IconSize::Small)
1079                        .icon_color(Color::Muted)
1080                        .tooltip(Tooltip::text("Dismiss Error"))
1081                        .on_click(cx.listener(|this, _, _, cx| {
1082                            this.clear_command_load_errors(cx);
1083                        })),
1084                )
1085                .description_slot(v_flex().children(errors.iter().enumerate().map({
1086                    move |(i, error)| {
1087                        let path = error.path.clone();
1088                        let workspace = workspace.clone();
1089                        let file_name = error
1090                            .path
1091                            .file_name()
1092                            .map(|n| n.to_string_lossy().to_string())
1093                            .unwrap_or_else(|| error.path.display().to_string());
1094                        let id = ElementId::Name(format!("command-error-{i}").into());
1095                        let label = format!("{}: {}", file_name, error.message);
1096
1097                        Button::new(id, label)
1098                            .label_size(LabelSize::Small)
1099                            .truncate(true)
1100                            .tooltip({
1101                                let message: SharedString = error.message.clone().into();
1102                                let path: SharedString = error.path.display().to_string().into();
1103                                move |_, cx| {
1104                                    Tooltip::with_meta(message.clone(), None, path.clone(), cx)
1105                                }
1106                            })
1107                            .on_click({
1108                                move |_, window, cx| {
1109                                    if let Some(workspace) = workspace.upgrade() {
1110                                        workspace.update(cx, |workspace, cx| {
1111                                            workspace
1112                                                .open_abs_path(
1113                                                    path.clone(),
1114                                                    OpenOptions::default(),
1115                                                    window,
1116                                                    cx,
1117                                                )
1118                                                .detach_and_log_err(cx);
1119                                        });
1120                                    }
1121                                }
1122                            })
1123                    }
1124                }))),
1125        )
1126    }
1127
1128    pub fn render_thread_retry_status_callout(&self) -> Option<Callout> {
1129        let state = self.thread_retry_status.as_ref()?;
1130
1131        let next_attempt_in = state
1132            .duration
1133            .saturating_sub(Instant::now().saturating_duration_since(state.started_at));
1134        if next_attempt_in.is_zero() {
1135            return None;
1136        }
1137
1138        let next_attempt_in_secs = next_attempt_in.as_secs() + 1;
1139
1140        let retry_message = if state.max_attempts == 1 {
1141            if next_attempt_in_secs == 1 {
1142                "Retrying. Next attempt in 1 second.".to_string()
1143            } else {
1144                format!("Retrying. Next attempt in {next_attempt_in_secs} seconds.")
1145            }
1146        } else if next_attempt_in_secs == 1 {
1147            format!(
1148                "Retrying. Next attempt in 1 second (Attempt {} of {}).",
1149                state.attempt, state.max_attempts,
1150            )
1151        } else {
1152            format!(
1153                "Retrying. Next attempt in {next_attempt_in_secs} seconds (Attempt {} of {}).",
1154                state.attempt, state.max_attempts,
1155            )
1156        };
1157
1158        Some(
1159            Callout::new()
1160                .icon(IconName::Warning)
1161                .severity(Severity::Warning)
1162                .title(state.last_error.clone())
1163                .description(retry_message),
1164        )
1165    }
1166
1167    pub fn handle_open_rules(&mut self, window: &mut Window, cx: &mut Context<AcpThreadView>) {
1168        let Some(thread) = self.as_native_thread(cx) else {
1169            return;
1170        };
1171        let project_context = thread.read(cx).project_context().read(cx);
1172
1173        let project_entry_ids = project_context
1174            .worktrees
1175            .iter()
1176            .flat_map(|worktree| worktree.rules_file.as_ref())
1177            .map(|rules_file| ProjectEntryId::from_usize(rules_file.project_entry_id))
1178            .collect::<Vec<_>>();
1179
1180        self.workspace
1181            .update(cx, move |workspace, cx| {
1182                // TODO: Open a multibuffer instead? In some cases this doesn't make the set of rules
1183                // files clear. For example, if rules file 1 is already open but rules file 2 is not,
1184                // this would open and focus rules file 2 in a tab that is not next to rules file 1.
1185                let project = workspace.project().read(cx);
1186                let project_paths = project_entry_ids
1187                    .into_iter()
1188                    .flat_map(|entry_id| project.path_for_entry(entry_id, cx))
1189                    .collect::<Vec<_>>();
1190                for project_path in project_paths {
1191                    workspace
1192                        .open_path(project_path, None, true, window, cx)
1193                        .detach_and_log_err(cx);
1194                }
1195            })
1196            .ok();
1197    }
1198}