.zed.toml 🔗
@@ -0,0 +1 @@
+collaborators = ["nathansobo", "as-cii", "maxbrunsfeld", "iamnbutler"]
Nate created
.zed.toml | 1
gpui/src/color.rs | 8
gpui/src/elements.rs | 33 +
gpui/src/elements/align.rs | 5
gpui/src/elements/container.rs | 5
gpui/src/elements/image.rs | 23
gpui/src/elements/label.rs | 3
gpui/src/elements/line_box.rs | 87 ----
gpui/src/elements/list.rs | 35 +
gpui/src/font_cache.rs | 12
gpui/src/fonts.rs | 12
gpui/src/presenter.rs | 14
server/src/auth.rs | 28 +
server/src/bin/seed.rs | 8
server/src/db.rs | 115 -----
server/src/rpc.rs | 728 ++++++++++++++++-------------------
server/src/rpc/store.rs | 615 ++++++++++++++++++++++++++++++
zed/assets/themes/_base.toml | 40 +
zed/src/channel.rs | 77 ++-
zed/src/editor.rs | 6
zed/src/editor/buffer.rs | 22
zed/src/language.rs | 23
zed/src/lib.rs | 3
zed/src/main.rs | 2
zed/src/menus.rs | 15
zed/src/people_panel.rs | 267 +++++++++++++
zed/src/rpc.rs | 75 +++
zed/src/test.rs | 2
zed/src/theme.rs | 28 +
zed/src/theme/resolution.rs | 33 +
zed/src/user.rs | 233 ++++++++--
zed/src/workspace.rs | 237 ++++++++---
zed/src/worktree.rs | 416 +++++++++++++++-----
zrpc/proto/zed.proto | 52 ++
zrpc/src/proto.rs | 10
35 files changed, 2,340 insertions(+), 933 deletions(-)
@@ -0,0 +1 @@
+collaborators = ["nathansobo", "as-cii", "maxbrunsfeld", "iamnbutler"]
@@ -33,6 +33,14 @@ impl Color {
Self(ColorU::from_u32(0xff0000ff))
}
+ pub fn green() -> Self {
+ Self(ColorU::from_u32(0x00ff00ff))
+ }
+
+ pub fn blue() -> Self {
+ Self(ColorU::from_u32(0x0000ffff))
+ }
+
pub fn new(r: u8, g: u8, b: u8, a: u8) -> Self {
Self(ColorU::new(r, g, b, a))
}
@@ -8,7 +8,6 @@ mod flex;
mod hook;
mod image;
mod label;
-mod line_box;
mod list;
mod mouse_event_handler;
mod overlay;
@@ -19,8 +18,8 @@ mod uniform_list;
pub use self::{
align::*, canvas::*, constrained_box::*, container::*, empty::*, event_handler::*, flex::*,
- hook::*, image::*, label::*, line_box::*, list::*, mouse_event_handler::*, overlay::*,
- stack::*, svg::*, text::*, uniform_list::*,
+ hook::*, image::*, label::*, list::*, mouse_event_handler::*, overlay::*, stack::*, svg::*,
+ text::*, uniform_list::*,
};
pub use crate::presenter::ChildView;
use crate::{
@@ -109,6 +108,34 @@ pub trait Element {
element: Rc::new(RefCell::new(Lifecycle::Init { element: self })),
})
}
+
+ fn constrained(self) -> ConstrainedBox
+ where
+ Self: 'static + Sized,
+ {
+ ConstrainedBox::new(self.boxed())
+ }
+
+ fn aligned(self) -> Align
+ where
+ Self: 'static + Sized,
+ {
+ Align::new(self.boxed())
+ }
+
+ fn contained(self) -> Container
+ where
+ Self: 'static + Sized,
+ {
+ Container::new(self.boxed())
+ }
+
+ fn expanded(self, flex: f32) -> Expanded
+ where
+ Self: 'static + Sized,
+ {
+ Expanded::new(flex, self.boxed())
+ }
}
pub enum Lifecycle<T: Element> {
@@ -25,6 +25,11 @@ impl Align {
self
}
+ pub fn left(mut self) -> Self {
+ self.alignment.set_x(-1.0);
+ self
+ }
+
pub fn right(mut self) -> Self {
self.alignment.set_x(1.0);
self
@@ -57,6 +57,11 @@ impl Container {
self
}
+ pub fn with_margin_right(mut self, margin: f32) -> Self {
+ self.style.margin.right = margin;
+ self
+ }
+
pub fn with_horizontal_padding(mut self, padding: f32) -> Self {
self.style.padding.left = padding;
self.style.padding.right = padding;
@@ -1,6 +1,9 @@
use super::constrain_size_preserving_aspect_ratio;
use crate::{
- geometry::{rect::RectF, vector::Vector2F},
+ geometry::{
+ rect::RectF,
+ vector::{vec2f, Vector2F},
+ },
json::{json, ToJson},
scene, Border, DebugContext, Element, Event, EventContext, ImageData, LayoutContext,
PaintContext, SizeConstraint,
@@ -16,9 +19,13 @@ pub struct Image {
#[derive(Copy, Clone, Default, Deserialize)]
pub struct ImageStyle {
#[serde(default)]
- border: Border,
+ pub border: Border,
#[serde(default)]
- corner_radius: f32,
+ pub corner_radius: f32,
+ #[serde(default)]
+ pub height: Option<f32>,
+ #[serde(default)]
+ pub width: Option<f32>,
}
impl Image {
@@ -44,8 +51,14 @@ impl Element for Image {
constraint: SizeConstraint,
_: &mut LayoutContext,
) -> (Vector2F, Self::LayoutState) {
- let size =
- constrain_size_preserving_aspect_ratio(constraint.max, self.data.size().to_f32());
+ let desired_size = vec2f(
+ self.style.width.unwrap_or(constraint.max.x()),
+ self.style.height.unwrap_or(constraint.max.y()),
+ );
+ let size = constrain_size_preserving_aspect_ratio(
+ constraint.constrain(desired_size),
+ self.data.size().to_f32(),
+ );
(size, ())
}
@@ -137,8 +137,7 @@ impl Element for Label {
let size = vec2f(
line.width().max(constraint.min.x()).min(constraint.max.x()),
cx.font_cache
- .line_height(self.style.text.font_id, self.style.text.font_size)
- .ceil(),
+ .line_height(self.style.text.font_id, self.style.text.font_size),
);
(size, line)
@@ -1,87 +0,0 @@
-use crate::{
- fonts::TextStyle,
- geometry::{
- rect::RectF,
- vector::{vec2f, Vector2F},
- },
- json::{json, ToJson},
- DebugContext, Element, ElementBox, Event, EventContext, LayoutContext, PaintContext,
- SizeConstraint,
-};
-
-pub struct LineBox {
- child: ElementBox,
- style: TextStyle,
-}
-
-impl LineBox {
- pub fn new(child: ElementBox, style: TextStyle) -> Self {
- Self { child, style }
- }
-}
-
-impl Element for LineBox {
- type LayoutState = f32;
- type PaintState = ();
-
- fn layout(
- &mut self,
- constraint: SizeConstraint,
- cx: &mut LayoutContext,
- ) -> (Vector2F, Self::LayoutState) {
- let line_height = cx
- .font_cache
- .line_height(self.style.font_id, self.style.font_size);
- let character_height = cx
- .font_cache
- .ascent(self.style.font_id, self.style.font_size)
- + cx.font_cache
- .descent(self.style.font_id, self.style.font_size);
- let child_max = vec2f(constraint.max.x(), character_height);
- let child_size = self.child.layout(
- SizeConstraint::new(constraint.min.min(child_max), child_max),
- cx,
- );
- let size = vec2f(child_size.x(), line_height);
- (size, (line_height - character_height) / 2.)
- }
-
- fn paint(
- &mut self,
- bounds: RectF,
- visible_bounds: RectF,
- padding_top: &mut f32,
- cx: &mut PaintContext,
- ) -> Self::PaintState {
- self.child.paint(
- bounds.origin() + vec2f(0., *padding_top),
- visible_bounds,
- cx,
- );
- }
-
- fn dispatch_event(
- &mut self,
- event: &Event,
- _: RectF,
- _: &mut Self::LayoutState,
- _: &mut Self::PaintState,
- cx: &mut EventContext,
- ) -> bool {
- self.child.dispatch_event(event, cx)
- }
-
- fn debug(
- &self,
- bounds: RectF,
- _: &Self::LayoutState,
- _: &Self::PaintState,
- cx: &DebugContext,
- ) -> serde_json::Value {
- json!({
- "bounds": bounds.to_json(),
- "style": self.style.to_json(),
- "child": self.child.debug(cx),
- })
- }
-}
@@ -12,6 +12,7 @@ use std::{cell::RefCell, collections::VecDeque, ops::Range, rc::Rc};
pub struct List {
state: ListState,
+ invalidated_elements: Vec<ElementRc>,
}
#[derive(Clone)]
@@ -79,7 +80,10 @@ struct Height(f32);
impl List {
pub fn new(state: ListState) -> Self {
- Self { state }
+ Self {
+ state,
+ invalidated_elements: Default::default(),
+ }
}
}
@@ -258,10 +262,35 @@ impl Element for List {
let mut handled = false;
let mut state = self.state.0.borrow_mut();
- for (mut element, _) in state.visible_elements(bounds, scroll_top) {
- handled = element.dispatch_event(event, cx) || handled;
+ let mut item_origin = bounds.origin() - vec2f(0., scroll_top.offset_in_item);
+ let mut cursor = state.items.cursor::<Count, ()>();
+ let mut new_items = cursor.slice(&Count(scroll_top.item_ix), Bias::Right, &());
+ while let Some(item) = cursor.item() {
+ if item_origin.y() > bounds.max_y() {
+ break;
+ }
+
+ if let ListItem::Rendered(element) = item {
+ let prev_notify_count = cx.notify_count();
+ let mut element = element.clone();
+ handled = element.dispatch_event(event, cx) || handled;
+ item_origin.set_y(item_origin.y() + element.size().y());
+ if cx.notify_count() > prev_notify_count {
+ new_items.push(ListItem::Unrendered, &());
+ self.invalidated_elements.push(element);
+ } else {
+ new_items.push(item.clone(), &());
+ }
+ cursor.next(&());
+ } else {
+ unreachable!();
+ }
}
+ new_items.push_tree(cursor.suffix(&()), &());
+ drop(cursor);
+ state.items = new_items;
+
match event {
Event::ScrollWheel {
position,
@@ -166,6 +166,10 @@ impl FontCache {
self.metric(font_id, |m| m.cap_height) * self.em_scale(font_id, font_size)
}
+ pub fn x_height(&self, font_id: FontId, font_size: f32) -> f32 {
+ self.metric(font_id, |m| m.x_height) * self.em_scale(font_id, font_size)
+ }
+
pub fn ascent(&self, font_id: FontId, font_size: f32) -> f32 {
self.metric(font_id, |m| m.ascent) * self.em_scale(font_id, font_size)
}
@@ -178,6 +182,14 @@ impl FontCache {
font_size / self.metric(font_id, |m| m.units_per_em as f32)
}
+ pub fn baseline_offset(&self, font_id: FontId, font_size: f32) -> f32 {
+ let line_height = self.line_height(font_id, font_size);
+ let ascent = self.ascent(font_id, font_size);
+ let descent = self.descent(font_id, font_size);
+ let padding_top = (line_height - ascent - descent) / 2.;
+ padding_top + ascent
+ }
+
pub fn line_wrapper(self: &Arc<Self>, font_id: FontId, font_size: f32) -> LineWrapperHandle {
let mut state = self.0.write();
let wrappers = state
@@ -132,6 +132,14 @@ impl TextStyle {
font_cache.line_height(self.font_id, self.font_size)
}
+ pub fn cap_height(&self, font_cache: &FontCache) -> f32 {
+ font_cache.cap_height(self.font_id, self.font_size)
+ }
+
+ pub fn x_height(&self, font_cache: &FontCache) -> f32 {
+ font_cache.x_height(self.font_id, self.font_size)
+ }
+
pub fn em_width(&self, font_cache: &FontCache) -> f32 {
font_cache.em_width(self.font_id, self.font_size)
}
@@ -140,6 +148,10 @@ impl TextStyle {
font_cache.metric(self.font_id, |m| m.descent) * self.em_scale(font_cache)
}
+ pub fn baseline_offset(&self, font_cache: &FontCache) -> f32 {
+ font_cache.baseline_offset(self.font_id, self.font_size)
+ }
+
fn em_scale(&self, font_cache: &FontCache) -> f32 {
font_cache.em_scale(self.font_id, self.font_size)
}
@@ -195,6 +195,7 @@ impl Presenter {
text_layout_cache: &self.text_layout_cache,
view_stack: Default::default(),
invalidated_views: Default::default(),
+ notify_count: 0,
app: cx,
}
}
@@ -300,6 +301,7 @@ pub struct EventContext<'a> {
pub font_cache: &'a FontCache,
pub text_layout_cache: &'a TextLayoutCache,
pub app: &'a mut MutableAppContext,
+ pub notify_count: usize,
view_stack: Vec<usize>,
invalidated_views: HashSet<usize>,
}
@@ -325,10 +327,15 @@ impl<'a> EventContext<'a> {
}
pub fn notify(&mut self) {
+ self.notify_count += 1;
if let Some(view_id) = self.view_stack.last() {
self.invalidated_views.insert(*view_id);
}
}
+
+ pub fn notify_count(&self) -> usize {
+ self.notify_count
+ }
}
impl<'a> Deref for EventContext<'a> {
@@ -432,6 +439,13 @@ impl SizeConstraint {
Axis::Vertical => self.min.y(),
}
}
+
+ pub fn constrain(&self, size: Vector2F) -> Vector2F {
+ vec2f(
+ size.x().min(self.max.x()).max(self.min.x()),
+ size.y().min(self.max.y()).max(self.min.y()),
+ )
+ }
}
impl ToJson for SizeConstraint {
@@ -18,7 +18,7 @@ use scrypt::{
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, convert::TryFrom, sync::Arc};
use surf::{StatusCode, Url};
-use tide::Server;
+use tide::{log, Server};
use zrpc::auth as zed_auth;
static CURRENT_GITHUB_USER: &'static str = "current_github_user";
@@ -121,6 +121,7 @@ pub fn add_routes(app: &mut Server<Arc<AppState>>) {
struct NativeAppSignInParams {
native_app_port: String,
native_app_public_key: String,
+ impersonate: Option<String>,
}
async fn get_sign_in(mut request: Request) -> tide::Result {
@@ -142,11 +143,15 @@ async fn get_sign_in(mut request: Request) -> tide::Result {
let app_sign_in_params: Option<NativeAppSignInParams> = request.query().ok();
if let Some(query) = app_sign_in_params {
- redirect_url
- .query_pairs_mut()
+ let mut redirect_query = redirect_url.query_pairs_mut();
+ redirect_query
.clear()
.append_pair("native_app_port", &query.native_app_port)
.append_pair("native_app_public_key", &query.native_app_public_key);
+
+ if let Some(impersonate) = &query.impersonate {
+ redirect_query.append_pair("impersonate", impersonate);
+ }
}
let (auth_url, csrf_token) = request
@@ -222,7 +227,20 @@ async fn get_auth_callback(mut request: Request) -> tide::Result {
// When signing in from the native app, generate a new access token for the current user. Return
// a redirect so that the user's browser sends this access token to the locally-running app.
if let Some((user, app_sign_in_params)) = user.zip(query.native_app_sign_in_params) {
- let access_token = create_access_token(request.db(), user.id).await?;
+ let mut user_id = user.id;
+ if let Some(impersonated_login) = app_sign_in_params.impersonate {
+ log::info!("attempting to impersonate user @{}", impersonated_login);
+ if let Some(user) = request.db().get_users_by_ids([user_id]).await?.first() {
+ if user.admin {
+ user_id = request.db().create_user(&impersonated_login, false).await?;
+ log::info!("impersonating user {}", user_id.0);
+ } else {
+ log::info!("refusing to impersonate user");
+ }
+ }
+ }
+
+ let access_token = create_access_token(request.db(), user_id).await?;
let native_app_public_key =
zed_auth::PublicKey::try_from(app_sign_in_params.native_app_public_key.clone())
.context("failed to parse app public key")?;
@@ -232,7 +250,7 @@ async fn get_auth_callback(mut request: Request) -> tide::Result {
return Ok(tide::Redirect::new(&format!(
"http://127.0.0.1:{}?user_id={}&access_token={}",
- app_sign_in_params.native_app_port, user.id.0, encrypted_access_token,
+ app_sign_in_params.native_app_port, user_id.0, encrypted_access_token,
))
.into());
}
@@ -27,8 +27,12 @@ async fn main() {
let zed_users = ["nathansobo", "maxbrunsfeld", "as-cii", "iamnbutler"];
let mut zed_user_ids = Vec::<UserId>::new();
for zed_user in zed_users {
- if let Some(user_id) = db.get_user(zed_user).await.expect("failed to fetch user") {
- zed_user_ids.push(user_id);
+ if let Some(user) = db
+ .get_user_by_github_login(zed_user)
+ .await
+ .expect("failed to fetch user")
+ {
+ zed_user_ids.push(user.id);
} else {
zed_user_ids.push(
db.create_user(zed_user, true)
@@ -97,27 +97,12 @@ impl Db {
// users
- #[allow(unused)] // Help rust-analyzer
- #[cfg(any(test, feature = "seed-support"))]
- pub async fn get_user(&self, github_login: &str) -> Result<Option<UserId>> {
- test_support!(self, {
- let query = "
- SELECT id
- FROM users
- WHERE github_login = $1
- ";
- sqlx::query_scalar(query)
- .bind(github_login)
- .fetch_optional(&self.pool)
- .await
- })
- }
-
pub async fn create_user(&self, github_login: &str, admin: bool) -> Result<UserId> {
test_support!(self, {
let query = "
INSERT INTO users (github_login, admin)
VALUES ($1, $2)
+ ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
RETURNING id
";
sqlx::query_scalar(query)
@@ -138,51 +123,17 @@ impl Db {
pub async fn get_users_by_ids(
&self,
- requester_id: UserId,
- ids: impl Iterator<Item = UserId>,
+ ids: impl IntoIterator<Item = UserId>,
) -> Result<Vec<User>> {
- let mut include_requester = false;
- let ids = ids
- .map(|id| {
- if id == requester_id {
- include_requester = true;
- }
- id.0
- })
- .collect::<Vec<_>>();
-
+ let ids = ids.into_iter().map(|id| id.0).collect::<Vec<_>>();
test_support!(self, {
- // Only return users that are in a common channel with the requesting user.
- // Also allow the requesting user to return their own data, even if they aren't
- // in any channels.
let query = "
- SELECT
- users.*
- FROM
- users, channel_memberships
- WHERE
- users.id = ANY ($1) AND
- channel_memberships.user_id = users.id AND
- channel_memberships.channel_id IN (
- SELECT channel_id
- FROM channel_memberships
- WHERE channel_memberships.user_id = $2
- )
- UNION
- SELECT
- users.*
- FROM
- users
- WHERE
- $3 AND users.id = $2
+ SELECT users.*
+ FROM users
+ WHERE users.id = ANY ($1)
";
- sqlx::query_as(query)
- .bind(&ids)
- .bind(requester_id)
- .bind(include_requester)
- .fetch_all(&self.pool)
- .await
+ sqlx::query_as(query).bind(&ids).fetch_all(&self.pool).await
})
}
@@ -613,45 +564,11 @@ pub mod tests {
let friend1 = db.create_user("friend-1", false).await.unwrap();
let friend2 = db.create_user("friend-2", false).await.unwrap();
let friend3 = db.create_user("friend-3", false).await.unwrap();
- let stranger = db.create_user("stranger", false).await.unwrap();
- // A user can read their own info, even if they aren't in any channels.
assert_eq!(
- db.get_users_by_ids(
- user,
- [user, friend1, friend2, friend3, stranger].iter().copied()
- )
- .await
- .unwrap(),
- vec![User {
- id: user,
- github_login: "user".to_string(),
- admin: false,
- },],
- );
-
- // A user can read the info of any other user who is in a shared channel
- // with them.
- let org = db.create_org("test org", "test-org").await.unwrap();
- let chan1 = db.create_org_channel(org, "channel-1").await.unwrap();
- let chan2 = db.create_org_channel(org, "channel-2").await.unwrap();
- let chan3 = db.create_org_channel(org, "channel-3").await.unwrap();
-
- db.add_channel_member(chan1, user, false).await.unwrap();
- db.add_channel_member(chan2, user, false).await.unwrap();
- db.add_channel_member(chan1, friend1, false).await.unwrap();
- db.add_channel_member(chan1, friend2, false).await.unwrap();
- db.add_channel_member(chan2, friend2, false).await.unwrap();
- db.add_channel_member(chan2, friend3, false).await.unwrap();
- db.add_channel_member(chan3, stranger, false).await.unwrap();
-
- assert_eq!(
- db.get_users_by_ids(
- user,
- [user, friend1, friend2, friend3, stranger].iter().copied()
- )
- .await
- .unwrap(),
+ db.get_users_by_ids([user, friend1, friend2, friend3])
+ .await
+ .unwrap(),
vec![
User {
id: user,
@@ -675,18 +592,6 @@ pub mod tests {
}
]
);
-
- // The user's own info is only returned if they request it.
- assert_eq!(
- db.get_users_by_ids(user, [friend1].iter().copied())
- .await
- .unwrap(),
- vec![User {
- id: friend1,
- github_login: "friend-1".to_string(),
- admin: false,
- },]
- )
}
#[gpui::test]
@@ -1,3 +1,5 @@
+mod store;
+
use super::{
auth,
db::{ChannelId, MessageId, UserId},
@@ -11,12 +13,13 @@ use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
use sha1::{Digest as _, Sha1};
use std::{
any::TypeId,
- collections::{hash_map, HashMap, HashSet},
+ collections::{HashMap, HashSet},
future::Future,
mem,
sync::Arc,
time::Instant,
};
+use store::{JoinedWorktree, Store, Worktree};
use surf::StatusCode;
use tide::log;
use tide::{
@@ -25,13 +28,10 @@ use tide::{
};
use time::OffsetDateTime;
use zrpc::{
- auth::random_token,
proto::{self, AnyTypedEnvelope, EnvelopedMessage},
Connection, ConnectionId, Peer, TypedEnvelope,
};
-type ReplicaId = u16;
-
type MessageHandler = Box<
dyn Send
+ Sync
@@ -40,40 +40,12 @@ type MessageHandler = Box<
pub struct Server {
peer: Arc<Peer>,
- state: RwLock<ServerState>,
+ store: RwLock<Store>,
app_state: Arc<AppState>,
handlers: HashMap<TypeId, MessageHandler>,
notifications: Option<mpsc::Sender<()>>,
}
-#[derive(Default)]
-struct ServerState {
- connections: HashMap<ConnectionId, ConnectionState>,
- pub worktrees: HashMap<u64, Worktree>,
- channels: HashMap<ChannelId, Channel>,
- next_worktree_id: u64,
-}
-
-struct ConnectionState {
- user_id: UserId,
- worktrees: HashSet<u64>,
- channels: HashSet<ChannelId>,
-}
-
-struct Worktree {
- host_connection_id: Option<ConnectionId>,
- guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
- active_replica_ids: HashSet<ReplicaId>,
- access_token: String,
- root_name: String,
- entries: HashMap<u64, proto::Entry>,
-}
-
-#[derive(Default)]
-struct Channel {
- connection_ids: HashSet<ConnectionId>,
-}
-
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
@@ -86,17 +58,20 @@ impl Server {
let mut server = Self {
peer,
app_state,
- state: Default::default(),
+ store: Default::default(),
handlers: Default::default(),
notifications,
};
server
.add_handler(Server::ping)
+ .add_handler(Server::open_worktree)
+ .add_handler(Server::close_worktree)
.add_handler(Server::share_worktree)
+ .add_handler(Server::unshare_worktree)
.add_handler(Server::join_worktree)
+ .add_handler(Server::leave_worktree)
.add_handler(Server::update_worktree)
- .add_handler(Server::close_worktree)
.add_handler(Server::open_buffer)
.add_handler(Server::close_buffer)
.add_handler(Server::update_buffer)
@@ -137,11 +112,16 @@ impl Server {
addr: String,
user_id: UserId,
) -> impl Future<Output = ()> {
- let this = self.clone();
+ let mut this = self.clone();
async move {
let (connection_id, handle_io, mut incoming_rx) =
this.peer.add_connection(connection).await;
- this.add_connection(connection_id, user_id).await;
+ this.state_mut()
+ .await
+ .add_connection(connection_id, user_id);
+ if let Err(err) = this.update_collaborators_for_users(&[user_id]).await {
+ log::error!("error updating collaborators for {:?}: {}", user_id, err);
+ }
let handle_io = handle_io.fuse();
futures::pin_mut!(handle_io);
@@ -186,78 +166,122 @@ impl Server {
}
}
- async fn sign_out(self: &Arc<Self>, connection_id: zrpc::ConnectionId) -> tide::Result<()> {
+ async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
self.peer.disconnect(connection_id).await;
- let worktree_ids = self.remove_connection(connection_id).await;
- for worktree_id in worktree_ids {
- let state = self.state.read().await;
- if let Some(worktree) = state.worktrees.get(&worktree_id) {
- broadcast(connection_id, worktree.connection_ids(), |conn_id| {
- self.peer.send(
- conn_id,
- proto::RemovePeer {
- worktree_id,
- peer_id: connection_id.0,
- },
- )
- })
+ let removed_connection = self.state_mut().await.remove_connection(connection_id)?;
+
+ for (worktree_id, worktree) in removed_connection.hosted_worktrees {
+ if let Some(share) = worktree.share {
+ broadcast(
+ connection_id,
+ share.guest_connection_ids.keys().copied().collect(),
+ |conn_id| {
+ self.peer
+ .send(conn_id, proto::UnshareWorktree { worktree_id })
+ },
+ )
.await?;
}
}
+
+ for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
+ broadcast(connection_id, peer_ids, |conn_id| {
+ self.peer.send(
+ conn_id,
+ proto::RemovePeer {
+ worktree_id,
+ peer_id: connection_id.0,
+ },
+ )
+ })
+ .await?;
+ }
+
+ self.update_collaborators_for_users(removed_connection.collaborator_ids.iter())
+ .await?;
+
Ok(())
}
- // Add a new connection associated with a given user.
- async fn add_connection(&self, connection_id: ConnectionId, user_id: UserId) {
- self.state.write().await.connections.insert(
- connection_id,
- ConnectionState {
- user_id,
- worktrees: Default::default(),
- channels: Default::default(),
- },
- );
+ async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
+ self.peer.respond(request.receipt(), proto::Ack {}).await?;
+ Ok(())
}
- // Remove the given connection and its association with any worktrees.
- async fn remove_connection(&self, connection_id: ConnectionId) -> Vec<u64> {
- let mut worktree_ids = Vec::new();
- let mut state = self.state.write().await;
- if let Some(connection) = state.connections.remove(&connection_id) {
- for channel_id in connection.channels {
- if let Some(channel) = state.channels.get_mut(&channel_id) {
- channel.connection_ids.remove(&connection_id);
+ async fn open_worktree(
+ mut self: Arc<Server>,
+ request: TypedEnvelope<proto::OpenWorktree>,
+ ) -> tide::Result<()> {
+ let receipt = request.receipt();
+ let host_user_id = self
+ .state()
+ .await
+ .user_id_for_connection(request.sender_id)?;
+
+ let mut collaborator_user_ids = HashSet::new();
+ collaborator_user_ids.insert(host_user_id);
+ for github_login in request.payload.collaborator_logins {
+ match self.app_state.db.create_user(&github_login, false).await {
+ Ok(collaborator_user_id) => {
+ collaborator_user_ids.insert(collaborator_user_id);
}
- }
- for worktree_id in connection.worktrees {
- if let Some(worktree) = state.worktrees.get_mut(&worktree_id) {
- if worktree.host_connection_id == Some(connection_id) {
- worktree_ids.push(worktree_id);
- } else if let Some(replica_id) =
- worktree.guest_connection_ids.remove(&connection_id)
- {
- worktree.active_replica_ids.remove(&replica_id);
- worktree_ids.push(worktree_id);
- }
+ Err(err) => {
+ let message = err.to_string();
+ self.peer
+ .respond_with_error(receipt, proto::Error { message })
+ .await?;
+ return Ok(());
}
}
}
- worktree_ids
+
+ let collaborator_user_ids = collaborator_user_ids.into_iter().collect::<Vec<_>>();
+ let worktree_id = self.state_mut().await.add_worktree(Worktree {
+ host_connection_id: request.sender_id,
+ collaborator_user_ids: collaborator_user_ids.clone(),
+ root_name: request.payload.root_name,
+ share: None,
+ });
+
+ self.peer
+ .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
+ .await?;
+ self.update_collaborators_for_users(&collaborator_user_ids)
+ .await?;
+
+ Ok(())
}
- async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
- self.peer.respond(request.receipt(), proto::Ack {}).await?;
+ async fn close_worktree(
+ mut self: Arc<Server>,
+ request: TypedEnvelope<proto::CloseWorktree>,
+ ) -> tide::Result<()> {
+ let worktree_id = request.payload.worktree_id;
+ let worktree = self
+ .state_mut()
+ .await
+ .remove_worktree(worktree_id, request.sender_id)?;
+
+ if let Some(share) = worktree.share {
+ broadcast(
+ request.sender_id,
+ share.guest_connection_ids.keys().copied().collect(),
+ |conn_id| {
+ self.peer
+ .send(conn_id, proto::UnshareWorktree { worktree_id })
+ },
+ )
+ .await?;
+ }
+ self.update_collaborators_for_users(&worktree.collaborator_user_ids)
+ .await?;
Ok(())
}
async fn share_worktree(
- self: Arc<Server>,
+ mut self: Arc<Server>,
mut request: TypedEnvelope<proto::ShareWorktree>,
) -> tide::Result<()> {
- let mut state = self.state.write().await;
- let worktree_id = state.next_worktree_id;
- state.next_worktree_id += 1;
- let access_token = random_token();
let worktree = request
.payload
.worktree
@@ -267,148 +291,169 @@ impl Server {
.into_iter()
.map(|entry| (entry.id, entry))
.collect();
- state.worktrees.insert(
- worktree_id,
- Worktree {
- host_connection_id: Some(request.sender_id),
- guest_connection_ids: Default::default(),
- active_replica_ids: Default::default(),
- access_token: access_token.clone(),
- root_name: mem::take(&mut worktree.root_name),
- entries,
- },
- );
- self.peer
- .respond(
- request.receipt(),
- proto::ShareWorktreeResponse {
- worktree_id,
- access_token,
- },
- )
+ let collaborator_user_ids =
+ self.state_mut()
+ .await
+ .share_worktree(worktree.id, request.sender_id, entries);
+ if let Some(collaborator_user_ids) = collaborator_user_ids {
+ self.peer
+ .respond(request.receipt(), proto::ShareWorktreeResponse {})
+ .await?;
+ self.update_collaborators_for_users(&collaborator_user_ids)
+ .await?;
+ } else {
+ self.peer
+ .respond_with_error(
+ request.receipt(),
+ proto::Error {
+ message: "no such worktree".to_string(),
+ },
+ )
+ .await?;
+ }
+ Ok(())
+ }
+
+ async fn unshare_worktree(
+ mut self: Arc<Server>,
+ request: TypedEnvelope<proto::UnshareWorktree>,
+ ) -> tide::Result<()> {
+ let worktree_id = request.payload.worktree_id;
+ let worktree = self
+ .state_mut()
+ .await
+ .unshare_worktree(worktree_id, request.sender_id)?;
+
+ broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
+ self.peer
+ .send(conn_id, proto::UnshareWorktree { worktree_id })
+ })
+ .await?;
+ self.update_collaborators_for_users(&worktree.collaborator_ids)
.await?;
+
Ok(())
}
async fn join_worktree(
- self: Arc<Server>,
- request: TypedEnvelope<proto::OpenWorktree>,
+ mut self: Arc<Server>,
+ request: TypedEnvelope<proto::JoinWorktree>,
) -> tide::Result<()> {
let worktree_id = request.payload.worktree_id;
- let access_token = &request.payload.access_token;
+ let user_id = self
+ .state()
+ .await
+ .user_id_for_connection(request.sender_id)?;
- let mut state = self.state.write().await;
- if let Some((peer_replica_id, worktree)) =
- state.join_worktree(request.sender_id, worktree_id, access_token)
- {
- let mut peers = Vec::new();
- if let Some(host_connection_id) = worktree.host_connection_id {
+ let mut state = self.state_mut().await;
+ match state.join_worktree(request.sender_id, user_id, worktree_id) {
+ Ok(JoinedWorktree {
+ replica_id,
+ worktree,
+ }) => {
+ let share = worktree.share()?;
+ let peer_count = share.guest_connection_ids.len();
+ let mut peers = Vec::with_capacity(peer_count);
peers.push(proto::Peer {
- peer_id: host_connection_id.0,
+ peer_id: worktree.host_connection_id.0,
replica_id: 0,
});
- }
- for (peer_conn_id, peer_replica_id) in &worktree.guest_connection_ids {
- if *peer_conn_id != request.sender_id {
- peers.push(proto::Peer {
- peer_id: peer_conn_id.0,
- replica_id: *peer_replica_id as u32,
- });
+ for (peer_conn_id, peer_replica_id) in &share.guest_connection_ids {
+ if *peer_conn_id != request.sender_id {
+ peers.push(proto::Peer {
+ peer_id: peer_conn_id.0,
+ replica_id: *peer_replica_id as u32,
+ });
+ }
}
+ let response = proto::JoinWorktreeResponse {
+ worktree: Some(proto::Worktree {
+ id: worktree_id,
+ root_name: worktree.root_name.clone(),
+ entries: share.entries.values().cloned().collect(),
+ }),
+ replica_id: replica_id as u32,
+ peers,
+ };
+ let connection_ids = worktree.connection_ids();
+ let collaborator_user_ids = worktree.collaborator_user_ids.clone();
+ drop(state);
+
+ broadcast(request.sender_id, connection_ids, |conn_id| {
+ self.peer.send(
+ conn_id,
+ proto::AddPeer {
+ worktree_id,
+ peer: Some(proto::Peer {
+ peer_id: request.sender_id.0,
+ replica_id: response.replica_id,
+ }),
+ },
+ )
+ })
+ .await?;
+ self.peer.respond(request.receipt(), response).await?;
+ self.update_collaborators_for_users(&collaborator_user_ids)
+ .await?;
+ }
+ Err(error) => {
+ drop(state);
+ self.peer
+ .respond_with_error(
+ request.receipt(),
+ proto::Error {
+ message: error.to_string(),
+ },
+ )
+ .await?;
}
+ }
- broadcast(request.sender_id, worktree.connection_ids(), |conn_id| {
+ Ok(())
+ }
+
+ async fn leave_worktree(
+ mut self: Arc<Server>,
+ request: TypedEnvelope<proto::LeaveWorktree>,
+ ) -> tide::Result<()> {
+ let sender_id = request.sender_id;
+ let worktree_id = request.payload.worktree_id;
+ let worktree = self
+ .state_mut()
+ .await
+ .leave_worktree(sender_id, worktree_id);
+ if let Some(worktree) = worktree {
+ broadcast(sender_id, worktree.connection_ids, |conn_id| {
self.peer.send(
conn_id,
- proto::AddPeer {
+ proto::RemovePeer {
worktree_id,
- peer: Some(proto::Peer {
- peer_id: request.sender_id.0,
- replica_id: peer_replica_id as u32,
- }),
+ peer_id: sender_id.0,
},
)
})
.await?;
- self.peer
- .respond(
- request.receipt(),
- proto::OpenWorktreeResponse {
- worktree_id,
- worktree: Some(proto::Worktree {
- root_name: worktree.root_name.clone(),
- entries: worktree.entries.values().cloned().collect(),
- }),
- replica_id: peer_replica_id as u32,
- peers,
- },
- )
- .await?;
- } else {
- self.peer
- .respond(
- request.receipt(),
- proto::OpenWorktreeResponse {
- worktree_id,
- worktree: None,
- replica_id: 0,
- peers: Vec::new(),
- },
- )
+ self.update_collaborators_for_users(&worktree.collaborator_ids)
.await?;
}
-
Ok(())
}
async fn update_worktree(
- self: Arc<Server>,
+ mut self: Arc<Server>,
request: TypedEnvelope<proto::UpdateWorktree>,
) -> tide::Result<()> {
- {
- let mut state = self.state.write().await;
- let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
- for entry_id in &request.payload.removed_entries {
- worktree.entries.remove(&entry_id);
- }
-
- for entry in &request.payload.updated_entries {
- worktree.entries.insert(entry.id, entry.clone());
- }
- }
-
- self.broadcast_in_worktree(request.payload.worktree_id, &request)
- .await?;
- Ok(())
- }
-
- async fn close_worktree(
- self: Arc<Server>,
- request: TypedEnvelope<proto::CloseWorktree>,
- ) -> tide::Result<()> {
- let connection_ids;
- {
- let mut state = self.state.write().await;
- let worktree = state.write_worktree(request.payload.worktree_id, request.sender_id)?;
- connection_ids = worktree.connection_ids();
- if worktree.host_connection_id == Some(request.sender_id) {
- worktree.host_connection_id = None;
- } else if let Some(replica_id) =
- worktree.guest_connection_ids.remove(&request.sender_id)
- {
- worktree.active_replica_ids.remove(&replica_id);
- }
- }
-
- broadcast(request.sender_id, connection_ids, |conn_id| {
- self.peer.send(
- conn_id,
- proto::RemovePeer {
- worktree_id: request.payload.worktree_id,
- peer_id: request.sender_id.0,
- },
- )
+ let connection_ids = self.state_mut().await.update_worktree(
+ request.sender_id,
+ request.payload.worktree_id,
+ &request.payload.removed_entries,
+ &request.payload.updated_entries,
+ )?;
+
+ broadcast(request.sender_id, connection_ids, |connection_id| {
+ self.peer
+ .forward_send(request.sender_id, connection_id, request.payload.clone())
})
.await?;
@@ -420,14 +465,10 @@ impl Server {
request: TypedEnvelope<proto::OpenBuffer>,
) -> tide::Result<()> {
let receipt = request.receipt();
- let worktree_id = request.payload.worktree_id;
let host_connection_id = self
- .state
- .read()
+ .state()
.await
- .read_worktree(worktree_id, request.sender_id)?
- .host_connection_id()?;
-
+ .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
let response = self
.peer
.forward_request(request.sender_id, host_connection_id, request.payload)
@@ -441,16 +482,12 @@ impl Server {
request: TypedEnvelope<proto::CloseBuffer>,
) -> tide::Result<()> {
let host_connection_id = self
- .state
- .read()
+ .state()
.await
- .read_worktree(request.payload.worktree_id, request.sender_id)?
- .host_connection_id()?;
-
+ .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
self.peer
.forward_send(request.sender_id, host_connection_id, request.payload)
.await?;
-
Ok(())
}
@@ -461,14 +498,11 @@ impl Server {
let host;
let guests;
{
- let state = self.state.read().await;
- let worktree = state.read_worktree(request.payload.worktree_id, request.sender_id)?;
- host = worktree.host_connection_id()?;
- guests = worktree
- .guest_connection_ids
- .keys()
- .copied()
- .collect::<Vec<_>>();
+ let state = self.state().await;
+ host = state
+ .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
+ guests = state
+ .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
}
let sender = request.sender_id;
@@ -498,8 +532,17 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateBuffer>,
) -> tide::Result<()> {
- self.broadcast_in_worktree(request.payload.worktree_id, &request)
- .await?;
+ broadcast(
+ request.sender_id,
+ self.state()
+ .await
+ .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?,
+ |connection_id| {
+ self.peer
+ .forward_send(request.sender_id, connection_id, request.payload.clone())
+ },
+ )
+ .await?;
self.peer.respond(request.receipt(), proto::Ack {}).await?;
Ok(())
}
@@ -508,8 +551,19 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::BufferSaved>,
) -> tide::Result<()> {
- self.broadcast_in_worktree(request.payload.worktree_id, &request)
- .await
+ broadcast(
+ request.sender_id,
+ self.store
+ .read()
+ .await
+ .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?,
+ |connection_id| {
+ self.peer
+ .forward_send(request.sender_id, connection_id, request.payload.clone())
+ },
+ )
+ .await?;
+ Ok(())
}
async fn get_channels(
@@ -517,8 +571,7 @@ impl Server {
request: TypedEnvelope<proto::GetChannels>,
) -> tide::Result<()> {
let user_id = self
- .state
- .read()
+ .state()
.await
.user_id_for_connection(request.sender_id)?;
let channels = self.app_state.db.get_accessible_channels(user_id).await?;
@@ -543,17 +596,12 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::GetUsers>,
) -> tide::Result<()> {
- let user_id = self
- .state
- .read()
- .await
- .user_id_for_connection(request.sender_id)?;
let receipt = request.receipt();
let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
let users = self
.app_state
.db
- .get_users_by_ids(user_id, user_ids)
+ .get_users_by_ids(user_ids)
.await?
.into_iter()
.map(|user| proto::User {
@@ -568,13 +616,37 @@ impl Server {
Ok(())
}
+    /// Pushes a fresh `UpdateCollaborators` message to every live connection
+    /// of each given user, so their clients see current worktree/guest state.
+    async fn update_collaborators_for_users<'a>(
+        self: &Arc<Server>,
+        user_ids: impl IntoIterator<Item = &'a UserId>,
+    ) -> tide::Result<()> {
+        let mut send_futures = Vec::new();
+
+        let state = self.state().await;
+        for user_id in user_ids {
+            let collaborators = state.collaborators_for_user(*user_id);
+            for connection_id in state.connection_ids_for_user(*user_id) {
+                send_futures.push(self.peer.send(
+                    connection_id,
+                    proto::UpdateCollaborators {
+                        collaborators: collaborators.clone(),
+                    },
+                ));
+            }
+        }
+
+        // Release the store's read lock before awaiting the network sends.
+        drop(state);
+        futures::future::try_join_all(send_futures).await?;
+
+        Ok(())
+    }
+
async fn join_channel(
- self: Arc<Self>,
+ mut self: Arc<Self>,
request: TypedEnvelope<proto::JoinChannel>,
) -> tide::Result<()> {
let user_id = self
- .state
- .read()
+ .state()
.await
.user_id_for_connection(request.sender_id)?;
let channel_id = ChannelId::from_proto(request.payload.channel_id);
@@ -587,8 +659,7 @@ impl Server {
Err(anyhow!("access denied"))?;
}
- self.state
- .write()
+ self.state_mut()
.await
.join_channel(request.sender_id, channel_id);
let messages = self
@@ -618,12 +689,11 @@ impl Server {
}
async fn leave_channel(
- self: Arc<Self>,
+ mut self: Arc<Self>,
request: TypedEnvelope<proto::LeaveChannel>,
) -> tide::Result<()> {
let user_id = self
- .state
- .read()
+ .state()
.await
.user_id_for_connection(request.sender_id)?;
let channel_id = ChannelId::from_proto(request.payload.channel_id);
@@ -636,8 +706,7 @@ impl Server {
Err(anyhow!("access denied"))?;
}
- self.state
- .write()
+ self.state_mut()
.await
.leave_channel(request.sender_id, channel_id);
@@ -653,10 +722,10 @@ impl Server {
let user_id;
let connection_ids;
{
- let state = self.state.read().await;
+ let state = self.state().await;
user_id = state.user_id_for_connection(request.sender_id)?;
- if let Some(channel) = state.channels.get(&channel_id) {
- connection_ids = channel.connection_ids();
+ if let Some(ids) = state.channel_connection_ids(channel_id) {
+ connection_ids = ids;
} else {
return Ok(());
}
@@ -741,8 +810,7 @@ impl Server {
request: TypedEnvelope<proto::GetChannelMessages>,
) -> tide::Result<()> {
let user_id = self
- .state
- .read()
+ .state()
.await
.user_id_for_connection(request.sender_id)?;
let channel_id = ChannelId::from_proto(request.payload.channel_id);
@@ -785,25 +853,16 @@ impl Server {
Ok(())
}
- async fn broadcast_in_worktree<T: proto::EnvelopedMessage>(
- &self,
- worktree_id: u64,
- message: &TypedEnvelope<T>,
- ) -> tide::Result<()> {
- let connection_ids = self
- .state
- .read()
- .await
- .read_worktree(worktree_id, message.sender_id)?
- .connection_ids();
-
- broadcast(message.sender_id, connection_ids, |conn_id| {
- self.peer
- .forward_send(message.sender_id, conn_id, message.payload.clone())
- })
- .await?;
+    /// Acquires a shared (read) lock on the server's in-memory `Store`.
+    fn state<'a>(
+        self: &'a Arc<Self>,
+    ) -> impl Future<Output = async_std::sync::RwLockReadGuard<'a, Store>> {
+        self.store.read()
+    }
- Ok(())
+    /// Acquires an exclusive (write) lock on the server's in-memory `Store`.
+    // Takes `&mut Arc<Self>` so call sites must declare mutation intent.
+    fn state_mut<'a>(
+        self: &'a mut Arc<Self>,
+    ) -> impl Future<Output = async_std::sync::RwLockWriteGuard<'a, Store>> {
+        self.store.write()
+    }
}
@@ -824,137 +883,6 @@ where
Ok(())
}
-impl ServerState {
- fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
- if let Some(connection) = self.connections.get_mut(&connection_id) {
- connection.channels.insert(channel_id);
- self.channels
- .entry(channel_id)
- .or_default()
- .connection_ids
- .insert(connection_id);
- }
- }
-
- fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
- if let Some(connection) = self.connections.get_mut(&connection_id) {
- connection.channels.remove(&channel_id);
- if let hash_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
- entry.get_mut().connection_ids.remove(&connection_id);
- if entry.get_mut().connection_ids.is_empty() {
- entry.remove();
- }
- }
- }
- }
-
- fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
- Ok(self
- .connections
- .get(&connection_id)
- .ok_or_else(|| anyhow!("unknown connection"))?
- .user_id)
- }
-
- // Add the given connection as a guest of the given worktree
- fn join_worktree(
- &mut self,
- connection_id: ConnectionId,
- worktree_id: u64,
- access_token: &str,
- ) -> Option<(ReplicaId, &Worktree)> {
- if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
- if access_token == worktree.access_token {
- if let Some(connection) = self.connections.get_mut(&connection_id) {
- connection.worktrees.insert(worktree_id);
- }
-
- let mut replica_id = 1;
- while worktree.active_replica_ids.contains(&replica_id) {
- replica_id += 1;
- }
- worktree.active_replica_ids.insert(replica_id);
- worktree
- .guest_connection_ids
- .insert(connection_id, replica_id);
- Some((replica_id, worktree))
- } else {
- None
- }
- } else {
- None
- }
- }
-
- fn read_worktree(
- &self,
- worktree_id: u64,
- connection_id: ConnectionId,
- ) -> tide::Result<&Worktree> {
- let worktree = self
- .worktrees
- .get(&worktree_id)
- .ok_or_else(|| anyhow!("worktree not found"))?;
-
- if worktree.host_connection_id == Some(connection_id)
- || worktree.guest_connection_ids.contains_key(&connection_id)
- {
- Ok(worktree)
- } else {
- Err(anyhow!(
- "{} is not a member of worktree {}",
- connection_id,
- worktree_id
- ))?
- }
- }
-
- fn write_worktree(
- &mut self,
- worktree_id: u64,
- connection_id: ConnectionId,
- ) -> tide::Result<&mut Worktree> {
- let worktree = self
- .worktrees
- .get_mut(&worktree_id)
- .ok_or_else(|| anyhow!("worktree not found"))?;
-
- if worktree.host_connection_id == Some(connection_id)
- || worktree.guest_connection_ids.contains_key(&connection_id)
- {
- Ok(worktree)
- } else {
- Err(anyhow!(
- "{} is not a member of worktree {}",
- connection_id,
- worktree_id
- ))?
- }
- }
-}
-
-impl Worktree {
- pub fn connection_ids(&self) -> Vec<ConnectionId> {
- self.guest_connection_ids
- .keys()
- .copied()
- .chain(self.host_connection_id)
- .collect()
- }
-
- fn host_connection_id(&self) -> tide::Result<ConnectionId> {
- Ok(self
- .host_connection_id
- .ok_or_else(|| anyhow!("host disconnected from worktree"))?)
- }
-}
-
-impl Channel {
- fn connection_ids(&self) -> Vec<ConnectionId> {
- self.connection_ids.iter().copied().collect()
- }
-}
-
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
let server = Server::new(app.state().clone(), rpc.clone(), None);
app.at("/rpc").with(auth::VerifyToken).get(move |request: Request<Arc<AppState>>| {
@@ -1022,7 +950,7 @@ mod tests {
github, AppState, Config,
};
use async_std::{sync::RwLockReadGuard, task};
- use gpui::TestAppContext;
+ use gpui::{ModelHandle, TestAppContext};
use parking_lot::Mutex;
use postage::{mpsc, watch};
use serde_json::json;
@@ -1040,10 +968,12 @@ mod tests {
editor::{Editor, EditorStyle, Insert},
fs::{FakeFs, Fs as _},
language::LanguageRegistry,
+ people_panel::JoinWorktree,
rpc::{self, Client, Credentials, EstablishConnectionError},
settings,
test::FakeHttpClient,
user::UserStore,
+ workspace::Workspace,
worktree::Worktree,
};
use zrpc::Peer;
@@ -1066,15 +996,17 @@ mod tests {
fs.insert_tree(
"/a",
json!({
+ ".zed.toml": r#"collaborators = ["user_b"]"#,
"a.txt": "a-contents",
"b.txt": "b-contents",
}),
)
.await;
let worktree_a = Worktree::open_local(
+ client_a.clone(),
"/a".as_ref(),
- lang_registry.clone(),
fs,
+ lang_registry.clone(),
&mut cx_a.to_async(),
)
.await
@@ -0,0 +1,615 @@
+use crate::db::{ChannelId, UserId};
+use anyhow::anyhow;
+use std::collections::{hash_map, HashMap, HashSet};
+use zrpc::{proto, ConnectionId};
+
+/// In-memory registry of everything the RPC server tracks: live client
+/// connections, worktrees (and which users may see them), and chat channels.
+#[derive(Default)]
+pub struct Store {
+    connections: HashMap<ConnectionId, ConnectionState>,
+    // Reverse index: all live connections belonging to a given user.
+    connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>,
+    worktrees: HashMap<u64, Worktree>,
+    // Worktrees whose collaborator list names the user (see `add_worktree`).
+    visible_worktrees_by_user_id: HashMap<UserId, HashSet<u64>>,
+    channels: HashMap<ChannelId, Channel>,
+    // Monotonic id source consumed by `add_worktree`.
+    next_worktree_id: u64,
+}
+
+/// Per-connection bookkeeping: the authenticated user plus the worktrees and
+/// channels this connection currently participates in.
+struct ConnectionState {
+    user_id: UserId,
+    worktrees: HashSet<u64>,
+    channels: HashSet<ChannelId>,
+}
+
+/// A worktree registered by a host connection. `share` is `Some` only while
+/// the host is actively sharing it with guests.
+pub struct Worktree {
+    pub host_connection_id: ConnectionId,
+    // Users permitted to see (and join) this worktree.
+    pub collaborator_user_ids: Vec<UserId>,
+    pub root_name: String,
+    pub share: Option<WorktreeShare>,
+}
+
+/// Sharing state of a worktree: connected guests (with their replica ids)
+/// and the latest snapshot of the worktree's entries.
+pub struct WorktreeShare {
+    pub guest_connection_ids: HashMap<ConnectionId, ReplicaId>,
+    pub active_replica_ids: HashSet<ReplicaId>,
+    pub entries: HashMap<u64, proto::Entry>,
+}
+
+/// A chat channel: the set of connections currently joined to it.
+#[derive(Default)]
+pub struct Channel {
+    pub connection_ids: HashSet<ConnectionId>,
+}
+
+/// Replica id of a peer in a shared worktree; guests are assigned ids
+/// starting at 1 (see `Store::join_worktree`).
+pub type ReplicaId = u16;
+
+/// What must be cleaned up and broadcast after a connection drops: worktrees
+/// it hosted, worktrees it was a guest in, and the users affected.
+#[derive(Default)]
+pub struct RemovedConnectionState {
+    pub hosted_worktrees: HashMap<u64, Worktree>,
+    pub guest_worktree_ids: HashMap<u64, Vec<ConnectionId>>,
+    pub collaborator_ids: HashSet<UserId>,
+}
+
+/// Result of `Store::join_worktree`: the replica id assigned to the joining
+/// guest plus a borrow of the worktree that was joined.
+pub struct JoinedWorktree<'a> {
+    pub replica_id: ReplicaId,
+    pub worktree: &'a Worktree,
+}
+
+/// Result of `Store::unshare_worktree`: who must be notified of the unshare.
+pub struct UnsharedWorktree {
+    pub connection_ids: Vec<ConnectionId>,
+    pub collaborator_ids: Vec<UserId>,
+}
+
+/// Result of `Store::leave_worktree`: who must be notified that a guest left.
+pub struct LeftWorktree {
+    pub connection_ids: Vec<ConnectionId>,
+    pub collaborator_ids: Vec<UserId>,
+}
+
+impl Store {
+    /// Registers a newly-authenticated connection and indexes it by user.
+    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
+        self.connections.insert(
+            connection_id,
+            ConnectionState {
+                user_id,
+                worktrees: Default::default(),
+                channels: Default::default(),
+            },
+        );
+        self.connections_by_user_id
+            .entry(user_id)
+            .or_default()
+            .insert(connection_id);
+    }
+
+    /// Tears down a dropped connection: detaches it from its channels and the
+    /// per-user index, then unwinds its worktrees. Returns everything callers
+    /// must broadcast to the remaining peers.
+    pub fn remove_connection(
+        &mut self,
+        connection_id: ConnectionId,
+    ) -> tide::Result<RemovedConnectionState> {
+        let connection = if let Some(connection) = self.connections.remove(&connection_id) {
+            connection
+        } else {
+            return Err(anyhow!("no such connection"))?;
+        };
+
+        for channel_id in &connection.channels {
+            if let Some(channel) = self.channels.get_mut(&channel_id) {
+                channel.connection_ids.remove(&connection_id);
+            }
+        }
+
+        let user_connections = self
+            .connections_by_user_id
+            .get_mut(&connection.user_id)
+            .unwrap();
+        user_connections.remove(&connection_id);
+        if user_connections.is_empty() {
+            self.connections_by_user_id.remove(&connection.user_id);
+        }
+
+        let mut result = RemovedConnectionState::default();
+        // `remove_worktree` only succeeds when this connection is the host;
+        // otherwise fall back to leaving the worktree as a guest.
+        for worktree_id in connection.worktrees.clone() {
+            if let Ok(worktree) = self.remove_worktree(worktree_id, connection_id) {
+                result
+                    .collaborator_ids
+                    .extend(worktree.collaborator_user_ids.iter().copied());
+                result.hosted_worktrees.insert(worktree_id, worktree);
+            } else if let Some(worktree) = self.leave_worktree(connection_id, worktree_id) {
+                result
+                    .guest_worktree_ids
+                    .insert(worktree_id, worktree.connection_ids);
+                result.collaborator_ids.extend(worktree.collaborator_ids);
+            }
+        }
+
+        #[cfg(test)]
+        self.check_invariants();
+
+        Ok(result)
+    }
+
+    /// Test-only accessor for a channel's state.
+    #[cfg(test)]
+    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
+        self.channels.get(&id)
+    }
+
+    /// Adds the connection to a channel, creating the channel entry on first
+    /// join. Silently no-ops if the connection is unknown.
+    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
+        if let Some(connection) = self.connections.get_mut(&connection_id) {
+            connection.channels.insert(channel_id);
+            self.channels
+                .entry(channel_id)
+                .or_default()
+                .connection_ids
+                .insert(connection_id);
+        }
+    }
+
+    /// Removes the connection from a channel, dropping the channel entry
+    /// entirely once its last member leaves.
+    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
+        if let Some(connection) = self.connections.get_mut(&connection_id) {
+            connection.channels.remove(&channel_id);
+            if let hash_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
+                entry.get_mut().connection_ids.remove(&connection_id);
+                if entry.get_mut().connection_ids.is_empty() {
+                    entry.remove();
+                }
+            }
+        }
+    }
+
+    /// Looks up the authenticated user behind a connection.
+    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
+        Ok(self
+            .connections
+            .get(&connection_id)
+            .ok_or_else(|| anyhow!("unknown connection"))?
+            .user_id)
+    }
+
+    /// Iterates all live connections for a user (empty if none are online).
+    pub fn connection_ids_for_user<'a>(
+        &'a self,
+        user_id: UserId,
+    ) -> impl 'a + Iterator<Item = ConnectionId> {
+        self.connections_by_user_id
+            .get(&user_id)
+            .into_iter()
+            .flatten()
+            .copied()
+    }
+
+    /// Builds the `UpdateCollaborators` payload for `user_id`: one entry per
+    /// host of a worktree visible to this user, listing that host's worktrees
+    /// and the users currently joined as guests.
+    pub fn collaborators_for_user(&self, user_id: UserId) -> Vec<proto::Collaborator> {
+        let mut collaborators = HashMap::new();
+        for worktree_id in self
+            .visible_worktrees_by_user_id
+            .get(&user_id)
+            .unwrap_or(&HashSet::new())
+        {
+            let worktree = &self.worktrees[worktree_id];
+
+            let mut guests = HashSet::new();
+            if let Ok(share) = worktree.share() {
+                for guest_connection_id in share.guest_connection_ids.keys() {
+                    if let Ok(user_id) = self.user_id_for_connection(*guest_connection_id) {
+                        guests.insert(user_id.to_proto());
+                    }
+                }
+            }
+
+            // Worktrees whose host connection is no longer known are skipped.
+            if let Ok(host_user_id) = self.user_id_for_connection(worktree.host_connection_id) {
+                collaborators
+                    .entry(host_user_id)
+                    .or_insert_with(|| proto::Collaborator {
+                        user_id: host_user_id.to_proto(),
+                        worktrees: Vec::new(),
+                    })
+                    .worktrees
+                    .push(proto::WorktreeMetadata {
+                        id: *worktree_id,
+                        root_name: worktree.root_name.clone(),
+                        is_shared: worktree.share.is_some(),
+                        guests: guests.into_iter().collect(),
+                    });
+            }
+        }
+
+        collaborators.into_values().collect()
+    }
+
+    /// Registers a worktree, marks it visible to each collaborator, and links
+    /// it to its host connection. Returns the newly-assigned worktree id.
+    pub fn add_worktree(&mut self, worktree: Worktree) -> u64 {
+        let worktree_id = self.next_worktree_id;
+        for collaborator_user_id in &worktree.collaborator_user_ids {
+            self.visible_worktrees_by_user_id
+                .entry(*collaborator_user_id)
+                .or_default()
+                .insert(worktree_id);
+        }
+        self.next_worktree_id += 1;
+        if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) {
+            connection.worktrees.insert(worktree_id);
+        }
+        self.worktrees.insert(worktree_id, worktree);
+
+        #[cfg(test)]
+        self.check_invariants();
+
+        worktree_id
+    }
+
+    /// Removes a worktree entirely; only its host connection may do so.
+    /// Detaches the host, any guests, and collaborator visibility entries,
+    /// then returns the removed worktree for caller-side broadcasting.
+    pub fn remove_worktree(
+        &mut self,
+        worktree_id: u64,
+        acting_connection_id: ConnectionId,
+    ) -> tide::Result<Worktree> {
+        let worktree = if let hash_map::Entry::Occupied(e) = self.worktrees.entry(worktree_id) {
+            if e.get().host_connection_id != acting_connection_id {
+                Err(anyhow!("not your worktree"))?;
+            }
+            e.remove()
+        } else {
+            return Err(anyhow!("no such worktree"))?;
+        };
+
+        if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) {
+            connection.worktrees.remove(&worktree_id);
+        }
+
+        if let Some(share) = &worktree.share {
+            for connection_id in share.guest_connection_ids.keys() {
+                if let Some(connection) = self.connections.get_mut(connection_id) {
+                    connection.worktrees.remove(&worktree_id);
+                }
+            }
+        }
+
+        for collaborator_user_id in &worktree.collaborator_user_ids {
+            if let Some(visible_worktrees) = self
+                .visible_worktrees_by_user_id
+                .get_mut(&collaborator_user_id)
+            {
+                visible_worktrees.remove(&worktree_id);
+            }
+        }
+
+        #[cfg(test)]
+        self.check_invariants();
+
+        Ok(worktree)
+    }
+
+    /// Marks the worktree as shared, seeding it with `entries` and an empty
+    /// guest set. Only the host may share; returns the collaborators to
+    /// notify, or `None` if the worktree is unknown or the caller isn't host.
+    pub fn share_worktree(
+        &mut self,
+        worktree_id: u64,
+        connection_id: ConnectionId,
+        entries: HashMap<u64, proto::Entry>,
+    ) -> Option<Vec<UserId>> {
+        if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
+            if worktree.host_connection_id == connection_id {
+                worktree.share = Some(WorktreeShare {
+                    guest_connection_ids: Default::default(),
+                    active_replica_ids: Default::default(),
+                    entries,
+                });
+                return Some(worktree.collaborator_user_ids.clone());
+            }
+        }
+        None
+    }
+
+    /// Stops sharing a worktree (host only), disconnecting all guests.
+    /// Returns the connections and collaborators that must be notified.
+    pub fn unshare_worktree(
+        &mut self,
+        worktree_id: u64,
+        acting_connection_id: ConnectionId,
+    ) -> tide::Result<UnsharedWorktree> {
+        let worktree = if let Some(worktree) = self.worktrees.get_mut(&worktree_id) {
+            worktree
+        } else {
+            return Err(anyhow!("no such worktree"))?;
+        };
+
+        if worktree.host_connection_id != acting_connection_id {
+            return Err(anyhow!("not your worktree"))?;
+        }
+
+        // Capture recipients before tearing the share down.
+        let connection_ids = worktree.connection_ids();
+        let collaborator_ids = worktree.collaborator_user_ids.clone();
+        if let Some(share) = worktree.share.take() {
+            for connection_id in share.guest_connection_ids.into_keys() {
+                if let Some(connection) = self.connections.get_mut(&connection_id) {
+                    connection.worktrees.remove(&worktree_id);
+                }
+            }
+
+            #[cfg(test)]
+            self.check_invariants();
+
+            Ok(UnsharedWorktree {
+                connection_ids,
+                collaborator_ids,
+            })
+        } else {
+            Err(anyhow!("worktree is not shared"))?
+        }
+    }
+
+    /// Adds `connection_id` as a guest of a shared worktree, provided
+    /// `user_id` is one of its collaborators. Assigns the lowest free replica
+    /// id (guests start at 1).
+    pub fn join_worktree(
+        &mut self,
+        connection_id: ConnectionId,
+        user_id: UserId,
+        worktree_id: u64,
+    ) -> tide::Result<JoinedWorktree> {
+        let connection = self
+            .connections
+            .get_mut(&connection_id)
+            .ok_or_else(|| anyhow!("no such connection"))?;
+        // A worktree the user may not see is reported as nonexistent.
+        let worktree = self
+            .worktrees
+            .get_mut(&worktree_id)
+            .and_then(|worktree| {
+                if worktree.collaborator_user_ids.contains(&user_id) {
+                    Some(worktree)
+                } else {
+                    None
+                }
+            })
+            .ok_or_else(|| anyhow!("no such worktree"))?;
+
+        let share = worktree.share_mut()?;
+        connection.worktrees.insert(worktree_id);
+
+        let mut replica_id = 1;
+        while share.active_replica_ids.contains(&replica_id) {
+            replica_id += 1;
+        }
+        share.active_replica_ids.insert(replica_id);
+        share.guest_connection_ids.insert(connection_id, replica_id);
+
+        #[cfg(test)]
+        self.check_invariants();
+
+        Ok(JoinedWorktree {
+            replica_id,
+            worktree: &self.worktrees[&worktree_id],
+        })
+    }
+
+    /// Removes a guest from a shared worktree, freeing its replica id.
+    /// Returns `None` when the worktree, its share, or the membership does
+    /// not exist; otherwise who must be notified of the departure.
+    pub fn leave_worktree(
+        &mut self,
+        connection_id: ConnectionId,
+        worktree_id: u64,
+    ) -> Option<LeftWorktree> {
+        let worktree = self.worktrees.get_mut(&worktree_id)?;
+        let share = worktree.share.as_mut()?;
+        let replica_id = share.guest_connection_ids.remove(&connection_id)?;
+        share.active_replica_ids.remove(&replica_id);
+
+        if let Some(connection) = self.connections.get_mut(&connection_id) {
+            connection.worktrees.remove(&worktree_id);
+        }
+
+        let connection_ids = worktree.connection_ids();
+        let collaborator_ids = worktree.collaborator_user_ids.clone();
+
+        #[cfg(test)]
+        self.check_invariants();
+
+        Some(LeftWorktree {
+            connection_ids,
+            collaborator_ids,
+        })
+    }
+
+    /// Applies entry removals and upserts to the shared snapshot, then
+    /// returns the connections that should receive the update.
+    pub fn update_worktree(
+        &mut self,
+        connection_id: ConnectionId,
+        worktree_id: u64,
+        removed_entries: &[u64],
+        updated_entries: &[proto::Entry],
+    ) -> tide::Result<Vec<ConnectionId>> {
+        let worktree = self.write_worktree(worktree_id, connection_id)?;
+        let share = worktree.share_mut()?;
+        for entry_id in removed_entries {
+            share.entries.remove(&entry_id);
+        }
+        for entry in updated_entries {
+            share.entries.insert(entry.id, entry.clone());
+        }
+        Ok(worktree.connection_ids())
+    }
+
+    /// Returns the host connection of a worktree the caller belongs to.
+    pub fn worktree_host_connection_id(
+        &self,
+        connection_id: ConnectionId,
+        worktree_id: u64,
+    ) -> tide::Result<ConnectionId> {
+        Ok(self
+            .read_worktree(worktree_id, connection_id)?
+            .host_connection_id)
+    }
+
+    /// Returns every guest connection of a shared worktree the caller
+    /// belongs to (errors if the worktree is not shared).
+    pub fn worktree_guest_connection_ids(
+        &self,
+        connection_id: ConnectionId,
+        worktree_id: u64,
+    ) -> tide::Result<Vec<ConnectionId>> {
+        Ok(self
+            .read_worktree(worktree_id, connection_id)?
+            .share()?
+            .guest_connection_ids
+            .keys()
+            .copied()
+            .collect())
+    }
+
+    /// Returns all connections (host + guests) of a worktree the caller
+    /// belongs to.
+    pub fn worktree_connection_ids(
+        &self,
+        connection_id: ConnectionId,
+        worktree_id: u64,
+    ) -> tide::Result<Vec<ConnectionId>> {
+        Ok(self
+            .read_worktree(worktree_id, connection_id)?
+            .connection_ids())
+    }
+
+    /// Connections joined to a channel, or `None` if the channel is unknown.
+    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Option<Vec<ConnectionId>> {
+        Some(self.channels.get(&channel_id)?.connection_ids())
+    }
+
+    /// Fetches a worktree, verifying the caller is its host or a guest.
+    // NOTE(review): a non-host caller of an unshared worktree gets the
+    // "worktree is not shared" error (via `share()?`) rather than the
+    // membership error below — confirm that's intended.
+    fn read_worktree(
+        &self,
+        worktree_id: u64,
+        connection_id: ConnectionId,
+    ) -> tide::Result<&Worktree> {
+        let worktree = self
+            .worktrees
+            .get(&worktree_id)
+            .ok_or_else(|| anyhow!("worktree not found"))?;
+
+        if worktree.host_connection_id == connection_id
+            || worktree
+                .share()?
+                .guest_connection_ids
+                .contains_key(&connection_id)
+        {
+            Ok(worktree)
+        } else {
+            Err(anyhow!(
+                "{} is not a member of worktree {}",
+                connection_id,
+                worktree_id
+            ))?
+        }
+    }
+
+    /// Mutable counterpart of `read_worktree`: same membership check, except
+    /// an unshared worktree simply fails the guest test (`map_or(false, ..)`)
+    /// instead of erroring with "worktree is not shared".
+    fn write_worktree(
+        &mut self,
+        worktree_id: u64,
+        connection_id: ConnectionId,
+    ) -> tide::Result<&mut Worktree> {
+        let worktree = self
+            .worktrees
+            .get_mut(&worktree_id)
+            .ok_or_else(|| anyhow!("worktree not found"))?;
+
+        if worktree.host_connection_id == connection_id
+            || worktree.share.as_ref().map_or(false, |share| {
+                share.guest_connection_ids.contains_key(&connection_id)
+            })
+        {
+            Ok(worktree)
+        } else {
+            Err(anyhow!(
+                "{} is not a member of worktree {}",
+                connection_id,
+                worktree_id
+            ))?
+        }
+    }
+
+    /// Test-only consistency check: every forward index must agree with its
+    /// reverse index (connections ↔ users, connections ↔ worktrees/shares,
+    /// worktrees ↔ collaborator visibility, channels ↔ members).
+    #[cfg(test)]
+    fn check_invariants(&self) {
+        for (connection_id, connection) in &self.connections {
+            for worktree_id in &connection.worktrees {
+                let worktree = &self.worktrees.get(&worktree_id).unwrap();
+                // A connection attached to a worktree is either its host or a
+                // guest of its (mandatory, in that case) share.
+                if worktree.host_connection_id != *connection_id {
+                    assert!(worktree
+                        .share()
+                        .unwrap()
+                        .guest_connection_ids
+                        .contains_key(connection_id));
+                }
+            }
+            for channel_id in &connection.channels {
+                let channel = self.channels.get(channel_id).unwrap();
+                assert!(channel.connection_ids.contains(connection_id));
+            }
+            assert!(self
+                .connections_by_user_id
+                .get(&connection.user_id)
+                .unwrap()
+                .contains(connection_id));
+        }
+
+        for (user_id, connection_ids) in &self.connections_by_user_id {
+            for connection_id in connection_ids {
+                assert_eq!(
+                    self.connections.get(connection_id).unwrap().user_id,
+                    *user_id
+                );
+            }
+        }
+
+        for (worktree_id, worktree) in &self.worktrees {
+            let host_connection = self.connections.get(&worktree.host_connection_id).unwrap();
+            assert!(host_connection.worktrees.contains(worktree_id));
+
+            for collaborator_id in &worktree.collaborator_user_ids {
+                let visible_worktree_ids = self
+                    .visible_worktrees_by_user_id
+                    .get(collaborator_id)
+                    .unwrap();
+                assert!(visible_worktree_ids.contains(worktree_id));
+            }
+
+            if let Some(share) = &worktree.share {
+                for guest_connection_id in share.guest_connection_ids.keys() {
+                    let guest_connection = self.connections.get(guest_connection_id).unwrap();
+                    assert!(guest_connection.worktrees.contains(worktree_id));
+                }
+                // Replica ids and guest connections correspond one-to-one.
+                assert_eq!(
+                    share.active_replica_ids.len(),
+                    share.guest_connection_ids.len(),
+                );
+                assert_eq!(
+                    share.active_replica_ids,
+                    share
+                        .guest_connection_ids
+                        .values()
+                        .copied()
+                        .collect::<HashSet<_>>(),
+                );
+            }
+        }
+
+        for (user_id, visible_worktree_ids) in &self.visible_worktrees_by_user_id {
+            for worktree_id in visible_worktree_ids {
+                let worktree = self.worktrees.get(worktree_id).unwrap();
+                assert!(worktree.collaborator_user_ids.contains(user_id));
+            }
+        }
+
+        for (channel_id, channel) in &self.channels {
+            for connection_id in &channel.connection_ids {
+                let connection = self.connections.get(connection_id).unwrap();
+                assert!(connection.channels.contains(channel_id));
+            }
+        }
+    }
+}
+
+impl Worktree {
+    /// All connections that should receive events for this worktree:
+    /// its guests (when shared) plus the host.
+    pub fn connection_ids(&self) -> Vec<ConnectionId> {
+        if let Some(share) = &self.share {
+            share
+                .guest_connection_ids
+                .keys()
+                .copied()
+                .chain(Some(self.host_connection_id))
+                .collect()
+        } else {
+            vec![self.host_connection_id]
+        }
+    }
+
+    /// Borrows the share state, failing if the worktree isn't shared.
+    pub fn share(&self) -> tide::Result<&WorktreeShare> {
+        Ok(self
+            .share
+            .as_ref()
+            .ok_or_else(|| anyhow!("worktree is not shared"))?)
+    }
+
+    /// Mutable counterpart of `share`.
+    fn share_mut(&mut self) -> tide::Result<&mut WorktreeShare> {
+        Ok(self
+            .share
+            .as_mut()
+            .ok_or_else(|| anyhow!("worktree is not shared"))?)
+    }
+}
+
+impl Channel {
+    /// Snapshot of the channel's member connections as a `Vec`.
+    fn connection_ids(&self) -> Vec<ConnectionId> {
+        self.connection_ids.iter().copied().collect()
+    }
+}
@@ -56,10 +56,13 @@ border = { width = 1, color = "$border.0", right = true }
extends = "$workspace.sidebar"
border = { width = 1, color = "$border.0", left = true }
+[panel]
+padding = 12
+
[chat_panel]
+extends = "$panel"
channel_name = { extends = "$text.0", weight = "bold" }
channel_name_hash = { text = "$text.2", padding.right = 8 }
-padding = 12
[chat_panel.message]
body = "$text.1"
@@ -120,6 +123,41 @@ underline = true
extends = "$chat_panel.sign_in_prompt"
color = "$text.1.color"
+[people_panel]
+extends = "$panel"
+host_row_height = 28
+host_avatar = { corner_radius = 10, width = 20 }
+host_username = { extends = "$text.0", padding.left = 8 }
+tree_branch_width = 1
+tree_branch_color = "$surface.2"
+
+[people_panel.worktree]
+height = 24
+padding = { left = 8 }
+guest_avatar = { corner_radius = 8, width = 16 }
+guest_avatar_spacing = 4
+
+[people_panel.worktree.name]
+extends = "$text.1"
+margin = { right = 6 }
+
+[people_panel.unshared_worktree]
+extends = "$people_panel.worktree"
+
+[people_panel.hovered_unshared_worktree]
+extends = "$people_panel.unshared_worktree"
+background = "$state.hover"
+corner_radius = 6
+
+[people_panel.shared_worktree]
+extends = "$people_panel.worktree"
+name.color = "$text.0.color"
+
+[people_panel.hovered_shared_worktree]
+extends = "$people_panel.shared_worktree"
+background = "$state.hover"
+corner_radius = 6
+
[selector]
background = "$surface.0"
padding = 8
@@ -6,7 +6,7 @@ use crate::{
use anyhow::{anyhow, Context, Result};
use gpui::{
sum_tree::{self, Bias, SumTree},
- Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
+ AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
};
use postage::prelude::Stream;
use rand::prelude::*;
@@ -26,7 +26,7 @@ pub struct ChannelList {
available_channels: Option<Vec<ChannelDetails>>,
channels: HashMap<u64, WeakModelHandle<Channel>>,
rpc: Arc<Client>,
- user_store: Arc<UserStore>,
+ user_store: ModelHandle<UserStore>,
_task: Task<Option<()>>,
}
@@ -41,7 +41,7 @@ pub struct Channel {
messages: SumTree<ChannelMessage>,
loaded_all_messages: bool,
next_pending_message_id: usize,
- user_store: Arc<UserStore>,
+ user_store: ModelHandle<UserStore>,
rpc: Arc<Client>,
rng: StdRng,
_subscription: rpc::Subscription,
@@ -87,7 +87,7 @@ impl Entity for ChannelList {
impl ChannelList {
pub fn new(
- user_store: Arc<UserStore>,
+ user_store: ModelHandle<UserStore>,
rpc: Arc<rpc::Client>,
cx: &mut ModelContext<Self>,
) -> Self {
@@ -186,11 +186,11 @@ impl Entity for Channel {
impl Channel {
pub fn new(
details: ChannelDetails,
- user_store: Arc<UserStore>,
+ user_store: ModelHandle<UserStore>,
rpc: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Self {
- let _subscription = rpc.subscribe_from_model(details.id, cx, Self::handle_message_sent);
+ let _subscription = rpc.subscribe_to_entity(details.id, cx, Self::handle_message_sent);
{
let user_store = user_store.clone();
@@ -199,7 +199,8 @@ impl Channel {
cx.spawn(|channel, mut cx| {
async move {
let response = rpc.request(proto::JoinChannel { channel_id }).await?;
- let messages = messages_from_proto(response.messages, &user_store).await?;
+ let messages =
+ messages_from_proto(response.messages, &user_store, &mut cx).await?;
let loaded_all_messages = response.done;
channel.update(&mut cx, |channel, cx| {
@@ -241,6 +242,7 @@ impl Channel {
let current_user = self
.user_store
+ .read(cx)
.current_user()
.ok_or_else(|| anyhow!("current_user is not present"))?;
@@ -272,6 +274,7 @@ impl Channel {
let message = ChannelMessage::from_proto(
response.message.ok_or_else(|| anyhow!("invalid message"))?,
&user_store,
+ &mut cx,
)
.await?;
this.update(&mut cx, |this, cx| {
@@ -301,7 +304,8 @@ impl Channel {
})
.await?;
let loaded_all_messages = response.done;
- let messages = messages_from_proto(response.messages, &user_store).await?;
+ let messages =
+ messages_from_proto(response.messages, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.loaded_all_messages = loaded_all_messages;
this.insert_messages(messages, cx);
@@ -324,7 +328,7 @@ impl Channel {
cx.spawn(|this, mut cx| {
async move {
let response = rpc.request(proto::JoinChannel { channel_id }).await?;
- let messages = messages_from_proto(response.messages, &user_store).await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
let loaded_all_messages = response.done;
let pending_messages = this.update(&mut cx, |this, cx| {
@@ -359,6 +363,7 @@ impl Channel {
let message = ChannelMessage::from_proto(
response.message.ok_or_else(|| anyhow!("invalid message"))?,
&user_store,
+ &mut cx,
)
.await?;
this.update(&mut cx, |this, cx| {
@@ -413,7 +418,7 @@ impl Channel {
cx.spawn(|this, mut cx| {
async move {
- let message = ChannelMessage::from_proto(message, &user_store).await?;
+ let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx)
});
@@ -486,7 +491,8 @@ impl Channel {
async fn messages_from_proto(
proto_messages: Vec<proto::ChannelMessage>,
- user_store: &UserStore,
+ user_store: &ModelHandle<UserStore>,
+ cx: &mut AsyncAppContext,
) -> Result<SumTree<ChannelMessage>> {
let unique_user_ids = proto_messages
.iter()
@@ -494,11 +500,15 @@ async fn messages_from_proto(
.collect::<HashSet<_>>()
.into_iter()
.collect();
- user_store.load_users(unique_user_ids).await?;
+ user_store
+ .update(cx, |user_store, cx| {
+ user_store.load_users(unique_user_ids, cx)
+ })
+ .await?;
let mut messages = Vec::with_capacity(proto_messages.len());
for message in proto_messages {
- messages.push(ChannelMessage::from_proto(message, &user_store).await?);
+ messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
}
let mut result = SumTree::new();
result.extend(messages, &());
@@ -517,9 +527,14 @@ impl From<proto::Channel> for ChannelDetails {
impl ChannelMessage {
pub async fn from_proto(
message: proto::ChannelMessage,
- user_store: &UserStore,
+ user_store: &ModelHandle<UserStore>,
+ cx: &mut AsyncAppContext,
) -> Result<Self> {
- let sender = user_store.fetch_user(message.sender_id).await?;
+ let sender = user_store
+ .update(cx, |user_store, cx| {
+ user_store.fetch_user(message.sender_id, cx)
+ })
+ .await?;
Ok(ChannelMessage {
id: ChannelMessageId::Saved(message.id),
body: message.body,
@@ -595,26 +610,11 @@ mod tests {
let mut client = Client::new();
let http_client = FakeHttpClient::new(|_| async move { Ok(Response::new(404)) });
let server = FakeServer::for_client(user_id, &mut client, &cx).await;
- let user_store = UserStore::new(client.clone(), http_client, cx.background().as_ref());
+ let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx));
channel_list.read_with(&cx, |list, _| assert_eq!(list.available_channels(), None));
- let get_users = server.receive::<proto::GetUsers>().await.unwrap();
- assert_eq!(get_users.payload.user_ids, vec![5]);
- server
- .respond(
- get_users.receipt(),
- proto::GetUsersResponse {
- users: vec![proto::User {
- id: 5,
- github_login: "nathansobo".into(),
- avatar_url: "http://avatar.com/nathansobo".into(),
- }],
- },
- )
- .await;
-
// Get the available channels.
let get_channels = server.receive::<proto::GetChannels>().await.unwrap();
server
@@ -639,6 +639,21 @@ mod tests {
)
});
+ let get_users = server.receive::<proto::GetUsers>().await.unwrap();
+ assert_eq!(get_users.payload.user_ids, vec![5]);
+ server
+ .respond(
+ get_users.receipt(),
+ proto::GetUsersResponse {
+ users: vec![proto::User {
+ id: 5,
+ github_login: "nathansobo".into(),
+ avatar_url: "http://avatar.com/nathansobo".into(),
+ }],
+ },
+ )
+ .await;
+
// Join a channel and populate its existing messages.
let channel = channel_list
.update(&mut cx, |list, cx| {
@@ -2320,6 +2320,7 @@ impl Editor {
buffer::Event::Saved => cx.emit(Event::Saved),
buffer::Event::FileHandleChanged => cx.emit(Event::FileHandleChanged),
buffer::Event::Reloaded => cx.emit(Event::FileHandleChanged),
+ buffer::Event::Closed => cx.emit(Event::Closed),
buffer::Event::Reparsed => {}
}
}
@@ -2449,6 +2450,7 @@ pub enum Event {
Dirtied,
Saved,
FileHandleChanged,
+ Closed,
}
impl Entity for Editor {
@@ -2556,6 +2558,10 @@ impl workspace::ItemView for Editor {
matches!(event, Event::Activate)
}
+ fn should_close_item_on_event(event: &Self::Event) -> bool {
+ matches!(event, Event::Closed)
+ }
+
fn should_update_tab_on_event(event: &Self::Event) -> bool {
matches!(
event,
@@ -719,11 +719,6 @@ impl Buffer {
let text = self.visible_text.clone();
let version = self.version.clone();
- if let Some(language) = worktree.read(cx).languages().select_language(&path).cloned() {
- self.language = Some(language);
- self.reparse(cx);
- }
-
let save_as = worktree.update(cx, |worktree, cx| {
worktree
.as_local_mut()
@@ -734,6 +729,11 @@ impl Buffer {
cx.spawn(|this, mut cx| async move {
save_as.await.map(|new_file| {
this.update(&mut cx, |this, cx| {
+ if let Some(language) = new_file.select_language(cx) {
+ this.language = Some(language);
+ this.reparse(cx);
+ }
+
let mtime = new_file.mtime;
this.file = Some(new_file);
this.did_save(version, mtime, cx);
@@ -801,6 +801,10 @@ impl Buffer {
cx.emit(Event::FileHandleChanged);
}
+ pub fn close(&mut self, cx: &mut ModelContext<Self>) {
+ cx.emit(Event::Closed);
+ }
+
pub fn language(&self) -> Option<&Arc<Language>> {
self.language.as_ref()
}
@@ -2264,6 +2268,7 @@ pub enum Event {
FileHandleChanged,
Reloaded,
Reparsed,
+ Closed,
}
impl Entity for Buffer {
@@ -2928,6 +2933,7 @@ mod tests {
use crate::{
fs::RealFs,
language::LanguageRegistry,
+ rpc,
test::temp_tree,
util::RandomCharIter,
worktree::{Worktree, WorktreeHandle as _},
@@ -3394,9 +3400,10 @@ mod tests {
"file3": "ghi",
}));
let tree = Worktree::open_local(
+ rpc::Client::new(),
dir.path(),
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -3516,9 +3523,10 @@ mod tests {
let initial_contents = "aaa\nbbbbb\nc\n";
let dir = temp_tree(json!({ "the-file": initial_contents }));
let tree = Worktree::open_local(
+ rpc::Client::new(),
dir.path(),
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -35,6 +35,10 @@ pub struct LanguageRegistry {
}
impl Language {
+ pub fn name(&self) -> &str {
+ self.config.name.as_str()
+ }
+
pub fn highlight_map(&self) -> HighlightMap {
self.highlight_map.lock().clone()
}
@@ -133,27 +137,26 @@ mod tests {
// matching file extension
assert_eq!(
- registry.select_language("zed/lib.rs").map(get_name),
+ registry.select_language("zed/lib.rs").map(|l| l.name()),
Some("Rust")
);
assert_eq!(
- registry.select_language("zed/lib.mk").map(get_name),
+ registry.select_language("zed/lib.mk").map(|l| l.name()),
Some("Make")
);
// matching filename
assert_eq!(
- registry.select_language("zed/Makefile").map(get_name),
+ registry.select_language("zed/Makefile").map(|l| l.name()),
Some("Make")
);
// matching suffix that is not the full file extension or filename
- assert_eq!(registry.select_language("zed/cars").map(get_name), None);
- assert_eq!(registry.select_language("zed/a.cars").map(get_name), None);
- assert_eq!(registry.select_language("zed/sumk").map(get_name), None);
-
- fn get_name(language: &Arc<Language>) -> &str {
- language.config.name.as_str()
- }
+ assert_eq!(registry.select_language("zed/cars").map(|l| l.name()), None);
+ assert_eq!(
+ registry.select_language("zed/a.cars").map(|l| l.name()),
+ None
+ );
+ assert_eq!(registry.select_language("zed/sumk").map(|l| l.name()), None);
}
}
@@ -8,6 +8,7 @@ mod fuzzy;
pub mod http;
pub mod language;
pub mod menus;
+pub mod people_panel;
pub mod project_browser;
pub mod rpc;
pub mod settings;
@@ -43,7 +44,7 @@ pub struct AppState {
pub languages: Arc<language::LanguageRegistry>,
pub themes: Arc<settings::ThemeRegistry>,
pub rpc: Arc<rpc::Client>,
- pub user_store: Arc<user::UserStore>,
+ pub user_store: ModelHandle<user::UserStore>,
pub fs: Arc<dyn fs::Fs>,
pub channel_list: ModelHandle<ChannelList>,
}
@@ -38,7 +38,7 @@ fn main() {
app.run(move |cx| {
let rpc = rpc::Client::new();
let http = http::client();
- let user_store = UserStore::new(rpc.clone(), http.clone(), cx.background());
+ let user_store = cx.add_model(|cx| UserStore::new(rpc.clone(), http.clone(), cx));
let app_state = Arc::new(AppState {
languages: languages.clone(),
settings_tx: Arc::new(Mutex::new(settings_tx)),
@@ -16,21 +16,6 @@ pub fn menus(state: &Arc<AppState>) -> Vec<Menu<'static>> {
action: Box::new(super::About),
},
MenuItem::Separator,
- MenuItem::Action {
- name: "Sign In",
- keystroke: None,
- action: Box::new(super::Authenticate),
- },
- MenuItem::Action {
- name: "Share",
- keystroke: None,
- action: Box::new(workspace::ShareWorktree),
- },
- MenuItem::Action {
- name: "Join",
- keystroke: None,
- action: Box::new(workspace::JoinWorktree(state.clone())),
- },
MenuItem::Action {
name: "Quit",
keystroke: Some("cmd-q"),
@@ -0,0 +1,267 @@
+use crate::{
+ theme::Theme,
+ user::{Collaborator, UserStore},
+ Settings,
+};
+use gpui::{
+ action,
+ elements::*,
+ geometry::{rect::RectF, vector::vec2f},
+ platform::CursorStyle,
+ Element, ElementBox, Entity, LayoutContext, ModelHandle, RenderContext, Subscription, View,
+ ViewContext,
+};
+use postage::watch;
+
+action!(JoinWorktree, u64);
+action!(LeaveWorktree, u64);
+action!(ShareWorktree, u64);
+action!(UnshareWorktree, u64);
+
+pub struct PeoplePanel {
+ collaborators: ListState,
+ user_store: ModelHandle<UserStore>,
+ settings: watch::Receiver<Settings>,
+ _maintain_collaborators: Subscription,
+}
+
+impl PeoplePanel {
+ pub fn new(
+ user_store: ModelHandle<UserStore>,
+ settings: watch::Receiver<Settings>,
+ cx: &mut ViewContext<Self>,
+ ) -> Self {
+ Self {
+ collaborators: ListState::new(
+ user_store.read(cx).collaborators().len(),
+ Orientation::Top,
+ 1000.,
+ {
+ let user_store = user_store.clone();
+ let settings = settings.clone();
+ move |ix, cx| {
+ let user_store = user_store.read(cx);
+ let collaborators = user_store.collaborators().clone();
+ let current_user_id = user_store.current_user().map(|user| user.id);
+ Self::render_collaborator(
+ &collaborators[ix],
+ current_user_id,
+ &settings.borrow().theme,
+ cx,
+ )
+ }
+ },
+ ),
+ _maintain_collaborators: cx.observe(&user_store, Self::update_collaborators),
+ user_store,
+ settings,
+ }
+ }
+
+ fn update_collaborators(&mut self, _: ModelHandle<UserStore>, cx: &mut ViewContext<Self>) {
+ self.collaborators
+ .reset(self.user_store.read(cx).collaborators().len());
+ cx.notify();
+ }
+
+ fn render_collaborator(
+ collaborator: &Collaborator,
+ current_user_id: Option<u64>,
+ theme: &Theme,
+ cx: &mut LayoutContext,
+ ) -> ElementBox {
+ let theme = &theme.people_panel;
+ let worktree_count = collaborator.worktrees.len();
+ let font_cache = cx.font_cache();
+ let line_height = theme.unshared_worktree.name.text.line_height(font_cache);
+ let cap_height = theme.unshared_worktree.name.text.cap_height(font_cache);
+ let baseline_offset = theme
+ .unshared_worktree
+ .name
+ .text
+ .baseline_offset(font_cache)
+ + (theme.unshared_worktree.height - line_height) / 2.;
+ let tree_branch_width = theme.tree_branch_width;
+ let tree_branch_color = theme.tree_branch_color;
+ let host_avatar_height = theme
+ .host_avatar
+ .width
+ .or(theme.host_avatar.height)
+ .unwrap_or(0.);
+
+ Flex::column()
+ .with_child(
+ Flex::row()
+ .with_children(collaborator.user.avatar.clone().map(|avatar| {
+ Image::new(avatar)
+ .with_style(theme.host_avatar)
+ .aligned()
+ .left()
+ .boxed()
+ }))
+ .with_child(
+ Label::new(
+ collaborator.user.github_login.clone(),
+ theme.host_username.text.clone(),
+ )
+ .contained()
+ .with_style(theme.host_username.container)
+ .aligned()
+ .left()
+ .boxed(),
+ )
+ .constrained()
+ .with_height(theme.host_row_height)
+ .boxed(),
+ )
+ .with_children(
+ collaborator
+ .worktrees
+ .iter()
+ .enumerate()
+ .map(|(ix, worktree)| {
+ let worktree_id = worktree.id;
+
+ Flex::row()
+ .with_child(
+ Canvas::new(move |bounds, _, cx| {
+ let start_x = bounds.min_x() + (bounds.width() / 2.)
+ - (tree_branch_width / 2.);
+ let end_x = bounds.max_x();
+ let start_y = bounds.min_y();
+ let end_y =
+ bounds.min_y() + baseline_offset - (cap_height / 2.);
+
+ cx.scene.push_quad(gpui::Quad {
+ bounds: RectF::from_points(
+ vec2f(start_x, start_y),
+ vec2f(
+ start_x + tree_branch_width,
+ if ix + 1 == worktree_count {
+ end_y
+ } else {
+ bounds.max_y()
+ },
+ ),
+ ),
+ background: Some(tree_branch_color),
+ border: gpui::Border::default(),
+ corner_radius: 0.,
+ });
+ cx.scene.push_quad(gpui::Quad {
+ bounds: RectF::from_points(
+ vec2f(start_x, end_y),
+ vec2f(end_x, end_y + tree_branch_width),
+ ),
+ background: Some(tree_branch_color),
+ border: gpui::Border::default(),
+ corner_radius: 0.,
+ });
+ })
+ .constrained()
+ .with_width(host_avatar_height)
+ .boxed(),
+ )
+ .with_child({
+ let is_host = Some(collaborator.user.id) == current_user_id;
+ let is_guest = !is_host
+ && worktree
+ .guests
+ .iter()
+ .any(|guest| Some(guest.id) == current_user_id);
+ let is_shared = worktree.is_shared;
+
+ MouseEventHandler::new::<PeoplePanel, _, _, _>(
+ worktree_id as usize,
+ cx,
+ |mouse_state, _| {
+ let style = match (worktree.is_shared, mouse_state.hovered)
+ {
+ (false, false) => &theme.unshared_worktree,
+ (false, true) => &theme.hovered_unshared_worktree,
+ (true, false) => &theme.shared_worktree,
+ (true, true) => &theme.hovered_shared_worktree,
+ };
+
+ Flex::row()
+ .with_child(
+ Label::new(
+ worktree.root_name.clone(),
+ style.name.text.clone(),
+ )
+ .aligned()
+ .left()
+ .contained()
+ .with_style(style.name.container)
+ .boxed(),
+ )
+ .with_children(worktree.guests.iter().filter_map(
+ |participant| {
+ participant.avatar.clone().map(|avatar| {
+ Image::new(avatar)
+ .with_style(style.guest_avatar)
+ .aligned()
+ .left()
+ .contained()
+ .with_margin_right(
+ style.guest_avatar_spacing,
+ )
+ .boxed()
+ })
+ },
+ ))
+ .contained()
+ .with_style(style.container)
+ .constrained()
+ .with_height(style.height)
+ .boxed()
+ },
+ )
+ .with_cursor_style(if is_host || is_shared {
+ CursorStyle::PointingHand
+ } else {
+ CursorStyle::Arrow
+ })
+ .on_click(move |cx| {
+ if is_shared {
+ if is_host {
+ cx.dispatch_action(UnshareWorktree(worktree_id));
+ } else if is_guest {
+ cx.dispatch_action(LeaveWorktree(worktree_id));
+ } else {
+ cx.dispatch_action(JoinWorktree(worktree_id))
+ }
+ } else if is_host {
+ cx.dispatch_action(ShareWorktree(worktree_id));
+ }
+ })
+ .expanded(1.0)
+ .boxed()
+ })
+ .constrained()
+ .with_height(theme.unshared_worktree.height)
+ .boxed()
+ }),
+ )
+ .boxed()
+ }
+}
+
+pub enum Event {}
+
+impl Entity for PeoplePanel {
+ type Event = Event;
+}
+
+impl View for PeoplePanel {
+ fn ui_name() -> &'static str {
+ "PeoplePanel"
+ }
+
+ fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
+ let theme = &self.settings.borrow().theme.people_panel;
+ Container::new(List::new(self.collaborators.clone()).boxed())
+ .with_style(theme.container)
+ .boxed()
+ }
+}
@@ -14,6 +14,7 @@ use std::{
any::TypeId,
collections::HashMap,
convert::TryFrom,
+ fmt::Write as _,
future::Future,
sync::{Arc, Weak},
time::{Duration, Instant},
@@ -29,6 +30,9 @@ use zrpc::{
lazy_static! {
static ref ZED_SERVER_URL: String =
std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev:443".to_string());
+ static ref IMPERSONATE_LOGIN: Option<String> = std::env::var("ZED_IMPERSONATE")
+ .ok()
+ .and_then(|s| if s.is_empty() { None } else { Some(s) });
}
pub struct Client {
@@ -230,7 +234,51 @@ impl Client {
}
}
- pub fn subscribe_from_model<T, M, F>(
+ pub fn subscribe<T, M, F>(
+ self: &Arc<Self>,
+ cx: &mut ModelContext<M>,
+ mut handler: F,
+ ) -> Subscription
+ where
+ T: EnvelopedMessage,
+ M: Entity,
+ F: 'static
+ + Send
+ + Sync
+ + FnMut(&mut M, TypedEnvelope<T>, Arc<Self>, &mut ModelContext<M>) -> Result<()>,
+ {
+ let subscription_id = (TypeId::of::<T>(), Default::default());
+ let client = self.clone();
+ let mut state = self.state.write();
+ let model = cx.handle().downgrade();
+ let prev_extractor = state
+ .entity_id_extractors
+ .insert(subscription_id.0, Box::new(|_| Default::default()));
+ if prev_extractor.is_some() {
+ panic!("registered a handler for the same entity twice")
+ }
+
+ state.model_handlers.insert(
+ subscription_id,
+ Box::new(move |envelope, cx| {
+ if let Some(model) = model.upgrade(cx) {
+ let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
+ model.update(cx, |model, cx| {
+ if let Err(error) = handler(model, *envelope, client.clone(), cx) {
+ log::error!("error handling message: {}", error)
+ }
+ });
+ }
+ }),
+ );
+
+ Subscription {
+ client: Arc::downgrade(self),
+ id: subscription_id,
+ }
+ }
+
+ pub fn subscribe_to_entity<T, M, F>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
@@ -306,12 +354,12 @@ impl Client {
self.set_status(Status::Reauthenticating, cx)
}
- let mut read_from_keychain = false;
+ let mut used_keychain = false;
let credentials = self.state.read().credentials.clone();
let credentials = if let Some(credentials) = credentials {
credentials
} else if let Some(credentials) = read_credentials_from_keychain(cx) {
- read_from_keychain = true;
+ used_keychain = true;
credentials
} else {
let credentials = match self.authenticate(&cx).await {
@@ -334,7 +382,7 @@ impl Client {
Ok(conn) => {
log::info!("connected to rpc address {}", *ZED_SERVER_URL);
self.state.write().credentials = Some(credentials.clone());
- if !read_from_keychain {
+ if !used_keychain && IMPERSONATE_LOGIN.is_none() {
write_credentials_to_keychain(&credentials, cx).log_err();
}
self.set_connection(conn, cx).await;
@@ -343,8 +391,8 @@ impl Client {
Err(err) => {
if matches!(err, EstablishConnectionError::Unauthorized) {
self.state.write().credentials.take();
- cx.platform().delete_credentials(&ZED_SERVER_URL).log_err();
- if read_from_keychain {
+ if used_keychain {
+ cx.platform().delete_credentials(&ZED_SERVER_URL).log_err();
self.set_status(Status::SignedOut, cx);
self.authenticate_and_connect(cx).await
} else {
@@ -484,10 +532,17 @@ impl Client {
// Open the Zed sign-in page in the user's browser, with query parameters that indicate
// that the user is signing in from a Zed app running on the same device.
- platform.open_url(&format!(
+ let mut url = format!(
"{}/sign_in?native_app_port={}&native_app_public_key={}",
*ZED_SERVER_URL, port, public_key_string
- ));
+ );
+
+ if let Some(impersonate_login) = IMPERSONATE_LOGIN.as_ref() {
+ log::info!("impersonating user @{}", impersonate_login);
+ write!(&mut url, "&impersonate={}", impersonate_login).unwrap();
+ }
+
+ platform.open_url(&url);
// Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
// access token from the query params.
@@ -571,6 +626,10 @@ impl Client {
}
fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credentials> {
+ if IMPERSONATE_LOGIN.is_some() {
+ return None;
+ }
+
let (user_id, access_token) = cx
.platform()
.read_credentials(&ZED_SERVER_URL)
@@ -168,7 +168,7 @@ pub fn test_app_state(cx: &mut MutableAppContext) -> Arc<AppState> {
let themes = ThemeRegistry::new(Assets, cx.font_cache().clone());
let rpc = rpc::Client::new();
let http = FakeHttpClient::new(|_| async move { Ok(ServerResponse::new(404)) });
- let user_store = UserStore::new(rpc.clone(), http, cx.background());
+ let user_store = cx.add_model(|cx| UserStore::new(rpc.clone(), http, cx));
Arc::new(AppState {
settings_tx: Arc::new(Mutex::new(settings_tx)),
settings,
@@ -24,6 +24,7 @@ pub struct Theme {
pub name: String,
pub workspace: Workspace,
pub chat_panel: ChatPanel,
+ pub people_panel: PeoplePanel,
pub selector: Selector,
pub editor: EditorStyle,
pub syntax: SyntaxTheme,
@@ -104,6 +105,31 @@ pub struct ChatPanel {
pub hovered_sign_in_prompt: TextStyle,
}
+#[derive(Deserialize)]
+pub struct PeoplePanel {
+ #[serde(flatten)]
+ pub container: ContainerStyle,
+ pub host_row_height: f32,
+ pub host_avatar: ImageStyle,
+ pub host_username: ContainedText,
+ pub tree_branch_width: f32,
+ pub tree_branch_color: Color,
+ pub shared_worktree: WorktreeRow,
+ pub hovered_shared_worktree: WorktreeRow,
+ pub unshared_worktree: WorktreeRow,
+ pub hovered_unshared_worktree: WorktreeRow,
+}
+
+#[derive(Deserialize)]
+pub struct WorktreeRow {
+ #[serde(flatten)]
+ pub container: ContainerStyle,
+ pub height: f32,
+ pub name: ContainedText,
+ pub guest_avatar: ImageStyle,
+ pub guest_avatar_spacing: f32,
+}
+
#[derive(Deserialize)]
pub struct ChatMessage {
#[serde(flatten)]
@@ -143,7 +169,7 @@ pub struct Selector {
pub active_item: ContainedLabel,
}
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
pub struct ContainedText {
#[serde(flatten)]
pub container: ContainerStyle,
@@ -237,9 +237,12 @@ impl Tree {
fn update_resolved(&self) {
match &mut *self.0.borrow_mut() {
Node::Object {
- resolved, children, ..
+ resolved,
+ base,
+ children,
+ ..
} => {
- *resolved = children.values().all(|c| c.is_resolved());
+ *resolved = base.is_none() && children.values().all(|c| c.is_resolved());
}
Node::Array {
resolved, children, ..
@@ -261,6 +264,9 @@ impl Tree {
if tree.is_resolved() {
while let Some(parent) = tree.parent() {
parent.update_resolved();
+ if !parent.is_resolved() {
+ break;
+ }
tree = parent;
}
}
@@ -330,9 +336,10 @@ impl Tree {
made_progress = true;
}
- if let Node::Object { resolved, .. } = &mut *self.0.borrow_mut() {
+ if let Node::Object { resolved, base, .. } = &mut *self.0.borrow_mut() {
if has_base {
if resolved_base.is_some() {
+ base.take();
*resolved = true;
} else {
unresolved.push(self.clone());
@@ -341,6 +348,8 @@ impl Tree {
*resolved = true;
}
}
+ } else if base.is_some() {
+ unresolved.push(self.clone());
}
Ok(made_progress)
@@ -427,6 +436,7 @@ mod test {
fn test_references() {
let json = serde_json::json!({
"a": {
+ "extends": "$g",
"x": "$b.d"
},
"b": {
@@ -436,6 +446,9 @@ mod test {
"e": {
"extends": "$a",
"f": "1"
+ },
+ "g": {
+ "h": 2
}
});
@@ -443,19 +456,27 @@ mod test {
resolve_references(json).unwrap(),
serde_json::json!({
"a": {
- "x": "1"
+ "extends": "$g",
+ "x": "1",
+ "h": 2
},
"b": {
"c": {
- "x": "1"
+ "extends": "$g",
+ "x": "1",
+ "h": 2
},
"d": "1"
},
"e": {
"extends": "$a",
"f": "1",
- "x": "1"
+ "x": "1",
+ "h": 2
},
+ "g": {
+ "h": 2
+ }
})
)
}
@@ -5,14 +5,13 @@ use crate::{
};
use anyhow::{anyhow, Context, Result};
use futures::future;
-use gpui::{executor, ImageData, Task};
-use parking_lot::Mutex;
-use postage::{oneshot, prelude::Stream, sink::Sink, watch};
+use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
+use postage::{prelude::Stream, sink::Sink, watch};
use std::{
- collections::HashMap,
- sync::{Arc, Weak},
+ collections::{HashMap, HashSet},
+ sync::Arc,
};
-use zrpc::proto;
+use zrpc::{proto, TypedEnvelope};
#[derive(Debug)]
pub struct User {
@@ -21,42 +20,75 @@ pub struct User {
pub avatar: Option<Arc<ImageData>>,
}
+#[derive(Debug)]
+pub struct Collaborator {
+ pub user: Arc<User>,
+ pub worktrees: Vec<WorktreeMetadata>,
+}
+
+#[derive(Debug)]
+pub struct WorktreeMetadata {
+ pub id: u64,
+ pub root_name: String,
+ pub is_shared: bool,
+ pub guests: Vec<Arc<User>>,
+}
+
pub struct UserStore {
- users: Mutex<HashMap<u64, Arc<User>>>,
+ users: HashMap<u64, Arc<User>>,
current_user: watch::Receiver<Option<Arc<User>>>,
+ collaborators: Arc<[Collaborator]>,
rpc: Arc<Client>,
http: Arc<dyn HttpClient>,
+ _maintain_collaborators: Task<()>,
_maintain_current_user: Task<()>,
}
+pub enum Event {}
+
+impl Entity for UserStore {
+ type Event = Event;
+}
+
impl UserStore {
- pub fn new(
- rpc: Arc<Client>,
- http: Arc<dyn HttpClient>,
- executor: &executor::Background,
- ) -> Arc<Self> {
+ pub fn new(rpc: Arc<Client>, http: Arc<dyn HttpClient>, cx: &mut ModelContext<Self>) -> Self {
let (mut current_user_tx, current_user_rx) = watch::channel();
- let (mut this_tx, mut this_rx) = oneshot::channel::<Weak<Self>>();
- let this = Arc::new(Self {
+ let (mut update_collaborators_tx, mut update_collaborators_rx) =
+ watch::channel::<Option<proto::UpdateCollaborators>>();
+ let update_collaborators_subscription = rpc.subscribe(
+ cx,
+ move |_: &mut Self, msg: TypedEnvelope<proto::UpdateCollaborators>, _, _| {
+ let _ = update_collaborators_tx.blocking_send(Some(msg.payload));
+ Ok(())
+ },
+ );
+ Self {
users: Default::default(),
current_user: current_user_rx,
+ collaborators: Arc::from([]),
rpc: rpc.clone(),
http,
- _maintain_current_user: executor.spawn(async move {
- let this = if let Some(this) = this_rx.recv().await {
- this
- } else {
- return;
- };
+ _maintain_collaborators: cx.spawn_weak(|this, mut cx| async move {
+ let _subscription = update_collaborators_subscription;
+ while let Some(message) = update_collaborators_rx.recv().await {
+ if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
+ this.update(&mut cx, |this, cx| this.update_collaborators(message, cx))
+ .log_err()
+ .await;
+ }
+ }
+ }),
+ _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
let mut status = rpc.status();
while let Some(status) = status.recv().await {
match status {
Status::Connected { .. } => {
- if let Some((this, user_id)) = this.upgrade().zip(rpc.user_id()) {
- current_user_tx
- .send(this.fetch_user(user_id).log_err().await)
- .await
- .ok();
+ if let Some((this, user_id)) = this.upgrade(&cx).zip(rpc.user_id()) {
+ let user = this
+ .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
+ .log_err()
+ .await;
+ current_user_tx.send(user).await.ok();
}
}
Status::SignedOut => {
@@ -66,49 +98,100 @@ impl UserStore {
}
}
}),
- });
- let weak = Arc::downgrade(&this);
- executor
- .spawn(async move { this_tx.send(weak).await })
- .detach();
- this
+ }
}
- pub async fn load_users(&self, mut user_ids: Vec<u64>) -> Result<()> {
- {
- let users = self.users.lock();
- user_ids.retain(|id| !users.contains_key(id));
+ fn update_collaborators(
+ &mut self,
+ message: proto::UpdateCollaborators,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let mut user_ids = HashSet::new();
+ for collaborator in &message.collaborators {
+ user_ids.insert(collaborator.user_id);
+ user_ids.extend(
+ collaborator
+ .worktrees
+ .iter()
+ .flat_map(|w| &w.guests)
+ .copied(),
+ );
}
- if !user_ids.is_empty() {
- let response = self.rpc.request(proto::GetUsers { user_ids }).await?;
- let new_users = future::join_all(
- response
- .users
- .into_iter()
- .map(|user| User::new(user, self.http.as_ref())),
- )
- .await;
- let mut users = self.users.lock();
- for user in new_users {
- users.insert(user.id, Arc::new(user));
+ let load_users = self.load_users(user_ids.into_iter().collect(), cx);
+ cx.spawn(|this, mut cx| async move {
+ load_users.await?;
+
+ let mut collaborators = Vec::new();
+ for collaborator in message.collaborators {
+ collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?);
}
- }
- Ok(())
+ this.update(&mut cx, |this, cx| {
+ collaborators.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
+ this.collaborators = collaborators.into();
+ cx.notify();
+ });
+
+ Ok(())
+ })
+ }
+
+ pub fn collaborators(&self) -> &Arc<[Collaborator]> {
+ &self.collaborators
}
- pub async fn fetch_user(&self, user_id: u64) -> Result<Arc<User>> {
- if let Some(user) = self.users.lock().get(&user_id).cloned() {
- return Ok(user);
+ pub fn load_users(
+ &mut self,
+ mut user_ids: Vec<u64>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let rpc = self.rpc.clone();
+ let http = self.http.clone();
+ user_ids.retain(|id| !self.users.contains_key(id));
+ cx.spawn_weak(|this, mut cx| async move {
+ if !user_ids.is_empty() {
+ let response = rpc.request(proto::GetUsers { user_ids }).await?;
+ let new_users = future::join_all(
+ response
+ .users
+ .into_iter()
+ .map(|user| User::new(user, http.as_ref())),
+ )
+ .await;
+
+ if let Some(this) = this.upgrade(&cx) {
+ this.update(&mut cx, |this, _| {
+ for user in new_users {
+ this.users.insert(user.id, Arc::new(user));
+ }
+ });
+ }
+ }
+
+ Ok(())
+ })
+ }
+
+ pub fn fetch_user(
+ &mut self,
+ user_id: u64,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Arc<User>>> {
+ if let Some(user) = self.users.get(&user_id).cloned() {
+ return cx.spawn_weak(|_, _| async move { Ok(user) });
}
- self.load_users(vec![user_id]).await?;
- self.users
- .lock()
- .get(&user_id)
- .cloned()
- .ok_or_else(|| anyhow!("server responded with no users"))
+ let load_users = self.load_users(vec![user_id], cx);
+ cx.spawn(|this, mut cx| async move {
+ load_users.await?;
+ this.update(&mut cx, |this, _| {
+ this.users
+ .get(&user_id)
+ .cloned()
+ .ok_or_else(|| anyhow!("server responded with no users"))
+ })
+ })
}
pub fn current_user(&self) -> Option<Arc<User>> {
@@ -130,6 +213,40 @@ impl User {
}
}
+impl Collaborator {
+ async fn from_proto(
+ collaborator: proto::Collaborator,
+ user_store: &ModelHandle<UserStore>,
+ cx: &mut AsyncAppContext,
+ ) -> Result<Self> {
+ let user = user_store
+ .update(cx, |user_store, cx| {
+ user_store.fetch_user(collaborator.user_id, cx)
+ })
+ .await?;
+ let mut worktrees = Vec::new();
+ for worktree in collaborator.worktrees {
+ let mut guests = Vec::new();
+ for participant_id in worktree.guests {
+ guests.push(
+ user_store
+ .update(cx, |user_store, cx| {
+ user_store.fetch_user(participant_id, cx)
+ })
+ .await?,
+ );
+ }
+ worktrees.push(WorktreeMetadata {
+ id: worktree.id,
+ root_name: worktree.root_name,
+ is_shared: worktree.is_shared,
+ guests,
+ });
+ }
+ Ok(Self { user, worktrees })
+ }
+}
+
async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
let mut request = Request::new(Method::Get, url);
@@ -7,14 +7,16 @@ use crate::{
editor::Buffer,
fs::Fs,
language::LanguageRegistry,
+ people_panel::{JoinWorktree, LeaveWorktree, PeoplePanel, ShareWorktree, UnshareWorktree},
project_browser::ProjectBrowser,
rpc,
settings::Settings,
user,
- worktree::{File, Worktree},
+ util::TryFutureExt as _,
+ worktree::{self, File, Worktree},
AppState, Authenticate,
};
-use anyhow::{anyhow, Result};
+use anyhow::Result;
use gpui::{
action,
elements::*,
@@ -41,8 +43,6 @@ use std::{
action!(Open, Arc<AppState>);
action!(OpenPaths, OpenParams);
action!(OpenNew, Arc<AppState>);
-action!(ShareWorktree);
-action!(JoinWorktree, Arc<AppState>);
action!(Save);
action!(DebugElements);
@@ -52,13 +52,14 @@ pub fn init(cx: &mut MutableAppContext) {
open_paths(action, cx).detach()
});
cx.add_global_action(open_new);
- cx.add_global_action(join_worktree);
cx.add_action(Workspace::save_active_item);
cx.add_action(Workspace::debug_elements);
cx.add_action(Workspace::open_new_file);
+ cx.add_action(Workspace::toggle_sidebar_item);
cx.add_action(Workspace::share_worktree);
+ cx.add_action(Workspace::unshare_worktree);
cx.add_action(Workspace::join_worktree);
- cx.add_action(Workspace::toggle_sidebar_item);
+ cx.add_action(Workspace::leave_worktree);
cx.add_bindings(vec![
Binding::new("cmd-s", Save, None),
Binding::new("cmd-alt-i", DebugElements, None),
@@ -129,14 +130,6 @@ fn open_new(action: &OpenNew, cx: &mut MutableAppContext) {
});
}
-fn join_worktree(action: &JoinWorktree, cx: &mut MutableAppContext) {
- cx.add_window(window_options(), |cx| {
- let mut view = Workspace::new(action.0.as_ref(), cx);
- view.join_worktree(action, cx);
- view
- });
-}
-
fn window_options() -> WindowOptions<'static> {
WindowOptions {
bounds: RectF::new(vec2f(0., 0.), vec2f(1024., 768.)),
@@ -183,6 +176,9 @@ pub trait ItemView: View {
fn should_activate_item_on_event(_: &Self::Event) -> bool {
false
}
+ fn should_close_item_on_event(_: &Self::Event) -> bool {
+ false
+ }
fn should_update_tab_on_event(_: &Self::Event) -> bool {
false
}
@@ -281,6 +277,10 @@ impl<T: ItemView> ItemViewHandle for ViewHandle<T> {
fn set_parent_pane(&self, pane: &ViewHandle<Pane>, cx: &mut MutableAppContext) {
pane.update(cx, |_, cx| {
cx.subscribe(self, |pane, item, event, cx| {
+ if T::should_close_item_on_event(event) {
+ pane.close_item(item.id(), cx);
+ return;
+ }
if T::should_activate_item_on_event(event) {
if let Some(ix) = pane.item_index(&item) {
pane.activate_item(ix, cx);
@@ -341,7 +341,7 @@ pub struct Workspace {
pub settings: watch::Receiver<Settings>,
languages: Arc<LanguageRegistry>,
rpc: Arc<rpc::Client>,
- user_store: Arc<user::UserStore>,
+ user_store: ModelHandle<user::UserStore>,
fs: Arc<dyn Fs>,
modal: Option<AnyViewHandle>,
center: PaneGroup,
@@ -375,6 +375,13 @@ impl Workspace {
);
let mut right_sidebar = Sidebar::new(Side::Right);
+ right_sidebar.add_item(
+ "icons/user-16.svg",
+ cx.add_view(|cx| {
+ PeoplePanel::new(app_state.user_store.clone(), app_state.settings.clone(), cx)
+ })
+ .into(),
+ );
right_sidebar.add_item(
"icons/comment-16.svg",
cx.add_view(|cx| {
@@ -387,9 +394,8 @@ impl Workspace {
})
.into(),
);
- right_sidebar.add_item("icons/user-16.svg", cx.add_view(|_| ProjectBrowser).into());
- let mut current_user = app_state.user_store.watch_current_user().clone();
+ let mut current_user = app_state.user_store.read(cx).watch_current_user().clone();
let mut connection_status = app_state.rpc.status().clone();
let _observe_current_user = cx.spawn_weak(|this, mut cx| async move {
current_user.recv().await;
@@ -546,10 +552,11 @@ impl Workspace {
cx: &mut ViewContext<Self>,
) -> Task<Result<ModelHandle<Worktree>>> {
let languages = self.languages.clone();
+ let rpc = self.rpc.clone();
let fs = self.fs.clone();
let path = Arc::from(path);
cx.spawn(|this, mut cx| async move {
- let worktree = Worktree::open_local(path, languages, fs, &mut cx).await?;
+ let worktree = Worktree::open_local(rpc, path, fs, languages, &mut cx).await?;
this.update(&mut cx, |this, cx| {
cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
this.worktrees.insert(worktree.clone());
@@ -815,69 +822,122 @@ impl Workspace {
};
}
- fn share_worktree(&mut self, _: &ShareWorktree, cx: &mut ViewContext<Self>) {
+ fn share_worktree(&mut self, action: &ShareWorktree, cx: &mut ViewContext<Self>) {
let rpc = self.rpc.clone();
- let platform = cx.platform();
-
- let task = cx.spawn(|this, mut cx| async move {
- rpc.authenticate_and_connect(&cx).await?;
+ let remote_id = action.0;
+ cx.spawn(|this, mut cx| {
+ async move {
+ rpc.authenticate_and_connect(&cx).await?;
+
+ let task = this.update(&mut cx, |this, cx| {
+ for worktree in &this.worktrees {
+ let task = worktree.update(cx, |worktree, cx| {
+ worktree.as_local_mut().and_then(|worktree| {
+ if worktree.remote_id() == Some(remote_id) {
+ Some(worktree.share(cx))
+ } else {
+ None
+ }
+ })
+ });
- let share_task = this.update(&mut cx, |this, cx| {
- let worktree = this.worktrees.iter().next()?;
- worktree.update(cx, |worktree, cx| {
- let worktree = worktree.as_local_mut()?;
- Some(worktree.share(rpc, cx))
- })
- });
+ if task.is_some() {
+ return task;
+ }
+ }
+ None
+ });
- if let Some(share_task) = share_task {
- let (worktree_id, access_token) = share_task.await?;
- let worktree_url = rpc::encode_worktree_url(worktree_id, &access_token);
- log::info!("wrote worktree url to clipboard: {}", worktree_url);
- platform.write_to_clipboard(ClipboardItem::new(worktree_url));
- }
- surf::Result::Ok(())
- });
+ if let Some(share_task) = task {
+ share_task.await?;
+ }
- cx.spawn(|_, _| async move {
- if let Err(e) = task.await {
- log::error!("sharing failed: {:?}", e);
+ Ok(())
}
+ .log_err()
})
.detach();
}
- fn join_worktree(&mut self, _: &JoinWorktree, cx: &mut ViewContext<Self>) {
+ fn unshare_worktree(&mut self, action: &UnshareWorktree, cx: &mut ViewContext<Self>) {
+ let remote_id = action.0;
+ for worktree in &self.worktrees {
+ if worktree.update(cx, |worktree, cx| {
+ if let Some(worktree) = worktree.as_local_mut() {
+ if worktree.remote_id() == Some(remote_id) {
+ worktree.unshare(cx);
+ return true;
+ }
+ }
+ false
+ }) {
+ break;
+ }
+ }
+ }
+
+ fn join_worktree(&mut self, action: &JoinWorktree, cx: &mut ViewContext<Self>) {
let rpc = self.rpc.clone();
let languages = self.languages.clone();
+ let worktree_id = action.0;
+
+ cx.spawn(|this, mut cx| {
+ async move {
+ rpc.authenticate_and_connect(&cx).await?;
+ let worktree =
+ Worktree::open_remote(rpc.clone(), worktree_id, languages, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
+ cx.subscribe(&worktree, move |this, _, event, cx| match event {
+ worktree::Event::Closed => {
+ this.worktrees.retain(|worktree| {
+ worktree.update(cx, |worktree, cx| {
+ if let Some(worktree) = worktree.as_remote_mut() {
+ if worktree.remote_id() == worktree_id {
+ worktree.close_all_buffers(cx);
+ return false;
+ }
+ }
+ true
+ })
+ });
- let task = cx.spawn(|this, mut cx| async move {
- rpc.authenticate_and_connect(&cx).await?;
-
- let worktree_url = cx
- .platform()
- .read_from_clipboard()
- .ok_or_else(|| anyhow!("failed to read url from clipboard"))?;
- let (worktree_id, access_token) = rpc::decode_worktree_url(worktree_url.text())
- .ok_or_else(|| anyhow!("failed to decode worktree url"))?;
- log::info!("read worktree url from clipboard: {}", worktree_url.text());
+ cx.notify();
+ }
+ })
+ .detach();
+ this.worktrees.insert(worktree);
+ cx.notify();
+ });
- let worktree =
- Worktree::open_remote(rpc.clone(), worktree_id, access_token, languages, &mut cx)
- .await?;
- this.update(&mut cx, |workspace, cx| {
- cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
- workspace.worktrees.insert(worktree);
- cx.notify();
- });
+ Ok(())
+ }
+ .log_err()
+ })
+ .detach();
+ }
- surf::Result::Ok(())
- });
+ fn leave_worktree(&mut self, action: &LeaveWorktree, cx: &mut ViewContext<Self>) {
+ let remote_id = action.0;
+ cx.spawn(|this, mut cx| {
+ async move {
+ this.update(&mut cx, |this, cx| {
+ this.worktrees.retain(|worktree| {
+ worktree.update(cx, |worktree, cx| {
+ if let Some(worktree) = worktree.as_remote_mut() {
+ if worktree.remote_id() == remote_id {
+ worktree.close_all_buffers(cx);
+ return false;
+ }
+ }
+ true
+ })
+ })
+ });
- cx.spawn(|_, _| async move {
- if let Err(e) = task.await {
- log::error!("joining failed: {}", e);
+ Ok(())
}
+ .log_err()
})
.detach();
}
@@ -989,6 +1049,7 @@ impl Workspace {
let theme = &self.settings.borrow().theme;
let avatar = if let Some(avatar) = self
.user_store
+ .read(cx)
.current_user()
.and_then(|user| user.avatar.clone())
{
@@ -1466,6 +1527,7 @@ mod tests {
editor.update(&mut cx, |editor, cx| {
assert!(!editor.is_dirty(cx.as_ref()));
assert_eq!(editor.title(cx.as_ref()), "untitled");
+ assert!(editor.language(cx).is_none());
editor.insert(&Insert("hi".into()), cx);
assert!(editor.is_dirty(cx.as_ref()));
});
@@ -1492,7 +1554,9 @@ mod tests {
assert_eq!(editor.title(cx), "the-new-name.rs");
});
// The language is assigned based on the path
- editor.read_with(&cx, |editor, cx| assert!(editor.language(cx).is_some()));
+ editor.read_with(&cx, |editor, cx| {
+ assert_eq!(editor.language(cx).unwrap().name(), "Rust")
+ });
// Edit the file and save it again. This time, there is no filename prompt.
editor.update(&mut cx, |editor, cx| {
@@ -1530,6 +1594,47 @@ mod tests {
})
}
+ #[gpui::test]
+ async fn test_setting_language_when_saving_as_single_file_worktree(
+ mut cx: gpui::TestAppContext,
+ ) {
+ let dir = TempDir::new("test-new-file").unwrap();
+ let app_state = cx.update(test_app_state);
+ let (_, workspace) = cx.add_window(|cx| Workspace::new(&app_state, cx));
+
+ // Create a new untitled buffer
+ let editor = workspace.update(&mut cx, |workspace, cx| {
+ workspace.open_new_file(&OpenNew(app_state.clone()), cx);
+ workspace
+ .active_item(cx)
+ .unwrap()
+ .to_any()
+ .downcast::<Editor>()
+ .unwrap()
+ });
+
+ editor.update(&mut cx, |editor, cx| {
+ assert!(editor.language(cx).is_none());
+ editor.insert(&Insert("hi".into()), cx);
+ assert!(editor.is_dirty(cx.as_ref()));
+ });
+
+ // Save the buffer. This prompts for a filename.
+ workspace.update(&mut cx, |workspace, cx| {
+ workspace.save_active_item(&Save, cx)
+ });
+ cx.simulate_new_path_selection(|_| Some(dir.path().join("the-new-name.rs")));
+
+ editor
+ .condition(&cx, |editor, cx| !editor.is_dirty(cx))
+ .await;
+
+ // The language is assigned based on the path
+ editor.read_with(&cx, |editor, cx| {
+ assert_eq!(editor.language(cx).unwrap().name(), "Rust")
+ });
+ }
+
#[gpui::test]
async fn test_new_empty_workspace(mut cx: gpui::TestAppContext) {
cx.update(init);
@@ -6,8 +6,8 @@ use crate::{
fs::{self, Fs},
fuzzy,
fuzzy::CharBag,
- language::LanguageRegistry,
- rpc::{self, proto},
+ language::{Language, LanguageRegistry},
+ rpc::{self, proto, Status},
time::{self, ReplicaId},
util::{Bias, TryFutureExt},
};
@@ -27,6 +27,7 @@ use postage::{
prelude::{Sink as _, Stream as _},
watch,
};
+use serde::Deserialize;
use smol::channel::{self, Sender};
use std::{
cmp::{self, Ordering},
@@ -61,37 +62,50 @@ pub enum Worktree {
Remote(RemoteWorktree),
}
+pub enum Event {
+ Closed,
+}
+
impl Entity for Worktree {
- type Event = ();
+ type Event = Event;
fn release(&mut self, cx: &mut MutableAppContext) {
- let rpc = match self {
- Self::Local(tree) => tree
- .share
- .as_ref()
- .map(|share| (share.rpc.clone(), share.remote_id)),
- Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
- };
-
- if let Some((rpc, worktree_id)) = rpc {
- cx.spawn(|_| async move {
- if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
- log::error!("error closing worktree {}: {}", worktree_id, err);
+ match self {
+ Self::Local(tree) => {
+ if let Some(worktree_id) = *tree.remote_id.borrow() {
+ let rpc = tree.rpc.clone();
+ cx.spawn(|_| async move {
+ if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
+ log::error!("error closing worktree: {}", err);
+ }
+ })
+ .detach();
}
- })
- .detach();
+ }
+ Self::Remote(tree) => {
+ let rpc = tree.rpc.clone();
+ let worktree_id = tree.remote_id;
+ cx.spawn(|_| async move {
+ if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
+ log::error!("error closing worktree: {}", err);
+ }
+ })
+ .detach();
+ }
}
}
}
impl Worktree {
pub async fn open_local(
+ rpc: Arc<rpc::Client>,
path: impl Into<Arc<Path>>,
- languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
+ languages: Arc<LanguageRegistry>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
- let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx).await?;
+ let (tree, scan_states_tx) =
+ LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
tree.update(cx, |tree, cx| {
let tree = tree.as_local_mut().unwrap();
let abs_path = tree.snapshot.abs_path.clone();
@@ -110,33 +124,26 @@ impl Worktree {
pub async fn open_remote(
rpc: Arc<rpc::Client>,
id: u64,
- access_token: String,
languages: Arc<LanguageRegistry>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
- let response = rpc
- .request(proto::OpenWorktree {
- worktree_id: id,
- access_token,
- })
- .await?;
-
+ let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
Worktree::remote(response, rpc, languages, cx).await
}
async fn remote(
- open_response: proto::OpenWorktreeResponse,
+ join_response: proto::JoinWorktreeResponse,
rpc: Arc<rpc::Client>,
languages: Arc<LanguageRegistry>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
- let worktree = open_response
+ let worktree = join_response
.worktree
.ok_or_else(|| anyhow!("empty worktree"))?;
- let remote_id = open_response.worktree_id;
- let replica_id = open_response.replica_id as ReplicaId;
- let peers = open_response.peers;
+ let remote_id = worktree.id;
+ let replica_id = join_response.replica_id as ReplicaId;
+ let peers = join_response.peers;
let root_char_bag: CharBag = worktree
.root_name
.chars()
@@ -215,11 +222,12 @@ impl Worktree {
}
let _subscriptions = vec![
- rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
- rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
+ rpc.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
];
Worktree::Remote(RemoteWorktree {
@@ -268,13 +276,6 @@ impl Worktree {
}
}
- pub fn languages(&self) -> &Arc<LanguageRegistry> {
- match self {
- Worktree::Local(worktree) => &worktree.languages,
- Worktree::Remote(worktree) => &worktree.languages,
- }
- }
-
pub fn snapshot(&self) -> Snapshot {
match self {
Worktree::Local(worktree) => worktree.snapshot(),
@@ -526,6 +527,16 @@ impl Worktree {
Ok(())
}
+ pub fn handle_unshare(
+ &mut self,
+ _: TypedEnvelope<proto::UnshareWorktree>,
+ _: Arc<rpc::Client>,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<()> {
+ cx.emit(Event::Closed);
+ Ok(())
+ }
+
fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
match self {
Self::Local(worktree) => {
@@ -655,24 +666,34 @@ impl Deref for Worktree {
pub struct LocalWorktree {
snapshot: Snapshot,
+ config: WorktreeConfig,
background_snapshot: Arc<Mutex<Snapshot>>,
last_scan_state_rx: watch::Receiver<ScanState>,
_background_scanner_task: Option<Task<()>>,
+ _maintain_remote_id_task: Task<Option<()>>,
poll_task: Option<Task<()>>,
+ remote_id: watch::Receiver<Option<u64>>,
share: Option<ShareState>,
open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
peers: HashMap<PeerId, ReplicaId>,
languages: Arc<LanguageRegistry>,
queued_operations: Vec<(u64, Operation)>,
+ rpc: Arc<rpc::Client>,
fs: Arc<dyn Fs>,
}
+#[derive(Default, Deserialize)]
+struct WorktreeConfig {
+ collaborators: Vec<String>,
+}
+
impl LocalWorktree {
async fn new(
+ rpc: Arc<rpc::Client>,
path: impl Into<Arc<Path>>,
- languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
+ languages: Arc<LanguageRegistry>,
cx: &mut AsyncAppContext,
) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
let abs_path = path.into();
@@ -687,6 +708,13 @@ impl LocalWorktree {
let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
let metadata = fs.metadata(&abs_path).await?;
+ let mut config = WorktreeConfig::default();
+ if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
+ if let Ok(parsed) = toml::from_str(&zed_toml) {
+ config = parsed;
+ }
+ }
+
let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
@@ -694,7 +722,7 @@ impl LocalWorktree {
id: cx.model_id(),
scan_id: 0,
abs_path,
- root_name,
+ root_name: root_name.clone(),
root_char_bag,
ignores: Default::default(),
entries_by_path: Default::default(),
@@ -711,11 +739,48 @@ impl LocalWorktree {
));
}
+ let (mut remote_id_tx, remote_id_rx) = watch::channel();
+ let _maintain_remote_id_task = cx.spawn_weak({
+ let rpc = rpc.clone();
+ move |this, cx| {
+ async move {
+ let mut status = rpc.status();
+ while let Some(status) = status.recv().await {
+ if let Some(this) = this.upgrade(&cx) {
+ let remote_id = if let Status::Connected { .. } = status {
+ let collaborator_logins = this.read_with(&cx, |this, _| {
+ this.as_local().unwrap().config.collaborators.clone()
+ });
+ let response = rpc
+ .request(proto::OpenWorktree {
+ root_name: root_name.clone(),
+ collaborator_logins,
+ })
+ .await?;
+
+ Some(response.worktree_id)
+ } else {
+ None
+ };
+ if remote_id_tx.send(remote_id).await.is_err() {
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+ .log_err()
+ }
+ });
+
let tree = Self {
snapshot: snapshot.clone(),
+ config,
+ remote_id: remote_id_rx,
background_snapshot: Arc::new(Mutex::new(snapshot)),
last_scan_state_rx,
_background_scanner_task: None,
+ _maintain_remote_id_task,
share: None,
poll_task: None,
open_buffers: Default::default(),
@@ -723,6 +788,7 @@ impl LocalWorktree {
queued_operations: Default::default(),
peers: Default::default(),
languages,
+ rpc,
fs,
};
@@ -735,13 +801,10 @@ impl LocalWorktree {
let tree = this.as_local_mut().unwrap();
if !tree.is_scanning() {
if let Some(share) = tree.share.as_ref() {
- Some((tree.snapshot(), share.snapshots_tx.clone()))
- } else {
- None
+ return Some((tree.snapshot(), share.snapshots_tx.clone()));
}
- } else {
- None
}
+ None
});
if let Some((snapshot, snapshots_to_send_tx)) = to_send {
@@ -784,7 +847,6 @@ impl LocalWorktree {
}
});
- let languages = self.languages.clone();
let path = Arc::from(path);
cx.spawn(|this, mut cx| async move {
if let Some(existing_buffer) = existing_buffer {
@@ -793,8 +855,8 @@ impl LocalWorktree {
let (file, contents) = this
.update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
.await?;
- let language = languages.select_language(&path).cloned();
let buffer = cx.add_model(|cx| {
+ let language = file.select_language(cx);
Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
});
this.update(&mut cx, |this, _| {
@@ -896,6 +958,22 @@ impl LocalWorktree {
}
}
+ pub fn remote_id(&self) -> Option<u64> {
+ *self.remote_id.borrow()
+ }
+
+ pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
+ let mut remote_id = self.remote_id.clone();
+ async move {
+ while let Some(remote_id) = remote_id.recv().await {
+ if remote_id.is_some() {
+ return remote_id;
+ }
+ }
+ None
+ }
+ }
+
fn is_scanning(&self) -> bool {
if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
true
@@ -981,17 +1059,19 @@ impl LocalWorktree {
})
}
- pub fn share(
- &mut self,
- rpc: Arc<rpc::Client>,
- cx: &mut ModelContext<Worktree>,
- ) -> Task<anyhow::Result<(u64, String)>> {
+ pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
let snapshot = self.snapshot();
let share_request = self.share_request(cx);
+ let rpc = self.rpc.clone();
cx.spawn(|this, mut cx| async move {
- let share_request = share_request.await;
+ let share_request = if let Some(request) = share_request.await {
+ request
+ } else {
+ return Err(anyhow!("failed to open worktree on the server"));
+ };
+
+ let remote_id = share_request.worktree.as_ref().unwrap().id;
let share_response = rpc.request(share_request).await?;
- let remote_id = share_response.worktree_id;
log::info!("sharing worktree {:?}", share_response);
let (snapshots_to_send_tx, snapshots_to_send_rx) =
@@ -1015,39 +1095,61 @@ impl LocalWorktree {
this.update(&mut cx, |worktree, cx| {
let _subscriptions = vec![
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
- rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
+ rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
];
let worktree = worktree.as_local_mut().unwrap();
worktree.share = Some(ShareState {
- rpc,
- remote_id: share_response.worktree_id,
snapshots_tx: snapshots_to_send_tx,
_subscriptions,
});
});
- Ok((remote_id, share_response.access_token))
+ Ok(remote_id)
})
}
- fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
+ pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
+ self.share.take();
+ let rpc = self.rpc.clone();
+ let remote_id = self.remote_id();
+ cx.foreground()
+ .spawn(
+ async move {
+ if let Some(worktree_id) = remote_id {
+ rpc.send(proto::UnshareWorktree { worktree_id }).await?;
+ }
+ Ok(())
+ }
+ .log_err(),
+ )
+ .detach()
+ }
+
+ fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
+ let remote_id = self.next_remote_id();
let snapshot = self.snapshot();
let root_name = self.root_name.clone();
cx.background().spawn(async move {
- let entries = snapshot
- .entries_by_path
- .cursor::<(), ()>()
- .map(Into::into)
- .collect();
- proto::ShareWorktree {
- worktree: Some(proto::Worktree { root_name, entries }),
- }
+ remote_id.await.map(|id| {
+ let entries = snapshot
+ .entries_by_path
+ .cursor::<(), ()>()
+ .map(Into::into)
+ .collect();
+ proto::ShareWorktree {
+ worktree: Some(proto::Worktree {
+ id,
+ root_name,
+ entries,
+ }),
+ }
+ })
})
}
}
@@ -1085,8 +1187,6 @@ impl fmt::Debug for LocalWorktree {
}
struct ShareState {
- rpc: Arc<rpc::Client>,
- remote_id: u64,
snapshots_tx: Sender<Snapshot>,
_subscriptions: Vec<rpc::Subscription>,
}
@@ -1111,12 +1211,11 @@ impl RemoteWorktree {
path: &Path,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<ModelHandle<Buffer>>> {
- let handle = cx.handle();
let mut existing_buffer = None;
self.open_buffers.retain(|_buffer_id, buffer| {
if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
if let Some(file) = buffer.read(cx.as_ref()).file() {
- if file.worktree_id() == handle.id() && file.path.as_ref() == path {
+ if file.worktree_id() == cx.model_id() && file.path.as_ref() == path {
existing_buffer = Some(buffer);
}
}
@@ -1127,25 +1226,30 @@ impl RemoteWorktree {
});
let rpc = self.rpc.clone();
- let languages = self.languages.clone();
let replica_id = self.replica_id;
let remote_worktree_id = self.remote_id;
let path = path.to_string_lossy().to_string();
- cx.spawn(|this, mut cx| async move {
+ cx.spawn_weak(|this, mut cx| async move {
if let Some(existing_buffer) = existing_buffer {
Ok(existing_buffer)
} else {
let entry = this
+ .upgrade(&cx)
+ .ok_or_else(|| anyhow!("worktree was closed"))?
.read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
.ok_or_else(|| anyhow!("file does not exist"))?;
- let file = File::new(entry.id, handle, entry.path, entry.mtime);
- let language = languages.select_language(&path).cloned();
let response = rpc
.request(proto::OpenBuffer {
worktree_id: remote_worktree_id as u64,
path,
})
.await?;
+
+ let this = this
+ .upgrade(&cx)
+ .ok_or_else(|| anyhow!("worktree was closed"))?;
+ let file = File::new(entry.id, this.clone(), entry.path, entry.mtime);
+ let language = cx.read(|cx| file.select_language(cx));
let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
let buffer_id = remote_buffer.id as usize;
let buffer = cx.add_model(|cx| {
@@ -1166,6 +1270,20 @@ impl RemoteWorktree {
})
}
+ pub fn remote_id(&self) -> u64 {
+ self.remote_id
+ }
+
+ pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
+ for (_, buffer) in self.open_buffers.drain() {
+ if let RemoteBuffer::Loaded(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| buffer.close(cx))
+ }
+ }
+ }
+ }
+
fn snapshot(&self) -> Snapshot {
self.snapshot.clone()
}
@@ -1556,9 +1674,9 @@ impl File {
self.worktree.update(cx, |worktree, cx| {
if let Some((rpc, remote_id)) = match worktree {
Worktree::Local(worktree) => worktree
- .share
- .as_ref()
- .map(|share| (share.rpc.clone(), share.remote_id)),
+ .remote_id
+ .borrow()
+ .map(|id| (worktree.rpc.clone(), id)),
Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
} {
cx.spawn(|worktree, mut cx| async move {
@@ -1616,6 +1734,18 @@ impl File {
self.worktree.read(cx).abs_path.join(&self.path)
}
+ pub fn select_language(&self, cx: &AppContext) -> Option<Arc<Language>> {
+ let worktree = self.worktree.read(cx);
+ let mut full_path = PathBuf::new();
+ full_path.push(worktree.root_name());
+ full_path.push(&self.path);
+ let languages = match self.worktree.read(cx) {
+ Worktree::Local(worktree) => &worktree.languages,
+ Worktree::Remote(worktree) => &worktree.languages,
+ };
+ languages.select_language(&full_path).cloned()
+ }
+
/// Returns the last component of this handle's absolute path. If this handle refers to the root
/// of its worktree, then this method will return the name of the worktree itself.
pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
@@ -1642,14 +1772,12 @@ impl File {
) -> Task<Result<(time::Global, SystemTime)>> {
self.worktree.update(cx, |worktree, cx| match worktree {
Worktree::Local(worktree) => {
- let rpc = worktree
- .share
- .as_ref()
- .map(|share| (share.rpc.clone(), share.remote_id));
+ let rpc = worktree.rpc.clone();
+ let worktree_id = *worktree.remote_id.borrow();
let save = worktree.save(self.path.clone(), text, cx);
cx.background().spawn(async move {
let entry = save.await?;
- if let Some((rpc, worktree_id)) = rpc {
+ if let Some(worktree_id) = worktree_id {
rpc.send(proto::BufferSaved {
worktree_id,
buffer_id,
@@ -2534,6 +2662,7 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::fs::FakeFs;
use crate::test::*;
use anyhow::Result;
use fs::RealFs;
@@ -2568,9 +2697,10 @@ mod tests {
.unwrap();
let tree = Worktree::open_local(
+ rpc::Client::new(),
root_link_path,
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2624,9 +2754,10 @@ mod tests {
}
}));
let tree = Worktree::open_local(
+ rpc::Client::new(),
dir.path(),
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2668,9 +2799,10 @@ mod tests {
"file1": "the old contents",
}));
let tree = Worktree::open_local(
+ rpc::Client::new(),
dir.path(),
- Arc::new(LanguageRegistry::new()),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2697,9 +2829,10 @@ mod tests {
let file_path = dir.path().join("file1");
let tree = Worktree::open_local(
+ rpc::Client::new(),
file_path.clone(),
- Arc::new(LanguageRegistry::new()),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2738,10 +2871,14 @@ mod tests {
}
}));
+ let user_id = 5;
+ let mut client = rpc::Client::new();
+ let server = FakeServer::for_client(user_id, &mut client, &cx).await;
let tree = Worktree::open_local(
+ client,
dir.path(),
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2775,15 +2912,20 @@ mod tests {
// Create a remote copy of this worktree.
let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
let worktree_id = 1;
- let share_request = tree
- .update(&mut cx, |tree, cx| {
- tree.as_local().unwrap().share_request(cx)
- })
+ let share_request = tree.update(&mut cx, |tree, cx| {
+ tree.as_local().unwrap().share_request(cx)
+ });
+ let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
+ server
+ .respond(
+ open_worktree.receipt(),
+ proto::OpenWorktreeResponse { worktree_id: 1 },
+ )
.await;
+
let remote = Worktree::remote(
- proto::OpenWorktreeResponse {
- worktree_id,
- worktree: share_request.worktree,
+ proto::JoinWorktreeResponse {
+ worktree: share_request.await.unwrap().worktree,
replica_id: 1,
peers: Vec::new(),
},
@@ -2893,9 +3035,10 @@ mod tests {
}));
let tree = Worktree::open_local(
+ rpc::Client::new(),
dir.path(),
- Default::default(),
Arc::new(RealFs),
+ Default::default(),
&mut cx.to_async(),
)
.await
@@ -2925,6 +3068,65 @@ mod tests {
});
}
+ #[gpui::test]
+ async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
+ let user_id = 100;
+ let mut client = rpc::Client::new();
+ let server = FakeServer::for_client(user_id, &mut client, &cx).await;
+
+ let fs = Arc::new(FakeFs::new());
+ fs.insert_tree(
+ "/path",
+ json!({
+ "to": {
+ "the-dir": {
+ ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
+ "a.txt": "a-contents",
+ },
+ },
+ }),
+ )
+ .await;
+
+ let worktree = Worktree::open_local(
+ client.clone(),
+ "/path/to/the-dir".as_ref(),
+ fs,
+ Default::default(),
+ &mut cx.to_async(),
+ )
+ .await
+ .unwrap();
+
+ {
+ let cx = cx.to_async();
+ client.authenticate_and_connect(&cx).await.unwrap();
+ }
+
+ let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
+ assert_eq!(
+ open_worktree.payload,
+ proto::OpenWorktree {
+ root_name: "the-dir".to_string(),
+ collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
+ }
+ );
+
+ server
+ .respond(
+ open_worktree.receipt(),
+ proto::OpenWorktreeResponse { worktree_id: 5 },
+ )
+ .await;
+ let remote_id = worktree
+ .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
+ .await;
+ assert_eq!(remote_id, Some(5));
+
+ cx.update(move |_| drop(worktree));
+ server.receive::<proto::CloseWorktree>().await.unwrap();
+ }
+
#[gpui::test(iterations = 100)]
fn test_random(mut rng: StdRng) {
let operations = env::var("OPERATIONS")
@@ -11,8 +11,8 @@ message Envelope {
Ping ping = 6;
ShareWorktree share_worktree = 7;
ShareWorktreeResponse share_worktree_response = 8;
- OpenWorktree open_worktree = 9;
- OpenWorktreeResponse open_worktree_response = 10;
+ JoinWorktree join_worktree = 9;
+ JoinWorktreeResponse join_worktree_response = 10;
UpdateWorktree update_worktree = 11;
CloseWorktree close_worktree = 12;
OpenBuffer open_buffer = 13;
@@ -35,6 +35,11 @@ message Envelope {
ChannelMessageSent channel_message_sent = 30;
GetChannelMessages get_channel_messages = 31;
GetChannelMessagesResponse get_channel_messages_response = 32;
+ OpenWorktree open_worktree = 33;
+ OpenWorktreeResponse open_worktree_response = 34;
+ UnshareWorktree unshare_worktree = 35;
+ UpdateCollaborators update_collaborators = 36;
+ LeaveWorktree leave_worktree = 37;
}
}
@@ -48,22 +53,34 @@ message Error {
string message = 1;
}
+message OpenWorktree {
+ string root_name = 1;
+ repeated string collaborator_logins = 2;
+}
+
+message OpenWorktreeResponse {
+ uint64 worktree_id = 1;
+}
+
message ShareWorktree {
Worktree worktree = 1;
}
-message ShareWorktreeResponse {
+message ShareWorktreeResponse {}
+
+message UnshareWorktree {
uint64 worktree_id = 1;
- string access_token = 2;
}
-message OpenWorktree {
+message JoinWorktree {
uint64 worktree_id = 1;
- string access_token = 2;
}
-message OpenWorktreeResponse {
+message LeaveWorktree {
uint64 worktree_id = 1;
+}
+
+message JoinWorktreeResponse {
Worktree worktree = 2;
uint32 replica_id = 3;
repeated Peer peers = 4;
@@ -173,6 +190,10 @@ message GetChannelMessagesResponse {
bool done = 2;
}
+message UpdateCollaborators {
+ repeated Collaborator collaborators = 1;
+}
+
// Entities
message Peer {
@@ -187,8 +208,9 @@ message User {
}
message Worktree {
- string root_name = 1;
- repeated Entry entries = 2;
+ uint64 id = 1;
+ string root_name = 2;
+ repeated Entry entries = 3;
}
message Entry {
@@ -314,3 +336,15 @@ message ChannelMessage {
uint64 sender_id = 4;
Nonce nonce = 5;
}
+
+message Collaborator {
+ uint64 user_id = 1;
+ repeated WorktreeMetadata worktrees = 2;
+}
+
+message WorktreeMetadata {
+ uint64 id = 1;
+ string root_name = 2;
+ bool is_shared = 3;
+ repeated uint64 guests = 4;
+}
@@ -131,11 +131,15 @@ messages!(
GetChannelMessagesResponse,
GetChannels,
GetChannelsResponse,
+ UpdateCollaborators,
GetUsers,
GetUsersResponse,
JoinChannel,
JoinChannelResponse,
+ JoinWorktree,
+ JoinWorktreeResponse,
LeaveChannel,
+ LeaveWorktree,
OpenBuffer,
OpenBufferResponse,
OpenWorktree,
@@ -147,6 +151,7 @@ messages!(
SendChannelMessageResponse,
ShareWorktree,
ShareWorktreeResponse,
+ UnshareWorktree,
UpdateBuffer,
UpdateWorktree,
);
@@ -156,11 +161,13 @@ request_messages!(
(GetUsers, GetUsersResponse),
(JoinChannel, JoinChannelResponse),
(OpenBuffer, OpenBufferResponse),
+ (JoinWorktree, JoinWorktreeResponse),
(OpenWorktree, OpenWorktreeResponse),
(Ping, Ack),
(SaveBuffer, BufferSaved),
(UpdateBuffer, Ack),
(ShareWorktree, ShareWorktreeResponse),
+ (UnshareWorktree, Ack),
(SendChannelMessage, SendChannelMessageResponse),
(GetChannelMessages, GetChannelMessagesResponse),
);
@@ -172,9 +179,10 @@ entity_messages!(
CloseBuffer,
CloseWorktree,
OpenBuffer,
- OpenWorktree,
+ JoinWorktree,
RemovePeer,
SaveBuffer,
+ UnshareWorktree,
UpdateBuffer,
UpdateWorktree,
);