Implement contact rejection

Antonio Scandurra created

Change summary

crates/collab/src/rpc.rs | 297 ++++++++++++++++++++++++++++++-----------
1 file changed, 218 insertions(+), 79 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -270,7 +270,7 @@ impl Server {
             }
 
             let contacts = this.app_state.db.get_contacts(user_id).await?;
-            
+
             {
                 let mut store = this.store_mut().await;
                 store.add_connection(connection_id, user_id);
@@ -324,7 +324,7 @@ impl Server {
             if let Err(error) = this.sign_out(connection_id).await {
                 tracing::error!(%error, "error signing out");
             }
-            
+
             Ok(())
         }.instrument(span)
     }
@@ -332,7 +332,7 @@ impl Server {
     async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
         self.peer.disconnect(connection_id);
         let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
-    
+
         for (project_id, project) in removed_connection.hosted_projects {
             if let Some(share) = project.share {
                 broadcast(
@@ -357,22 +357,26 @@ impl Server {
                 )
             });
         }
-                
-        let contacts_to_update = self.app_state.db.get_contacts(removed_connection.user_id).await?;
+
+        let contacts_to_update = self
+            .app_state
+            .db
+            .get_contacts(removed_connection.user_id)
+            .await?;
         let mut update = proto::UpdateContacts::default();
         update.contacts.push(proto::Contact {
             user_id: removed_connection.user_id.to_proto(),
             projects: Default::default(),
             online: false,
         });
-        
+
         let store = self.store().await;
         for user_id in contacts_to_update.current {
             for connection_id in store.connection_ids_for_user(user_id) {
-                self.peer.send(connection_id, update.clone());
+                self.peer.send(connection_id, update.clone()).trace_err();
             }
-        }        
-        
+        }
+
         Ok(())
     }
 
@@ -955,24 +959,26 @@ impl Server {
             .db
             .send_contact_request(requester_id, responder_id)
             .await?;
-        
+
         // Update outgoing contact requests of requester
         let mut update = proto::UpdateContacts::default();
         update.outgoing_requests.push(responder_id.to_proto());
         for connection_id in self.store().await.connection_ids_for_user(requester_id) {
             self.peer.send(connection_id, update.clone())?;
         }
-    
+
         // Update incoming contact requests of responder
         let mut update = proto::UpdateContacts::default();
-        update.incoming_requests.push(proto::IncomingContactRequest {
-            requester_id: requester_id.to_proto(),
-            should_notify: true,
-        });
+        update
+            .incoming_requests
+            .push(proto::IncomingContactRequest {
+                requester_id: requester_id.to_proto(),
+                should_notify: true,
+            });
         for connection_id in self.store().await.connection_ids_for_user(responder_id) {
             self.peer.send(connection_id, update.clone())?;
         }
-        
+
         response.send(proto::Ack {})?;
         Ok(())
     }
@@ -991,41 +997,51 @@ impl Server {
         let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
         self.app_state
             .db
-            .respond_to_contact_request(
-                responder_id,
-                requester_id,
-                accept,
-            )
+            .respond_to_contact_request(responder_id, requester_id, accept)
             .await?;
-            
+
+        // Update responder with new contact
+        let mut update = proto::UpdateContacts::default();
         if accept {
-            // Update responder with new contact
-            let mut update = proto::UpdateContacts::default();
             update.contacts.push(proto::Contact {
                 user_id: requester_id.to_proto(),
                 projects: Default::default(), // TODO
-                online: true, // TODO
+                online: true,                 // TODO
             });
-            update.remove_incoming_requests.push(requester_id.to_proto());
-            for connection_id in self.store.read().await.connection_ids_for_user(responder_id) {
-                self.peer.send(connection_id, update.clone())?;
-            }
-            
-            // Update requester with new contact
-            let mut update = proto::UpdateContacts::default();
+        }
+        update
+            .remove_incoming_requests
+            .push(requester_id.to_proto());
+        for connection_id in self
+            .store
+            .read()
+            .await
+            .connection_ids_for_user(responder_id)
+        {
+            self.peer.send(connection_id, update.clone())?;
+        }
+
+        // Update requester with new contact
+        let mut update = proto::UpdateContacts::default();
+        if accept {
             update.contacts.push(proto::Contact {
                 user_id: responder_id.to_proto(),
                 projects: Default::default(), // TODO
-                online: true, // TODO
+                online: true,                 // TODO
             });
-            update.remove_outgoing_requests.push(responder_id.to_proto());
-            for connection_id in self.store.read().await.connection_ids_for_user(requester_id) {
-                self.peer.send(connection_id, update.clone())?;
-            }
-        } else {
-            todo!()
         }
-            
+        update
+            .remove_outgoing_requests
+            .push(responder_id.to_proto());
+        for connection_id in self
+            .store
+            .read()
+            .await
+            .connection_ids_for_user(requester_id)
+        {
+            self.peer.send(connection_id, update.clone())?;
+        }
+
         response.send(proto::Ack {})?;
         Ok(())
     }
