active_thread.rs

   1use super::*;
   2
/// View-side state for a single ACP thread.
///
/// Bundles the `AcpThread` entity with the UI state that the methods in this
/// file read and update: selectors, expansion toggles, the local message queue,
/// turn bookkeeping and error/retry status. Those methods take a
/// `Context<AcpServerView>`, i.e. this state is driven from `AcpServerView`.
pub struct AcpThreadView {
    /// Session id of `thread`, captured once in [`AcpThreadView::new`].
    pub id: acp::SessionId,
    /// The ACP thread this state belongs to.
    pub thread: Entity<AcpThread>,
    /// Weak handle to the workspace; used to follow the agent collaborator.
    pub workspace: WeakEntity<Workspace>,
    /// Per-entry view state; `cancel_editing` looks up an entry's message editor here.
    pub entry_view_state: Entity<EntryViewState>,
    pub title_editor: Option<Entity<Editor>>,
    pub config_options_view: Option<Entity<ConfigOptionsView>>,
    /// `current_mode_id` reads the mode from here when the thread is not a native agent thread.
    pub mode_selector: Option<Entity<ModeSelector>>,
    /// `current_model_id` reads the active model from here.
    pub model_selector: Option<Entity<AcpModelSelectorPopover>>,
    pub profile_selector: Option<Entity<ProfileSelector>>,
    pub permission_dropdown_handle: PopoverMenuHandle<ContextMenu>,
    /// Cleared by `cancel_generation`.
    pub thread_retry_status: Option<RetryStatus>,
    /// Current thread error, if any; cleared when a message is sent and when
    /// generation is cancelled or retried.
    pub(super) thread_error: Option<ThreadError>,
    pub thread_error_markdown: Option<Entity<Markdown>>,
    pub token_limit_callout_dismissed: bool,
    /// Cleared whenever a message is sent.
    pub(super) thread_feedback: ThreadFeedbackState,
    pub list_state: ListState,
    pub prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
    /// Commands offered by the agent; `send` checks it for an agent-provided `logout` command.
    pub available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
    /// Passed to `MessageEditor::contents_with_cache` when building message contents.
    pub cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
    /// Passed to `MessageEditor::contents_with_cache` when building message contents.
    pub cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
    /// Tracks which tool calls have their content/output expanded.
    /// Used for showing/hiding tool call results, terminal output, etc.
    pub expanded_tool_calls: HashSet<agent_client_protocol::ToolCallId>,
    pub expanded_tool_call_raw_inputs: HashSet<agent_client_protocol::ToolCallId>,
    pub expanded_thinking_blocks: HashSet<(usize, usize)>,
    pub expanded_subagents: HashSet<agent_client_protocol::SessionId>,
    pub subagent_scroll_handles: RefCell<HashMap<agent_client_protocol::SessionId, ScrollHandle>>,
    pub edits_expanded: bool,
    pub plan_expanded: bool,
    /// The only expansion toggle that starts out `true` (see `new`).
    pub queue_expanded: bool,
    /// Whether the message editor is in its expanded (full) mode; see `set_editor_is_expanded`.
    pub editor_expanded: bool,
    /// When set, sending a message or authorizing a tool call makes the
    /// workspace follow the agent again.
    pub should_be_following: bool,
    /// Index of the thread entry currently being edited, if any.
    pub editing_message: Option<usize>,
    /// Messages waiting to be sent; see `remove_from_queue` and `send_queued_message_at_index`.
    pub local_queued_messages: Vec<QueuedMessage>,
    pub queued_message_editors: Vec<Entity<MessageEditor>>,
    pub queued_message_editor_subscriptions: Vec<Subscription>,
    pub last_synced_queue_length: usize,
    /// Timing and token bookkeeping for the running / most recent turn.
    pub turn_fields: TurnFields,
    pub command_load_errors_dismissed: bool,
    pub discarded_partial_edits: HashSet<agent_client_protocol::ToolCallId>,
    /// Set by `send_content` while message contents are being resolved; `send`,
    /// `interrupt_and_send` and `regenerate` return early while it is set.
    pub is_loading_contents: bool,
    pub new_server_version_available: Option<SharedString>,
    pub resumed_without_history: bool,
    /// Tracks the selected granularity index for each tool call's permission dropdown.
    /// The index corresponds to the position in the allow_options list.
    /// Default is the last option (index pointing to "Only this time").
    pub selected_permission_granularity: HashMap<agent_client_protocol::ToolCallId, usize>,
    pub resume_thread_metadata: Option<AgentSessionInfo>,
    /// Holds the task returned by `AcpThread::cancel` in `cancel_generation`.
    pub _cancel_task: Option<Task<()>>,
    /// Incremented by a "send now" issued while the thread is generating and
    /// reset by `stop_current_and_send_new_message`; per the comments in
    /// `send_queued_message_at_index` it makes queue processing skip the
    /// `Stopped` event of the cancelled generation.
    pub skip_queue_processing_count: usize,
    /// Set when the user cancels or interrupts a generation.
    pub user_interrupted_generation: bool,
    /// When set, `send` with an empty editor immediately sends the first queued message.
    pub can_fast_track_queue: bool,
    pub hovered_edited_file_buttons: Option<usize>,
    /// Subscriptions handed to `new`, stored here for the lifetime of this value.
    pub _subscriptions: Vec<Subscription>,
}
  59
