@@ -10,7 +10,11 @@ use anyhow::{anyhow, Context, Result};
use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
use clock::ReplicaId;
use collections::{hash_map, BTreeMap, HashMap, HashSet};
-use futures::{future::Shared, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt};
+use futures::{
+ channel::{mpsc, oneshot},
+ future::Shared,
+ AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
+};
use gpui::{
AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle,
@@ -145,7 +149,7 @@ enum WorktreeHandle {
enum ProjectClientState {
Local {
remote_id: u64,
- metadata_changed: watch::Sender<()>,
+ metadata_changed: mpsc::UnboundedSender<oneshot::Sender<()>>,
_maintain_metadata: Task<()>,
_detect_unshare: Task<Option<()>>,
},
@@ -533,7 +537,7 @@ impl Project {
nonce: StdRng::from_entropy().gen(),
};
for worktree in worktrees {
- this.add_worktree(&worktree, cx);
+ let _ = this.add_worktree(&worktree, cx);
}
this
});
@@ -728,14 +732,22 @@ impl Project {
}
}
- fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
+ fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
+ let (tx, rx) = oneshot::channel();
if let Some(ProjectClientState::Local {
metadata_changed, ..
}) = &mut self.client_state
{
- *metadata_changed.borrow_mut() = ();
+ let _ = metadata_changed.unbounded_send(tx);
}
cx.notify();
+
+ async move {
+ // If the project is shared, this will resolve when the `_maintain_metadata` task has
+ // a chance to update the metadata. Otherwise, it will resolve right away because `tx`
+ // will get dropped.
+ let _ = rx.await;
+ }
}
pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
@@ -1025,17 +1037,22 @@ impl Project {
self.client_subscriptions
.push(self.client.add_model_for_remote_entity(project_id, cx));
- self.metadata_changed(cx);
+ let _ = self.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(project_id)));
cx.notify();
let mut status = self.client.status();
- let (metadata_changed_tx, mut metadata_changed_rx) = watch::channel();
+ let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded();
self.client_state = Some(ProjectClientState::Local {
remote_id: project_id,
metadata_changed: metadata_changed_tx,
_maintain_metadata: cx.spawn_weak(move |this, cx| async move {
- while let Some(()) = metadata_changed_rx.next().await {
+ while let Some(tx) = metadata_changed_rx.next().await {
+ let mut txs = vec![tx];
+ while let Ok(Some(next_tx)) = metadata_changed_rx.try_next() {
+ txs.push(next_tx);
+ }
+
let Some(this) = this.upgrade(&cx) else { break };
this.read_with(&cx, |this, cx| {
let worktrees = this
@@ -1054,6 +1071,10 @@ impl Project {
})
.await
.log_err();
+
+ for tx in txs {
+ let _ = tx.send(());
+ }
}
}),
_detect_unshare: cx.spawn_weak(move |this, mut cx| {
@@ -1105,7 +1126,7 @@ impl Project {
}
}
- self.metadata_changed(cx);
+ let _ = self.metadata_changed(cx);
cx.notify();
self.client.send(proto::UnshareProject {
project_id: remote_id,
@@ -4162,12 +4183,13 @@ impl Project {
});
let worktree = worktree?;
- let project_id = project.update(&mut cx, |project, cx| {
- project.add_worktree(&worktree, cx);
- project.remote_id()
- });
+ project
+ .update(&mut cx, |project, cx| project.add_worktree(&worktree, cx))
+ .await;
- if let Some(project_id) = project_id {
+ if let Some(project_id) =
+ project.read_with(&cx, |project, _| project.remote_id())
+ {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().share(project_id, cx)
@@ -4191,7 +4213,11 @@ impl Project {
})
}
- pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
+ pub fn remove_worktree(
+ &mut self,
+ id_to_remove: WorktreeId,
+ cx: &mut ModelContext<Self>,
+ ) -> impl Future<Output = ()> {
self.worktrees.retain(|worktree| {
if let Some(worktree) = worktree.upgrade(cx) {
let id = worktree.read(cx).id();
@@ -4205,11 +4231,14 @@ impl Project {
false
}
});
- self.metadata_changed(cx);
- cx.notify();
+ self.metadata_changed(cx)
}
- fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
+ fn add_worktree(
+ &mut self,
+ worktree: &ModelHandle<Worktree>,
+ cx: &mut ModelContext<Self>,
+ ) -> impl Future<Output = ()> {
cx.observe(worktree, |_, _, cx| cx.notify()).detach();
if worktree.read(cx).is_local() {
cx.subscribe(worktree, |this, worktree, event, cx| match event {
@@ -4233,15 +4262,13 @@ impl Project {
.push(WorktreeHandle::Weak(worktree.downgrade()));
}
- self.metadata_changed(cx);
cx.observe_release(worktree, |this, worktree, cx| {
- this.remove_worktree(worktree.id(), cx);
- cx.notify();
+ let _ = this.remove_worktree(worktree.id(), cx);
})
.detach();
cx.emit(Event::WorktreeAdded);
- cx.notify();
+ self.metadata_changed(cx)
}
fn update_local_worktree_buffers(
@@ -4558,11 +4585,11 @@ impl Project {
} else {
let worktree =
Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
- this.add_worktree(&worktree, cx);
+ let _ = this.add_worktree(&worktree, cx);
}
}
- this.metadata_changed(cx);
+ let _ = this.metadata_changed(cx);
for (id, _) in old_worktrees_by_id {
cx.emit(Event::WorktreeRemoved(id));
}