1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased handler for a single kind of incoming RPC message.
///
/// A handler receives the shared [`Server`] and the boxed, untyped envelope,
/// and returns a boxed future that resolves to `tide::Result<()>`.
/// Handlers are stored in `Server::handlers`, keyed by the message's `TypeId`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server state shared by every client connection.
pub struct Server {
    /// RPC peer used to register connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory collaboration state; accessed through `state()` / `state_mut()`.
    store: RwLock<Store>,
    /// Application-wide state; the handlers use its `db` for users, channels and messages.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` each time a message handler has finished.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page. A response
/// with fewer messages than this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum accepted length of a trimmed channel message body, measured with
/// `String::len` (i.e. in bytes).
const MAX_MESSAGE_LEN: usize = 1024;
46
47impl Server {
48 pub fn new(
49 app_state: Arc<AppState>,
50 peer: Arc<Peer>,
51 notifications: Option<mpsc::Sender<()>>,
52 ) -> Arc<Self> {
53 let mut server = Self {
54 peer,
55 app_state,
56 store: Default::default(),
57 handlers: Default::default(),
58 notifications,
59 };
60
61 server
62 .add_handler(Server::ping)
63 .add_handler(Server::register_worktree)
64 .add_handler(Server::unregister_worktree)
65 .add_handler(Server::share_worktree)
66 .add_handler(Server::unshare_worktree)
67 .add_handler(Server::join_worktree)
68 .add_handler(Server::leave_worktree)
69 .add_handler(Server::update_worktree)
70 .add_handler(Server::open_buffer)
71 .add_handler(Server::close_buffer)
72 .add_handler(Server::update_buffer)
73 .add_handler(Server::buffer_saved)
74 .add_handler(Server::save_buffer)
75 .add_handler(Server::get_channels)
76 .add_handler(Server::get_users)
77 .add_handler(Server::join_channel)
78 .add_handler(Server::leave_channel)
79 .add_handler(Server::send_channel_message)
80 .add_handler(Server::get_channel_messages);
81
82 Arc::new(server)
83 }
84
85 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
86 where
87 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
88 Fut: 'static + Send + Future<Output = tide::Result<()>>,
89 M: EnvelopedMessage,
90 {
91 let prev_handler = self.handlers.insert(
92 TypeId::of::<M>(),
93 Box::new(move |server, envelope| {
94 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
95 (handler)(server, *envelope).boxed()
96 }),
97 );
98 if prev_handler.is_some() {
99 panic!("registered a handler for the same message twice");
100 }
101 self
102 }
103
    /// Registers `connection` with the peer and drives it until it closes.
    ///
    /// The returned future:
    /// 1. adds the connection to the peer and, if `send_connection_id` is
    ///    provided, sends the newly assigned connection id through it;
    /// 2. records the connection for `user_id` in the store and pushes updated
    ///    contacts to that user;
    /// 3. loops, dispatching each incoming message to the handler registered
    ///    for its payload type, until the message stream ends or the
    ///    connection's I/O future completes;
    /// 4. signs the connection out.
    ///
    /// `addr` is only used in log output. Errors from contact updates,
    /// handlers, I/O and sign-out are logged rather than returned.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        // `state_mut` and `sign_out` need a mutable `Arc` binding, so clone into one.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Report the assigned id to the caller; a failed send is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.recv().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so a
                // ready message is taken before the I/O future's completion.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so messages from this
                                // connection are handled one at a time.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Signal observers (if any) that a message was handled,
                                // whether or not the handler succeeded.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // The incoming stream ended: the connection is closed.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        // The I/O future finished, with or without an error; stop serving.
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
167
168 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
169 self.peer.disconnect(connection_id).await;
170 let removed_connection = self.state_mut().remove_connection(connection_id)?;
171
172 for (project_id, project) in removed_connection.hosted_projects {
173 if let Some(share) = project.share {
174 broadcast(
175 connection_id,
176 share.guests.keys().copied().collect(),
177 |conn_id| {
178 self.peer
179 .send(conn_id, proto::UnshareProject { project_id })
180 },
181 )
182 .await?;
183 }
184 }
185
186 for (project_id, peer_ids) in removed_connection.guest_project_ids {
187 broadcast(connection_id, peer_ids, |conn_id| {
188 self.peer.send(
189 conn_id,
190 proto::RemoveProjectCollaborator {
191 project_id,
192 peer_id: connection_id.0,
193 },
194 )
195 })
196 .await?;
197 }
198
199 self.update_contacts_for_users(removed_connection.contact_ids.iter())
200 .await?;
201
202 Ok(())
203 }
204
205 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
206 self.peer.respond(request.receipt(), proto::Ack {}).await?;
207 Ok(())
208 }
209
210 async fn register_worktree(
211 mut self: Arc<Server>,
212 request: TypedEnvelope<proto::RegisterWorktree>,
213 ) -> tide::Result<()> {
214 let receipt = request.receipt();
215 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
216
217 let mut contact_user_ids = HashSet::default();
218 contact_user_ids.insert(host_user_id);
219 for github_login in request.payload.authorized_logins {
220 match self.app_state.db.create_user(&github_login, false).await {
221 Ok(contact_user_id) => {
222 contact_user_ids.insert(contact_user_id);
223 }
224 Err(err) => {
225 let message = err.to_string();
226 self.peer
227 .respond_with_error(receipt, proto::Error { message })
228 .await?;
229 return Ok(());
230 }
231 }
232 }
233
234 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
235 let ok = self.state_mut().register_worktree(
236 request.project_id,
237 request.worktree_id,
238 Worktree {
239 authorized_user_ids: contact_user_ids.clone(),
240 root_name: request.payload.root_name,
241 },
242 );
243
244 if ok {
245 self.peer.respond(receipt, proto::Ack {}).await?;
246 self.update_contacts_for_users(&contact_user_ids).await?;
247 } else {
248 self.peer
249 .respond_with_error(
250 receipt,
251 proto::Error {
252 message: "no such project".to_string(),
253 },
254 )
255 .await?;
256 }
257
258 Ok(())
259 }
260
261 async fn unregister_worktree(
262 mut self: Arc<Server>,
263 request: TypedEnvelope<proto::UnregisterWorktree>,
264 ) -> tide::Result<()> {
265 let project_id = request.payload.project_id;
266 let worktree_id = request.payload.worktree_id;
267 let worktree =
268 self.state_mut()
269 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
270
271 if let Some(share) = worktree.share {
272 broadcast(
273 request.sender_id,
274 share.guests.keys().copied().collect(),
275 |conn_id| {
276 self.peer.send(
277 conn_id,
278 proto::UnregisterWorktree {
279 project_id,
280 worktree_id,
281 },
282 )
283 },
284 )
285 .await?;
286 }
287 self.update_contacts_for_users(&worktree.authorized_user_ids)
288 .await?;
289 Ok(())
290 }
291
292 async fn share_worktree(
293 mut self: Arc<Server>,
294 mut request: TypedEnvelope<proto::ShareWorktree>,
295 ) -> tide::Result<()> {
296 let worktree = request
297 .payload
298 .worktree
299 .as_mut()
300 .ok_or_else(|| anyhow!("missing worktree"))?;
301 let entries = mem::take(&mut worktree.entries)
302 .into_iter()
303 .map(|entry| (entry.id, entry))
304 .collect();
305
306 let contact_user_ids =
307 self.state_mut()
308 .share_worktree(worktree.id, request.sender_id, entries);
309 if let Some(contact_user_ids) = contact_user_ids {
310 self.peer
311 .respond(request.receipt(), proto::ShareWorktreeResponse {})
312 .await?;
313 self.update_contacts_for_users(&contact_user_ids).await?;
314 } else {
315 self.peer
316 .respond_with_error(
317 request.receipt(),
318 proto::Error {
319 message: "no such worktree".to_string(),
320 },
321 )
322 .await?;
323 }
324 Ok(())
325 }
326
327 async fn unshare_worktree(
328 mut self: Arc<Server>,
329 request: TypedEnvelope<proto::UnshareWorktree>,
330 ) -> tide::Result<()> {
331 let worktree_id = request.payload.worktree_id;
332 let worktree = self
333 .state_mut()
334 .unshare_worktree(worktree_id, request.sender_id)?;
335
336 broadcast(request.sender_id, worktree.connection_ids, |conn_id| {
337 self.peer
338 .send(conn_id, proto::UnshareWorktree { worktree_id })
339 })
340 .await?;
341 self.update_contacts_for_users(&worktree.authorized_user_ids)
342 .await?;
343
344 Ok(())
345 }
346
347 async fn join_worktree(
348 mut self: Arc<Server>,
349 request: TypedEnvelope<proto::JoinWorktree>,
350 ) -> tide::Result<()> {
351 let worktree_id = request.payload.worktree_id;
352
353 let user_id = self.state().user_id_for_connection(request.sender_id)?;
354 let response_data = self
355 .state_mut()
356 .join_worktree(request.sender_id, user_id, worktree_id)
357 .and_then(|joined| {
358 let share = joined.worktree.share()?;
359 let peer_count = share.guests.len();
360 let mut collaborators = Vec::with_capacity(peer_count);
361 collaborators.push(proto::Collaborator {
362 peer_id: joined.worktree.host_connection_id.0,
363 replica_id: 0,
364 user_id: joined.worktree.host_user_id.to_proto(),
365 });
366 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
367 if *peer_conn_id != request.sender_id {
368 collaborators.push(proto::Collaborator {
369 peer_id: peer_conn_id.0,
370 replica_id: *peer_replica_id as u32,
371 user_id: peer_user_id.to_proto(),
372 });
373 }
374 }
375 let response = proto::JoinWorktreeResponse {
376 worktree: Some(proto::Worktree {
377 id: worktree_id,
378 root_name: joined.worktree.root_name.clone(),
379 entries: share.entries.values().cloned().collect(),
380 }),
381 replica_id: joined.replica_id as u32,
382 collaborators,
383 };
384 let connection_ids = joined.worktree.connection_ids();
385 let contact_user_ids = joined.worktree.authorized_user_ids.clone();
386 Ok((response, connection_ids, contact_user_ids))
387 });
388
389 match response_data {
390 Ok((response, connection_ids, contact_user_ids)) => {
391 broadcast(request.sender_id, connection_ids, |conn_id| {
392 self.peer.send(
393 conn_id,
394 proto::AddCollaborator {
395 worktree_id,
396 collaborator: Some(proto::Collaborator {
397 peer_id: request.sender_id.0,
398 replica_id: response.replica_id,
399 user_id: user_id.to_proto(),
400 }),
401 },
402 )
403 })
404 .await?;
405 self.peer.respond(request.receipt(), response).await?;
406 self.update_contacts_for_users(&contact_user_ids).await?;
407 }
408 Err(error) => {
409 self.peer
410 .respond_with_error(
411 request.receipt(),
412 proto::Error {
413 message: error.to_string(),
414 },
415 )
416 .await?;
417 }
418 }
419
420 Ok(())
421 }
422
423 async fn leave_worktree(
424 mut self: Arc<Server>,
425 request: TypedEnvelope<proto::LeaveWorktree>,
426 ) -> tide::Result<()> {
427 let sender_id = request.sender_id;
428 let worktree_id = request.payload.worktree_id;
429 let worktree = self.state_mut().leave_worktree(sender_id, worktree_id);
430 if let Some(worktree) = worktree {
431 broadcast(sender_id, worktree.connection_ids, |conn_id| {
432 self.peer.send(
433 conn_id,
434 proto::RemoveCollaborator {
435 worktree_id,
436 peer_id: sender_id.0,
437 },
438 )
439 })
440 .await?;
441 self.update_contacts_for_users(&worktree.authorized_user_ids)
442 .await?;
443 }
444 Ok(())
445 }
446
447 async fn update_worktree(
448 mut self: Arc<Server>,
449 request: TypedEnvelope<proto::UpdateWorktree>,
450 ) -> tide::Result<()> {
451 let connection_ids = self.state_mut().update_worktree(
452 request.sender_id,
453 request.payload.worktree_id,
454 &request.payload.removed_entries,
455 &request.payload.updated_entries,
456 )?;
457
458 broadcast(request.sender_id, connection_ids, |connection_id| {
459 self.peer
460 .forward_send(request.sender_id, connection_id, request.payload.clone())
461 })
462 .await?;
463
464 Ok(())
465 }
466
467 async fn open_buffer(
468 self: Arc<Server>,
469 request: TypedEnvelope<proto::OpenBuffer>,
470 ) -> tide::Result<()> {
471 let receipt = request.receipt();
472 let host_connection_id = self
473 .state()
474 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
475 let response = self
476 .peer
477 .forward_request(request.sender_id, host_connection_id, request.payload)
478 .await?;
479 self.peer.respond(receipt, response).await?;
480 Ok(())
481 }
482
483 async fn close_buffer(
484 self: Arc<Server>,
485 request: TypedEnvelope<proto::CloseBuffer>,
486 ) -> tide::Result<()> {
487 let host_connection_id = self
488 .state()
489 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
490 self.peer
491 .forward_send(request.sender_id, host_connection_id, request.payload)
492 .await?;
493 Ok(())
494 }
495
496 async fn save_buffer(
497 self: Arc<Server>,
498 request: TypedEnvelope<proto::SaveBuffer>,
499 ) -> tide::Result<()> {
500 let host;
501 let guests;
502 {
503 let state = self.state();
504 host = state
505 .worktree_host_connection_id(request.sender_id, request.payload.worktree_id)?;
506 guests = state
507 .worktree_guest_connection_ids(request.sender_id, request.payload.worktree_id)?;
508 }
509
510 let sender = request.sender_id;
511 let receipt = request.receipt();
512 let response = self
513 .peer
514 .forward_request(sender, host, request.payload.clone())
515 .await?;
516
517 broadcast(host, guests, |conn_id| {
518 let response = response.clone();
519 let peer = &self.peer;
520 async move {
521 if conn_id == sender {
522 peer.respond(receipt, response).await
523 } else {
524 peer.forward_send(host, conn_id, response).await
525 }
526 }
527 })
528 .await?;
529
530 Ok(())
531 }
532
533 async fn update_buffer(
534 self: Arc<Server>,
535 request: TypedEnvelope<proto::UpdateBuffer>,
536 ) -> tide::Result<()> {
537 let receiver_ids = self
538 .state()
539 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
540 broadcast(request.sender_id, receiver_ids, |connection_id| {
541 self.peer
542 .forward_send(request.sender_id, connection_id, request.payload.clone())
543 })
544 .await?;
545 self.peer.respond(request.receipt(), proto::Ack {}).await?;
546 Ok(())
547 }
548
549 async fn buffer_saved(
550 self: Arc<Server>,
551 request: TypedEnvelope<proto::BufferSaved>,
552 ) -> tide::Result<()> {
553 let receiver_ids = self
554 .state()
555 .worktree_connection_ids(request.sender_id, request.payload.worktree_id)?;
556 broadcast(request.sender_id, receiver_ids, |connection_id| {
557 self.peer
558 .forward_send(request.sender_id, connection_id, request.payload.clone())
559 })
560 .await?;
561 Ok(())
562 }
563
564 async fn get_channels(
565 self: Arc<Server>,
566 request: TypedEnvelope<proto::GetChannels>,
567 ) -> tide::Result<()> {
568 let user_id = self.state().user_id_for_connection(request.sender_id)?;
569 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
570 self.peer
571 .respond(
572 request.receipt(),
573 proto::GetChannelsResponse {
574 channels: channels
575 .into_iter()
576 .map(|chan| proto::Channel {
577 id: chan.id.to_proto(),
578 name: chan.name,
579 })
580 .collect(),
581 },
582 )
583 .await?;
584 Ok(())
585 }
586
587 async fn get_users(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::GetUsers>,
590 ) -> tide::Result<()> {
591 let receipt = request.receipt();
592 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
593 let users = self
594 .app_state
595 .db
596 .get_users_by_ids(user_ids)
597 .await?
598 .into_iter()
599 .map(|user| proto::User {
600 id: user.id.to_proto(),
601 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
602 github_login: user.github_login,
603 })
604 .collect();
605 self.peer
606 .respond(receipt, proto::GetUsersResponse { users })
607 .await?;
608 Ok(())
609 }
610
611 async fn update_contacts_for_users<'a>(
612 self: &Arc<Server>,
613 user_ids: impl IntoIterator<Item = &'a UserId>,
614 ) -> tide::Result<()> {
615 let mut send_futures = Vec::new();
616
617 {
618 let state = self.state();
619 for user_id in user_ids {
620 let contacts = state.contacts_for_user(*user_id);
621 for connection_id in state.connection_ids_for_user(*user_id) {
622 send_futures.push(self.peer.send(
623 connection_id,
624 proto::UpdateContacts {
625 contacts: contacts.clone(),
626 },
627 ));
628 }
629 }
630 }
631 futures::future::try_join_all(send_futures).await?;
632
633 Ok(())
634 }
635
636 async fn join_channel(
637 mut self: Arc<Self>,
638 request: TypedEnvelope<proto::JoinChannel>,
639 ) -> tide::Result<()> {
640 let user_id = self.state().user_id_for_connection(request.sender_id)?;
641 let channel_id = ChannelId::from_proto(request.payload.channel_id);
642 if !self
643 .app_state
644 .db
645 .can_user_access_channel(user_id, channel_id)
646 .await?
647 {
648 Err(anyhow!("access denied"))?;
649 }
650
651 self.state_mut().join_channel(request.sender_id, channel_id);
652 let messages = self
653 .app_state
654 .db
655 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
656 .await?
657 .into_iter()
658 .map(|msg| proto::ChannelMessage {
659 id: msg.id.to_proto(),
660 body: msg.body,
661 timestamp: msg.sent_at.unix_timestamp() as u64,
662 sender_id: msg.sender_id.to_proto(),
663 nonce: Some(msg.nonce.as_u128().into()),
664 })
665 .collect::<Vec<_>>();
666 self.peer
667 .respond(
668 request.receipt(),
669 proto::JoinChannelResponse {
670 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
671 messages,
672 },
673 )
674 .await?;
675 Ok(())
676 }
677
678 async fn leave_channel(
679 mut self: Arc<Self>,
680 request: TypedEnvelope<proto::LeaveChannel>,
681 ) -> tide::Result<()> {
682 let user_id = self.state().user_id_for_connection(request.sender_id)?;
683 let channel_id = ChannelId::from_proto(request.payload.channel_id);
684 if !self
685 .app_state
686 .db
687 .can_user_access_channel(user_id, channel_id)
688 .await?
689 {
690 Err(anyhow!("access denied"))?;
691 }
692
693 self.state_mut()
694 .leave_channel(request.sender_id, channel_id);
695
696 Ok(())
697 }
698
699 async fn send_channel_message(
700 self: Arc<Self>,
701 request: TypedEnvelope<proto::SendChannelMessage>,
702 ) -> tide::Result<()> {
703 let receipt = request.receipt();
704 let channel_id = ChannelId::from_proto(request.payload.channel_id);
705 let user_id;
706 let connection_ids;
707 {
708 let state = self.state();
709 user_id = state.user_id_for_connection(request.sender_id)?;
710 if let Some(ids) = state.channel_connection_ids(channel_id) {
711 connection_ids = ids;
712 } else {
713 return Ok(());
714 }
715 }
716
717 // Validate the message body.
718 let body = request.payload.body.trim().to_string();
719 if body.len() > MAX_MESSAGE_LEN {
720 self.peer
721 .respond_with_error(
722 receipt,
723 proto::Error {
724 message: "message is too long".to_string(),
725 },
726 )
727 .await?;
728 return Ok(());
729 }
730 if body.is_empty() {
731 self.peer
732 .respond_with_error(
733 receipt,
734 proto::Error {
735 message: "message can't be blank".to_string(),
736 },
737 )
738 .await?;
739 return Ok(());
740 }
741
742 let timestamp = OffsetDateTime::now_utc();
743 let nonce = if let Some(nonce) = request.payload.nonce {
744 nonce
745 } else {
746 self.peer
747 .respond_with_error(
748 receipt,
749 proto::Error {
750 message: "nonce can't be blank".to_string(),
751 },
752 )
753 .await?;
754 return Ok(());
755 };
756
757 let message_id = self
758 .app_state
759 .db
760 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
761 .await?
762 .to_proto();
763 let message = proto::ChannelMessage {
764 sender_id: user_id.to_proto(),
765 id: message_id,
766 body,
767 timestamp: timestamp.unix_timestamp() as u64,
768 nonce: Some(nonce),
769 };
770 broadcast(request.sender_id, connection_ids, |conn_id| {
771 self.peer.send(
772 conn_id,
773 proto::ChannelMessageSent {
774 channel_id: channel_id.to_proto(),
775 message: Some(message.clone()),
776 },
777 )
778 })
779 .await?;
780 self.peer
781 .respond(
782 receipt,
783 proto::SendChannelMessageResponse {
784 message: Some(message),
785 },
786 )
787 .await?;
788 Ok(())
789 }
790
791 async fn get_channel_messages(
792 self: Arc<Self>,
793 request: TypedEnvelope<proto::GetChannelMessages>,
794 ) -> tide::Result<()> {
795 let user_id = self.state().user_id_for_connection(request.sender_id)?;
796 let channel_id = ChannelId::from_proto(request.payload.channel_id);
797 if !self
798 .app_state
799 .db
800 .can_user_access_channel(user_id, channel_id)
801 .await?
802 {
803 Err(anyhow!("access denied"))?;
804 }
805
806 let messages = self
807 .app_state
808 .db
809 .get_channel_messages(
810 channel_id,
811 MESSAGE_COUNT_PER_PAGE,
812 Some(MessageId::from_proto(request.payload.before_message_id)),
813 )
814 .await?
815 .into_iter()
816 .map(|msg| proto::ChannelMessage {
817 id: msg.id.to_proto(),
818 body: msg.body,
819 timestamp: msg.sent_at.unix_timestamp() as u64,
820 sender_id: msg.sender_id.to_proto(),
821 nonce: Some(msg.nonce.as_u128().into()),
822 })
823 .collect::<Vec<_>>();
824 self.peer
825 .respond(
826 request.receipt(),
827 proto::GetChannelMessagesResponse {
828 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
829 messages,
830 },
831 )
832 .await?;
833 Ok(())
834 }
835
    /// Acquires a read lock on the store.
    ///
    /// The guard borrows from `self`; callers in this file keep it confined to
    /// a single statement or block rather than holding it across an `.await`.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
