log_store.rs

  1use std::{collections::VecDeque, sync::Arc};
  2
  3use collections::HashMap;
  4use futures::{StreamExt, channel::mpsc};
  5use gpui::{
  6    App, AppContext as _, Context, Entity, EventEmitter, Global, Subscription, TaskExt, WeakEntity,
  7};
  8use lsp::{
  9    IoKind, LanguageServer, LanguageServerId, LanguageServerName, LanguageServerSelector,
 10    MessageType, TraceValue,
 11};
 12use rpc::proto;
 13use settings::WorktreeId;
 14
 15use crate::{LanguageServerLogType, LspStore, Project, ProjectItem as _};
 16
/// Header line stored and emitted ahead of outgoing (`MessageKind::Send`) RPC messages.
const SEND_LINE: &str = "\n// Send:";
/// Header line stored and emitted ahead of incoming (`MessageKind::Receive`) RPC messages.
const RECEIVE_LINE: &str = "\n// Receive:";
/// Capacity reserved for each per-server message queue. Old entries are evicted before a
/// push, so a queue always ends up holding fewer than this many messages.
const MAX_STORED_LOG_ENTRIES: usize = 2000;
 20
 21pub fn init(on_headless_host: bool, cx: &mut App) -> Entity<LogStore> {
 22    let log_store = cx.new(|cx| LogStore::new(on_headless_host, cx));
 23    cx.set_global(GlobalLogStore(log_store.clone()));
 24    log_store
 25}
 26
/// Global wrapper around the [`LogStore`] entity handle; installed by [`init`].
pub struct GlobalLogStore(pub Entity<LogStore>);

impl Global for GlobalLogStore {}
 30
/// Events emitted by [`LogStore`].
#[derive(Debug)]
pub enum Event {
    /// A log, trace, or RPC line was recorded (or relayed) for a language server.
    NewServerLogEntry {
        /// The server the entry belongs to.
        id: LanguageServerId,
        /// Which log channel the entry belongs to.
        kind: LanguageServerLogType,
        /// The text of the entry.
        text: String,
    },
}

impl EventEmitter<Event> for LogStore {}
 41
/// Collects log, trace, and RPC messages per language server and re-emits them as [`Event`]s.
pub struct LogStore {
    /// When `true`, messages are only emitted as events and are not stored in the
    /// per-server queues.
    on_headless_host: bool,
    /// Per-project subscriptions, keyed by a weak handle to the project.
    projects: HashMap<WeakEntity<Project>, ProjectState>,
    /// Per-server log state, keyed by server id.
    pub language_servers: HashMap<LanguageServerId, LanguageServerState>,
    /// Sender cloned into each server's IO callback; the receiving end is drained by the
    /// task spawned in `LogStore::new`.
    io_tx: mpsc::UnboundedSender<(LanguageServerId, IoKind, String)>,
}
 48
/// Bookkeeping kept for each project registered through `LogStore::add_project`.
struct ProjectState {
    /// The release observer and the project event subscription created in `add_project`,
    /// stored here so they are kept for as long as the project is tracked.
    _subscriptions: [Subscription; 2],
    /// Slot handed out by `LogStore::copilot_state_for_project`; starts out as `None`.
    copilot_log_subscription: Option<lsp::Subscription>,
}
 53
 54pub trait Message: AsRef<str> {
 55    type Level: Copy + std::fmt::Debug;
 56    fn should_include(&self, _: Self::Level) -> bool {
 57        true
 58    }
 59}
 60
 61#[derive(Debug)]
 62pub struct LogMessage {
 63    message: String,
 64    typ: MessageType,
 65}
 66
 67impl AsRef<str> for LogMessage {
 68    fn as_ref(&self) -> &str {
 69        &self.message
 70    }
 71}
 72
 73impl Message for LogMessage {
 74    type Level = MessageType;
 75
 76    fn should_include(&self, level: Self::Level) -> bool {
 77        match (self.typ, level) {
 78            (MessageType::ERROR, _) => true,
 79            (_, MessageType::ERROR) => false,
 80            (MessageType::WARNING, _) => true,
 81            (_, MessageType::WARNING) => false,
 82            (MessageType::INFO, _) => true,
 83            (_, MessageType::INFO) => false,
 84            _ => true,
 85        }
 86    }
 87}
 88
 89#[derive(Debug)]
 90pub struct TraceMessage {
 91    message: String,
 92    is_verbose: bool,
 93}
 94
 95impl AsRef<str> for TraceMessage {
 96    fn as_ref(&self) -> &str {
 97        &self.message
 98    }
 99}
