log_store.rs

  1use std::{collections::VecDeque, sync::Arc};
  2
  3use collections::HashMap;
  4use futures::{StreamExt, channel::mpsc};
  5use gpui::{App, AppContext as _, Context, Entity, EventEmitter, Global, Subscription, WeakEntity};
  6use lsp::{
  7    IoKind, LanguageServer, LanguageServerId, LanguageServerName, LanguageServerSelector,
  8    MessageType, TraceValue,
  9};
 10use rpc::proto;
 11use settings::WorktreeId;
 12
 13use crate::{LanguageServerLogType, LspStore, Project, ProjectItem as _};
 14
 15const SEND_LINE: &str = "\n// Send:";
 16const RECEIVE_LINE: &str = "\n// Receive:";
 17const MAX_STORED_LOG_ENTRIES: usize = 2000;
 18
 19pub fn init(store_logs: bool, cx: &mut App) -> Entity<LogStore> {
 20    let log_store = cx.new(|cx| LogStore::new(store_logs, cx));
 21    cx.set_global(GlobalLogStore(log_store.clone()));
 22    log_store
 23}
 24
/// Newtype around the [`LogStore`] entity so it can be stored as a gpui global (see [`init`]).
pub struct GlobalLogStore(pub Entity<LogStore>);

impl Global for GlobalLogStore {}
 28
/// Events emitted by [`LogStore`].
#[derive(Debug)]
pub enum Event {
    /// A new log, trace or RPC entry was reported for a language server.
    NewServerLogEntry {
        /// The server the entry belongs to.
        id: LanguageServerId,
        /// Which kind of entry this is (log, trace or RPC).
        kind: LanguageServerLogType,
        /// The text of the entry.
        text: String,
    },
}

impl EventEmitter<Event> for LogStore {}
 39
/// Collects log, trace and RPC messages of language servers and re-emits them as [`Event`]s.
pub struct LogStore {
    /// When `false`, incoming messages are only emitted as events and are not kept in the
    /// per-server buffers.
    store_logs: bool,
    /// Subscriptions for every project registered through `add_project`, keyed by a weak handle.
    projects: HashMap<WeakEntity<Project>, ProjectState>,
    /// Initialized to `None` by `new`; not otherwise touched in this file.
    // NOTE(review): presumably set by the Copilot integration — confirm against callers.
    pub copilot_log_subscription: Option<lsp::Subscription>,
    /// Per-server state (buffers, levels, subscriptions), keyed by server id.
    pub language_servers: HashMap<LanguageServerId, LanguageServerState>,
    /// Sender half of the channel that carries raw server I/O; cloned into each server's I/O
    /// hook in `add_language_server` and drained by the task spawned in `new`.
    io_tx: mpsc::UnboundedSender<(LanguageServerId, IoKind, String)>,
}
 47
/// Per-project bookkeeping held by [`LogStore`].
struct ProjectState {
    /// The release observer and the event subscription created in `add_project`; held only so
    /// they stay alive as long as this entry exists.
    _subscriptions: [Subscription; 2],
}
 51