@@ -1374,7 +1390,8 @@ pub async fn handle_websocket_request(
             .with(|message| async move { Ok(to_axum_message(message)) });
         let connection = Connection::new(Box::pin(socket));
         async move {
-            server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
+            server
+                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
                 .await
                 .log_err();
         }
@@ -5031,9 +5048,17 @@ mod tests {
                 .collect()
         }
     }
-    
+
     #[gpui::test(iterations = 10)]
-    async fn test_contact_requests(executor: Arc<Deterministic>, cx_a: &mut TestAppContext, cx_a2: &mut TestAppContext, cx_b: &mut TestAppContext, cx_b2: &mut TestAppContext) {
+    async fn test_contact_requests(
+        executor: Arc<Deterministic>,
+        cx_a: &mut TestAppContext,
+        cx_a2: &mut TestAppContext,
+        cx_b: &mut TestAppContext,
+        cx_b2: &mut TestAppContext,
+        cx_c: &mut TestAppContext,
+        cx_c2: &mut TestAppContext,
+    ) {
         cx_a.foreground().forbid_parking();
 
         // Connect to a server as 3 clients.
@@ -5042,10 +5067,14 @@ mod tests {
         let client_a2 = server.create_client(cx_a2, "user_a").await;
         let client_b = server.create_client(cx_b, "user_b").await;
         let client_b2 = server.create_client(cx_b2, "user_b").await;
-        
+        let client_c = server.create_client(cx_c, "user_c").await;
+        let client_c2 = server.create_client(cx_c2, "user_c").await;
+
         assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
+        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
+        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
 
-        // User A requests that user B become their contact
+        // User A and User C request that user B become their contact.
         client_a
             .user_store
             .read_with(cx_a, |store, _| {
@@ -5053,36 +5082,78 @@ mod tests {
             })
             .await
             .unwrap();
+        client_c
+            .user_store
+            .read_with(cx_c, |store, _| {
+                store.request_contact(client_b.user_id().unwrap())
+            })
+            .await
+            .unwrap();
         executor.run_until_parked();
-        
-        // Both users see the pending request appear in all their clients.
-        assert_eq!(client_a.summarize_contacts(&cx_a).outgoing_requests, &["user_b"]);
-        assert_eq!(client_a2.summarize_contacts(&cx_a2).outgoing_requests, &["user_b"]);
-        assert_eq!(client_b.summarize_contacts(&cx_b).incoming_requests, &["user_a"]);
-        assert_eq!(client_b2.summarize_contacts(&cx_b2).incoming_requests, &["user_a"]);
-        
+
+        // All users see the pending request appear in all their clients.
+        assert_eq!(
+            client_a.summarize_contacts(&cx_a).outgoing_requests,
+            &["user_b"]
+        );
+        assert_eq!(
+            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
+            &["user_b"]
+        );
+        assert_eq!(
+            client_b.summarize_contacts(&cx_b).incoming_requests,
+            &["user_a", "user_c"]
+        );
+        assert_eq!(
+            client_b2.summarize_contacts(&cx_b2).incoming_requests,
+            &["user_a", "user_c"]
+        );
+        assert_eq!(
+            client_c.summarize_contacts(&cx_c).outgoing_requests,
+            &["user_b"]
+        );
+        assert_eq!(
+            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
+            &["user_b"]
+        );
+
         // Contact requests are present upon connecting (tested here via disconnect/reconnect)
         disconnect_and_reconnect(&client_a, cx_a).await;
         disconnect_and_reconnect(&client_b, cx_b).await;
+        disconnect_and_reconnect(&client_c, cx_c).await;
         executor.run_until_parked();
-        assert_eq!(client_a.summarize_contacts(&cx_a).outgoing_requests, &["user_b"]);
-        assert_eq!(client_b.summarize_contacts(&cx_b).incoming_requests, &["user_a"]);
-        
-        // User B accepts the request.
-        client_b.user_store.read_with(cx_b, |store, _| {
-            store.respond_to_contact_request(client_a.user_id().unwrap(), true)
-        }).await.unwrap();
+        assert_eq!(
+            client_a.summarize_contacts(&cx_a).outgoing_requests,
+            &["user_b"]
+        );
+        assert_eq!(
+            client_b.summarize_contacts(&cx_b).incoming_requests,
+            &["user_a", "user_c"]
+        );
+        assert_eq!(
+            client_c.summarize_contacts(&cx_c).outgoing_requests,
+            &["user_b"]
+        );
+
+        // User B accepts the request from user A.
+        client_b
+            .user_store
+            .read_with(cx_b, |store, _| {
+                store.respond_to_contact_request(client_a.user_id().unwrap(), true)
+            })
+            .await
+            .unwrap();
 
         executor.run_until_parked();
 
         // User B sees user A as their contact now in all client, and the incoming request from them is removed.
         let contacts_b = client_b.summarize_contacts(&cx_b);
         assert_eq!(contacts_b.current, &["user_a"]);
-        assert!(contacts_b.incoming_requests.is_empty());
+        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
         let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
         assert_eq!(contacts_b2.current, &["user_a"]);
-        assert!(contacts_b2.incoming_requests.is_empty());
-        
+        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
+
         // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
         let contacts_a = client_a.summarize_contacts(&cx_a);
         assert_eq!(contacts_a.current, &["user_b"]);
@@ -5094,14 +5165,71 @@ mod tests {
         // Contacts are present upon connecting (tested here via disconnect/reconnect)
         disconnect_and_reconnect(&client_a, cx_a).await;
         disconnect_and_reconnect(&client_b, cx_b).await;
+        disconnect_and_reconnect(&client_c, cx_c).await;
+        executor.run_until_parked();
+        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
+        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
+        assert_eq!(
+            client_b.summarize_contacts(&cx_b).incoming_requests,
+            &["user_c"]
+        );
+        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
+        assert_eq!(
+            client_c.summarize_contacts(&cx_c).outgoing_requests,
+            &["user_b"]
+        );
+
+        // User B rejects the request from user C.
+        client_b
+            .user_store
+            .read_with(cx_b, |store, _| {
+                store.respond_to_contact_request(client_c.user_id().unwrap(), false)
+            })
+            .await
+            .unwrap();
+
+        executor.run_until_parked();
+
+        // User B doesn't see user C as their contact, and the incoming request from them is removed.
+        let contacts_b = client_b.summarize_contacts(&cx_b);
+        assert_eq!(contacts_b.current, &["user_a"]);
+        assert!(contacts_b.incoming_requests.is_empty());
+        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
+        assert_eq!(contacts_b2.current, &["user_a"]);
+        assert!(contacts_b2.incoming_requests.is_empty());
+
+        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
+        let contacts_c = client_c.summarize_contacts(&cx_c);
+        assert!(contacts_c.current.is_empty());
+        assert!(contacts_c.outgoing_requests.is_empty());
+        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
+        assert!(contacts_c2.current.is_empty());
+        assert!(contacts_c2.outgoing_requests.is_empty());
+
+        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
+        disconnect_and_reconnect(&client_a, cx_a).await;
+        disconnect_and_reconnect(&client_b, cx_b).await;
+        disconnect_and_reconnect(&client_c, cx_c).await;
         executor.run_until_parked();
         assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
-        // assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
-        
+        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
+        assert!(client_b
+            .summarize_contacts(&cx_b)
+            .incoming_requests
+            .is_empty());
+        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
+        assert!(client_c
+            .summarize_contacts(&cx_c)
+            .outgoing_requests
+            .is_empty());
+
         async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
             client.disconnect(&cx.to_async()).unwrap();
             client.clear_contacts(cx);
-            client.authenticate_and_connect(false, &cx.to_async()).await.unwrap();
+            client
+                .authenticate_and_connect(false, &cx.to_async())
+                .await
+                .unwrap();
         }
     }
 
@@ -6143,11 +6271,12 @@ mod tests {
             });
 
             let http = FakeHttpClient::with_404_response();
-            let user_id = if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
-                user.id
-            } else {
-                self.app_state.db.create_user(name, false).await.unwrap()
-            };
+            let user_id =
+                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
+                    user.id
+                } else {
+                    self.app_state.db.create_user(name, false).await.unwrap()
+                };
             let client_name = name.to_string();
             let mut client = Client::new(http.clone());
             let server = self.server.clone();