839
    /// Acquires a write lock on the store.
    ///
    /// Takes `&mut Arc<Self>` even though the lock itself only needs shared
    /// access, so callers must hold a mutable `Arc` binding to mutate the store.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
843}
844
845pub async fn broadcast<F, T>(
846 sender_id: ConnectionId,
847 receiver_ids: Vec<ConnectionId>,
848 mut f: F,
849) -> anyhow::Result<()>
850where
851 F: FnMut(ConnectionId) -> T,
852 T: Future<Output = anyhow::Result<()>>,
853{
854 let futures = receiver_ids
855 .into_iter()
856 .filter(|id| *id != sender_id)
857 .map(|id| f(id));
858 futures::future::try_join_all(futures).await?;
859 Ok(())
860}
861
862pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
863 let server = Server::new(app.state().clone(), rpc.clone(), None);
864 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
865 let server = server.clone();
866 async move {
867 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
868
869 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
870 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
871 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
872 let client_protocol_version: Option<u32> = request
873 .header("X-Zed-Protocol-Version")
874 .and_then(|v| v.as_str().parse().ok());
875
876 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
877 return Ok(Response::new(StatusCode::UpgradeRequired));
878 }
879
880 let header = match request.header("Sec-Websocket-Key") {
881 Some(h) => h.as_str(),
882 None => return Err(anyhow!("expected sec-websocket-key"))?,
883 };
884
885 let user_id = process_auth_header(&request).await?;
886
887 let mut response = Response::new(StatusCode::SwitchingProtocols);
888 response.insert_header(UPGRADE, "websocket");
889 response.insert_header(CONNECTION, "Upgrade");
890 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
891 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
892 response.insert_header("Sec-Websocket-Version", "13");
893
894 let http_res: &mut tide::http::Response = response.as_mut();
895 let upgrade_receiver = http_res.recv_upgrade().await;
896 let addr = request.remote().unwrap_or("unknown").to_string();
897 task::spawn(async move {
898 if let Some(stream) = upgrade_receiver.await {
899 server
900 .handle_connection(
901 Connection::new(
902 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
903 ),
904 addr,
905 user_id,
906 None,
907 )
908 .await;
909 }
910 });
911
912 Ok(response)
913 }
914 });
915}
916
917fn header_contains_ignore_case<T>(
918 request: &tide::Request<T>,
919 header_name: HeaderName,
920 value: &str,
921) -> bool {
922 request
923 .header(header_name)
924 .map(|h| {
925 h.as_str()
926 .split(',')
927 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
928 })
929 .unwrap_or(false)
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::{
936 auth,
937 db::{tests::TestDb, UserId},
938 github, AppState, Config,
939 };
940 use ::rpc::Peer;
941 use async_std::task;
942 use gpui::{ModelHandle, TestAppContext};
943 use parking_lot::Mutex;
944 use postage::{mpsc, watch};
945 use rpc::PeerId;
946 use serde_json::json;
947 use sqlx::types::time::OffsetDateTime;
948 use std::{
949 ops::Deref,
950 path::Path,
951 sync::{
952 atomic::{AtomicBool, Ordering::SeqCst},
953 Arc,
954 },
955 time::Duration,
956 };
957 use zed::{
958 client::{
959 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
960 EstablishConnectionError, UserStore,
961 },
962 contacts_panel::JoinWorktree,
963 editor::{Editor, EditorSettings, Input, MultiBuffer},
964 fs::{FakeFs, Fs as _},
965 language::{
966 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
967 LanguageRegistry, LanguageServerConfig, Point,
968 },
969 lsp,
970 project::{ProjectPath, Worktree},
971 test::test_app_state,
972 workspace::Workspace,
973 };
974
    /// End-to-end check of worktree sharing between two clients.
    ///
    /// Client A shares a local worktree and client B joins it. The test then
    /// verifies that each side sees the other as a collaborator, that B can
    /// open a buffer and that B's edit reaches A's copy, that A's buffer is
    /// closed once dropped, and that dropping B's worktree removes B from A's
    /// collaborators.
    #[gpui::test]
    async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        cx_a.foreground().forbid_parking();

        // Share a local worktree as client A; `.zed.toml` lists user_b as a collaborator.
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            client_a.clone(),
            client_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            lang_registry.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join that worktree as client B, and check that B sees client A as a collaborator.
        let worktree_b = Worktree::open_remote(
            client_b.clone(),
            worktree_id,
            lang_registry.clone(),
            client_b.user_store.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            tree.replica_id()
        });
        // Wait until client A sees client B as a collaborator with B's replica id.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // B opening the buffer must have opened it on the host (client A) as well.
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO: re-enable the selection-set assertions below.
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO: re-enable the selection-set assertions below.
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the worktree removes client B from client A's collaborators.
        cx_b.update(move |_| drop(worktree_b));
        worktree_a
            .condition(&cx_a, |tree, _| tree.collaborators().is_empty())
            .await;
    }
