log_store.rs

  1use std::{collections::VecDeque, sync::Arc};
  2
  3use collections::HashMap;
  4use futures::{StreamExt, channel::mpsc};
  5use gpui::{App, AppContext as _, Context, Entity, EventEmitter, Global, Subscription, WeakEntity};
  6use lsp::{
  7    IoKind, LanguageServer, LanguageServerId, LanguageServerName, LanguageServerSelector,
  8    MessageType, TraceValue,
  9};
 10use rpc::proto;
 11use settings::WorktreeId;
 12
 13use crate::{LanguageServerLogType, LspStore, Project, ProjectItem as _};
 14
 15const SEND_LINE: &str = "\n// Send:";
 16const RECEIVE_LINE: &str = "\n// Receive:";
 17const MAX_STORED_LOG_ENTRIES: usize = 2000;
 18
 19pub fn init(on_headless_host: bool, cx: &mut App) -> Entity<LogStore> {
 20    let log_store = cx.new(|cx| LogStore::new(on_headless_host, cx));
 21    cx.set_global(GlobalLogStore(log_store.clone()));
 22    log_store
 23}
 24
 25pub struct GlobalLogStore(pub Entity<LogStore>);
 26
 27impl Global for GlobalLogStore {}
 28
 29#[derive(Debug)]
 30pub enum Event {
 31    NewServerLogEntry {
 32        id: LanguageServerId,
 33        kind: LanguageServerLogType,
 34        text: String,
 35    },
 36}
 37
 38impl EventEmitter<Event> for LogStore {}
 39
 40pub struct LogStore {
 41    on_headless_host: bool,
 42    projects: HashMap<WeakEntity<Project>, ProjectState>,
 43    pub copilot_log_subscription: Option<lsp::Subscription>,
 44    pub language_servers: HashMap<LanguageServerId, LanguageServerState>,
 45    io_tx: mpsc::UnboundedSender<(LanguageServerId, IoKind, String)>,
 46}
 47
 48struct ProjectState {
 49    _subscriptions: [Subscription; 2],
 50}
 51
 52pub trait Message: AsRef<str> {
 53    type Level: Copy + std::fmt::Debug;
 54    fn should_include(&self, _: Self::Level) -> bool {
 55        true
 56    }
 57}
 58
 59#[derive(Debug)]
 60pub struct LogMessage {
 61    message: String,
 62    typ: MessageType,
 63}
 64
 65impl AsRef<str> for LogMessage {
 66    fn as_ref(&self) -> &str {
 67        &self.message
 68    }
 69}
 70
 71impl Message for LogMessage {
 72    type Level = MessageType;
 73
 74    fn should_include(&self, level: Self::Level) -> bool {
 75        match (self.typ, level) {
 76            (MessageType::ERROR, _) => true,
 77            (_, MessageType::ERROR) => false,
 78            (MessageType::WARNING, _) => true,
 79            (_, MessageType::WARNING) => false,
 80            (MessageType::INFO, _) => true,
 81            (_, MessageType::INFO) => false,
 82            _ => true,
 83        }
 84    }
 85}
 86
 87#[derive(Debug)]
 88pub struct TraceMessage {
 89    message: String,
 90    is_verbose: bool,
 91}
 92
 93impl AsRef<str> for TraceMessage {
 94    fn as_ref(&self) -> &str {
 95        &self.message
 96    }
 97}
 98
 99impl Message for TraceMessage {
100    type Level = TraceValue;
101
102    fn should_include(&self, level: Self::Level) -> bool {
103        match level {
104            TraceValue::Off => false,
105            TraceValue::Messages => !self.is_verbose,
106            TraceValue::Verbose => true,
107        }
108    }
109}
110
111#[derive(Debug)]
112pub struct RpcMessage {
113    message: String,
114}
115
116impl AsRef<str> for RpcMessage {
117    fn as_ref(&self) -> &str {
118        &self.message
119    }
120}
121
122impl Message for RpcMessage {
123    type Level = ();
124}
125
126pub struct LanguageServerState {
127    pub name: Option<LanguageServerName>,
128    pub worktree_id: Option<WorktreeId>,
129    pub kind: LanguageServerKind,
130    log_messages: VecDeque<LogMessage>,
131    trace_messages: VecDeque<TraceMessage>,
132    pub rpc_state: Option<LanguageServerRpcState>,
133    pub trace_level: TraceValue,
134    pub log_level: MessageType,
135    io_logs_subscription: Option<lsp::Subscription>,
136    pub toggled_log_kind: Option<LogKind>,
137}
138
139impl std::fmt::Debug for LanguageServerState {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("LanguageServerState")
142            .field("name", &self.name)
143            .field("worktree_id", &self.worktree_id)
144            .field("kind", &self.kind)
145            .field("log_messages", &self.log_messages)
146            .field("trace_messages", &self.trace_messages)
147            .field("rpc_state", &self.rpc_state)
148            .field("trace_level", &self.trace_level)
149            .field("log_level", &self.log_level)
150            .field("toggled_log_kind", &self.toggled_log_kind)
151            .finish_non_exhaustive()
152    }
153}
154
155#[derive(PartialEq, Clone)]
156pub enum LanguageServerKind {
157    Local { project: WeakEntity<Project> },
158    Remote { project: WeakEntity<Project> },
159    LocalSsh { lsp_store: WeakEntity<LspStore> },
160    Global,
161}
162
163impl std::fmt::Debug for LanguageServerKind {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        match self {
166            LanguageServerKind::Local { .. } => write!(f, "LanguageServerKind::Local"),
167            LanguageServerKind::Remote { .. } => write!(f, "LanguageServerKind::Remote"),
168            LanguageServerKind::LocalSsh { .. } => write!(f, "LanguageServerKind::LocalSsh"),
169            LanguageServerKind::Global => write!(f, "LanguageServerKind::Global"),
170        }
171    }
172}
173
174impl LanguageServerKind {
175    pub fn project(&self) -> Option<&WeakEntity<Project>> {
176        match self {
177            Self::Local { project } => Some(project),
178            Self::Remote { project } => Some(project),
179            Self::LocalSsh { .. } => None,
180            Self::Global { .. } => None,
181        }
182    }
183}
184
185#[derive(Debug)]
186pub struct LanguageServerRpcState {
187    pub rpc_messages: VecDeque<RpcMessage>,
188    last_message_kind: Option<MessageKind>,
189}
190
/// Direction of an RPC message relative to the editor.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MessageKind {
    /// Message written to the server.
    Send,
    /// Message read from the server.
    Receive,
}
196
197#[derive(Clone, Copy, Debug, Default, PartialEq)]
198pub enum LogKind {
199    Rpc,
200    Trace,
201    #[default]
202    Logs,
203    ServerInfo,
204}
205
206impl LogKind {
207    pub fn from_server_log_type(log_type: &LanguageServerLogType) -> Self {
208        match log_type {
209            LanguageServerLogType::Log(_) => Self::Logs,
210            LanguageServerLogType::Trace { .. } => Self::Trace,
211            LanguageServerLogType::Rpc { .. } => Self::Rpc,
212        }
213    }
214}
215
216impl LogStore {
217    pub fn new(on_headless_host: bool, cx: &mut Context<Self>) -> Self {
218        let (io_tx, mut io_rx) = mpsc::unbounded();
219
220        let log_store = Self {
221            projects: HashMap::default(),
222            language_servers: HashMap::default(),
223            copilot_log_subscription: None,
224            on_headless_host,
225            io_tx,
226        };
227        cx.spawn(async move |log_store, cx| {
228            while let Some((server_id, io_kind, message)) = io_rx.next().await {
229                if let Some(log_store) = log_store.upgrade() {
230                    log_store.update(cx, |log_store, cx| {
231                        log_store.on_io(server_id, io_kind, &message, cx);
232                    })?;
233                }
234            }
235            anyhow::Ok(())
236        })
237        .detach_and_log_err(cx);
238
239        log_store
240    }
241
242    pub fn add_project(&mut self, project: &Entity<Project>, cx: &mut Context<Self>) {
243        let weak_project = project.downgrade();
244        self.projects.insert(
245            project.downgrade(),
246            ProjectState {
247                _subscriptions: [
248                    cx.observe_release(project, move |this, _, _| {
249                        this.projects.remove(&weak_project);
250                        this.language_servers
251                            .retain(|_, state| state.kind.project() != Some(&weak_project));
252                    }),
253                    cx.subscribe(project, move |log_store, project, event, cx| {
254                        let server_kind = if project.read(cx).is_local() {
255                            LanguageServerKind::Local {
256                                project: project.downgrade(),
257                            }
258                        } else {
259                            LanguageServerKind::Remote {
260                                project: project.downgrade(),
261                            }
262                        };
263                        match event {
264                            crate::Event::LanguageServerAdded(id, name, worktree_id) => {
265                                log_store.add_language_server(
266                                    server_kind,
267                                    *id,
268                                    Some(name.clone()),
269                                    *worktree_id,
270                                    project
271                                        .read(cx)
272                                        .lsp_store()
273                                        .read(cx)
274                                        .language_server_for_id(*id),
275                                    cx,
276                                );
277                            }
278                            crate::Event::LanguageServerBufferRegistered {
279                                server_id,
280                                buffer_id,
281                                name,
282                                ..
283                            } => {
284                                let worktree_id = project
285                                    .read(cx)
286                                    .buffer_for_id(*buffer_id, cx)
287                                    .and_then(|buffer| {
288                                        Some(buffer.read(cx).project_path(cx)?.worktree_id)
289                                    });
290                                let name = name.clone().or_else(|| {
291                                    project
292                                        .read(cx)
293                                        .lsp_store()
294                                        .read(cx)
295                                        .language_server_statuses
296                                        .get(server_id)
297                                        .map(|status| status.name.clone())
298                                });
299                                log_store.add_language_server(
300                                    server_kind,
301                                    *server_id,
302                                    name,
303                                    worktree_id,
304                                    None,
305                                    cx,
306                                );
307                            }
308                            crate::Event::LanguageServerRemoved(id) => {
309                                log_store.remove_language_server(*id, cx);
310                            }
311                            crate::Event::LanguageServerLog(id, typ, message) => {
312                                log_store.add_language_server(
313                                    server_kind,
314                                    *id,
315                                    None,
316                                    None,
317                                    None,
318                                    cx,
319                                );
320                                match typ {
321                                    crate::LanguageServerLogType::Log(typ) => {
322                                        log_store.add_language_server_log(*id, *typ, message, cx);
323                                    }
324                                    crate::LanguageServerLogType::Trace { verbose_info } => {
325                                        log_store.add_language_server_trace(
326                                            *id,
327                                            message,
328                                            verbose_info.clone(),
329                                            cx,
330                                        );
331                                    }
332                                    crate::LanguageServerLogType::Rpc { received } => {
333                                        let kind = if *received {
334                                            MessageKind::Receive
335                                        } else {
336                                            MessageKind::Send
337                                        };
338                                        log_store.add_language_server_rpc(*id, kind, message, cx);
339                                    }
340                                }
341                            }
342                            crate::Event::ToggleLspLogs {
343                                server_id,
344                                enabled,
345                                toggled_log_kind,
346                            } => {
347                                log_store.toggle_lsp_logs(*server_id, *enabled, *toggled_log_kind);
348                            }
349                            _ => {}
350                        }
351                    }),
352                ],
353            },
354        );
355    }
356
357    pub fn get_language_server_state(
358        &mut self,
359        id: LanguageServerId,
360    ) -> Option<&mut LanguageServerState> {
361        self.language_servers.get_mut(&id)
362    }
363
364    pub fn add_language_server(
365        &mut self,
366        kind: LanguageServerKind,
367        server_id: LanguageServerId,
368        name: Option<LanguageServerName>,
369        worktree_id: Option<WorktreeId>,
370        server: Option<Arc<LanguageServer>>,
371        cx: &mut Context<Self>,
372    ) -> Option<&mut LanguageServerState> {
373        let server_state = self.language_servers.entry(server_id).or_insert_with(|| {
374            cx.notify();
375            LanguageServerState {
376                name: None,
377                worktree_id: None,
378                kind,
379                rpc_state: None,
380                log_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
381                trace_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
382                trace_level: TraceValue::Off,
383                log_level: MessageType::LOG,
384                io_logs_subscription: None,
385                toggled_log_kind: None,
386            }
387        });
388
389        if let Some(name) = name {
390            server_state.name = Some(name);
391        }
392        if let Some(worktree_id) = worktree_id {
393            server_state.worktree_id = Some(worktree_id);
394        }
395
396        if let Some(server) = server.filter(|_| server_state.io_logs_subscription.is_none()) {
397            let io_tx = self.io_tx.clone();
398            let server_id = server.server_id();
399            server_state.io_logs_subscription = Some(server.on_io(move |io_kind, message| {
400                io_tx
401                    .unbounded_send((server_id, io_kind, message.to_string()))
402                    .ok();
403            }));
404        }
405
406        Some(server_state)
407    }
408
409    pub fn add_language_server_log(
410        &mut self,
411        id: LanguageServerId,
412        typ: MessageType,
413        message: &str,
414        cx: &mut Context<Self>,
415    ) -> Option<()> {
416        let store_logs = !self.on_headless_host;
417        let language_server_state = self.get_language_server_state(id)?;
418
419        let log_lines = &mut language_server_state.log_messages;
420        let message = message.trim_end().to_string();
421        if !store_logs {
422            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
423            self.emit_event(
424                Event::NewServerLogEntry {
425                    id,
426                    kind: LanguageServerLogType::Log(typ),
427                    text: message,
428                },
429                cx,
430            );
431        } else if let Some(new_message) = Self::push_new_message(
432            log_lines,
433            LogMessage { message, typ },
434            language_server_state.log_level,
435        ) {
436            self.emit_event(
437                Event::NewServerLogEntry {
438                    id,
439                    kind: LanguageServerLogType::Log(typ),
440                    text: new_message,
441                },
442                cx,
443            );
444        }
445        Some(())
446    }
447
448    fn add_language_server_trace(
449        &mut self,
450        id: LanguageServerId,
451        message: &str,
452        verbose_info: Option<String>,
453        cx: &mut Context<Self>,
454    ) -> Option<()> {
455        let store_logs = !self.on_headless_host;
456        let language_server_state = self.get_language_server_state(id)?;
457
458        let log_lines = &mut language_server_state.trace_messages;
459        if !store_logs {
460            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
461            self.emit_event(
462                Event::NewServerLogEntry {
463                    id,
464                    kind: LanguageServerLogType::Trace { verbose_info },
465                    text: message.trim().to_string(),
466                },
467                cx,
468            );
469        } else if let Some(new_message) = Self::push_new_message(
470            log_lines,
471            TraceMessage {
472                message: message.trim().to_string(),
473                is_verbose: false,
474            },
475            TraceValue::Messages,
476        ) {
477            if let Some(verbose_message) = verbose_info.as_ref() {
478                Self::push_new_message(
479                    log_lines,
480                    TraceMessage {
481                        message: verbose_message.clone(),
482                        is_verbose: true,
483                    },
484                    TraceValue::Verbose,
485                );
486            }
487            self.emit_event(
488                Event::NewServerLogEntry {
489                    id,
490                    kind: LanguageServerLogType::Trace { verbose_info },
491                    text: new_message,
492                },
493                cx,
494            );
495        }
496        Some(())
497    }
498
499    fn push_new_message<T: Message>(
500        log_lines: &mut VecDeque<T>,
501        message: T,
502        current_severity: <T as Message>::Level,
503    ) -> Option<String> {
504        while log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
505            log_lines.pop_front();
506        }
507        let visible = message.should_include(current_severity);
508
509        let visible_message = visible.then(|| message.as_ref().to_string());
510        log_lines.push_back(message);
511        visible_message
512    }
513
514    fn add_language_server_rpc(
515        &mut self,
516        language_server_id: LanguageServerId,
517        kind: MessageKind,
518        message: &str,
519        cx: &mut Context<'_, Self>,
520    ) {
521        let store_logs = !self.on_headless_host;
522        let Some(state) = self
523            .get_language_server_state(language_server_id)
524            .and_then(|state| state.rpc_state.as_mut())
525        else {
526            return;
527        };
528
529        let received = kind == MessageKind::Receive;
530        let rpc_log_lines = &mut state.rpc_messages;
531        if state.last_message_kind != Some(kind) {
532            while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
533                rpc_log_lines.pop_front();
534            }
535            let line_before_message = match kind {
536                MessageKind::Send => SEND_LINE,
537                MessageKind::Receive => RECEIVE_LINE,
538            };
539            if store_logs {
540                rpc_log_lines.push_back(RpcMessage {
541                    message: line_before_message.to_string(),
542                });
543            }
544            // Do not send a synthetic message over the wire, it will be derived from the actual RPC message
545            cx.emit(Event::NewServerLogEntry {
546                id: language_server_id,
547                kind: LanguageServerLogType::Rpc { received },
548                text: line_before_message.to_string(),
549            });
550        }
551
552        while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
553            rpc_log_lines.pop_front();
554        }
555
556        if store_logs {
557            rpc_log_lines.push_back(RpcMessage {
558                message: message.trim().to_owned(),
559            });
560        }
561
562        self.emit_event(
563            Event::NewServerLogEntry {
564                id: language_server_id,
565                kind: LanguageServerLogType::Rpc { received },
566                text: message.to_owned(),
567            },
568            cx,
569        );
570    }
571
572    pub fn remove_language_server(&mut self, id: LanguageServerId, cx: &mut Context<Self>) {
573        self.language_servers.remove(&id);
574        cx.notify();
575    }
576
577    pub fn server_logs(&self, server_id: LanguageServerId) -> Option<&VecDeque<LogMessage>> {
578        Some(&self.language_servers.get(&server_id)?.log_messages)
579    }
580
581    pub fn server_trace(&self, server_id: LanguageServerId) -> Option<&VecDeque<TraceMessage>> {
582        Some(&self.language_servers.get(&server_id)?.trace_messages)
583    }
584
585    pub fn server_ids_for_project<'a>(
586        &'a self,
587        lookup_project: &'a WeakEntity<Project>,
588    ) -> impl Iterator<Item = LanguageServerId> + 'a {
589        self.language_servers
590            .iter()
591            .filter_map(move |(id, state)| match &state.kind {
592                LanguageServerKind::Local { project } | LanguageServerKind::Remote { project } => {
593                    if project == lookup_project {
594                        Some(*id)
595                    } else {
596                        None
597                    }
598                }
599                LanguageServerKind::Global | LanguageServerKind::LocalSsh { .. } => Some(*id),
600            })
601    }
602
603    pub fn enable_rpc_trace_for_language_server(
604        &mut self,
605        server_id: LanguageServerId,
606    ) -> Option<&mut LanguageServerRpcState> {
607        let rpc_state = self
608            .language_servers
609            .get_mut(&server_id)?
610            .rpc_state
611            .get_or_insert_with(|| LanguageServerRpcState {
612                rpc_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
613                last_message_kind: None,
614            });
615        Some(rpc_state)
616    }
617
618    pub fn disable_rpc_trace_for_language_server(
619        &mut self,
620        server_id: LanguageServerId,
621    ) -> Option<()> {
622        self.language_servers.get_mut(&server_id)?.rpc_state.take();
623        Some(())
624    }
625
626    pub fn has_server_logs(&self, server: &LanguageServerSelector) -> bool {
627        match server {
628            LanguageServerSelector::Id(id) => self.language_servers.contains_key(id),
629            LanguageServerSelector::Name(name) => self
630                .language_servers
631                .iter()
632                .any(|(_, state)| state.name.as_ref() == Some(name)),
633        }
634    }
635
636    fn on_io(
637        &mut self,
638        language_server_id: LanguageServerId,
639        io_kind: IoKind,
640        message: &str,
641        cx: &mut Context<Self>,
642    ) -> Option<()> {
643        let is_received = match io_kind {
644            IoKind::StdOut => true,
645            IoKind::StdIn => false,
646            IoKind::StdErr => {
647                self.add_language_server_log(language_server_id, MessageType::LOG, message, cx);
648                return Some(());
649            }
650        };
651
652        let kind = if is_received {
653            MessageKind::Receive
654        } else {
655            MessageKind::Send
656        };
657
658        self.add_language_server_rpc(language_server_id, kind, message, cx);
659        cx.notify();
660        Some(())
661    }
662
663    fn emit_event(&mut self, e: Event, cx: &mut Context<Self>) {
664        match &e {
665            Event::NewServerLogEntry { id, kind, text } => {
666                if let Some(state) = self.get_language_server_state(*id) {
667                    let downstream_client = match &state.kind {
668                        LanguageServerKind::Remote { project }
669                        | LanguageServerKind::Local { project } => project
670                            .upgrade()
671                            .map(|project| project.read(cx).lsp_store()),
672                        LanguageServerKind::LocalSsh { lsp_store } => lsp_store.upgrade(),
673                        LanguageServerKind::Global => None,
674                    }
675                    .and_then(|lsp_store| lsp_store.read(cx).downstream_client());
676                    if let Some((client, project_id)) = downstream_client {
677                        if Some(LogKind::from_server_log_type(kind)) == state.toggled_log_kind {
678                            client
679                                .send(proto::LanguageServerLog {
680                                    project_id,
681                                    language_server_id: id.to_proto(),
682                                    message: text.clone(),
683                                    log_type: Some(kind.to_proto()),
684                                })
685                                .ok();
686                        }
687                    }
688                }
689            }
690        }
691
692        cx.emit(e);
693    }
694
695    pub fn toggle_lsp_logs(
696        &mut self,
697        server_id: LanguageServerId,
698        enabled: bool,
699        toggled_log_kind: LogKind,
700    ) {
701        if let Some(server_state) = self.get_language_server_state(server_id) {
702            if enabled {
703                server_state.toggled_log_kind = Some(toggled_log_kind);
704            } else {
705                server_state.toggled_log_kind = None;
706            }
707        }
708        if LogKind::Rpc == toggled_log_kind {
709            if enabled {
710                self.enable_rpc_trace_for_language_server(server_id);
711            } else {
712                self.disable_rpc_trace_for_language_server(server_id);
713            }
714        }
715    }
716}