Fix room disconnection problems when creating room and sharing project

Antonio Scandurra created

Change summary

crates/call/src/participant.rs         |  2 
crates/call/src/room.rs                | 24 ++++--
crates/collab/src/integration_tests.rs | 19 +++++
crates/project/src/project.rs          | 89 +++++++++++++--------------
crates/rpc/src/peer.rs                 |  6 +
5 files changed, 80 insertions(+), 60 deletions(-)

Detailed changes

crates/call/src/participant.rs 🔗

@@ -20,7 +20,7 @@ impl ParticipantLocation {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct RemoteParticipant {
     pub user: Arc<User>,
     pub project_ids: Vec<u64>,

crates/call/src/room.rs 🔗

@@ -22,6 +22,7 @@ pub struct Room {
     remote_participants: HashMap<PeerId, RemoteParticipant>,
     pending_users: Vec<Arc<User>>,
     pending_call_count: usize,
+    leave_when_empty: bool,
     client: Arc<Client>,
     user_store: ModelHandle<UserStore>,
     subscriptions: Vec<client::Subscription>,
@@ -65,6 +66,7 @@ impl Room {
             pending_users: Default::default(),
             pending_call_count: 0,
             subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
+            leave_when_empty: false,
             _pending_room_update: None,
             client,
             user_store,
@@ -81,17 +83,13 @@ impl Room {
         cx.spawn(|mut cx| async move {
             let response = client.request(proto::CreateRoom {}).await?;
             let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
+
             let initial_project_id = if let Some(initial_project) = initial_project {
                 let initial_project_id = room
                     .update(&mut cx, |room, cx| {
                         room.share_project(initial_project.clone(), cx)
                     })
                     .await?;
-                initial_project
-                    .update(&mut cx, |project, cx| {
-                        project.shared(initial_project_id, cx)
-                    })
-                    .await?;
                 Some(initial_project_id)
             } else {
                 None
@@ -103,8 +101,11 @@ impl Room {
                 })
                 .await
             {
-                Ok(()) => Ok(room),
-                Err(_) => Err(anyhow!("call failed")),
+                Ok(()) => {
+                    room.update(&mut cx, |room, _| room.leave_when_empty = true);
+                    Ok(room)
+                }
+                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
             }
         })
     }
@@ -120,13 +121,18 @@ impl Room {
             let response = client.request(proto::JoinRoom { id: room_id }).await?;
             let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
             let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
-            room.update(&mut cx, |room, cx| room.apply_room_update(room_proto, cx))?;
+            room.update(&mut cx, |room, cx| {
+                room.leave_when_empty = true;
+                room.apply_room_update(room_proto, cx)?;
+                anyhow::Ok(())
+            })?;
             Ok(room)
         })
     }
 
     fn should_leave(&self) -> bool {
-        self.pending_users.is_empty()
+        self.leave_when_empty
+            && self.pending_users.is_empty()
             && self.remote_participants.is_empty()
             && self.pending_call_count == 0
     }

crates/collab/src/integration_tests.rs 🔗

@@ -504,9 +504,10 @@ async fn test_share_project(
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
-        .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)])
         .await;
     let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
 
     client_a
         .fs
@@ -524,13 +525,25 @@ async fn test_share_project(
         )
         .await;
 
+    // Invite client B to collaborate on a project
     let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
-    let project_id = active_call_a
-        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b.user_id().unwrap(), Some(project_a.clone()), cx)
+        })
         .await
         .unwrap();
 
     // Join that project as client B