1100
    /// End-to-end check that unsharing a worktree removes it from a guest's workspace.
    ///
    /// Client A shares a local worktree, client B joins it through the
    /// `JoinWorktree` action and opens a file from it. After A unshares, B's
    /// workspace must have no worktrees and no active item.
    #[gpui::test]
    async fn test_unshare_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_b.update(zed::contacts_panel::init);
        let mut app_state_a = cx_a.update(test_app_state);
        let mut app_state_b = cx_b.update(test_app_state);

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        // Point each app state at its test client and that client's user store.
        Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
        Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
        Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
        Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();

        cx_a.foreground().forbid_parking();

        // Share a local worktree as client A
        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let worktree_a = Worktree::open_local(
            app_state_a.client.clone(),
            app_state_a.user_store.clone(),
            "/a".as_ref(),
            fs,
            app_state_a.languages.clone(),
            &mut cx_a.to_async(),
        )
        .await
        .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        let remote_worktree_id = worktree_a
            .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
            .await
            .unwrap();

        // Join the shared worktree from client B's workspace via the `JoinWorktree` action.
        let (window_b, workspace_b) =
            cx_b.add_window(|cx| Workspace::new(&app_state_b.as_ref().into(), cx));
        cx_b.update(|cx| {
            cx.dispatch_action(
                window_b,
                vec![workspace_b.id()],
                &JoinWorktree(remote_worktree_id),
            );
        });
        workspace_b
            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 1)
            .await;

        // Nothing is open yet; open `a.txt` from the joined worktree.
        let local_worktree_id_b = workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_none());
            workspace.worktrees(cx).first().unwrap().id()
        });
        workspace_b
            .update(&mut cx_b, |workspace, cx| {
                workspace.open_entry(
                    ProjectPath {
                        worktree_id: local_worktree_id_b,
                        path: Path::new("a.txt").into(),
                    },
                    cx,
                )
            })
            .unwrap()
            .await
            .unwrap();
        workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_some());
        });

        // Unshare as client A; client B's workspace should lose the worktree and its open item.
        worktree_a.update(&mut cx_a, |tree, cx| {
            tree.as_local_mut().unwrap().unshare(cx);
        });
        workspace_b
            .condition(&cx_b, |workspace, cx| workspace.worktrees(cx).len() == 0)
            .await;
        workspace_b.read_with(&cx_b, |workspace, cx| {
            let active_pane = workspace.active_pane().read(cx);
            assert!(active_pane.active_item().is_none());
        });
    }
