@@ -18,7 +18,7 @@ use live_kit_client::{
LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
RemoteVideoTrackUpdate,
};
-use postage::stream::Stream;
+use postage::{sink::Sink, stream::Stream, watch};
use project::Project;
use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
use util::{post_inc, ResultExt, TryFutureExt};
@@ -70,6 +70,8 @@ pub struct Room {
user_store: ModelHandle<UserStore>,
follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
subscriptions: Vec<client::Subscription>,
+ room_update_completed_tx: watch::Sender<Option<()>>,
+ room_update_completed_rx: watch::Receiver<Option<()>>,
pending_room_update: Option<Task<()>>,
maintain_connection: Option<Task<Option<()>>>,
}
@@ -211,6 +213,8 @@ impl Room {
Audio::play_sound(Sound::Joined, cx);
+ let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
+
Self {
id,
channel_id,
@@ -230,6 +234,8 @@ impl Room {
user_store,
follows_by_leader_id_project_id: Default::default(),
maintain_connection: Some(maintain_connection),
+ room_update_completed_tx,
+ room_update_completed_rx,
}
}
@@ -856,6 +862,7 @@ impl Room {
});
this.check_invariants();
+ this.room_update_completed_tx.try_send(Some(())).ok();
cx.notify();
});
}));
@@ -864,6 +871,17 @@ impl Room {
Ok(())
}
+ pub fn next_room_update(&mut self) -> impl Future<Output = ()> {
+ let mut done_rx = self.room_update_completed_rx.clone();
+ async move {
+ while let Some(result) = done_rx.next().await {
+ if result.is_some() {
+ break;
+ }
+ }
+ }
+ }
+
fn remote_video_track_updated(
&mut self,
change: RemoteVideoTrackUpdate,
@@ -22,7 +22,7 @@ use drag_and_drop::DragAndDrop;
use futures::{
channel::{mpsc, oneshot},
future::try_join_all,
- select_biased, FutureExt, StreamExt,
+ FutureExt, StreamExt,
};
use gpui::{
actions,
@@ -36,9 +36,9 @@ use gpui::{
CursorStyle, ModifiersChangedEvent, MouseButton, PathPromptOptions, Platform, PromptLevel,
WindowBounds, WindowOptions,
},
- AnyModelHandle, AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity,
- ModelContext, ModelHandle, SizeConstraint, Subscription, Task, View, ViewContext, ViewHandle,
- WeakViewHandle, WindowContext, WindowHandle,
+ AnyModelHandle, AnyViewHandle, AnyWeakViewHandle, AnyWindowHandle, AppContext, AsyncAppContext,
+ Entity, ModelContext, ModelHandle, SizeConstraint, Subscription, Task, View, ViewContext,
+ ViewHandle, WeakViewHandle, WindowContext, WindowHandle,
};
use item::{FollowableItem, FollowableItemHandle, Item, ItemHandle, ProjectItem};
use itertools::Itertools;
@@ -4156,7 +4156,7 @@ pub async fn last_opened_workspace_paths() -> Option<WorkspaceLocation> {
async fn join_channel_internal(
channel_id: u64,
- app_state: Arc<AppState>,
+ app_state: &Arc<AppState>,
requesting_window: Option<WindowHandle<Workspace>>,
active_call: &ModelHandle<ActiveCall>,
cx: &mut AsyncAppContext,
@@ -4196,33 +4196,24 @@ async fn join_channel_internal(
let client = cx.read(|cx| active_call.read(cx).client());
- let mut timer = cx.background().timer(Duration::from_secs(5)).fuse();
let mut client_status = client.status();
+ // this loop will terminate within client::CONNECTION_TIMEOUT seconds.
'outer: loop {
- select_biased! {
- _ = timer => {
- return Err(anyhow!("connecting timed out"))
- },
- status = client_status.recv().fuse() => {
- let Some(status) = status else {
- return Err(anyhow!("unexpected error reading connection status"))
- };
+ let Some(status) = client_status.recv().await else {
+ return Err(anyhow!("error connecting"));
+ };
- match status {
- Status::Connecting | Status::Authenticating | Status::Reconnecting | Status::Reauthenticating => continue,
- Status::Connected { .. } => break 'outer,
- Status::SignedOut => {
- if client.has_keychain_credentials(&cx) {
- client.authenticate_and_connect(true, &cx).await?;
- timer = cx.background().timer(Duration::from_secs(5)).fuse();
- } else {
- return Err(anyhow!("not signed in"))
- }
- },
- Status::UpgradeRequired => return Err(anyhow!("zed is out of date")),
- | Status::ConnectionError | Status::ConnectionLost | Status::ReconnectionError { .. } => return Err(anyhow!("zed is offline"))
- }
+ match status {
+ Status::Connecting
+ | Status::Authenticating
+ | Status::Reconnecting
+ | Status::Reauthenticating => continue,
+ Status::Connected { .. } => break 'outer,
+ Status::SignedOut => return Err(anyhow!("not signed in")),
+ Status::UpgradeRequired => return Err(anyhow!("zed is out of date")),
+ Status::ConnectionError | Status::ConnectionLost | Status::ReconnectionError { .. } => {
+ return Err(anyhow!("zed is offline"))
}
}
}
@@ -4233,6 +4224,8 @@ async fn join_channel_internal(
})
.await?;
+ room.update(cx, |room, cx| room.next_room_update()).await;
+
let task = room.update(cx, |room, cx| {
if let Some((project, host)) = room.most_active_project() {
return Some(join_remote_project(project, host, app_state.clone(), cx));
@@ -4255,10 +4248,10 @@ pub fn join_channel(
cx: &mut AppContext,
) -> Task<Result<()>> {
let active_call = ActiveCall::global(cx);
- cx.spawn(|mut cx| {
+ cx.spawn(|mut cx| async move {
let result = join_channel_internal(
channel_id,
- app_state,
+ &app_state,
requesting_window,
&active_call,
&mut cx,
@@ -4266,7 +4259,7 @@ pub fn join_channel(
.await;
// join channel succeeded, and opened a window
- if Some(true) = result {
+ if matches!(result, Ok(true)) {
return anyhow::Ok(());
}
@@ -4275,28 +4268,53 @@ pub fn join_channel(
}
// find an existing workspace to focus and show call controls
- for window in cx.windows() {
- let found = window.update(&mut cx, |cx| {
- let is_workspace = cx.root_view().clone().downcast::<Workspace>().is_some();
- if is_workspace {
- cx.activate_window();
- }
- is_workspace
- });
+ let mut active_window = activate_any_workspace_window(&mut cx);
+ if active_window.is_none() {
+ // no open workspaces, make one to show the error in (blergh)
+ cx.update(|cx| Workspace::new_local(vec![], app_state.clone(), requesting_window, cx))
+ .await;
+ }
- if found.unwrap_or(false) {
- return anyhow::Ok(());
- }
+ active_window = activate_any_workspace_window(&mut cx);
+ if active_window.is_none() {
+ return result.map(|_| ()); // unreachable!() assuming new_local always opens a window
}
- // no open workspaces
- cx.update(|cx| Workspace::new_local(vec![], app_state.clone(), requesting_window, cx))
- .await;
+ if let Err(err) = result {
+ let prompt = active_window.unwrap().prompt(
+ PromptLevel::Critical,
+ &format!("Failed to join channel: {}", err),
+ &["Ok"],
+ &mut cx,
+ );
+ if let Some(mut prompt) = prompt {
+ prompt.next().await;
+ } else {
+ return Err(err);
+ }
+ }
- return connected.map(|_| ());
+ // return ok, we showed the error to the user.
+ return anyhow::Ok(());
})
}
+pub fn activate_any_workspace_window(cx: &mut AsyncAppContext) -> Option<AnyWindowHandle> {
+ for window in cx.windows() {
+ let found = window.update(cx, |cx| {
+ let is_workspace = cx.root_view().clone().downcast::<Workspace>().is_some();
+ if is_workspace {
+ cx.activate_window();
+ }
+ is_workspace
+ });
+ if found == Some(true) {
+ return Some(window);
+ }
+ }
+ None
+}
+
#[allow(clippy::type_complexity)]
pub fn open_paths(
abs_paths: &[PathBuf],
@@ -8,7 +8,9 @@ use cli::{
ipc::{self, IpcSender},
CliRequest, CliResponse, IpcHandshake, FORCE_CLI_MODE_ENV_VAR_NAME,
};
-use client::{self, TelemetrySettings, UserStore, ZED_APP_VERSION, ZED_SECRET_CLIENT_TOKEN};
+use client::{
+ self, Client, TelemetrySettings, UserStore, ZED_APP_VERSION, ZED_SECRET_CLIENT_TOKEN,
+};
use db::kvp::KEY_VALUE_STORE;
use editor::{scroll::autoscroll::Autoscroll, Editor};
use futures::{
@@ -33,7 +35,7 @@ use std::{
fs::OpenOptions,
io::{IsTerminal, Write as _},
panic,
- path::{Path, PathBuf},
+ path::Path,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Weak,
@@ -222,6 +224,8 @@ fn main() {
}
}
+ let mut triggered_authentication = false;
+
match open_rx.try_next() {
Ok(Some(OpenRequest::Paths { paths })) => {
cx.update(|cx| workspace::open_paths(&paths, &app_state, None, cx))
@@ -231,9 +235,18 @@ fn main() {
cx.spawn(|cx| handle_cli_connection(connection, app_state.clone(), cx))
.detach();
}
- Ok(Some(OpenRequest::JoinChannel { channel_id })) => cx
- .update(|cx| workspace::join_channel(channel_id, app_state.clone(), None, cx))
- .detach_and_log_err(cx),
+ Ok(Some(OpenRequest::JoinChannel { channel_id })) => {
+ triggered_authentication = true;
+ let app_state = app_state.clone();
+ let client = client.clone();
+ cx.spawn(|mut cx| async move {
+ // ignore errors here, we'll show a generic "not signed in"
+ let _ = authenticate(client, &cx).await;
+ cx.update(|cx| workspace::join_channel(channel_id, app_state, None, cx))
+ .await
+ })
+ .detach_and_log_err(cx)
+ }
Ok(None) | Err(_) => cx
.spawn({
let app_state = app_state.clone();
@@ -266,20 +279,24 @@ fn main() {
})
.detach();
- cx.spawn(|cx| async move {
- if stdout_is_a_pty() {
- if client::IMPERSONATE_LOGIN.is_some() {
- client.authenticate_and_connect(false, &cx).await?;
- }
- } else if client.has_keychain_credentials(&cx) {
- client.authenticate_and_connect(true, &cx).await?;
- }
- Ok::<_, anyhow::Error>(())
- })
- .detach_and_log_err(cx);
+ if !triggered_authentication {
+ cx.spawn(|cx| async move { authenticate(client, &cx).await })
+ .detach_and_log_err(cx);
+ }
});
}
+async fn authenticate(client: Arc<Client>, cx: &AsyncAppContext) -> Result<()> {
+ if stdout_is_a_pty() {
+ if client::IMPERSONATE_LOGIN.is_some() {
+ client.authenticate_and_connect(false, &cx).await?;
+ }
+ } else if client.has_keychain_credentials(&cx) {
+ client.authenticate_and_connect(true, &cx).await?;
+ }
+ Ok::<_, anyhow::Error>(())
+}
+
async fn installation_id() -> Result<String> {
let legacy_key_name = "device_id";