@@ -1,6 +1,6 @@
use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
use anyhow::{anyhow, Context, Result};
-use futures::{future, AsyncReadExt};
+use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
use postage::{prelude::Stream, sink::Sink, watch};
use rpc::proto::{RequestMessage, UsersResponse};
@@ -42,7 +42,7 @@ pub enum ContactRequestStatus {
pub struct UserStore {
users: HashMap<u64, Arc<User>>,
- update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
+ update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
current_user: watch::Receiver<Option<Arc<User>>>,
contacts: Vec<Arc<Contact>>,
incoming_contact_requests: Vec<Arc<User>>,
@@ -60,6 +60,11 @@ impl Entity for UserStore {
type Event = Event;
}
+enum UpdateContacts {
+ Update(proto::UpdateContacts),
+ Clear(postage::barrier::Sender),
+}
+
impl UserStore {
pub fn new(
client: Arc<Client>,
@@ -67,8 +72,7 @@ impl UserStore {
cx: &mut ModelContext<Self>,
) -> Self {
let (mut current_user_tx, current_user_rx) = watch::channel();
- let (update_contacts_tx, mut update_contacts_rx) =
- watch::channel::<Option<proto::UpdateContacts>>();
+ let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
let rpc_subscription =
client.add_message_handler(cx.handle(), Self::handle_update_contacts);
Self {
@@ -82,8 +86,8 @@ impl UserStore {
http,
_maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
let _subscription = rpc_subscription;
- while let Some(message) = update_contacts_rx.recv().await {
- if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
+ while let Some(message) = update_contacts_rx.next().await {
+ if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
.log_err()
.await;
@@ -121,114 +125,130 @@ impl UserStore {
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, _| {
- *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
+ this.update_contacts_tx
+ .unbounded_send(UpdateContacts::Update(msg.payload))
+ .unwrap();
});
Ok(())
}
fn update_contacts(
&mut self,
- message: proto::UpdateContacts,
+ message: UpdateContacts,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
- log::info!("update contacts on client {:?}", message);
- let mut user_ids = HashSet::new();
- for contact in &message.contacts {
- user_ids.insert(contact.user_id);
- user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
- }
- user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
- user_ids.extend(message.outgoing_requests.iter());
-
- let load_users = self.get_users(user_ids.into_iter().collect(), cx);
- cx.spawn(|this, mut cx| async move {
- load_users.await?;
-
- // Users are fetched in parallel above and cached in call to get_users
- // No need to paralellize here
- let mut updated_contacts = Vec::new();
- for contact in message.contacts {
- updated_contacts.push(Arc::new(
- Contact::from_proto(contact, &this, &mut cx).await?,
- ));
- }
-
- let mut incoming_requests = Vec::new();
- for request in message.incoming_requests {
- incoming_requests.push(
- this.update(&mut cx, |this, cx| {
- this.fetch_user(request.requester_id, cx)
- })
- .await?,
- );
+ match message {
+ UpdateContacts::Clear(barrier) => {
+ self.contacts.clear();
+ self.incoming_contact_requests.clear();
+ self.outgoing_contact_requests.clear();
+ drop(barrier);
+ Task::ready(Ok(()))
}
-
- let mut outgoing_requests = Vec::new();
- for requested_user_id in message.outgoing_requests {
- outgoing_requests.push(
- this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
- .await?,
+ UpdateContacts::Update(message) => {
+ log::info!(
+ "update contacts on client {}: {:?}",
+ self.client.upgrade().unwrap().id,
+ message
);
- }
-
- let removed_contacts =
- HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
- let removed_incoming_requests =
- HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
- let removed_outgoing_requests =
- HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
-
- this.update(&mut cx, |this, cx| {
- // Remove contacts
- this.contacts
- .retain(|contact| !removed_contacts.contains(&contact.user.id));
- // Update existing contacts and insert new ones
- for updated_contact in updated_contacts {
- match this
- .contacts
- .binary_search_by_key(&&updated_contact.user.github_login, |contact| {
- &contact.user.github_login
- }) {
- Ok(ix) => this.contacts[ix] = updated_contact,
- Err(ix) => this.contacts.insert(ix, updated_contact),
- }
+ let mut user_ids = HashSet::new();
+ for contact in &message.contacts {
+ user_ids.insert(contact.user_id);
+ user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
}
+ user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
+ user_ids.extend(message.outgoing_requests.iter());
+
+ let load_users = self.get_users(user_ids.into_iter().collect(), cx);
+ cx.spawn(|this, mut cx| async move {
+ load_users.await?;
+
+ // Users are fetched in parallel above and cached in call to get_users
+ // No need to paralellize here
+ let mut updated_contacts = Vec::new();
+ for contact in message.contacts {
+ updated_contacts.push(Arc::new(
+ Contact::from_proto(contact, &this, &mut cx).await?,
+ ));
+ }
- // Remove incoming contact requests
- this.incoming_contact_requests
- .retain(|user| !removed_incoming_requests.contains(&user.id));
- // Update existing incoming requests and insert new ones
- for request in incoming_requests {
- match this
- .incoming_contact_requests
- .binary_search_by_key(&&request.github_login, |contact| {
- &contact.github_login
- }) {
- Ok(ix) => this.incoming_contact_requests[ix] = request,
- Err(ix) => this.incoming_contact_requests.insert(ix, request),
+ let mut incoming_requests = Vec::new();
+ for request in message.incoming_requests {
+ incoming_requests.push(
+ this.update(&mut cx, |this, cx| {
+ this.fetch_user(request.requester_id, cx)
+ })
+ .await?,
+ );
}
- }
- // Remove outgoing contact requests
- this.outgoing_contact_requests
- .retain(|user| !removed_outgoing_requests.contains(&user.id));
- // Update existing incoming requests and insert new ones
- for request in outgoing_requests {
- match this
- .outgoing_contact_requests
- .binary_search_by_key(&&request.github_login, |contact| {
- &contact.github_login
- }) {
- Ok(ix) => this.outgoing_contact_requests[ix] = request,
- Err(ix) => this.outgoing_contact_requests.insert(ix, request),
+ let mut outgoing_requests = Vec::new();
+ for requested_user_id in message.outgoing_requests {
+ outgoing_requests.push(
+ this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
+ .await?,
+ );
}
- }
- cx.notify();
- });
+ let removed_contacts =
+ HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
+ let removed_incoming_requests =
+ HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
+ let removed_outgoing_requests =
+ HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
- Ok(())
- })
+ this.update(&mut cx, |this, cx| {
+ // Remove contacts
+ this.contacts
+ .retain(|contact| !removed_contacts.contains(&contact.user.id));
+ // Update existing contacts and insert new ones
+ for updated_contact in updated_contacts {
+ match this.contacts.binary_search_by_key(
+ &&updated_contact.user.github_login,
+ |contact| &contact.user.github_login,
+ ) {
+ Ok(ix) => this.contacts[ix] = updated_contact,
+ Err(ix) => this.contacts.insert(ix, updated_contact),
+ }
+ }
+
+ // Remove incoming contact requests
+ this.incoming_contact_requests
+ .retain(|user| !removed_incoming_requests.contains(&user.id));
+ // Update existing incoming requests and insert new ones
+ for request in incoming_requests {
+ match this
+ .incoming_contact_requests
+ .binary_search_by_key(&&request.github_login, |contact| {
+ &contact.github_login
+ }) {
+ Ok(ix) => this.incoming_contact_requests[ix] = request,
+ Err(ix) => this.incoming_contact_requests.insert(ix, request),
+ }
+ }
+
+ // Remove outgoing contact requests
+ this.outgoing_contact_requests
+ .retain(|user| !removed_outgoing_requests.contains(&user.id));
+ // Update existing incoming requests and insert new ones
+ for request in outgoing_requests {
+ match this
+ .outgoing_contact_requests
+ .binary_search_by_key(&&request.github_login, |contact| {
+ &contact.github_login
+ }) {
+ Ok(ix) => this.outgoing_contact_requests[ix] = request,
+ Err(ix) => this.outgoing_contact_requests.insert(ix, request),
+ }
+ }
+
+ cx.notify();
+ });
+
+ Ok(())
+ })
+ }
+ }
}
pub fn contacts(&self) -> &[Arc<Contact>] {
@@ -342,11 +362,14 @@ impl UserStore {
})
}
- #[cfg(any(test, feature = "test-support"))]
- pub fn clear_contacts(&mut self) {
- self.contacts.clear();
- self.incoming_contact_requests.clear();
- self.outgoing_contact_requests.clear();
+ pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
+ let (tx, mut rx) = postage::barrier::channel();
+ self.update_contacts_tx
+ .unbounded_send(UpdateContacts::Clear(tx))
+ .unwrap();
+ async move {
+ rx.recv().await;
+ }
}
pub fn get_users(
@@ -77,6 +77,8 @@ pub trait Db: Send + Sync {
) -> Result<Vec<ChannelMessage>>;
#[cfg(test)]
async fn teardown(&self, url: &str);
+ #[cfg(test)]
+ fn as_fake<'a>(&'a self) -> Option<&'a tests::FakeDb>;
}
pub struct PostgresDb {
@@ -291,6 +293,37 @@ impl Db for PostgresDb {
}
}
+ async fn dismiss_contact_request(
+ &self,
+ responder_id: UserId,
+ requester_id: UserId,
+ ) -> Result<()> {
+ let (id_a, id_b, a_to_b) = if responder_id < requester_id {
+ (responder_id, requester_id, false)
+ } else {
+ (requester_id, responder_id, true)
+ };
+
+ let query = "
+ UPDATE contacts
+ SET should_notify = 'f'
+ WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
+ ";
+
+ let result = sqlx::query(query)
+ .bind(id_a.0)
+ .bind(id_b.0)
+ .bind(a_to_b)
+ .execute(&self.pool)
+ .await?;
+
+ if result.rows_affected() == 0 {
+ Err(anyhow!("no such contact request"))?;
+ }
+
+ Ok(())
+ }
+
async fn respond_to_contact_request(
&self,
responder_id: UserId,
@@ -333,37 +366,6 @@ impl Db for PostgresDb {
}
}
- async fn dismiss_contact_request(
- &self,
- responder_id: UserId,
- requester_id: UserId,
- ) -> Result<()> {
- let (id_a, id_b, a_to_b) = if responder_id < requester_id {
- (responder_id, requester_id, false)
- } else {
- (requester_id, responder_id, true)
- };
-
- let query = "
- UPDATE contacts
- SET should_notify = 'f'
- WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
- ";
-
- let result = sqlx::query(query)
- .bind(id_a.0)
- .bind(id_b.0)
- .bind(a_to_b)
- .execute(&self.pool)
- .await?;
-
- if result.rows_affected() == 0 {
- Err(anyhow!("no such contact request"))?;
- }
-
- Ok(())
- }
-
// access tokens
async fn create_access_token_hash(
@@ -620,6 +622,11 @@ impl Db for PostgresDb {
.await
.log_err();
}
+
+ #[cfg(test)]
+ fn as_fake(&self) -> Option<&tests::FakeDb> {
+ None
+ }
}
macro_rules! id_type {
@@ -1108,25 +1115,25 @@ pub mod tests {
pub struct FakeDb {
background: Arc<Background>,
- users: Mutex<BTreeMap<UserId, User>>,
+ pub users: Mutex<BTreeMap<UserId, User>>,
+ pub orgs: Mutex<BTreeMap<OrgId, Org>>,
+ pub org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
+ pub channels: Mutex<BTreeMap<ChannelId, Channel>>,
+ pub channel_memberships: Mutex<BTreeMap<(ChannelId, UserId), bool>>,
+ pub channel_messages: Mutex<BTreeMap<MessageId, ChannelMessage>>,
+ pub contacts: Mutex<Vec<FakeContact>>,
+ next_channel_message_id: Mutex<i32>,
next_user_id: Mutex<i32>,
- orgs: Mutex<BTreeMap<OrgId, Org>>,
next_org_id: Mutex<i32>,
- org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
- channels: Mutex<BTreeMap<ChannelId, Channel>>,
next_channel_id: Mutex<i32>,
- channel_memberships: Mutex<BTreeMap<(ChannelId, UserId), bool>>,
- channel_messages: Mutex<BTreeMap<MessageId, ChannelMessage>>,
- next_channel_message_id: Mutex<i32>,
- contacts: Mutex<Vec<FakeContact>>,
}
#[derive(Debug)]
- struct FakeContact {
- requester_id: UserId,
- responder_id: UserId,
- accepted: bool,
- should_notify: bool,
+ pub struct FakeContact {
+ pub requester_id: UserId,
+ pub responder_id: UserId,
+ pub accepted: bool,
+ pub should_notify: bool,
}
impl FakeDb {
@@ -1514,5 +1521,10 @@ pub mod tests {
}
async fn teardown(&self, _: &str) {}
+
+ #[cfg(test)]
+ fn as_fake(&self) -> Option<&FakeDb> {
+ Some(self)
+ }
}
}