1195
1196 #[gpui::test]
1197 async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1198 mut cx_a: TestAppContext,
1199 mut cx_b: TestAppContext,
1200 mut cx_c: TestAppContext,
1201 ) {
1202 cx_a.foreground().forbid_parking();
1203 let lang_registry = Arc::new(LanguageRegistry::new());
1204
1205 // Connect to a server as 3 clients.
1206 let mut server = TestServer::start().await;
1207 let client_a = server.create_client(&mut cx_a, "user_a").await;
1208 let client_b = server.create_client(&mut cx_b, "user_b").await;
1209 let client_c = server.create_client(&mut cx_c, "user_c").await;
1210
1211 let fs = Arc::new(FakeFs::new());
1212
1213 // Share a worktree as client A.
1214 fs.insert_tree(
1215 "/a",
1216 json!({
1217 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1218 "file1": "",
1219 "file2": ""
1220 }),
1221 )
1222 .await;
1223
1224 let worktree_a = Worktree::open_local(
1225 client_a.clone(),
1226 client_a.user_store.clone(),
1227 "/a".as_ref(),
1228 fs.clone(),
1229 lang_registry.clone(),
1230 &mut cx_a.to_async(),
1231 )
1232 .await
1233 .unwrap();
1234 worktree_a
1235 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1236 .await;
1237 let worktree_id = worktree_a
1238 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1239 .await
1240 .unwrap();
1241
1242 // Join that worktree as clients B and C.
1243 let worktree_b = Worktree::open_remote(
1244 client_b.clone(),
1245 worktree_id,
1246 lang_registry.clone(),
1247 client_b.user_store.clone(),
1248 &mut cx_b.to_async(),
1249 )
1250 .await
1251 .unwrap();
1252 let worktree_c = Worktree::open_remote(
1253 client_c.clone(),
1254 worktree_id,
1255 lang_registry.clone(),
1256 client_c.user_store.clone(),
1257 &mut cx_c.to_async(),
1258 )
1259 .await
1260 .unwrap();
1261
1262 // Open and edit a buffer as both guests B and C.
1263 let buffer_b = worktree_b
1264 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1265 .await
1266 .unwrap();
1267 let buffer_c = worktree_c
1268 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1269 .await
1270 .unwrap();
1271 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1272 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1273
1274 // Open and edit that buffer as the host.
1275 let buffer_a = worktree_a
1276 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1277 .await
1278 .unwrap();
1279
1280 buffer_a
1281 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1282 .await;
1283 buffer_a.update(&mut cx_a, |buf, cx| {
1284 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1285 });
1286
1287 // Wait for edits to propagate
1288 buffer_a
1289 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1290 .await;
1291 buffer_b
1292 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1293 .await;
1294 buffer_c
1295 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1296 .await;
1297
1298 // Edit the buffer as the host and concurrently save as guest B.
1299 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1300 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1301 save_b.await.unwrap();
1302 assert_eq!(
1303 fs.load("/a/file1".as_ref()).await.unwrap(),
1304 "hi-a, i-am-c, i-am-b, i-am-a"
1305 );
1306 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1307 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1308 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1309
1310 // Make changes on host's file system, see those changes on the guests.
1311 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1312 .await
1313 .unwrap();
1314 fs.insert_file(Path::new("/a/file4"), "4".into())
1315 .await
1316 .unwrap();
1317
1318 worktree_b
1319 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1320 .await;
1321 worktree_c
1322 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1323 .await;
1324 worktree_b.read_with(&cx_b, |tree, _| {
1325 assert_eq!(
1326 tree.paths()
1327 .map(|p| p.to_string_lossy())
1328 .collect::<Vec<_>>(),
1329 &[".zed.toml", "file1", "file3", "file4"]
1330 )
1331 });
1332 worktree_c.read_with(&cx_c, |tree, _| {
1333 assert_eq!(
1334 tree.paths()
1335 .map(|p| p.to_string_lossy())
1336 .collect::<Vec<_>>(),
1337 &[".zed.toml", "file1", "file3", "file4"]
1338 )
1339 });
1340 }
1341
// Verifies that a guest's buffer is clean and conflict-free after it saves, and that a
// subsequent edit makes it dirty again without reporting a conflict.
#[gpui::test]
async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::new());

    // Connect to a server as 2 clients.
    let mut server = TestServer::start().await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a local worktree as client A
    // NOTE(review): "user_c" is listed as a collaborator below, but this test only creates
    // clients for user_a and user_b; presumably a copy-paste leftover; confirm.
    let fs = Arc::new(FakeFs::new());
    fs.insert_tree(
        "/dir",
        json!({
            ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            "a.txt": "a-contents",
        }),
    )
    .await;

    let worktree_a = Worktree::open_local(
        client_a.clone(),
        client_a.user_store.clone(),
        "/dir".as_ref(),
        fs,
        lang_registry.clone(),
        &mut cx_a.to_async(),
    )
    .await
    .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a
        .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
        .await
        .unwrap();

    // Join that worktree as client B.
    let worktree_b = Worktree::open_remote(
        client_b.clone(),
        worktree_id,
        lang_registry.clone(),
        client_b.user_store.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the file as the guest and remember its mtime so the save can be detected below.
    let buffer_b = worktree_b
        .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
        .await
        .unwrap();
    let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

    // An unsaved edit makes the buffer dirty but not conflicted.
    buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
    buffer_b.read_with(&cx_b, |buf, _| {
        assert!(buf.is_dirty());
        assert!(!buf.has_conflict());
    });

    // Save as the guest, then wait until the guest's view of the file has a different mtime.
    buffer_b
        .update(&mut cx_b, |buf, cx| buf.save(cx))
        .unwrap()
        .await
        .unwrap();
    worktree_b
        .condition(&cx_b, |_, cx| {
            buffer_b.read(cx).file().unwrap().mtime() != mtime
        })
        .await;
    buffer_b.read_with(&cx_b, |buf, _| {
        assert!(!buf.is_dirty());
        assert!(!buf.has_conflict());
    });

    // Editing again after the save must dirty the buffer without producing a conflict.
    buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
    buffer_b.read_with(&cx_b, |buf, _| {
        assert!(buf.is_dirty());
        assert!(!buf.has_conflict());
    });
}
1425
// Verifies that an edit the host makes while a guest is still opening the same buffer ends up
// in the guest's copy of the buffer.
#[gpui::test]
async fn test_editing_while_guest_opens_buffer(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::new());

    // Connect to a server as 2 clients.
    let mut server = TestServer::start().await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a local worktree as client A
    let fs = Arc::new(FakeFs::new());
    fs.insert_tree(
        "/dir",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.txt": "a-contents",
        }),
    )
    .await;
    let worktree_a = Worktree::open_local(
        client_a.clone(),
        client_a.user_store.clone(),
        "/dir".as_ref(),
        fs,
        lang_registry.clone(),
        &mut cx_a.to_async(),
    )
    .await
    .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a
        .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
        .await
        .unwrap();

    // Join that worktree as client B.
    let worktree_b = Worktree::open_remote(
        client_b.clone(),
        worktree_id,
        lang_registry.clone(),
        client_b.user_store.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the buffer as the host, and start opening it as the guest on a background task
    // without awaiting the result yet.
    let buffer_a = worktree_a
        .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
        .await
        .unwrap();
    let buffer_b = cx_b
        .background()
        .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));

    // Yield once before editing as the host; presumably this lets the guest's open request
    // get underway so the edit races with it (confirm against the executor's scheduling).
    task::yield_now().await;
    buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

    // The guest's buffer must eventually match the host's text, including the racing edit.
    let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
    let buffer_b = buffer_b.await.unwrap();
    buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
}
1493
1494 #[gpui::test]
1495 async fn test_leaving_worktree_while_opening_buffer(
1496 mut cx_a: TestAppContext,
1497 mut cx_b: TestAppContext,
1498 ) {
1499 cx_a.foreground().forbid_parking();
1500 let lang_registry = Arc::new(LanguageRegistry::new());
1501
1502 // Connect to a server as 2 clients.
1503 let mut server = TestServer::start().await;
1504 let client_a = server.create_client(&mut cx_a, "user_a").await;
1505 let client_b = server.create_client(&mut cx_b, "user_b").await;
1506
1507 // Share a local worktree as client A
1508 let fs = Arc::new(FakeFs::new());
1509 fs.insert_tree(
1510 "/dir",
1511 json!({
1512 ".zed.toml": r#"collaborators = ["user_b"]"#,
1513 "a.txt": "a-contents",
1514 }),
1515 )
1516 .await;
1517 let worktree_a = Worktree::open_local(
1518 client_a.clone(),
1519 client_a.user_store.clone(),
1520 "/dir".as_ref(),
1521 fs,
1522 lang_registry.clone(),
1523 &mut cx_a.to_async(),
1524 )
1525 .await
1526 .unwrap();
1527 worktree_a
1528 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529 .await;
1530 let worktree_id = worktree_a
1531 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1532 .await
1533 .unwrap();
1534
1535 // Join that worktree as client B, and see that a guest has joined as client A.
1536 let worktree_b = Worktree::open_remote(
1537 client_b.clone(),
1538 worktree_id,
1539 lang_registry.clone(),
1540 client_b.user_store.clone(),
1541 &mut cx_b.to_async(),
1542 )
1543 .await
1544 .unwrap();
1545 worktree_a
1546 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1547 .await;
1548
1549 let buffer_b = cx_b
1550 .background()
1551 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1552 cx_b.update(|_| drop(worktree_b));
1553 drop(buffer_b);
1554 worktree_a
1555 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1556 .await;
1557 }
1558
1559 #[gpui::test]
1560 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1561 cx_a.foreground().forbid_parking();
1562 let lang_registry = Arc::new(LanguageRegistry::new());
1563
1564 // Connect to a server as 2 clients.
1565 let mut server = TestServer::start().await;
1566 let client_a = server.create_client(&mut cx_a, "user_a").await;
1567 let client_b = server.create_client(&mut cx_b, "user_b").await;
1568
1569 // Share a local worktree as client A
1570 let fs = Arc::new(FakeFs::new());
1571 fs.insert_tree(
1572 "/a",
1573 json!({
1574 ".zed.toml": r#"collaborators = ["user_b"]"#,
1575 "a.txt": "a-contents",
1576 "b.txt": "b-contents",
1577 }),
1578 )
1579 .await;
1580 let worktree_a = Worktree::open_local(
1581 client_a.clone(),
1582 client_a.user_store.clone(),
1583 "/a".as_ref(),
1584 fs,
1585 lang_registry.clone(),
1586 &mut cx_a.to_async(),
1587 )
1588 .await
1589 .unwrap();
1590 worktree_a
1591 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1592 .await;
1593 let worktree_id = worktree_a
1594 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
1595 .await
1596 .unwrap();
1597
1598 // Join that worktree as client B, and see that a guest has joined as client A.
1599 let _worktree_b = Worktree::open_remote(
1600 client_b.clone(),
1601 worktree_id,
1602 lang_registry.clone(),
1603 client_b.user_store.clone(),
1604 &mut cx_b.to_async(),
1605 )
1606 .await
1607 .unwrap();
1608 worktree_a
1609 .condition(&cx_a, |tree, _| tree.collaborators().len() == 1)
1610 .await;
1611
1612 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1613 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1614 worktree_a
1615 .condition(&cx_a, |tree, _| tree.collaborators().len() == 0)
1616 .await;
1617 }
1618
// Verifies that diagnostics published by the host's language server before a guest joins are
// visible in the buffer the guest subsequently opens.
#[gpui::test]
async fn test_collaborating_with_diagnostics(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
) {
    cx_a.foreground().forbid_parking();

    // Register a Rust language backed by a fake language server that the test can drive.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake(cx_a.background()).await;
    let mut lang_registry = LanguageRegistry::new();
    lang_registry.add(Arc::new(Language::new(
        LanguageConfig {
            name: "Rust".to_string(),
            path_suffixes: vec!["rs".to_string()],
            language_server: Some(language_server_config),
            ..Default::default()
        },
        Some(tree_sitter_rust::language()),
    )));

    let lang_registry = Arc::new(lang_registry);

    // Connect to a server as 2 clients.
    let mut server = TestServer::start().await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a local worktree as client A
    let fs = Arc::new(FakeFs::new());
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.rs": "let one = two",
            "other.rs": "",
        }),
    )
    .await;
    let worktree_a = Worktree::open_local(
        client_a.clone(),
        client_a.user_store.clone(),
        "/a".as_ref(),
        fs,
        lang_registry.clone(),
        &mut cx_a.to_async(),
    )
    .await
    .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a
        .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
        .await
        .unwrap();

    // Cause language server to start.
    // A different file ("other.rs") is opened here, so "a.rs" is not open on the host when the
    // diagnostics below are published. The opened buffer itself is discarded.
    let _ = cx_a
        .background()
        .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
            worktree.open_buffer("other.rs", cx)
        }))
        .await
        .unwrap();

    // Simulate a language server reporting errors for a file.
    fake_language_server
        .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
            uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
            version: None,
            diagnostics: vec![
                lsp::Diagnostic {
                    severity: Some(lsp::DiagnosticSeverity::ERROR),
                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
                    message: "message 1".to_string(),
                    ..Default::default()
                },
                lsp::Diagnostic {
                    severity: Some(lsp::DiagnosticSeverity::WARNING),
                    range: lsp::Range::new(
                        lsp::Position::new(0, 10),
                        lsp::Position::new(0, 13),
                    ),
                    message: "message 2".to_string(),
                    ..Default::default()
                },
            ],
        })
        .await;

    // Join the worktree as client B.
    let worktree_b = Worktree::open_remote(
        client_b.clone(),
        worktree_id,
        lang_registry.clone(),
        client_b.user_store.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the file with the errors.
    let buffer_b = cx_b
        .background()
        .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
        .await
        .unwrap();

    // The guest's buffer must contain both diagnostics at the published ranges, each as the
    // primary entry of its own group.
    buffer_b.read_with(&cx_b, |buffer, _| {
        assert_eq!(
            buffer
                .snapshot()
                .diagnostics_in_range::<_, Point>(0..buffer.len())
                .collect::<Vec<_>>(),
            &[
                DiagnosticEntry {
                    range: Point::new(0, 4)..Point::new(0, 7),
                    diagnostic: Diagnostic {
                        group_id: 0,
                        message: "message 1".to_string(),
                        severity: lsp::DiagnosticSeverity::ERROR,
                        is_primary: true,
                        ..Default::default()
                    }
                },
                DiagnosticEntry {
                    range: Point::new(0, 10)..Point::new(0, 13),
                    diagnostic: Diagnostic {
                        group_id: 1,
                        severity: lsp::DiagnosticSeverity::WARNING,
                        message: "message 2".to_string(),
                        is_primary: true,
                        ..Default::default()
                    }
                }
            ]
        );
    });
}
1757
1758 #[gpui::test]
1759 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1760 cx_a.foreground().forbid_parking();
1761
1762 // Connect to a server as 2 clients.
1763 let mut server = TestServer::start().await;
1764 let client_a = server.create_client(&mut cx_a, "user_a").await;
1765 let client_b = server.create_client(&mut cx_b, "user_b").await;
1766
1767 // Create an org that includes these 2 users.
1768 let db = &server.app_state.db;
1769 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1770 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1771 .await
1772 .unwrap();
1773 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1774 .await
1775 .unwrap();
1776
1777 // Create a channel that includes all the users.
1778 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1779 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1780 .await
1781 .unwrap();
1782 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1783 .await
1784 .unwrap();
1785 db.create_channel_message(
1786 channel_id,
1787 client_b.current_user_id(&cx_b),
1788 "hello A, it's B.",
1789 OffsetDateTime::now_utc(),
1790 1,
1791 )
1792 .await
1793 .unwrap();
1794
1795 let channels_a = cx_a
1796 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1797 channels_a
1798 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1799 .await;
1800 channels_a.read_with(&cx_a, |list, _| {
1801 assert_eq!(
1802 list.available_channels().unwrap(),
1803 &[ChannelDetails {
1804 id: channel_id.to_proto(),
1805 name: "test-channel".to_string()
1806 }]
1807 )
1808 });
1809 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1810 this.get_channel(channel_id.to_proto(), cx).unwrap()
1811 });
1812 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1813 channel_a
1814 .condition(&cx_a, |channel, _| {
1815 channel_messages(channel)
1816 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1817 })
1818 .await;
1819
1820 let channels_b = cx_b
1821 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1822 channels_b
1823 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1824 .await;
1825 channels_b.read_with(&cx_b, |list, _| {
1826 assert_eq!(
1827 list.available_channels().unwrap(),
1828 &[ChannelDetails {
1829 id: channel_id.to_proto(),
1830 name: "test-channel".to_string()
1831 }]
1832 )
1833 });
1834
1835 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1836 this.get_channel(channel_id.to_proto(), cx).unwrap()
1837 });
1838 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1839 channel_b
1840 .condition(&cx_b, |channel, _| {
1841 channel_messages(channel)
1842 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1843 })
1844 .await;
1845
1846 channel_a
1847 .update(&mut cx_a, |channel, cx| {
1848 channel
1849 .send_message("oh, hi B.".to_string(), cx)
1850 .unwrap()
1851 .detach();
1852 let task = channel.send_message("sup".to_string(), cx).unwrap();
1853 assert_eq!(
1854 channel_messages(channel),
1855 &[
1856 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1857 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1858 ("user_a".to_string(), "sup".to_string(), true)
1859 ]
1860 );
1861 task
1862 })
1863 .await
1864 .unwrap();
1865
1866 channel_b
1867 .condition(&cx_b, |channel, _| {
1868 channel_messages(channel)
1869 == [
1870 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1871 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1872 ("user_a".to_string(), "sup".to_string(), false),
1873 ]
1874 })
1875 .await;
1876
1877 assert_eq!(
1878 server
1879 .state()
1880 .await
1881 .channel(channel_id)
1882 .unwrap()
1883 .connection_ids
1884 .len(),
1885 2
1886 );
1887 cx_b.update(|_| drop(channel_b));
1888 server
1889 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1890 .await;
1891
1892 cx_a.update(|_| drop(channel_a));
1893 server
1894 .condition(|state| state.channel(channel_id).is_none())
1895 .await;
1896 }
1897
1898 #[gpui::test]
1899 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1900 cx_a.foreground().forbid_parking();
1901
1902 let mut server = TestServer::start().await;
1903 let client_a = server.create_client(&mut cx_a, "user_a").await;
1904
1905 let db = &server.app_state.db;
1906 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1907 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1908 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1909 .await
1910 .unwrap();
1911 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1912 .await
1913 .unwrap();
1914
1915 let channels_a = cx_a
1916 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1917 channels_a
1918 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1919 .await;
1920 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1921 this.get_channel(channel_id.to_proto(), cx).unwrap()
1922 });
1923
1924 // Messages aren't allowed to be too long.
1925 channel_a
1926 .update(&mut cx_a, |channel, cx| {
1927 let long_body = "this is long.\n".repeat(1024);
1928 channel.send_message(long_body, cx).unwrap()
1929 })
1930 .await
1931 .unwrap_err();
1932
1933 // Messages aren't allowed to be blank.
1934 channel_a.update(&mut cx_a, |channel, cx| {
1935 channel.send_message(String::new(), cx).unwrap_err()
1936 });
1937
1938 // Leading and trailing whitespace are trimmed.
1939 channel_a
1940 .update(&mut cx_a, |channel, cx| {
1941 channel
1942 .send_message("\n surrounded by whitespace \n".to_string(), cx)
1943 .unwrap()
1944 })
1945 .await
1946 .unwrap();
1947 assert_eq!(
1948 db.get_channel_messages(channel_id, 10, None)
1949 .await
1950 .unwrap()
1951 .iter()
1952 .map(|m| &m.body)
1953 .collect::<Vec<_>>(),
1954 &["surrounded by whitespace"]
1955 );
1956 }
1957
1958 #[gpui::test]
1959 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1960 cx_a.foreground().forbid_parking();
1961
1962 // Connect to a server as 2 clients.
1963 let mut server = TestServer::start().await;
1964 let client_a = server.create_client(&mut cx_a, "user_a").await;
1965 let client_b = server.create_client(&mut cx_b, "user_b").await;
1966 let mut status_b = client_b.status();
1967
1968 // Create an org that includes these 2 users.
1969 let db = &server.app_state.db;
1970 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1971 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1972 .await
1973 .unwrap();
1974 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1975 .await
1976 .unwrap();
1977
1978 // Create a channel that includes all the users.
1979 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1980 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1981 .await
1982 .unwrap();
1983 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1984 .await
1985 .unwrap();
1986 db.create_channel_message(
1987 channel_id,
1988 client_b.current_user_id(&cx_b),
1989 "hello A, it's B.",
1990 OffsetDateTime::now_utc(),
1991 2,
1992 )
1993 .await
1994 .unwrap();
1995
1996 let channels_a = cx_a
1997 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1998 channels_a
1999 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2000 .await;
2001
2002 channels_a.read_with(&cx_a, |list, _| {
2003 assert_eq!(
2004 list.available_channels().unwrap(),
2005 &[ChannelDetails {
2006 id: channel_id.to_proto(),
2007 name: "test-channel".to_string()
2008 }]
2009 )
2010 });
2011 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2012 this.get_channel(channel_id.to_proto(), cx).unwrap()
2013 });
2014 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2015 channel_a
2016 .condition(&cx_a, |channel, _| {
2017 channel_messages(channel)
2018 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2019 })
2020 .await;
2021
2022 let channels_b = cx_b
2023 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2024 channels_b
2025 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2026 .await;
2027 channels_b.read_with(&cx_b, |list, _| {
2028 assert_eq!(
2029 list.available_channels().unwrap(),
2030 &[ChannelDetails {
2031 id: channel_id.to_proto(),
2032 name: "test-channel".to_string()
2033 }]
2034 )
2035 });
2036
2037 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2038 this.get_channel(channel_id.to_proto(), cx).unwrap()
2039 });
2040 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2041 channel_b
2042 .condition(&cx_b, |channel, _| {
2043 channel_messages(channel)
2044 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2045 })
2046 .await;
2047
2048 // Disconnect client B, ensuring we can still access its cached channel data.
2049 server.forbid_connections();
2050 server.disconnect_client(client_b.current_user_id(&cx_b));
2051 while !matches!(
2052 status_b.recv().await,
2053 Some(client::Status::ReconnectionError { .. })
2054 ) {}
2055
2056 channels_b.read_with(&cx_b, |channels, _| {
2057 assert_eq!(
2058 channels.available_channels().unwrap(),
2059 [ChannelDetails {
2060 id: channel_id.to_proto(),
2061 name: "test-channel".to_string()
2062 }]
2063 )
2064 });
2065 channel_b.read_with(&cx_b, |channel, _| {
2066 assert_eq!(
2067 channel_messages(channel),
2068 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2069 )
2070 });
2071
2072 // Send a message from client B while it is disconnected.
2073 channel_b
2074 .update(&mut cx_b, |channel, cx| {
2075 let task = channel
2076 .send_message("can you see this?".to_string(), cx)
2077 .unwrap();
2078 assert_eq!(
2079 channel_messages(channel),
2080 &[
2081 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2082 ("user_b".to_string(), "can you see this?".to_string(), true)
2083 ]
2084 );
2085 task
2086 })
2087 .await
2088 .unwrap_err();
2089
2090 // Send a message from client A while B is disconnected.
2091 channel_a
2092 .update(&mut cx_a, |channel, cx| {
2093 channel
2094 .send_message("oh, hi B.".to_string(), cx)
2095 .unwrap()
2096 .detach();
2097 let task = channel.send_message("sup".to_string(), cx).unwrap();
2098 assert_eq!(
2099 channel_messages(channel),
2100 &[
2101 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2102 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2103 ("user_a".to_string(), "sup".to_string(), true)
2104 ]
2105 );
2106 task
2107 })
2108 .await
2109 .unwrap();
2110
2111 // Give client B a chance to reconnect.
2112 server.allow_connections();
2113 cx_b.foreground().advance_clock(Duration::from_secs(10));
2114
2115 // Verify that B sees the new messages upon reconnection, as well as the message client B
2116 // sent while offline.
2117 channel_b
2118 .condition(&cx_b, |channel, _| {
2119 channel_messages(channel)
2120 == [
2121 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2122 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2123 ("user_a".to_string(), "sup".to_string(), false),
2124 ("user_b".to_string(), "can you see this?".to_string(), false),
2125 ]
2126 })
2127 .await;
2128
2129 // Ensure client A and B can communicate normally after reconnection.
2130 channel_a
2131 .update(&mut cx_a, |channel, cx| {
2132 channel.send_message("you online?".to_string(), cx).unwrap()
2133 })
2134 .await
2135 .unwrap();
2136 channel_b
2137 .condition(&cx_b, |channel, _| {
2138 channel_messages(channel)
2139 == [
2140 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2141 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2142 ("user_a".to_string(), "sup".to_string(), false),
2143 ("user_b".to_string(), "can you see this?".to_string(), false),
2144 ("user_a".to_string(), "you online?".to_string(), false),
2145 ]
2146 })
2147 .await;
2148
2149 channel_b
2150 .update(&mut cx_b, |channel, cx| {
2151 channel.send_message("yep".to_string(), cx).unwrap()
2152 })
2153 .await
2154 .unwrap();
2155 channel_a
2156 .condition(&cx_a, |channel, _| {
2157 channel_messages(channel)
2158 == [
2159 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2160 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2161 ("user_a".to_string(), "sup".to_string(), false),
2162 ("user_b".to_string(), "can you see this?".to_string(), false),
2163 ("user_a".to_string(), "you online?".to_string(), false),
2164 ("user_b".to_string(), "yep".to_string(), false),
2165 ]
2166 })
2167 .await;
2168 }
2169
2170 #[gpui::test]
2171 async fn test_contacts(
2172 mut cx_a: TestAppContext,
2173 mut cx_b: TestAppContext,
2174 mut cx_c: TestAppContext,
2175 ) {
2176 cx_a.foreground().forbid_parking();
2177 let lang_registry = Arc::new(LanguageRegistry::new());
2178
2179 // Connect to a server as 3 clients.
2180 let mut server = TestServer::start().await;
2181 let client_a = server.create_client(&mut cx_a, "user_a").await;
2182 let client_b = server.create_client(&mut cx_b, "user_b").await;
2183 let client_c = server.create_client(&mut cx_c, "user_c").await;
2184
2185 let fs = Arc::new(FakeFs::new());
2186
2187 // Share a worktree as client A.
2188 fs.insert_tree(
2189 "/a",
2190 json!({
2191 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2192 }),
2193 )
2194 .await;
2195
2196 let worktree_a = Worktree::open_local(
2197 client_a.clone(),
2198 client_a.user_store.clone(),
2199 "/a".as_ref(),
2200 fs.clone(),
2201 lang_registry.clone(),
2202 &mut cx_a.to_async(),
2203 )
2204 .await
2205 .unwrap();
2206
2207 client_a
2208 .user_store
2209 .condition(&cx_a, |user_store, _| {
2210 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2211 })
2212 .await;
2213 client_b
2214 .user_store
2215 .condition(&cx_b, |user_store, _| {
2216 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2217 })
2218 .await;
2219 client_c
2220 .user_store
2221 .condition(&cx_c, |user_store, _| {
2222 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2223 })
2224 .await;
2225
2226 let worktree_id = worktree_a
2227 .update(&mut cx_a, |tree, cx| tree.as_local_mut().unwrap().share(cx))
2228 .await
2229 .unwrap();
2230
2231 let _worktree_b = Worktree::open_remote(
2232 client_b.clone(),
2233 worktree_id,
2234 lang_registry.clone(),
2235 client_b.user_store.clone(),
2236 &mut cx_b.to_async(),
2237 )
2238 .await
2239 .unwrap();
2240
2241 client_a
2242 .user_store
2243 .condition(&cx_a, |user_store, _| {
2244 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2245 })
2246 .await;
2247 client_b
2248 .user_store
2249 .condition(&cx_b, |user_store, _| {
2250 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2251 })
2252 .await;
2253 client_c
2254 .user_store
2255 .condition(&cx_c, |user_store, _| {
2256 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2257 })
2258 .await;
2259
2260 worktree_a
2261 .condition(&cx_a, |worktree, _| {
2262 worktree.collaborators().contains_key(&client_b.peer_id)
2263 })
2264 .await;
2265
2266 cx_a.update(move |_| drop(worktree_a));
2267 client_a
2268 .user_store
2269 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2270 .await;
2271 client_b
2272 .user_store
2273 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2274 .await;
2275 client_c
2276 .user_store
2277 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2278 .await;
2279
2280 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2281 user_store
2282 .contacts()
2283 .iter()
2284 .map(|contact| {
2285 let worktrees = contact
2286 .worktrees
2287 .iter()
2288 .map(|w| {
2289 (
2290 w.root_name.as_str(),
2291 w.guests.iter().map(|p| p.github_login.as_str()).collect(),
2292 )
2293 })
2294 .collect();
2295 (contact.user.github_login.as_str(), worktrees)
2296 })
2297 .collect()
2298 }
2299 }
2300
    /// Test harness that owns a `Server` together with the state needed to
    /// create clients, sever their connections, and wait on store changes.
    struct TestServer {
        /// Peer handed to the server at construction; reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state shared with the server; its database is used to create users.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Receiving half of the channel whose sender is given to the server;
        /// awaited by `condition` between predicate checks.
        notifications: mpsc::Receiver<()>,
        /// Kill handle for the most recent in-memory connection of each user.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When set, attempts to establish a new connection fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Held for the lifetime of the harness; presumably keeps the test
        /// database alive until the server is gone (confirm against `TestDb`).
        _test_db: TestDb,
    }