100
101impl Message for TraceMessage {
102    type Level = TraceValue;
103
104    fn should_include(&self, level: Self::Level) -> bool {
105        match level {
106            TraceValue::Off => false,
107            TraceValue::Messages => !self.is_verbose,
108            TraceValue::Verbose => true,
109        }
110    }
111}
112
113#[derive(Debug)]
114pub struct RpcMessage {
115    message: String,
116}
117
118impl AsRef<str> for RpcMessage {
119    fn as_ref(&self) -> &str {
120        &self.message
121    }
122}
123
124impl Message for RpcMessage {
125    type Level = ();
126}
127
/// Everything the [`LogStore`] tracks for a single language server.
pub struct LanguageServerState {
    /// Server name; filled in by `LogStore::add_language_server` when one is supplied.
    pub name: Option<LanguageServerName>,
    /// Worktree the server is associated with; filled in when one is supplied.
    pub worktree_id: Option<WorktreeId>,
    /// Where the server comes from (see [`LanguageServerKind`]).
    pub kind: LanguageServerKind,
    /// Stored log lines, oldest first.
    log_messages: VecDeque<LogMessage>,
    /// Stored trace lines, oldest first.
    trace_messages: VecDeque<TraceMessage>,
    /// RPC log state; `Some` only while RPC tracing is enabled for this server.
    pub rpc_state: Option<LanguageServerRpcState>,
    /// Trace verbosity; initialized to `TraceValue::Off`.
    pub trace_level: TraceValue,
    /// Log severity threshold used when deciding whether a newly stored log line is
    /// emitted; initialized to `MessageType::LOG`.
    pub log_level: MessageType,
    /// Subscription to the server's IO, installed by `LogStore::add_language_server` when a
    /// server handle is supplied.
    io_logs_subscription: Option<lsp::Subscription>,
    /// The log kind whose entries are currently forwarded to the downstream client, if any.
    pub toggled_log_kind: Option<LogKind>,
}
140
141impl std::fmt::Debug for LanguageServerState {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("LanguageServerState")
144            .field("name", &self.name)
145            .field("worktree_id", &self.worktree_id)
146            .field("kind", &self.kind)
147            .field("log_messages", &self.log_messages)
148            .field("trace_messages", &self.trace_messages)
149            .field("rpc_state", &self.rpc_state)
150            .field("trace_level", &self.trace_level)
151            .field("log_level", &self.log_level)
152            .field("toggled_log_kind", &self.toggled_log_kind)
153            .finish_non_exhaustive()
154    }
155}
156
/// Identifies what a tracked language server is attached to.
#[derive(PartialEq, Clone)]
pub enum LanguageServerKind {
    /// Server reported by a project for which `is_local()` returned `true`.
    Local { project: WeakEntity<Project> },
    /// Server reported by a project for which `is_local()` returned `false`.
    Remote { project: WeakEntity<Project> },
    /// Server identified by an `LspStore` rather than by a project.
    LocalSsh { lsp_store: WeakEntity<LspStore> },
    /// Server that is not tied to any project or `LspStore`.
    Global,
}
164
165impl std::fmt::Debug for LanguageServerKind {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        match self {
168            LanguageServerKind::Local { .. } => write!(f, "LanguageServerKind::Local"),
169            LanguageServerKind::Remote { .. } => write!(f, "LanguageServerKind::Remote"),
170            LanguageServerKind::LocalSsh { .. } => write!(f, "LanguageServerKind::LocalSsh"),
171            LanguageServerKind::Global => write!(f, "LanguageServerKind::Global"),
172        }
173    }
174}
175
176impl LanguageServerKind {
177    pub fn project(&self) -> Option<&WeakEntity<Project>> {
178        match self {
179            Self::Local { project } => Some(project),
180            Self::Remote { project } => Some(project),
181            Self::LocalSsh { .. } => None,
182            Self::Global { .. } => None,
183        }
184    }
185}
186
/// RPC log kept for a language server while RPC tracing is enabled.
#[derive(Debug)]
pub struct LanguageServerRpcState {
    /// Stored RPC lines (direction headers and message bodies), oldest first.
    pub rpc_messages: VecDeque<RpcMessage>,
    /// Direction compared against each new message to decide whether a direction header
    /// line is inserted before it.
    last_message_kind: Option<MessageKind>,
}
192
/// Direction of an RPC message relative to this process.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MessageKind {
    /// Message written to the language server (its stdin).
    Send,
    /// Message read from the language server (its stdout).
    Receive,
}
198
/// The kinds of per-server output that can be toggled; `Logs` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum LogKind {
    /// Raw RPC traffic.
    Rpc,
    /// Trace notifications.
    Trace,
    /// Regular log messages.
    #[default]
    Logs,
    /// Server information; never produced by `from_server_log_type`.
    ServerInfo,
}
207
208impl LogKind {
209    pub fn from_server_log_type(log_type: &LanguageServerLogType) -> Self {
210        match log_type {
211            LanguageServerLogType::Log(_) => Self::Logs,
212            LanguageServerLogType::Trace { .. } => Self::Trace,
213            LanguageServerLogType::Rpc { .. } => Self::Rpc,
214        }
215    }
216}
217
218impl LogStore {
219    pub fn new(on_headless_host: bool, cx: &mut Context<Self>) -> Self {
220        let (io_tx, mut io_rx) = mpsc::unbounded();
221
222        let log_store = Self {
223            projects: HashMap::default(),
224            language_servers: HashMap::default(),
225
226            on_headless_host,
227            io_tx,
228        };
229        cx.spawn(async move |log_store, cx| {
230            while let Some((server_id, io_kind, message)) = io_rx.next().await {
231                if let Some(log_store) = log_store.upgrade() {
232                    log_store.update(cx, |log_store, cx| {
233                        log_store.on_io(server_id, io_kind, &message, cx);
234                    });
235                }
236            }
237            anyhow::Ok(())
238        })
239        .detach_and_log_err(cx);
240
241        log_store
242    }
243
    /// Starts tracking `project`.
    ///
    /// Two subscriptions are stored in the project's `ProjectState`:
    /// - a release observer that forgets the project and every language server whose kind
    ///   refers to it, and
    /// - an event subscription that mirrors the project's language server events
    ///   (added, buffer registered, removed, log, toggle) into this store.
    ///
    /// Inserting under the same weak handle replaces any previously stored `ProjectState`.
    pub fn add_project(&mut self, project: &Entity<Project>, cx: &mut Context<Self>) {
        let weak_project = project.downgrade();
        self.projects.insert(
            project.downgrade(),
            ProjectState {
                _subscriptions: [
                    // On release: drop the project entry and all servers that belong to it.
                    cx.observe_release(project, move |this, _, _| {
                        this.projects.remove(&weak_project);
                        this.language_servers
                            .retain(|_, state| state.kind.project() != Some(&weak_project));
                    }),
                    cx.subscribe(project, move |log_store, project, event, cx| {
                        // The kind is only used when the event creates a new server entry.
                        let server_kind = if project.read(cx).is_local() {
                            LanguageServerKind::Local {
                                project: project.downgrade(),
                            }
                        } else {
                            LanguageServerKind::Remote {
                                project: project.downgrade(),
                            }
                        };
                        match event {
                            crate::Event::LanguageServerAdded(id, name, worktree_id) => {
                                // Pass the server handle (if the LSP store has one) so that
                                // its IO can be subscribed to.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    Some(name.clone()),
                                    *worktree_id,
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_for_id(*id),
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerBufferRegistered {
                                server_id,
                                buffer_id,
                                name,
                                ..
                            } => {
                                // Derive the worktree from the registered buffer's project path.
                                let worktree_id = project
                                    .read(cx)
                                    .buffer_for_id(*buffer_id, cx)
                                    .and_then(|buffer| {
                                        Some(buffer.read(cx).project_path(cx)?.worktree_id)
                                    });
                                // Fall back to the name recorded in the server's status.
                                let name = name.clone().or_else(|| {
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_statuses
                                        .get(server_id)
                                        .map(|status| status.name.clone())
                                });
                                log_store.add_language_server(
                                    server_kind,
                                    *server_id,
                                    name,
                                    worktree_id,
                                    None,
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerRemoved(id) => {
                                log_store.remove_language_server(*id, cx);
                            }
                            crate::Event::LanguageServerLog(id, typ, message) => {
                                // Make sure an entry exists for the server before recording
                                // the message; existing entries are left as they are.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    None,
                                    None,
                                    None,
                                    cx,
                                );
                                match typ {
                                    crate::LanguageServerLogType::Log(typ) => {
                                        log_store.add_language_server_log(*id, *typ, message, cx);
                                    }
                                    crate::LanguageServerLogType::Trace { verbose_info } => {
                                        log_store.add_language_server_trace(
                                            *id,
                                            message,
                                            verbose_info.clone(),
                                            cx,
                                        );
                                    }
                                    crate::LanguageServerLogType::Rpc { received } => {
                                        let kind = if *received {
                                            MessageKind::Receive
                                        } else {
                                            MessageKind::Send
                                        };
                                        log_store.add_language_server_rpc(*id, kind, message, cx);
                                    }
                                }
                            }
                            crate::Event::ToggleLspLogs {
                                server_id,
                                enabled,
                                toggled_log_kind,
                            } => {
                                log_store.toggle_lsp_logs(*server_id, *enabled, *toggled_log_kind);
                            }
                            // All other project events are ignored by the log store.
                            _ => {}
                        }
                    }),
                ],
                copilot_log_subscription: None,
            },
        );
    }
359
360    pub fn get_language_server_state(
361        &mut self,
362        id: LanguageServerId,
363    ) -> Option<&mut LanguageServerState> {
364        self.language_servers.get_mut(&id)
365    }
366
367    pub fn add_language_server(
368        &mut self,
369        kind: LanguageServerKind,
370        server_id: LanguageServerId,
371        name: Option<LanguageServerName>,
372        worktree_id: Option<WorktreeId>,
373        server: Option<Arc<LanguageServer>>,
374        cx: &mut Context<Self>,
375    ) -> Option<&mut LanguageServerState> {
376        let server_state = self.language_servers.entry(server_id).or_insert_with(|| {
377            cx.notify();
378            LanguageServerState {
379                name: None,
380                worktree_id: None,
381                kind,
382                rpc_state: None,
383                log_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
384                trace_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
385                trace_level: TraceValue::Off,
386                log_level: MessageType::LOG,
387                io_logs_subscription: None,
388                toggled_log_kind: None,
389            }
390        });
391
392        if let Some(name) = name {
393            server_state.name = Some(name);
394        }
395        if let Some(worktree_id) = worktree_id {
396            server_state.worktree_id = Some(worktree_id);
397        }
398
399        if let Some(server) = server.filter(|_| server_state.io_logs_subscription.is_none()) {
400            let io_tx = self.io_tx.clone();
401            let server_id = server.server_id();
402            server_state.io_logs_subscription = Some(server.on_io(move |io_kind, message| {
403                io_tx
404                    .unbounded_send((server_id, io_kind, message.to_string()))
405                    .ok();
406            }));
407        }
408
409        Some(server_state)
410    }
411
412    pub fn add_language_server_log(
413        &mut self,
414        id: LanguageServerId,
415        typ: MessageType,
416        message: &str,
417        cx: &mut Context<Self>,
418    ) -> Option<()> {
419        let store_logs = !self.on_headless_host;
420        let language_server_state = self.get_language_server_state(id)?;
421
422        let log_lines = &mut language_server_state.log_messages;
423        let message = message.trim_end().to_string();
424        if !store_logs {
425            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
426            self.emit_event(
427                Event::NewServerLogEntry {
428                    id,
429                    kind: LanguageServerLogType::Log(typ),
430                    text: message,
431                },
432                cx,
433            );
434        } else if let Some(new_message) = Self::push_new_message(
435            log_lines,
436            LogMessage { message, typ },
437            language_server_state.log_level,
438        ) {
439            self.emit_event(
440                Event::NewServerLogEntry {
441                    id,
442                    kind: LanguageServerLogType::Log(typ),
443                    text: new_message,
444                },
445                cx,
446            );
447        }
448        Some(())
449    }
450
451    fn add_language_server_trace(
452        &mut self,
453        id: LanguageServerId,
454        message: &str,
455        verbose_info: Option<String>,
456        cx: &mut Context<Self>,
457    ) -> Option<()> {
458        let store_logs = !self.on_headless_host;
459        let language_server_state = self.get_language_server_state(id)?;
460
461        let log_lines = &mut language_server_state.trace_messages;
462        if !store_logs {
463            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
464            self.emit_event(
465                Event::NewServerLogEntry {
466                    id,
467                    kind: LanguageServerLogType::Trace { verbose_info },
468                    text: message.trim().to_string(),
469                },
470                cx,
471            );
472        } else if let Some(new_message) = Self::push_new_message(
473            log_lines,
474            TraceMessage {
475                message: message.trim().to_string(),
476                is_verbose: false,
477            },
478            TraceValue::Messages,
479        ) {
480            if let Some(verbose_message) = verbose_info.as_ref() {
481                Self::push_new_message(
482                    log_lines,
483                    TraceMessage {
484                        message: verbose_message.clone(),
485                        is_verbose: true,
486                    },
487                    TraceValue::Verbose,
488                );
489            }
490            self.emit_event(
491                Event::NewServerLogEntry {
492                    id,
493                    kind: LanguageServerLogType::Trace { verbose_info },
494                    text: new_message,
495                },
496                cx,
497            );
498        }
499        Some(())
500    }
501
502    fn push_new_message<T: Message>(
503        log_lines: &mut VecDeque<T>,
504        message: T,
505        current_severity: <T as Message>::Level,
506    ) -> Option<String> {
507        while log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
508            log_lines.pop_front();
509        }
510        let visible = message.should_include(current_severity);
511
512        let visible_message = visible.then(|| message.as_ref().to_string());
513        log_lines.push_back(message);
514        visible_message
515    }
516
517    fn add_language_server_rpc(
518        &mut self,
519        language_server_id: LanguageServerId,
520        kind: MessageKind,
521        message: &str,
522        cx: &mut Context<'_, Self>,
523    ) {
524        let store_logs = !self.on_headless_host;
525        let Some(state) = self
526            .get_language_server_state(language_server_id)
527            .and_then(|state| state.rpc_state.as_mut())
528        else {
529            return;
530        };
531
532        let received = kind == MessageKind::Receive;
533        let rpc_log_lines = &mut state.rpc_messages;
534        if state.last_message_kind != Some(kind) {
535            while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
536                rpc_log_lines.pop_front();
537            }
538            let line_before_message = match kind {
539                MessageKind::Send => SEND_LINE,
540                MessageKind::Receive => RECEIVE_LINE,
541            };
542            if store_logs {
543                rpc_log_lines.push_back(RpcMessage {
544                    message: line_before_message.to_string(),
545                });
546            }
547            // Do not send a synthetic message over the wire, it will be derived from the actual RPC message
548            cx.emit(Event::NewServerLogEntry {
549                id: language_server_id,
550                kind: LanguageServerLogType::Rpc { received },
551                text: line_before_message.to_string(),
552            });
553        }
554
555        while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
556            rpc_log_lines.pop_front();
557        }
558
559        if store_logs {
560            rpc_log_lines.push_back(RpcMessage {
561                message: message.trim().to_owned(),
562            });
563        }
564
565        self.emit_event(
566            Event::NewServerLogEntry {
567                id: language_server_id,
568                kind: LanguageServerLogType::Rpc { received },
569                text: message.to_owned(),
570            },
571            cx,
572        );
573    }
574
575    pub fn remove_language_server(&mut self, id: LanguageServerId, cx: &mut Context<Self>) {
576        self.language_servers.remove(&id);
577        cx.notify();
578    }
579
580    pub fn server_logs(&self, server_id: LanguageServerId) -> Option<&VecDeque<LogMessage>> {
581        Some(&self.language_servers.get(&server_id)?.log_messages)
582    }
583
584    pub fn server_trace(&self, server_id: LanguageServerId) -> Option<&VecDeque<TraceMessage>> {
585        Some(&self.language_servers.get(&server_id)?.trace_messages)
586    }
587
588    pub fn server_ids_for_project<'a>(
589        &'a self,
590        lookup_project: &'a WeakEntity<Project>,
591    ) -> impl Iterator<Item = LanguageServerId> + 'a {
592        self.language_servers
593            .iter()
594            .filter_map(move |(id, state)| match &state.kind {
595                LanguageServerKind::Local { project } | LanguageServerKind::Remote { project } => {
596                    if project == lookup_project {
597                        Some(*id)
598                    } else {
599                        None
600                    }
601                }
602                LanguageServerKind::Global | LanguageServerKind::LocalSsh { .. } => Some(*id),
603            })
604    }
605
606    pub fn enable_rpc_trace_for_language_server(
607        &mut self,
608        server_id: LanguageServerId,
609    ) -> Option<&mut LanguageServerRpcState> {
610        let rpc_state = self
611            .language_servers
612            .get_mut(&server_id)?
613            .rpc_state
614            .get_or_insert_with(|| LanguageServerRpcState {
615                rpc_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
616                last_message_kind: None,
617            });
618        Some(rpc_state)
619    }
620
621    pub fn disable_rpc_trace_for_language_server(
622        &mut self,
623        server_id: LanguageServerId,
624    ) -> Option<()> {
625        self.language_servers.get_mut(&server_id)?.rpc_state.take();
626        Some(())
627    }
628
629    pub fn has_server_logs(&self, server: &LanguageServerSelector) -> bool {
630        match server {
631            LanguageServerSelector::Id(id) => self.language_servers.contains_key(id),
632            LanguageServerSelector::Name(name) => self
633                .language_servers
634                .iter()
635                .any(|(_, state)| state.name.as_ref() == Some(name)),
636        }
637    }
638
639    fn on_io(
640        &mut self,
641        language_server_id: LanguageServerId,
642        io_kind: IoKind,
643        message: &str,
644        cx: &mut Context<Self>,
645    ) -> Option<()> {
646        let is_received = match io_kind {
647            IoKind::StdOut => true,
648            IoKind::StdIn => false,
649            IoKind::StdErr => {
650                self.add_language_server_log(language_server_id, MessageType::LOG, message, cx);
651                return Some(());
652            }
653        };
654
655        let kind = if is_received {
656            MessageKind::Receive
657        } else {
658            MessageKind::Send
659        };
660
661        self.add_language_server_rpc(language_server_id, kind, message, cx);
662        cx.notify();
663        Some(())
664    }
665
666    fn emit_event(&mut self, e: Event, cx: &mut Context<Self>) {
667        match &e {
668            Event::NewServerLogEntry { id, kind, text } => {
669                if let Some(state) = self.get_language_server_state(*id) {
670                    let downstream_client = match &state.kind {
671                        LanguageServerKind::Remote { project }
672                        | LanguageServerKind::Local { project } => project
673                            .upgrade()
674                            .map(|project| project.read(cx).lsp_store()),
675                        LanguageServerKind::LocalSsh { lsp_store } => lsp_store.upgrade(),
676                        LanguageServerKind::Global => None,
677                    }
678                    .and_then(|lsp_store| lsp_store.read(cx).downstream_client());
679                    if let Some((client, project_id)) = downstream_client {
680                        if Some(LogKind::from_server_log_type(kind)) == state.toggled_log_kind {
681                            client
682                                .send(proto::LanguageServerLog {
683                                    project_id,
684                                    language_server_id: id.to_proto(),
685                                    message: text.clone(),
686                                    log_type: Some(kind.to_proto()),
687                                })
688                                .ok();
689                        }
690                    }
691                }
692            }
693        }
694
695        cx.emit(e);
696    }
697
698    pub fn toggle_lsp_logs(
699        &mut self,
700        server_id: LanguageServerId,
701        enabled: bool,
702        toggled_log_kind: LogKind,
703    ) {
704        if let Some(server_state) = self.get_language_server_state(server_id) {
705            if enabled {
706                server_state.toggled_log_kind = Some(toggled_log_kind);
707            } else {
708                server_state.toggled_log_kind = None;
709            }
710        }
711        if LogKind::Rpc == toggled_log_kind {
712            if enabled {
713                self.enable_rpc_trace_for_language_server(server_id);
714            } else {
715                self.disable_rpc_trace_for_language_server(server_id);
716            }
717        }
718    }
719    pub fn copilot_state_for_project(
720        &mut self,
721        project: &WeakEntity<Project>,
722    ) -> Option<&mut Option<lsp::Subscription>> {
723        self.projects
724            .get_mut(project)
725            .map(|project| &mut project.copilot_log_subscription)
726    }
727}