/// A stored message whose text is available through `AsRef<str>` and which can be filtered by a
/// level.
pub trait Message: AsRef<str> {
    /// The level type this kind of message is filtered by.
    type Level: Copy + std::fmt::Debug;
    /// Returns whether this message should be shown at the given level.
    ///
    /// The default implementation includes every message.
    fn should_include(&self, _: Self::Level) -> bool {
        true
    }
}
 58
 59#[derive(Debug)]
 60pub struct LogMessage {
 61    message: String,
 62    typ: MessageType,
 63}
 64
 65impl AsRef<str> for LogMessage {
 66    fn as_ref(&self) -> &str {
 67        &self.message
 68    }
 69}
 70
 71impl Message for LogMessage {
 72    type Level = MessageType;
 73
 74    fn should_include(&self, level: Self::Level) -> bool {
 75        match (self.typ, level) {
 76            (MessageType::ERROR, _) => true,
 77            (_, MessageType::ERROR) => false,
 78            (MessageType::WARNING, _) => true,
 79            (_, MessageType::WARNING) => false,
 80            (MessageType::INFO, _) => true,
 81            (_, MessageType::INFO) => false,
 82            _ => true,
 83        }
 84    }
 85}
 86
 87#[derive(Debug)]
 88pub struct TraceMessage {
 89    message: String,
 90    is_verbose: bool,
 91}
 92
 93impl AsRef<str> for TraceMessage {
 94    fn as_ref(&self) -> &str {
 95        &self.message
 96    }
 97}
 98
 99impl Message for TraceMessage {
100    type Level = TraceValue;
101
102    fn should_include(&self, level: Self::Level) -> bool {
103        match level {
104            TraceValue::Off => false,
105            TraceValue::Messages => !self.is_verbose,
106            TraceValue::Verbose => true,
107        }
108    }
109}
110
111#[derive(Debug)]
112pub struct RpcMessage {
113    message: String,
114}
115
116impl AsRef<str> for RpcMessage {
117    fn as_ref(&self) -> &str {
118        &self.message
119    }
120}
121
122impl Message for RpcMessage {
123    type Level = ();
124}
125
/// Everything [`LogStore`] tracks for a single language server.
pub struct LanguageServerState {
    /// Name of the server, if one has been reported so far.
    pub name: Option<LanguageServerName>,
    /// Worktree the server is associated with, if one has been reported so far.
    pub worktree_id: Option<WorktreeId>,
    /// Where the server lives; fixed when the state is first created.
    pub kind: LanguageServerKind,
    /// Buffered log messages (only filled when the store keeps logs).
    log_messages: VecDeque<LogMessage>,
    /// Buffered trace messages (only filled when the store keeps logs).
    trace_messages: VecDeque<TraceMessage>,
    /// RPC log state; `Some` only while RPC tracing is enabled for this server.
    pub rpc_state: Option<LanguageServerRpcState>,
    /// Trace level; starts as `TraceValue::Off`.
    pub trace_level: TraceValue,
    /// Log level used to filter log messages; starts as `MessageType::LOG`.
    pub log_level: MessageType,
    /// Keeps the server's I/O hook alive; set at most once in `add_language_server`.
    io_logs_subscription: Option<lsp::Subscription>,
}
137
138impl std::fmt::Debug for LanguageServerState {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        f.debug_struct("LanguageServerState")
141            .field("name", &self.name)
142            .field("worktree_id", &self.worktree_id)
143            .field("kind", &self.kind)
144            .field("log_messages", &self.log_messages)
145            .field("trace_messages", &self.trace_messages)
146            .field("rpc_state", &self.rpc_state)
147            .field("trace_level", &self.trace_level)
148            .field("log_level", &self.log_level)
149            .finish_non_exhaustive()
150    }
151}
152
/// Where a language server tracked by [`LogStore`] comes from.
#[derive(PartialEq, Clone)]
pub enum LanguageServerKind {
    /// Server of a project for which `Project::is_local` returned `true` (see `add_project`).
    Local { project: WeakEntity<Project> },
    /// Server of a project for which `Project::is_local` returned `false`.
    Remote { project: WeakEntity<Project> },
    /// Server identified by its `LspStore` rather than by a project.
    LocalSsh { lsp_store: WeakEntity<LspStore> },
    /// Server that is not tied to a project or an `LspStore`.
    Global,
}
160
161impl std::fmt::Debug for LanguageServerKind {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            LanguageServerKind::Local { .. } => write!(f, "LanguageServerKind::Local"),
165            LanguageServerKind::Remote { .. } => write!(f, "LanguageServerKind::Remote"),
166            LanguageServerKind::LocalSsh { .. } => write!(f, "LanguageServerKind::LocalSsh"),
167            LanguageServerKind::Global => write!(f, "LanguageServerKind::Global"),
168        }
169    }
170}
171
172impl LanguageServerKind {
173    pub fn project(&self) -> Option<&WeakEntity<Project>> {
174        match self {
175            Self::Local { project } => Some(project),
176            Self::Remote { project } => Some(project),
177            Self::LocalSsh { .. } => None,
178            Self::Global { .. } => None,
179        }
180    }
181}
182
/// RPC log of a language server; exists only while RPC tracing is enabled for it.
#[derive(Debug)]
pub struct LanguageServerRpcState {
    /// Buffered RPC log lines, including the direction header lines.
    pub rpc_messages: VecDeque<RpcMessage>,
    /// Direction compared against each incoming message to decide whether a header line is
    /// needed; `None` right after tracing was enabled.
    last_message_kind: Option<MessageKind>,
}
188
/// Direction of an RPC message.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MessageKind {
    /// Logged under the `SEND_LINE` header.
    Send,
    /// Logged under the `RECEIVE_LINE` header.
    Receive,
}
194
/// The kinds of per-server output that can be selected; `Logs` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum LogKind {
    /// RPC messages.
    Rpc,
    /// Trace messages.
    Trace,
    /// Regular log messages.
    #[default]
    Logs,
    /// Server information; never produced by `from_server_log_type`.
    ServerInfo,
}
203
204impl LogKind {
205    pub fn from_server_log_type(log_type: &LanguageServerLogType) -> Self {
206        match log_type {
207            LanguageServerLogType::Log(_) => Self::Logs,
208            LanguageServerLogType::Trace { .. } => Self::Trace,
209            LanguageServerLogType::Rpc { .. } => Self::Rpc,
210        }
211    }
212}
213
214impl LogStore {
215    pub fn new(store_logs: bool, cx: &mut Context<Self>) -> Self {
216        let (io_tx, mut io_rx) = mpsc::unbounded();
217
218        let log_store = Self {
219            projects: HashMap::default(),
220            language_servers: HashMap::default(),
221            copilot_log_subscription: None,
222            store_logs,
223            io_tx,
224        };
225        cx.spawn(async move |log_store, cx| {
226            while let Some((server_id, io_kind, message)) = io_rx.next().await {
227                if let Some(log_store) = log_store.upgrade() {
228                    log_store.update(cx, |log_store, cx| {
229                        log_store.on_io(server_id, io_kind, &message, cx);
230                    })?;
231                }
232            }
233            anyhow::Ok(())
234        })
235        .detach_and_log_err(cx);
236
237        log_store
238    }
239
    /// Starts tracking `project`: observes its release and subscribes to its events.
    ///
    /// Both subscriptions are stored in `self.projects` under the project's weak handle.
    /// On release, the project entry and every server state whose kind refers to that project
    /// are removed. Project events are translated into the corresponding store operations
    /// (adding/removing servers, recording log/trace/RPC messages, toggling RPC tracing).
    pub fn add_project(&mut self, project: &Entity<Project>, cx: &mut Context<Self>) {
        let weak_project = project.downgrade();
        self.projects.insert(
            project.downgrade(),
            ProjectState {
                _subscriptions: [
                    cx.observe_release(project, move |this, _, _| {
                        this.projects.remove(&weak_project);
                        this.language_servers
                            .retain(|_, state| state.kind.project() != Some(&weak_project));
                    }),
                    cx.subscribe(project, move |log_store, project, event, cx| {
                        // The kind is only used when a state is created for a server that is
                        // not tracked yet; existing states keep their original kind.
                        let server_kind = if project.read(cx).is_local() {
                            LanguageServerKind::Local {
                                project: project.downgrade(),
                            }
                        } else {
                            LanguageServerKind::Remote {
                                project: project.downgrade(),
                            }
                        };
                        match event {
                            crate::Event::LanguageServerAdded(id, name, worktree_id) => {
                                // Passing the server handle lets the store hook into its raw I/O.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    Some(name.clone()),
                                    *worktree_id,
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_for_id(*id),
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerBufferRegistered {
                                server_id,
                                buffer_id,
                                name,
                                ..
                            } => {
                                // Derive the worktree from the buffer's project path, if any.
                                let worktree_id = project
                                    .read(cx)
                                    .buffer_for_id(*buffer_id, cx)
                                    .and_then(|buffer| {
                                        Some(buffer.read(cx).project_path(cx)?.worktree_id)
                                    });
                                // Fall back to the name recorded in the server statuses.
                                let name = name.clone().or_else(|| {
                                    project
                                        .read(cx)
                                        .lsp_store()
                                        .read(cx)
                                        .language_server_statuses
                                        .get(server_id)
                                        .map(|status| status.name.clone())
                                });
                                log_store.add_language_server(
                                    server_kind,
                                    *server_id,
                                    name,
                                    worktree_id,
                                    None,
                                    cx,
                                );
                            }
                            crate::Event::LanguageServerRemoved(id) => {
                                log_store.remove_language_server(*id, cx);
                            }
                            crate::Event::LanguageServerLog(id, typ, message) => {
                                // Make sure a state exists before recording the message.
                                log_store.add_language_server(
                                    server_kind,
                                    *id,
                                    None,
                                    None,
                                    None,
                                    cx,
                                );
                                match typ {
                                    crate::LanguageServerLogType::Log(typ) => {
                                        log_store.add_language_server_log(*id, *typ, message, cx);
                                    }
                                    crate::LanguageServerLogType::Trace { verbose_info } => {
                                        log_store.add_language_server_trace(
                                            *id,
                                            message,
                                            verbose_info.clone(),
                                            cx,
                                        );
                                    }
                                    crate::LanguageServerLogType::Rpc { received } => {
                                        let kind = if *received {
                                            MessageKind::Receive
                                        } else {
                                            MessageKind::Send
                                        };
                                        log_store.add_language_server_rpc(*id, kind, message, cx);
                                    }
                                }
                            }
                            crate::Event::ToggleLspLogs { server_id, enabled } => {
                                // Only RPC trace toggling is supported so far.
                                if *enabled {
                                    log_store.enable_rpc_trace_for_language_server(*server_id);
                                } else {
                                    log_store.disable_rpc_trace_for_language_server(*server_id);
                                }
                            }
                            _ => {}
                        }
                    }),
                ],
            },
        );
    }
355
356    pub fn get_language_server_state(
357        &mut self,
358        id: LanguageServerId,
359    ) -> Option<&mut LanguageServerState> {
360        self.language_servers.get_mut(&id)
361    }
362
363    pub fn add_language_server(
364        &mut self,
365        kind: LanguageServerKind,
366        server_id: LanguageServerId,
367        name: Option<LanguageServerName>,
368        worktree_id: Option<WorktreeId>,
369        server: Option<Arc<LanguageServer>>,
370        cx: &mut Context<Self>,
371    ) -> Option<&mut LanguageServerState> {
372        let server_state = self.language_servers.entry(server_id).or_insert_with(|| {
373            cx.notify();
374            LanguageServerState {
375                name: None,
376                worktree_id: None,
377                kind,
378                rpc_state: None,
379                log_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
380                trace_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
381                trace_level: TraceValue::Off,
382                log_level: MessageType::LOG,
383                io_logs_subscription: None,
384            }
385        });
386
387        if let Some(name) = name {
388            server_state.name = Some(name);
389        }
390        if let Some(worktree_id) = worktree_id {
391            server_state.worktree_id = Some(worktree_id);
392        }
393
394        if let Some(server) = server.filter(|_| server_state.io_logs_subscription.is_none()) {
395            let io_tx = self.io_tx.clone();
396            let server_id = server.server_id();
397            server_state.io_logs_subscription = Some(server.on_io(move |io_kind, message| {
398                io_tx
399                    .unbounded_send((server_id, io_kind, message.to_string()))
400                    .ok();
401            }));
402        }
403
404        Some(server_state)
405    }
406
407    pub fn add_language_server_log(
408        &mut self,
409        id: LanguageServerId,
410        typ: MessageType,
411        message: &str,
412        cx: &mut Context<Self>,
413    ) -> Option<()> {
414        let store_logs = self.store_logs;
415        let language_server_state = self.get_language_server_state(id)?;
416
417        let log_lines = &mut language_server_state.log_messages;
418        let message = message.trim_end().to_string();
419        if !store_logs {
420            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
421            self.emit_event(
422                Event::NewServerLogEntry {
423                    id,
424                    kind: LanguageServerLogType::Log(typ),
425                    text: message,
426                },
427                cx,
428            );
429        } else if let Some(new_message) = Self::push_new_message(
430            log_lines,
431            LogMessage { message, typ },
432            language_server_state.log_level,
433        ) {
434            self.emit_event(
435                Event::NewServerLogEntry {
436                    id,
437                    kind: LanguageServerLogType::Log(typ),
438                    text: new_message,
439                },
440                cx,
441            );
442        }
443        Some(())
444    }
445
446    fn add_language_server_trace(
447        &mut self,
448        id: LanguageServerId,
449        message: &str,
450        verbose_info: Option<String>,
451        cx: &mut Context<Self>,
452    ) -> Option<()> {
453        let store_logs = self.store_logs;
454        let language_server_state = self.get_language_server_state(id)?;
455
456        let log_lines = &mut language_server_state.trace_messages;
457        if !store_logs {
458            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
459            self.emit_event(
460                Event::NewServerLogEntry {
461                    id,
462                    kind: LanguageServerLogType::Trace { verbose_info },
463                    text: message.trim().to_string(),
464                },
465                cx,
466            );
467        } else if let Some(new_message) = Self::push_new_message(
468            log_lines,
469            TraceMessage {
470                message: message.trim().to_string(),
471                is_verbose: false,
472            },
473            TraceValue::Messages,
474        ) {
475            if let Some(verbose_message) = verbose_info.as_ref() {
476                Self::push_new_message(
477                    log_lines,
478                    TraceMessage {
479                        message: verbose_message.clone(),
480                        is_verbose: true,
481                    },
482                    TraceValue::Verbose,
483                );
484            }
485            self.emit_event(
486                Event::NewServerLogEntry {
487                    id,
488                    kind: LanguageServerLogType::Trace { verbose_info },
489                    text: new_message,
490                },
491                cx,
492            );
493        }
494        Some(())
495    }
496
497    fn push_new_message<T: Message>(
498        log_lines: &mut VecDeque<T>,
499        message: T,
500        current_severity: <T as Message>::Level,
501    ) -> Option<String> {
502        while log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
503            log_lines.pop_front();
504        }
505        let visible = message.should_include(current_severity);
506
507        let visible_message = visible.then(|| message.as_ref().to_string());
508        log_lines.push_back(message);
509        visible_message
510    }
511
512    fn add_language_server_rpc(
513        &mut self,
514        language_server_id: LanguageServerId,
515        kind: MessageKind,
516        message: &str,
517        cx: &mut Context<'_, Self>,
518    ) {
519        let store_logs = self.store_logs;
520        let Some(state) = self
521            .get_language_server_state(language_server_id)
522            .and_then(|state| state.rpc_state.as_mut())
523        else {
524            return;
525        };
526
527        let received = kind == MessageKind::Receive;
528        let rpc_log_lines = &mut state.rpc_messages;
529        if state.last_message_kind != Some(kind) {
530            while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
531                rpc_log_lines.pop_front();
532            }
533            let line_before_message = match kind {
534                MessageKind::Send => SEND_LINE,
535                MessageKind::Receive => RECEIVE_LINE,
536            };
537            if store_logs {
538                rpc_log_lines.push_back(RpcMessage {
539                    message: line_before_message.to_string(),
540                });
541            }
542            // Do not send a synthetic message over the wire, it will be derived from the actual RPC message
543            cx.emit(Event::NewServerLogEntry {
544                id: language_server_id,
545                kind: LanguageServerLogType::Rpc { received },
546                text: line_before_message.to_string(),
547            });
548        }
549
550        while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
551            rpc_log_lines.pop_front();
552        }
553
554        if store_logs {
555            rpc_log_lines.push_back(RpcMessage {
556                message: message.trim().to_owned(),
557            });
558        }
559
560        self.emit_event(
561            Event::NewServerLogEntry {
562                id: language_server_id,
563                kind: LanguageServerLogType::Rpc { received },
564                text: message.to_owned(),
565            },
566            cx,
567        );
568    }
569
570    pub fn remove_language_server(&mut self, id: LanguageServerId, cx: &mut Context<Self>) {
571        self.language_servers.remove(&id);
572        cx.notify();
573    }
574
575    pub fn server_logs(&self, server_id: LanguageServerId) -> Option<&VecDeque<LogMessage>> {
576        Some(&self.language_servers.get(&server_id)?.log_messages)
577    }
578
579    pub fn server_trace(&self, server_id: LanguageServerId) -> Option<&VecDeque<TraceMessage>> {
580        Some(&self.language_servers.get(&server_id)?.trace_messages)
581    }
582
583    pub fn server_ids_for_project<'a>(
584        &'a self,
585        lookup_project: &'a WeakEntity<Project>,
586    ) -> impl Iterator<Item = LanguageServerId> + 'a {
587        self.language_servers
588            .iter()
589            .filter_map(move |(id, state)| match &state.kind {
590                LanguageServerKind::Local { project } | LanguageServerKind::Remote { project } => {
591                    if project == lookup_project {
592                        Some(*id)
593                    } else {
594                        None
595                    }
596                }
597                LanguageServerKind::Global | LanguageServerKind::LocalSsh { .. } => Some(*id),
598            })
599    }
600
601    pub fn enable_rpc_trace_for_language_server(
602        &mut self,
603        server_id: LanguageServerId,
604    ) -> Option<&mut LanguageServerRpcState> {
605        let rpc_state = self
606            .language_servers
607            .get_mut(&server_id)?
608            .rpc_state
609            .get_or_insert_with(|| LanguageServerRpcState {
610                rpc_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
611                last_message_kind: None,
612            });
613        Some(rpc_state)
614    }
615
616    pub fn disable_rpc_trace_for_language_server(
617        &mut self,
618        server_id: LanguageServerId,
619    ) -> Option<()> {
620        self.language_servers.get_mut(&server_id)?.rpc_state.take();
621        Some(())
622    }
623
624    pub fn has_server_logs(&self, server: &LanguageServerSelector) -> bool {
625        match server {
626            LanguageServerSelector::Id(id) => self.language_servers.contains_key(id),
627            LanguageServerSelector::Name(name) => self
628                .language_servers
629                .iter()
630                .any(|(_, state)| state.name.as_ref() == Some(name)),
631        }
632    }
633
634    fn on_io(
635        &mut self,
636        language_server_id: LanguageServerId,
637        io_kind: IoKind,
638        message: &str,
639        cx: &mut Context<Self>,
640    ) -> Option<()> {
641        let is_received = match io_kind {
642            IoKind::StdOut => true,
643            IoKind::StdIn => false,
644            IoKind::StdErr => {
645                self.add_language_server_log(language_server_id, MessageType::LOG, message, cx);
646                return Some(());
647            }
648        };
649
650        let kind = if is_received {
651            MessageKind::Receive
652        } else {
653            MessageKind::Send
654        };
655
656        self.add_language_server_rpc(language_server_id, kind, message, cx);
657        cx.notify();
658        Some(())
659    }
660
661    fn emit_event(&mut self, e: Event, cx: &mut Context<Self>) {
662        match &e {
663            Event::NewServerLogEntry { id, kind, text } => {
664                if let Some(state) = self.get_language_server_state(*id) {
665                    let downstream_client = match &state.kind {
666                        LanguageServerKind::Remote { project }
667                        | LanguageServerKind::Local { project } => project
668                            .upgrade()
669                            .map(|project| project.read(cx).lsp_store()),
670                        LanguageServerKind::LocalSsh { lsp_store } => lsp_store.upgrade(),
671                        LanguageServerKind::Global => None,
672                    }
673                    .and_then(|lsp_store| lsp_store.read(cx).downstream_client());
674                    if let Some((client, project_id)) = downstream_client {
675                        client
676                            .send(proto::LanguageServerLog {
677                                project_id,
678                                language_server_id: id.to_proto(),
679                                message: text.clone(),
680                                log_type: Some(kind.to_proto()),
681                            })
682                            .ok();
683                    }
684                }
685            }
686        }
687
688        cx.emit(e);
689    }
690}