1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
/// Number of channel messages requested from the database per page; a page shorter than this
/// is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the trimmed body of a channel message, compared against `String::len`
/// (i.e. measured in bytes).
const MAX_MESSAGE_LEN: usize = 1024;
46
47impl Server {
48 pub fn new(
49 app_state: Arc<AppState>,
50 peer: Arc<Peer>,
51 notifications: Option<mpsc::Sender<()>>,
52 ) -> Arc<Self> {
53 let mut server = Self {
54 peer,
55 app_state,
56 store: Default::default(),
57 handlers: Default::default(),
58 notifications,
59 };
60
61 server
62 .add_handler(Server::ping)
63 .add_handler(Server::register_worktree)
64 .add_handler(Server::unregister_worktree)
65 .add_handler(Server::share_worktree)
66 .add_handler(Server::unshare_worktree)
67 .add_handler(Server::join_worktree)
68 .add_handler(Server::leave_worktree)
69 .add_handler(Server::update_worktree)
70 .add_handler(Server::open_buffer)
71 .add_handler(Server::close_buffer)
72 .add_handler(Server::update_buffer)
73 .add_handler(Server::buffer_saved)
74 .add_handler(Server::save_buffer)
75 .add_handler(Server::get_channels)
76 .add_handler(Server::get_users)
77 .add_handler(Server::join_channel)
78 .add_handler(Server::leave_channel)
79 .add_handler(Server::send_channel_message)
80 .add_handler(Server::get_channel_messages);
81
82 Arc::new(server)
83 }
84
85 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
86 where
87 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
88 Fut: 'static + Send + Future<Output = tide::Result<()>>,
89 M: EnvelopedMessage,
90 {
91 let prev_handler = self.handlers.insert(
92 TypeId::of::<M>(),
93 Box::new(move |server, envelope| {
94 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
95 (handler)(server, *envelope).boxed()
96 }),
97 );
98 if prev_handler.is_some() {
99 panic!("registered a handler for the same message twice");
100 }
101 self
102 }
103
104 pub fn handle_connection(
105 self: &Arc<Self>,
106 connection: Connection,
107 addr: String,
108 user_id: UserId,
109 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
110 ) -> impl Future<Output = ()> {
111 let mut this = self.clone();
112 async move {
113 let (connection_id, handle_io, mut incoming_rx) =
114 this.peer.add_connection(connection).await;
115
116 if let Some(send_connection_id) = send_connection_id.as_mut() {
117 let _ = send_connection_id.send(connection_id).await;
118 }
119
120 this.state_mut().add_connection(connection_id, user_id);
121 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
122 log::error!("error updating contacts for {:?}: {}", user_id, err);
123 }
124
125 let handle_io = handle_io.fuse();
126 futures::pin_mut!(handle_io);
127 loop {
128 let next_message = incoming_rx.recv().fuse();
129 futures::pin_mut!(next_message);
130 futures::select_biased! {
131 message = next_message => {
132 if let Some(message) = message {
133 let start_time = Instant::now();
134 log::info!("RPC message received: {}", message.payload_type_name());
135 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
136 if let Err(err) = (handler)(this.clone(), message).await {
137 log::error!("error handling message: {:?}", err);
138 } else {
139 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
140 }
141
142 if let Some(mut notifications) = this.notifications.clone() {
143 let _ = notifications.send(()).await;
144 }
145 } else {
146 log::warn!("unhandled message: {}", message.payload_type_name());
147 }
148 } else {
149 log::info!("rpc connection closed {:?}", addr);
150 break;
151 }
152 }
153 handle_io = handle_io => {
154 if let Err(err) = handle_io {
155 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
156 }
157 break;
158 }
159 }
160 }
161
162 if let Err(err) = this.sign_out(connection_id).await {
163 log::error!("error signing out connection {:?} - {:?}", addr, err);
164 }
165 }
166 }
167
168 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
169 self.peer.disconnect(connection_id).await;
170 let removed_connection = self.state_mut().remove_connection(connection_id)?;
171
172 for (project_id, project) in removed_connection.hosted_projects {
173 if let Some(share) = project.share {
174 broadcast(
175 connection_id,
176 share.guests.keys().copied().collect(),
177 |conn_id| {
178 self.peer
179 .send(conn_id, proto::UnshareProject { project_id })
180 },
181 )
182 .await?;
183 }
184 }
185
186 for (project_id, peer_ids) in removed_connection.guest_project_ids {
187 broadcast(connection_id, peer_ids, |conn_id| {
188 self.peer.send(
189 conn_id,
190 proto::RemoveProjectCollaborator {
191 project_id,
192 peer_id: connection_id.0,
193 },
194 )
195 })
196 .await?;
197 }
198
199 self.update_contacts_for_users(removed_connection.contact_ids.iter())
200 .await?;
201
202 Ok(())
203 }
204
205 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
206 self.peer.respond(request.receipt(), proto::Ack {}).await?;
207 Ok(())
208 }
209
210 async fn register_worktree(
211 mut self: Arc<Server>,
212 request: TypedEnvelope<proto::RegisterWorktree>,
213 ) -> tide::Result<()> {
214 let receipt = request.receipt();
215 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
216
217 let mut contact_user_ids = HashSet::default();
218 contact_user_ids.insert(host_user_id);
219 for github_login in request.payload.authorized_logins {
220 match self.app_state.db.create_user(&github_login, false).await {
221 Ok(contact_user_id) => {
222 contact_user_ids.insert(contact_user_id);
223 }
224 Err(err) => {
225 let message = err.to_string();
226 self.peer
227 .respond_with_error(receipt, proto::Error { message })
228 .await?;
229 return Ok(());
230 }
231 }
232 }
233
234 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
235 let ok = self.state_mut().register_worktree(
236 request.project_id,
237 request.worktree_id,
238 Worktree {
239 authorized_user_ids: contact_user_ids.clone(),
240 root_name: request.payload.root_name,
241 },
242 );
243
244 if ok {
245 self.peer.respond(receipt, proto::Ack {}).await?;
246 self.update_contacts_for_users(&contact_user_ids).await?;
247 } else {
248 self.peer
249 .respond_with_error(
250 receipt,
251 proto::Error {
252 message: "no such project".to_string(),
253 },
254 )
255 .await?;
256 }
257
258 Ok(())
259 }
260
261 async fn unregister_worktree(
262 mut self: Arc<Server>,
263 request: TypedEnvelope<proto::UnregisterWorktree>,
264 ) -> tide::Result<()> {
265 let project_id = request.payload.project_id;
266 let worktree_id = request.payload.worktree_id;
267 let worktree =
268 self.state_mut()
269 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
270
271 if let Some(share) = worktree.share {
272 broadcast(
273 request.sender_id,
274 share.guests.keys().copied().collect(),
275 |conn_id| {
276 self.peer.send(
277 conn_id,
278 proto::UnregisterWorktree {
279 project_id,
280 worktree_id,
281 },
282 )
283 },
284 )
285 .await?;
286 }
287 self.update_contacts_for_users(&worktree.authorized_user_ids)
288 .await?;
289 Ok(())
290 }
291
292 async fn share_worktree(
293 mut self: Arc<Server>,
294 mut request: TypedEnvelope<proto::ShareWorktree>,
295 ) -> tide::Result<()> {
296 let worktree = request
297 .payload
298 .worktree
299 .as_mut()
300 .ok_or_else(|| anyhow!("missing worktree"))?;
301 let entries = mem::take(&mut worktree.entries)
302 .into_iter()
303 .map(|entry| (entry.id, entry))
304 .collect();
305
306 let contact_user_ids =
307 self.state_mut()
308 .share_worktree(worktree.id, request.sender_id, entries);
309 if let Some(contact_user_ids) = contact_user_ids {
310 self.peer
311 .respond(request.receipt(), proto::ShareWorktreeResponse {})
312 .await?;
313 self.update_contacts_for_users(&contact_user_ids).await?;
314 } else {
315 self.peer
316 .respond_with_error(
317 request.receipt(),
318 proto::Error {
319 message: "no such worktree".to_string(),
320 },
321 )
322 .await?;
323 }
324 Ok(())
325 }
326
327 async fn unshare_worktree(
328 mut self: Arc<Server>,
329 request: TypedEnvelope<proto::UnshareWorktree>,
330 ) -> tide::Result<()> {
331 let worktree_id = request.payload.worktree_id;
332 let worktree = self
333 .state_mut()
334 .unshare_worktree(worktree_id, request.sender_id)?;
335
336 broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
337 self.peer
338 .send(conn_id, proto::UnshareWorktree { worktree_id })
339 })
340 .await?;
341 self.update_contacts_for_users(&worktree.authorized_user_ids)
342 .await?;
343
344 Ok(())
345 }
346
347 async fn join_worktree(
348 mut self: Arc<Server>,
349 request: TypedEnvelope<proto::JoinWorktree>,
350 ) -> tide::Result<()> {
351 let worktree_id = request.payload.worktree_id;
352
353 let user_id = self.state().user_id_for_connection(request.sender_id)?;
354 let response_data = self
355 .state_mut()
356 .join_worktree(request.sender_id, user_id, worktree_id)
357 .and_then(|joined| {
358 let share = joined.worktree.share()?;
359 let peer_count = share.guests.len();
360 let mut collaborators = Vec::with_capacity(peer_count);
361 collaborators.push(proto::Collaborator {
362 peer_id: joined.worktree.host_connection_id.0,
363 replica_id: 0,
364 user_id: joined.worktree.host_user_id.to_proto(),
365 });
366 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
367 if *peer_conn_id != request.sender_id {
368 collaborators.push(proto::Collaborator {
369 peer_id: peer_conn_id.0,
370 replica_id: *peer_replica_id as u32,
371 user_id: peer_user_id.to_proto(),
372 });
373 }
374 }
375 let response = proto::JoinWorktreeResponse {
376 worktree: Some(proto::Worktree {
377 id: worktree_id,
378 root_name: joined.worktree.root_name.clone(),
379 entries: share.entries.values().cloned().collect(),
380 }),
381 replica_id: joined.replica_id as u32,
382 collaborators,
383 };
384 let connection_ids = joined.worktree.connection_ids();
385 let contact_user_ids = joined.worktree.authorized_user_ids.clone();
386 Ok((response, connection_ids, contact_user_ids))
387 });
388
389 match response_data {
390 Ok((response, connection_ids, contact_user_ids)) => {
391 broadcast(request.sender_id, connection_ids, |conn_id| {
392 self.peer.send(
393 conn_id,
394 proto::AddCollaborator {
395 worktree_id,
396 collaborator: Some(proto::Collaborator {
397 peer_id: request.sender_id.0,
398 replica_id: response.replica_id,
399 user_id: user_id.to_proto(),
400 }),
401 },
402 )
403 })
404 .await?;
405 self.peer.respond(request.receipt(), response).await?;
406 self.update_contacts_for_users(&contact_user_ids).await?;
407 }
408 Err(error) => {
409 self.peer
410 .respond_with_error(
411 request.receipt(),
412 proto::Error {
413 message: error.to_string(),
414 },
415 )
416 .await?;
417 }
418 }
419
420 Ok(())
421 }
422
423 async fn leave_worktree(
424 mut self: Arc<Server>,
425 request: TypedEnvelope<proto::LeaveWorktree>,
426 ) -> tide::Result<()> {
427 let sender_id = request.sender_id;
428 let worktree_id = request.payload.worktree_id;
429 let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
430 if let Some(worktree) = worktree {
431 broadcast(sender_id, worktree.connection_ids, |conn_id| {
432 self.peer.send(
433 conn_id,
434 proto::RemoveCollaborator {
435 worktree_id,
436 peer_id: sender_id.0,
437 },
438 )
439 })
440 .await?;
441 self.update_contacts_for_users(&worktree.authorized_user_ids)
442 .await?;
443 }
444 Ok(())
445 }
446
447 async fn update_worktree(
448 mut self: Arc<Server>,
449 request: TypedEnvelope<proto::UpdateWorktree>,
450 ) -> tide::Result<()> {
451 let connection_ids = self.state_mut().update_worktree(
452 request.sender_id,
453 request.payload.worktree_id,
454 &request.payload.removed_entries,
455 &request.payload.updated_entries,
456 )?;
457
458 broadcast(request.sender_id, connection_ids, |connection_id| {
459 self.peer
460 .forward_send(request.sender_id, connection_id, request.payload.clone())
461 })
462 .await?;
463
464 Ok(())
465 }
466
467 async fn open_buffer(
468 self: Arc<Server>,
469 request: TypedEnvelope<proto::OpenBuffer>,
470 ) -> tide::Result<()> {
471 let receipt = request.receipt();
472 let host_connection_id = self
473 .state()
474 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
475 let response = self
476 .peer
477 .forward_request(request.sender_id, host_connection_id, request.payload)
478 .await?;
479 self.peer.respond(receipt, response).await?;
480 Ok(())
481 }
482
483 async fn close_buffer(
484 self: Arc<Server>,
485 request: TypedEnvelope<proto::CloseBuffer>,
486 ) -> tide::Result<()> {
487 let host_connection_id = self
488 .state()
489 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
490 self.peer
491 .forward_send(request.sender_id, host_connection_id, request.payload)
492 .await?;
493 Ok(())
494 }
495
496 async fn save_buffer(
497 self: Arc<Server>,
498 request: TypedEnvelope<proto::SaveBuffer>,
499 ) -> tide::Result<()> {
500 let host;
501 let guests;
502 {
503 let state = self.state();
504 host = state
505 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
506 guests = state
507 .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
508 }
509
510 let sender = request.sender_id;
511 let receipt = request.receipt();
512 let response = self
513 .peer
514 .forward_request(sender, host, request.payload.clone())
515 .await?;
516
517 broadcast(host, guests, |conn_id| {
518 let response = response.clone();
519 let peer = &self.peer;
520 async move {
521 if conn_id == sender {
522 peer.respond(receipt, response).await
523 } else {
524 peer.forward_send(host, conn_id, response).await
525 }
526 }
527 })
528 .await?;
529
530 Ok(())
531 }
532
533 async fn update_buffer(
534 self: Arc<Server>,
535 request: TypedEnvelope<proto::UpdateBuffer>,
536 ) -> tide::Result<()> {
537 let receiver_ids = self
538 .state()
539 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
540 broadcast(request.sender_id, receiver_ids, |connection_id| {
541 self.peer
542 .forward_send(request.sender_id, connection_id, request.payload.clone())
543 })
544 .await?;
545 self.peer.respond(request.receipt(), proto::Ack {}).await?;
546 Ok(())
547 }
548
549 async fn buffer_saved(
550 self: Arc<Server>,
551 request: TypedEnvelope<proto::BufferSaved>,
552 ) -> tide::Result<()> {
553 let receiver_ids = self
554 .state()
555 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
556 broadcast(request.sender_id, receiver_ids, |connection_id| {
557 self.peer
558 .forward_send(request.sender_id, connection_id, request.payload.clone())
559 })
560 .await?;
561 Ok(())
562 }
563
564 async fn get_channels(
565 self: Arc<Server>,
566 request: TypedEnvelope<proto::GetChannels>,
567 ) -> tide::Result<()> {
568 let user_id = self.state().user_id_for_connection(request.sender_id)?;
569 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
570 self.peer
571 .respond(
572 request.receipt(),
573 proto::GetChannelsResponse {
574 channels: channels
575 .into_iter()
576 .map(|chan| proto::Channel {
577 id: chan.id.to_proto(),
578 name: chan.name,
579 })
580 .collect(),
581 },
582 )
583 .await?;
584 Ok(())
585 }
586
587 async fn get_users(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::GetUsers>,
590 ) -> tide::Result<()> {
591 let receipt = request.receipt();
592 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
593 let users = self
594 .app_state
595 .db
596 .get_users_by_ids(user_ids)
597 .await?
598 .into_iter()
599 .map(|user| proto::User {
600 id: user.id.to_proto(),
601 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
602 github_login: user.github_login,
603 })
604 .collect();
605 self.peer
606 .respond(receipt, proto::GetUsersResponse { users })
607 .await?;
608 Ok(())
609 }
610
611 async fn update_contacts_for_users<'a>(
612 self: &Arc<Server>,
613 user_ids: impl IntoIterator<Item = &'a UserId>,
614 ) -> tide::Result<()> {
615 let mut send_futures = Vec::new();
616
617 {
618 let state = self.state();
619 for user_id in user_ids {
620 let contacts = state.contacts_for_user(*user_id);
621 for connection_id in state.connection_ids_for_user(*user_id) {
622 send_futures.push(self.peer.send(
623 connection_id,
624 proto::UpdateContacts {
625 contacts: contacts.clone(),
626 },
627 ));
628 }
629 }
630 }
631 futures::future::try_join_all(send_futures).await?;
632
633 Ok(())
634 }
635
636 async fn join_channel(
637 mut self: Arc<Self>,
638 request: TypedEnvelope<proto::JoinChannel>,
639 ) -> tide::Result<()> {
640 let user_id = self.state().user_id_for_connection(request.sender_id)?;
641 let channel_id = ChannelId::from_proto(request.payload.channel_id);
642 if !self
643 .app_state
644 .db
645 .can_user_access_channel(user_id, channel_id)
646 .await?
647 {
648 Err(anyhow!("access denied"))?;
649 }
650
651 self.state_mut().join_channel(request.sender_id, channel_id);
652 let messages = self
653 .app_state
654 .db
655 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
656 .await?
657 .into_iter()
658 .map(|msg| proto::ChannelMessage {
659 id: msg.id.to_proto(),
660 body: msg.body,
661 timestamp: msg.sent_at.unix_timestamp() as u64,
662 sender_id: msg.sender_id.to_proto(),
663 nonce: Some(msg.nonce.as_u128().into()),
664 })
665 .collect::<Vec<_>>();
666 self.peer
667 .respond(
668 request.receipt(),
669 proto::JoinChannelResponse {
670 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
671 messages,
672 },
673 )
674 .await?;
675 Ok(())
676 }
677
678 async fn leave_channel(
679 mut self: Arc<Self>,
680 request: TypedEnvelope<proto::LeaveChannel>,
681 ) -> tide::Result<()> {
682 let user_id = self.state().user_id_for_connection(request.sender_id)?;
683 let channel_id = ChannelId::from_proto(request.payload.channel_id);
684 if !self
685 .app_state
686 .db
687 .can_user_access_channel(user_id, channel_id)
688 .await?
689 {
690 Err(anyhow!("access denied"))?;
691 }
692
693 self.state_mut()
694 .leave_channel(request.sender_id, channel_id);
695
696 Ok(())
697 }
698
699 async fn send_channel_message(
700 self: Arc<Self>,
701 request: TypedEnvelope<proto::SendChannelMessage>,
702 ) -> tide::Result<()> {
703 let receipt = request.receipt();
704 let channel_id = ChannelId::from_proto(request.payload.channel_id);
705 let user_id;
706 let connection_ids;
707 {
708 let state = self.state();
709 user_id = state.user_id_for_connection(request.sender_id)?;
710 if let Some(ids) = state.channel_connection_ids(channel_id) {
711 connection_ids = ids;
712 } else {
713 return Ok(());
714 }
715 }
716
717 // Validate the message body.
718 let body = request.payload.body.trim().to_string();
719 if body.len() > MAX_MESSAGE_LEN {
720 self.peer
721 .respond_with_error(
722 receipt,
723 proto::Error {
724 message: "message is too long".to_string(),
725 },
726 )
727 .await?;
728 return Ok(());
729 }
730 if body.is_empty() {
731 self.peer
732 .respond_with_error(
733 receipt,
734 proto::Error {
735 message: "message can't be blank".to_string(),
736 },
737 )
738 .await?;
739 return Ok(());
740 }
741
742 let timestamp = OffsetDateTime::now_utc();
743 let nonce = if let Some(nonce) = request.payload.nonce {
744 nonce
745 } else {
746 self.peer
747 .respond_with_error(
748 receipt,
749 proto::Error {
750 message: "nonce can't be blank".to_string(),
751 },
752 )
753 .await?;
754 return Ok(());
755 };
756
757 let message_id = self
758 .app_state
759 .db
760 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
761 .await?
762 .to_proto();
763 let message = proto::ChannelMessage {
764 sender_id: user_id.to_proto(),
765 id: message_id,
766 body,
767 timestamp: timestamp.unix_timestamp() as u64,
768 nonce: Some(nonce),
769 };
770 broadcast(request.sender_id, connection_ids, |conn_id| {
771 self.peer.send(
772 conn_id,
773 proto::ChannelMessageSent {
774 channel_id: channel_id.to_proto(),
775 message: Some(message.clone()),
776 },
777 )
778 })
779 .await?;
780 self.peer
781 .respond(
782 receipt,
783 proto::SendChannelMessageResponse {
784 message: Some(message),
785 },
786 )
787 .await?;
788 Ok(())
789 }
790
791 async fn get_channel_messages(
792 self: Arc<Self>,
793 request: TypedEnvelope<proto::GetChannelMessages>,
794 ) -> tide::Result<()> {
795 let user_id = self.state().user_id_for_connection(request.sender_id)?;
796 let channel_id = ChannelId::from_proto(request.payload.channel_id);
797 if !self
798 .app_state
799 .db
800 .can_user_access_channel(user_id, channel_id)
801 .await?
802 {
803 Err(anyhow!("access denied"))?;
804 }
805
806 let messages = self
807 .app_state
808 .db
809 .get_channel_messages(
810 channel_id,
811 MESSAGE_COUNT_PER_PAGE,
812 Some(MessageId::from_proto(request.payload.before_message_id)),
813 )
814 .await?
815 .into_iter()
816 .map(|msg| proto::ChannelMessage {
817 id: msg.id.to_proto(),
818 body: msg.body,
819 timestamp: msg.sent_at.unix_timestamp() as u64,
820 sender_id: msg.sender_id.to_proto(),
821 nonce: Some(msg.nonce.as_u128().into()),
822 })
823 .collect::<Vec<_>>();
824 self.peer
825 .respond(
826 request.receipt(),
827 proto::GetChannelMessagesResponse {
828 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
829 messages,
830 },
831 )
832 .await?;
833 Ok(())
834 }
835
836 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
837 self.store.read()
838 }
839
840 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
841 self.store.write()
842 }
843}
844
845pub async fn broadcast<F, T>(
846 sender_id: ConnectionId,
847 receiver_ids: Vec<ConnectionId>,
848 mut f: F,
849) -> anyhow::Result<()>
850where
851 F: FnMut(ConnectionId) -> T,
852 T: Future<Output = anyhow::Result<()>>,
853{
854 let futures = receiver_ids
855 .into_iter()
856 .filter(|id| *id != sender_id)
857 .map(|id| f(id));
858 futures::future::try_join_all(futures).await?;
859 Ok(())
860}
861
862pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
863 let server = Server::new(app.state().clone(), rpc.clone(), None);
864 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
865 let server = server.clone();
866 async move {
867 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
868
869 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
870 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
871 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
872 let client_protocol_version: Option<u32> = request
873 .header("X-Zed-Protocol-Version")
874 .and_then(|v| v.as_str().parse().ok());
875
876 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
877 return Ok(Response::new(StatusCode::UpgradeRequired));
878 }
879
880 let header = match request.header("Sec-Websocket-Key") {
881 Some(h) => h.as_str(),
882 None => return Err(anyhow!("expected sec-websocket-key"))?,
883 };
884
885 let user_id = process_auth_header(&request).await?;
886
887 let mut response = Response::new(StatusCode::SwitchingProtocols);
888 response.insert_header(UPGRADE, "websocket");
889 response.insert_header(CONNECTION, "Upgrade");
890 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
891 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
892 response.insert_header("Sec-Websocket-Version", "13");
893
894 let http_res: &mut tide::http::Response = response.as_mut();
895 let upgrade_receiver = http_res.recv_upgrade().await;
896 let addr = request.remote().unwrap_or("unknown").to_string();
897 task::spawn(async move {
898 if let Some(stream) = upgrade_receiver.await {
899 server
900 .handle_connection(
901 Connection::new(
902 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
903 ),
904 addr,
905 user_id,
906 None,
907 )
908 .await;
909 }
910 });
911
912 Ok(response)
913 }
914 });
915}
916
917fn header_contains_ignore_case<T>(
918 request: &tide::Request<T>,
919 header_name: HeaderName,
920 value: &str,
921) -> bool {
922 request
923 .header(header_name)
924 .map(|h| {
925 h.as_str()
926 .split(',')
927 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
928 })
929 .unwrap_or(false)
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::{
936 auth,
937 db::{tests::TestDb, UserId},
938 github, AppState, Config,
939 };
940 use ::rpc::Peer;
941 use async_std::task;
942 use gpui::{ModelHandle, TestAppContext};
943 use parking_lot::Mutex;
944 use postage::{mpsc, watch};
945 use rpc::PeerId;
946 use serde_json::json;
947 use sqlx::types::time::OffsetDateTime;
948 use std::{
949 ops::Deref,
950 path::Path,
951 sync::{
952 atomic::{AtomicBool, Ordering::SeqCst},
953 Arc,
954 },
955 time::Duration,
956 };
957 use zed::{
958 client::{
959 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
960 EstablishConnectionError, UserStore,
961 },
962 contacts_panel::JoinWorktree,
963 editor::{Editor, EditorSettings, Input, MultiBuffer},
964 fs::{FakeFs, Fs as _},
965 language::{
966 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
967 LanguageRegistry, LanguageServerConfig, Point,
968 },
969 lsp,
970 project::{ProjectPath, Worktree},
971 test::test_app_state,
972 workspace::Workspace,
973 };
974
975 #[gpui::test]
976 async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
977 let (window_b, _) = cx_b.add_window(|_| EmptyView);
978 let lang_registry = Arc::new(LanguageRegistry::new());
979
980 // Connect to a server as 2 clients.
981 let mut server = TestServer::start().await;
982 let client_a = server.create_client(&mut cx_a, "user_a").await;
983 let client_b = server.create_client(&mut cx_b, "user_b").await;
984
985 cx_a.foreground().forbid_parking();
986
987 // Share a local worktree as client A
988 let fs = Arc::new(FakeFs::new());
989 fs.insert_tree(
990 "/a",
991 json!({
992 ".zed.toml": r#"collaborators = ["user_b"]"#,
993 "a.txt": "a-contents",
994 "b.txt": "b-contents",
995 }),
996 )
997 .await;
998 let worktree_a = Worktree::open_local(
999 client_a.clone(),
1000 client_a.user_store.clone(),
1001 "/a".as_ref(),
1002 fs,
1003 lang_registry.clone(),
1004 &mut cx_a.to_async(),
1005 )
1006 .await
1007 .unwrap();
1008 worktree_a
1009 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1010 .await;
1011 let worktree_id = worktree_a
1012 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1013 .await
1014 .unwrap();
1015
1016 // Join that worktree as client B, and see that a guest has joined as client A.
1017 let worktree_b = Worktree::open_remote(
1018 client_b.clone(),
1019 worktree_id,
1020 lang_registry.clone(),
1021 client_b.user_store.clone(),
1022 &mut cx_b.to_async(),
1023 )
1024 .await
1025 .unwrap();
1026
1027 let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
1028 assert_eq!(
1029 tree.collaborators()
1030 .get(&client_a.peer_id)
1031 .unwrap()
1032 .user
1033 .github_login,
1034 "user_a"
1035 );
1036 tree.replica_id()
1037 });
1038 worktree_a
1039 .condition(&cx_a, |tree, _| {
1040 tree.collaborators()
1041 .get(&client_b.peer_id)
1042 .map_or(false, |collaborator| {
1043 collaborator.replica_id == replica_id_b
1044 && collaborator.user.github_login == "user_b"
1045 })
1046 })
1047 .await;
1048
1049 // Open the same file as client B and client A.
1050 let buffer_b = worktree_b
1051 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1052 .await
1053 .unwrap();
1054 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1055 buffer_b.read_with(&cx_b, |buf, cx| {
1056 assert_eq!(buf.read(cx).text(), "b-contents")
1057 });
1058 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1059 let buffer_a = worktree_a
1060 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1061 .await
1062 .unwrap();
1063
1064 let editor_b = cx_b.add_view(window_b, |cx| {
1065 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1066 });
1067 // TODO
1068 // // Create a selection set as client B and see that selection set as client A.
1069 // buffer_a
1070 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1071 // .await;
1072
1073 // Edit the buffer as client B and see that edit as client A.
1074 editor_b.update(&mut cx_b, |editor, cx| {
1075 editor.handle_input(&Input("ok, ".into()), cx)
1076 });
1077 buffer_a
1078 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1079 .await;
1080
1081 // TODO
1082 // // Remove the selection set as client B, see those selections disappear as client A.
1083 cx_b.update(move |_| drop(editor_b));
1084 // buffer_a
1085 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1086 // .await;
1087
1088 // Close the buffer as client A, see that the buffer is closed.
1089 cx_a.update(move |_| drop(buffer_a));
1090 worktree_a
1091 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1092 .await;
1093
1094 // Dropping the worktree removes client B from client A's collaborators.
1095 cx_b.update(move |_| drop(worktree_b));
1096 worktree_a
1097 .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
1098 .await;
1099 }
1100
1101 #[gpui::test]
1102 async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1103 cx_b.update(zed::contacts_panel::init);
1104 let mut app_state_a = cx_a.update(test_app_state);
1105 let mut app_state_b = cx_b.update(test_app_state);
1106
1107 // Connect to a server as 2 clients.
1108 let mut server = TestServer::start().await;
1109 let client_a = server.create_client(&mut cx_a, "user_a").await;
1110 let client_b = server.create_client(&mut cx_b, "user_b").await;
1111 Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
1112 Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
1113 Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
1114 Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
1115
1116 cx_a.foreground().forbid_parking();
1117
1118 // Share a local worktree as client A
1119 let fs = Arc::new(FakeFs::new());
1120 fs.insert_tree(
1121 "/a",
1122 json!({
1123 ".zed.toml": r#"collaborators = ["user_b"]"#,
1124 "a.txt": "a-contents",
1125 "b.txt": "b-contents",
1126 }),
1127 )
1128 .await;
1129 let worktree_a = Worktree::open_local(
1130 app_state_a.client.clone(),
1131 app_state_a.user_store.clone(),
1132 "/a".as_ref(),
1133 fs,
1134 app_state_a.languages.clone(),
1135 &mut cx_a.to_async(),
1136 )
1137 .await
1138 .unwrap();
1139 worktree_a
1140 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1141 .await;
1142
1143 let remote_worktree_id = worktree_a
1144 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1145 .await
1146 .unwrap();
1147
1148 let (window_b, workspace_b) =
1149 cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
1150 cx_b.update(|cx| {
1151 cx.dispatch_action(
1152 window_b,
1153 vec![workspace_b.id()],
1154 &JoinWorktree(remote_worktree_id),
1155 );
1156 });
1157 workspace_b
1158 .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
1159 .await;
1160
1161 let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
1162 let active_pane = workspace.active_pane().read(cx);
1163 assert!(active_pane.active_item().is_none());
1164 workspace.worktrees(cx).first().unwrap().id()
1165 });
1166 workspace_b
1167 .update(&mut cx_b, |workspace, cx| {
1168 workspace.open_entry(
1169 ProjectPath {
1170 worktree_id: local_worktree_id_b,
1171 path: Path::new("a.txt").into(),
1172 },
1173 cx,
1174 )
1175 })
1176 .unwrap()
1177 .await;
1178 workspace_b.read_with(&cx_b, |workspace, cx| {
1179 let active_pane = workspace.active_pane().read(cx);
1180 assert!(active_pane.active_item().is_some());
1181 });
1182
1183 worktree_a.update(&mut cx_a, |tree, cx| {
1184 tree.as_local_mut().unwrap().unshare(cx);
1185 });
1186 workspace_b
1187 .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
1188 .await;
1189 workspace_b.read_with(&cx_b, |workspace, cx| {
1190 let active_pane = workspace.active_pane().read(cx);
1191 assert!(active_pane.active_item().is_none());
1192 });
1193 }
1194
1195 #[gpui::test]
1196 async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1197 mut cx_a: TestAppContext,
1198 mut cx_b: TestAppContext,
1199 mut cx_c: TestAppContext,
1200 ) {
1201 cx_a.foreground().forbid_parking();
1202 let lang_registry = Arc::new(LanguageRegistry::new());
1203
1204 // Connect to a server as 3 clients.
1205 let mut server = TestServer::start().await;
1206 let client_a = server.create_client(&mut cx_a, "user_a").await;
1207 let client_b = server.create_client(&mut cx_b, "user_b").await;
1208 let client_c = server.create_client(&mut cx_c, "user_c").await;
1209
1210 let fs = Arc::new(FakeFs::new());
1211
1212 // Share a worktree as client A.
1213 fs.insert_tree(
1214 "/a",
1215 json!({
1216 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1217 "file1": "",
1218 "file2": ""
1219 }),
1220 )
1221 .await;
1222
1223 let worktree_a = Worktree::open_local(
1224 client_a.clone(),
1225 client_a.user_store.clone(),
1226 "/a".as_ref(),
1227 fs.clone(),
1228 lang_registry.clone(),
1229 &mut cx_a.to_async(),
1230 )
1231 .await
1232 .unwrap();
1233 worktree_a
1234 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1235 .await;
1236 let worktree_id = worktree_a
1237 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1238 .await
1239 .unwrap();
1240
1241 // Join that worktree as clients B and C.
1242 let worktree_b = Worktree::open_remote(
1243 client_b.clone(),
1244 worktree_id,
1245 lang_registry.clone(),
1246 client_b.user_store.clone(),
1247 &mut cx_b.to_async(),
1248 )
1249 .await
1250 .unwrap();
1251 let worktree_c = Worktree::open_remote(
1252 client_c.clone(),
1253 worktree_id,
1254 lang_registry.clone(),
1255 client_c.user_store.clone(),
1256 &mut cx_c.to_async(),
1257 )
1258 .await
1259 .unwrap();
1260
1261 // Open and edit a buffer as both guests B and C.
1262 let buffer_b = worktree_b
1263 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1264 .await
1265 .unwrap();
1266 let buffer_c = worktree_c
1267 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1268 .await
1269 .unwrap();
1270 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1271 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1272
1273 // Open and edit that buffer as the host.
1274 let buffer_a = worktree_a
1275 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1276 .await
1277 .unwrap();
1278
1279 buffer_a
1280 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1281 .await;
1282 buffer_a.update(&mut cx_a, |buf, cx| {
1283 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1284 });
1285
1286 // Wait for edits to propagate
1287 buffer_a
1288 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1289 .await;
1290 buffer_b
1291 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1292 .await;
1293 buffer_c
1294 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1295 .await;
1296
1297 // Edit the buffer as the host and concurrently save as guest B.
1298 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1299 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1300 save_b.await.unwrap();
1301 assert_eq!(
1302 fs.load("/a/file1".as_ref()).await.unwrap(),
1303 "hi-a, i-am-c, i-am-b, i-am-a"
1304 );
1305 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1306 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1307 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1308
1309 // Make changes on host's file system, see those changes on the guests.
1310 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1311 .await
1312 .unwrap();
1313 fs.insert_file(Path::new("/a/file4"), "4".into())
1314 .await
1315 .unwrap();
1316
1317 worktree_b
1318 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1319 .await;
1320 worktree_c
1321 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1322 .await;
1323 worktree_b.read_with(&cx_b, |tree, _| {
1324 assert_eq!(
1325 tree.paths()
1326 .map(|p| p.to_string_lossy())
1327 .collect::<Vec<_>>(),
1328 &[".zed.toml", "file1", "file3", "file4"]
1329 )
1330 });
1331 worktree_c.read_with(&cx_c, |tree, _| {
1332 assert_eq!(
1333 tree.paths()
1334 .map(|p| p.to_string_lossy())
1335 .collect::<Vec<_>>(),
1336 &[".zed.toml", "file1", "file3", "file4"]
1337 )
1338 });
1339 }
1340
    // Checks that a guest's buffer never reports a conflict across an edit -> save -> edit
    // sequence: it is dirty after each edit, clean after the save, and `has_conflict()` stays
    // false throughout.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/dir".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the buffer as the guest and remember the file's mtime before any edit.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        // An unsaved edit makes the buffer dirty, but not conflicted.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as the guest, then wait until the guest's view of the file reports a different
        // mtime than the one recorded before the save.
        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .unwrap()
            .await
            .unwrap();
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save makes the buffer dirty again, still without a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1424
    // Checks that a guest who starts opening a buffer while the host edits it still ends up
    // with the same text as the host.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/dir".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the buffer on the host first, then start opening it as the guest on a
        // background task without awaiting the result yet.
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));

        // Yield once, then edit on the host. NOTE(review): the intent appears to be that the
        // edit lands while the guest's open request is still in flight; confirm.
        task::yield_now().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Once the guest's buffer is open, it must converge to the host's text.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1492