/// Bookkeeping for the running and the most recently finished agent turn.
///
/// Written by `AcpThreadView::start_turn`, `stop_turn` and `update_turn_tokens`.
#[derive(Default)]
pub struct TurnFields {
    /// Task that notifies the view once per second while a turn runs; cleared by `stop_turn`.
    pub _turn_timer_task: Option<Task<()>>,
    /// Elapsed time of the last finished turn; reset to `None` when a new turn starts.
    pub last_turn_duration: Option<Duration>,
    /// Token count of the last finished turn (moved out of `turn_tokens` by `stop_turn`).
    pub last_turn_tokens: Option<u64>,
    /// Incremented on every `start_turn`; `stop_turn` ignores calls that carry a different value.
    pub turn_generation: usize,
    /// Start time of the running turn; taken (left as `None`) by `stop_turn`.
    pub turn_started_at: Option<Instant>,
    /// Tokens accumulated during the running turn; `start_turn` sets it to `Some(0)`.
    pub turn_tokens: Option<u64>,
}
  69
  70impl AcpThreadView {
  71    pub fn new(
  72        thread: Entity<AcpThread>,
  73        workspace: WeakEntity<Workspace>,
  74        entry_view_state: Entity<EntryViewState>,
  75        title_editor: Option<Entity<Editor>>,
  76        config_options_view: Option<Entity<ConfigOptionsView>>,
  77        mode_selector: Option<Entity<ModeSelector>>,
  78        model_selector: Option<Entity<AcpModelSelectorPopover>>,
  79        profile_selector: Option<Entity<ProfileSelector>>,
  80        list_state: ListState,
  81        prompt_capabilities: Rc<RefCell<PromptCapabilities>>,
  82        available_commands: Rc<RefCell<Vec<agent_client_protocol::AvailableCommand>>>,
  83        cached_user_commands: Rc<RefCell<HashMap<String, UserSlashCommand>>>,
  84        cached_user_command_errors: Rc<RefCell<Vec<CommandLoadError>>>,
  85        resumed_without_history: bool,
  86        resume_thread_metadata: Option<AgentSessionInfo>,
  87        subscriptions: Vec<Subscription>,
  88        cx: &App,
  89    ) -> Self {
  90        let id = thread.read(cx).session_id().clone();
  91        Self {
  92            id,
  93            thread,
  94            workspace,
  95            entry_view_state,
  96            title_editor,
  97            config_options_view,
  98            mode_selector,
  99            model_selector,
 100            profile_selector,
 101            list_state,
 102            prompt_capabilities,
 103            available_commands,
 104            cached_user_commands,
 105            cached_user_command_errors,
 106            resumed_without_history,
 107            resume_thread_metadata,
 108            command_load_errors_dismissed: false,
 109            _subscriptions: subscriptions,
 110            permission_dropdown_handle: PopoverMenuHandle::default(),
 111            thread_retry_status: None,
 112            thread_error: None,
 113            thread_error_markdown: None,
 114            token_limit_callout_dismissed: false,
 115            thread_feedback: Default::default(),
 116            expanded_tool_calls: HashSet::default(),
 117            expanded_tool_call_raw_inputs: HashSet::default(),
 118            expanded_thinking_blocks: HashSet::default(),
 119            expanded_subagents: HashSet::default(),
 120            subagent_scroll_handles: RefCell::new(HashMap::default()),
 121            edits_expanded: false,
 122            plan_expanded: false,
 123            queue_expanded: true,
 124            editor_expanded: false,
 125            should_be_following: false,
 126            editing_message: None,
 127            local_queued_messages: Vec::new(),
 128            queued_message_editors: Vec::new(),
 129            queued_message_editor_subscriptions: Vec::new(),
 130            last_synced_queue_length: 0,
 131            turn_fields: TurnFields::default(),
 132            discarded_partial_edits: HashSet::default(),
 133            is_loading_contents: false,
 134            new_server_version_available: None,
 135            selected_permission_granularity: HashMap::default(),
 136            _cancel_task: None,
 137            skip_queue_processing_count: 0,
 138            user_interrupted_generation: false,
 139            can_fast_track_queue: false,
 140            hovered_edited_file_buttons: None,
 141        }
 142    }
 143
 144    pub(crate) fn as_native_connection(
 145        &self,
 146        cx: &App,
 147    ) -> Option<Rc<agent::NativeAgentConnection>> {
 148        let acp_thread = self.thread.read(cx);
 149        acp_thread.connection().clone().downcast()
 150    }
 151
 152    pub(crate) fn as_native_thread(&self, cx: &App) -> Option<Entity<agent::Thread>> {
 153        let acp_thread = self.thread.read(cx);
 154        self.as_native_connection(cx)?
 155            .thread(acp_thread.session_id(), cx)
 156    }
 157
 158    pub fn current_model_id(&self, cx: &App) -> Option<String> {
 159        let selector = self.model_selector.as_ref()?;
 160        let model = selector.read(cx).active_model(cx)?;
 161        Some(model.id.to_string())
 162    }
 163
 164    pub fn current_mode_id(&self, cx: &App) -> Option<Arc<str>> {
 165        if let Some(thread) = self.as_native_thread(cx) {
 166            Some(thread.read(cx).profile().0.clone())
 167        } else {
 168            let mode_selector = self.mode_selector.as_ref()?;
 169            Some(mode_selector.read(cx).mode().0)
 170        }
 171    }
 172
    /// Returns `true` when at least one message is waiting in `local_queued_messages`.
    pub fn has_queued_messages(&self) -> bool {
        !self.local_queued_messages.is_empty()
    }
 176
 177    pub fn is_imported_thread(&self, cx: &App) -> bool {
 178        let Some(thread) = self.as_native_thread(cx) else {
 179            return false;
 180        };
 181        thread.read(cx).is_imported()
 182    }
 183
 184    // turns
 185
 186    pub fn start_turn(&mut self, cx: &mut Context<AcpServerView>) -> usize {
 187        self.turn_fields.turn_generation += 1;
 188        let generation = self.turn_fields.turn_generation;
 189        self.turn_fields.turn_started_at = Some(Instant::now());
 190        self.turn_fields.last_turn_duration = None;
 191        self.turn_fields.last_turn_tokens = None;
 192        self.turn_fields.turn_tokens = Some(0);
 193        self.turn_fields._turn_timer_task = Some(cx.spawn(async move |this, cx| {
 194            loop {
 195                cx.background_executor().timer(Duration::from_secs(1)).await;
 196                if this.update(cx, |_, cx| cx.notify()).is_err() {
 197                    break;
 198                }
 199            }
 200        }));
 201        generation
 202    }
 203
 204    pub fn stop_turn(&mut self, generation: usize) {
 205        if self.turn_fields.turn_generation != generation {
 206            return;
 207        }
 208        self.turn_fields.last_turn_duration = self
 209            .turn_fields
 210            .turn_started_at
 211            .take()
 212            .map(|started| started.elapsed());
 213        self.turn_fields.last_turn_tokens = self.turn_fields.turn_tokens.take();
 214        self.turn_fields._turn_timer_task = None;
 215    }
 216
 217    pub fn update_turn_tokens(&mut self, cx: &App) {
 218        if let Some(usage) = self.thread.read(cx).token_usage() {
 219            if let Some(tokens) = &mut self.turn_fields.turn_tokens {
 220                *tokens += usage.output_tokens;
 221            }
 222        }
 223    }
 224
 225    // sending
 226
 227    pub fn send(
 228        &mut self,
 229        message_editor: Entity<MessageEditor>,
 230        agent_name: SharedString,
 231        login: Option<task::SpawnInTerminal>,
 232        window: &mut Window,
 233        cx: &mut Context<AcpServerView>,
 234    ) {
 235        let thread = &self.thread;
 236
 237        if self.is_loading_contents {
 238            return;
 239        }
 240
 241        let is_editor_empty = message_editor.read(cx).is_empty(cx);
 242        let is_generating = thread.read(cx).status() != ThreadStatus::Idle;
 243
 244        let has_queued = self.has_queued_messages();
 245        if is_editor_empty && self.can_fast_track_queue && has_queued {
 246            self.can_fast_track_queue = false;
 247            self.send_queued_message_at_index(0, true, window, cx);
 248            return;
 249        }
 250
 251        if is_editor_empty {
 252            return;
 253        }
 254
 255        if is_generating {
 256            self.queue_message(message_editor, window, cx);
 257            return;
 258        }
 259
 260        let text = message_editor.read(cx).text(cx);
 261        let text = text.trim();
 262        if text == "/login" || text == "/logout" {
 263            let connection = thread.read(cx).connection().clone();
 264            let can_login = !connection.auth_methods().is_empty() || login.is_some();
 265            // Does the agent have a specific logout command? Prefer that in case they need to reset internal state.
 266            let logout_supported = text == "/logout"
 267                && self
 268                    .available_commands
 269                    .borrow()
 270                    .iter()
 271                    .any(|command| command.name == "logout");
 272            if can_login && !logout_supported {
 273                message_editor.update(cx, |editor, cx| editor.clear(window, cx));
 274
 275                let this = cx.weak_entity();
 276                window.defer(cx, |window, cx| {
 277                    AcpServerView::handle_auth_required(
 278                        this,
 279                        AuthRequired::new(),
 280                        agent_name,
 281                        connection,
 282                        window,
 283                        cx,
 284                    );
 285                });
 286                cx.notify();
 287                return;
 288            }
 289        }
 290
 291        self.send_impl(message_editor, window, cx)
 292    }
 293
 294    pub fn send_impl(
 295        &mut self,
 296        message_editor: Entity<MessageEditor>,
 297        window: &mut Window,
 298        cx: &mut Context<AcpServerView>,
 299    ) {
 300        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 301            // Include full contents when using minimal profile
 302            let thread = thread.read(cx);
 303            AgentSettings::get_global(cx)
 304                .profiles
 305                .get(thread.profile())
 306                .is_some_and(|profile| profile.tools.is_empty())
 307        });
 308
 309        let cached_commands = &self.cached_user_commands;
 310        let cached_errors = &self.cached_user_command_errors;
 311        let contents = message_editor.update(cx, |message_editor, cx| {
 312            message_editor.contents_with_cache(
 313                full_mention_content,
 314                Some(cached_commands.borrow().clone()),
 315                Some(cached_errors.borrow().clone()),
 316                cx,
 317            )
 318        });
 319
 320        self.thread_error.take();
 321        self.thread_feedback.clear();
 322        self.editing_message.take();
 323
 324        if self.should_be_following {
 325            self.workspace
 326                .update(cx, |workspace, cx| {
 327                    workspace.follow(CollaboratorId::Agent, window, cx);
 328                })
 329                .ok();
 330        }
 331
 332        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 333            let (contents, tracked_buffers) = contents.await?;
 334
 335            if contents.is_empty() {
 336                return Ok(None);
 337            }
 338
 339            let _ = cx.update(|window, cx| {
 340                message_editor.update(cx, |message_editor, cx| {
 341                    message_editor.clear(window, cx);
 342                });
 343            });
 344
 345            Ok(Some((contents, tracked_buffers)))
 346        });
 347
 348        self.send_content(contents_task, window, cx);
 349    }
 350
    /// Sends already-resolved message contents to the thread as a new turn.
    ///
    /// `contents_task` resolves to the content blocks plus the buffers to
    /// record as read in the action log, or to `None` when there is nothing to
    /// send. While it is pending, `is_loading_contents` is set. Two telemetry
    /// events are emitted: "Agent Message Sent" right before the prompt is
    /// handed to the thread, and "Agent Turn Completed" (with status and
    /// duration) afterwards. A failed send is reported through
    /// `handle_thread_error`.
    pub fn send_content(
        &mut self,
        contents_task: Task<anyhow::Result<Option<(Vec<acp::ContentBlock>, Vec<Entity<Buffer>>)>>>,
        window: &mut Window,
        cx: &mut Context<AcpServerView>,
    ) {
        let session_id = self.thread.read(cx).session_id().clone();
        let agent_telemetry_id = self.thread.read(cx).connection().telemetry_id();
        let thread = self.thread.downgrade();

        self.is_loading_contents = true;

        let model_id = self.current_model_id(cx);
        let mode_id = self.current_mode_id(cx);
        // `guard` is a throwaway entity whose release callback clears
        // `is_loading_contents`. It is moved into the task below, so the flag is
        // reset both when the task drops it explicitly and when the task ends early.
        let guard = cx.new(|_| ());
        cx.observe_release(&guard, |this, _guard, cx| {
            if let Some(thread) = this.as_active_thread_mut() {
                thread.is_loading_contents = false;
            }
            cx.notify();
        })
        .detach();

        let task = cx.spawn_in(window, async move |this, cx| {
            // `None` means there is nothing to send.
            let Some((contents, tracked_buffers)) = contents_task.await? else {
                return Ok(());
            };

            let generation = this.update_in(cx, |this, _window, cx| {
                // Remember the prompt while it is in flight; it is cleared again
                // below only if the send succeeds.
                this.in_flight_prompt = Some(contents.clone());
                let generation = this.start_turn(cx);
                this.set_editor_is_expanded(false, cx);
                this.scroll_to_bottom(cx);
                generation
            })?;

            // Presumably a run-on-drop guard: it is dropped explicitly once the
            // send finishes, and would also be dropped on an early `?` return,
            // so `stop_turn` runs for this generation either way.
            let _stop_turn = defer({
                let this = this.clone();
                let mut cx = cx.clone();
                move || {
                    this.update(&mut cx, |this, cx| {
                        this.stop_turn(generation);
                        cx.notify();
                    })
                    .ok();
                }
            });
            let turn_start_time = Instant::now();
            let send = thread.update(cx, |thread, cx| {
                thread.action_log().update(cx, |action_log, cx| {
                    for buffer in tracked_buffers {
                        action_log.buffer_read(buffer, cx)
                    }
                });
                // Contents are loaded; releasing the guard clears `is_loading_contents`.
                drop(guard);

                telemetry::event!(
                    "Agent Message Sent",
                    agent = agent_telemetry_id,
                    session = session_id,
                    model = model_id,
                    mode = mode_id
                );

                thread.send(contents, cx)
            })?;
            let res = send.await;
            let turn_time_ms = turn_start_time.elapsed().as_millis();
            drop(_stop_turn);
            let status = if res.is_ok() {
                this.update(cx, |this, _| this.in_flight_prompt.take()).ok();
                "success"
            } else {
                "failure"
            };
            telemetry::event!(
                "Agent Turn Completed",
                agent = agent_telemetry_id,
                session = session_id,
                model = model_id,
                mode = mode_id,
                status,
                turn_time_ms,
            );
            res
        });

        cx.spawn(async move |this, cx| {
            if let Err(err) = task.await {
                this.update(cx, |this, cx| {
                    this.handle_thread_error(err, cx);
                })
                .ok();
            } else {
                // After a successful turn, remember whether the workspace is
                // still following the agent.
                this.update(cx, |this, cx| {
                    let should_be_following = this
                        .workspace
                        .update(cx, |workspace, _| {
                            workspace.is_being_followed(CollaboratorId::Agent)
                        })
                        .unwrap_or_default();
                    if let Some(thread) = this.as_active_thread_mut() {
                        thread.should_be_following = should_be_following;
                    }
                })
                .ok();
            }
        })
        .detach();
    }
 461
 462    pub fn interrupt_and_send(
 463        &mut self,
 464        message_editor: Entity<MessageEditor>,
 465        window: &mut Window,
 466        cx: &mut Context<AcpServerView>,
 467    ) {
 468        let thread = &self.thread;
 469
 470        if self.is_loading_contents {
 471            return;
 472        }
 473
 474        if thread.read(cx).status() == ThreadStatus::Idle {
 475            self.send_impl(message_editor, window, cx);
 476            return;
 477        }
 478
 479        self.stop_current_and_send_new_message(window, cx);
 480    }
 481
 482    pub fn stop_current_and_send_new_message(
 483        &mut self,
 484        window: &mut Window,
 485        cx: &mut Context<AcpServerView>,
 486    ) {
 487        let thread = self.thread.clone();
 488        self.skip_queue_processing_count = 0;
 489        self.user_interrupted_generation = true;
 490
 491        let cancelled = thread.update(cx, |thread, cx| thread.cancel(cx));
 492
 493        cx.spawn_in(window, async move |this, cx| {
 494            cancelled.await;
 495
 496            this.update_in(cx, |this, window, cx| {
 497                this.send_impl(this.message_editor.clone(), window, cx);
 498            })
 499            .ok();
 500        })
 501        .detach();
 502    }
 503
 504    // generation
 505
 506    pub fn cancel_generation(&mut self, cx: &mut Context<AcpServerView>) {
 507        self.thread_retry_status.take();
 508        self.thread_error.take();
 509        self.user_interrupted_generation = true;
 510        self._cancel_task = Some(self.thread.update(cx, |thread, cx| thread.cancel(cx)));
 511    }
 512
 513    pub fn retry_generation(&mut self, cx: &mut Context<AcpServerView>) {
 514        self.thread_error.take();
 515
 516        let thread = &self.thread;
 517        if !thread.read(cx).can_retry(cx) {
 518            return;
 519        }
 520
 521        let task = thread.update(cx, |thread, cx| thread.retry(cx));
 522        cx.spawn(async move |this, cx| {
 523            let result = task.await;
 524
 525            this.update(cx, |this, cx| {
 526                if let Err(err) = result {
 527                    this.handle_thread_error(err, cx);
 528                }
 529            })
 530        })
 531        .detach();
 532    }
 533
 534    pub fn regenerate(
 535        &mut self,
 536        entry_ix: usize,
 537        message_editor: Entity<MessageEditor>,
 538        window: &mut Window,
 539        cx: &mut Context<AcpServerView>,
 540    ) {
 541        if self.is_loading_contents {
 542            return;
 543        }
 544        let thread = self.thread.clone();
 545
 546        let Some(user_message_id) = thread.update(cx, |thread, _| {
 547            thread.entries().get(entry_ix)?.user_message()?.id.clone()
 548        }) else {
 549            return;
 550        };
 551
 552        cx.spawn_in(window, async move |this, cx| {
 553            // Check if there are any edits from prompts before the one being regenerated.
 554            //
 555            // If there are, we keep/accept them since we're not regenerating the prompt that created them.
 556            //
 557            // If editing the prompt that generated the edits, they are auto-rejected
 558            // through the `rewind` function in the `acp_thread`.
 559            let has_earlier_edits = thread.read_with(cx, |thread, _| {
 560                thread
 561                    .entries()
 562                    .iter()
 563                    .take(entry_ix)
 564                    .any(|entry| entry.diffs().next().is_some())
 565            });
 566
 567            if has_earlier_edits {
 568                thread.update(cx, |thread, cx| {
 569                    thread.action_log().update(cx, |action_log, cx| {
 570                        action_log.keep_all_edits(None, cx);
 571                    });
 572                });
 573            }
 574
 575            thread
 576                .update(cx, |thread, cx| thread.rewind(user_message_id, cx))
 577                .await?;
 578            this.update_in(cx, |this, window, cx| {
 579                this.send_impl(message_editor, window, cx);
 580                this.focus_handle(cx).focus(window, cx);
 581            })?;
 582            anyhow::Ok(())
 583        })
 584        .detach_and_log_err(cx);
 585    }
 586
 587    // message queueing
 588
 589    pub fn queue_message(
 590        &mut self,
 591        message_editor: Entity<MessageEditor>,
 592        window: &mut Window,
 593        cx: &mut Context<AcpServerView>,
 594    ) {
 595        let is_idle = self.thread.read(cx).status() == acp_thread::ThreadStatus::Idle;
 596
 597        if is_idle {
 598            self.send_impl(message_editor.clone(), window, cx);
 599            return;
 600        }
 601
 602        let full_mention_content = self.as_native_thread(cx).is_some_and(|thread| {
 603            let thread = thread.read(cx);
 604            AgentSettings::get_global(cx)
 605                .profiles
 606                .get(thread.profile())
 607                .is_some_and(|profile| profile.tools.is_empty())
 608        });
 609
 610        let cached_commands = self.cached_user_commands.borrow().clone();
 611        let cached_errors = self.cached_user_command_errors.borrow().clone();
 612        let contents = message_editor.update(cx, |message_editor, cx| {
 613            message_editor.contents_with_cache(
 614                full_mention_content,
 615                Some(cached_commands),
 616                Some(cached_errors),
 617                cx,
 618            )
 619        });
 620
 621        cx.spawn_in(window, async move |this, cx| {
 622            let (content, tracked_buffers) = contents.await?;
 623
 624            if content.is_empty() {
 625                return Ok::<(), anyhow::Error>(());
 626            }
 627
 628            this.update_in(cx, |this, window, cx| {
 629                this.add_to_queue(content, tracked_buffers, cx);
 630                // Enable fast-track: user can press Enter again to send this queued message immediately
 631                this.set_can_fast_track_queue(true);
 632                message_editor.update(cx, |message_editor, cx| {
 633                    message_editor.clear(window, cx);
 634                });
 635                cx.notify();
 636            })?;
 637            Ok(())
 638        })
 639        .detach_and_log_err(cx);
 640    }
 641
 642    pub fn remove_from_queue(
 643        &mut self,
 644        index: usize,
 645        cx: &mut Context<AcpServerView>,
 646    ) -> Option<QueuedMessage> {
 647        if index < self.local_queued_messages.len() {
 648            let removed = self.local_queued_messages.remove(index);
 649            self.sync_queue_flag_to_native_thread(cx);
 650            Some(removed)
 651        } else {
 652            None
 653        }
 654    }
 655
 656    pub fn sync_queue_flag_to_native_thread(&self, cx: &mut Context<AcpServerView>) {
 657        if let Some(native_thread) = self.as_native_thread(cx) {
 658            let has_queued = self.has_queued_messages();
 659            native_thread.update(cx, |thread, _| {
 660                thread.set_has_queued_message(has_queued);
 661            });
 662        }
 663    }
 664
 665    pub fn send_queued_message_at_index(
 666        &mut self,
 667        index: usize,
 668        is_send_now: bool,
 669        window: &mut Window,
 670        cx: &mut Context<AcpServerView>,
 671    ) {
 672        let Some(queued) = self.remove_from_queue(index, cx) else {
 673            return;
 674        };
 675        let content = queued.content;
 676        let tracked_buffers = queued.tracked_buffers;
 677
 678        // Only increment skip count for "Send Now" operations (out-of-order sends)
 679        // Normal auto-processing from the Stopped handler doesn't need to skip.
 680        // We only skip the Stopped event from the cancelled generation, NOT the
 681        // Stopped event from the newly sent message (which should trigger queue processing).
 682        if is_send_now {
 683            let is_generating =
 684                self.thread.read(cx).status() == acp_thread::ThreadStatus::Generating;
 685            self.skip_queue_processing_count += if is_generating { 1 } else { 0 };
 686        }
 687
 688        let cancelled = self.thread.update(cx, |thread, cx| thread.cancel(cx));
 689
 690        let workspace = self.workspace.clone();
 691
 692        let should_be_following = self.should_be_following;
 693        let contents_task = cx.spawn_in(window, async move |_this, cx| {
 694            cancelled.await;
 695            if should_be_following {
 696                workspace
 697                    .update_in(cx, |workspace, window, cx| {
 698                        workspace.follow(CollaboratorId::Agent, window, cx);
 699                    })
 700                    .ok();
 701            }
 702
 703            Ok(Some((content, tracked_buffers)))
 704        });
 705
 706        self.send_content(contents_task, window, cx);
 707    }
 708
 709    // editor methods
 710
 711    pub fn expand_message_editor(
 712        &mut self,
 713        message_editor: Entity<MessageEditor>,
 714        cx: &mut Context<AcpServerView>,
 715    ) {
 716        self.set_editor_is_expanded(!self.editor_expanded, message_editor, cx);
 717        cx.stop_propagation();
 718        cx.notify();
 719    }
 720
 721    pub fn set_editor_is_expanded(
 722        &mut self,
 723        is_expanded: bool,
 724        message_editor: Entity<MessageEditor>,
 725        cx: &mut Context<AcpServerView>,
 726    ) {
 727        self.editor_expanded = is_expanded;
 728        message_editor.update(cx, |editor, cx| {
 729            if is_expanded {
 730                editor.set_mode(
 731                    EditorMode::Full {
 732                        scale_ui_elements_with_buffer_font_size: false,
 733                        show_active_line_background: false,
 734                        sizing_behavior: SizingBehavior::ExcludeOverscrollMargin,
 735                    },
 736                    cx,
 737                )
 738            } else {
 739                let agent_settings = AgentSettings::get_global(cx);
 740                editor.set_mode(
 741                    EditorMode::AutoHeight {
 742                        min_lines: agent_settings.message_editor_min_lines,
 743                        max_lines: Some(agent_settings.set_message_editor_max_lines()),
 744                    },
 745                    cx,
 746                )
 747            }
 748        });
 749        cx.notify();
 750    }
 751
 752    pub fn handle_title_editor_event(
 753        &mut self,
 754        title_editor: &Entity<Editor>,
 755        event: &EditorEvent,
 756        window: &mut Window,
 757        cx: &mut Context<AcpServerView>,
 758    ) {
 759        let thread = &self.thread;
 760
 761        match event {
 762            EditorEvent::BufferEdited => {
 763                let new_title = title_editor.read(cx).text(cx);
 764                thread.update(cx, |thread, cx| {
 765                    thread
 766                        .set_title(new_title.into(), cx)
 767                        .detach_and_log_err(cx);
 768                })
 769            }
 770            EditorEvent::Blurred => {
 771                if title_editor.read(cx).text(cx).is_empty() {
 772                    title_editor.update(cx, |editor, cx| {
 773                        editor.set_text("New Thread", window, cx);
 774                    });
 775                }
 776            }
 777            _ => {}
 778        }
 779    }
 780
 781    pub fn cancel_editing(
 782        &mut self,
 783        focus_handle: FocusHandle,
 784        window: &mut Window,
 785        cx: &mut Context<AcpServerView>,
 786    ) {
 787        if let Some(index) = self.editing_message.take()
 788            && let Some(editor) = &self
 789                .entry_view_state
 790                .read(cx)
 791                .entry(index)
 792                .and_then(|e| e.message_editor())
 793                .cloned()
 794        {
 795            editor.update(cx, |editor, cx| {
 796                if let Some(user_message) = self
 797                    .thread
 798                    .read(cx)
 799                    .entries()
 800                    .get(index)
 801                    .and_then(|e| e.user_message())
 802                {
 803                    editor.set_message(user_message.chunks.clone(), window, cx);
 804                }
 805            })
 806        };
 807        focus_handle.focus(window, cx);
 808        cx.notify();
 809    }
 810
 811    // tool permissions
 812
 813    pub fn authorize_tool_call(
 814        &mut self,
 815        tool_call_id: acp::ToolCallId,
 816        option_id: acp::PermissionOptionId,
 817        option_kind: acp::PermissionOptionKind,
 818        window: &mut Window,
 819        cx: &mut Context<AcpServerView>,
 820    ) {
 821        let thread = &self.thread;
 822        let agent_telemetry_id = thread.read(cx).connection().telemetry_id();
 823
 824        telemetry::event!(
 825            "Agent Tool Call Authorized",
 826            agent = agent_telemetry_id,
 827            session = thread.read(cx).session_id(),
 828            option = option_kind
 829        );
 830
 831        thread.update(cx, |thread, cx| {
 832            thread.authorize_tool_call(tool_call_id, option_id, option_kind, cx);
 833        });
 834        if self.should_be_following {
 835            self.workspace
 836                .update(cx, |workspace, cx| {
 837                    workspace.follow(CollaboratorId::Agent, window, cx);
 838                })
 839                .ok();
 840        }
 841        cx.notify();
 842    }
 843
 844    pub fn authorize_pending_tool_call(
 845        &mut self,
 846        kind: acp::PermissionOptionKind,
 847        window: &mut Window,
 848        cx: &mut Context<AcpServerView>,
 849    ) -> Option<()> {
 850        let thread = self.thread.read(cx);
 851        let tool_call = thread.first_tool_awaiting_confirmation()?;
 852        let ToolCallStatus::WaitingForConfirmation { options, .. } = &tool_call.status else {
 853            return None;
 854        };
 855        let option = options.first_option_of_kind(kind)?;
 856
 857        self.authorize_tool_call(
 858            tool_call.id.clone(),
 859            option.option_id.clone(),
 860            option.kind,
 861            window,
 862            cx,
 863        );
 864
 865        Some(())
 866    }
 867
 868    pub fn handle_select_permission_granularity(
 869        &mut self,
 870        action: &SelectPermissionGranularity,
 871        cx: &mut Context<AcpServerView>,
 872    ) {
 873        let tool_call_id = acp::ToolCallId::new(action.tool_call_id.clone());
 874        self.selected_permission_granularity
 875            .insert(tool_call_id, action.index);
 876
 877        cx.notify();
 878    }
 879
 880    // edits
 881
 882    pub fn keep_all(&mut self, cx: &mut Context<AcpServerView>) {
 883        let thread = &self.thread;
 884        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 885        let action_log = thread.read(cx).action_log().clone();
 886        action_log.update(cx, |action_log, cx| {
 887            action_log.keep_all_edits(Some(telemetry), cx)
 888        });
 889    }
 890
 891    pub fn reject_all(&mut self, cx: &mut Context<AcpServerView>) {
 892        let thread = &self.thread;
 893        let telemetry = ActionLogTelemetry::from(thread.read(cx));
 894        let action_log = thread.read(cx).action_log().clone();
 895        action_log
 896            .update(cx, |action_log, cx| {
 897                action_log.reject_all_edits(Some(telemetry), cx)
 898            })
 899            .detach();
 900    }
 901
 902    pub fn open_edited_buffer(
 903        &mut self,
 904        buffer: &Entity<Buffer>,
 905        window: &mut Window,
 906        cx: &mut Context<AcpServerView>,
 907    ) {
 908        let thread = &self.thread;
 909
 910        let Some(diff) =
 911            AgentDiffPane::deploy(thread.clone(), self.workspace.clone(), window, cx).log_err()
 912        else {
 913            return;
 914        };
 915
 916        diff.update(cx, |diff, cx| {
 917            diff.move_to_path(PathKey::for_buffer(buffer, cx), window, cx)
 918        })
 919    }
 920
 921    // thread stuff
 922
 923    pub fn sync_thread(
 924        &mut self,
 925        project: Entity<Project>,
 926        window: &mut Window,
 927        cx: &mut Context<AcpServerView>,
 928    ) {
 929        if !self.is_imported_thread(cx) {
 930            return;
 931        }
 932
 933        let Some(session_list) = self
 934            .as_native_connection(cx)
 935            .and_then(|connection| connection.session_list(cx))
 936            .and_then(|list| list.downcast::<NativeAgentSessionList>())
 937        else {
 938            return;
 939        };
 940        let thread_store = session_list.thread_store().clone();
 941
 942        let client = project.read(cx).client();
 943        let session_id = self.thread.read(cx).session_id().clone();
 944
 945        cx.spawn_in(window, async move |this, cx| {
 946            let response = client
 947                .request(proto::GetSharedAgentThread {
 948                    session_id: session_id.to_string(),
 949                })
 950                .await?;
 951
 952            let shared_thread = SharedThread::from_bytes(&response.thread_data)?;
 953
 954            let db_thread = shared_thread.to_db_thread();
 955
 956            thread_store
 957                .update(&mut cx.clone(), |store, cx| {
 958                    store.save_thread(session_id.clone(), db_thread, cx)
 959                })
 960                .await?;
 961
 962            let thread_metadata = AgentSessionInfo {
 963                session_id,
 964                cwd: None,
 965                title: Some(format!("🔗 {}", response.title).into()),
 966                updated_at: Some(chrono::Utc::now()),
 967                meta: None,
 968            };
 969
 970            this.update_in(cx, |this, window, cx| {
 971                if let Some(thread) = this.as_active_thread_mut() {
 972                    let resume_thread_metadata = &mut thread.resume_thread_metadata;
 973                    *resume_thread_metadata = Some(thread_metadata);
 974                }
 975                this.reset(window, cx);
 976            })?;
 977
 978            this.update_in(cx, |this, _window, cx| {
 979                if let Some(workspace) = this.workspace.upgrade() {
 980                    workspace.update(cx, |workspace, cx| {
 981                        struct ThreadSyncedToast;
 982                        workspace.show_toast(
 983                            Toast::new(
 984                                NotificationId::unique::<ThreadSyncedToast>(),
 985                                "Thread synced with latest version",
 986                            )
 987                            .autohide(),
 988                            cx,
 989                        );
 990                    });
 991                }
 992            })?;
 993
 994            anyhow::Ok(())
 995        })
 996        .detach_and_log_err(cx);
 997    }
 998
 999    pub fn restore_checkpoint(
1000        &mut self,
1001        message_id: &UserMessageId,
1002        cx: &mut Context<AcpServerView>,
1003    ) {
1004        self.thread
1005            .update(cx, |thread, cx| {
1006                thread.restore_checkpoint(message_id.clone(), cx)
1007            })
1008            .detach_and_log_err(cx);
1009    }
1010
1011    pub fn clear_thread_error(&mut self, cx: &mut Context<AcpServerView>) {
1012        self.thread_error = None;
1013        self.thread_error_markdown = None;
1014        self.token_limit_callout_dismissed = true;
1015        cx.notify();
1016    }
1017
1018    // other
1019
1020    pub fn refresh_cached_user_commands_from_registry(
1021        &mut self,
1022        registry: &Entity<SlashCommandRegistry>,
1023        cx: &App,
1024    ) {
1025        let (mut commands, mut errors) = registry.read_with(cx, |registry, _| {
1026            (registry.commands().clone(), registry.errors().to_vec())
1027        });
1028        let server_command_names = self
1029            .available_commands
1030            .borrow()
1031            .iter()
1032            .map(|command| command.name.clone())
1033            .collect::<HashSet<_>>();
1034        user_slash_command::apply_server_command_conflicts_to_map(
1035            &mut commands,
1036            &mut errors,
1037            &server_command_names,
1038        );
1039
1040        self.command_load_errors_dismissed = false;
1041        *self.cached_user_commands.borrow_mut() = commands;
1042        *self.cached_user_command_errors.borrow_mut() = errors;
1043    }
1044
1045    pub fn render_command_load_errors(
1046        &self,
1047        cx: &mut Context<AcpServerView>,
1048    ) -> Option<impl IntoElement> {
1049        let errors = self.cached_user_command_errors.borrow();
1050
1051        if self.command_load_errors_dismissed || errors.is_empty() {
1052            return None;
1053        }
1054
1055        let workspace = self.workspace.clone();
1056
1057        let error_count = errors.len();
1058        let title = if error_count == 1 {
1059            "Failed to load slash command"
1060        } else {
1061            "Failed to load slash commands"
1062        };
1063
1064        Some(
1065            Callout::new()
1066                .icon(IconName::Warning)
1067                .severity(Severity::Warning)
1068                .title(title)
1069                .actions_slot(
1070                    IconButton::new("dismiss-command-errors", IconName::Close)
1071                        .icon_size(IconSize::Small)
1072                        .icon_color(Color::Muted)
1073                        .tooltip(Tooltip::text("Dismiss Error"))
1074                        .on_click(cx.listener(|this, _, _, cx| {
1075                            this.clear_command_load_errors(cx);
1076                        })),
1077                )
1078                .description_slot(v_flex().children(errors.iter().enumerate().map({
1079                    move |(i, error)| {
1080                        let path = error.path.clone();
1081                        let workspace = workspace.clone();
1082                        let file_name = error
1083                            .path
1084                            .file_name()
1085                            .map(|n| n.to_string_lossy().to_string())
1086                            .unwrap_or_else(|| error.path.display().to_string());
1087                        let id = ElementId::Name(format!("command-error-{i}").into());
1088                        let label = format!("{}: {}", file_name, error.message);
1089
1090                        Button::new(id, label)
1091                            .label_size(LabelSize::Small)
1092                            .truncate(true)
1093                            .tooltip({
1094                                let message: SharedString = error.message.clone().into();
1095                                let path: SharedString = error.path.display().to_string().into();
1096                                move |_, cx| {
1097                                    Tooltip::with_meta(message.clone(), None, path.clone(), cx)
1098                                }
1099                            })
1100                            .on_click({
1101                                move |_, window, cx| {
1102                                    if let Some(workspace) = workspace.upgrade() {
1103                                        workspace.update(cx, |workspace, cx| {
1104                                            workspace
1105                                                .open_abs_path(
1106                                                    path.clone(),
1107                                                    OpenOptions::default(),
1108                                                    window,
1109                                                    cx,
1110                                                )
1111                                                .detach_and_log_err(cx);
1112                                        });
1113                                    }
1114                                }
1115                            })
1116                    }
1117                }))),
1118        )
1119    }
1120
1121    pub fn render_thread_retry_status_callout(&self) -> Option<Callout> {
1122        let state = self.thread_retry_status.as_ref()?;
1123
1124        let next_attempt_in = state
1125            .duration
1126            .saturating_sub(Instant::now().saturating_duration_since(state.started_at));
1127        if next_attempt_in.is_zero() {
1128            return None;
1129        }
1130
1131        let next_attempt_in_secs = next_attempt_in.as_secs() + 1;
1132
1133        let retry_message = if state.max_attempts == 1 {
1134            if next_attempt_in_secs == 1 {
1135                "Retrying. Next attempt in 1 second.".to_string()
1136            } else {
1137                format!("Retrying. Next attempt in {next_attempt_in_secs} seconds.")
1138            }
1139        } else if next_attempt_in_secs == 1 {
1140            format!(
1141                "Retrying. Next attempt in 1 second (Attempt {} of {}).",
1142                state.attempt, state.max_attempts,
1143            )
1144        } else {
1145            format!(
1146                "Retrying. Next attempt in {next_attempt_in_secs} seconds (Attempt {} of {}).",
1147                state.attempt, state.max_attempts,
1148            )
1149        };
1150
1151        Some(
1152            Callout::new()
1153                .icon(IconName::Warning)
1154                .severity(Severity::Warning)
1155                .title(state.last_error.clone())
1156                .description(retry_message),
1157        )
1158    }
1159
1160    pub fn handle_open_rules(&mut self, window: &mut Window, cx: &mut Context<AcpServerView>) {
1161        let Some(thread) = self.as_native_thread(cx) else {
1162            return;
1163        };
1164        let project_context = thread.read(cx).project_context().read(cx);
1165
1166        let project_entry_ids = project_context
1167            .worktrees
1168            .iter()
1169            .flat_map(|worktree| worktree.rules_file.as_ref())
1170            .map(|rules_file| ProjectEntryId::from_usize(rules_file.project_entry_id))
1171            .collect::<Vec<_>>();
1172
1173        self.workspace
1174            .update(cx, move |workspace, cx| {
1175                // TODO: Open a multibuffer instead? In some cases this doesn't make the set of rules
1176                // files clear. For example, if rules file 1 is already open but rules file 2 is not,
1177                // this would open and focus rules file 2 in a tab that is not next to rules file 1.
1178                let project = workspace.project().read(cx);
1179                let project_paths = project_entry_ids
1180                    .into_iter()
1181                    .flat_map(|entry_id| project.path_for_entry(entry_id, cx))
1182                    .collect::<Vec<_>>();
1183                for project_path in project_paths {
1184                    workspace
1185                        .open_path(project_path, None, true, window, cx)
1186                        .detach_and_log_err(cx);
1187                }
1188            })
1189            .ok();
1190    }
1191}