thread_store.rs

  1use std::sync::Arc;
  2
  3use anyhow::Result;
  4use assistant_tool::{ToolId, ToolWorkingSet};
  5use collections::HashMap;
  6use context_server::manager::ContextServerManager;
  7use context_server::{ContextServerFactoryRegistry, ContextServerTool};
  8use gpui::{prelude::*, AppContext, Model, ModelContext, Task};
  9use project::Project;
 10use unindent::Unindent;
 11use util::ResultExt as _;
 12
 13use crate::thread::{Thread, ThreadId};
 14
 15pub struct ThreadStore {
 16    #[allow(unused)]
 17    project: Model<Project>,
 18    tools: Arc<ToolWorkingSet>,
 19    context_server_manager: Model<ContextServerManager>,
 20    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
 21    threads: Vec<Model<Thread>>,
 22}
 23
 24impl ThreadStore {
 25    pub fn new(
 26        project: Model<Project>,
 27        tools: Arc<ToolWorkingSet>,
 28        cx: &mut AppContext,
 29    ) -> Task<Result<Model<Self>>> {
 30        cx.spawn(|mut cx| async move {
 31            let this = cx.new_model(|cx: &mut ModelContext<Self>| {
 32                let context_server_factory_registry =
 33                    ContextServerFactoryRegistry::default_global(cx);
 34                let context_server_manager = cx.new_model(|cx| {
 35                    ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
 36                });
 37
 38                let mut this = Self {
 39                    project,
 40                    tools,
 41                    context_server_manager,
 42                    context_server_tool_ids: HashMap::default(),
 43                    threads: Vec::new(),
 44                };
 45                this.mock_recent_threads(cx);
 46                this.register_context_server_handlers(cx);
 47
 48                this
 49            })?;
 50
 51            Ok(this)
 52        })
 53    }
 54
 55    pub fn threads(&self, cx: &ModelContext<Self>) -> Vec<Model<Thread>> {
 56        let mut threads = self
 57            .threads
 58            .iter()
 59            .filter(|thread| !thread.read(cx).is_empty())
 60            .cloned()
 61            .collect::<Vec<_>>();
 62        threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.read(cx).updated_at()));
 63        threads
 64    }
 65
 66    pub fn recent_threads(&self, limit: usize, cx: &ModelContext<Self>) -> Vec<Model<Thread>> {
 67        self.threads(cx).into_iter().take(limit).collect()
 68    }
 69
 70    pub fn create_thread(&mut self, cx: &mut ModelContext<Self>) -> Model<Thread> {
 71        let thread = cx.new_model(|cx| Thread::new(self.tools.clone(), cx));
 72        self.threads.push(thread.clone());
 73        thread
 74    }
 75
 76    pub fn open_thread(&self, id: &ThreadId, cx: &mut ModelContext<Self>) -> Option<Model<Thread>> {
 77        self.threads
 78            .iter()
 79            .find(|thread| thread.read(cx).id() == id)
 80            .cloned()
 81    }
 82
 83    pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut ModelContext<Self>) {
 84        self.threads.retain(|thread| thread.read(cx).id() != id);
 85    }
 86
 87    fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) {
 88        cx.subscribe(
 89            &self.context_server_manager.clone(),
 90            Self::handle_context_server_event,
 91        )
 92        .detach();
 93    }
 94
 95    fn handle_context_server_event(
 96        &mut self,
 97        context_server_manager: Model<ContextServerManager>,
 98        event: &context_server::manager::Event,
 99        cx: &mut ModelContext<Self>,
100    ) {
101        let tool_working_set = self.tools.clone();
102        match event {
103            context_server::manager::Event::ServerStarted { server_id } => {
104                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
105                    let context_server_manager = context_server_manager.clone();
106                    cx.spawn({
107                        let server = server.clone();
108                        let server_id = server_id.clone();
109                        |this, mut cx| async move {
110                            let Some(protocol) = server.client() else {
111                                return;
112                            };
113
114                            if protocol.capable(context_server::protocol::ServerCapability::Tools) {
115                                if let Some(tools) = protocol.list_tools().await.log_err() {
116                                    let tool_ids = tools
117                                        .tools
118                                        .into_iter()
119                                        .map(|tool| {
120                                            log::info!(
121                                                "registering context server tool: {:?}",
122                                                tool.name
123                                            );
124                                            tool_working_set.insert(Arc::new(
125                                                ContextServerTool::new(
126                                                    context_server_manager.clone(),
127                                                    server.id(),
128                                                    tool,
129                                                ),
130                                            ))
131                                        })
132                                        .collect::<Vec<_>>();
133
134                                    this.update(&mut cx, |this, _cx| {
135                                        this.context_server_tool_ids.insert(server_id, tool_ids);
136                                    })
137                                    .log_err();
138                                }
139                            }
140                        }
141                    })
142                    .detach();
143                }
144            }
145            context_server::manager::Event::ServerStopped { server_id } => {
146                if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
147                    tool_working_set.remove(&tool_ids);
148                }
149            }
150        }
151    }
152}
153
154impl ThreadStore {
155    /// Creates some mocked recent threads for testing purposes.
156    fn mock_recent_threads(&mut self, cx: &mut ModelContext<Self>) {
157        use language_model::Role;
158
159        self.threads.push(cx.new_model(|cx| {
160            let mut thread = Thread::new(self.tools.clone(), cx);
161            thread.set_summary("Introduction to quantum computing", cx);
162            thread.insert_user_message("Hello! Can you help me understand quantum computing?", Vec::new(), cx);
163            thread.insert_message(Role::Assistant, "Of course! I'd be happy to help you understand quantum computing. Quantum computing is a fascinating field that uses the principles of quantum mechanics to process information. Unlike classical computers that use bits (0s and 1s), quantum computers use quantum bits or 'qubits'. These qubits can exist in multiple states simultaneously, a property called superposition. This allows quantum computers to perform certain calculations much faster than classical computers. What specific aspect of quantum computing would you like to know more about?", cx);
164            thread.insert_user_message("That's interesting! Can you explain how quantum entanglement is used in quantum computing?", Vec::new(), cx);
165            thread.insert_message(Role::Assistant, "Certainly! Quantum entanglement is a key principle used in quantum computing. When two qubits become entangled, the state of one qubit is directly related to the state of the other, regardless of the distance between them. This property is used in quantum computing to create complex quantum states and to perform operations on multiple qubits simultaneously. Entanglement allows quantum computers to process information in ways that classical computers cannot, potentially solving certain problems much more efficiently. For example, it's crucial in quantum error correction and in algorithms like quantum teleportation, which is important for quantum communication.", cx);
166            thread
167        }));
168
169        self.threads.push(cx.new_model(|cx| {
170            let mut thread = Thread::new(self.tools.clone(), cx);
171            thread.set_summary("Rust web development and async programming", cx);
172            thread.insert_user_message("Can you show me an example of Rust code for a simple web server?", Vec::new(), cx);
173            thread.insert_message(Role::Assistant, "Certainly! Here's an example of a simple web server in Rust using the `actix-web` framework:
174
175        ```rust
176        use actix_web::{web, App, HttpResponse, HttpServer, Responder};
177
178        async fn hello() -> impl Responder {
179            HttpResponse::Ok().body(\"Hello, World!\")
180        }
181
182        #[actix_web::main]
183        async fn main() -> std::io::Result<()> {
184            HttpServer::new(|| {
185                App::new()
186                    .route(\"/\", web::get().to(hello))
187            })
188            .bind(\"127.0.0.1:8080\")?
189            .run()
190            .await
191        }
192        ```
193
194        This code creates a basic web server that responds with 'Hello, World!' when you access the root URL. Here's a breakdown of what's happening:
195
196        1. We import necessary items from the `actix-web` crate.
197        2. We define an async `hello` function that returns a simple HTTP response.
198        3. In the `main` function, we set up the server to listen on `127.0.0.1:8080`.
199        4. We configure the app to respond to GET requests on the root path with our `hello` function.
200
201        To run this, you'd need to add `actix-web` to your `Cargo.toml` dependencies:
202
203        ```toml
204        [dependencies]
205        actix-web = \"4.0\"
206        ```
207
208        Then you can run the server with `cargo run` and access it at `http://localhost:8080`.".unindent(), cx);
209            thread.insert_user_message("That's great! Can you explain more about async functions in Rust?", Vec::new(), cx);
210            thread.insert_message(Role::Assistant, "Certainly! Async functions are a key feature in Rust for writing efficient, non-blocking code, especially for I/O-bound operations. Here's an overview:
211
212        1. **Syntax**: Async functions are declared using the `async` keyword:
213
214           ```rust
215           async fn my_async_function() -> Result<(), Error> {
216               // Asynchronous code here
217           }
218           ```
219
220        2. **Futures**: Async functions return a `Future`. A `Future` represents a value that may not be available yet but will be at some point.
221
222        3. **Await**: Inside an async function, you can use the `.await` syntax to wait for other async operations to complete:
223
224           ```rust
225           async fn fetch_data() -> Result<String, Error> {
226               let response = make_http_request().await?;
227               let data = process_response(response).await?;
228               Ok(data)
229           }
230           ```
231
232        4. **Non-blocking**: Async functions allow the runtime to work on other tasks while waiting for I/O or other operations to complete, making efficient use of system resources.
233
234        5. **Runtime**: To execute async code, you need a runtime like `tokio` or `async-std`. Actix-web, which we used in the previous example, includes its own runtime.
235
236        6. **Error Handling**: Async functions work well with Rust's `?` operator for error handling.
237
238        Async programming in Rust provides a powerful way to write concurrent code that's both safe and efficient. It's particularly useful for servers, network programming, and any application that deals with many concurrent operations.".unindent(), cx);
239            thread
240        }));
241
242        self.threads.push(cx.new_model(|cx| {
243            let mut thread = Thread::new(self.tools.clone(), cx);
244            thread.set_summary("Rust code with long lines", cx);
245            thread.insert_user_message("Could you write me some Rust code with long lines?", Vec::new(), cx);
246            thread.insert_message(Role::Assistant, r#"Here's some Rust code with some intentionally long lines:
247            ```rust
248            use std::collections::{HashMap, HashSet};
249            use std::sync::{Arc, Mutex};
250            use std::thread;
251
252            fn main() {
253                let very_long_vector = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25];
254
255                let complicated_hashmap: HashMap<String, Vec<(i32, f64, String)>> = [("key1".to_string(), vec![(1, 1.1, "value1".to_string()), (2, 2.2, "value2".to_string())]), ("key2".to_string(), vec![(3, 3.3, "value3".to_string()), (4, 4.4, "value4".to_string())])].iter().cloned().collect();
256
257                let nested_structure = Arc::new(Mutex::new(HashMap::new()));
258
259                let long_closure = |x: i32, y: i32, z: i32| -> i32 { let result = x * y + z; println!("The result of the long closure calculation is: {}", result); result };
260
261                let thread_handles: Vec<_> = (0..10).map(|i| {
262                    let nested_structure_clone = Arc::clone(&nested_structure);
263                    thread::spawn(move || {
264                        let mut lock = nested_structure_clone.lock().unwrap();
265                        lock.entry(format!("thread_{}", i)).or_insert_with(|| HashSet::new()).insert(i * i);
266                    })
267                }).collect();
268
269                for handle in thread_handles {
270                    handle.join().unwrap();
271                }
272
273                println!("The final state of the nested structure is: {:?}", nested_structure.lock().unwrap());
274
275                let complex_expression = very_long_vector.iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).fold(0, |acc, x| acc + x) + long_closure(5, 10, 15);
276
277                println!("The result of the complex expression is: {}", complex_expression);
278            }
279            ```"#.unindent(), cx);
280            thread
281        }));
282    }
283}