1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use futures::{future::BoxFuture, FutureExt};
12use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
13use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{
20 any::TypeId,
21 collections::{HashMap, HashSet},
22 future::Future,
23 mem,
24 sync::Arc,
25 time::Instant,
26};
27use store::{Store, Worktree};
28use surf::StatusCode;
29use tide::log;
30use tide::{
31 http::headers::{HeaderName, CONNECTION, UPGRADE},
32 Request, Response,
33};
34use time::OffsetDateTime;
35
36type MessageHandler = Box<
37 dyn Send
38 + Sync
39 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
40>;
41
42pub struct Server {
43 peer: Arc<Peer>,
44 store: RwLock<Store>,
45 app_state: Arc<AppState>,
46 handlers: HashMap<TypeId, MessageHandler>,
47 notifications: Option<mpsc::Sender<()>>,
48}
49
/// Number of channel messages requested from the database per page; a page
/// shorter than this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a trimmed channel message body, compared against
/// `String::len` (i.e. measured in bytes).
const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_handler(Server::ping)
69 .add_handler(Server::open_worktree)
70 .add_handler(Server::close_worktree)
71 .add_handler(Server::share_worktree)
72 .add_handler(Server::unshare_worktree)
73 .add_handler(Server::join_worktree)
74 .add_handler(Server::leave_worktree)
75 .add_handler(Server::update_worktree)
76 .add_handler(Server::open_buffer)
77 .add_handler(Server::close_buffer)
78 .add_handler(Server::update_buffer)
79 .add_handler(Server::buffer_saved)
80 .add_handler(Server::save_buffer)
81 .add_handler(Server::get_channels)
82 .add_handler(Server::get_users)
83 .add_handler(Server::join_channel)
84 .add_handler(Server::leave_channel)
85 .add_handler(Server::send_channel_message)
86 .add_handler(Server::get_channel_messages);
87
88 Arc::new(server)
89 }
90
91 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
92 where
93 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
94 Fut: 'static + Send + Future<Output = tide::Result<()>>,
95 M: EnvelopedMessage,
96 {
97 let prev_handler = self.handlers.insert(
98 TypeId::of::<M>(),
99 Box::new(move |server, envelope| {
100 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
101 (handler)(server, *envelope).boxed()
102 }),
103 );
104 if prev_handler.is_some() {
105 panic!("registered a handler for the same message twice");
106 }
107 self
108 }
109
110 pub fn handle_connection(
111 self: &Arc<Self>,
112 connection: Connection,
113 addr: String,
114 user_id: UserId,
115 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
116 ) -> impl Future<Output = ()> {
117 let mut this = self.clone();
118 async move {
119 let (connection_id, handle_io, mut incoming_rx) =
120 this.peer.add_connection(connection).await;
121
122 if let Some(send_connection_id) = send_connection_id.as_mut() {
123 let _ = send_connection_id.send(connection_id).await;
124 }
125
126 this.state_mut().add_connection(connection_id, user_id);
127 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
128 log::error!("error updating contacts for {:?}: {}", user_id, err);
129 }
130
131 let handle_io = handle_io.fuse();
132 futures::pin_mut!(handle_io);
133 loop {
134 let next_message = incoming_rx.recv().fuse();
135 futures::pin_mut!(next_message);
136 futures::select_biased! {
137 message = next_message => {
138 if let Some(message) = message {
139 let start_time = Instant::now();
140 log::info!("RPC message received: {}", message.payload_type_name());
141 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
142 if let Err(err) = (handler)(this.clone(), message).await {
143 log::error!("error handling message: {:?}", err);
144 } else {
145 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
146 }
147
148 if let Some(mut notifications) = this.notifications.clone() {
149 let _ = notifications.send(()).await;
150 }
151 } else {
152 log::warn!("unhandled message: {}", message.payload_type_name());
153 }
154 } else {
155 log::info!("rpc connection closed {:?}", addr);
156 break;
157 }
158 }
159 handle_io = handle_io => {
160 if let Err(err) = handle_io {
161 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
162 }
163 break;
164 }
165 }
166 }
167
168 if let Err(err) = this.sign_out(connection_id).await {
169 log::error!("error signing out connection {:?} - {:?}", addr, err);
170 }
171 }
172 }
173
174 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
175 self.peer.disconnect(connection_id).await;
176 let removed_connection = self.state_mut().remove_connection(connection_id)?;
177
178 for (worktree_id, worktree) in removed_connection.hosted_worktrees {
179 if let Some(share) = worktree.share {
180 broadcast(
181 connection_id,
182 share.guests.keys().copied().collect(),
183 |conn_id| {
184 self.peer
185 .send(conn_id, proto::UnshareWorktree { worktree_id })
186 },
187 )
188 .await?;
189 }
190 }
191
192 for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids {
193 broadcast(connection_id, peer_ids, |conn_id| {
194 self.peer.send(
195 conn_id,
196 proto::RemoveCollaborator {
197 worktree_id,
198 peer_id: connection_id.0,
199 },
200 )
201 })
202 .await?;
203 }
204
205 self.update_contacts_for_users(removed_connection.contact_ids.iter())
206 .await?;
207
208 Ok(())
209 }
210
211 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
212 self.peer.respond(request.receipt(), proto::Ack {}).await?;
213 Ok(())
214 }
215
216 async fn open_worktree(
217 mut self: Arc<Server>,
218 request: TypedEnvelope<proto::OpenWorktree>,
219 ) -> tide::Result<()> {
220 let receipt = request.receipt();
221 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
222
223 let mut contact_user_ids = HashSet::new();
224 contact_user_ids.insert(host_user_id);
225 for github_login in request.payload.authorized_logins {
226 match self.app_state.db.create_user(&github_login, false).await {
227 Ok(contact_user_id) => {
228 contact_user_ids.insert(contact_user_id);
229 }
230 Err(err) => {
231 let message = err.to_string();
232 self.peer
233 .respond_with_error(receipt, proto::Error { message })
234 .await?;
235 return Ok(());
236 }
237 }
238 }
239
240 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
241 let worktree_id = self.state_mut().add_worktree(Worktree {
242 host_connection_id: request.sender_id,
243 host_user_id,
244 authorized_user_ids: contact_user_ids.clone(),
245 root_name: request.payload.root_name,
246 share: None,
247 });
248
249 self.peer
250 .respond(receipt, proto::OpenWorktreeResponse { worktree_id })
251 .await?;
252 self.update_contacts_for_users(&contact_user_ids).await?;
253
254 Ok(())
255 }
256
257 async fn close_worktree(
258 mut self: Arc<Server>,
259 request: TypedEnvelope<proto::CloseWorktree>,
260 ) -> tide::Result<()> {
261 let worktree_id = request.payload.worktree_id;
262 let worktree = self
263 .state_mut()
264 .remove_worktree(worktree_id, request.sender_id)?;
265
266 if let Some(share) = worktree.share {
267 broadcast(
268 request.sender_id,
269 share.guests.keys().copied().collect(),
270 |conn_id| {
271 self.peer
272 .send(conn_id, proto::UnshareWorktree { worktree_id })
273 },
274 )
275 .await?;
276 }
277 self.update_contacts_for_users(&worktree.authorized_user_ids)
278 .await?;
279 Ok(())
280 }
281
282 async fn share_worktree(
283 mut self: Arc<Server>,
284 mut request: TypedEnvelope<proto::ShareWorktree>,
285 ) -> tide::Result<()> {
286 let worktree = request
287 .payload
288 .worktree
289 .as_mut()
290 .ok_or_else(|| anyhow!("missing worktree"))?;
291 let entries = mem::take(&mut worktree.entries)
292 .into_iter()
293 .map(|entry| (entry.id, entry))
294 .collect();
295
296 let contact_user_ids =
297 self.state_mut()
298 .share_worktree(worktree.id, request.sender_id, entries);
299 if let Some(contact_user_ids) = contact_user_ids {
300 self.peer
301 .respond(request.receipt(), proto::ShareWorktreeResponse {})
302 .await?;
303 self.update_contacts_for_users(&contact_user_ids).await?;
304 } else {
305 self.peer
306 .respond_with_error(
307 request.receipt(),
308 proto::Error {
309 message: "no such worktree".to_string(),
310 },
311 )
312 .await?;
313 }
314 Ok(())
315 }
316
317 async fn unshare_worktree(
318 mut self: Arc<Server>,
319 request: TypedEnvelope<proto::UnshareWorktree>,
320 ) -> tide::Result<()> {
321 let worktree_id = request.payload.worktree_id;
322 let worktree = self
323 .state_mut()
324 .unshare_worktree(worktree_id, request.sender_id)?;
325
326 broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
327 self.peer
328 .send(conn_id, proto::UnshareWorktree { worktree_id })
329 })
330 .await?;
331 self.update_contacts_for_users(&worktree.authorized_user_ids)
332 .await?;
333
334 Ok(())
335 }
336
337 async fn join_worktree(
338 mut self: Arc<Server>,
339 request: TypedEnvelope<proto::JoinWorktree>,
340 ) -> tide::Result<()> {
341 let worktree_id = request.payload.worktree_id;
342
343 let user_id = self.state().user_id_for_connection(request.sender_id)?;
344 let response_data = self
345 .state_mut()
346 .join_worktree(request.sender_id, user_id, worktree_id)
347 .and_then(|joined| {
348 let share = joined.worktree.share()?;
349 let peer_count = share.guests.len();
350 let mut collaborators = Vec::with_capacity(peer_count);
351 collaborators.push(proto::Collaborator {
352 peer_id: joined.worktree.host_connection_id.0,
353 replica_id: 0,
354 user_id: joined.worktree.host_user_id.to_proto(),
355 });
356 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
357 if *peer_conn_id != request.sender_id {
358 collaborators.push(proto::Collaborator {
359 peer_id: peer_conn_id.0,
360 replica_id: *peer_replica_id as u32,
361 user_id: peer_user_id.to_proto(),
362 });
363 }
364 }
365 let response = proto::JoinWorktreeResponse {
366 worktree: Some(proto::Worktree {
367 id: worktree_id,
368 root_name: joined.worktree.root_name.clone(),
369 entries: share.entries.values().cloned().collect(),
370 }),
371 replica_id: joined.replica_id as u32,
372 collaborators,
373 };
374 let connection_ids = joined.worktree.connection_ids();
375 let contact_user_ids = joined.worktree.authorized_user_ids.clone();
376 Ok((response, connection_ids, contact_user_ids))
377 });
378
379 match response_data {
380 Ok((response, connection_ids, contact_user_ids)) => {
381 broadcast(request.sender_id, connection_ids, |conn_id| {
382 self.peer.send(
383 conn_id,
384 proto::AddCollaborator {
385 worktree_id,
386 collaborator: Some(proto::Collaborator {
387 peer_id: request.sender_id.0,
388 replica_id: response.replica_id,
389 user_id: user_id.to_proto(),
390 }),
391 },
392 )
393 })
394 .await?;
395 self.peer.respond(request.receipt(), response).await?;
396 self.update_contacts_for_users(&contact_user_ids).await?;
397 }
398 Err(error) => {
399 self.peer
400 .respond_with_error(
401 request.receipt(),
402 proto::Error {
403 message: error.to_string(),
404 },
405 )
406 .await?;
407 }
408 }
409
410 Ok(())
411 }
412
413 async fn leave_worktree(
414 mut self: Arc<Server>,
415 request: TypedEnvelope<proto::LeaveWorktree>,
416 ) -> tide::Result<()> {
417 let sender_id = request.sender_id;
418 let worktree_id = request.payload.worktree_id;
419 let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
420 if let Some(worktree) = worktree {
421 broadcast(sender_id, worktree.connection_ids, |conn_id| {
422 self.peer.send(
423 conn_id,
424 proto::RemoveCollaborator {
425 worktree_id,
426 peer_id: sender_id.0,
427 },
428 )
429 })
430 .await?;
431 self.update_contacts_for_users(&worktree.authorized_user_ids)
432 .await?;
433 }
434 Ok(())
435 }
436
437 async fn update_worktree(
438 mut self: Arc<Server>,
439 request: TypedEnvelope<proto::UpdateWorktree>,
440 ) -> tide::Result<()> {
441 let connection_ids = self.state_mut().update_worktree(
442 request.sender_id,
443 request.payload.worktree_id,
444 &request.payload.removed_entries,
445 &request.payload.updated_entries,
446 )?;
447
448 broadcast(request.sender_id, connection_ids, |connection_id| {
449 self.peer
450 .forward_send(request.sender_id, connection_id, request.payload.clone())
451 })
452 .await?;
453
454 Ok(())
455 }
456
457 async fn open_buffer(
458 self: Arc<Server>,
459 request: TypedEnvelope<proto::OpenBuffer>,
460 ) -> tide::Result<()> {
461 let receipt = request.receipt();
462 let host_connection_id = self
463 .state()
464 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
465 let response = self
466 .peer
467 .forward_request(request.sender_id, host_connection_id, request.payload)
468 .await?;
469 self.peer.respond(receipt, response).await?;
470 Ok(())
471 }
472
473 async fn close_buffer(
474 self: Arc<Server>,
475 request: TypedEnvelope<proto::CloseBuffer>,
476 ) -> tide::Result<()> {
477 let host_connection_id = self
478 .state()
479 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
480 self.peer
481 .forward_send(request.sender_id, host_connection_id, request.payload)
482 .await?;
483 Ok(())
484 }
485
486 async fn save_buffer(
487 self: Arc<Server>,
488 request: TypedEnvelope<proto::SaveBuffer>,
489 ) -> tide::Result<()> {
490 let host;
491 let guests;
492 {
493 let state = self.state();
494 host = state
495 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
496 guests = state
497 .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
498 }
499
500 let sender = request.sender_id;
501 let receipt = request.receipt();
502 let response = self
503 .peer
504 .forward_request(sender, host, request.payload.clone())
505 .await?;
506
507 broadcast(host, guests, |conn_id| {
508 let response = response.clone();
509 let peer = &self.peer;
510 async move {
511 if conn_id == sender {
512 peer.respond(receipt, response).await
513 } else {
514 peer.forward_send(host, conn_id, response).await
515 }
516 }
517 })
518 .await?;
519
520 Ok(())
521 }
522
523 async fn update_buffer(
524 self: Arc<Server>,
525 request: TypedEnvelope<proto::UpdateBuffer>,
526 ) -> tide::Result<()> {
527 let receiver_ids = self
528 .state()
529 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
530 broadcast(request.sender_id, receiver_ids, |connection_id| {
531 self.peer
532 .forward_send(request.sender_id, connection_id, request.payload.clone())
533 })
534 .await?;
535 self.peer.respond(request.receipt(), proto::Ack {}).await?;
536 Ok(())
537 }
538
539 async fn buffer_saved(
540 self: Arc<Server>,
541 request: TypedEnvelope<proto::BufferSaved>,
542 ) -> tide::Result<()> {
543 let receiver_ids = self
544 .state()
545 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
546 broadcast(request.sender_id, receiver_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })
550 .await?;
551 Ok(())
552 }
553
554 async fn get_channels(
555 self: Arc<Server>,
556 request: TypedEnvelope<proto::GetChannels>,
557 ) -> tide::Result<()> {
558 let user_id = self.state().user_id_for_connection(request.sender_id)?;
559 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
560 self.peer
561 .respond(
562 request.receipt(),
563 proto::GetChannelsResponse {
564 channels: channels
565 .into_iter()
566 .map(|chan| proto::Channel {
567 id: chan.id.to_proto(),
568 name: chan.name,
569 })
570 .collect(),
571 },
572 )
573 .await?;
574 Ok(())
575 }
576
577 async fn get_users(
578 self: Arc<Server>,
579 request: TypedEnvelope<proto::GetUsers>,
580 ) -> tide::Result<()> {
581 let receipt = request.receipt();
582 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
583 let users = self
584 .app_state
585 .db
586 .get_users_by_ids(user_ids)
587 .await?
588 .into_iter()
589 .map(|user| proto::User {
590 id: user.id.to_proto(),
591 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
592 github_login: user.github_login,
593 })
594 .collect();
595 self.peer
596 .respond(receipt, proto::GetUsersResponse { users })
597 .await?;
598 Ok(())
599 }
600
601 async fn update_contacts_for_users<'a>(
602 self: &Arc<Server>,
603 user_ids: impl IntoIterator<Item = &'a UserId>,
604 ) -> tide::Result<()> {
605 let mut send_futures = Vec::new();
606
607 {
608 let state = self.state();
609 for user_id in user_ids {
610 let contacts = state.contacts_for_user(*user_id);
611 for connection_id in state.connection_ids_for_user(*user_id) {
612 send_futures.push(self.peer.send(
613 connection_id,
614 proto::UpdateContacts {
615 contacts: contacts.clone(),
616 },
617 ));
618 }
619 }
620 }
621 futures::future::try_join_all(send_futures).await?;
622
623 Ok(())
624 }
625
626 async fn join_channel(
627 mut self: Arc<Self>,
628 request: TypedEnvelope<proto::JoinChannel>,
629 ) -> tide::Result<()> {
630 let user_id = self.state().user_id_for_connection(request.sender_id)?;
631 let channel_id = ChannelId::from_proto(request.payload.channel_id);
632 if !self
633 .app_state
634 .db
635 .can_user_access_channel(user_id, channel_id)
636 .await?
637 {
638 Err(anyhow!("access denied"))?;
639 }
640
641 self.state_mut().join_channel(request.sender_id, channel_id);
642 let messages = self
643 .app_state
644 .db
645 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
646 .await?
647 .into_iter()
648 .map(|msg| proto::ChannelMessage {
649 id: msg.id.to_proto(),
650 body: msg.body,
651 timestamp: msg.sent_at.unix_timestamp() as u64,
652 sender_id: msg.sender_id.to_proto(),
653 nonce: Some(msg.nonce.as_u128().into()),
654 })
655 .collect::<Vec<_>>();
656 self.peer
657 .respond(
658 request.receipt(),
659 proto::JoinChannelResponse {
660 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
661 messages,
662 },
663 )
664 .await?;
665 Ok(())
666 }
667
668 async fn leave_channel(
669 mut self: Arc<Self>,
670 request: TypedEnvelope<proto::LeaveChannel>,
671 ) -> tide::Result<()> {
672 let user_id = self.state().user_id_for_connection(request.sender_id)?;
673 let channel_id = ChannelId::from_proto(request.payload.channel_id);
674 if !self
675 .app_state
676 .db
677 .can_user_access_channel(user_id, channel_id)
678 .await?
679 {
680 Err(anyhow!("access denied"))?;
681 }
682
683 self.state_mut()
684 .leave_channel(request.sender_id, channel_id);
685
686 Ok(())
687 }
688
689 async fn send_channel_message(
690 self: Arc<Self>,
691 request: TypedEnvelope<proto::SendChannelMessage>,
692 ) -> tide::Result<()> {
693 let receipt = request.receipt();
694 let channel_id = ChannelId::from_proto(request.payload.channel_id);
695 let user_id;
696 let connection_ids;
697 {
698 let state = self.state();
699 user_id = state.user_id_for_connection(request.sender_id)?;
700 if let Some(ids) = state.channel_connection_ids(channel_id) {
701 connection_ids = ids;
702 } else {
703 return Ok(());
704 }
705 }
706
707 // Validate the message body.
708 let body = request.payload.body.trim().to_string();
709 if body.len() > MAX_MESSAGE_LEN {
710 self.peer
711 .respond_with_error(
712 receipt,
713 proto::Error {
714 message: "message is too long".to_string(),
715 },
716 )
717 .await?;
718 return Ok(());
719 }
720 if body.is_empty() {
721 self.peer
722 .respond_with_error(
723 receipt,
724 proto::Error {
725 message: "message can't be blank".to_string(),
726 },
727 )
728 .await?;
729 return Ok(());
730 }
731
732 let timestamp = OffsetDateTime::now_utc();
733 let nonce = if let Some(nonce) = request.payload.nonce {
734 nonce
735 } else {
736 self.peer
737 .respond_with_error(
738 receipt,
739 proto::Error {
740 message: "nonce can't be blank".to_string(),
741 },
742 )
743 .await?;
744 return Ok(());
745 };
746
747 let message_id = self
748 .app_state
749 .db
750 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
751 .await?
752 .to_proto();
753 let message = proto::ChannelMessage {
754 sender_id: user_id.to_proto(),
755 id: message_id,
756 body,
757 timestamp: timestamp.unix_timestamp() as u64,
758 nonce: Some(nonce),
759 };
760 broadcast(request.sender_id, connection_ids, |conn_id| {
761 self.peer.send(
762 conn_id,
763 proto::ChannelMessageSent {
764 channel_id: channel_id.to_proto(),
765 message: Some(message.clone()),
766 },
767 )
768 })
769 .await?;
770 self.peer
771 .respond(
772 receipt,
773 proto::SendChannelMessageResponse {
774 message: Some(message),
775 },
776 )
777 .await?;
778 Ok(())
779 }
780
781 async fn get_channel_messages(
782 self: Arc<Self>,
783 request: TypedEnvelope<proto::GetChannelMessages>,
784 ) -> tide::Result<()> {
785 let user_id = self.state().user_id_for_connection(request.sender_id)?;
786 let channel_id = ChannelId::from_proto(request.payload.channel_id);
787 if !self
788 .app_state
789 .db
790 .can_user_access_channel(user_id, channel_id)
791 .await?
792 {
793 Err(anyhow!("access denied"))?;
794 }
795
796 let messages = self
797 .app_state
798 .db
799 .get_channel_messages(
800 channel_id,
801 MESSAGE_COUNT_PER_PAGE,
802 Some(MessageId::from_proto(request.payload.before_message_id)),
803 )
804 .await?
805 .into_iter()
806 .map(|msg| proto::ChannelMessage {
807 id: msg.id.to_proto(),
808 body: msg.body,
809 timestamp: msg.sent_at.unix_timestamp() as u64,
810 sender_id: msg.sender_id.to_proto(),
811 nonce: Some(msg.nonce.as_u128().into()),
812 })
813 .collect::<Vec<_>>();
814 self.peer
815 .respond(
816 request.receipt(),
817 proto::GetChannelMessagesResponse {
818 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
819 messages,
820 },
821 )
822 .await?;
823 Ok(())
824 }
825
826 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
827 self.store.read()
828 }
829
830 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
831 self.store.write()
832 }
833}
834
835pub async fn broadcast<F, T>(
836 sender_id: ConnectionId,
837 receiver_ids: Vec<ConnectionId>,
838 mut f: F,
839) -> anyhow::Result<()>
840where
841 F: FnMut(ConnectionId) -> T,
842 T: Future<Output = anyhow::Result<()>>,
843{
844 let futures = receiver_ids
845 .into_iter()
846 .filter(|id| *id != sender_id)
847 .map(|id| f(id));
848 futures::future::try_join_all(futures).await?;
849 Ok(())
850}
851
852pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
853 let server = Server::new(app.state().clone(), rpc.clone(), None);
854 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
855 let server = server.clone();
856 async move {
857 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
858
859 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
860 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
861 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
862 let client_protocol_version: Option<u32> = request
863 .header("X-Zed-Protocol-Version")
864 .and_then(|v| v.as_str().parse().ok());
865
866 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
867 return Ok(Response::new(StatusCode::UpgradeRequired));
868 }
869
870 let header = match request.header("Sec-Websocket-Key") {
871 Some(h) => h.as_str(),
872 None => return Err(anyhow!("expected sec-websocket-key"))?,
873 };
874
875 let user_id = process_auth_header(&request).await?;
876
877 let mut response = Response::new(StatusCode::SwitchingProtocols);
878 response.insert_header(UPGRADE, "websocket");
879 response.insert_header(CONNECTION, "Upgrade");
880 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
881 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
882 response.insert_header("Sec-Websocket-Version", "13");
883
884 let http_res: &mut tide::http::Response = response.as_mut();
885 let upgrade_receiver = http_res.recv_upgrade().await;
886 let addr = request.remote().unwrap_or("unknown").to_string();
887 task::spawn(async move {
888 if let Some(stream) = upgrade_receiver.await {
889 server
890 .handle_connection(
891 Connection::new(
892 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
893 ),
894 addr,
895 user_id,
896 None,
897 )
898 .await;
899 }
900 });
901
902 Ok(response)
903 }
904 });
905}
906
907fn header_contains_ignore_case<T>(
908 request: &tide::Request<T>,
909 header_name: HeaderName,
910 value: &str,
911) -> bool {
912 request
913 .header(header_name)
914 .map(|h| {
915 h.as_str()
916 .split(',')
917 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
918 })
919 .unwrap_or(false)
920}
921
922#[cfg(test)]
923mod tests {
924 use super::*;
925 use crate::{
926 auth,
927 db::{tests::TestDb, UserId},
928 github, AppState, Config,
929 };
930 use ::rpc::Peer;
931 use async_std::task;
932 use gpui::{ModelHandle, TestAppContext};
933 use parking_lot::Mutex;
934 use postage::{mpsc, watch};
935 use rpc::PeerId;
936 use serde_json::json;
937 use sqlx::types::time::OffsetDateTime;
938 use std::{
939 ops::Deref,
940 path::Path,
941 sync::{
942 atomic::{AtomicBool, Ordering::SeqCst},
943 Arc,
944 },
945 time::Duration,
946 };
947 use zed::{
948 client::{
949 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
950 EstablishConnectionError, UserStore,
951 },
952 contacts_panel::JoinWorktree,
953 editor::{Editor, EditorSettings, Input},
954 fs::{FakeFs, Fs as _},
955 language::{
956 tree_sitter_rust, Diagnostic, Language, LanguageConfig, LanguageRegistry,
957 LanguageServerConfig, Point,
958 },
959 lsp,
960 project::{ProjectPath, Worktree},
961 test::test_app_state,
962 workspace::Workspace,
963 };
964
965 #[gpui::test]
966 async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
967 let (window_b, _) = cx_b.add_window(|_| EmptyView);
968 let lang_registry = Arc::new(LanguageRegistry::new());
969
970 // Connect to a server as 2 clients.
971 let mut server = TestServer::start().await;
972 let client_a = server.create_client(&mut cx_a, "user_a").await;
973 let client_b = server.create_client(&mut cx_b, "user_b").await;
974
975 cx_a.foreground().forbid_parking();
976
977 // Share a local worktree as client A
978 let fs = Arc::new(FakeFs::new());
979 fs.insert_tree(
980 "/a",
981 json!({
982 ".zed.toml": r#"collaborators = ["user_b"]"#,
983 "a.txt": "a-contents",
984 "b.txt": "b-contents",
985 }),
986 )
987 .await;
988 let worktree_a = Worktree::open_local(
989 client_a.clone(),
990 client_a.user_store.clone(),
991 "/a".as_ref(),
992 fs,
993 lang_registry.clone(),
994 &mut cx_a.to_async(),
995 )
996 .await
997 .unwrap();
998 worktree_a
999 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1000 .await;
1001 let worktree_id = worktree_a
1002 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1003 .await
1004 .unwrap();
1005
1006 // Join that worktree as client B, and see that a guest has joined as client A.
1007 let worktree_b = Worktree::open_remote(
1008 client_b.clone(),
1009 worktree_id,
1010 lang_registry.clone(),
1011 client_b.user_store.clone(),
1012 &mut cx_b.to_async(),
1013 )
1014 .await
1015 .unwrap();
1016
1017 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
1018 assert_eq!(
1019 tree.collaborators()
1020 .get(&client_a.peer_id)
1021 .unwrap()
1022 .user
1023 .github_login,
1024 "user_a"
1025 );
1026 tree.replica_id()
1027 });
1028 worktree_a
1029 .condition(&cx_a, |tree, _| {
1030 tree.collaborators()
1031 .get(&client_b.peer_id)
1032 .map_or(false, |collaborator| {
1033 collaborator.replica_id == replica_id_b
1034 && collaborator.user.github_login == "user_b"
1035 })
1036 })
1037 .await;
1038
1039 // Open the same file as client B and client A.
1040 let buffer_b = worktree_b
1041 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1042 .await
1043 .unwrap();
1044 buffer_b.read_with(&cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1045 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1046 let buffer_a = worktree_a
1047 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1048 .await
1049 .unwrap();
1050
1051 // Create a selection set as client B and see that selection set as client A.
1052 let editor_b = cx_b.add_view(window_b, |cx| {
1053 Editor::for_buffer(buffer_b, |cx| EditorSettings::test(cx), cx)
1054 });
1055 buffer_a
1056 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1057 .await;
1058
1059 // Edit the buffer as client B and see that edit as client A.
1060 editor_b.update(&mut cx_b, |editor, cx| {
1061 editor.handle_input(&Input("ok, ".into()), cx)
1062 });
1063 buffer_a
1064 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1065 .await;
1066
1067 // Remove the selection set as client B, see those selections disappear as client A.
1068 cx_b.update(move |_| drop(editor_b));
1069 buffer_a
1070 .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1071 .await;
1072
1073 // Close the buffer as client A, see that the buffer is closed.
1074 cx_a.update(move |_| drop(buffer_a));
1075 worktree_a
1076 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1077 .await;
1078
1079 // Dropping the worktree removes client B from client A's collaborators.
1080 cx_b.update(move |_| drop(worktree_b));
1081 worktree_a
1082 .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
1083 .await;
1084 }
1085
1086 #[gpui::test]
1087 async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1088 cx_b.update(zed::contacts_panel::init);
1089 let mut app_state_a = cx_a.update(test_app_state);
1090 let mut app_state_b = cx_b.update(test_app_state);
1091
1092 // Connect to a server as 2 clients.
1093 let mut server = TestServer::start().await;
1094 let client_a = server.create_client(&mut cx_a, "user_a").await;
1095 let client_b = server.create_client(&mut cx_b, "user_b").await;
1096 Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
1097 Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
1098 Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
1099 Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
1100
1101 cx_a.foreground().forbid_parking();
1102
1103 // Share a local worktree as client A
1104 let fs = Arc::new(FakeFs::new());
1105 fs.insert_tree(
1106 "/a",
1107 json!({
1108 ".zed.toml": r#"collaborators = ["user_b"]"#,
1109 "a.txt": "a-contents",
1110 "b.txt": "b-contents",
1111 }),
1112 )
1113 .await;
1114 let worktree_a = Worktree::open_local(
1115 app_state_a.client.clone(),
1116 app_state_a.user_store.clone(),
1117 "/a".as_ref(),
1118 fs,
1119 app_state_a.languages.clone(),
1120 &mut cx_a.to_async(),
1121 )
1122 .await
1123 .unwrap();
1124 worktree_a
1125 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1126 .await;
1127
1128 let remote_worktree_id = worktree_a
1129 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1130 .await
1131 .unwrap();
1132
1133 let (window_b, workspace_b) =
1134 cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1135 cx_b.update(|cx| {
1136 cx.dispatch_action(
1137 window_b,
1138 vec![workspace_b.id()],
1139 &JoinWorktree(remote_worktree_id),
1140 );
1141 });
1142 workspace_b
1143 .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1144 .await;
1145
1146 let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1147 let active_pane = workspace.active_pane().read(cx);
1148 assert!(active_pane.active_item().is_none());
1149 workspace.worktrees(cx).first().unwrap().id()
1150 });
1151 workspace_b
1152 .update(&mut cx_b, |workspace, cx| {
1153 workspace.open_entry(
1154 ProjectPath {
1155 worktree_id: local_worktree_id_b,
1156 path: Path::new("a.txt").into(),
1157 },
1158 cx,
1159 )
1160 })
1161 .unwrap()
1162 .await;
1163 workspace_b.read_with(&cx_b, |workspace, cx| {
1164 let active_pane = workspace.active_pane().read(cx);
1165 assert!(active_pane.active_item().is_some());
1166 });
1167
1168 worktree_a.update(&mut cx_a, |tree, cx| {
1169 tree.as_local_mut().unwrap().unshare(cx);
1170 });
1171 workspace_b
1172 .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1173 .await;
1174 workspace_b.read_with(&cx_b, |workspace, cx| {
1175 let active_pane = workspace.active_pane().read(cx);
1176 assert!(active_pane.active_item().is_none());
1177 });
1178 }
1179
1180 #[gpui::test]
1181 async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1182 mut cx_a: TestAppContext,
1183 mut cx_b: TestAppContext,
1184 mut cx_c: TestAppContext,
1185 ) {
1186 cx_a.foreground().forbid_parking();
1187 let lang_registry = Arc::new(LanguageRegistry::new());
1188
1189 // Connect to a server as 3 clients.
1190 let mut server = TestServer::start().await;
1191 let client_a = server.create_client(&mut cx_a, "user_a").await;
1192 let client_b = server.create_client(&mut cx_b, "user_b").await;
1193 let client_c = server.create_client(&mut cx_c, "user_c").await;
1194
1195 let fs = Arc::new(FakeFs::new());
1196
1197 // Share a worktree as client A.
1198 fs.insert_tree(
1199 "/a",
1200 json!({
1201 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1202 "file1": "",
1203 "file2": ""
1204 }),
1205 )
1206 .await;
1207
1208 let worktree_a = Worktree::open_local(
1209 client_a.clone(),
1210 client_a.user_store.clone(),
1211 "/a".as_ref(),
1212 fs.clone(),
1213 lang_registry.clone(),
1214 &mut cx_a.to_async(),
1215 )
1216 .await
1217 .unwrap();
1218 worktree_a
1219 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1220 .await;
1221 let worktree_id = worktree_a
1222 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1223 .await
1224 .unwrap();
1225
1226 // Join that worktree as clients B and C.
1227 let worktree_b = Worktree::open_remote(
1228 client_b.clone(),
1229 worktree_id,
1230 lang_registry.clone(),
1231 client_b.user_store.clone(),
1232 &mut cx_b.to_async(),
1233 )
1234 .await
1235 .unwrap();
1236 let worktree_c = Worktree::open_remote(
1237 client_c.clone(),
1238 worktree_id,
1239 lang_registry.clone(),
1240 client_c.user_store.clone(),
1241 &mut cx_c.to_async(),
1242 )
1243 .await
1244 .unwrap();
1245
1246 // Open and edit a buffer as both guests B and C.
1247 let buffer_b = worktree_b
1248 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1249 .await
1250 .unwrap();
1251 let buffer_c = worktree_c
1252 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1253 .await
1254 .unwrap();
1255 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1256 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1257
1258 // Open and edit that buffer as the host.
1259 let buffer_a = worktree_a
1260 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1261 .await
1262 .unwrap();
1263
1264 buffer_a
1265 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1266 .await;
1267 buffer_a.update(&mut cx_a, |buf, cx| {
1268 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1269 });
1270
1271 // Wait for edits to propagate
1272 buffer_a
1273 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1274 .await;
1275 buffer_b
1276 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1277 .await;
1278 buffer_c
1279 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1280 .await;
1281
1282 // Edit the buffer as the host and concurrently save as guest B.
1283 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1284 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1285 save_b.await.unwrap();
1286 assert_eq!(
1287 fs.load("/a/file1".as_ref()).await.unwrap(),
1288 "hi-a, i-am-c, i-am-b, i-am-a"
1289 );
1290 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1291 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1292 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1293
1294 // Make changes on host's file system, see those changes on the guests.
1295 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1296 .await
1297 .unwrap();
1298 fs.insert_file(Path::new("/a/file4"), "4".into())
1299 .await
1300 .unwrap();
1301
1302 worktree_b
1303 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1304 .await;
1305 worktree_c
1306 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1307 .await;
1308 worktree_b.read_with(&cx_b, |tree, _| {
1309 assert_eq!(
1310 tree.paths()
1311 .map(|p| p.to_string_lossy())
1312 .collect::<Vec<_>>(),
1313 &[".zed.toml", "file1", "file3", "file4"]
1314 )
1315 });
1316 worktree_c.read_with(&cx_c, |tree, _| {
1317 assert_eq!(
1318 tree.paths()
1319 .map(|p| p.to_string_lossy())
1320 .collect::<Vec<_>>(),
1321 &[".zed.toml", "file1", "file3", "file4"]
1322 )
1323 });
1324 }
1325
1326 #[gpui::test]
1327 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1328 cx_a.foreground().forbid_parking();
1329 let lang_registry = Arc::new(LanguageRegistry::new());
1330
1331 // Connect to a server as 2 clients.
1332 let mut server = TestServer::start().await;
1333 let client_a = server.create_client(&mut cx_a, "user_a").await;
1334 let client_b = server.create_client(&mut cx_b, "user_b").await;
1335
1336 // Share a local worktree as client A
1337 let fs = Arc::new(FakeFs::new());
1338 fs.insert_tree(
1339 "/dir",
1340 json!({
1341 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1342 "a.txt": "a-contents",
1343 }),
1344 )
1345 .await;
1346
1347 let worktree_a = Worktree::open_local(
1348 client_a.clone(),
1349 client_a.user_store.clone(),
1350 "/dir".as_ref(),
1351 fs,
1352 lang_registry.clone(),
1353 &mut cx_a.to_async(),
1354 )
1355 .await
1356 .unwrap();
1357 worktree_a
1358 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1359 .await;
1360 let worktree_id = worktree_a
1361 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1362 .await
1363 .unwrap();
1364
1365 // Join that worktree as client B, and see that a guest has joined as client A.
1366 let worktree_b = Worktree::open_remote(
1367 client_b.clone(),
1368 worktree_id,
1369 lang_registry.clone(),
1370 client_b.user_store.clone(),
1371 &mut cx_b.to_async(),
1372 )
1373 .await
1374 .unwrap();
1375
1376 let buffer_b = worktree_b
1377 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1378 .await
1379 .unwrap();
1380 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1381
1382 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1383 buffer_b.read_with(&cx_b, |buf, _| {
1384 assert!(buf.is_dirty());
1385 assert!(!buf.has_conflict());
1386 });
1387
1388 buffer_b
1389 .update(&mut cx_b, |buf, cx| buf.save(cx))
1390 .unwrap()
1391 .await
1392 .unwrap();
1393 worktree_b
1394 .condition(&cx_b, |_, cx| {
1395 buffer_b.read(cx).file().unwrap().mtime() != mtime
1396 })
1397 .await;
1398 buffer_b.read_with(&cx_b, |buf, _| {
1399 assert!(!buf.is_dirty());
1400 assert!(!buf.has_conflict());
1401 });
1402
1403 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1404 buffer_b.read_with(&cx_b, |buf, _| {
1405 assert!(buf.is_dirty());
1406 assert!(!buf.has_conflict());
1407 });
1408 }
1409
1410 #[gpui::test]
1411 async fn test_editing_while_guest_opens_buffer(
1412 mut cx_a: TestAppContext,
1413 mut cx_b: TestAppContext,
1414 ) {
1415 cx_a.foreground().forbid_parking();
1416 let lang_registry = Arc::new(LanguageRegistry::new());
1417
1418 // Connect to a server as 2 clients.
1419 let mut server = TestServer::start().await;
1420 let client_a = server.create_client(&mut cx_a, "user_a").await;
1421 let client_b = server.create_client(&mut cx_b, "user_b").await;
1422
1423 // Share a local worktree as client A
1424 let fs = Arc::new(FakeFs::new());
1425 fs.insert_tree(
1426 "/dir",
1427 json!({
1428 ".zed.toml": r#"collaborators = ["user_b"]"#,
1429 "a.txt": "a-contents",
1430 }),
1431 )
1432 .await;
1433 let worktree_a = Worktree::open_local(
1434 client_a.clone(),
1435 client_a.user_store.clone(),
1436 "/dir".as_ref(),
1437 fs,
1438 lang_registry.clone(),
1439 &mut cx_a.to_async(),
1440 )
1441 .await
1442 .unwrap();
1443 worktree_a
1444 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1445 .await;
1446 let worktree_id = worktree_a
1447 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1448 .await
1449 .unwrap();
1450
1451 // Join that worktree as client B, and see that a guest has joined as client A.
1452 let worktree_b = Worktree::open_remote(
1453 client_b.clone(),
1454 worktree_id,
1455 lang_registry.clone(),
1456 client_b.user_store.clone(),
1457 &mut cx_b.to_async(),
1458 )
1459 .await
1460 .unwrap();
1461
1462 let buffer_a = worktree_a
1463 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1464 .await
1465 .unwrap();
1466 let buffer_b = cx_b
1467 .background()
1468 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1469
1470 task::yield_now().await;
1471 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1472
1473 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1474 let buffer_b = buffer_b.await.unwrap();
1475 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1476 }
1477
1478 #[gpui::test]
1479 async fn test_leaving_worktree_while_opening_buffer(
1480 mut cx_a: TestAppContext,
1481 mut cx_b: TestAppContext,
1482 ) {
1483 cx_a.foreground().forbid_parking();
1484 let lang_registry = Arc::new(LanguageRegistry::new());
1485
1486 // Connect to a server as 2 clients.
1487 let mut server = TestServer::start().await;
1488 let client_a = server.create_client(&mut cx_a, "user_a").await;
1489 let client_b = server.create_client(&mut cx_b, "user_b").await;
1490
1491 // Share a local worktree as client A
1492 let fs = Arc::new(FakeFs::new());
1493 fs.insert_tree(
1494 "/dir",
1495 json!({
1496 ".zed.toml": r#"collaborators = ["user_b"]"#,
1497 "a.txt": "a-contents",
1498 }),
1499 )
1500 .await;
1501 let worktree_a = Worktree::open_local(
1502 client_a.clone(),
1503 client_a.user_store.clone(),
1504 "/dir".as_ref(),
1505 fs,
1506 lang_registry.clone(),
1507 &mut cx_a.to_async(),
1508 )
1509 .await
1510 .unwrap();
1511 worktree_a
1512 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1513 .await;
1514 let worktree_id = worktree_a
1515 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1516 .await
1517 .unwrap();
1518
1519 // Join that worktree as client B, and see that a guest has joined as client A.
1520 let worktree_b = Worktree::open_remote(
1521 client_b.clone(),
1522 worktree_id,
1523 lang_registry.clone(),
1524 client_b.user_store.clone(),
1525 &mut cx_b.to_async(),
1526 )
1527 .await
1528 .unwrap();
1529 worktree_a
1530 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1531 .await;
1532
1533 let buffer_b = cx_b
1534 .background()
1535 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1536 cx_b.update(|_| drop(worktree_b));
1537 drop(buffer_b);
1538 worktree_a
1539 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1540 .await;
1541 }
1542
1543 #[gpui::test]
1544 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1545 cx_a.foreground().forbid_parking();
1546 let lang_registry = Arc::new(LanguageRegistry::new());
1547
1548 // Connect to a server as 2 clients.
1549 let mut server = TestServer::start().await;
1550 let client_a = server.create_client(&mut cx_a, "user_a").await;
1551 let client_b = server.create_client(&mut cx_b, "user_b").await;
1552
1553 // Share a local worktree as client A
1554 let fs = Arc::new(FakeFs::new());
1555 fs.insert_tree(
1556 "/a",
1557 json!({
1558 ".zed.toml": r#"collaborators = ["user_b"]"#,
1559 "a.txt": "a-contents",
1560 "b.txt": "b-contents",
1561 }),
1562 )
1563 .await;
1564 let worktree_a = Worktree::open_local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 "/a".as_ref(),
1568 fs,
1569 lang_registry.clone(),
1570 &mut cx_a.to_async(),
1571 )
1572 .await
1573 .unwrap();
1574 worktree_a
1575 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1576 .await;
1577 let worktree_id = worktree_a
1578 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1579 .await
1580 .unwrap();
1581
1582 // Join that worktree as client B, and see that a guest has joined as client A.
1583 let _worktree_b = Worktree::open_remote(
1584 client_b.clone(),
1585 worktree_id,
1586 lang_registry.clone(),
1587 client_b.user_store.clone(),
1588 &mut cx_b.to_async(),
1589 )
1590 .await
1591 .unwrap();
1592 worktree_a
1593 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1594 .await;
1595
1596 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1597 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1598 worktree_a
1599 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1600 .await;
1601 }
1602
1603 #[gpui::test]
1604 async fn test_collaborating_with_diagnostics(
1605 mut cx_a: TestAppContext,
1606 mut cx_b: TestAppContext,
1607 ) {
1608 cx_a.foreground().forbid_parking();
1609 let (language_server_config, mut fake_language_server) =
1610 LanguageServerConfig::fake(cx_a.background()).await;
1611 let mut lang_registry = LanguageRegistry::new();
1612 lang_registry.add(Arc::new(Language::new(
1613 LanguageConfig {
1614 name: "Rust".to_string(),
1615 path_suffixes: vec!["rs".to_string()],
1616 language_server: Some(language_server_config),
1617 ..Default::default()
1618 },
1619 Some(tree_sitter_rust::language()),
1620 )));
1621
1622 let lang_registry = Arc::new(lang_registry);
1623
1624 // Connect to a server as 2 clients.
1625 let mut server = TestServer::start().await;
1626 let client_a = server.create_client(&mut cx_a, "user_a").await;
1627 let client_b = server.create_client(&mut cx_b, "user_b").await;
1628
1629 // Share a local worktree as client A
1630 let fs = Arc::new(FakeFs::new());
1631 fs.insert_tree(
1632 "/a",
1633 json!({
1634 ".zed.toml": r#"collaborators = ["user_b"]"#,
1635 "a.rs": "let one = two",
1636 "other.rs": "",
1637 }),
1638 )
1639 .await;
1640 let worktree_a = Worktree::open_local(
1641 client_a.clone(),
1642 client_a.user_store.clone(),
1643 "/a".as_ref(),
1644 fs,
1645 lang_registry.clone(),
1646 &mut cx_a.to_async(),
1647 )
1648 .await
1649 .unwrap();
1650 worktree_a
1651 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1652 .await;
1653 let worktree_id = worktree_a
1654 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1655 .await
1656 .unwrap();
1657
1658 // Cause language server to start.
1659 let _ = cx_a
1660 .background()
1661 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1662 worktree.open_buffer("other.rs", cx)
1663 }))
1664 .await
1665 .unwrap();
1666
1667 // Simulate a language server reporting errors for a file.
1668 fake_language_server
1669 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1670 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1671 version: None,
1672 diagnostics: vec![
1673 lsp::Diagnostic {
1674 severity: Some(lsp::DiagnosticSeverity::ERROR),
1675 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1676 message: "message 1".to_string(),
1677 ..Default::default()
1678 },
1679 lsp::Diagnostic {
1680 severity: Some(lsp::DiagnosticSeverity::WARNING),
1681 range: lsp::Range::new(
1682 lsp::Position::new(0, 10),
1683 lsp::Position::new(0, 13),
1684 ),
1685 message: "message 2".to_string(),
1686 ..Default::default()
1687 },
1688 ],
1689 })
1690 .await;
1691
1692 // Join the worktree as client B.
1693 let worktree_b = Worktree::open_remote(
1694 client_b.clone(),
1695 worktree_id,
1696 lang_registry.clone(),
1697 client_b.user_store.clone(),
1698 &mut cx_b.to_async(),
1699 )
1700 .await
1701 .unwrap();
1702
1703 // Open the file with the errors.
1704 let buffer_b = cx_b
1705 .background()
1706 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1707 .await
1708 .unwrap();
1709
1710 buffer_b.read_with(&cx_b, |buffer, _| {
1711 assert_eq!(
1712 buffer
1713 .diagnostics_in_range(0..buffer.len())
1714 .collect::<Vec<_>>(),
1715 &[
1716 (
1717 Point::new(0, 4)..Point::new(0, 7),
1718 &Diagnostic {
1719 group_id: 0,
1720 message: "message 1".to_string(),
1721 severity: lsp::DiagnosticSeverity::ERROR,
1722 is_primary: true
1723 }
1724 ),
1725 (
1726 Point::new(0, 10)..Point::new(0, 13),
1727 &Diagnostic {
1728 group_id: 1,
1729 severity: lsp::DiagnosticSeverity::WARNING,
1730 message: "message 2".to_string(),
1731 is_primary: true
1732 }
1733 )
1734 ]
1735 );
1736 });
1737 }
1738
1739 #[gpui::test]
1740 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1741 cx_a.foreground().forbid_parking();
1742
1743 // Connect to a server as 2 clients.
1744 let mut server = TestServer::start().await;
1745 let client_a = server.create_client(&mut cx_a, "user_a").await;
1746 let client_b = server.create_client(&mut cx_b, "user_b").await;
1747
1748 // Create an org that includes these 2 users.
1749 let db = &server.app_state.db;
1750 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1751 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1752 .await
1753 .unwrap();
1754 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1755 .await
1756 .unwrap();
1757
1758 // Create a channel that includes all the users.
1759 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1760 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1761 .await
1762 .unwrap();
1763 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1764 .await
1765 .unwrap();
1766 db.create_channel_message(
1767 channel_id,
1768 client_b.current_user_id(&cx_b),
1769 "hello A, it's B.",
1770 OffsetDateTime::now_utc(),
1771 1,
1772 )
1773 .await
1774 .unwrap();
1775
1776 let channels_a = cx_a
1777 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1778 channels_a
1779 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1780 .await;
1781 channels_a.read_with(&cx_a, |list, _| {
1782 assert_eq!(
1783 list.available_channels().unwrap(),
1784 &[ChannelDetails {
1785 id: channel_id.to_proto(),
1786 name: "test-channel".to_string()
1787 }]
1788 )
1789 });
1790 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1791 this.get_channel(channel_id.to_proto(), cx).unwrap()
1792 });
1793 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1794 channel_a
1795 .condition(&cx_a, |channel, _| {
1796 channel_messages(channel)
1797 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1798 })
1799 .await;
1800
1801 let channels_b = cx_b
1802 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1803 channels_b
1804 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1805 .await;
1806 channels_b.read_with(&cx_b, |list, _| {
1807 assert_eq!(
1808 list.available_channels().unwrap(),
1809 &[ChannelDetails {
1810 id: channel_id.to_proto(),
1811 name: "test-channel".to_string()
1812 }]
1813 )
1814 });
1815
1816 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1817 this.get_channel(channel_id.to_proto(), cx).unwrap()
1818 });
1819 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1820 channel_b
1821 .condition(&cx_b, |channel, _| {
1822 channel_messages(channel)
1823 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1824 })
1825 .await;
1826
1827 channel_a
1828 .update(&mut cx_a, |channel, cx| {
1829 channel
1830 .send_message("oh, hi B.".to_string(), cx)
1831 .unwrap()
1832 .detach();
1833 let task = channel.send_message("sup".to_string(), cx).unwrap();
1834 assert_eq!(
1835 channel_messages(channel),
1836 &[
1837 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1838 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1839 ("user_a".to_string(), "sup".to_string(), true)
1840 ]
1841 );
1842 task
1843 })
1844 .await
1845 .unwrap();
1846
1847 channel_b
1848 .condition(&cx_b, |channel, _| {
1849 channel_messages(channel)
1850 == [
1851 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1852 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1853 ("user_a".to_string(), "sup".to_string(), false),
1854 ]
1855 })
1856 .await;
1857
1858 assert_eq!(
1859 server
1860 .state()
1861 .await
1862 .channel(channel_id)
1863 .unwrap()
1864 .connection_ids
1865 .len(),
1866 2
1867 );
1868 cx_b.update(|_| drop(channel_b));
1869 server
1870 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1871 .await;
1872
1873 cx_a.update(|_| drop(channel_a));
1874 server
1875 .condition(|state| state.channel(channel_id).is_none())
1876 .await;
1877 }
1878
1879 #[gpui::test]
1880 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1881 cx_a.foreground().forbid_parking();
1882
1883 let mut server = TestServer::start().await;
1884 let client_a = server.create_client(&mut cx_a, "user_a").await;
1885
1886 let db = &server.app_state.db;
1887 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1888 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1889 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1890 .await
1891 .unwrap();
1892 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1893 .await
1894 .unwrap();
1895
1896 let channels_a = cx_a
1897 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1898 channels_a
1899 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1900 .await;
1901 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1902 this.get_channel(channel_id.to_proto(), cx).unwrap()
1903 });
1904
1905 // Messages aren't allowed to be too long.
1906 channel_a
1907 .update(&mut cx_a, |channel, cx| {
1908 let long_body = "this is long.\n".repeat(1024);
1909 channel.send_message(long_body, cx).unwrap()
1910 })
1911 .await
1912 .unwrap_err();
1913
1914 // Messages aren't allowed to be blank.
1915 channel_a.update(&mut cx_a, |channel, cx| {
1916 channel.send_message(String::new(), cx).unwrap_err()
1917 });
1918
1919 // Leading and trailing whitespace are trimmed.
1920 channel_a
1921 .update(&mut cx_a, |channel, cx| {
1922 channel
1923 .send_message("\n surrounded by whitespace \n".to_string(), cx)
1924 .unwrap()
1925 })
1926 .await
1927 .unwrap();
1928 assert_eq!(
1929 db.get_channel_messages(channel_id, 10, None)
1930 .await
1931 .unwrap()
1932 .iter()
1933 .map(|m| &m.body)
1934 .collect::<Vec<_>>(),
1935 &["surrounded by whitespace"]
1936 );
1937 }
1938
1939 #[gpui::test]
1940 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1941 cx_a.foreground().forbid_parking();
1942
1943 // Connect to a server as 2 clients.
1944 let mut server = TestServer::start().await;
1945 let client_a = server.create_client(&mut cx_a, "user_a").await;
1946 let client_b = server.create_client(&mut cx_b, "user_b").await;
1947 let mut status_b = client_b.status();
1948
1949 // Create an org that includes these 2 users.
1950 let db = &server.app_state.db;
1951 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1952 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1953 .await
1954 .unwrap();
1955 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1956 .await
1957 .unwrap();
1958
1959 // Create a channel that includes all the users.
1960 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1961 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1962 .await
1963 .unwrap();
1964 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1965 .await
1966 .unwrap();
1967 db.create_channel_message(
1968 channel_id,
1969 client_b.current_user_id(&cx_b),
1970 "hello A, it's B.",
1971 OffsetDateTime::now_utc(),
1972 2,
1973 )
1974 .await
1975 .unwrap();
1976
1977 let channels_a = cx_a
1978 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1979 channels_a
1980 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1981 .await;
1982
1983 channels_a.read_with(&cx_a, |list, _| {
1984 assert_eq!(
1985 list.available_channels().unwrap(),
1986 &[ChannelDetails {
1987 id: channel_id.to_proto(),
1988 name: "test-channel".to_string()
1989 }]
1990 )
1991 });
1992 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1993 this.get_channel(channel_id.to_proto(), cx).unwrap()
1994 });
1995 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1996 channel_a
1997 .condition(&cx_a, |channel, _| {
1998 channel_messages(channel)
1999 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2000 })
2001 .await;
2002
2003 let channels_b = cx_b
2004 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2005 channels_b
2006 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2007 .await;
2008 channels_b.read_with(&cx_b, |list, _| {
2009 assert_eq!(
2010 list.available_channels().unwrap(),
2011 &[ChannelDetails {
2012 id: channel_id.to_proto(),
2013 name: "test-channel".to_string()
2014 }]
2015 )
2016 });
2017
2018 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2019 this.get_channel(channel_id.to_proto(), cx).unwrap()
2020 });
2021 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2022 channel_b
2023 .condition(&cx_b, |channel, _| {
2024 channel_messages(channel)
2025 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2026 })
2027 .await;
2028
2029 // Disconnect client B, ensuring we can still access its cached channel data.
2030 server.forbid_connections();
2031 server.disconnect_client(client_b.current_user_id(&cx_b));
2032 while !matches!(
2033 status_b.recv().await,
2034 Some(client::Status::ReconnectionError { .. })
2035 ) {}
2036
2037 channels_b.read_with(&cx_b, |channels, _| {
2038 assert_eq!(
2039 channels.available_channels().unwrap(),
2040 [ChannelDetails {
2041 id: channel_id.to_proto(),
2042 name: "test-channel".to_string()
2043 }]
2044 )
2045 });
2046 channel_b.read_with(&cx_b, |channel, _| {
2047 assert_eq!(
2048 channel_messages(channel),
2049 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2050 )
2051 });
2052
2053 // Send a message from client B while it is disconnected.
2054 channel_b
2055 .update(&mut cx_b, |channel, cx| {
2056 let task = channel
2057 .send_message("can you see this?".to_string(), cx)
2058 .unwrap();
2059 assert_eq!(
2060 channel_messages(channel),
2061 &[
2062 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2063 ("user_b".to_string(), "can you see this?".to_string(), true)
2064 ]
2065 );
2066 task
2067 })
2068 .await
2069 .unwrap_err();
2070
2071 // Send a message from client A while B is disconnected.
2072 channel_a
2073 .update(&mut cx_a, |channel, cx| {
2074 channel
2075 .send_message("oh, hi B.".to_string(), cx)
2076 .unwrap()
2077 .detach();
2078 let task = channel.send_message("sup".to_string(), cx).unwrap();
2079 assert_eq!(
2080 channel_messages(channel),
2081 &[
2082 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2083 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2084 ("user_a".to_string(), "sup".to_string(), true)
2085 ]
2086 );
2087 task
2088 })
2089 .await
2090 .unwrap();
2091
2092 // Give client B a chance to reconnect.
2093 server.allow_connections();
2094 cx_b.foreground().advance_clock(Duration::from_secs(10));
2095
2096 // Verify that B sees the new messages upon reconnection, as well as the message client B
2097 // sent while offline.
2098 channel_b
2099 .condition(&cx_b, |channel, _| {
2100 channel_messages(channel)
2101 == [
2102 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2103 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2104 ("user_a".to_string(), "sup".to_string(), false),
2105 ("user_b".to_string(), "can you see this?".to_string(), false),
2106 ]
2107 })
2108 .await;
2109
2110 // Ensure client A and B can communicate normally after reconnection.
2111 channel_a
2112 .update(&mut cx_a, |channel, cx| {
2113 channel.send_message("you online?".to_string(), cx).unwrap()
2114 })
2115 .await
2116 .unwrap();
2117 channel_b
2118 .condition(&cx_b, |channel, _| {
2119 channel_messages(channel)
2120 == [
2121 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2122 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2123 ("user_a".to_string(), "sup".to_string(), false),
2124 ("user_b".to_string(), "can you see this?".to_string(), false),
2125 ("user_a".to_string(), "you online?".to_string(), false),
2126 ]
2127 })
2128 .await;
2129
2130 channel_b
2131 .update(&mut cx_b, |channel, cx| {
2132 channel.send_message("yep".to_string(), cx).unwrap()
2133 })
2134 .await
2135 .unwrap();
2136 channel_a
2137 .condition(&cx_a, |channel, _| {
2138 channel_messages(channel)
2139 == [
2140 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2141 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2142 ("user_a".to_string(), "sup".to_string(), false),
2143 ("user_b".to_string(), "can you see this?".to_string(), false),
2144 ("user_a".to_string(), "you online?".to_string(), false),
2145 ("user_b".to_string(), "yep".to_string(), false),
2146 ]
2147 })
2148 .await;
2149 }
2150
2151 #[gpui::test]
2152 async fn test_contacts(
2153 mut cx_a: TestAppContext,
2154 mut cx_b: TestAppContext,
2155 mut cx_c: TestAppContext,
2156 ) {
2157 cx_a.foreground().forbid_parking();
2158 let lang_registry = Arc::new(LanguageRegistry::new());
2159
2160 // Connect to a server as 3 clients.
2161 let mut server = TestServer::start().await;
2162 let client_a = server.create_client(&mut cx_a, "user_a").await;
2163 let client_b = server.create_client(&mut cx_b, "user_b").await;
2164 let client_c = server.create_client(&mut cx_c, "user_c").await;
2165
2166 let fs = Arc::new(FakeFs::new());
2167
2168 // Share a worktree as client A.
2169 fs.insert_tree(
2170 "/a",
2171 json!({
2172 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2173 }),
2174 )
2175 .await;
2176
2177 let worktree_a = Worktree::open_local(
2178 client_a.clone(),
2179 client_a.user_store.clone(),
2180 "/a".as_ref(),
2181 fs.clone(),
2182 lang_registry.clone(),
2183 &mut cx_a.to_async(),
2184 )
2185 .await
2186 .unwrap();
2187
2188 client_a
2189 .user_store
2190 .condition(&cx_a, |user_store, _| {
2191 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2192 })
2193 .await;
2194 client_b
2195 .user_store
2196 .condition(&cx_b, |user_store, _| {
2197 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2198 })
2199 .await;
2200 client_c
2201 .user_store
2202 .condition(&cx_c, |user_store, _| {
2203 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2204 })
2205 .await;
2206
2207 let worktree_id = worktree_a
2208 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2209 .await
2210 .unwrap();
2211
2212 let _worktree_b = Worktree::open_remote(
2213 client_b.clone(),
2214 worktree_id,
2215 lang_registry.clone(),
2216 client_b.user_store.clone(),
2217 &mut cx_b.to_async(),
2218 )
2219 .await
2220 .unwrap();
2221
2222 client_a
2223 .user_store
2224 .condition(&cx_a, |user_store, _| {
2225 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2226 })
2227 .await;
2228 client_b
2229 .user_store
2230 .condition(&cx_b, |user_store, _| {
2231 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2232 })
2233 .await;
2234 client_c
2235 .user_store
2236 .condition(&cx_c, |user_store, _| {
2237 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2238 })
2239 .await;
2240
2241 cx_a.update(move |_| drop(worktree_a));
2242 client_a
2243 .user_store
2244 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2245 .await;
2246 client_b
2247 .user_store
2248 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2249 .await;
2250 client_c
2251 .user_store
2252 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2253 .await;
2254
2255 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2256 user_store
2257 .contacts()
2258 .iter()
2259 .map(|contact| {
2260 let worktrees = contact
2261 .worktrees
2262 .iter()
2263 .map(|w| {
2264 (
2265 w.root_name.as_str(),
2266 w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2267 )
2268 })
2269 .collect();
2270 (contact.user.github_login.as_str(), worktrees)
2271 })
2272 .collect()
2273 }
2274 }
2275
/// In-process server harness used by the tests in this module.
///
/// Field order is significant because Rust drops fields in declaration order;
/// keep `_test_db` last unless the teardown order is deliberately being changed.
struct TestServer {
    /// Peer handed to `Server::new` in `start`; its `reset` future is driven to completion on drop.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state`; `create_client` uses its `db` to create users.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Receiving half of the channel whose sender is passed to `Server::new`;
    /// `condition` awaits it between predicate checks.
    notifications: mpsc::Receiver<()>,
    /// Kill handles returned by `Connection::in_memory`, keyed by user, consumed by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// When set, the connection override installed by `create_client` fails with an error
    /// instead of creating a connection.
    forbid_connections: Arc<AtomicBool>,
    /// Test database that `build_app_state` reads its URL and handle from.
    /// NOTE(review): presumably held here only to keep the database alive for the
    /// lifetime of the server — confirm against `TestDb`.
    _test_db: TestDb,
}
2285
2286 impl TestServer {
2287 async fn start() -> Self {
2288 let test_db = TestDb::new();
2289 let app_state = Self::build_app_state(&test_db).await;
2290 let peer = Peer::new();
2291 let notifications = mpsc::channel(128);
2292 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2293 Self {
2294 peer,
2295 app_state,
2296 server,
2297 notifications: notifications.1,
2298 connection_killers: Default::default(),
2299 forbid_connections: Default::default(),
2300 _test_db: test_db,
2301 }
2302 }
2303
2304 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2305 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2306 let client_name = name.to_string();
2307 let mut client = Client::new();
2308 let server = self.server.clone();
2309 let connection_killers = self.connection_killers.clone();
2310 let forbid_connections = self.forbid_connections.clone();
2311 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2312
2313 Arc::get_mut(&mut client)
2314 .unwrap()
2315 .override_authenticate(move |cx| {
2316 cx.spawn(|_| async move {
2317 let access_token = "the-token".to_string();
2318 Ok(Credentials {
2319 user_id: user_id.0 as u64,
2320 access_token,
2321 })
2322 })
2323 })
2324 .override_establish_connection(move |credentials, cx| {
2325 assert_eq!(credentials.user_id, user_id.0 as u64);
2326 assert_eq!(credentials.access_token, "the-token");
2327
2328 let server = server.clone();
2329 let connection_killers = connection_killers.clone();
2330 let forbid_connections = forbid_connections.clone();
2331 let client_name = client_name.clone();
2332 let connection_id_tx = connection_id_tx.clone();
2333 cx.spawn(move |cx| async move {
2334 if forbid_connections.load(SeqCst) {
2335 Err(EstablishConnectionError::other(anyhow!(
2336 "server is forbidding connections"
2337 )))
2338 } else {
2339 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2340 connection_killers.lock().insert(user_id, kill_conn);
2341 cx.background()
2342 .spawn(server.handle_connection(
2343 server_conn,
2344 client_name,
2345 user_id,
2346 Some(connection_id_tx),
2347 ))
2348 .detach();
2349 Ok(client_conn)
2350 }
2351 })
2352 });
2353
2354 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2355 client
2356 .authenticate_and_connect(&cx.to_async())
2357 .await
2358 .unwrap();
2359
2360 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2361 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2362 let mut authed_user =
2363 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2364 while authed_user.recv().await.unwrap().is_none() {}
2365
2366 TestClient {
2367 client,
2368 peer_id,
2369 user_store,
2370 }
2371 }
2372
2373 fn disconnect_client(&self, user_id: UserId) {
2374 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2375 let _ = kill_conn.try_send(Some(()));
2376 }
2377 }
2378
2379 fn forbid_connections(&self) {
2380 self.forbid_connections.store(true, SeqCst);
2381 }
2382
2383 fn allow_connections(&self) {
2384 self.forbid_connections.store(false, SeqCst);
2385 }
2386
2387 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2388 let mut config = Config::default();
2389 config.session_secret = "a".repeat(32);
2390 config.database_url = test_db.url.clone();
2391 let github_client = github::AppClient::test();
2392 Arc::new(AppState {
2393 db: test_db.db().clone(),
2394 handlebars: Default::default(),
2395 auth_client: auth::build_client("", ""),
2396 repo_client: github::RepoClient::test(&github_client),
2397 github_client,
2398 config,
2399 })
2400 }
2401
2402 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2403 self.server.store.read()
2404 }
2405
2406 async fn condition<F>(&mut self, mut predicate: F)
2407 where
2408 F: FnMut(&Store) -> bool,
2409 {
2410 async_std::future::timeout(Duration::from_millis(500), async {
2411 while !(predicate)(&*self.server.store.read()) {
2412 self.notifications.recv().await;
2413 }
2414 })
2415 .await
2416 .expect("condition timed out");
2417 }
2418 }
2419
2420 impl Drop for TestServer {
2421 fn drop(&mut self) {
2422 task::block_on(self.peer.reset());
2423 }
2424 }
2425
/// A client connected to a `TestServer`, as returned by `TestServer::create_client`.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id received while the client was being connected.
    pub peer_id: PeerId,
    /// User store model created for this client.
    pub user_store: ModelHandle<UserStore>,
}
2431
2432 impl Deref for TestClient {
2433 type Target = Arc<Client>;
2434
2435 fn deref(&self) -> &Self::Target {
2436 &self.client
2437 }
2438 }
2439
2440 impl TestClient {
2441 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2442 UserId::from_proto(
2443 self.user_store
2444 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2445 )
2446 }
2447 }
2448
2449 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2450 channel
2451 .messages()
2452 .cursor::<()>()
2453 .map(|m| {
2454 (
2455 m.sender.github_login.clone(),
2456 m.body.clone(),
2457 m.is_pending(),
2458 )
2459 })
2460 .collect()
2461 }
2462
2463 struct EmptyView;
2464
2465 impl gpui::Entity for EmptyView {
2466 type Event = ();
2467 }
2468
2469 impl gpui::View for EmptyView {
2470 fn ui_name() -> &'static str {
2471 "empty view"
2472 }
2473
2474 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2475 gpui::Element::boxed(gpui::elements::Empty)
2476 }
2477 }
2478}