2310
2311 impl TestServer {
2312 async fn start() -> Self {
2313 let test_db = TestDb::new();
2314 let app_state = Self::build_app_state(&test_db).await;
2315 let peer = Peer::new();
2316 let notifications = mpsc::channel(128);
2317 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2318 Self {
2319 peer,
2320 app_state,
2321 server,
2322 notifications: notifications.1,
2323 connection_killers: Default::default(),
2324 forbid_connections: Default::default(),
2325 _test_db: test_db,
2326 }
2327 }
2328
2329 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2330 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2331 let client_name = name.to_string();
2332 let mut client = Client::new();
2333 let server = self.server.clone();
2334 let connection_killers = self.connection_killers.clone();
2335 let forbid_connections = self.forbid_connections.clone();
2336 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2337
2338 Arc::get_mut(&mut client)
2339 .unwrap()
2340 .override_authenticate(move |cx| {
2341 cx.spawn(|_| async move {
2342 let access_token = "the-token".to_string();
2343 Ok(Credentials {
2344 user_id: user_id.0 as u64,
2345 access_token,
2346 })
2347 })
2348 })
2349 .override_establish_connection(move |credentials, cx| {
2350 assert_eq!(credentials.user_id, user_id.0 as u64);
2351 assert_eq!(credentials.access_token, "the-token");
2352
2353 let server = server.clone();
2354 let connection_killers = connection_killers.clone();
2355 let forbid_connections = forbid_connections.clone();
2356 let client_name = client_name.clone();
2357 let connection_id_tx = connection_id_tx.clone();
2358 cx.spawn(move |cx| async move {
2359 if forbid_connections.load(SeqCst) {
2360 Err(EstablishConnectionError::other(anyhow!(
2361 "server is forbidding connections"
2362 )))
2363 } else {
2364 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2365 connection_killers.lock().insert(user_id, kill_conn);
2366 cx.background()
2367 .spawn(server.handle_connection(
2368 server_conn,
2369 client_name,
2370 user_id,
2371 Some(connection_id_tx),
2372 ))
2373 .detach();
2374 Ok(client_conn)
2375 }
2376 })
2377 });
2378
2379 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2380 client
2381 .authenticate_and_connect(&cx.to_async())
2382 .await
2383 .unwrap();
2384
2385 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2386 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2387 let mut authed_user =
2388 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2389 while authed_user.recv().await.unwrap().is_none() {}
2390
2391 TestClient {
2392 client,
2393 peer_id,
2394 user_store,
2395 }
2396 }
2397
2398 fn disconnect_client(&self, user_id: UserId) {
2399 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2400 let _ = kill_conn.try_send(Some(()));
2401 }
2402 }
2403
2404 fn forbid_connections(&self) {
2405 self.forbid_connections.store(true, SeqCst);
2406 }
2407
2408 fn allow_connections(&self) {
2409 self.forbid_connections.store(false, SeqCst);
2410 }
2411
2412 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2413 let mut config = Config::default();
2414 config.session_secret = "a".repeat(32);
2415 config.database_url = test_db.url.clone();
2416 let github_client = github::AppClient::test();
2417 Arc::new(AppState {
2418 db: test_db.db().clone(),
2419 handlebars: Default::default(),
2420 auth_client: auth::build_client("", ""),
2421 repo_client: github::RepoClient::test(&github_client),
2422 github_client,
2423 config,
2424 })
2425 }
2426
2427 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2428 self.server.store.read()
2429 }
2430
2431 async fn condition<F>(&mut self, mut predicate: F)
2432 where
2433 F: FnMut(&Store) -> bool,
2434 {
2435 async_std::future::timeout(Duration::from_millis(500), async {
2436 while !(predicate)(&*self.server.store.read()) {
2437 self.notifications.recv().await;
2438 }
2439 })
2440 .await
2441 .expect("condition timed out");
2442 }
2443 }
2444
2445 impl Drop for TestServer {
2446 fn drop(&mut self) {
2447 task::block_on(self.peer.reset());
2448 }
2449 }
2450
    /// A connected client plus the handles tests need to inspect it.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
2456
2457 impl Deref for TestClient {
2458 type Target = Arc<Client>;
2459
2460 fn deref(&self) -> &Self::Target {
2461 &self.client
2462 }
2463 }
2464
2465 impl TestClient {
2466 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2467 UserId::from_proto(
2468 self.user_store
2469 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2470 )
2471 }
2472 }
2473
2474 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2475 channel
2476 .messages()
2477 .cursor::<()>()
2478 .map(|m| {
2479 (
2480 m.sender.github_login.clone(),
2481 m.body.clone(),
2482 m.is_pending(),
2483 )
2484 })
2485 .collect()
2486 }
2487
2488 struct EmptyView;
2489
2490 impl gpui::Entity for EmptyView {
2491 type Event = ();
2492 }
2493
2494 impl gpui::View for EmptyView {
2495 fn ui_name() -> &'static str {
2496 "empty view"
2497 }
2498
2499 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2500 gpui::Element::boxed(gpui::elements::Empty)
2501 }
2502 }
2503}