thread_store.rs

  1use std::sync::Arc;
  2
  3use anyhow::Result;
  4use assistant_tool::{ToolId, ToolWorkingSet};
  5use collections::HashMap;
  6use context_server::manager::ContextServerManager;
  7use context_server::{ContextServerFactoryRegistry, ContextServerTool};
  8use gpui::{prelude::*, AppContext, Model, ModelContext, Task};
  9use project::Project;
 10use unindent::Unindent;
 11use util::ResultExt as _;
 12
 13use crate::thread::{Thread, ThreadId};
 14
 15pub struct ThreadStore {
 16    #[allow(unused)]
 17    project: Model<Project>,
 18    tools: Arc<ToolWorkingSet>,
 19    context_server_manager: Model<ContextServerManager>,
 20    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
 21    threads: Vec<Model<Thread>>,
 22}
 23
 24impl ThreadStore {
 25    pub fn new(
 26        project: Model<Project>,
 27        tools: Arc<ToolWorkingSet>,
 28        cx: &mut AppContext,
 29    ) -> Task<Result<Model<Self>>> {
 30        cx.spawn(|mut cx| async move {
 31            let this = cx.new_model(|cx: &mut ModelContext<Self>| {
 32                let context_server_factory_registry =
 33                    ContextServerFactoryRegistry::default_global(cx);
 34                let context_server_manager = cx.new_model(|cx| {
 35                    ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
 36                });
 37
 38                let mut this = Self {
 39                    project,
 40                    tools,
 41                    context_server_manager,
 42                    context_server_tool_ids: HashMap::default(),
 43                    threads: Vec::new(),
 44                };
 45                this.mock_recent_threads(cx);
 46                this.register_context_server_handlers(cx);
 47
 48                this
 49            })?;
 50
 51            Ok(this)
 52        })
 53    }
 54
 55    /// Returns the number of non-empty threads.
 56    pub fn non_empty_len(&self, cx: &AppContext) -> usize {
 57        self.threads
 58            .iter()
 59            .filter(|thread| !thread.read(cx).is_empty())
 60            .count()
 61    }
 62
 63    pub fn threads(&self, cx: &ModelContext<Self>) -> Vec<Model<Thread>> {
 64        let mut threads = self
 65            .threads
 66            .iter()
 67            .filter(|thread| !thread.read(cx).is_empty())
 68            .cloned()
 69            .collect::<Vec<_>>();
 70        threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.read(cx).updated_at()));
 71        threads
 72    }
 73
 74    pub fn recent_threads(&self, limit: usize, cx: &ModelContext<Self>) -> Vec<Model<Thread>> {
 75        self.threads(cx).into_iter().take(limit).collect()
 76    }
 77
 78    pub fn create_thread(&mut self, cx: &mut ModelContext<Self>) -> Model<Thread> {
 79        let thread = cx.new_model(|cx| Thread::new(self.tools.clone(), cx));
 80        self.threads.push(thread.clone());
 81        thread
 82    }
 83
 84    pub fn open_thread(&self, id: &ThreadId, cx: &mut ModelContext<Self>) -> Option<Model<Thread>> {
 85        self.threads
 86            .iter()
 87            .find(|thread| thread.read(cx).id() == id)
 88            .cloned()
 89    }
 90
 91    pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut ModelContext<Self>) {
 92        self.threads.retain(|thread| thread.read(cx).id() != id);
 93    }
 94
 95    fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) {
 96        cx.subscribe(
 97            &self.context_server_manager.clone(),
 98            Self::handle_context_server_event,
 99        )
100        .detach();
101    }
102
103    fn handle_context_server_event(
104        &mut self,
105        context_server_manager: Model<ContextServerManager>,
106        event: &context_server::manager::Event,
107        cx: &mut ModelContext<Self>,
108    ) {
109        let tool_working_set = self.tools.clone();
110        match event {
111            context_server::manager::Event::ServerStarted { server_id } => {
112                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
113                    let context_server_manager = context_server_manager.clone();
114                    cx.spawn({
115                        let server = server.clone();
116                        let server_id = server_id.clone();
117                        |this, mut cx| async move {
118                            let Some(protocol) = server.client() else {
119                                return;
120                            };
121
122                            if protocol.capable(context_server::protocol::ServerCapability::Tools) {
123                                if let Some(tools) = protocol.list_tools().await.log_err() {
124                                    let tool_ids = tools
125                                        .tools
126                                        .into_iter()
127                                        .map(|tool| {
128                                            log::info!(
129                                                "registering context server tool: {:?}",
130                                                tool.name
131                                            );
132                                            tool_working_set.insert(Arc::new(
133                                                ContextServerTool::new(
134                                                    context_server_manager.clone(),
135                                                    server.id(),
136                                                    tool,
137                                                ),
138                                            ))
139                                        })
140                                        .collect::<Vec<_>>();
141
142                                    this.update(&mut cx, |this, _cx| {
143                                        this.context_server_tool_ids.insert(server_id, tool_ids);
144                                    })
145                                    .log_err();
146                                }
147                            }
148                        }
149                    })
150                    .detach();
151                }
152            }
153            context_server::manager::Event::ServerStopped { server_id } => {
154                if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
155                    tool_working_set.remove(&tool_ids);
156                }
157            }
158        }
159    }
160}
161
162impl ThreadStore {
163    /// Creates some mocked recent threads for testing purposes.
164    fn mock_recent_threads(&mut self, cx: &mut ModelContext<Self>) {
165        use language_model::Role;
166
167        self.threads.push(cx.new_model(|cx| {
168            let mut thread = Thread::new(self.tools.clone(), cx);
169            thread.set_summary("Introduction to quantum computing", cx);
170            thread.insert_user_message("Hello! Can you help me understand quantum computing?", Vec::new(), cx);
171            thread.insert_message(Role::Assistant, "Of course! I'd be happy to help you understand quantum computing. Quantum computing is a fascinating field that uses the principles of quantum mechanics to process information. Unlike classical computers that use bits (0s and 1s), quantum computers use quantum bits or 'qubits'. These qubits can exist in multiple states simultaneously, a property called superposition. This allows quantum computers to perform certain calculations much faster than classical computers. What specific aspect of quantum computing would you like to know more about?", cx);
172            thread.insert_user_message("That's interesting! Can you explain how quantum entanglement is used in quantum computing?", Vec::new(), cx);
173            thread.insert_message(Role::Assistant, "Certainly! Quantum entanglement is a key principle used in quantum computing. When two qubits become entangled, the state of one qubit is directly related to the state of the other, regardless of the distance between them. This property is used in quantum computing to create complex quantum states and to perform operations on multiple qubits simultaneously. Entanglement allows quantum computers to process information in ways that classical computers cannot, potentially solving certain problems much more efficiently. For example, it's crucial in quantum error correction and in algorithms like quantum teleportation, which is important for quantum communication.", cx);
174            thread
175        }));
176
177        self.threads.push(cx.new_model(|cx| {
178            let mut thread = Thread::new(self.tools.clone(), cx);
179            thread.set_summary("Rust web development and async programming", cx);
180            thread.insert_user_message("Can you show me an example of Rust code for a simple web server?", Vec::new(), cx);
181            thread.insert_message(Role::Assistant, "Certainly! Here's an example of a simple web server in Rust using the `actix-web` framework:
182
183        ```rust
184        use actix_web::{web, App, HttpResponse, HttpServer, Responder};
185
186        async fn hello() -> impl Responder {
187            HttpResponse::Ok().body(\"Hello, World!\")
188        }
189
190        #[actix_web::main]
191        async fn main() -> std::io::Result<()> {
192            HttpServer::new(|| {
193                App::new()
194                    .route(\"/\", web::get().to(hello))
195            })
196            .bind(\"127.0.0.1:8080\")?
197            .run()
198            .await
199        }
200        ```
201
202        This code creates a basic web server that responds with 'Hello, World!' when you access the root URL. Here's a breakdown of what's happening:
203
204        1. We import necessary items from the `actix-web` crate.
205        2. We define an async `hello` function that returns a simple HTTP response.
206        3. In the `main` function, we set up the server to listen on `127.0.0.1:8080`.
207        4. We configure the app to respond to GET requests on the root path with our `hello` function.
208
209        To run this, you'd need to add `actix-web` to your `Cargo.toml` dependencies:
210
211        ```toml
212        [dependencies]
213        actix-web = \"4.0\"
214        ```
215
216        Then you can run the server with `cargo run` and access it at `http://localhost:8080`.".unindent(), cx);
217            thread.insert_user_message("That's great! Can you explain more about async functions in Rust?", Vec::new(), cx);
218            thread.insert_message(Role::Assistant, "Certainly! Async functions are a key feature in Rust for writing efficient, non-blocking code, especially for I/O-bound operations. Here's an overview:
219
220        1. **Syntax**: Async functions are declared using the `async` keyword:
221
222           ```rust
223           async fn my_async_function() -> Result<(), Error> {
224               // Asynchronous code here
225           }
226           ```
227
228        2. **Futures**: Async functions return a `Future`. A `Future` represents a value that may not be available yet but will be at some point.
229
230        3. **Await**: Inside an async function, you can use the `.await` syntax to wait for other async operations to complete:
231
232           ```rust
233           async fn fetch_data() -> Result<String, Error> {
234               let response = make_http_request().await?;
235               let data = process_response(response).await?;
236               Ok(data)
237           }
238           ```
239
240        4. **Non-blocking**: Async functions allow the runtime to work on other tasks while waiting for I/O or other operations to complete, making efficient use of system resources.
241
242        5. **Runtime**: To execute async code, you need a runtime like `tokio` or `async-std`. Actix-web, which we used in the previous example, includes its own runtime.
243
244        6. **Error Handling**: Async functions work well with Rust's `?` operator for error handling.
245
246        Async programming in Rust provides a powerful way to write concurrent code that's both safe and efficient. It's particularly useful for servers, network programming, and any application that deals with many concurrent operations.".unindent(), cx);
247            thread
248        }));
249
250        self.threads.push(cx.new_model(|cx| {
251            let mut thread = Thread::new(self.tools.clone(), cx);
252            thread.set_summary("Rust code with long lines", cx);
253            thread.insert_user_message("Could you write me some Rust code with long lines?", Vec::new(), cx);
254            thread.insert_message(Role::Assistant, r#"Here's some Rust code with some intentionally long lines:
255            ```rust
256            use std::collections::{HashMap, HashSet};
257            use std::sync::{Arc, Mutex};
258            use std::thread;
259
260            fn main() {
261                let very_long_vector = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25];
262
263                let complicated_hashmap: HashMap<String, Vec<(i32, f64, String)>> = [("key1".to_string(), vec![(1, 1.1, "value1".to_string()), (2, 2.2, "value2".to_string())]), ("key2".to_string(), vec![(3, 3.3, "value3".to_string()), (4, 4.4, "value4".to_string())])].iter().cloned().collect();
264
265                let nested_structure = Arc::new(Mutex::new(HashMap::new()));
266
267                let long_closure = |x: i32, y: i32, z: i32| -> i32 { let result = x * y + z; println!("The result of the long closure calculation is: {}", result); result };
268
269                let thread_handles: Vec<_> = (0..10).map(|i| {
270                    let nested_structure_clone = Arc::clone(&nested_structure);
271                    thread::spawn(move || {
272                        let mut lock = nested_structure_clone.lock().unwrap();
273                        lock.entry(format!("thread_{}", i)).or_insert_with(|| HashSet::new()).insert(i * i);
274                    })
275                }).collect();
276
277                for handle in thread_handles {
278                    handle.join().unwrap();
279                }
280
281                println!("The final state of the nested structure is: {:?}", nested_structure.lock().unwrap());
282
283                let complex_expression = very_long_vector.iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).fold(0, |acc, x| acc + x) + long_closure(5, 10, 15);
284
285                println!("The result of the complex expression is: {}", complex_expression);
286            }
287            ```"#.unindent(), cx);
288            thread
289        }));
290    }
291}