log_store.rs

  1use std::{collections::VecDeque, sync::Arc};
  2
  3use collections::HashMap;
  4use futures::{StreamExt, channel::mpsc};
  5use gpui::{App, AppContext as _, Context, Entity, EventEmitter, Global, Subscription, WeakEntity};
  6use lsp::{
  7    IoKind, LanguageServer, LanguageServerId, LanguageServerName, LanguageServerSelector,
  8    MessageType, TraceValue,
  9};
 10use rpc::proto;
 11use settings::WorktreeId;
 12
 13use crate::{LanguageServerLogType, LspStore, Project, ProjectItem as _};
 14
 15const SEND_LINE: &str = "\n// Send:";
 16const RECEIVE_LINE: &str = "\n// Receive:";
 17const MAX_STORED_LOG_ENTRIES: usize = 2000;
 18
 19const RPC_MESSAGES: &str = "RPC Messages";
 20const SERVER_LOGS: &str = "Server Logs";
 21const SERVER_TRACE: &str = "Server Trace";
 22const SERVER_INFO: &str = "Server Info";
 23
 24pub fn init(store_logs: bool, cx: &mut App) -> Entity<LogStore> {
 25    let log_store = cx.new(|cx| LogStore::new(store_logs, cx));
 26    cx.set_global(GlobalLogStore(log_store.clone()));
 27    log_store
 28}
 29
 30pub struct GlobalLogStore(pub Entity<LogStore>);
 31
 32impl Global for GlobalLogStore {}
 33
 34#[derive(Debug)]
 35pub enum Event {
 36    NewServerLogEntry {
 37        id: LanguageServerId,
 38        kind: LanguageServerLogType,
 39        text: String,
 40    },
 41}
 42
 43impl EventEmitter<Event> for LogStore {}
 44
 45pub struct LogStore {
 46    store_logs: bool,
 47    projects: HashMap<WeakEntity<Project>, ProjectState>,
 48    pub copilot_log_subscription: Option<lsp::Subscription>,
 49    pub language_servers: HashMap<LanguageServerId, LanguageServerState>,
 50    io_tx: mpsc::UnboundedSender<(LanguageServerId, IoKind, String)>,
 51}
 52
 53struct ProjectState {
 54    _subscriptions: [Subscription; 2],
 55}
 56
 57pub trait Message: AsRef<str> {
 58    type Level: Copy + std::fmt::Debug;
 59    fn should_include(&self, _: Self::Level) -> bool {
 60        true
 61    }
 62}
 63
 64#[derive(Debug)]
 65pub struct LogMessage {
 66    message: String,
 67    typ: MessageType,
 68}
 69
 70impl AsRef<str> for LogMessage {
 71    fn as_ref(&self) -> &str {
 72        &self.message
 73    }
 74}
 75
 76impl Message for LogMessage {
 77    type Level = MessageType;
 78
 79    fn should_include(&self, level: Self::Level) -> bool {
 80        match (self.typ, level) {
 81            (MessageType::ERROR, _) => true,
 82            (_, MessageType::ERROR) => false,
 83            (MessageType::WARNING, _) => true,
 84            (_, MessageType::WARNING) => false,
 85            (MessageType::INFO, _) => true,
 86            (_, MessageType::INFO) => false,
 87            _ => true,
 88        }
 89    }
 90}
 91
 92#[derive(Debug)]
 93pub struct TraceMessage {
 94    message: String,
 95    is_verbose: bool,
 96}
 97
 98impl AsRef<str> for TraceMessage {
 99    fn as_ref(&self) -> &str {
100        &self.message
101    }
102}
103
104impl Message for TraceMessage {
105    type Level = TraceValue;
106
107    fn should_include(&self, level: Self::Level) -> bool {
108        match level {
109            TraceValue::Off => false,
110            TraceValue::Messages => !self.is_verbose,
111            TraceValue::Verbose => true,
112        }
113    }
114}
115
116#[derive(Debug)]
117pub struct RpcMessage {
118    message: String,
119}
120
121impl AsRef<str> for RpcMessage {
122    fn as_ref(&self) -> &str {
123        &self.message
124    }
125}
126
127impl Message for RpcMessage {
128    type Level = ();
129}
130
131pub struct LanguageServerState {
132    pub name: Option<LanguageServerName>,
133    pub worktree_id: Option<WorktreeId>,
134    pub kind: LanguageServerKind,
135    log_messages: VecDeque<LogMessage>,
136    trace_messages: VecDeque<TraceMessage>,
137    pub rpc_state: Option<LanguageServerRpcState>,
138    pub trace_level: TraceValue,
139    pub log_level: MessageType,
140    io_logs_subscription: Option<lsp::Subscription>,
141}
142
143impl std::fmt::Debug for LanguageServerState {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("LanguageServerState")
146            .field("name", &self.name)
147            .field("worktree_id", &self.worktree_id)
148            .field("kind", &self.kind)
149            .field("log_messages", &self.log_messages)
150            .field("trace_messages", &self.trace_messages)
151            .field("rpc_state", &self.rpc_state)
152            .field("trace_level", &self.trace_level)
153            .field("log_level", &self.log_level)
154            .finish_non_exhaustive()
155    }
156}
157
158#[derive(PartialEq, Clone)]
159pub enum LanguageServerKind {
160    Local { project: WeakEntity<Project> },
161    Remote { project: WeakEntity<Project> },
162    LocalSsh { lsp_store: WeakEntity<LspStore> },
163    Global,
164}
165
166impl std::fmt::Debug for LanguageServerKind {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            LanguageServerKind::Local { .. } => write!(f, "LanguageServerKind::Local"),
170            LanguageServerKind::Remote { .. } => write!(f, "LanguageServerKind::Remote"),
171            LanguageServerKind::LocalSsh { .. } => write!(f, "LanguageServerKind::LocalSsh"),
172            LanguageServerKind::Global => write!(f, "LanguageServerKind::Global"),
173        }
174    }
175}
176
177impl LanguageServerKind {
178    pub fn project(&self) -> Option<&WeakEntity<Project>> {
179        match self {
180            Self::Local { project } => Some(project),
181            Self::Remote { project } => Some(project),
182            Self::LocalSsh { .. } => None,
183            Self::Global { .. } => None,
184        }
185    }
186}
187
188#[derive(Debug)]
189pub struct LanguageServerRpcState {
190    pub rpc_messages: VecDeque<RpcMessage>,
191    last_message_kind: Option<MessageKind>,
192}
193
194#[derive(Debug, Copy, Clone, PartialEq, Eq)]
195enum MessageKind {
196    Send,
197    Receive,
198}
199
200#[derive(Clone, Copy, Debug, Default, PartialEq)]
201pub enum LogKind {
202    Rpc,
203    Trace,
204    #[default]
205    Logs,
206    ServerInfo,
207}
208
209impl LogKind {
210    pub fn from_server_log_type(log_type: &LanguageServerLogType) -> Self {
211        match log_type {
212            LanguageServerLogType::Log(_) => Self::Logs,
213            LanguageServerLogType::Trace { .. } => Self::Trace,
214            LanguageServerLogType::Rpc { .. } => Self::Rpc,
215        }
216    }
217
218    pub fn label(&self) -> &'static str {
219        match self {
220            LogKind::Rpc => RPC_MESSAGES,
221            LogKind::Trace => SERVER_TRACE,
222            LogKind::Logs => SERVER_LOGS,
223            LogKind::ServerInfo => SERVER_INFO,
224        }
225    }
226}
227
228impl LogStore {
229    pub fn new(store_logs: bool, cx: &mut Context<Self>) -> Self {
230        let (io_tx, mut io_rx) = mpsc::unbounded();
231
232        let log_store = Self {
233            projects: HashMap::default(),
234            language_servers: HashMap::default(),
235            copilot_log_subscription: None,
236            store_logs,
237            io_tx,
238        };
239        cx.spawn(async move |log_store, cx| {
240            while let Some((server_id, io_kind, message)) = io_rx.next().await {
241                if let Some(log_store) = log_store.upgrade() {
242                    log_store.update(cx, |log_store, cx| {
243                        log_store.on_io(server_id, io_kind, &message, cx);
244                    })?;
245                }
246            }
247            anyhow::Ok(())
248        })
249        .detach_and_log_err(cx);
250
251        log_store
252    }
253
254    pub fn add_project(&mut self, project: &Entity<Project>, cx: &mut Context<Self>) {
255        let weak_project = project.downgrade();
256        self.projects.insert(
257            project.downgrade(),
258            ProjectState {
259                _subscriptions: [
260                    cx.observe_release(project, move |this, _, _| {
261                        this.projects.remove(&weak_project);
262                        this.language_servers
263                            .retain(|_, state| state.kind.project() != Some(&weak_project));
264                    }),
265                    cx.subscribe(project, move |log_store, project, event, cx| {
266                        let server_kind = if project.read(cx).is_local() {
267                            LanguageServerKind::Local {
268                                project: project.downgrade(),
269                            }
270                        } else {
271                            LanguageServerKind::Remote {
272                                project: project.downgrade(),
273                            }
274                        };
275                        match event {
276                            crate::Event::LanguageServerAdded(id, name, worktree_id) => {
277                                log_store.add_language_server(
278                                    server_kind,
279                                    *id,
280                                    Some(name.clone()),
281                                    *worktree_id,
282                                    project
283                                        .read(cx)
284                                        .lsp_store()
285                                        .read(cx)
286                                        .language_server_for_id(*id),
287                                    cx,
288                                );
289                            }
290                            crate::Event::LanguageServerBufferRegistered {
291                                server_id,
292                                buffer_id,
293                                name,
294                                ..
295                            } => {
296                                let worktree_id = project
297                                    .read(cx)
298                                    .buffer_for_id(*buffer_id, cx)
299                                    .and_then(|buffer| {
300                                        Some(buffer.read(cx).project_path(cx)?.worktree_id)
301                                    });
302                                let name = name.clone().or_else(|| {
303                                    project
304                                        .read(cx)
305                                        .lsp_store()
306                                        .read(cx)
307                                        .language_server_statuses
308                                        .get(server_id)
309                                        .map(|status| status.name.clone())
310                                });
311                                log_store.add_language_server(
312                                    server_kind,
313                                    *server_id,
314                                    name,
315                                    worktree_id,
316                                    None,
317                                    cx,
318                                );
319                            }
320                            crate::Event::LanguageServerRemoved(id) => {
321                                log_store.remove_language_server(*id, cx);
322                            }
323                            crate::Event::LanguageServerLog(id, typ, message) => {
324                                log_store.add_language_server(
325                                    server_kind,
326                                    *id,
327                                    None,
328                                    None,
329                                    None,
330                                    cx,
331                                );
332                                match typ {
333                                    crate::LanguageServerLogType::Log(typ) => {
334                                        log_store.add_language_server_log(*id, *typ, message, cx);
335                                    }
336                                    crate::LanguageServerLogType::Trace { verbose_info } => {
337                                        log_store.add_language_server_trace(
338                                            *id,
339                                            message,
340                                            verbose_info.clone(),
341                                            cx,
342                                        );
343                                    }
344                                    crate::LanguageServerLogType::Rpc { received } => {
345                                        let kind = if *received {
346                                            MessageKind::Receive
347                                        } else {
348                                            MessageKind::Send
349                                        };
350                                        log_store.add_language_server_rpc(*id, kind, message, cx);
351                                    }
352                                }
353                            }
354                            crate::Event::ToggleLspLogs { server_id, enabled } => {
355                                // we do not support any other log toggling yet
356                                if *enabled {
357                                    log_store.enable_rpc_trace_for_language_server(*server_id);
358                                } else {
359                                    log_store.disable_rpc_trace_for_language_server(*server_id);
360                                }
361                            }
362                            _ => {}
363                        }
364                    }),
365                ],
366            },
367        );
368    }
369
370    pub fn get_language_server_state(
371        &mut self,
372        id: LanguageServerId,
373    ) -> Option<&mut LanguageServerState> {
374        self.language_servers.get_mut(&id)
375    }
376
377    pub fn add_language_server(
378        &mut self,
379        kind: LanguageServerKind,
380        server_id: LanguageServerId,
381        name: Option<LanguageServerName>,
382        worktree_id: Option<WorktreeId>,
383        server: Option<Arc<LanguageServer>>,
384        cx: &mut Context<Self>,
385    ) -> Option<&mut LanguageServerState> {
386        let server_state = self.language_servers.entry(server_id).or_insert_with(|| {
387            cx.notify();
388            LanguageServerState {
389                name: None,
390                worktree_id: None,
391                kind,
392                rpc_state: None,
393                log_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
394                trace_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
395                trace_level: TraceValue::Off,
396                log_level: MessageType::LOG,
397                io_logs_subscription: None,
398            }
399        });
400
401        if let Some(name) = name {
402            server_state.name = Some(name);
403        }
404        if let Some(worktree_id) = worktree_id {
405            server_state.worktree_id = Some(worktree_id);
406        }
407
408        if let Some(server) = server.filter(|_| server_state.io_logs_subscription.is_none()) {
409            let io_tx = self.io_tx.clone();
410            let server_id = server.server_id();
411            server_state.io_logs_subscription = Some(server.on_io(move |io_kind, message| {
412                io_tx
413                    .unbounded_send((server_id, io_kind, message.to_string()))
414                    .ok();
415            }));
416        }
417
418        Some(server_state)
419    }
420
421    pub fn add_language_server_log(
422        &mut self,
423        id: LanguageServerId,
424        typ: MessageType,
425        message: &str,
426        cx: &mut Context<Self>,
427    ) -> Option<()> {
428        let store_logs = self.store_logs;
429        let language_server_state = self.get_language_server_state(id)?;
430
431        let log_lines = &mut language_server_state.log_messages;
432        let message = message.trim_end().to_string();
433        if !store_logs {
434            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
435            self.emit_event(
436                Event::NewServerLogEntry {
437                    id,
438                    kind: LanguageServerLogType::Log(typ),
439                    text: message,
440                },
441                cx,
442            );
443        } else if let Some(new_message) = Self::push_new_message(
444            log_lines,
445            LogMessage { message, typ },
446            language_server_state.log_level,
447        ) {
448            self.emit_event(
449                Event::NewServerLogEntry {
450                    id,
451                    kind: LanguageServerLogType::Log(typ),
452                    text: new_message,
453                },
454                cx,
455            );
456        }
457        Some(())
458    }
459
460    fn add_language_server_trace(
461        &mut self,
462        id: LanguageServerId,
463        message: &str,
464        verbose_info: Option<String>,
465        cx: &mut Context<Self>,
466    ) -> Option<()> {
467        let store_logs = self.store_logs;
468        let language_server_state = self.get_language_server_state(id)?;
469
470        let log_lines = &mut language_server_state.trace_messages;
471        if !store_logs {
472            // Send all messages regardless of the visibility in case of not storing, to notify the receiver anyway
473            self.emit_event(
474                Event::NewServerLogEntry {
475                    id,
476                    kind: LanguageServerLogType::Trace { verbose_info },
477                    text: message.trim().to_string(),
478                },
479                cx,
480            );
481        } else if let Some(new_message) = Self::push_new_message(
482            log_lines,
483            TraceMessage {
484                message: message.trim().to_string(),
485                is_verbose: false,
486            },
487            TraceValue::Messages,
488        ) {
489            if let Some(verbose_message) = verbose_info.as_ref() {
490                Self::push_new_message(
491                    log_lines,
492                    TraceMessage {
493                        message: verbose_message.clone(),
494                        is_verbose: true,
495                    },
496                    TraceValue::Verbose,
497                );
498            }
499            self.emit_event(
500                Event::NewServerLogEntry {
501                    id,
502                    kind: LanguageServerLogType::Trace { verbose_info },
503                    text: new_message,
504                },
505                cx,
506            );
507        }
508        Some(())
509    }
510
511    fn push_new_message<T: Message>(
512        log_lines: &mut VecDeque<T>,
513        message: T,
514        current_severity: <T as Message>::Level,
515    ) -> Option<String> {
516        while log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
517            log_lines.pop_front();
518        }
519        let visible = message.should_include(current_severity);
520
521        let visible_message = visible.then(|| message.as_ref().to_string());
522        log_lines.push_back(message);
523        visible_message
524    }
525
526    fn add_language_server_rpc(
527        &mut self,
528        language_server_id: LanguageServerId,
529        kind: MessageKind,
530        message: &str,
531        cx: &mut Context<'_, Self>,
532    ) {
533        let store_logs = self.store_logs;
534        let Some(state) = self
535            .get_language_server_state(language_server_id)
536            .and_then(|state| state.rpc_state.as_mut())
537        else {
538            return;
539        };
540
541        let received = kind == MessageKind::Receive;
542        let rpc_log_lines = &mut state.rpc_messages;
543        if state.last_message_kind != Some(kind) {
544            while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
545                rpc_log_lines.pop_front();
546            }
547            let line_before_message = match kind {
548                MessageKind::Send => SEND_LINE,
549                MessageKind::Receive => RECEIVE_LINE,
550            };
551            if store_logs {
552                rpc_log_lines.push_back(RpcMessage {
553                    message: line_before_message.to_string(),
554                });
555            }
556            // Do not send a synthetic message over the wire, it will be derived from the actual RPC message
557            cx.emit(Event::NewServerLogEntry {
558                id: language_server_id,
559                kind: LanguageServerLogType::Rpc { received },
560                text: line_before_message.to_string(),
561            });
562        }
563
564        while rpc_log_lines.len() + 1 >= MAX_STORED_LOG_ENTRIES {
565            rpc_log_lines.pop_front();
566        }
567
568        if store_logs {
569            rpc_log_lines.push_back(RpcMessage {
570                message: message.trim().to_owned(),
571            });
572        }
573
574        self.emit_event(
575            Event::NewServerLogEntry {
576                id: language_server_id,
577                kind: LanguageServerLogType::Rpc { received },
578                text: message.to_owned(),
579            },
580            cx,
581        );
582    }
583
584    pub fn remove_language_server(&mut self, id: LanguageServerId, cx: &mut Context<Self>) {
585        self.language_servers.remove(&id);
586        cx.notify();
587    }
588
589    pub fn server_logs(&self, server_id: LanguageServerId) -> Option<&VecDeque<LogMessage>> {
590        Some(&self.language_servers.get(&server_id)?.log_messages)
591    }
592
593    pub fn server_trace(&self, server_id: LanguageServerId) -> Option<&VecDeque<TraceMessage>> {
594        Some(&self.language_servers.get(&server_id)?.trace_messages)
595    }
596
597    pub fn server_ids_for_project<'a>(
598        &'a self,
599        lookup_project: &'a WeakEntity<Project>,
600    ) -> impl Iterator<Item = LanguageServerId> + 'a {
601        self.language_servers
602            .iter()
603            .filter_map(move |(id, state)| match &state.kind {
604                LanguageServerKind::Local { project } | LanguageServerKind::Remote { project } => {
605                    if project == lookup_project {
606                        Some(*id)
607                    } else {
608                        None
609                    }
610                }
611                LanguageServerKind::Global | LanguageServerKind::LocalSsh { .. } => Some(*id),
612            })
613    }
614
615    pub fn enable_rpc_trace_for_language_server(
616        &mut self,
617        server_id: LanguageServerId,
618    ) -> Option<&mut LanguageServerRpcState> {
619        let rpc_state = self
620            .language_servers
621            .get_mut(&server_id)?
622            .rpc_state
623            .get_or_insert_with(|| LanguageServerRpcState {
624                rpc_messages: VecDeque::with_capacity(MAX_STORED_LOG_ENTRIES),
625                last_message_kind: None,
626            });
627        Some(rpc_state)
628    }
629
630    pub fn disable_rpc_trace_for_language_server(
631        &mut self,
632        server_id: LanguageServerId,
633    ) -> Option<()> {
634        self.language_servers.get_mut(&server_id)?.rpc_state.take();
635        Some(())
636    }
637
638    pub fn has_server_logs(&self, server: &LanguageServerSelector) -> bool {
639        match server {
640            LanguageServerSelector::Id(id) => self.language_servers.contains_key(id),
641            LanguageServerSelector::Name(name) => self
642                .language_servers
643                .iter()
644                .any(|(_, state)| state.name.as_ref() == Some(name)),
645        }
646    }
647
648    fn on_io(
649        &mut self,
650        language_server_id: LanguageServerId,
651        io_kind: IoKind,
652        message: &str,
653        cx: &mut Context<Self>,
654    ) -> Option<()> {
655        let is_received = match io_kind {
656            IoKind::StdOut => true,
657            IoKind::StdIn => false,
658            IoKind::StdErr => {
659                self.add_language_server_log(language_server_id, MessageType::LOG, message, cx);
660                return Some(());
661            }
662        };
663
664        let kind = if is_received {
665            MessageKind::Receive
666        } else {
667            MessageKind::Send
668        };
669
670        self.add_language_server_rpc(language_server_id, kind, message, cx);
671        cx.notify();
672        Some(())
673    }
674
675    fn emit_event(&mut self, e: Event, cx: &mut Context<Self>) {
676        match &e {
677            Event::NewServerLogEntry { id, kind, text } => {
678                if let Some(state) = self.get_language_server_state(*id) {
679                    let downstream_client = match &state.kind {
680                        LanguageServerKind::Remote { project }
681                        | LanguageServerKind::Local { project } => project
682                            .upgrade()
683                            .map(|project| project.read(cx).lsp_store()),
684                        LanguageServerKind::LocalSsh { lsp_store } => lsp_store.upgrade(),
685                        LanguageServerKind::Global => None,
686                    }
687                    .and_then(|lsp_store| lsp_store.read(cx).downstream_client());
688                    if let Some((client, project_id)) = downstream_client {
689                        client
690                            .send(proto::LanguageServerLog {
691                                project_id,
692                                language_server_id: id.to_proto(),
693                                message: text.clone(),
694                                log_type: Some(kind.to_proto()),
695                            })
696                            .ok();
697                    }
698                }
699            }
700        }
701
702        cx.emit(e);
703    }
704}