Switch to a new flow for advertising, sharing and joining worktrees

Max Brunsfeld created

Now, when you open a local worktree, we immediately send an `OpenWorktree` message
to the server, telling it the name of the folder that you've opened, and the names of all the
collaborators (based on a `.zed.toml` file). The server responds with a unique id for the
worktree.

When starting share this local worktree, you now include this previously-assigned id
in the `ShareWorktree` message.

When joining a worktree, there is no longer a need to provide an access token. The access
is controlled by the set of "collaborator logins" that were provided when the worktree
was initially opened by the host.

Change summary

server/src/rpc.rs    | 498 ++++++++++++++++++++++++++++++---------------
zed/src/workspace.rs |  90 ++-----
zed/src/worktree.rs  | 208 +++++++++++++++---
zrpc/proto/zed.proto |  24 +
zrpc/src/proto.rs    |  10 
5 files changed, 547 insertions(+), 283 deletions(-)

Detailed changes

server/src/rpc.rs 🔗

@@ -1,6 +1,6 @@
 use super::{
     auth,
-    db::{ChannelId, MessageId, UserId},
+    db::{ChannelId, MessageId, User, UserId},
     AppState,
 };
 use anyhow::anyhow;
@@ -25,7 +25,6 @@ use tide::{
 };
 use time::OffsetDateTime;
 use zrpc::{
-    auth::random_token,
     proto::{self, AnyTypedEnvelope, EnvelopedMessage},
     Connection, ConnectionId, Peer, TypedEnvelope,
 };