1493 #[gpui::test]
1494 async fn test_leaving_worktree_while_opening_buffer(
1495 mut cx_a: TestAppContext,
1496 mut cx_b: TestAppContext,
1497 ) {
1498 cx_a.foreground().forbid_parking();
1499 let lang_registry = Arc::new(LanguageRegistry::new());
1500
1501 // Connect to a server as 2 clients.
1502 let mut server = TestServer::start().await;
1503 let client_a = server.create_client(&mut cx_a, "user_a").await;
1504 let client_b = server.create_client(&mut cx_b, "user_b").await;
1505
1506 // Share a local worktree as client A
1507 let fs = Arc::new(FakeFs::new());
1508 fs.insert_tree(
1509 "/dir",
1510 json!({
1511 ".zed.toml": r#"collaborators = ["user_b"]"#,
1512 "a.txt": "a-contents",
1513 }),
1514 )
1515 .await;
1516 let worktree_a = Worktree::open_local(
1517 client_a.clone(),
1518 client_a.user_store.clone(),
1519 "/dir".as_ref(),
1520 fs,
1521 lang_registry.clone(),
1522 &mut cx_a.to_async(),
1523 )
1524 .await
1525 .unwrap();
1526 worktree_a
1527 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1528 .await;
1529 let worktree_id = worktree_a
1530 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1531 .await
1532 .unwrap();
1533
1534 // Join that worktree as client B, and see that a guest has joined as client A.
1535 let worktree_b = Worktree::open_remote(
1536 client_b.clone(),
1537 worktree_id,
1538 lang_registry.clone(),
1539 client_b.user_store.clone(),
1540 &mut cx_b.to_async(),
1541 )
1542 .await
1543 .unwrap();
1544 worktree_a
1545 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1546 .await;
1547
1548 let buffer_b = cx_b
1549 .background()
1550 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1551 cx_b.update(|_| drop(worktree_b));
1552 drop(buffer_b);
1553 worktree_a
1554 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1555 .await;
1556 }
1557
1558 #[gpui::test]
1559 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1560 cx_a.foreground().forbid_parking();
1561 let lang_registry = Arc::new(LanguageRegistry::new());
1562
1563 // Connect to a server as 2 clients.
1564 let mut server = TestServer::start().await;
1565 let client_a = server.create_client(&mut cx_a, "user_a").await;
1566 let client_b = server.create_client(&mut cx_b, "user_b").await;
1567
1568 // Share a local worktree as client A
1569 let fs = Arc::new(FakeFs::new());
1570 fs.insert_tree(
1571 "/a",
1572 json!({
1573 ".zed.toml": r#"collaborators = ["user_b"]"#,
1574 "a.txt": "a-contents",
1575 "b.txt": "b-contents",
1576 }),
1577 )
1578 .await;
1579 let worktree_a = Worktree::open_local(
1580 client_a.clone(),
1581 client_a.user_store.clone(),
1582 "/a".as_ref(),
1583 fs,
1584 lang_registry.clone(),
1585 &mut cx_a.to_async(),
1586 )
1587 .await
1588 .unwrap();
1589 worktree_a
1590 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591 .await;
1592 let worktree_id = worktree_a
1593 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1594 .await
1595 .unwrap();
1596
1597 // Join that worktree as client B, and see that a guest has joined as client A.
1598 let _worktree_b = Worktree::open_remote(
1599 client_b.clone(),
1600 worktree_id,
1601 lang_registry.clone(),
1602 client_b.user_store.clone(),
1603 &mut cx_b.to_async(),
1604 )
1605 .await
1606 .unwrap();
1607 worktree_a
1608 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1609 .await;
1610
1611 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1612 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1613 worktree_a
1614 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1615 .await;
1616 }
1617
    // Checks that diagnostics published by the host's (fake) language server before a guest
    // joins are visible in the guest's copy of the buffer once the guest opens the file.
    #[gpui::test]
    async fn test_collaborating_with_diagnostics(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        // Register a "Rust" language for `.rs` files that is backed by a fake language server,
        // so the test can publish diagnostics itself.
        let (language_server_config, mut fake_language_server) =
            LanguageServerConfig::fake(cx_a.background()).await;
        let mut lang_registry = LanguageRegistry::new();
        lang_registry.add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

        let lang_registry = Arc::new(lang_registry);

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "let one = two",
                "other.rs": "",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Cause language server to start.
        // Note that `let _ =` discards the opened buffer handle right away; only the side
        // effect of opening a `.rs` file is wanted here.
        let _ = cx_a
            .background()
            .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
                worktree.open_buffer("other.rs", cx)
            }))
            .await
            .unwrap();

        // Simulate a language server reporting errors for a file.
        // One ERROR at 0:4..0:7 and one WARNING at 0:10..0:13 of "/a/a.rs".
        fake_language_server
            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
                version: None,
                diagnostics: vec![
                    lsp::Diagnostic {
                        severity: Some(lsp::DiagnosticSeverity::ERROR),
                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
                        message: "message 1".to_string(),
                        ..Default::default()
                    },
                    lsp::Diagnostic {
                        severity: Some(lsp::DiagnosticSeverity::WARNING),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 10),
                            lsp::Position::new(0, 13),
                        ),
                        message: "message 2".to_string(),
                        ..Default::default()
                    },
                ],
            })
            .await;

        // Join the worktree as client B.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file with the errors.
        let buffer_b = cx_b
            .background()
            .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
            .await
            .unwrap();

        // The guest's buffer must expose both diagnostics at the published ranges, each as the
        // primary entry of its own group (group ids 0 and 1).
        buffer_b.read_with(&cx_b, |buffer, _| {
            assert_eq!(
                buffer
                    .snapshot()
                    .diagnostics_in_range::<_, Point>(0..buffer.len())
                    .collect::<Vec<_>>(),
                &[
                    DiagnosticEntry {
                        range: Point::new(0, 4)..Point::new(0, 7),
                        diagnostic: Diagnostic {
                            group_id: 0,
                            message: "message 1".to_string(),
                            severity: lsp::DiagnosticSeverity::ERROR,
                            is_primary: true,
                            ..Default::default()
                        }
                    },
                    DiagnosticEntry {
                        range: Point::new(0, 10)..Point::new(0, 13),
                        diagnostic: Diagnostic {
                            group_id: 1,
                            severity: lsp::DiagnosticSeverity::WARNING,
                            message: "message 2".to_string(),
                            is_primary: true,
                            ..Default::default()
                        }
                    }
                ]
            );
        });
    }