+    let incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
+    deterministic.run_until_parked();
+    let call = incoming_call_b.borrow().clone().unwrap();
+    assert_eq!(call.caller.github_login, "user_a");
+    let project_id = call.initial_project_id.unwrap();
+    active_call_b
+        .update(cx_b, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
     let client_b_peer_id = client_b.peer_id;
     let project_b = client_b.build_remote_project(project_id, cx_b).await;
     let replica_id_b = project_b.read_with(cx_b, |project, _| project.replica_id());

crates/project/src/project.rs 🔗

@@ -1054,62 +1054,59 @@ impl Project {
                 return Task::ready(Err(anyhow!("project was already shared")));
             }
 
-            cx.spawn(|this, mut cx| async move {
-                let mut worktree_share_tasks = Vec::new();
-                this.update(&mut cx, |this, cx| {
-                    if let ProjectClientState::Local { remote_id, .. } = &mut this.client_state {
-                        *remote_id = Some(project_id);
-                    }
+            *remote_id = Some(project_id);
 
-                    for open_buffer in this.opened_buffers.values_mut() {
-                        match open_buffer {
-                            OpenBuffer::Strong(_) => {}
-                            OpenBuffer::Weak(buffer) => {
-                                if let Some(buffer) = buffer.upgrade(cx) {
-                                    *open_buffer = OpenBuffer::Strong(buffer);
-                                }
-                            }
-                            OpenBuffer::Operations(_) => unreachable!(),
+            let mut worktree_share_tasks = Vec::new();
+
+            for open_buffer in self.opened_buffers.values_mut() {
+                match open_buffer {
+                    OpenBuffer::Strong(_) => {}
+                    OpenBuffer::Weak(buffer) => {
+                        if let Some(buffer) = buffer.upgrade(cx) {
+                            *open_buffer = OpenBuffer::Strong(buffer);
                         }
                     }
+                    OpenBuffer::Operations(_) => unreachable!(),
+                }
+            }
 
-                    for worktree_handle in this.worktrees.iter_mut() {
-                        match worktree_handle {
-                            WorktreeHandle::Strong(_) => {}
-                            WorktreeHandle::Weak(worktree) => {
-                                if let Some(worktree) = worktree.upgrade(cx) {
-                                    *worktree_handle = WorktreeHandle::Strong(worktree);
-                                }
-                            }
+            for worktree_handle in self.worktrees.iter_mut() {
+                match worktree_handle {
+                    WorktreeHandle::Strong(_) => {}
+                    WorktreeHandle::Weak(worktree) => {
+                        if let Some(worktree) = worktree.upgrade(cx) {
+                            *worktree_handle = WorktreeHandle::Strong(worktree);
                         }
                     }
+                }
+            }
 
-                    for worktree in this.worktrees(cx).collect::<Vec<_>>() {
-                        worktree.update(cx, |worktree, cx| {
-                            let worktree = worktree.as_local_mut().unwrap();
-                            worktree_share_tasks.push(worktree.share(project_id, cx));
-                        });
-                    }
+            for worktree in self.worktrees(cx).collect::<Vec<_>>() {
+                worktree.update(cx, |worktree, cx| {
+                    let worktree = worktree.as_local_mut().unwrap();
+                    worktree_share_tasks.push(worktree.share(project_id, cx));
+                });
+            }
 
-                    for (server_id, status) in &this.language_server_statuses {
-                        this.client
-                            .send(proto::StartLanguageServer {
-                                project_id,
-                                server: Some(proto::LanguageServer {
-                                    id: *server_id as u64,
-                                    name: status.name.clone(),
-                                }),
-                            })
-                            .log_err();
-                    }
+            for (server_id, status) in &self.language_server_statuses {
+                self.client
+                    .send(proto::StartLanguageServer {
+                        project_id,
+                        server: Some(proto::LanguageServer {
+                            id: *server_id as u64,
+                            name: status.name.clone(),
+                        }),
+                    })
+                    .log_err();
+            }
 
-                    this.client_subscriptions
-                        .push(this.client.add_model_for_remote_entity(project_id, cx));
-                    this.metadata_changed(cx);
-                    cx.emit(Event::RemoteIdChanged(Some(project_id)));
-                    cx.notify();
-                });
+            self.client_subscriptions
+                .push(self.client.add_model_for_remote_entity(project_id, cx));
+            self.metadata_changed(cx);
+            cx.emit(Event::RemoteIdChanged(Some(project_id)));
+            cx.notify();
 
+            cx.foreground().spawn(async move {
                 futures::future::try_join_all(worktree_share_tasks).await?;
                 Ok(())
             })

crates/rpc/src/peer.rs 🔗

@@ -394,7 +394,11 @@ impl Peer {
             send?;
             let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
             if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
-                Err(anyhow!("RPC request failed - {}", error.message))
+                Err(anyhow!(
+                    "RPC request {} failed - {}",
+                    T::NAME,
+                    error.message
+                ))
             } else {
                 T::Response::from_envelope(response)
                     .ok_or_else(|| anyhow!("received response of the wrong type"))