WIP

Antonio Scandurra created

Change summary

crates/project/src/worktree.rs | 110 ++++++++++++++++++-----------------
crates/rpc/proto/zed.proto     |   7 -
crates/rpc/src/proto.rs        |   3 
crates/server/src/rpc.rs       |  81 ++++++++------------------
crates/server/src/rpc/store.rs |  30 ---------
5 files changed, 84 insertions(+), 147 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -43,7 +43,7 @@ use std::{
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
-use util::ResultExt;
+use util::{ResultExt, TryFutureExt};
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
@@ -137,7 +137,7 @@ enum Registration {
 struct ShareState {
     project_id: u64,
     snapshots_tx: Sender<LocalSnapshot>,
-    _maintain_remote_snapshot: Option<Task<()>>,
+    _maintain_remote_snapshot: Option<Task<Option<()>>>,
 }
 
 #[derive(Default, Deserialize)]
@@ -737,6 +737,7 @@ impl LocalWorktree {
             worktree_id: self.id().to_proto(),
             root_name: self.root_name().to_string(),
             authorized_logins: self.authorized_logins(),
+            weak: self.weak,
         };
         cx.spawn(|this, mut cx| async move {
             let response = client.request(register_message).await;
@@ -760,61 +761,66 @@ impl LocalWorktree {
         &mut self,
         project_id: u64,
         cx: &mut ModelContext<Worktree>,
-    ) -> Task<anyhow::Result<()>> {
-        if self.share.is_some() {
-            return Task::ready(Ok(()));
-        }
-
-        let snapshot = self.snapshot();
-        let rpc = self.client.clone();
-        let worktree_id = cx.model_id() as u64;
-        let (snapshots_to_send_tx, snapshots_to_send_rx) =
-            smol::channel::unbounded::<LocalSnapshot>();
+    ) -> impl Future<Output = Result<()>> {
         let (mut share_tx, mut share_rx) = oneshot::channel();
-        let maintain_remote_snapshot = cx.background().spawn({
-            let rpc = rpc.clone();
-            let snapshot = snapshot.clone();
-            let diagnostic_summaries = self.diagnostic_summaries.clone();
-            let weak = self.weak;
-            async move {
-                if let Err(error) = rpc
-                    .request(proto::ShareWorktree {
-                        project_id,
-                        worktree: Some(snapshot.to_proto(&diagnostic_summaries, weak)),
-                    })
-                    .await
-                {
-                    let _ = share_tx.try_send(Err(error));
-                    return;
-                } else {
-                    let _ = share_tx.try_send(Ok(()));
-                }
+        if self.share.is_some() {
+            let _ = share_tx.try_send(Ok(()));
+        } else {
+            let snapshot = self.snapshot();
+            let rpc = self.client.clone();
+            let worktree_id = cx.model_id() as u64;
+            let (snapshots_to_send_tx, snapshots_to_send_rx) =
+                smol::channel::unbounded::<LocalSnapshot>();
+            let (mut share_tx, mut share_rx) = oneshot::channel();
+            let maintain_remote_snapshot = cx.background().spawn({
+                let rpc = rpc.clone();
+                let snapshot = snapshot.clone();
+                let diagnostic_summaries = self.diagnostic_summaries.clone();
+                let weak = self.weak;
+                async move {
+                    if let Err(error) = rpc
+                        .request(proto::UpdateWorktree {
+                            project_id,
+                            worktree_id,
+                            root_name: snapshot.root_name().to_string(),
+                            updated_entries: snapshot
+                                .entries_by_path
+                                .iter()
+                                .filter(|e| !e.is_ignored)
+                                .map(Into::into)
+                                .collect(),
+                            removed_entries: Default::default(),
+                        })
+                        .await
+                    {
+                        let _ = share_tx.try_send(Err(error));
+                        return Err(anyhow!("failed to send initial update worktree"));
+                    } else {
+                        let _ = share_tx.try_send(Ok(()));
+                    }
 
-                let mut prev_snapshot = snapshot;
-                while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
-                    let message =
-                        snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
-                    match rpc.request(message).await {
-                        Ok(_) => {
-                            prev_snapshot = snapshot;
-                        }
-                        Err(err) => log::error!("error sending snapshot diff {}", err),
+                    let mut prev_snapshot = snapshot;
+                    while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
+                        let message =
+                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
+                        rpc.request(message).await?;
+                        prev_snapshot = snapshot;
                     }
+
+                    Ok::<_, anyhow::Error>(())
                 }
-            }
-        });
-        self.share = Some(ShareState {
-            project_id,
-            snapshots_tx: snapshots_to_send_tx,
-            _maintain_remote_snapshot: Some(maintain_remote_snapshot),
-        });
+                .log_err()
+            });
+            self.share = Some(ShareState {
+                project_id,
+                snapshots_tx: snapshots_to_send_tx,
+                _maintain_remote_snapshot: Some(maintain_remote_snapshot),
+            });
+        }
 
-        cx.foreground().spawn(async move {
-            match share_rx.next().await {
-                Some(result) => result,
-                None => Err(anyhow!("unshared before sharing completed")),
-            }
-        })
+        async move {
+            share_rx.next().await;
+        }
     }
 
     pub fn unshare(&mut self) {

crates/rpc/proto/zed.proto 🔗

@@ -34,7 +34,6 @@ message Envelope {
 
         RegisterWorktree register_worktree = 28;
         UnregisterWorktree unregister_worktree = 29;
-        ShareWorktree share_worktree = 30;
         UpdateWorktree update_worktree = 31;
         UpdateDiagnosticSummary update_diagnostic_summary = 32;
         DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33;
@@ -132,6 +131,7 @@ message RegisterWorktree {
     uint64 worktree_id = 2;
     string root_name = 3;
     repeated string authorized_logins = 4;
+    bool weak = 5;
 }
 
 message UnregisterWorktree {
@@ -139,11 +139,6 @@ message UnregisterWorktree {
     uint64 worktree_id = 2;
 }
 
-message ShareWorktree {
-    uint64 project_id = 1;
-    Worktree worktree = 2;
-}
-
 message UpdateWorktree {
     uint64 project_id = 1;
     uint64 worktree_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -188,7 +188,6 @@ messages!(
     (SendChannelMessage, Foreground),
     (SendChannelMessageResponse, Foreground),
     (ShareProject, Foreground),
-    (ShareWorktree, Foreground),
     (Test, Foreground),
     (UnregisterProject, Foreground),
     (UnregisterWorktree, Foreground),
@@ -228,7 +227,6 @@ request_messages!(
     (SaveBuffer, BufferSaved),
     (SendChannelMessage, SendChannelMessageResponse),
     (ShareProject, Ack),
-    (ShareWorktree, Ack),
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateWorktree, Ack),
@@ -259,7 +257,6 @@ entity_messages!(
     PrepareRename,
     RemoveProjectCollaborator,
     SaveBuffer,
-    ShareWorktree,
     UnregisterWorktree,
     UnshareProject,
     UpdateBuffer,

crates/server/src/rpc.rs 🔗

@@ -16,7 +16,7 @@ use rpc::{
     Connection, ConnectionId, Peer, TypedEnvelope,
 };
 use sha1::{Digest as _, Sha1};
-use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
+use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
 use store::{Store, Worktree};
 use surf::StatusCode;
 use tide::log;
@@ -73,7 +73,6 @@ impl Server {
             .add_message_handler(Server::leave_project)
             .add_request_handler(Server::register_worktree)
             .add_message_handler(Server::unregister_worktree)
-            .add_request_handler(Server::share_worktree)
             .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::update_diagnostic_summary)
             .add_message_handler(Server::disk_based_diagnostics_updating)
@@ -419,23 +418,34 @@ impl Server {
 
         let mut contact_user_ids = HashSet::default();
         contact_user_ids.insert(host_user_id);
-        for github_login in request.payload.authorized_logins {
-            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
+        for github_login in &request.payload.authorized_logins {
+            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
             contact_user_ids.insert(contact_user_id);
         }
 
         let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
-        self.state_mut().register_worktree(
-            request.payload.project_id,
-            request.payload.worktree_id,
-            request.sender_id,
-            Worktree {
-                authorized_user_ids: contact_user_ids.clone(),
-                root_name: request.payload.root_name,
-                share: None,
-                weak: false,
-            },
-        )?;
+        let guest_connection_ids;
+        {
+            let mut state = self.state_mut();
+            guest_connection_ids = state
+                .read_project(request.payload.project_id, request.sender_id)?
+                .guest_connection_ids();
+            state.register_worktree(
+                request.payload.project_id,
+                request.payload.worktree_id,
+                request.sender_id,
+                Worktree {
+                    authorized_user_ids: contact_user_ids.clone(),
+                    root_name: request.payload.root_name.clone(),
+                    share: None,
+                    weak: request.payload.weak,
+                },
+            )?;
+        }
+        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })?;
         self.update_contacts_for_users(&contact_user_ids)?;
         Ok(proto::Ack {})
     }
@@ -462,47 +472,6 @@ impl Server {
         Ok(())
     }
 
-    async fn share_worktree(
-        mut self: Arc<Server>,
-        mut request: TypedEnvelope<proto::ShareWorktree>,
-    ) -> tide::Result<proto::Ack> {
-        let worktree = request
-            .payload
-            .worktree
-            .as_mut()
-            .ok_or_else(|| anyhow!("missing worktree"))?;
-        let entries = worktree
-            .entries
-            .iter()
-            .map(|entry| (entry.id, entry.clone()))
-            .collect();
-        let diagnostic_summaries = worktree
-            .diagnostic_summaries
-            .iter()
-            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
-            .collect();
-
-        let shared_worktree = self.state_mut().share_worktree(
-            request.payload.project_id,
-            worktree.id,
-            request.sender_id,
-            entries,
-            diagnostic_summaries,
-        )?;
-
-        broadcast(
-            request.sender_id,
-            shared_worktree.connection_ids,
-            |connection_id| {
-                self.peer
-                    .forward_send(request.sender_id, connection_id, request.payload.clone())
-            },
-        )?;
-        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
-
-        Ok(proto::Ack {})
-    }
-
     async fn update_worktree(
         mut self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,

crates/server/src/rpc/store.rs 🔗

@@ -396,36 +396,6 @@ impl Store {
         }
     }
 
-    pub fn share_worktree(
-        &mut self,
-        project_id: u64,
-        worktree_id: u64,
-        connection_id: ConnectionId,
-        entries: HashMap<u64, proto::Entry>,
-        diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
-    ) -> tide::Result<SharedWorktree> {
-        let project = self
-            .projects
-            .get_mut(&project_id)
-            .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
-        if project.host_connection_id == connection_id && project.share.is_some() {
-            worktree.share = Some(WorktreeShare {
-                entries,
-                diagnostic_summaries,
-            });
-            Ok(SharedWorktree {
-                authorized_user_ids: project.authorized_user_ids(),
-                connection_ids: project.guest_connection_ids(),
-            })
-        } else {
-            Err(anyhow!("no such worktree"))?
-        }
-    }
-
     pub fn update_diagnostic_summary(
         &mut self,
         project_id: u64,