@@ -50,6 +49,7 @@ pub struct Server {
 struct ServerState {
     connections: HashMap<ConnectionId, ConnectionState>,
     pub worktrees: HashMap<u64, Worktree>,
+    visible_worktrees_by_github_login: HashMap<String, HashSet<u64>>,
     channels: HashMap<ChannelId, Channel>,
     next_worktree_id: u64,
 }
@@ -61,11 +61,15 @@ struct ConnectionState {
 }
 
 struct Worktree {
-    host_connection_id: Option<ConnectionId>,
+    host_connection_id: ConnectionId,
+    collaborator_github_logins: Vec<String>,
+    root_name: String,
+    share: Option<WorktreeShare>,
+}
+
+struct WorktreeShare {
     guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
     active_replica_ids: HashSet<ReplicaId>,
-    access_token: String,
-    root_name: String,
     entries: HashMap<u64, proto::Entry>,
 }
 
@@ -93,10 +97,12 @@ impl Server {
 
         server
             .add_handler(Server::ping)
+            .add_handler(Server::open_worktree)
+            .add_handler(Server::close_worktree)
             .add_handler(Server::share_worktree)
+            .add_handler(Server::unshare_worktree)
             .add_handler(Server::join_worktree)
             .add_handler(Server::update_worktree)
-            .add_handler(Server::close_worktree)
             .add_handler(Server::open_buffer)
             .add_handler(Server::close_buffer)
             .add_handler(Server::update_buffer)
@@ -231,13 +237,15 @@ impl Server {
             }
             for worktree_id in connection.worktrees {
                 if let Some(worktree) = state.worktrees.get_mut(&worktree_id) {
-                    if worktree.host_connection_id == Some(connection_id) {
-                        worktree_ids.push(worktree_id);
-                    } else if let Some(replica_id) =
-                        worktree.guest_connection_ids.remove(&connection_id)
-                    {
-                        worktree.active_replica_ids.remove(&replica_id);
+                    if worktree.host_connection_id == connection_id {
                         worktree_ids.push(worktree_id);
+                    } else if let Some(share_state) = worktree.share.as_mut() {
+                        if let Some(replica_id) =
+                            share_state.guest_connection_ids.remove(&connection_id)
+                        {
+                            share_state.active_replica_ids.remove(&replica_id);
+                            worktree_ids.push(worktree_id);
+                        }
                     }
                 }
             }
@@ -250,14 +258,30 @@ impl Server {
         Ok(())
     }
 
+    async fn open_worktree(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::OpenWorktree>,
+    ) -> tide::Result<()> {
+        let receipt = request.receipt();
+
+        let mut state = self.state.write().await;
+        let worktree_id = state.add_worktree(Worktree {
+            host_connection_id: request.sender_id,
+            collaborator_github_logins: request.payload.collaborator_logins,
+            root_name: request.payload.root_name,
+            share: None,
+        });
+
+        self.peer
+            .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
+            .await?;
+        Ok(())
+    }
+
     async fn share_worktree(
         self: Arc<Server>,
         mut request: TypedEnvelope<proto::ShareWorktree>,
     ) -> tide::Result<()> {
-        let mut state = self.state.write().await;
-        let worktree_id = state.next_worktree_id;
-        state.next_worktree_id += 1;
-        let access_token = random_token();
         let worktree = request
             .payload
             .worktree
@@ -267,27 +291,58 @@ impl Server {
             .into_iter()
             .map(|entry| (entry.id, entry))
             .collect();
-        state.worktrees.insert(
-            worktree_id,
-            Worktree {
-                host_connection_id: Some(request.sender_id),
+        let mut state = self.state.write().await;
+        if let Some(worktree) = state.worktrees.get_mut(&worktree.id) {
+            worktree.share = Some(WorktreeShare {
                 guest_connection_ids: Default::default(),
                 active_replica_ids: Default::default(),
-                access_token: access_token.clone(),
-                root_name: mem::take(&mut worktree.root_name),
                 entries,
-            },
-        );
+            });
+            self.peer
+                .respond(request.receipt(), proto::ShareWorktreeResponse {})
+                .await?;
+        } else {
+            self.peer
+                .respond_with_error(
+                    request.receipt(),
+                    proto::Error {
+                        message: "no such worktree".to_string(),
+                    },
+                )
+                .await?;
+        }
+        Ok(())
+    }
+
+    async fn unshare_worktree(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::UnshareWorktree>,
+    ) -> tide::Result<()> {
+        let worktree_id = request.payload.worktree_id;
+
+        let connection_ids;
+        {
+            let mut state = self.state.write().await;
+            let worktree = state.write_worktree(worktree_id, request.sender_id)?;
+            if worktree.host_connection_id != request.sender_id {
+                return Err(anyhow!("no such worktree"))?;
+            }
+
+            connection_ids = worktree.connection_ids();
+            worktree.share.take();
+            for connection_id in &connection_ids {
+                if let Some(connection) = state.connections.get_mut(connection_id) {
+                    connection.worktrees.remove(&worktree_id);
+                }
+            }
+        }
+
+        broadcast(request.sender_id, connection_ids, |conn_id| {
+            self.peer
+                .send(conn_id, proto::UnshareWorktree { worktree_id })
+        })
+        .await?;
 
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::ShareWorktreeResponse {
-                    worktree_id,
-                    access_token,
-                },
-            )
-            .await?;
         Ok(())
     }
 
@@ -296,67 +351,112 @@ impl Server {
         request: TypedEnvelope<proto::JoinWorktree>,
     ) -> tide::Result<()> {
         let worktree_id = request.payload.worktree_id;
-        let access_token = &request.payload.access_token;
+        let user = self.user_for_connection(request.sender_id).await?;
 
+        let response;
+        let connection_ids;
         let mut state = self.state.write().await;
-        if let Some((peer_replica_id, worktree)) =
-            state.join_worktree(request.sender_id, worktree_id, access_token)
-        {
-            let mut peers = Vec::new();
-            if let Some(host_connection_id) = worktree.host_connection_id {
+        match state.join_worktree(request.sender_id, &user, worktree_id) {
+            Ok((peer_replica_id, worktree)) => {
+                let share = worktree.share()?;
+                let peer_count = share.guest_connection_ids.len();
+                let mut peers = Vec::with_capacity(peer_count);
                 peers.push(proto::Peer {
-                    peer_id: host_connection_id.0,
+                    peer_id: worktree.host_connection_id.0,
                     replica_id: 0,
                 });
+                for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids {
+                    if *peer_conn_id != request.sender_id {
+                        peers.push(proto::Peer {
+                            peer_id: peer_conn_id.0,
+                            replica_id: *peer_replica_id as u32,
+                        });
+                    }
+                }
+                connection_ids = worktree.connection_ids();
+                response = proto::JoinWorktreeResponse {
+                    worktree: Some(proto::Worktree {
+                        id: worktree_id,
+                        root_name: worktree.root_name.clone(),
+                        entries: share.entries.values().cloned().collect(),
+                    }),
+                    replica_id: peer_replica_id as u32,
+                    peers,
+                };
             }
-            for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
-                if *peer_conn_id != request.sender_id {
-                    peers.push(proto::Peer {
-                        peer_id: peer_conn_id.0,
-                        replica_id: *peer_replica_id as u32,
-                    });
+            Err(error) => {
+                self.peer
+                    .respond_with_error(
+                        request.receipt(),
+                        proto::Error {
+                            message: error.to_string(),
+                        },
+                    )
+                    .await?;
+                return Ok(());
+            }
+        }
+
+        broadcast(request.sender_id, connection_ids, |conn_id| {
+            self.peer.send(
+                conn_id,
+                proto::AddPeer {
+                    worktree_id,
+                    peer: Some(proto::Peer {
+                        peer_id: request.sender_id.0,
+                        replica_id: response.replica_id,
+                    }),
+                },
+            )
+        })
+        .await?;
+        self.peer.respond(request.receipt(), response).await?;
+
+        Ok(())
+    }
+
+    async fn close_worktree(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::CloseWorktree>,
+    ) -> tide::Result<()> {
+        let worktree_id = request.payload.worktree_id;
+        let connection_ids;
+        let mut is_host = false;
+        let mut is_guest = false;
+        {
+            let mut state = self.state.write().await;
+            let worktree = state.write_worktree(worktree_id, request.sender_id)?;
+            connection_ids = worktree.connection_ids();
+
+            if worktree.host_connection_id == request.sender_id {
+                is_host = true;
+                state.remove_worktree(worktree_id);
+            } else {
+                let share = worktree.share_mut()?;
+                if let Some(replica_id) = share.guest_connection_ids.remove(&request.sender_id) {
+                    is_guest = true;
+                    share.active_replica_ids.remove(&replica_id);
                 }
             }
+        }
 
-            broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
+        if is_host {
+            broadcast(request.sender_id, connection_ids, |conn_id| {
+                self.peer
+                    .send(conn_id, proto::UnshareWorktree { worktree_id })
+            })
+            .await?;
+        } else if is_guest {
+            broadcast(request.sender_id, connection_ids, |conn_id| {
                 self.peer.send(
                     conn_id,
-                    proto::AddPeer {
+                    proto::RemovePeer {
                         worktree_id,
-                        peer: Some(proto::Peer {
-                            peer_id: request.sender_id.0,
-                            replica_id: peer_replica_id as u32,
-                        }),
+                        peer_id: request.sender_id.0,
                     },
                 )
             })
-            .await?;
-            self.peer
-                .respond(
-                    request.receipt(),
-                    proto::JoinWorktreeResponse {
-                        worktree_id,
-                        worktree: Some(proto::Worktree {
-                            root_name: worktree.root_name.clone(),
-                            entries: worktree.entries.values().cloned().collect(),
-                        }),
-                        replica_id: peer_replica_id as u32,
-                        peers,
-                    },
-                )
-                .await?;
-        } else {
-            self.peer
-                .respond(
-                    request.receipt(),
-                    proto::JoinWorktreeResponse {
-                        worktree_id,
-                        worktree: None,
-                        replica_id: 0,
-                        peers: Vec::new(),
-                    },
-                )
-                .await?;
+            .await?
         }
 
         Ok(())
@@ -369,12 +469,14 @@ impl Server {
         {
             let mut state = self.state.write().await;
             let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
+            let share = worktree.share_mut()?;
+
             for entry_id in &request.payload.removed_entries {
-                worktree.entries.remove(&entry_id);
+                share.entries.remove(&entry_id);
             }
 
             for entry in &request.payload.updated_entries {
-                worktree.entries.insert(entry.id, entry.clone());
+                share.entries.insert(entry.id, entry.clone());
             }
         }
 
@@ -383,38 +485,6 @@ impl Server {
         Ok(())
     }
 
-    async fn close_worktree(
-        self: Arc<Server>,
-        request: TypedEnvelope<proto::CloseWorktree>,
-    ) -> tide::Result<()> {
-        let connection_ids;
-        {
-            let mut state = self.state.write().await;
-            let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
-            connection_ids = worktree.connection_ids();
-            if worktree.host_connection_id == Some(request.sender_id) {
-                worktree.host_connection_id = None;
-            } else if let Some(replica_id) =
-                worktree.guest_connection_ids.remove(&request.sender_id)
-            {
-                worktree.active_replica_ids.remove(&replica_id);
-            }
-        }
-
-        broadcast(request.sender_id, connection_ids, |conn_id| {
-            self.peer.send(
-                conn_id,
-                proto::RemovePeer {
-                    worktree_id: request.payload.worktree_id,
-                    peer_id: request.sender_id.0,
-                },
-            )
-        })
-        .await?;
-
-        Ok(())
-    }
-
     async fn open_buffer(
         self: Arc<Server>,
         request: TypedEnvelope<proto::OpenBuffer>,
@@ -426,7 +496,7 @@ impl Server {
             .read()
             .await
             .read_worktree(worktree_id, request.sender_id)?
-            .host_connection_id()?;
+            .host_connection_id;
 
         let response = self
             .peer
@@ -445,7 +515,7 @@ impl Server {
             .read()
             .await
             .read_worktree(request.payload.worktree_id, request.sender_id)?
-            .host_connection_id()?;
+            .host_connection_id;
 
         self.peer
             .forward_send(request.sender_id, host_connection_id, request.payload)
@@ -463,8 +533,9 @@ impl Server {
         {
             let state = self.state.read().await;
             let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
-            host = worktree.host_connection_id()?;
+            host = worktree.host_connection_id;
             guests = worktree
+                .share()?
                 .guest_connection_ids
                 .keys()
                 .copied()
@@ -785,6 +856,24 @@ impl Server {
         Ok(())
     }
 
+    async fn user_for_connection(&self, connection_id: ConnectionId) -> tide::Result<User> {
+        let user_id = self
+            .state
+            .read()
+            .await
+            .connections
+            .get(&connection_id)
+            .ok_or_else(|| anyhow!("no such connection"))?
+            .user_id;
+        Ok(self
+            .app_state
+            .db
+            .get_users_by_ids(user_id, Some(user_id).into_iter())
+            .await?
+            .pop()
+            .ok_or_else(|| anyhow!("no such user"))?)
+    }
+
     async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
         &self,
         worktree_id: u64,
@@ -860,30 +949,34 @@ impl ServerState {
     fn join_worktree(
         &mut self,
         connection_id: ConnectionId,
+        user: &User,
         worktree_id: u64,
-        access_token: &str,
-    ) -> Option<(ReplicaId, &Worktree)> {
-        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
-            if access_token == worktree.access_token {
-                if let Some(connection) = self.connections.get_mut(&connection_id) {
-                    connection.worktrees.insert(worktree_id);
-                }
+    ) -> tide::Result<(ReplicaId, &Worktree)> {
+        let connection = self
+            .connections
+            .get_mut(&connection_id)
+            .ok_or_else(|| anyhow!("no such connection"))?;
+        let worktree = self
+            .worktrees
+            .get_mut(&worktree_id)
+            .ok_or_else(|| anyhow!("no such worktree"))?;
+        if !worktree
+            .collaborator_github_logins
+            .contains(&user.github_login)
+        {
+            Err(anyhow!("no such worktree"))?;
+        }
 
-                let mut replica_id = 1;
-                while worktree.active_replica_ids.contains(&replica_id) {
-                    replica_id += 1;
-                }
-                worktree.active_replica_ids.insert(replica_id);
-                worktree
-                    .guest_connection_ids
-                    .insert(connection_id, replica_id);
-                Some((replica_id, worktree))
-            } else {
-                None
-            }
-        } else {
-            None
+        let share = worktree.share_mut()?;
+        connection.worktrees.insert(worktree_id);
+
+        let mut replica_id = 1;
+        while share.active_replica_ids.contains(&replica_id) {
+            replica_id += 1;
         }
+        share.active_replica_ids.insert(replica_id);
+        share.guest_connection_ids.insert(connection_id, replica_id);
+        return Ok((replica_id, worktree));
     }
 
     fn read_worktree(
@@ -896,8 +989,11 @@ impl ServerState {
             .get(&worktree_id)
             .ok_or_else(|| anyhow!("worktree not found"))?;
 
-        if worktree.host_connection_id == Some(connection_id)
-            || worktree.guest_connection_ids.contains_key(&connection_id)
+        if worktree.host_connection_id == connection_id
+            || worktree
+                .share()?
+                .guest_connection_ids
+                .contains_key(&connection_id)
         {
             Ok(worktree)
         } else {
@@ -919,8 +1015,10 @@ impl ServerState {
             .get_mut(&worktree_id)
             .ok_or_else(|| anyhow!("worktree not found"))?;
 
-        if worktree.host_connection_id == Some(connection_id)
-            || worktree.guest_connection_ids.contains_key(&connection_id)
+        if worktree.host_connection_id == connection_id
+            || worktree.share.as_ref().map_or(false, |share| {
+                share.guest_connection_ids.contains_key(&connection_id)
+            })
         {
             Ok(worktree)
         } else {
@@ -931,21 +1029,69 @@ impl ServerState {
             ))?
         }
     }
+
+    fn add_worktree(&mut self, worktree: Worktree) -> u64 {
+        let worktree_id = self.next_worktree_id;
+        for collaborator_login in &worktree.collaborator_github_logins {
+            self.visible_worktrees_by_github_login
+                .entry(collaborator_login.clone())
+                .or_default()
+                .insert(worktree_id);
+        }
+        self.next_worktree_id += 1;
+        self.worktrees.insert(worktree_id, worktree);
+        worktree_id
+    }
+
+    fn remove_worktree(&mut self, worktree_id: u64) {
+        let worktree = self.worktrees.remove(&worktree_id).unwrap();
+        if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) {
+            connection.worktrees.remove(&worktree_id);
+        }
+        if let Some(share) = worktree.share {
+            for connection_id in share.guest_connection_ids.keys() {
+                if let Some(connection) = self.connections.get_mut(connection_id) {
+                    connection.worktrees.remove(&worktree_id);
+                }
+            }
+        }
+        for collaborator_login in worktree.collaborator_github_logins {
+            if let Some(visible_worktrees) = self
+                .visible_worktrees_by_github_login
+                .get_mut(&collaborator_login)
+            {
+                visible_worktrees.remove(&worktree_id);
+            }
+        }
+    }
 }
 
 impl Worktree {
     pub fn connection_ids(&self) -> Vec<ConnectionId> {
-        self.guest_connection_ids
-            .keys()
-            .copied()
-            .chain(self.host_connection_id)
-            .collect()
+        if let Some(share) = &self.share {
+            share
+                .guest_connection_ids
+                .keys()
+                .copied()
+                .chain(Some(self.host_connection_id))
+                .collect()
+        } else {
+            vec![self.host_connection_id]
+        }
     }
 
-    fn host_connection_id(&self) -> tide::Result<ConnectionId> {
+    fn share(&self) -> tide::Result<&WorktreeShare> {
         Ok(self
-            .host_connection_id
-            .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
+            .share
+            .as_ref()
+            .ok_or_else(|| anyhow!("worktree is not shared"))?)
+    }
+
+    fn share_mut(&mut self) -> tide::Result<&mut WorktreeShare> {
+        Ok(self
+            .share
+            .as_mut()
+            .ok_or_else(|| anyhow!("worktree is not shared"))?)
     }
 }
 
@@ -1066,6 +1212,7 @@ mod tests {
         fs.insert_tree(
             "/a",
             json!({
+                ".zed.toml": r#"collaborators = ["user_b"]"#,
                 "a.txt": "a-contents",
                 "b.txt": "b-contents",
             }),
@@ -1083,7 +1230,7 @@ mod tests {
         worktree_a
             .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
-        let (worktree_id, worktree_token) = worktree_a
+        let worktree_id = worktree_a
             .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
             .await
             .unwrap();
@@ -1092,7 +1239,6 @@ mod tests {
         let worktree_b = Worktree::open_remote(
             client_b.clone(),
             worktree_id,
-            worktree_token,
             lang_registry.clone(),
             &mut cx_b.to_async(),
         )
@@ -1173,6 +1319,7 @@ mod tests {
         fs.insert_tree(
             "/a",
             json!({
+                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                 "file1": "",
                 "file2": ""
             }),
@@ -1191,7 +1338,7 @@ mod tests {
         worktree_a
             .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
-        let (worktree_id, worktree_token) = worktree_a
+        let worktree_id = worktree_a
             .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
             .await
             .unwrap();
@@ -1200,7 +1347,6 @@ mod tests {
         let worktree_b = Worktree::open_remote(
             client_b.clone(),
             worktree_id,
-            worktree_token.clone(),
             lang_registry.clone(),
             &mut cx_b.to_async(),
         )
@@ -1209,7 +1355,6 @@ mod tests {
         let worktree_c = Worktree::open_remote(
             client_c.clone(),
             worktree_id,
-            worktree_token,
             lang_registry.clone(),
             &mut cx_c.to_async(),
         )
@@ -1273,17 +1418,17 @@ mod tests {
             .unwrap();
 
         worktree_b
-            .condition(&cx_b, |tree, _| tree.file_count() == 3)
+            .condition(&cx_b, |tree, _| tree.file_count() == 4)
             .await;
         worktree_c
-            .condition(&cx_c, |tree, _| tree.file_count() == 3)
+            .condition(&cx_c, |tree, _| tree.file_count() == 4)
             .await;
         worktree_b.read_with(&cx_b, |tree, _| {
             assert_eq!(
                 tree.paths()
                     .map(|p| p.to_string_lossy())
                     .collect::<Vec<_>>(),
-                &["file1", "file3", "file4"]
+                &[".zed.toml", "file1", "file3", "file4"]
             )
         });
         worktree_c.read_with(&cx_c, |tree, _| {
@@ -1291,7 +1436,7 @@ mod tests {
                 tree.paths()
                     .map(|p| p.to_string_lossy())
                     .collect::<Vec<_>>(),
-                &["file1", "file3", "file4"]
+                &[".zed.toml", "file1", "file3", "file4"]
             )
         });
     }
@@ -1308,12 +1453,18 @@ mod tests {
 
         // Share a local worktree as client A
         let fs = Arc::new(FakeFs::new());
-        fs.save(Path::new("/a.txt"), &"a-contents".into())
-            .await
-            .unwrap();
+        fs.insert_tree(
+            "/dir",
+            json!({
+                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
+                "a.txt": "a-contents",
+            }),
+        )
+        .await;
+
         let worktree_a = Worktree::open_local(
             client_a.clone(),
-            "/".as_ref(),
+            "/dir".as_ref(),
             fs,
             lang_registry.clone(),
             &mut cx_a.to_async(),
@@ -1323,7 +1474,7 @@ mod tests {
         worktree_a
             .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
-        let (worktree_id, worktree_token) = worktree_a
+        let worktree_id = worktree_a
             .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
             .await
             .unwrap();
@@ -1332,7 +1483,6 @@ mod tests {
         let worktree_b = Worktree::open_remote(
             client_b.clone(),
             worktree_id,
-            worktree_token,
             lang_registry.clone(),
             &mut cx_b.to_async(),
         )
@@ -1388,12 +1538,17 @@ mod tests {
 
         // Share a local worktree as client A
         let fs = Arc::new(FakeFs::new());
-        fs.save(Path::new("/a.txt"), &"a-contents".into())
-            .await
-            .unwrap();
+        fs.insert_tree(
+            "/dir",
+            json!({
+                ".zed.toml": r#"collaborators = ["user_b"]"#,
+                "a.txt": "a-contents",
+            }),
+        )
+        .await;
         let worktree_a = Worktree::open_local(
             client_a.clone(),
-            "/".as_ref(),
+            "/dir".as_ref(),
             fs,
             lang_registry.clone(),
             &mut cx_a.to_async(),
@@ -1403,7 +1558,7 @@ mod tests {
         worktree_a
             .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
-        let (worktree_id, worktree_token) = worktree_a
+        let worktree_id = worktree_a
             .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
             .await
             .unwrap();
@@ -1412,7 +1567,6 @@ mod tests {
         let worktree_b = Worktree::open_remote(
             client_b.clone(),
             worktree_id,
-            worktree_token,
             lang_registry.clone(),
             &mut cx_b.to_async(),
         )
@@ -1450,6 +1604,7 @@ mod tests {
         fs.insert_tree(
             "/a",
             json!({
+                ".zed.toml": r#"collaborators = ["user_b"]"#,
                 "a.txt": "a-contents",
                 "b.txt": "b-contents",
             }),
@@ -1467,7 +1622,7 @@ mod tests {
         worktree_a
             .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
-        let (worktree_id, worktree_token) = worktree_a
+        let worktree_id = worktree_a
             .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
             .await
             .unwrap();
@@ -1476,7 +1631,6 @@ mod tests {
         let _worktree_b = Worktree::open_remote(
             client_b.clone(),
             worktree_id,
-            worktree_token,
             lang_registry.clone(),
             &mut cx_b.to_async(),
         )

zed/src/workspace.rs 🔗

@@ -11,10 +11,11 @@ use crate::{
     rpc,
     settings::Settings,
     user,
+    util::TryFutureExt as _,
     worktree::{File, Worktree},
     AppState, Authenticate,
 };
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use gpui::{
     action,
     elements::*,
@@ -52,12 +53,10 @@ pub fn init(cx: &mut MutableAppContext) {
         open_paths(action, cx).detach()
     });
     cx.add_global_action(open_new);
-    cx.add_global_action(join_worktree);
     cx.add_action(Workspace::save_active_item);
     cx.add_action(Workspace::debug_elements);
     cx.add_action(Workspace::open_new_file);
     cx.add_action(Workspace::share_worktree);
-    cx.add_action(Workspace::join_worktree);
     cx.add_action(Workspace::toggle_sidebar_item);
     cx.add_bindings(vec![
         Binding::new("cmd-s", Save, None),
@@ -129,14 +128,6 @@ fn open_new(action: &OpenNew, cx: &mut MutableAppContext) {
     });
 }
 
-fn join_worktree(action: &JoinWorktree, cx: &mut MutableAppContext) {
-    cx.add_window(window_options(), |cx| {
-        let mut view = Workspace::new(action.0.as_ref(), cx);
-        view.join_worktree(action, cx);
-        view
-    });
-}
-
 fn window_options() -> WindowOptions<'static> {
     WindowOptions {
         bounds: RectF::new(vec2f(0., 0.), vec2f(1024., 768.)),
@@ -818,67 +809,46 @@ impl Workspace {
 
     fn share_worktree(&mut self, _: &ShareWorktree, cx: &mut ViewContext<Self>) {
         let rpc = self.rpc.clone();
-        let platform = cx.platform();
-
-        let task = cx.spawn(|this, mut cx| async move {
-            rpc.authenticate_and_connect(&cx).await?;
-
-            let share_task = this.update(&mut cx, |this, cx| {
-                let worktree = this.worktrees.iter().next()?;
-                worktree.update(cx, |worktree, cx| {
-                    let worktree = worktree.as_local_mut()?;
-                    Some(worktree.share(cx))
-                })
-            });
+        cx.spawn(|this, mut cx| {
+            async move {
+                rpc.authenticate_and_connect(&cx).await?;
+
+                let share_task = this.update(&mut cx, |this, cx| {
+                    let worktree = this.worktrees.iter().next()?;
+                    worktree.update(cx, |worktree, cx| {
+                        let worktree = worktree.as_local_mut()?;
+                        Some(worktree.share(cx))
+                    })
+                });
 
-            if let Some(share_task) = share_task {
-                let (worktree_id, access_token) = share_task.await?;
-                let worktree_url = rpc::encode_worktree_url(worktree_id, &access_token);
-                log::info!("wrote worktree url to clipboard: {}", worktree_url);
-                platform.write_to_clipboard(ClipboardItem::new(worktree_url));
-            }
-            surf::Result::Ok(())
-        });
+                if let Some(share_task) = share_task {
+                    share_task.await?;
+                }
 
-        cx.spawn(|_, _| async move {
-            if let Err(e) = task.await {
-                log::error!("sharing failed: {:?}", e);
+                Ok(())
             }
+            .log_err()
         })
         .detach();
     }
 
-    fn join_worktree(&mut self, _: &JoinWorktree, cx: &mut ViewContext<Self>) {
+    fn join_worktree(&mut self, id: u64, cx: &mut ViewContext<Self>) {
         let rpc = self.rpc.clone();
         let languages = self.languages.clone();
 
-        let task = cx.spawn(|this, mut cx| async move {
-            rpc.authenticate_and_connect(&cx).await?;
-
-            let worktree_url = cx
-                .platform()
-                .read_from_clipboard()
-                .ok_or_else(|| anyhow!("failed to read url from clipboard"))?;
-            let (worktree_id, access_token) = rpc::decode_worktree_url(worktree_url.text())
-                .ok_or_else(|| anyhow!("failed to decode worktree url"))?;
-            log::info!("read worktree url from clipboard: {}", worktree_url.text());
-
-            let worktree =
-                Worktree::open_remote(rpc.clone(), worktree_id, access_token, languages, &mut cx)
-                    .await?;
-            this.update(&mut cx, |workspace, cx| {
-                cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
-                workspace.worktrees.insert(worktree);
-                cx.notify();
-            });
-
-            surf::Result::Ok(())
-        });
+        cx.spawn(|this, mut cx| {
+            async move {
+                rpc.authenticate_and_connect(&cx).await?;
+                let worktree = Worktree::open_remote(rpc.clone(), id, languages, &mut cx).await?;
+                this.update(&mut cx, |workspace, cx| {
+                    cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
+                    workspace.worktrees.insert(worktree);
+                    cx.notify();
+                });
 
-        cx.spawn(|_, _| async move {
-            if let Err(e) = task.await {
-                log::error!("joining failed: {}", e);
+                Ok(())
             }
+            .log_err()
         })
         .detach();
     }

zed/src/worktree.rs 🔗

@@ -7,7 +7,7 @@ use crate::{
     fuzzy,
     fuzzy::CharBag,
     language::LanguageRegistry,
-    rpc::{self, proto},
+    rpc::{self, proto, Status},
     time::{self, ReplicaId},
     util::{Bias, TryFutureExt},
 };
@@ -27,6 +27,7 @@ use postage::{
     prelude::{Sink as _, Stream as _},
     watch,
 };
+use serde::Deserialize;
 use smol::channel::{self, Sender};
 use std::{
     cmp::{self, Ordering},
@@ -67,9 +68,9 @@ impl Entity for Worktree {
     fn release(&mut self, cx: &mut MutableAppContext) {
         let rpc = match self {
             Self::Local(tree) => tree
-                .share
-                .as_ref()
-                .map(|share| (tree.rpc.clone(), share.remote_id)),
+                .remote_id
+                .borrow()
+                .map(|remote_id| (tree.rpc.clone(), remote_id)),
             Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
         };
 
@@ -112,17 +113,10 @@ impl Worktree {
     pub async fn open_remote(
         rpc: Arc<rpc::Client>,
         id: u64,
-        access_token: String,
         languages: Arc<LanguageRegistry>,
         cx: &mut AsyncAppContext,
     ) -> Result<ModelHandle<Self>> {
-        let response = rpc
-            .request(proto::JoinWorktree {
-                worktree_id: id,
-                access_token,
-            })
-            .await?;
-
+        let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
         Worktree::remote(response, rpc, languages, cx).await
     }
 
@@ -136,7 +130,7 @@ impl Worktree {
             .worktree
             .ok_or_else(|| anyhow!("empty worktree"))?;
 
-        let remote_id = join_response.worktree_id;
+        let remote_id = worktree.id;
         let replica_id = join_response.replica_id as ReplicaId;
         let peers = join_response.peers;
         let root_char_bag: CharBag = worktree
@@ -650,10 +644,13 @@ impl Deref for Worktree {
 
 pub struct LocalWorktree {
     snapshot: Snapshot,
+    config: WorktreeConfig,
     background_snapshot: Arc<Mutex<Snapshot>>,
     last_scan_state_rx: watch::Receiver<ScanState>,
     _background_scanner_task: Option<Task<()>>,
+    _maintain_remote_id_task: Task<Option<()>>,
     poll_task: Option<Task<()>>,
+    remote_id: watch::Receiver<Option<u64>>,
     share: Option<ShareState>,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
@@ -664,6 +661,11 @@ pub struct LocalWorktree {
     fs: Arc<dyn Fs>,
 }
 
+#[derive(Default, Deserialize)]
+struct WorktreeConfig {
+    collaborators: Vec<String>,
+}
+
 impl LocalWorktree {
     async fn new(
         rpc: Arc<rpc::Client>,
@@ -684,6 +686,13 @@ impl LocalWorktree {
         let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
         let metadata = fs.metadata(&abs_path).await?;
 
+        let mut config = WorktreeConfig::default();
+        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
+            if let Ok(parsed) = toml::from_str(&zed_toml) {
+                config = parsed;
+            }
+        }
+
         let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
         let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
         let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
@@ -691,7 +700,7 @@ impl LocalWorktree {
                 id: cx.model_id(),
                 scan_id: 0,
                 abs_path,
-                root_name,
+                root_name: root_name.clone(),
                 root_char_bag,
                 ignores: Default::default(),
                 entries_by_path: Default::default(),
@@ -708,11 +717,48 @@ impl LocalWorktree {
                 ));
             }
 
+            let (mut remote_id_tx, remote_id_rx) = watch::channel();
+            let _maintain_remote_id_task = cx.spawn_weak({
+                let rpc = rpc.clone();
+                move |this, cx| {
+                    async move {
+                        let mut status = rpc.status();
+                        while let Some(status) = status.recv().await {
+                            if let Some(this) = this.upgrade(&cx) {
+                                let remote_id = if let Status::Connected { .. } = status {
+                                    let collaborator_logins = this.read_with(&cx, |this, _| {
+                                        this.as_local().unwrap().config.collaborators.clone()
+                                    });
+                                    let response = rpc
+                                        .request(proto::OpenWorktree {
+                                            root_name: root_name.clone(),
+                                            collaborator_logins,
+                                        })
+                                        .await?;
+
+                                    Some(response.worktree_id)
+                                } else {
+                                    None
+                                };
+                                if remote_id_tx.send(remote_id).await.is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                        Ok(())
+                    }
+                    .log_err()
+                }
+            });
+
             let tree = Self {
                 snapshot: snapshot.clone(),
+                config,
+                remote_id: remote_id_rx,
                 background_snapshot: Arc::new(Mutex::new(snapshot)),
                 last_scan_state_rx,
                 _background_scanner_task: None,
+                _maintain_remote_id_task,
                 share: None,
                 poll_task: None,
                 open_buffers: Default::default(),
@@ -733,13 +779,10 @@ impl LocalWorktree {
                             let tree = this.as_local_mut().unwrap();
                             if !tree.is_scanning() {
                                 if let Some(share) = tree.share.as_ref() {
-                                    Some((tree.snapshot(), share.snapshots_tx.clone()))
-                                } else {
-                                    None
+                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
                                 }
-                            } else {
-                                None
                             }
+                            None
                         });
 
                         if let Some((snapshot, snapshots_to_send_tx)) = to_send {
@@ -894,6 +937,18 @@ impl LocalWorktree {
         }
     }
 
+    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
+        let mut remote_id = self.remote_id.clone();
+        async move {
+            while let Some(remote_id) = remote_id.recv().await {
+                if remote_id.is_some() {
+                    return remote_id;
+                }
+            }
+            None
+        }
+    }
+
     fn is_scanning(&self) -> bool {
         if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
             true
@@ -979,17 +1034,19 @@ impl LocalWorktree {
         })
     }
 
-    pub fn share(
-        &mut self,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Task<anyhow::Result<(u64, String)>> {
+    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
         let snapshot = self.snapshot();
         let share_request = self.share_request(cx);
         let rpc = self.rpc.clone();
         cx.spawn(|this, mut cx| async move {
-            let share_request = share_request.await;
+            let share_request = if let Some(request) = share_request.await {
+                request
+            } else {
+                return Err(anyhow!("failed to open worktree on the server"));
+            };
+
+            let remote_id = share_request.worktree.as_ref().unwrap().id;
             let share_response = rpc.request(share_request).await?;
-            let remote_id = share_response.worktree_id;
 
             log::info!("sharing worktree {:?}", share_response);
             let (snapshots_to_send_tx, snapshots_to_send_rx) =
@@ -1023,28 +1080,34 @@ impl LocalWorktree {
 
                 let worktree = worktree.as_local_mut().unwrap();
                 worktree.share = Some(ShareState {
-                    remote_id: share_response.worktree_id,
                     snapshots_tx: snapshots_to_send_tx,
                     _subscriptions,
                 });
             });
 
-            Ok((remote_id, share_response.access_token))
+            Ok(remote_id)
         })
     }
 
-    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
+    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
+        let remote_id = self.next_remote_id();
         let snapshot = self.snapshot();
         let root_name = self.root_name.clone();
         cx.background().spawn(async move {
-            let entries = snapshot
-                .entries_by_path
-                .cursor::<(), ()>()
-                .map(Into::into)
-                .collect();
-            proto::ShareWorktree {
-                worktree: Some(proto::Worktree { root_name, entries }),
-            }
+            remote_id.await.map(|id| {
+                let entries = snapshot
+                    .entries_by_path
+                    .cursor::<(), ()>()
+                    .map(Into::into)
+                    .collect();
+                proto::ShareWorktree {
+                    worktree: Some(proto::Worktree {
+                        id,
+                        root_name,
+                        entries,
+                    }),
+                }
+            })
         })
     }
 }
@@ -1082,7 +1145,6 @@ impl fmt::Debug for LocalWorktree {
 }
 
 struct ShareState {
-    remote_id: u64,
     snapshots_tx: Sender<Snapshot>,
     _subscriptions: Vec<rpc::Subscription>,
 }
@@ -1552,9 +1614,9 @@ impl File {
         self.worktree.update(cx, |worktree, cx| {
             if let Some((rpc, remote_id)) = match worktree {
                 Worktree::Local(worktree) => worktree
-                    .share
-                    .as_ref()
-                    .map(|share| (worktree.rpc.clone(), share.remote_id)),
+                    .remote_id
+                    .borrow()
+                    .map(|id| (worktree.rpc.clone(), id)),
                 Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
             } {
                 cx.spawn(|worktree, mut cx| async move {
@@ -1639,7 +1701,7 @@ impl File {
         self.worktree.update(cx, |worktree, cx| match worktree {
             Worktree::Local(worktree) => {
                 let rpc = worktree.rpc.clone();
-                let worktree_id = worktree.share.as_ref().map(|share| share.remote_id);
+                let worktree_id = *worktree.remote_id.borrow();
                 let save = worktree.save(self.path.clone(), text, cx);
                 cx.background().spawn(async move {
                     let entry = save.await?;
@@ -2528,6 +2590,7 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::fs::FakeFs;
     use crate::test::*;
     use anyhow::Result;
     use fs::RealFs;
@@ -2778,10 +2841,10 @@ mod tests {
             .update(&mut cx, |tree, cx| {
                 tree.as_local().unwrap().share_request(cx)
             })
-            .await;
+            .await
+            .unwrap();
         let remote = Worktree::remote(
             proto::JoinWorktreeResponse {
-                worktree_id,
                 worktree: share_request.worktree,
                 replica_id: 1,
                 peers: Vec::new(),
@@ -2925,6 +2988,65 @@ mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
+        let user_id = 100;
+        let mut client = rpc::Client::new();
+        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
+
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/path",
+            json!({
+                "to": {
+                    "the-dir": {
+                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
+                        "a.txt": "a-contents",
+                    },
+                },
+            }),
+        )
+        .await;
+
+        let worktree = Worktree::open_local(
+            client.clone(),
+            "/path/to/the-dir".as_ref(),
+            fs,
+            Default::default(),
+            &mut cx.to_async(),
+        )
+        .await
+        .unwrap();
+
+        {
+            let cx = cx.to_async();
+            client.authenticate_and_connect(&cx).await.unwrap();
+        }
+
+        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
+        assert_eq!(
+            open_worktree.payload,
+            proto::OpenWorktree {
+                root_name: "the-dir".to_string(),
+                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
+            }
+        );
+
+        server
+            .respond(
+                open_worktree.receipt(),
+                proto::OpenWorktreeResponse { worktree_id: 5 },
+            )
+            .await;
+        let remote_id = worktree
+            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
+            .await;
+        assert_eq!(remote_id, Some(5));
+
+        cx.update(move |_| drop(worktree));
+        server.receive::<proto::CloseWorktree>().await.unwrap();
+    }
+
     #[gpui::test(iterations = 100)]
     fn test_random(mut rng: StdRng) {
         let operations = env::var("OPERATIONS")

zrpc/proto/zed.proto 🔗

@@ -35,6 +35,9 @@ message Envelope {
         ChannelMessageSent channel_message_sent = 30;
         GetChannelMessages get_channel_messages = 31;
         GetChannelMessagesResponse get_channel_messages_response = 32;
+        OpenWorktree open_worktree = 33;
+        OpenWorktreeResponse open_worktree_response = 34;
+        UnshareWorktree unshare_worktree = 35;
     }
 }
 
@@ -48,22 +51,30 @@ message Error {
     string message = 1;
 }
 
+message OpenWorktree {
+    string root_name = 1;
+    repeated string collaborator_logins = 2;
+}
+
+message OpenWorktreeResponse {
+    uint64 worktree_id = 1;
+}
+
 message ShareWorktree {
     Worktree worktree = 1;
 }
 
-message ShareWorktreeResponse {
+message ShareWorktreeResponse {}
+
+message UnshareWorktree {
     uint64 worktree_id = 1;
-    string access_token = 2;
 }
 
 message JoinWorktree {
     uint64 worktree_id = 1;
-    string access_token = 2;
 }
 
 message JoinWorktreeResponse {
-    uint64 worktree_id = 1;
     Worktree worktree = 2;
     uint32 replica_id = 3;
     repeated Peer peers = 4;
@@ -187,8 +198,9 @@ message User {
 }
 
 message Worktree {
-    string root_name = 1;
-    repeated Entry entries = 2;
+    uint64 id = 1;
+    string root_name = 2;
+    repeated Entry entries = 3;
 }
 
 message Entry {

zrpc/src/proto.rs 🔗

@@ -135,11 +135,13 @@ messages!(
     GetUsersResponse,
     JoinChannel,
     JoinChannelResponse,
+    JoinWorktree,
+    JoinWorktreeResponse,
     LeaveChannel,
     OpenBuffer,
     OpenBufferResponse,
-    JoinWorktree,
-    JoinWorktreeResponse,
+    OpenWorktree,
+    OpenWorktreeResponse,
     Ping,
     RemovePeer,
     SaveBuffer,
@@ -147,6 +149,7 @@ messages!(
     SendChannelMessageResponse,
     ShareWorktree,
     ShareWorktreeResponse,
+    UnshareWorktree,
     UpdateBuffer,
     UpdateWorktree,
 );
@@ -157,10 +160,12 @@ request_messages!(
     (JoinChannel, JoinChannelResponse),
     (OpenBuffer, OpenBufferResponse),
     (JoinWorktree, JoinWorktreeResponse),
+    (OpenWorktree, OpenWorktreeResponse),
     (Ping, Ack),
     (SaveBuffer, BufferSaved),
     (UpdateBuffer, Ack),
     (ShareWorktree, ShareWorktreeResponse),
+    (UnshareWorktree, Ack),
     (SendChannelMessage, SendChannelMessageResponse),
     (GetChannelMessages, GetChannelMessagesResponse),
 );
@@ -175,6 +180,7 @@ entity_messages!(
     JoinWorktree,
     RemovePeer,
     SaveBuffer,
+    UnshareWorktree,
     UpdateBuffer,
     UpdateWorktree,
 );