From e3c0d6980cf5758aa1ccefe6280cf3772860dadc Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 16 Sep 2021 18:39:29 -0700 Subject: [PATCH] Switch to a new flow for advertising, sharing and joining worktrees Now, when you open a local worktree, we immediately send an `OpenWorktree` message to the server, telling it the name of the folder that you've opened, and the names of all the collaborators (based on a `.zed.toml` file). The server responds with a unique id for the worktree. When starting share this local worktree, you now include this previously-assigned id in the `ShareWorktree` message. When joining a worktree, there is no longer a need to provide an access token. The access is controlled by the set of "collaborator logins" that were provided when the worktree was initially opened by the host. --- server/src/rpc.rs | 498 ++++++++++++++++++++++++++++--------------- zed/src/workspace.rs | 90 +++----- zed/src/worktree.rs | 208 ++++++++++++++---- zrpc/proto/zed.proto | 24 ++- zrpc/src/proto.rs | 10 +- 5 files changed, 547 insertions(+), 283 deletions(-) diff --git a/server/src/rpc.rs b/server/src/rpc.rs index ac04e5ee24f07254913739a1f7f2a7ebd31562b4..bbc0b090ca6afc9b61030888017137912b8f660f 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -1,6 +1,6 @@ use super::{ auth, - db::{ChannelId, MessageId, UserId}, + db::{ChannelId, MessageId, User, UserId}, AppState, }; use anyhow::anyhow; @@ -25,7 +25,6 @@ use tide::{ }; use time::OffsetDateTime; use zrpc::{ - auth::random_token, proto::{self, AnyTypedEnvelope, EnvelopedMessage}, Connection, ConnectionId, Peer, TypedEnvelope, }; @@ -50,6 +49,7 @@ pub struct Server { struct ServerState { connections: HashMap, pub worktrees: HashMap, + visible_worktrees_by_github_login: HashMap>, channels: HashMap, next_worktree_id: u64, } @@ -61,11 +61,15 @@ struct ConnectionState { } struct Worktree { - host_connection_id: Option, + host_connection_id: ConnectionId, + collaborator_github_logins: Vec, + root_name: String, + share: Option, +} + +struct WorktreeShare { guest_connection_ids: HashMap, active_replica_ids: HashSet, - access_token: String, - root_name: String, entries: HashMap, } @@ -93,10 +97,12 @@ impl Server { server .add_handler(Server::ping) + .add_handler(Server::open_worktree) + .add_handler(Server::close_worktree) .add_handler(Server::share_worktree) + .add_handler(Server::unshare_worktree) .add_handler(Server::join_worktree) .add_handler(Server::update_worktree) - .add_handler(Server::close_worktree) .add_handler(Server::open_buffer) .add_handler(Server::close_buffer) .add_handler(Server::update_buffer) @@ -231,13 +237,15 @@ impl Server { } for worktree_id in connection.worktrees { if let Some(worktree) = state.worktrees.get_mut(&worktree_id) { - if worktree.host_connection_id == Some(connection_id) { - worktree_ids.push(worktree_id); - } else if let Some(replica_id) = - worktree.guest_connection_ids.remove(&connection_id) - { - worktree.active_replica_ids.remove(&replica_id); + if worktree.host_connection_id == connection_id { worktree_ids.push(worktree_id); + } else if let Some(share_state) = worktree.share.as_mut() { + if let Some(replica_id) = + share_state.guest_connection_ids.remove(&connection_id) + { + share_state.active_replica_ids.remove(&replica_id); + worktree_ids.push(worktree_id); + } } } } @@ -250,14 +258,30 @@ impl Server { Ok(()) } + async fn open_worktree( + self: Arc, + request: TypedEnvelope, + ) -> tide::Result<()> { + let receipt = request.receipt(); + + let mut state = self.state.write().await; + let worktree_id = state.add_worktree(Worktree { + host_connection_id: request.sender_id, + collaborator_github_logins: request.payload.collaborator_logins, + root_name: request.payload.root_name, + share: None, + }); + + self.peer + .respond(receipt, proto::OpenWorktreeResponse { worktree_id }) + .await?; + Ok(()) + } + async fn share_worktree( self: Arc, mut request: TypedEnvelope, ) -> tide::Result<()> { - let mut state = self.state.write().await; - let worktree_id = state.next_worktree_id; - state.next_worktree_id += 1; - let access_token = random_token(); let worktree = request .payload .worktree @@ -267,27 +291,58 @@ impl Server { .into_iter() .map(|entry| (entry.id, entry)) .collect(); - state.worktrees.insert( - worktree_id, - Worktree { - host_connection_id: Some(request.sender_id), + let mut state = self.state.write().await; + if let Some(worktree) = state.worktrees.get_mut(&worktree.id) { + worktree.share = Some(WorktreeShare { guest_connection_ids: Default::default(), active_replica_ids: Default::default(), - access_token: access_token.clone(), - root_name: mem::take(&mut worktree.root_name), entries, - }, - ); + }); + self.peer + .respond(request.receipt(), proto::ShareWorktreeResponse {}) + .await?; + } else { + self.peer + .respond_with_error( + request.receipt(), + proto::Error { + message: "no such worktree".to_string(), + }, + ) + .await?; + } + Ok(()) + } + + async fn unshare_worktree( + self: Arc, + request: TypedEnvelope, + ) -> tide::Result<()> { + let worktree_id = request.payload.worktree_id; + + let connection_ids; + { + let mut state = self.state.write().await; + let worktree = state.write_worktree(worktree_id, request.sender_id)?; + if worktree.host_connection_id != request.sender_id { + return Err(anyhow!("no such worktree"))?; + } + + connection_ids = worktree.connection_ids(); + worktree.share.take(); + for connection_id in &connection_ids { + if let Some(connection) = state.connections.get_mut(connection_id) { + connection.worktrees.remove(&worktree_id); + } + } + } + + broadcast(request.sender_id, connection_ids, |conn_id| { + self.peer + .send(conn_id, proto::UnshareWorktree { worktree_id }) + }) + .await?; - self.peer - .respond( - request.receipt(), - proto::ShareWorktreeResponse { - worktree_id, - access_token, - }, - ) - .await?; Ok(()) } @@ -296,67 +351,112 @@ impl Server { request: TypedEnvelope, ) -> tide::Result<()> { let worktree_id = request.payload.worktree_id; - let access_token = &request.payload.access_token; + let user = self.user_for_connection(request.sender_id).await?; + let response; + let connection_ids; let mut state = self.state.write().await; - if let Some((peer_replica_id, worktree)) = - state.join_worktree(request.sender_id, worktree_id, access_token) - { - let mut peers = Vec::new(); - if let Some(host_connection_id) = worktree.host_connection_id { + match state.join_worktree(request.sender_id, &user, worktree_id) { + Ok((peer_replica_id, worktree)) => { + let share = worktree.share()?; + let peer_count = share.guest_connection_ids.len(); + let mut peers = Vec::with_capacity(peer_count); peers.push(proto::Peer { - peer_id: host_connection_id.0, + peer_id: worktree.host_connection_id.0, replica_id: 0, }); + for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids { + if *peer_conn_id != request.sender_id { + peers.push(proto::Peer { + peer_id: peer_conn_id.0, + replica_id: *peer_replica_id as u32, + }); + } + } + connection_ids = worktree.connection_ids(); + response = proto::JoinWorktreeResponse { + worktree: Some(proto::Worktree { + id: worktree_id, + root_name: worktree.root_name.clone(), + entries: share.entries.values().cloned().collect(), + }), + replica_id: peer_replica_id as u32, + peers, + }; } - for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids { - if *peer_conn_id != request.sender_id { - peers.push(proto::Peer { - peer_id: peer_conn_id.0, - replica_id: *peer_replica_id as u32, - }); + Err(error) => { + self.peer + .respond_with_error( + request.receipt(), + proto::Error { + message: error.to_string(), + }, + ) + .await?; + return Ok(()); + } + } + + broadcast(request.sender_id, connection_ids, |conn_id| { + self.peer.send( + conn_id, + proto::AddPeer { + worktree_id, + peer: Some(proto::Peer { + peer_id: request.sender_id.0, + replica_id: response.replica_id, + }), + }, + ) + }) + .await?; + self.peer.respond(request.receipt(), response).await?; + + Ok(()) + } + + async fn close_worktree( + self: Arc, + request: TypedEnvelope, + ) -> tide::Result<()> { + let worktree_id = request.payload.worktree_id; + let connection_ids; + let mut is_host = false; + let mut is_guest = false; + { + let mut state = self.state.write().await; + let worktree = state.write_worktree(worktree_id, request.sender_id)?; + connection_ids = worktree.connection_ids(); + + if worktree.host_connection_id == request.sender_id { + is_host = true; + state.remove_worktree(worktree_id); + } else { + let share = worktree.share_mut()?; + if let Some(replica_id) = share.guest_connection_ids.remove(&request.sender_id) { + is_guest = true; + share.active_replica_ids.remove(&replica_id); } } + } - broadcast(request.sender_id, worktree.connection_ids(), |conn_id| { + if is_host { + broadcast(request.sender_id, connection_ids, |conn_id| { + self.peer + .send(conn_id, proto::UnshareWorktree { worktree_id }) + }) + .await?; + } else if is_guest { + broadcast(request.sender_id, connection_ids, |conn_id| { self.peer.send( conn_id, - proto::AddPeer { + proto::RemovePeer { worktree_id, - peer: Some(proto::Peer { - peer_id: request.sender_id.0, - replica_id: peer_replica_id as u32, - }), + peer_id: request.sender_id.0, }, ) }) - .await?; - self.peer - .respond( - request.receipt(), - proto::JoinWorktreeResponse { - worktree_id, - worktree: Some(proto::Worktree { - root_name: worktree.root_name.clone(), - entries: worktree.entries.values().cloned().collect(), - }), - replica_id: peer_replica_id as u32, - peers, - }, - ) - .await?; - } else { - self.peer - .respond( - request.receipt(), - proto::JoinWorktreeResponse { - worktree_id, - worktree: None, - replica_id: 0, - peers: Vec::new(), - }, - ) - .await?; + .await? } Ok(()) @@ -369,12 +469,14 @@ impl Server { { let mut state = self.state.write().await; let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?; + let share = worktree.share_mut()?; + for entry_id in &request.payload.removed_entries { - worktree.entries.remove(&entry_id); + share.entries.remove(&entry_id); } for entry in &request.payload.updated_entries { - worktree.entries.insert(entry.id, entry.clone()); + share.entries.insert(entry.id, entry.clone()); } } @@ -383,38 +485,6 @@ impl Server { Ok(()) } - async fn close_worktree( - self: Arc, - request: TypedEnvelope, - ) -> tide::Result<()> { - let connection_ids; - { - let mut state = self.state.write().await; - let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?; - connection_ids = worktree.connection_ids(); - if worktree.host_connection_id == Some(request.sender_id) { - worktree.host_connection_id = None; - } else if let Some(replica_id) = - worktree.guest_connection_ids.remove(&request.sender_id) - { - worktree.active_replica_ids.remove(&replica_id); - } - } - - broadcast(request.sender_id, connection_ids, |conn_id| { - self.peer.send( - conn_id, - proto::RemovePeer { - worktree_id: request.payload.worktree_id, - peer_id: request.sender_id.0, - }, - ) - }) - .await?; - - Ok(()) - } - async fn open_buffer( self: Arc, request: TypedEnvelope, @@ -426,7 +496,7 @@ impl Server { .read() .await .read_worktree(worktree_id, request.sender_id)? - .host_connection_id()?; + .host_connection_id; let response = self .peer @@ -445,7 +515,7 @@ impl Server { .read() .await .read_worktree(request.payload.worktree_id, request.sender_id)? - .host_connection_id()?; + .host_connection_id; self.peer .forward_send(request.sender_id, host_connection_id, request.payload) @@ -463,8 +533,9 @@ impl Server { { let state = self.state.read().await; let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?; - host = worktree.host_connection_id()?; + host = worktree.host_connection_id; guests = worktree + .share()? .guest_connection_ids .keys() .copied() @@ -785,6 +856,24 @@ impl Server { Ok(()) } + async fn user_for_connection(&self, connection_id: ConnectionId) -> tide::Result { + let user_id = self + .state + .read() + .await + .connections + .get(&connection_id) + .ok_or_else(|| anyhow!("no such connection"))? + .user_id; + Ok(self + .app_state + .db + .get_users_by_ids(user_id, Some(user_id).into_iter()) + .await? + .pop() + .ok_or_else(|| anyhow!("no such user"))?) + } + async fn broadcast_in_worktree( &self, worktree_id: u64, @@ -860,30 +949,34 @@ impl ServerState { fn join_worktree( &mut self, connection_id: ConnectionId, + user: &User, worktree_id: u64, - access_token: &str, - ) -> Option<(ReplicaId, &Worktree)> { - if let Some(worktree) = self.worktrees.get_mut(&worktree_id) { - if access_token == worktree.access_token { - if let Some(connection) = self.connections.get_mut(&connection_id) { - connection.worktrees.insert(worktree_id); - } + ) -> tide::Result<(ReplicaId, &Worktree)> { + let connection = self + .connections + .get_mut(&connection_id) + .ok_or_else(|| anyhow!("no such connection"))?; + let worktree = self + .worktrees + .get_mut(&worktree_id) + .ok_or_else(|| anyhow!("no such worktree"))?; + if !worktree + .collaborator_github_logins + .contains(&user.github_login) + { + Err(anyhow!("no such worktree"))?; + } - let mut replica_id = 1; - while worktree.active_replica_ids.contains(&replica_id) { - replica_id += 1; - } - worktree.active_replica_ids.insert(replica_id); - worktree - .guest_connection_ids - .insert(connection_id, replica_id); - Some((replica_id, worktree)) - } else { - None - } - } else { - None + let share = worktree.share_mut()?; + connection.worktrees.insert(worktree_id); + + let mut replica_id = 1; + while share.active_replica_ids.contains(&replica_id) { + replica_id += 1; } + share.active_replica_ids.insert(replica_id); + share.guest_connection_ids.insert(connection_id, replica_id); + return Ok((replica_id, worktree)); } fn read_worktree( @@ -896,8 +989,11 @@ impl ServerState { .get(&worktree_id) .ok_or_else(|| anyhow!("worktree not found"))?; - if worktree.host_connection_id == Some(connection_id) - || worktree.guest_connection_ids.contains_key(&connection_id) + if worktree.host_connection_id == connection_id + || worktree + .share()? + .guest_connection_ids + .contains_key(&connection_id) { Ok(worktree) } else { @@ -919,8 +1015,10 @@ impl ServerState { .get_mut(&worktree_id) .ok_or_else(|| anyhow!("worktree not found"))?; - if worktree.host_connection_id == Some(connection_id) - || worktree.guest_connection_ids.contains_key(&connection_id) + if worktree.host_connection_id == connection_id + || worktree.share.as_ref().map_or(false, |share| { + share.guest_connection_ids.contains_key(&connection_id) + }) { Ok(worktree) } else { @@ -931,21 +1029,69 @@ impl ServerState { ))? } } + + fn add_worktree(&mut self, worktree: Worktree) -> u64 { + let worktree_id = self.next_worktree_id; + for collaborator_login in &worktree.collaborator_github_logins { + self.visible_worktrees_by_github_login + .entry(collaborator_login.clone()) + .or_default() + .insert(worktree_id); + } + self.next_worktree_id += 1; + self.worktrees.insert(worktree_id, worktree); + worktree_id + } + + fn remove_worktree(&mut self, worktree_id: u64) { + let worktree = self.worktrees.remove(&worktree_id).unwrap(); + if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) { + connection.worktrees.remove(&worktree_id); + } + if let Some(share) = worktree.share { + for connection_id in share.guest_connection_ids.keys() { + if let Some(connection) = self.connections.get_mut(connection_id) { + connection.worktrees.remove(&worktree_id); + } + } + } + for collaborator_login in worktree.collaborator_github_logins { + if let Some(visible_worktrees) = self + .visible_worktrees_by_github_login + .get_mut(&collaborator_login) + { + visible_worktrees.remove(&worktree_id); + } + } + } } impl Worktree { pub fn connection_ids(&self) -> Vec { - self.guest_connection_ids - .keys() - .copied() - .chain(self.host_connection_id) - .collect() + if let Some(share) = &self.share { + share + .guest_connection_ids + .keys() + .copied() + .chain(Some(self.host_connection_id)) + .collect() + } else { + vec![self.host_connection_id] + } } - fn host_connection_id(&self) -> tide::Result { + fn share(&self) -> tide::Result<&WorktreeShare> { Ok(self - .host_connection_id - .ok_or_else(|| anyhow!("host disconnected from worktree"))?) + .share + .as_ref() + .ok_or_else(|| anyhow!("worktree is not shared"))?) + } + + fn share_mut(&mut self) -> tide::Result<&mut WorktreeShare> { + Ok(self + .share + .as_mut() + .ok_or_else(|| anyhow!("worktree is not shared"))?) } } @@ -1066,6 +1212,7 @@ mod tests { fs.insert_tree( "/a", json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, "a.txt": "a-contents", "b.txt": "b-contents", }), @@ -1083,7 +1230,7 @@ mod tests { worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; - let (worktree_id, worktree_token) = worktree_a + let worktree_id = worktree_a .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx)) .await .unwrap(); @@ -1092,7 +1239,6 @@ mod tests { let worktree_b = Worktree::open_remote( client_b.clone(), worktree_id, - worktree_token, lang_registry.clone(), &mut cx_b.to_async(), ) @@ -1173,6 +1319,7 @@ mod tests { fs.insert_tree( "/a", json!({ + ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#, "file1": "", "file2": "" }), @@ -1191,7 +1338,7 @@ mod tests { worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; - let (worktree_id, worktree_token) = worktree_a + let worktree_id = worktree_a .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx)) .await .unwrap(); @@ -1200,7 +1347,6 @@ mod tests { let worktree_b = Worktree::open_remote( client_b.clone(), worktree_id, - worktree_token.clone(), lang_registry.clone(), &mut cx_b.to_async(), ) @@ -1209,7 +1355,6 @@ mod tests { let worktree_c = Worktree::open_remote( client_c.clone(), worktree_id, - worktree_token, lang_registry.clone(), &mut cx_c.to_async(), ) @@ -1273,17 +1418,17 @@ mod tests { .unwrap(); worktree_b - .condition(&cx_b, |tree, _| tree.file_count() == 3) + .condition(&cx_b, |tree, _| tree.file_count() == 4) .await; worktree_c - .condition(&cx_c, |tree, _| tree.file_count() == 3) + .condition(&cx_c, |tree, _| tree.file_count() == 4) .await; worktree_b.read_with(&cx_b, |tree, _| { assert_eq!( tree.paths() .map(|p| p.to_string_lossy()) .collect::>(), - &["file1", "file3", "file4"] + &[".zed.toml", "file1", "file3", "file4"] ) }); worktree_c.read_with(&cx_c, |tree, _| { @@ -1291,7 +1436,7 @@ mod tests { tree.paths() .map(|p| p.to_string_lossy()) .collect::>(), - &["file1", "file3", "file4"] + &[".zed.toml", "file1", "file3", "file4"] ) }); } @@ -1308,12 +1453,18 @@ mod tests { // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); - fs.save(Path::new("/a.txt"), &"a-contents".into()) - .await - .unwrap(); + fs.insert_tree( + "/dir", + json!({ + ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#, + "a.txt": "a-contents", + }), + ) + .await; + let worktree_a = Worktree::open_local( client_a.clone(), - "/".as_ref(), + "/dir".as_ref(), fs, lang_registry.clone(), &mut cx_a.to_async(), @@ -1323,7 +1474,7 @@ mod tests { worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; - let (worktree_id, worktree_token) = worktree_a + let worktree_id = worktree_a .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx)) .await .unwrap(); @@ -1332,7 +1483,6 @@ mod tests { let worktree_b = Worktree::open_remote( client_b.clone(), worktree_id, - worktree_token, lang_registry.clone(), &mut cx_b.to_async(), ) @@ -1388,12 +1538,17 @@ mod tests { // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); - fs.save(Path::new("/a.txt"), &"a-contents".into()) - .await - .unwrap(); + fs.insert_tree( + "/dir", + json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, + "a.txt": "a-contents", + }), + ) + .await; let worktree_a = Worktree::open_local( client_a.clone(), - "/".as_ref(), + "/dir".as_ref(), fs, lang_registry.clone(), &mut cx_a.to_async(), @@ -1403,7 +1558,7 @@ mod tests { worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; - let (worktree_id, worktree_token) = worktree_a + let worktree_id = worktree_a .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx)) .await .unwrap(); @@ -1412,7 +1567,6 @@ mod tests { let worktree_b = Worktree::open_remote( client_b.clone(), worktree_id, - worktree_token, lang_registry.clone(), &mut cx_b.to_async(), ) @@ -1450,6 +1604,7 @@ mod tests { fs.insert_tree( "/a", json!({ + ".zed.toml": r#"collaborators = ["user_b"]"#, "a.txt": "a-contents", "b.txt": "b-contents", }), @@ -1467,7 +1622,7 @@ mod tests { worktree_a .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete()) .await; - let (worktree_id, worktree_token) = worktree_a + let worktree_id = worktree_a .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx)) .await .unwrap(); @@ -1476,7 +1631,6 @@ mod tests { let _worktree_b = Worktree::open_remote( client_b.clone(), worktree_id, - worktree_token, lang_registry.clone(), &mut cx_b.to_async(), ) diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index 8582745ffe476382a282794b1301fbf08b6f5a64..bc918bb30fd5ca3f65c8cae463e4edfbf30b430b 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -11,10 +11,11 @@ use crate::{ rpc, settings::Settings, user, + util::TryFutureExt as _, worktree::{File, Worktree}, AppState, Authenticate, }; -use anyhow::{anyhow, Result}; +use anyhow::Result; use gpui::{ action, elements::*, @@ -52,12 +53,10 @@ pub fn init(cx: &mut MutableAppContext) { open_paths(action, cx).detach() }); cx.add_global_action(open_new); - cx.add_global_action(join_worktree); cx.add_action(Workspace::save_active_item); cx.add_action(Workspace::debug_elements); cx.add_action(Workspace::open_new_file); cx.add_action(Workspace::share_worktree); - cx.add_action(Workspace::join_worktree); cx.add_action(Workspace::toggle_sidebar_item); cx.add_bindings(vec![ Binding::new("cmd-s", Save, None), @@ -129,14 +128,6 @@ fn open_new(action: &OpenNew, cx: &mut MutableAppContext) { }); } -fn join_worktree(action: &JoinWorktree, cx: &mut MutableAppContext) { - cx.add_window(window_options(), |cx| { - let mut view = Workspace::new(action.0.as_ref(), cx); - view.join_worktree(action, cx); - view - }); -} - fn window_options() -> WindowOptions<'static> { WindowOptions { bounds: RectF::new(vec2f(0., 0.), vec2f(1024., 768.)), @@ -818,67 +809,46 @@ impl Workspace { fn share_worktree(&mut self, _: &ShareWorktree, cx: &mut ViewContext) { let rpc = self.rpc.clone(); - let platform = cx.platform(); - - let task = cx.spawn(|this, mut cx| async move { - rpc.authenticate_and_connect(&cx).await?; - - let share_task = this.update(&mut cx, |this, cx| { - let worktree = this.worktrees.iter().next()?; - worktree.update(cx, |worktree, cx| { - let worktree = worktree.as_local_mut()?; - Some(worktree.share(cx)) - }) - }); + cx.spawn(|this, mut cx| { + async move { + rpc.authenticate_and_connect(&cx).await?; + + let share_task = this.update(&mut cx, |this, cx| { + let worktree = this.worktrees.iter().next()?; + worktree.update(cx, |worktree, cx| { + let worktree = worktree.as_local_mut()?; + Some(worktree.share(cx)) + }) + }); - if let Some(share_task) = share_task { - let (worktree_id, access_token) = share_task.await?; - let worktree_url = rpc::encode_worktree_url(worktree_id, &access_token); - log::info!("wrote worktree url to clipboard: {}", worktree_url); - platform.write_to_clipboard(ClipboardItem::new(worktree_url)); - } - surf::Result::Ok(()) - }); + if let Some(share_task) = share_task { + share_task.await?; + } - cx.spawn(|_, _| async move { - if let Err(e) = task.await { - log::error!("sharing failed: {:?}", e); + Ok(()) } + .log_err() }) .detach(); } - fn join_worktree(&mut self, _: &JoinWorktree, cx: &mut ViewContext) { + fn join_worktree(&mut self, id: u64, cx: &mut ViewContext) { let rpc = self.rpc.clone(); let languages = self.languages.clone(); - let task = cx.spawn(|this, mut cx| async move { - rpc.authenticate_and_connect(&cx).await?; - - let worktree_url = cx - .platform() - .read_from_clipboard() - .ok_or_else(|| anyhow!("failed to read url from clipboard"))?; - let (worktree_id, access_token) = rpc::decode_worktree_url(worktree_url.text()) - .ok_or_else(|| anyhow!("failed to decode worktree url"))?; - log::info!("read worktree url from clipboard: {}", worktree_url.text()); - - let worktree = - Worktree::open_remote(rpc.clone(), worktree_id, access_token, languages, &mut cx) - .await?; - this.update(&mut cx, |workspace, cx| { - cx.observe(&worktree, |_, _, cx| cx.notify()).detach(); - workspace.worktrees.insert(worktree); - cx.notify(); - }); - - surf::Result::Ok(()) - }); + cx.spawn(|this, mut cx| { + async move { + rpc.authenticate_and_connect(&cx).await?; + let worktree = Worktree::open_remote(rpc.clone(), id, languages, &mut cx).await?; + this.update(&mut cx, |workspace, cx| { + cx.observe(&worktree, |_, _, cx| cx.notify()).detach(); + workspace.worktrees.insert(worktree); + cx.notify(); + }); - cx.spawn(|_, _| async move { - if let Err(e) = task.await { - log::error!("joining failed: {}", e); + Ok(()) } + .log_err() }) .detach(); } diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 79f76a97089e1c765e4a6d02f82574795f35b0c0..60d453b4083e991ad4fe6be9e3a934ce0279ea6a 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -7,7 +7,7 @@ use crate::{ fuzzy, fuzzy::CharBag, language::LanguageRegistry, - rpc::{self, proto}, + rpc::{self, proto, Status}, time::{self, ReplicaId}, util::{Bias, TryFutureExt}, }; @@ -27,6 +27,7 @@ use postage::{ prelude::{Sink as _, Stream as _}, watch, }; +use serde::Deserialize; use smol::channel::{self, Sender}; use std::{ cmp::{self, Ordering}, @@ -67,9 +68,9 @@ impl Entity for Worktree { fn release(&mut self, cx: &mut MutableAppContext) { let rpc = match self { Self::Local(tree) => tree - .share - .as_ref() - .map(|share| (tree.rpc.clone(), share.remote_id)), + .remote_id + .borrow() + .map(|remote_id| (tree.rpc.clone(), remote_id)), Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)), }; @@ -112,17 +113,10 @@ impl Worktree { pub async fn open_remote( rpc: Arc, id: u64, - access_token: String, languages: Arc, cx: &mut AsyncAppContext, ) -> Result> { - let response = rpc - .request(proto::JoinWorktree { - worktree_id: id, - access_token, - }) - .await?; - + let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?; Worktree::remote(response, rpc, languages, cx).await } @@ -136,7 +130,7 @@ impl Worktree { .worktree .ok_or_else(|| anyhow!("empty worktree"))?; - let remote_id = join_response.worktree_id; + let remote_id = worktree.id; let replica_id = join_response.replica_id as ReplicaId; let peers = join_response.peers; let root_char_bag: CharBag = worktree @@ -650,10 +644,13 @@ impl Deref for Worktree { pub struct LocalWorktree { snapshot: Snapshot, + config: WorktreeConfig, background_snapshot: Arc>, last_scan_state_rx: watch::Receiver, _background_scanner_task: Option>, + _maintain_remote_id_task: Task>, poll_task: Option>, + remote_id: watch::Receiver>, share: Option, open_buffers: HashMap>, shared_buffers: HashMap>>, @@ -664,6 +661,11 @@ pub struct LocalWorktree { fs: Arc, } +#[derive(Default, Deserialize)] +struct WorktreeConfig { + collaborators: Vec, +} + impl LocalWorktree { async fn new( rpc: Arc, @@ -684,6 +686,13 @@ impl LocalWorktree { let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect(); let metadata = fs.metadata(&abs_path).await?; + let mut config = WorktreeConfig::default(); + if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await { + if let Ok(parsed) = toml::from_str(&zed_toml) { + config = parsed; + } + } + let (scan_states_tx, scan_states_rx) = smol::channel::unbounded(); let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning); let tree = cx.add_model(move |cx: &mut ModelContext| { @@ -691,7 +700,7 @@ impl LocalWorktree { id: cx.model_id(), scan_id: 0, abs_path, - root_name, + root_name: root_name.clone(), root_char_bag, ignores: Default::default(), entries_by_path: Default::default(), @@ -708,11 +717,48 @@ impl LocalWorktree { )); } + let (mut remote_id_tx, remote_id_rx) = watch::channel(); + let _maintain_remote_id_task = cx.spawn_weak({ + let rpc = rpc.clone(); + move |this, cx| { + async move { + let mut status = rpc.status(); + while let Some(status) = status.recv().await { + if let Some(this) = this.upgrade(&cx) { + let remote_id = if let Status::Connected { .. } = status { + let collaborator_logins = this.read_with(&cx, |this, _| { + this.as_local().unwrap().config.collaborators.clone() + }); + let response = rpc + .request(proto::OpenWorktree { + root_name: root_name.clone(), + collaborator_logins, + }) + .await?; + + Some(response.worktree_id) + } else { + None + }; + if remote_id_tx.send(remote_id).await.is_err() { + break; + } + } + } + Ok(()) + } + .log_err() + } + }); + let tree = Self { snapshot: snapshot.clone(), + config, + remote_id: remote_id_rx, background_snapshot: Arc::new(Mutex::new(snapshot)), last_scan_state_rx, _background_scanner_task: None, + _maintain_remote_id_task, share: None, poll_task: None, open_buffers: Default::default(), @@ -733,13 +779,10 @@ impl LocalWorktree { let tree = this.as_local_mut().unwrap(); if !tree.is_scanning() { if let Some(share) = tree.share.as_ref() { - Some((tree.snapshot(), share.snapshots_tx.clone())) - } else { - None + return Some((tree.snapshot(), share.snapshots_tx.clone())); } - } else { - None } + None }); if let Some((snapshot, snapshots_to_send_tx)) = to_send { @@ -894,6 +937,18 @@ impl LocalWorktree { } } + pub fn next_remote_id(&self) -> impl Future> { + let mut remote_id = self.remote_id.clone(); + async move { + while let Some(remote_id) = remote_id.recv().await { + if remote_id.is_some() { + return remote_id; + } + } + None + } + } + fn is_scanning(&self) -> bool { if let ScanState::Scanning = *self.last_scan_state_rx.borrow() { true @@ -979,17 +1034,19 @@ impl LocalWorktree { }) } - pub fn share( - &mut self, - cx: &mut ModelContext, - ) -> Task> { + pub fn share(&mut self, cx: &mut ModelContext) -> Task> { let snapshot = self.snapshot(); let share_request = self.share_request(cx); let rpc = self.rpc.clone(); cx.spawn(|this, mut cx| async move { - let share_request = share_request.await; + let share_request = if let Some(request) = share_request.await { + request + } else { + return Err(anyhow!("failed to open worktree on the server")); + }; + + let remote_id = share_request.worktree.as_ref().unwrap().id; let share_response = rpc.request(share_request).await?; - let remote_id = share_response.worktree_id; log::info!("sharing worktree {:?}", share_response); let (snapshots_to_send_tx, snapshots_to_send_rx) = @@ -1023,28 +1080,34 @@ impl LocalWorktree { let worktree = worktree.as_local_mut().unwrap(); worktree.share = Some(ShareState { - remote_id: share_response.worktree_id, snapshots_tx: snapshots_to_send_tx, _subscriptions, }); }); - Ok((remote_id, share_response.access_token)) + Ok(remote_id) }) } - fn share_request(&self, cx: &mut ModelContext) -> Task { + fn share_request(&self, cx: &mut ModelContext) -> Task> { + let remote_id = self.next_remote_id(); let snapshot = self.snapshot(); let root_name = self.root_name.clone(); cx.background().spawn(async move { - let entries = snapshot - .entries_by_path - .cursor::<(), ()>() - .map(Into::into) - .collect(); - proto::ShareWorktree { - worktree: Some(proto::Worktree { root_name, entries }), - } + remote_id.await.map(|id| { + let entries = snapshot + .entries_by_path + .cursor::<(), ()>() + .map(Into::into) + .collect(); + proto::ShareWorktree { + worktree: Some(proto::Worktree { + id, + root_name, + entries, + }), + } + }) }) } } @@ -1082,7 +1145,6 @@ impl fmt::Debug for LocalWorktree { } struct ShareState { - remote_id: u64, snapshots_tx: Sender, _subscriptions: Vec, } @@ -1552,9 +1614,9 @@ impl File { self.worktree.update(cx, |worktree, cx| { if let Some((rpc, remote_id)) = match worktree { Worktree::Local(worktree) => worktree - .share - .as_ref() - .map(|share| (worktree.rpc.clone(), share.remote_id)), + .remote_id + .borrow() + .map(|id| (worktree.rpc.clone(), id)), Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)), } { cx.spawn(|worktree, mut cx| async move { @@ -1639,7 +1701,7 @@ impl File { self.worktree.update(cx, |worktree, cx| match worktree { Worktree::Local(worktree) => { let rpc = worktree.rpc.clone(); - let worktree_id = worktree.share.as_ref().map(|share| share.remote_id); + let worktree_id = *worktree.remote_id.borrow(); let save = worktree.save(self.path.clone(), text, cx); cx.background().spawn(async move { let entry = save.await?; @@ -2528,6 +2590,7 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry { #[cfg(test)] mod tests { use super::*; + use crate::fs::FakeFs; use crate::test::*; use anyhow::Result; use fs::RealFs; @@ -2778,10 +2841,10 @@ mod tests { .update(&mut cx, |tree, cx| { tree.as_local().unwrap().share_request(cx) }) - .await; + .await + .unwrap(); let remote = Worktree::remote( proto::JoinWorktreeResponse { - worktree_id, worktree: share_request.worktree, replica_id: 1, peers: Vec::new(), @@ -2925,6 +2988,65 @@ mod tests { }); } + #[gpui::test] + async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) { + let user_id = 100; + let mut client = rpc::Client::new(); + let server = FakeServer::for_client(user_id, &mut client, &cx).await; + + let fs = Arc::new(FakeFs::new()); + fs.insert_tree( + "/path", + json!({ + "to": { + "the-dir": { + ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#, + "a.txt": "a-contents", + }, + }, + }), + ) + .await; + + let worktree = Worktree::open_local( + client.clone(), + "/path/to/the-dir".as_ref(), + fs, + Default::default(), + &mut cx.to_async(), + ) + .await + .unwrap(); + + { + let cx = cx.to_async(); + client.authenticate_and_connect(&cx).await.unwrap(); + } + + let open_worktree = server.receive::().await.unwrap(); + assert_eq!( + open_worktree.payload, + proto::OpenWorktree { + root_name: "the-dir".to_string(), + collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()], + } + ); + + server + .respond( + open_worktree.receipt(), + proto::OpenWorktreeResponse { worktree_id: 5 }, + ) + .await; + let remote_id = worktree + .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id()) + .await; + assert_eq!(remote_id, Some(5)); + + cx.update(move |_| drop(worktree)); + server.receive::().await.unwrap(); + } + #[gpui::test(iterations = 100)] fn test_random(mut rng: StdRng) { let operations = env::var("OPERATIONS") diff --git a/zrpc/proto/zed.proto b/zrpc/proto/zed.proto index e6b400bef98ba6fcdcbe7f4f97c8e5dd3daebfd6..b5fe1604aee48dbe1eab9d49b12ddbfc10562cf3 100644 --- a/zrpc/proto/zed.proto +++ b/zrpc/proto/zed.proto @@ -35,6 +35,9 @@ message Envelope { ChannelMessageSent channel_message_sent = 30; GetChannelMessages get_channel_messages = 31; GetChannelMessagesResponse get_channel_messages_response = 32; + OpenWorktree open_worktree = 33; + OpenWorktreeResponse open_worktree_response = 34; + UnshareWorktree unshare_worktree = 35; } } @@ -48,22 +51,30 @@ message Error { string message = 1; } +message OpenWorktree { + string root_name = 1; + repeated string collaborator_logins = 2; +} + +message OpenWorktreeResponse { + uint64 worktree_id = 1; +} + message ShareWorktree { Worktree worktree = 1; } -message ShareWorktreeResponse { +message ShareWorktreeResponse {} + +message UnshareWorktree { uint64 worktree_id = 1; - string access_token = 2; } message JoinWorktree { uint64 worktree_id = 1; - string access_token = 2; } message JoinWorktreeResponse { - uint64 worktree_id = 1; Worktree worktree = 2; uint32 replica_id = 3; repeated Peer peers = 4; @@ -187,8 +198,9 @@ message User { } message Worktree { - string root_name = 1; - repeated Entry entries = 2; + uint64 id = 1; + string root_name = 2; + repeated Entry entries = 3; } message Entry { diff --git a/zrpc/src/proto.rs b/zrpc/src/proto.rs index b00bca89c2404258ee8ac5cd11b56b0ff4fc66b5..09348a0fc5d2a59f2a3663292e6cf340b611b453 100644 --- a/zrpc/src/proto.rs +++ b/zrpc/src/proto.rs @@ -135,11 +135,13 @@ messages!( GetUsersResponse, JoinChannel, JoinChannelResponse, + JoinWorktree, + JoinWorktreeResponse, LeaveChannel, OpenBuffer, OpenBufferResponse, - JoinWorktree, - JoinWorktreeResponse, + OpenWorktree, + OpenWorktreeResponse, Ping, RemovePeer, SaveBuffer, @@ -147,6 +149,7 @@ messages!( SendChannelMessageResponse, ShareWorktree, ShareWorktreeResponse, + UnshareWorktree, UpdateBuffer, UpdateWorktree, ); @@ -157,10 +160,12 @@ request_messages!( (JoinChannel, JoinChannelResponse), (OpenBuffer, OpenBufferResponse), (JoinWorktree, JoinWorktreeResponse), + (OpenWorktree, OpenWorktreeResponse), (Ping, Ack), (SaveBuffer, BufferSaved), (UpdateBuffer, Ack), (ShareWorktree, ShareWorktreeResponse), + (UnshareWorktree, Ack), (SendChannelMessage, SendChannelMessageResponse), (GetChannelMessages, GetChannelMessagesResponse), ); @@ -175,6 +180,7 @@ entity_messages!( JoinWorktree, RemovePeer, SaveBuffer, + UnshareWorktree, UpdateBuffer, UpdateWorktree, );