log_store.rs

  1use std::{collections::VecDeque, sync::Arc};
  2
  3use collections::HashMap;
  4use futures::{StreamExt, channel::mpsc};
  5use gpui::{App, AppContext as _, Context, Entity, EventEmitter, Global, Subscription, WeakEntity};
  6use lsp::{
  7    IoKind, LanguageServer, LanguageServerId, LanguageServerName, LanguageServerSelector,
  8    MessageType, TraceValue,
  9};
 10use rpc::proto;
 11use settings::WorktreeId;
 12
 13use crate::{LanguageServerLogType, LspStore, Project, ProjectItem as _};
 14
/// Synthetic entry placed ahead of an outgoing RPC message in the RPC log.
const SEND_LINE: &str = "\n// Send:";
/// Synthetic entry placed ahead of an incoming RPC message in the RPC log.
const RECEIVE_LINE: &str = "\n// Receive:";
/// Bound used when evicting the oldest entries from each per-server buffer.
const MAX_STORED_LOG_ENTRIES: usize = 2000;

// Human-readable labels returned by `LogKind::label`.
const RPC_MESSAGES: &str = "RPC Messages";
const SERVER_LOGS: &str = "Server Logs";
const SERVER_TRACE: &str = "Server Trace";
const SERVER_INFO: &str = "Server Info";
 23
 24pub fn init(on_headless_host: bool, cx: &mut App) -> Entity<LogStore> {
 25    let log_store = cx.new(|cx| LogStore::new(on_headless_host, cx));
 26    cx.set_global(GlobalLogStore(log_store.clone()));
 27    log_store
 28}
 29
/// Newtype around the shared [`LogStore`] entity so it can be registered as a gpui global
/// (see `init`).
pub struct GlobalLogStore(pub Entity<LogStore>);

impl Global for GlobalLogStore {}
 33
/// Events emitted by [`LogStore`].
#[derive(Debug)]
pub enum Event {
    /// A new log, trace, or RPC entry was produced for the language server `id`.
    NewServerLogEntry {
        /// Server the entry belongs to.
        id: LanguageServerId,
        /// Which category (log / trace / RPC) the entry falls into.
        kind: LanguageServerLogType,
        /// Text of the entry.
        text: String,
    },
}

impl EventEmitter<Event> for LogStore {}
 44
/// Collects log, trace, and RPC messages for language servers, keyed by server id.
pub struct LogStore {
    /// When `true`, entries are not stored in the per-server buffers and every entry is
    /// forwarded to the downstream client, if one exists (see `emit_event`).
    on_headless_host: bool,
    /// Projects registered through `add_project`, with the subscriptions kept alive for each.
    projects: HashMap<WeakEntity<Project>, ProjectState>,
    /// Initialized to `None` and not otherwise touched in this file.
    /// NOTE(review): presumably set by the Copilot integration — confirm against callers.
    pub copilot_log_subscription: Option<lsp::Subscription>,
    /// Per-server state, including the stored messages.
    pub language_servers: HashMap<LanguageServerId, LanguageServerState>,
    /// Sending half of the channel that carries raw server I/O to the task spawned in `new`.
    io_tx: mpsc::UnboundedSender<(LanguageServerId, IoKind, String)>,
}
 52
/// Bookkeeping stored per registered project.
struct ProjectState {
    /// The release observer and the event subscription created in `LogStore::add_project`;
    /// held only so they stay alive while the project is tracked.
    _subscriptions: [Subscription; 2],
}
 56
 57pub trait Message: AsRef<str> {
 58    type Level: Copy + std::fmt::Debug;
 59    fn should_include(&self, _: Self::Level) -> bool {
 60        true
 61    }
 62}
 63
 64#[derive(Debug)]
 65pub struct LogMessage {
 66    message: String,
 67    typ: MessageType,
 68}
 69
 70impl AsRef<str> for LogMessage {
 71    fn as_ref(&self) -> &str {
 72        &self.message
 73    }
 74}
 75
 76impl Message for LogMessage {
 77    type Level = MessageType;
 78
 79    fn should_include(&self, level: Self::Level) -> bool {
 80        match (self.typ, level) {
 81            (MessageType::ERROR, _) => true,
 82            (_, MessageType::ERROR) => false,
 83            (MessageType::WARNING, _) => true,
 84            (_, MessageType::WARNING) => false,
 85            (MessageType::INFO, _) => true,
 86            (_, MessageType::INFO) => false,
 87            _ => true,
 88        }
 89    }
 90}
 91
 92#[derive(Debug)]
 93pub struct TraceMessage {
 94    message: String,
 95    is_verbose: bool,
 96}
 97
 98impl AsRef<str> for TraceMessage {
 99    fn as_ref(&self) -> &str {
100        &self.message
101    }
102}
103
104impl Message for TraceMessage {
105    type Level = TraceValue;
106
107    fn should_include(&self, level: Self::Level) -> bool {
108        match level {
109            TraceValue::Off => false,
110            TraceValue::Messages => !self.is_verbose,
111            TraceValue::Verbose => true,
112        }
113    }
114}
115
/// A single entry of the RPC log: either a message body or one of the synthetic
/// send/receive header lines.
#[derive(Debug)]
pub struct RpcMessage {
    message: String,
}