@@ -6299,7 +6428,7 @@ mod tests {
             &self.client
         }
     }
-    
+
     struct ContactsSummary {
         pub current: Vec<String>,
         pub outgoing_requests: Vec<String>,
@@ -6320,20 +6449,30 @@ mod tests {
                 .read_with(cx, |user_store, _| user_store.watch_current_user());
             while authed_user.next().await.unwrap().is_none() {}
         }
-        
+
         fn clear_contacts(&self, cx: &mut TestAppContext) {
             self.user_store.update(cx, |store, _| {
                 store.clear_contacts();
             });
         }
-        
+
         fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
-            self.user_store.read_with(cx, |store, cx| {
-                ContactsSummary {
-                    current: store.contacts().iter().map(|contact| contact.user.github_login.clone()).collect(),
-                    outgoing_requests: store.outgoing_contact_requests().iter().map(|user| user.github_login.clone()).collect(),
-                    incoming_requests: store.incoming_contact_requests().iter().map(|user| user.github_login.clone()).collect(),
-                }            
+            self.user_store.read_with(cx, |store, cx| ContactsSummary {
+                current: store
+                    .contacts()
+                    .iter()
+                    .map(|contact| contact.user.github_login.clone())
+                    .collect(),
+                outgoing_requests: store
+                    .outgoing_contact_requests()
+                    .iter()
+                    .map(|user| user.github_login.clone())
+                    .collect(),
+                incoming_requests: store
+                    .incoming_contact_requests()
+                    .iter()
+                    .map(|user| user.github_login.clone())
+                    .collect(),
             })
         }