1756
1757 #[gpui::test]
1758 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1759 cx_a.foreground().forbid_parking();
1760
1761 // Connect to a server as 2 clients.
1762 let mut server = TestServer::start().await;
1763 let client_a = server.create_client(&mut cx_a, "user_a").await;
1764 let client_b = server.create_client(&mut cx_b, "user_b").await;
1765
1766 // Create an org that includes these 2 users.
1767 let db = &server.app_state.db;
1768 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1769 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1770 .await
1771 .unwrap();
1772 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1773 .await
1774 .unwrap();
1775
1776 // Create a channel that includes all the users.
1777 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1778 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1779 .await
1780 .unwrap();
1781 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1782 .await
1783 .unwrap();
1784 db.create_channel_message(
1785 channel_id,
1786 client_b.current_user_id(&cx_b),
1787 "hello A, it's B.",
1788 OffsetDateTime::now_utc(),
1789 1,
1790 )
1791 .await
1792 .unwrap();
1793
1794 let channels_a = cx_a
1795 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1796 channels_a
1797 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1798 .await;
1799 channels_a.read_with(&cx_a, |list, _| {
1800 assert_eq!(
1801 list.available_channels().unwrap(),
1802 &[ChannelDetails {
1803 id: channel_id.to_proto(),
1804 name: "test-channel".to_string()
1805 }]
1806 )
1807 });
1808 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1809 this.get_channel(channel_id.to_proto(), cx).unwrap()
1810 });
1811 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1812 channel_a
1813 .condition(&cx_a, |channel, _| {
1814 channel_messages(channel)
1815 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1816 })
1817 .await;
1818
1819 let channels_b = cx_b
1820 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1821 channels_b
1822 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1823 .await;
1824 channels_b.read_with(&cx_b, |list, _| {
1825 assert_eq!(
1826 list.available_channels().unwrap(),
1827 &[ChannelDetails {
1828 id: channel_id.to_proto(),
1829 name: "test-channel".to_string()
1830 }]
1831 )
1832 });
1833
1834 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1835 this.get_channel(channel_id.to_proto(), cx).unwrap()
1836 });
1837 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1838 channel_b
1839 .condition(&cx_b, |channel, _| {
1840 channel_messages(channel)
1841 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1842 })
1843 .await;
1844
1845 channel_a
1846 .update(&mut cx_a, |channel, cx| {
1847 channel
1848 .send_message("oh, hi B.".to_string(), cx)
1849 .unwrap()
1850 .detach();
1851 let task = channel.send_message("sup".to_string(), cx).unwrap();
1852 assert_eq!(
1853 channel_messages(channel),
1854 &[
1855 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1856 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1857 ("user_a".to_string(), "sup".to_string(), true)
1858 ]
1859 );
1860 task
1861 })
1862 .await
1863 .unwrap();
1864
1865 channel_b
1866 .condition(&cx_b, |channel, _| {
1867 channel_messages(channel)
1868 == [
1869 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1870 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1871 ("user_a".to_string(), "sup".to_string(), false),
1872 ]
1873 })
1874 .await;
1875
1876 assert_eq!(
1877 server
1878 .state()
1879 .await
1880 .channel(channel_id)
1881 .unwrap()
1882 .connection_ids
1883 .len(),
1884 2
1885 );
1886 cx_b.update(|_| drop(channel_b));
1887 server
1888 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1889 .await;
1890
1891 cx_a.update(|_| drop(channel_a));
1892 server
1893 .condition(|state| state.channel(channel_id).is_none())
1894 .await;
1895 }
1896
1897 #[gpui::test]
1898 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1899 cx_a.foreground().forbid_parking();
1900
1901 let mut server = TestServer::start().await;
1902 let client_a = server.create_client(&mut cx_a, "user_a").await;
1903
1904 let db = &server.app_state.db;
1905 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1906 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1907 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1908 .await
1909 .unwrap();
1910 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1911 .await
1912 .unwrap();
1913
1914 let channels_a = cx_a
1915 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1916 channels_a
1917 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1918 .await;
1919 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1920 this.get_channel(channel_id.to_proto(), cx).unwrap()
1921 });
1922
1923 // Messages aren't allowed to be too long.
1924 channel_a
1925 .update(&mut cx_a, |channel, cx| {
1926 let long_body = "this is long.\n".repeat(1024);
1927 channel.send_message(long_body, cx).unwrap()
1928 })
1929 .await
1930 .unwrap_err();
1931
1932 // Messages aren't allowed to be blank.
1933 channel_a.update(&mut cx_a, |channel, cx| {
1934 channel.send_message(String::new(), cx).unwrap_err()
1935 });
1936
1937 // Leading and trailing whitespace are trimmed.
1938 channel_a
1939 .update(&mut cx_a, |channel, cx| {
1940 channel
1941 .send_message("\n surrounded by whitespace \n".to_string(), cx)
1942 .unwrap()
1943 })
1944 .await
1945 .unwrap();
1946 assert_eq!(
1947 db.get_channel_messages(channel_id, 10, None)
1948 .await
1949 .unwrap()
1950 .iter()
1951 .map(|m| &m.body)
1952 .collect::<Vec<_>>(),
1953 &["surrounded by whitespace"]
1954 );
1955 }
1956
1957 #[gpui::test]
1958 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1959 cx_a.foreground().forbid_parking();
1960
1961 // Connect to a server as 2 clients.
1962 let mut server = TestServer::start().await;
1963 let client_a = server.create_client(&mut cx_a, "user_a").await;
1964 let client_b = server.create_client(&mut cx_b, "user_b").await;
1965 let mut status_b = client_b.status();
1966
1967 // Create an org that includes these 2 users.
1968 let db = &server.app_state.db;
1969 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1970 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1971 .await
1972 .unwrap();
1973 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1974 .await
1975 .unwrap();
1976
1977 // Create a channel that includes all the users.
1978 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1979 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1980 .await
1981 .unwrap();
1982 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1983 .await
1984 .unwrap();
1985 db.create_channel_message(
1986 channel_id,
1987 client_b.current_user_id(&cx_b),
1988 "hello A, it's B.",
1989 OffsetDateTime::now_utc(),
1990 2,
1991 )
1992 .await
1993 .unwrap();
1994
1995 let channels_a = cx_a
1996 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1997 channels_a
1998 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1999 .await;
2000
2001 channels_a.read_with(&cx_a, |list, _| {
2002 assert_eq!(
2003 list.available_channels().unwrap(),
2004 &[ChannelDetails {
2005 id: channel_id.to_proto(),
2006 name: "test-channel".to_string()
2007 }]
2008 )
2009 });
2010 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2011 this.get_channel(channel_id.to_proto(), cx).unwrap()
2012 });
2013 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2014 channel_a
2015 .condition(&cx_a, |channel, _| {
2016 channel_messages(channel)
2017 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2018 })
2019 .await;
2020
2021 let channels_b = cx_b
2022 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2023 channels_b
2024 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2025 .await;
2026 channels_b.read_with(&cx_b, |list, _| {
2027 assert_eq!(
2028 list.available_channels().unwrap(),
2029 &[ChannelDetails {
2030 id: channel_id.to_proto(),
2031 name: "test-channel".to_string()
2032 }]
2033 )
2034 });
2035
2036 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2037 this.get_channel(channel_id.to_proto(), cx).unwrap()
2038 });
2039 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2040 channel_b
2041 .condition(&cx_b, |channel, _| {
2042 channel_messages(channel)
2043 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2044 })
2045 .await;
2046
2047 // Disconnect client B, ensuring we can still access its cached channel data.
2048 server.forbid_connections();
2049 server.disconnect_client(client_b.current_user_id(&cx_b));
2050 while !matches!(
2051 status_b.recv().await,
2052 Some(client::Status::ReconnectionError { .. })
2053 ) {}
2054
2055 channels_b.read_with(&cx_b, |channels, _| {
2056 assert_eq!(
2057 channels.available_channels().unwrap(),
2058 [ChannelDetails {
2059 id: channel_id.to_proto(),
2060 name: "test-channel".to_string()
2061 }]
2062 )
2063 });
2064 channel_b.read_with(&cx_b, |channel, _| {
2065 assert_eq!(
2066 channel_messages(channel),
2067 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2068 )
2069 });
2070
2071 // Send a message from client B while it is disconnected.
2072 channel_b
2073 .update(&mut cx_b, |channel, cx| {
2074 let task = channel
2075 .send_message("can you see this?".to_string(), cx)
2076 .unwrap();
2077 assert_eq!(
2078 channel_messages(channel),
2079 &[
2080 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2081 ("user_b".to_string(), "can you see this?".to_string(), true)
2082 ]
2083 );
2084 task
2085 })
2086 .await
2087 .unwrap_err();
2088
2089 // Send a message from client A while B is disconnected.
2090 channel_a
2091 .update(&mut cx_a, |channel, cx| {
2092 channel
2093 .send_message("oh, hi B.".to_string(), cx)
2094 .unwrap()
2095 .detach();
2096 let task = channel.send_message("sup".to_string(), cx).unwrap();
2097 assert_eq!(
2098 channel_messages(channel),
2099 &[
2100 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2101 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2102 ("user_a".to_string(), "sup".to_string(), true)
2103 ]
2104 );
2105 task
2106 })
2107 .await
2108 .unwrap();
2109
2110 // Give client B a chance to reconnect.
2111 server.allow_connections();
2112 cx_b.foreground().advance_clock(Duration::from_secs(10));
2113
2114 // Verify that B sees the new messages upon reconnection, as well as the message client B
2115 // sent while offline.
2116 channel_b
2117 .condition(&cx_b, |channel, _| {
2118 channel_messages(channel)
2119 == [
2120 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2121 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2122 ("user_a".to_string(), "sup".to_string(), false),
2123 ("user_b".to_string(), "can you see this?".to_string(), false),
2124 ]
2125 })
2126 .await;
2127
2128 // Ensure client A and B can communicate normally after reconnection.
2129 channel_a
2130 .update(&mut cx_a, |channel, cx| {
2131 channel.send_message("you online?".to_string(), cx).unwrap()
2132 })
2133 .await
2134 .unwrap();
2135 channel_b
2136 .condition(&cx_b, |channel, _| {
2137 channel_messages(channel)
2138 == [
2139 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2140 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2141 ("user_a".to_string(), "sup".to_string(), false),
2142 ("user_b".to_string(), "can you see this?".to_string(), false),
2143 ("user_a".to_string(), "you online?".to_string(), false),
2144 ]
2145 })
2146 .await;
2147
2148 channel_b
2149 .update(&mut cx_b, |channel, cx| {
2150 channel.send_message("yep".to_string(), cx).unwrap()
2151 })
2152 .await
2153 .unwrap();
2154 channel_a
2155 .condition(&cx_a, |channel, _| {
2156 channel_messages(channel)
2157 == [
2158 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2159 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2160 ("user_a".to_string(), "sup".to_string(), false),
2161 ("user_b".to_string(), "can you see this?".to_string(), false),
2162 ("user_a".to_string(), "you online?".to_string(), false),
2163 ("user_b".to_string(), "yep".to_string(), false),
2164 ]
2165 })
2166 .await;
2167 }
2168
2169 #[gpui::test]
2170 async fn test_contacts(
2171 mut cx_a: TestAppContext,
2172 mut cx_b: TestAppContext,
2173 mut cx_c: TestAppContext,
2174 ) {
2175 cx_a.foreground().forbid_parking();
2176 let lang_registry = Arc::new(LanguageRegistry::new());
2177
2178 // Connect to a server as 3 clients.
2179 let mut server = TestServer::start().await;
2180 let client_a = server.create_client(&mut cx_a, "user_a").await;
2181 let client_b = server.create_client(&mut cx_b, "user_b").await;
2182 let client_c = server.create_client(&mut cx_c, "user_c").await;
2183
2184 let fs = Arc::new(FakeFs::new());
2185
2186 // Share a worktree as client A.
2187 fs.insert_tree(
2188 "/a",
2189 json!({
2190 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2191 }),
2192 )
2193 .await;
2194
2195 let worktree_a = Worktree::open_local(
2196 client_a.clone(),
2197 client_a.user_store.clone(),
2198 "/a".as_ref(),
2199 fs.clone(),
2200 lang_registry.clone(),
2201 &mut cx_a.to_async(),
2202 )
2203 .await
2204 .unwrap();
2205
2206 client_a
2207 .user_store
2208 .condition(&cx_a, |user_store, _| {
2209 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2210 })
2211 .await;
2212 client_b
2213 .user_store
2214 .condition(&cx_b, |user_store, _| {
2215 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2216 })
2217 .await;
2218 client_c
2219 .user_store
2220 .condition(&cx_c, |user_store, _| {
2221 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2222 })
2223 .await;
2224
2225 let worktree_id = worktree_a
2226 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2227 .await
2228 .unwrap();
2229
2230 let _worktree_b = Worktree::open_remote(
2231 client_b.clone(),
2232 worktree_id,
2233 lang_registry.clone(),
2234 client_b.user_store.clone(),
2235 &mut cx_b.to_async(),
2236 )
2237 .await
2238 .unwrap();
2239
2240 client_a
2241 .user_store
2242 .condition(&cx_a, |user_store, _| {
2243 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2244 })
2245 .await;
2246 client_b
2247 .user_store
2248 .condition(&cx_b, |user_store, _| {
2249 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2250 })
2251 .await;
2252 client_c
2253 .user_store
2254 .condition(&cx_c, |user_store, _| {
2255 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2256 })
2257 .await;
2258
2259 worktree_a
2260 .condition(&cx_a, |worktree, _| {
2261 worktree.collaborators().contains_key(&client_b.peer_id)
2262 })
2263 .await;
2264
2265 cx_a.update(move |_| drop(worktree_a));
2266 client_a
2267 .user_store
2268 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2269 .await;
2270 client_b
2271 .user_store
2272 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2273 .await;
2274 client_c
2275 .user_store
2276 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2277 .await;
2278
2279 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2280 user_store
2281 .contacts()
2282 .iter()
2283 .map(|contact| {
2284 let worktrees = contact
2285 .worktrees
2286 .iter()
2287 .map(|w| {
2288 (
2289 w.root_name.as_str(),
2290 w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2291 )
2292 })
2293 .collect();
2294 (contact.user.github_login.as_str(), worktrees)
2295 })
2296 .collect()
2297 }
2298 }
2299
/// Test harness that owns a `Server` together with the pieces needed to
/// connect clients to it and observe its state.
struct TestServer {
    /// Peer handed to `Server::new`; it is reset when the `TestServer` drops.
    peer: Arc<Peer>,
    /// Application state built by `build_app_state` and shared with `server`.
    app_state: Arc<AppState>,
    /// The server that handles every connection created by `create_client`.
    server: Arc<Server>,
    /// Receiving half of the channel whose sender is given to `Server::new`;
    /// `condition` waits on it between predicate checks.
    notifications: mpsc::Receiver<()>,
    /// For each user, the kill handle of the in-memory connection that user
    /// established most recently; consumed by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// While `true`, attempts to establish a connection fail with
    /// "server is forbidding connections".
    forbid_connections: Arc<AtomicBool>,
    /// Never read after construction.
    /// NOTE(review): presumably held so the test database outlives the
    /// server — confirm against `TestDb`'s drop behavior.
    _test_db: TestDb,
}
2309
2310 impl TestServer {
2311 async fn start() -> Self {
2312 let test_db = TestDb::new();
2313 let app_state = Self::build_app_state(&test_db).await;
2314 let peer = Peer::new();
2315 let notifications = mpsc::channel(128);
2316 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2317 Self {
2318 peer,
2319 app_state,
2320 server,
2321 notifications: notifications.1,
2322 connection_killers: Default::default(),
2323 forbid_connections: Default::default(),
2324 _test_db: test_db,
2325 }
2326 }
2327
2328 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2329 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2330 let client_name = name.to_string();
2331 let mut client = Client::new();
2332 let server = self.server.clone();
2333 let connection_killers = self.connection_killers.clone();
2334 let forbid_connections = self.forbid_connections.clone();
2335 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2336
2337 Arc::get_mut(&mut client)
2338 .unwrap()
2339 .override_authenticate(move |cx| {
2340 cx.spawn(|_| async move {
2341 let access_token = "the-token".to_string();
2342 Ok(Credentials {
2343 user_id: user_id.0 as u64,
2344 access_token,
2345 })
2346 })
2347 })
2348 .override_establish_connection(move |credentials, cx| {
2349 assert_eq!(credentials.user_id, user_id.0 as u64);
2350 assert_eq!(credentials.access_token, "the-token");
2351
2352 let server = server.clone();
2353 let connection_killers = connection_killers.clone();
2354 let forbid_connections = forbid_connections.clone();
2355 let client_name = client_name.clone();
2356 let connection_id_tx = connection_id_tx.clone();
2357 cx.spawn(move |cx| async move {
2358 if forbid_connections.load(SeqCst) {
2359 Err(EstablishConnectionError::other(anyhow!(
2360 "server is forbidding connections"
2361 )))
2362 } else {
2363 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2364 connection_killers.lock().insert(user_id, kill_conn);
2365 cx.background()
2366 .spawn(server.handle_connection(
2367 server_conn,
2368 client_name,
2369 user_id,
2370 Some(connection_id_tx),
2371 ))
2372 .detach();
2373 Ok(client_conn)
2374 }
2375 })
2376 });
2377
2378 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2379 client
2380 .authenticate_and_connect(&cx.to_async())
2381 .await
2382 .unwrap();
2383
2384 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2385 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2386 let mut authed_user =
2387 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2388 while authed_user.recv().await.unwrap().is_none() {}
2389
2390 TestClient {
2391 client,
2392 peer_id,
2393 user_store,
2394 }
2395 }
2396
2397 fn disconnect_client(&self, user_id: UserId) {
2398 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2399 let _ = kill_conn.try_send(Some(()));
2400 }
2401 }
2402
2403 fn forbid_connections(&self) {
2404 self.forbid_connections.store(true, SeqCst);
2405 }
2406
2407 fn allow_connections(&self) {
2408 self.forbid_connections.store(false, SeqCst);
2409 }
2410
2411 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2412 let mut config = Config::default();
2413 config.session_secret = "a".repeat(32);
2414 config.database_url = test_db.url.clone();
2415 let github_client = github::AppClient::test();
2416 Arc::new(AppState {
2417 db: test_db.db().clone(),
2418 handlebars: Default::default(),
2419 auth_client: auth::build_client("", ""),
2420 repo_client: github::RepoClient::test(&github_client),
2421 github_client,
2422 config,
2423 })
2424 }
2425
2426 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2427 self.server.store.read()
2428 }
2429
2430 async fn condition<F>(&mut self, mut predicate: F)
2431 where
2432 F: FnMut(&Store) -> bool,
2433 {
2434 async_std::future::timeout(Duration::from_millis(500), async {
2435 while !(predicate)(&*self.server.store.read()) {
2436 self.notifications.recv().await;
2437 }
2438 })
2439 .await
2440 .expect("condition timed out");
2441 }
2442 }
2443
2444 impl Drop for TestServer {
2445 fn drop(&mut self) {
2446 task::block_on(self.peer.reset());
2447 }
2448 }
2449
/// A client connected to a `TestServer`, bundled with the identifiers and
/// models that tests commonly need.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// The `UserStore` model created for this client.
    pub user_store: ModelHandle<UserStore>,
}
2455
2456 impl Deref for TestClient {
2457 type Target = Arc<Client>;
2458
2459 fn deref(&self) -> &Self::Target {
2460 &self.client
2461 }
2462 }
2463
2464 impl TestClient {
2465 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2466 UserId::from_proto(
2467 self.user_store
2468 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2469 )
2470 }
2471 }
2472
2473 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2474 channel
2475 .messages()
2476 .cursor::<()>()
2477 .map(|m| {
2478 (
2479 m.sender.github_login.clone(),
2480 m.body.clone(),
2481 m.is_pending(),
2482 )
2483 })
2484 .collect()
2485 }
2486
/// A stateless placeholder view for tests; it renders an empty element.
struct EmptyView;
2488
impl gpui::Entity for EmptyView {
    // The event type is unit, so events from this view carry no payload.
    type Event = ();
}
2492
impl gpui::View for EmptyView {
    /// Returns the fixed name "empty view" for this view type.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Renders a boxed `gpui::elements::Empty`; the render context is unused.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}
2502}