impl AsRef<str> for RpcMessage {
    fn as_ref(&self) -> &str {
        &self.message
    }
}

// RPC entries have no verbosity level; the trait's default `should_include` applies.
impl Message for RpcMessage {
    type Level = ();
}
130
/// Everything the store tracks for one language server.
pub struct LanguageServerState {
    /// Server name, once one has been supplied to `LogStore::add_language_server`.
    pub name: Option<LanguageServerName>,
    /// Worktree the server is associated with, once one has been supplied.
    pub worktree_id: Option<WorktreeId>,
    /// Where the server lives (local/remote project, SSH-side store, or global).
    pub kind: LanguageServerKind,
    /// Stored log messages, oldest first.
    log_messages: VecDeque<LogMessage>,
    /// Stored trace messages, oldest first.
    trace_messages: VecDeque<TraceMessage>,
    /// `Some` while RPC tracing is enabled for this server.
    pub rpc_state: Option<LanguageServerRpcState>,
    /// Initialized to `TraceValue::Off`; not read anywhere in this file.
    pub trace_level: TraceValue,
    /// Level passed to `LogMessage::should_include` when a new log line arrives;
    /// initialized to `MessageType::LOG`.
    pub log_level: MessageType,
    /// Subscription to the server's raw I/O, installed at most once.
    io_logs_subscription: Option<lsp::Subscription>,
    /// Log kind currently toggled on via `ToggleLspLogs`; gates downstream forwarding.
    pub toggled_log_kind: Option<LogKind>,
}
143
144impl std::fmt::Debug for LanguageServerState {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("LanguageServerState")
147            .field("name", &self.name)
148            .field("worktree_id", &self.worktree_id)
149            .field("kind", &self.kind)
150            .field("log_messages", &self.log_messages)
151            .field("trace_messages", &self.trace_messages)
152            .field("rpc_state", &self.rpc_state)
153            .field("trace_level", &self.trace_level)
154            .field("log_level", &self.log_level)
155            .field("toggled_log_kind", &self.toggled_log_kind)
156            .finish_non_exhaustive()
157    }
158}
159
/// Identifies where a tracked language server comes from.
#[derive(PartialEq, Clone)]
pub enum LanguageServerKind {
    /// Server of a project for which `Project::is_local` is true.
    Local { project: WeakEntity<Project> },
    /// Server of a project for which `Project::is_local` is false.
    Remote { project: WeakEntity<Project> },
    /// Server reached through an `LspStore` directly rather than through a project.
    LocalSsh { lsp_store: WeakEntity<LspStore> },
    /// Server not tied to any project or store.
    Global,
}
167
168impl std::fmt::Debug for LanguageServerKind {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        match self {
171            LanguageServerKind::Local { .. } => write!(f, "LanguageServerKind::Local"),
172            LanguageServerKind::Remote { .. } => write!(f, "LanguageServerKind::Remote"),
173            LanguageServerKind::LocalSsh { .. } => write!(f, "LanguageServerKind::LocalSsh"),
174            LanguageServerKind::Global => write!(f, "LanguageServerKind::Global"),
175        }
176    }
177}
178
179impl LanguageServerKind {
180    pub fn project(&self) -> Option<&WeakEntity<Project>> {
181        match self {
182            Self::Local { project } => Some(project),
183            Self::Remote { project } => Some(project),
184            Self::LocalSsh { .. } => None,
185            Self::Global { .. } => None,
186        }
187    }
188}
189
/// RPC-tracing state for one server; present only while tracing is enabled.
#[derive(Debug)]
pub struct LanguageServerRpcState {
    /// Stored RPC entries (header lines and message bodies), oldest first.
    pub rpc_messages: VecDeque<RpcMessage>,
    /// Direction compared against each incoming message to decide whether a
    /// send/receive header line is needed.
    last_message_kind: Option<MessageKind>,
}
195
/// Direction of an RPC message relative to the editor.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MessageKind {
    /// Message written to the server (derived from `IoKind::StdIn`).
    Send,
    /// Message read from the server (derived from `IoKind::StdOut`).
    Receive,
}
201
/// The categories of per-server output; `Logs` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum LogKind {
    /// Raw RPC messages exchanged with the server.
    Rpc,
    /// Trace output.
    Trace,
    /// Regular server log messages.
    #[default]
    Logs,
    /// Labelled "Server Info"; no `LanguageServerLogType` maps to it in this file.
    ServerInfo,
}
210
211impl LogKind {
212    pub fn from_server_log_type(log_type: &LanguageServerLogType) -> Self {
213        match log_type {
214            LanguageServerLogType::Log(_) => Self::Logs,
215            LanguageServerLogType::Trace { .. } => Self::Trace,
216            LanguageServerLogType::Rpc { .. } => Self::Rpc,
217        }
218    }
219
220    pub fn label(&self) -> &'static str {
221        match self {
222            LogKind::Rpc => RPC_MESSAGES,
223            LogKind::Trace => SERVER_TRACE,
224            LogKind::Logs => SERVER_LOGS,
225            LogKind::ServerInfo => SERVER_INFO,
226        }
227    }
228}
229
230impl LogStore {
231    pub fn new(on_headless_host: bool, cx: &mut Context<Self>) -> Self {
232        let (io_tx, mut io_rx) = mpsc::unbounded();
233
234        let log_store = Self {
235            projects: HashMap::default(),
236            language_servers: HashMap::default(),
237            copilot_log_subscription: None,
238            on_headless_host,
239            io_tx,
240        };
241        cx.spawn(async move |log_store, cx| {
242            while let Some((server_id, io_kind, message)) = io_rx.next().await {
243                if let Some(log_store) = log_store.upgrade() {
244                    log_store.update(cx, |log_store, cx| {
245                        log_store.on_io(server_id, io_kind, &message, cx);
246                    })?;
247                }
248            }
249            anyhow::Ok(())
250        })
251        .detach_and_log_err(cx);
252
253        log_store
254    }
255
    /// Starts tracking `project`: mirrors its language-server events into this store and
    /// cleans up once the project entity is released.
    ///
    /// Both subscriptions are stored in `self.projects`, keyed by the weak project handle.
    pub fn add_project(&mut self, project: &Entity<Project>, cx: &mut Context<Self>) {
        let weak_project = project.downgrade();
        self.projects.insert(
            project.downgrade(),
            ProjectState {
                _subscriptions: [
                    // On release, forget the project and every server that belonged to it.
                    cx.observe_release(project, move |this, _, _| {
                        this.projects.remove(&weak_project);
                        this.language_servers
                            .retain(|_, state| state.kind.project() != Some(&weak_project));
                    }),
                    cx.subscribe(project, move |log_store, project, event, cx| {
                        // Servers seen through this project are tagged by whether the
                        // project itself is local.
                        let server_kind = if project.read(cx).is_local() {
                            LanguageServerKind::Local {
                                project: project.downgrade(),
                            }
                        } else {
                            LanguageServerKind::Remote {
                                project: project.downgrade(),
                            }
                        };
                        match event {
                            crate::Event::LanguageServerAdded(id, name, worktree_id) => {
                                // Pass the server handle (when the LSP store has one) so
                                // its raw I/O can be subscribed to.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    Some(name.clone()),
                                    *worktree_id,
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_for_id(*id),
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerBufferRegistered {
                                server_id,
                                buffer_id,
                                name,
                                ..
                            } => {
                                // Derive the worktree from the buffer's project path, if any.
                                let worktree_id = project
                                    .read(cx)
                                    .buffer_for_id(*buffer_id, cx)
                                    .and_then(|buffer| {
                                        Some(buffer.read(cx).project_path(cx)?.worktree_id)
                                    });
                                // Fall back to the name recorded in the server statuses.
                                let name = name.clone().or_else(|| {
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_statuses
                                        .get(server_id)
                                        .map(|status| status.name.clone())
                                });
                                log_store.add_language_server(
                                    server_kind,
                                    *server_id,
                                    name,
                                    worktree_id,
                                    None,
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerRemoved(id) => {
                                log_store.remove_language_server(*id, cx);
                            }
                            crate::Event::LanguageServerLog(id, typ, message) => {
                                // Ensure the server has an entry before recording; with all
                                // optional arguments `None` this changes nothing for a server
                                // that is already tracked.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    None,
                                    None,
                                    None,
                                    cx,
                                );
                                match typ {
                                    crate::LanguageServerLogType::Log(typ) => {
                                        log_store.add_language_server_log(*id, *typ, message, cx);
                                    }
                                    crate::LanguageServerLogType::Trace { verbose_info } => {
                                        log_store.add_language_server_trace(
                                            *id,
                                            message,
                                            verbose_info.clone(),
                                            cx,
                                        );
                                    }
                                    crate::LanguageServerLogType::Rpc { received } => {
                                        let kind = if *received {
                                            MessageKind::Receive
                                        } else {
                                            MessageKind::Send
                                        };
                                        log_store.add_language_server_rpc(*id, kind, message, cx);
                                    }
                                }
                            }
                            crate::Event::ToggleLspLogs {
                                server_id,
                                enabled,
                                toggled_log_kind,
                            } => {
                                // Remember which kind (if any) is toggled on; `emit_event`
                                // consults this when deciding what to forward downstream.
                                if let Some(server_state) =
                                    log_store.get_language_server_state(*server_id)
                                {
                                    if *enabled {
                                        server_state.toggled_log_kind = Some(*toggled_log_kind);
                                    } else {
                                        server_state.toggled_log_kind = None;
                                    }
                                }
                                // RPC tracing additionally needs its buffer created or dropped.
                                if LogKind::Rpc == *toggled_log_kind {
                                    if *enabled {
                                        log_store.enable_rpc_trace_for_language_server(*server_id);
                                    } else {
                                        log_store.disable_rpc_trace_for_language_server(*server_id);
                                    }
                                }
                            }
                            _ => {}
                        }
                    }),
                ],
            },
        );
    }
385
    /// Returns mutable access to the state tracked for `id`, or `None` if the server is
    /// not known to this store.
    pub fn get_language_server_state(
        &mut self,
        id: LanguageServerId,
    ) -> Option<&mut LanguageServerState> {
        self.language_servers.get_mut(&id)
    }
392
393    pub fn add_language_server(
394        &mut self,
395        kind: LanguageServerKind,
396        server_id: LanguageServerId,
397        name: Option<LanguageServerName>,
398        worktree_id: Option<WorktreeId>,
399        server: Option<Arc<LanguageServer>>,
400        cx: &mut Context<Self>,
401    ) -> Option<&mut LanguageServerState> {
402        let server_state = self.language_servers.entry(server_id).or_insert_with(|| {
403            cx.notify();
404            LanguageServerState {
405                name: None,
406                worktree_id: None,
407                kind,
408                rpc_state: None,
409                log_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
410                trace_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
411                trace_level: TraceValue::Off,
412                log_level: MessageType::LOG,
413                io_logs_subscription: None,
414                toggled_log_kind: None,
415            }
416        });
417
418        if let Some(name) = name {
419            server_state.name = Some(name);
420        }
421        if let Some(worktree_id) = worktree_id {
422            server_state.worktree_id = Some(worktree_id);
423        }
424
425        if let Some(server) = server.filter(|_| server_state.io_logs_subscription.is_none()) {
426            let io_tx = self.io_tx.clone();
427            let server_id = server.server_id();
428            server_state.io_logs_subscription = Some(server.on_io(move |io_kind, message| {
429                io_tx
430                    .unbounded_send((server_id, io_kind, message.to_string()))
431                    .ok();
432            }));
433        }
434
435        Some(server_state)
436    }
437
438    pub fn add_language_server_log(
439        &mut self,
440        id: LanguageServerId,
441        typ: MessageType,
442        message: &str,
443        cx: &mut Context<Self>,
444    ) -> Option<()> {
445        let store_logs = !self.on_headless_host;
446        let language_server_state = self.get_language_server_state(id)?;
447
448        let log_lines = &mut language_server_state.log_messages;
449        let message = message.trim_end().to_string();
450        if !store_logs {
451            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
452            self.emit_event(
453                Event::NewServerLogEntry {
454                    id,
455                    kind: LanguageServerLogType::Log(typ),
456                    text: message,
457                },
458                cx,
459            );
460        } else if let Some(new_message) = Self::push_new_message(
461            log_lines,
462            LogMessage { message, typ },
463            language_server_state.log_level,
464        ) {
465            self.emit_event(
466                Event::NewServerLogEntry {
467                    id,
468                    kind: LanguageServerLogType::Log(typ),
469                    text: new_message,
470                },
471                cx,
472            );
473        }
474        Some(())
475    }
476
477    fn add_language_server_trace(
478        &mut self,
479        id: LanguageServerId,
480        message: &str,
481        verbose_info: Option<String>,
482        cx: &mut Context<Self>,
483    ) -> Option<()> {
484        let store_logs = !self.on_headless_host;
485        let language_server_state = self.get_language_server_state(id)?;
486
487        let log_lines = &mut language_server_state.trace_messages;
488        if !store_logs {
489            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
490            self.emit_event(
491                Event::NewServerLogEntry {
492                    id,
493                    kind: LanguageServerLogType::Trace { verbose_info },
494                    text: message.trim().to_string(),
495                },
496                cx,
497            );
498        } else if let Some(new_message) = Self::push_new_message(
499            log_lines,
500            TraceMessage {
501                message: message.trim().to_string(),
502                is_verbose: false,
503            },
504            TraceValue::Messages,
505        ) {
506            if let Some(verbose_message) = verbose_info.as_ref() {
507                Self::push_new_message(
508                    log_lines,
509                    TraceMessage {
510                        message: verbose_message.clone(),
511                        is_verbose: true,
512                    },
513                    TraceValue::Verbose,
514                );
515            }
516            self.emit_event(
517                Event::NewServerLogEntry {
518                    id,
519                    kind: LanguageServerLogType::Trace { verbose_info },
520                    text: new_message,
521                },
522                cx,
523            );
524        }
525        Some(())
526    }
527
528    fn push_new_message<T: Message>(
529        log_lines: &mut VecDeque<T>,
530        message: T,
531        current_severity: <T as Message>::Level,
532    ) -> Option<String> {
533        while log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
534            log_lines.pop_front();
535        }
536        let visible = message.should_include(current_severity);
537
538        let visible_message = visible.then(|| message.as_ref().to_string());
539        log_lines.push_back(message);
540        visible_message
541    }
542
543    fn add_language_server_rpc(
544        &mut self,
545        language_server_id: LanguageServerId,
546        kind: MessageKind,
547        message: &str,
548        cx: &mut Context<'_, Self>,
549    ) {
550        let store_logs = !self.on_headless_host;
551        let Some(state) = self
552            .get_language_server_state(language_server_id)
553            .and_then(|state| state.rpc_state.as_mut())
554        else {
555            return;
556        };
557
558        let received = kind == MessageKind::Receive;
559        let rpc_log_lines = &mut state.rpc_messages;
560        if state.last_message_kind != Some(kind) {
561            while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
562                rpc_log_lines.pop_front();
563            }
564            let line_before_message = match kind {
565                MessageKind::Send => SEND_LINE,
566                MessageKind::Receive => RECEIVE_LINE,
567            };
568            if store_logs {
569                rpc_log_lines.push_back(RpcMessage {
570                    message: line_before_message.to_string(),
571                });
572            }
573            // Do not send a synthetic message over the wire, it will be derived from the actual RPC message
574            cx.emit(Event::NewServerLogEntry {
575                id: language_server_id,
576                kind: LanguageServerLogType::Rpc { received },
577                text: line_before_message.to_string(),
578            });
579        }
580
581        while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
582            rpc_log_lines.pop_front();
583        }
584
585        if store_logs {
586            rpc_log_lines.push_back(RpcMessage {
587                message: message.trim().to_owned(),
588            });
589        }
590
591        self.emit_event(
592            Event::NewServerLogEntry {
593                id: language_server_id,
594                kind: LanguageServerLogType::Rpc { received },
595                text: message.to_owned(),
596            },
597            cx,
598        );
599    }
600
    /// Drops all state tracked for `id` (a no-op for the map if the server is unknown)
    /// and notifies observers of the store.
    pub fn remove_language_server(&mut self, id: LanguageServerId, cx: &mut Context<Self>) {
        self.language_servers.remove(&id);
        cx.notify();
    }
605
606    pub fn server_logs(&self, server_id: LanguageServerId) -> Option<&VecDeque<LogMessage>> {
607        Some(&self.language_servers.get(&server_id)?.log_messages)
608    }
609
610    pub fn server_trace(&self, server_id: LanguageServerId) -> Option<&VecDeque<TraceMessage>> {
611        Some(&self.language_servers.get(&server_id)?.trace_messages)
612    }
613
614    pub fn server_ids_for_project<'a>(
615        &'a self,
616        lookup_project: &'a WeakEntity<Project>,
617    ) -> impl Iterator<Item = LanguageServerId> + 'a {
618        self.language_servers
619            .iter()
620            .filter_map(move |(id, state)| match &state.kind {
621                LanguageServerKind::Local { project } | LanguageServerKind::Remote { project } => {
622                    if project == lookup_project {
623                        Some(*id)
624                    } else {
625                        None
626                    }
627                }
628                LanguageServerKind::Global | LanguageServerKind::LocalSsh { .. } => Some(*id),
629            })
630    }
631
632    pub fn enable_rpc_trace_for_language_server(
633        &mut self,
634        server_id: LanguageServerId,
635    ) -> Option<&mut LanguageServerRpcState> {
636        let rpc_state = self
637            .language_servers
638            .get_mut(&server_id)?
639            .rpc_state
640            .get_or_insert_with(|| LanguageServerRpcState {
641                rpc_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
642                last_message_kind: None,
643            });
644        Some(rpc_state)
645    }
646
647    pub fn disable_rpc_trace_for_language_server(
648        &mut self,
649        server_id: LanguageServerId,
650    ) -> Option<()> {
651        self.language_servers.get_mut(&server_id)?.rpc_state.take();
652        Some(())
653    }
654
655    pub fn has_server_logs(&self, server: &LanguageServerSelector) -> bool {
656        match server {
657            LanguageServerSelector::Id(id) => self.language_servers.contains_key(id),
658            LanguageServerSelector::Name(name) => self
659                .language_servers
660                .iter()
661                .any(|(_, state)| state.name.as_ref() == Some(name)),
662        }
663    }
664
665    fn on_io(
666        &mut self,
667        language_server_id: LanguageServerId,
668        io_kind: IoKind,
669        message: &str,
670        cx: &mut Context<Self>,
671    ) -> Option<()> {
672        let is_received = match io_kind {
673            IoKind::StdOut => true,
674            IoKind::StdIn => false,
675            IoKind::StdErr => {
676                self.add_language_server_log(language_server_id, MessageType::LOG, message, cx);
677                return Some(());
678            }
679        };
680
681        let kind = if is_received {
682            MessageKind::Receive
683        } else {
684            MessageKind::Send
685        };
686
687        self.add_language_server_rpc(language_server_id, kind, message, cx);
688        cx.notify();
689        Some(())
690    }
691
692    fn emit_event(&mut self, e: Event, cx: &mut Context<Self>) {
693        let on_headless_host = self.on_headless_host;
694        match &e {
695            Event::NewServerLogEntry { id, kind, text } => {
696                if let Some(state) = self.get_language_server_state(*id) {
697                    let downstream_client = match &state.kind {
698                        LanguageServerKind::Remote { project }
699                        | LanguageServerKind::Local { project } => project
700                            .upgrade()
701                            .map(|project| project.read(cx).lsp_store()),
702                        LanguageServerKind::LocalSsh { lsp_store } => lsp_store.upgrade(),
703                        LanguageServerKind::Global => None,
704                    }
705                    .and_then(|lsp_store| lsp_store.read(cx).downstream_client());
706                    if let Some((client, project_id)) = downstream_client {
707                        if on_headless_host
708                            || Some(LogKind::from_server_log_type(kind)) == state.toggled_log_kind
709                        {
710                            client
711                                .send(proto::LanguageServerLog {
712                                    project_id,
713                                    language_server_id: id.to_proto(),
714                                    message: text.clone(),
715                                    log_type: Some(kind.to_proto()),
716                                })
717                                .ok();
718                        }
719                    }
720                }
721            }
722        }
723
724        cx.emit(e);
725    }
726}