1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
/// Number of channel messages requested from the database per page; a page shorter than
/// this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum accepted length of a trimmed chat message body, measured with `str::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a project lookup for a request fails.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::open_buffer)
75 .add_handler(Server::close_buffer)
76 .add_handler(Server::update_buffer)
77 .add_handler(Server::buffer_saved)
78 .add_handler(Server::save_buffer)
79 .add_handler(Server::get_channels)
80 .add_handler(Server::get_users)
81 .add_handler(Server::join_channel)
82 .add_handler(Server::leave_channel)
83 .add_handler(Server::send_channel_message)
84 .add_handler(Server::get_channel_messages);
85
86 Arc::new(server)
87 }
88
89 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
90 where
91 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
92 Fut: 'static + Send + Future<Output = tide::Result<()>>,
93 M: EnvelopedMessage,
94 {
95 let prev_handler = self.handlers.insert(
96 TypeId::of::<M>(),
97 Box::new(move |server, envelope| {
98 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
99 (handler)(server, *envelope).boxed()
100 }),
101 );
102 if prev_handler.is_some() {
103 panic!("registered a handler for the same message twice");
104 }
105 self
106 }
107
108 pub fn handle_connection(
109 self: &Arc<Self>,
110 connection: Connection,
111 addr: String,
112 user_id: UserId,
113 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
114 ) -> impl Future<Output = ()> {
115 let mut this = self.clone();
116 async move {
117 let (connection_id, handle_io, mut incoming_rx) =
118 this.peer.add_connection(connection).await;
119
120 if let Some(send_connection_id) = send_connection_id.as_mut() {
121 let _ = send_connection_id.send(connection_id).await;
122 }
123
124 this.state_mut().add_connection(connection_id, user_id);
125 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
126 log::error!("error updating contacts for {:?}: {}", user_id, err);
127 }
128
129 let handle_io = handle_io.fuse();
130 futures::pin_mut!(handle_io);
131 loop {
132 let next_message = incoming_rx.recv().fuse();
133 futures::pin_mut!(next_message);
134 futures::select_biased! {
135 message = next_message => {
136 if let Some(message) = message {
137 let start_time = Instant::now();
138 log::info!("RPC message received: {}", message.payload_type_name());
139 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
140 if let Err(err) = (handler)(this.clone(), message).await {
141 log::error!("error handling message: {:?}", err);
142 } else {
143 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
144 }
145
146 if let Some(mut notifications) = this.notifications.clone() {
147 let _ = notifications.send(()).await;
148 }
149 } else {
150 log::warn!("unhandled message: {}", message.payload_type_name());
151 }
152 } else {
153 log::info!("rpc connection closed {:?}", addr);
154 break;
155 }
156 }
157 handle_io = handle_io => {
158 if let Err(err) = handle_io {
159 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
160 }
161 break;
162 }
163 }
164 }
165
166 if let Err(err) = this.sign_out(connection_id).await {
167 log::error!("error signing out connection {:?} - {:?}", addr, err);
168 }
169 }
170 }
171
172 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
173 self.peer.disconnect(connection_id).await;
174 let removed_connection = self.state_mut().remove_connection(connection_id)?;
175
176 for (project_id, project) in removed_connection.hosted_projects {
177 if let Some(share) = project.share {
178 broadcast(
179 connection_id,
180 share.guests.keys().copied().collect(),
181 |conn_id| {
182 self.peer
183 .send(conn_id, proto::UnshareProject { project_id })
184 },
185 )
186 .await?;
187 }
188 }
189
190 for (project_id, peer_ids) in removed_connection.guest_project_ids {
191 broadcast(connection_id, peer_ids, |conn_id| {
192 self.peer.send(
193 conn_id,
194 proto::RemoveProjectCollaborator {
195 project_id,
196 peer_id: connection_id.0,
197 },
198 )
199 })
200 .await?;
201 }
202
203 self.update_contacts_for_users(removed_connection.contact_ids.iter())
204 .await?;
205
206 Ok(())
207 }
208
209 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
210 self.peer.respond(request.receipt(), proto::Ack {}).await?;
211 Ok(())
212 }
213
214 async fn register_project(
215 mut self: Arc<Server>,
216 request: TypedEnvelope<proto::RegisterProject>,
217 ) -> tide::Result<()> {
218 let project_id = {
219 let mut state = self.state_mut();
220 let user_id = state.user_id_for_connection(request.sender_id)?;
221 state.register_project(request.sender_id, user_id)
222 };
223 self.peer
224 .respond(
225 request.receipt(),
226 proto::RegisterProjectResponse { project_id },
227 )
228 .await?;
229 Ok(())
230 }
231
232 async fn unregister_project(
233 mut self: Arc<Server>,
234 request: TypedEnvelope<proto::UnregisterProject>,
235 ) -> tide::Result<()> {
236 self.state_mut()
237 .unregister_project(request.payload.project_id, request.sender_id);
238 Ok(())
239 }
240
241 async fn share_project(
242 mut self: Arc<Server>,
243 request: TypedEnvelope<proto::ShareProject>,
244 ) -> tide::Result<()> {
245 self.state_mut()
246 .share_project(request.payload.project_id, request.sender_id);
247 Ok(())
248 }
249
250 async fn unshare_project(
251 mut self: Arc<Server>,
252 request: TypedEnvelope<proto::UnshareProject>,
253 ) -> tide::Result<()> {
254 let project_id = request.payload.project_id;
255 let project = self
256 .state_mut()
257 .unshare_project(project_id, request.sender_id)?;
258
259 broadcast(request.sender_id, project.connection_ids, |conn_id| {
260 self.peer
261 .send(conn_id, proto::UnshareProject { project_id })
262 })
263 .await?;
264 self.update_contacts_for_users(&project.authorized_user_ids)
265 .await?;
266
267 Ok(())
268 }
269
270 async fn join_project(
271 mut self: Arc<Server>,
272 request: TypedEnvelope<proto::JoinProject>,
273 ) -> tide::Result<()> {
274 let project_id = request.payload.project_id;
275
276 let user_id = self.state().user_id_for_connection(request.sender_id)?;
277 let response_data = self
278 .state_mut()
279 .join_project(request.sender_id, user_id, project_id)
280 .and_then(|joined| {
281 let share = joined.project.share()?;
282 let peer_count = share.guests.len();
283 let mut collaborators = Vec::with_capacity(peer_count);
284 collaborators.push(proto::Collaborator {
285 peer_id: joined.project.host_connection_id.0,
286 replica_id: 0,
287 user_id: joined.project.host_user_id.to_proto(),
288 });
289 let worktrees = joined
290 .project
291 .worktrees
292 .iter()
293 .filter_map(|(id, worktree)| {
294 worktree.share.as_ref().map(|share| proto::Worktree {
295 id: *id,
296 root_name: worktree.root_name.clone(),
297 entries: share.entries.values().cloned().collect(),
298 })
299 })
300 .collect();
301 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
302 if *peer_conn_id != request.sender_id {
303 collaborators.push(proto::Collaborator {
304 peer_id: peer_conn_id.0,
305 replica_id: *peer_replica_id as u32,
306 user_id: peer_user_id.to_proto(),
307 });
308 }
309 }
310 let response = proto::JoinProjectResponse {
311 worktrees,
312 replica_id: joined.replica_id as u32,
313 collaborators,
314 };
315 let connection_ids = joined.project.connection_ids();
316 let contact_user_ids = joined.project.authorized_user_ids();
317 Ok((response, connection_ids, contact_user_ids))
318 });
319
320 match response_data {
321 Ok((response, connection_ids, contact_user_ids)) => {
322 broadcast(request.sender_id, connection_ids, |conn_id| {
323 self.peer.send(
324 conn_id,
325 proto::AddProjectCollaborator {
326 project_id: project_id,
327 collaborator: Some(proto::Collaborator {
328 peer_id: request.sender_id.0,
329 replica_id: response.replica_id,
330 user_id: user_id.to_proto(),
331 }),
332 },
333 )
334 })
335 .await?;
336 self.peer.respond(request.receipt(), response).await?;
337 self.update_contacts_for_users(&contact_user_ids).await?;
338 }
339 Err(error) => {
340 self.peer
341 .respond_with_error(
342 request.receipt(),
343 proto::Error {
344 message: error.to_string(),
345 },
346 )
347 .await?;
348 }
349 }
350
351 Ok(())
352 }
353
354 async fn leave_project(
355 mut self: Arc<Server>,
356 request: TypedEnvelope<proto::LeaveProject>,
357 ) -> tide::Result<()> {
358 let sender_id = request.sender_id;
359 let project_id = request.payload.project_id;
360 let worktree = self.state_mut().leave_project(sender_id, project_id);
361 if let Some(worktree) = worktree {
362 broadcast(sender_id, worktree.connection_ids, |conn_id| {
363 self.peer.send(
364 conn_id,
365 proto::RemoveProjectCollaborator {
366 project_id,
367 peer_id: sender_id.0,
368 },
369 )
370 })
371 .await?;
372 self.update_contacts_for_users(&worktree.authorized_user_ids)
373 .await?;
374 }
375 Ok(())
376 }
377
378 async fn register_worktree(
379 mut self: Arc<Server>,
380 request: TypedEnvelope<proto::RegisterWorktree>,
381 ) -> tide::Result<()> {
382 let receipt = request.receipt();
383 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
384
385 let mut contact_user_ids = HashSet::default();
386 contact_user_ids.insert(host_user_id);
387 for github_login in request.payload.authorized_logins {
388 match self.app_state.db.create_user(&github_login, false).await {
389 Ok(contact_user_id) => {
390 contact_user_ids.insert(contact_user_id);
391 }
392 Err(err) => {
393 let message = err.to_string();
394 self.peer
395 .respond_with_error(receipt, proto::Error { message })
396 .await?;
397 return Ok(());
398 }
399 }
400 }
401
402 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
403 let ok = self.state_mut().register_worktree(
404 request.payload.project_id,
405 request.payload.worktree_id,
406 Worktree {
407 authorized_user_ids: contact_user_ids.clone(),
408 root_name: request.payload.root_name,
409 share: None,
410 },
411 );
412
413 if ok {
414 self.peer.respond(receipt, proto::Ack {}).await?;
415 self.update_contacts_for_users(&contact_user_ids).await?;
416 } else {
417 self.peer
418 .respond_with_error(
419 receipt,
420 proto::Error {
421 message: NO_SUCH_PROJECT.to_string(),
422 },
423 )
424 .await?;
425 }
426
427 Ok(())
428 }
429
430 async fn unregister_worktree(
431 mut self: Arc<Server>,
432 request: TypedEnvelope<proto::UnregisterWorktree>,
433 ) -> tide::Result<()> {
434 let project_id = request.payload.project_id;
435 let worktree_id = request.payload.worktree_id;
436 let (worktree, guest_connection_ids) =
437 self.state_mut()
438 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
439
440 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
441 self.peer.send(
442 conn_id,
443 proto::UnregisterWorktree {
444 project_id,
445 worktree_id,
446 },
447 )
448 })
449 .await?;
450 self.update_contacts_for_users(&worktree.authorized_user_ids)
451 .await?;
452 Ok(())
453 }
454
455 async fn share_worktree(
456 mut self: Arc<Server>,
457 mut request: TypedEnvelope<proto::ShareWorktree>,
458 ) -> tide::Result<()> {
459 let worktree = request
460 .payload
461 .worktree
462 .as_mut()
463 .ok_or_else(|| anyhow!("missing worktree"))?;
464 let entries = mem::take(&mut worktree.entries)
465 .into_iter()
466 .map(|entry| (entry.id, entry))
467 .collect();
468
469 let contact_user_ids = self.state_mut().share_worktree(
470 request.payload.project_id,
471 worktree.id,
472 request.sender_id,
473 entries,
474 );
475 if let Some(contact_user_ids) = contact_user_ids {
476 self.peer.respond(request.receipt(), proto::Ack {}).await?;
477 self.update_contacts_for_users(&contact_user_ids).await?;
478 } else {
479 self.peer
480 .respond_with_error(
481 request.receipt(),
482 proto::Error {
483 message: "no such worktree".to_string(),
484 },
485 )
486 .await?;
487 }
488 Ok(())
489 }
490
491 async fn update_worktree(
492 mut self: Arc<Server>,
493 request: TypedEnvelope<proto::UpdateWorktree>,
494 ) -> tide::Result<()> {
495 let connection_ids = self
496 .state_mut()
497 .update_worktree(
498 request.sender_id,
499 request.payload.project_id,
500 request.payload.worktree_id,
501 &request.payload.removed_entries,
502 &request.payload.updated_entries,
503 )
504 .ok_or_else(|| anyhow!("no such worktree"))?;
505
506 broadcast(request.sender_id, connection_ids, |connection_id| {
507 self.peer
508 .forward_send(request.sender_id, connection_id, request.payload.clone())
509 })
510 .await?;
511
512 Ok(())
513 }
514
515 async fn open_buffer(
516 self: Arc<Server>,
517 request: TypedEnvelope<proto::OpenBuffer>,
518 ) -> tide::Result<()> {
519 let receipt = request.receipt();
520 let host_connection_id = self
521 .state()
522 .read_project(request.payload.project_id, request.sender_id)
523 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
524 .host_connection_id;
525 let response = self
526 .peer
527 .forward_request(request.sender_id, host_connection_id, request.payload)
528 .await?;
529 self.peer.respond(receipt, response).await?;
530 Ok(())
531 }
532
533 async fn close_buffer(
534 self: Arc<Server>,
535 request: TypedEnvelope<proto::CloseBuffer>,
536 ) -> tide::Result<()> {
537 let host_connection_id = self
538 .state()
539 .read_project(request.payload.project_id, request.sender_id)
540 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
541 .host_connection_id;
542 self.peer
543 .forward_send(request.sender_id, host_connection_id, request.payload)
544 .await?;
545 Ok(())
546 }
547
548 async fn save_buffer(
549 self: Arc<Server>,
550 request: TypedEnvelope<proto::SaveBuffer>,
551 ) -> tide::Result<()> {
552 let host;
553 let guests;
554 {
555 let state = self.state();
556 let project = state
557 .read_project(request.payload.project_id, request.sender_id)
558 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
559 host = project.host_connection_id;
560 guests = project.guest_connection_ids()
561 }
562
563 let sender = request.sender_id;
564 let receipt = request.receipt();
565 let response = self
566 .peer
567 .forward_request(sender, host, request.payload.clone())
568 .await?;
569
570 broadcast(host, guests, |conn_id| {
571 let response = response.clone();
572 let peer = &self.peer;
573 async move {
574 if conn_id == sender {
575 peer.respond(receipt, response).await
576 } else {
577 peer.forward_send(host, conn_id, response).await
578 }
579 }
580 })
581 .await?;
582
583 Ok(())
584 }
585
586 async fn update_buffer(
587 self: Arc<Server>,
588 request: TypedEnvelope<proto::UpdateBuffer>,
589 ) -> tide::Result<()> {
590 let receiver_ids = self
591 .state()
592 .project_connection_ids(request.payload.project_id, request.sender_id)
593 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
594 broadcast(request.sender_id, receiver_ids, |connection_id| {
595 self.peer
596 .forward_send(request.sender_id, connection_id, request.payload.clone())
597 })
598 .await?;
599 self.peer.respond(request.receipt(), proto::Ack {}).await?;
600 Ok(())
601 }
602
603 async fn buffer_saved(
604 self: Arc<Server>,
605 request: TypedEnvelope<proto::BufferSaved>,
606 ) -> tide::Result<()> {
607 let receiver_ids = self
608 .state()
609 .project_connection_ids(request.payload.project_id, request.sender_id)
610 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
611 broadcast(request.sender_id, receiver_ids, |connection_id| {
612 self.peer
613 .forward_send(request.sender_id, connection_id, request.payload.clone())
614 })
615 .await?;
616 Ok(())
617 }
618
619 async fn get_channels(
620 self: Arc<Server>,
621 request: TypedEnvelope<proto::GetChannels>,
622 ) -> tide::Result<()> {
623 let user_id = self.state().user_id_for_connection(request.sender_id)?;
624 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
625 self.peer
626 .respond(
627 request.receipt(),
628 proto::GetChannelsResponse {
629 channels: channels
630 .into_iter()
631 .map(|chan| proto::Channel {
632 id: chan.id.to_proto(),
633 name: chan.name,
634 })
635 .collect(),
636 },
637 )
638 .await?;
639 Ok(())
640 }
641
642 async fn get_users(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::GetUsers>,
645 ) -> tide::Result<()> {
646 let receipt = request.receipt();
647 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
648 let users = self
649 .app_state
650 .db
651 .get_users_by_ids(user_ids)
652 .await?
653 .into_iter()
654 .map(|user| proto::User {
655 id: user.id.to_proto(),
656 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
657 github_login: user.github_login,
658 })
659 .collect();
660 self.peer
661 .respond(receipt, proto::GetUsersResponse { users })
662 .await?;
663 Ok(())
664 }
665
666 async fn update_contacts_for_users<'a>(
667 self: &Arc<Server>,
668 user_ids: impl IntoIterator<Item = &'a UserId>,
669 ) -> tide::Result<()> {
670 let mut send_futures = Vec::new();
671
672 {
673 let state = self.state();
674 for user_id in user_ids {
675 let contacts = state.contacts_for_user(*user_id);
676 for connection_id in state.connection_ids_for_user(*user_id) {
677 send_futures.push(self.peer.send(
678 connection_id,
679 proto::UpdateContacts {
680 contacts: contacts.clone(),
681 },
682 ));
683 }
684 }
685 }
686 futures::future::try_join_all(send_futures).await?;
687
688 Ok(())
689 }
690
691 async fn join_channel(
692 mut self: Arc<Self>,
693 request: TypedEnvelope<proto::JoinChannel>,
694 ) -> tide::Result<()> {
695 let user_id = self.state().user_id_for_connection(request.sender_id)?;
696 let channel_id = ChannelId::from_proto(request.payload.channel_id);
697 if !self
698 .app_state
699 .db
700 .can_user_access_channel(user_id, channel_id)
701 .await?
702 {
703 Err(anyhow!("access denied"))?;
704 }
705
706 self.state_mut().join_channel(request.sender_id, channel_id);
707 let messages = self
708 .app_state
709 .db
710 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
711 .await?
712 .into_iter()
713 .map(|msg| proto::ChannelMessage {
714 id: msg.id.to_proto(),
715 body: msg.body,
716 timestamp: msg.sent_at.unix_timestamp() as u64,
717 sender_id: msg.sender_id.to_proto(),
718 nonce: Some(msg.nonce.as_u128().into()),
719 })
720 .collect::<Vec<_>>();
721 self.peer
722 .respond(
723 request.receipt(),
724 proto::JoinChannelResponse {
725 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
726 messages,
727 },
728 )
729 .await?;
730 Ok(())
731 }
732
733 async fn leave_channel(
734 mut self: Arc<Self>,
735 request: TypedEnvelope<proto::LeaveChannel>,
736 ) -> tide::Result<()> {
737 let user_id = self.state().user_id_for_connection(request.sender_id)?;
738 let channel_id = ChannelId::from_proto(request.payload.channel_id);
739 if !self
740 .app_state
741 .db
742 .can_user_access_channel(user_id, channel_id)
743 .await?
744 {
745 Err(anyhow!("access denied"))?;
746 }
747
748 self.state_mut()
749 .leave_channel(request.sender_id, channel_id);
750
751 Ok(())
752 }
753
754 async fn send_channel_message(
755 self: Arc<Self>,
756 request: TypedEnvelope<proto::SendChannelMessage>,
757 ) -> tide::Result<()> {
758 let receipt = request.receipt();
759 let channel_id = ChannelId::from_proto(request.payload.channel_id);
760 let user_id;
761 let connection_ids;
762 {
763 let state = self.state();
764 user_id = state.user_id_for_connection(request.sender_id)?;
765 if let Some(ids) = state.channel_connection_ids(channel_id) {
766 connection_ids = ids;
767 } else {
768 return Ok(());
769 }
770 }
771
772 // Validate the message body.
773 let body = request.payload.body.trim().to_string();
774 if body.len() > MAX_MESSAGE_LEN {
775 self.peer
776 .respond_with_error(
777 receipt,
778 proto::Error {
779 message: "message is too long".to_string(),
780 },
781 )
782 .await?;
783 return Ok(());
784 }
785 if body.is_empty() {
786 self.peer
787 .respond_with_error(
788 receipt,
789 proto::Error {
790 message: "message can't be blank".to_string(),
791 },
792 )
793 .await?;
794 return Ok(());
795 }
796
797 let timestamp = OffsetDateTime::now_utc();
798 let nonce = if let Some(nonce) = request.payload.nonce {
799 nonce
800 } else {
801 self.peer
802 .respond_with_error(
803 receipt,
804 proto::Error {
805 message: "nonce can't be blank".to_string(),
806 },
807 )
808 .await?;
809 return Ok(());
810 };
811
812 let message_id = self
813 .app_state
814 .db
815 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
816 .await?
817 .to_proto();
818 let message = proto::ChannelMessage {
819 sender_id: user_id.to_proto(),
820 id: message_id,
821 body,
822 timestamp: timestamp.unix_timestamp() as u64,
823 nonce: Some(nonce),
824 };
825 broadcast(request.sender_id, connection_ids, |conn_id| {
826 self.peer.send(
827 conn_id,
828 proto::ChannelMessageSent {
829 channel_id: channel_id.to_proto(),
830 message: Some(message.clone()),
831 },
832 )
833 })
834 .await?;
835 self.peer
836 .respond(
837 receipt,
838 proto::SendChannelMessageResponse {
839 message: Some(message),
840 },
841 )
842 .await?;
843 Ok(())
844 }
845
846 async fn get_channel_messages(
847 self: Arc<Self>,
848 request: TypedEnvelope<proto::GetChannelMessages>,
849 ) -> tide::Result<()> {
850 let user_id = self.state().user_id_for_connection(request.sender_id)?;
851 let channel_id = ChannelId::from_proto(request.payload.channel_id);
852 if !self
853 .app_state
854 .db
855 .can_user_access_channel(user_id, channel_id)
856 .await?
857 {
858 Err(anyhow!("access denied"))?;
859 }
860
861 let messages = self
862 .app_state
863 .db
864 .get_channel_messages(
865 channel_id,
866 MESSAGE_COUNT_PER_PAGE,
867 Some(MessageId::from_proto(request.payload.before_message_id)),
868 )
869 .await?
870 .into_iter()
871 .map(|msg| proto::ChannelMessage {
872 id: msg.id.to_proto(),
873 body: msg.body,
874 timestamp: msg.sent_at.unix_timestamp() as u64,
875 sender_id: msg.sender_id.to_proto(),
876 nonce: Some(msg.nonce.as_u128().into()),
877 })
878 .collect::<Vec<_>>();
879 self.peer
880 .respond(
881 request.receipt(),
882 proto::GetChannelMessagesResponse {
883 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
884 messages,
885 },
886 )
887 .await?;
888 Ok(())
889 }
890
891 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
892 self.store.read()
893 }
894
895 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
896 self.store.write()
897 }
898}
899
900pub async fn broadcast<F, T>(
901 sender_id: ConnectionId,
902 receiver_ids: Vec<ConnectionId>,
903 mut f: F,
904) -> anyhow::Result<()>
905where
906 F: FnMut(ConnectionId) -> T,
907 T: Future<Output = anyhow::Result<()>>,
908{
909 let futures = receiver_ids
910 .into_iter()
911 .filter(|id| *id != sender_id)
912 .map(|id| f(id));
913 futures::future::try_join_all(futures).await?;
914 Ok(())
915}
916
917pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
918 let server = Server::new(app.state().clone(), rpc.clone(), None);
919 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
920 let server = server.clone();
921 async move {
922 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
923
924 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
925 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
926 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
927 let client_protocol_version: Option<u32> = request
928 .header("X-Zed-Protocol-Version")
929 .and_then(|v| v.as_str().parse().ok());
930
931 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
932 return Ok(Response::new(StatusCode::UpgradeRequired));
933 }
934
935 let header = match request.header("Sec-Websocket-Key") {
936 Some(h) => h.as_str(),
937 None => return Err(anyhow!("expected sec-websocket-key"))?,
938 };
939
940 let user_id = process_auth_header(&request).await?;
941
942 let mut response = Response::new(StatusCode::SwitchingProtocols);
943 response.insert_header(UPGRADE, "websocket");
944 response.insert_header(CONNECTION, "Upgrade");
945 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
946 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
947 response.insert_header("Sec-Websocket-Version", "13");
948
949 let http_res: &mut tide::http::Response = response.as_mut();
950 let upgrade_receiver = http_res.recv_upgrade().await;
951 let addr = request.remote().unwrap_or("unknown").to_string();
952 task::spawn(async move {
953 if let Some(stream) = upgrade_receiver.await {
954 server
955 .handle_connection(
956 Connection::new(
957 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
958 ),
959 addr,
960 user_id,
961 None,
962 )
963 .await;
964 }
965 });
966
967 Ok(response)
968 }
969 });
970}
971
972fn header_contains_ignore_case<T>(
973 request: &tide::Request<T>,
974 header_name: HeaderName,
975 value: &str,
976) -> bool {
977 request
978 .header(header_name)
979 .map(|h| {
980 h.as_str()
981 .split(',')
982 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
983 })
984 .unwrap_or(false)
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990 use crate::{
991 auth,
992 db::{tests::TestDb, UserId},
993 github, AppState, Config,
994 };
995 use ::rpc::Peer;
996 use async_std::task;
997 use gpui::{ModelHandle, TestAppContext};
998 use parking_lot::Mutex;
999 use postage::{mpsc, watch};
1000 use rpc::PeerId;
1001 use serde_json::json;
1002 use sqlx::types::time::OffsetDateTime;
1003 use std::{
1004 ops::Deref,
1005 path::Path,
1006 sync::{
1007 atomic::{AtomicBool, Ordering::SeqCst},
1008 Arc,
1009 },
1010 time::Duration,
1011 };
1012 use zed::{
1013 client::{
1014 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015 EstablishConnectionError, UserStore,
1016 },
1017 editor::{Editor, EditorSettings, Input, MultiBuffer},
1018 fs::{FakeFs, Fs as _},
1019 language::{
1020 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1021 LanguageRegistry, LanguageServerConfig, Point,
1022 },
1023 lsp,
1024 project::Project,
1025 };
1026
1027 #[gpui::test]
1028 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1029 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1030 let lang_registry = Arc::new(LanguageRegistry::new());
1031 let fs = Arc::new(FakeFs::new());
1032 cx_a.foreground().forbid_parking();
1033
1034 // Connect to a server as 2 clients.
1035 let mut server = TestServer::start().await;
1036 let client_a = server.create_client(&mut cx_a, "user_a").await;
1037 let client_b = server.create_client(&mut cx_b, "user_b").await;
1038
1039 // Share a project as client A
1040 fs.insert_tree(
1041 "/a",
1042 json!({
1043 ".zed.toml": r#"collaborators = ["user_b"]"#,
1044 "a.txt": "a-contents",
1045 "b.txt": "b-contents",
1046 }),
1047 )
1048 .await;
1049 let project_a = cx_a.update(|cx| {
1050 Project::local(
1051 client_a.clone(),
1052 client_a.user_store.clone(),
1053 lang_registry.clone(),
1054 fs.clone(),
1055 cx,
1056 )
1057 });
1058 let worktree_a = project_a
1059 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1060 .await
1061 .unwrap();
1062 worktree_a
1063 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1064 .await;
1065 let project_id = project_a
1066 .update(&mut cx_a, |project, _| project.next_remote_id())
1067 .await;
1068 project_a
1069 .update(&mut cx_a, |project, cx| project.share(cx))
1070 .await
1071 .unwrap();
1072
1073 // Join that project as client B
1074 let project_b = Project::remote(
1075 project_id,
1076 client_b.clone(),
1077 client_b.user_store.clone(),
1078 lang_registry.clone(),
1079 fs.clone(),
1080 &mut cx_b.to_async(),
1081 )
1082 .await
1083 .unwrap();
1084 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1085
1086 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1087 assert_eq!(
1088 project
1089 .collaborators()
1090 .get(&client_a.peer_id)
1091 .unwrap()
1092 .user
1093 .github_login,
1094 "user_a"
1095 );
1096 project.replica_id()
1097 });
1098 project_a
1099 .condition(&cx_a, |tree, _| {
1100 tree.collaborators()
1101 .get(&client_b.peer_id)
1102 .map_or(false, |collaborator| {
1103 collaborator.replica_id == replica_id_b
1104 && collaborator.user.github_login == "user_b"
1105 })
1106 })
1107 .await;
1108
1109 // Open the same file as client B and client A.
1110 let buffer_b = worktree_b
1111 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1112 .await
1113 .unwrap();
1114 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1115 buffer_b.read_with(&cx_b, |buf, cx| {
1116 assert_eq!(buf.read(cx).text(), "b-contents")
1117 });
1118 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1119 let buffer_a = worktree_a
1120 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1121 .await
1122 .unwrap();
1123
1124 let editor_b = cx_b.add_view(window_b, |cx| {
1125 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1126 });
1127 // TODO
1128 // // Create a selection set as client B and see that selection set as client A.
1129 // buffer_a
1130 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1131 // .await;
1132
1133 // Edit the buffer as client B and see that edit as client A.
1134 editor_b.update(&mut cx_b, |editor, cx| {
1135 editor.handle_input(&Input("ok, ".into()), cx)
1136 });
1137 buffer_a
1138 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1139 .await;
1140
1141 // TODO
1142 // // Remove the selection set as client B, see those selections disappear as client A.
1143 cx_b.update(move |_| drop(editor_b));
1144 // buffer_a
1145 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1146 // .await;
1147
1148 // Close the buffer as client A, see that the buffer is closed.
1149 cx_a.update(move |_| drop(buffer_a));
1150 worktree_a
1151 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1152 .await;
1153
1154 // Dropping the client B's project removes client B from client A's collaborators.
1155 cx_b.update(move |_| drop(project_b));
1156 project_a
1157 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1158 .await;
1159 }
1160
1161 #[gpui::test]
1162 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1163 cx_b.update(zed::contacts_panel::init);
1164 let lang_registry = Arc::new(LanguageRegistry::new());
1165 let fs = Arc::new(FakeFs::new());
1166 cx_a.foreground().forbid_parking();
1167
1168 // Connect to a server as 2 clients.
1169 let mut server = TestServer::start().await;
1170 let client_a = server.create_client(&mut cx_a, "user_a").await;
1171 let client_b = server.create_client(&mut cx_b, "user_b").await;
1172
1173 // Share a project as client A
1174 fs.insert_tree(
1175 "/a",
1176 json!({
1177 ".zed.toml": r#"collaborators = ["user_b"]"#,
1178 "a.txt": "a-contents",
1179 "b.txt": "b-contents",
1180 }),
1181 )
1182 .await;
1183 let project_a = cx_a.update(|cx| {
1184 Project::local(
1185 client_a.clone(),
1186 client_a.user_store.clone(),
1187 lang_registry.clone(),
1188 fs.clone(),
1189 cx,
1190 )
1191 });
1192 let worktree_a = project_a
1193 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1194 .await
1195 .unwrap();
1196 worktree_a
1197 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1198 .await;
1199 let project_id = project_a
1200 .update(&mut cx_a, |project, _| project.next_remote_id())
1201 .await;
1202 project_a
1203 .update(&mut cx_a, |project, cx| project.share(cx))
1204 .await
1205 .unwrap();
1206
1207 // Join that project as client B
1208 let project_b = Project::remote(
1209 project_id,
1210 client_b.clone(),
1211 client_b.user_store.clone(),
1212 lang_registry.clone(),
1213 fs.clone(),
1214 &mut cx_b.to_async(),
1215 )
1216 .await
1217 .unwrap();
1218
1219 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1220 worktree_b
1221 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1222 .await
1223 .unwrap();
1224
1225 project_a
1226 .update(&mut cx_a, |project, cx| project.unshare(cx))
1227 .await
1228 .unwrap();
1229 project_b
1230 .condition(&mut cx_b, |project, _| project.is_read_only())
1231 .await;
1232 }
1233
1234 #[gpui::test]
1235 async fn test_propagate_saves_and_fs_changes(
1236 mut cx_a: TestAppContext,
1237 mut cx_b: TestAppContext,
1238 mut cx_c: TestAppContext,
1239 ) {
1240 let lang_registry = Arc::new(LanguageRegistry::new());
1241 let fs = Arc::new(FakeFs::new());
1242 cx_a.foreground().forbid_parking();
1243
1244 // Connect to a server as 3 clients.
1245 let mut server = TestServer::start().await;
1246 let client_a = server.create_client(&mut cx_a, "user_a").await;
1247 let client_b = server.create_client(&mut cx_b, "user_b").await;
1248 let client_c = server.create_client(&mut cx_c, "user_c").await;
1249
1250 // Share a worktree as client A.
1251 fs.insert_tree(
1252 "/a",
1253 json!({
1254 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1255 "file1": "",
1256 "file2": ""
1257 }),
1258 )
1259 .await;
1260 let project_a = cx_a.update(|cx| {
1261 Project::local(
1262 client_a.clone(),
1263 client_a.user_store.clone(),
1264 lang_registry.clone(),
1265 fs.clone(),
1266 cx,
1267 )
1268 });
1269 let worktree_a = project_a
1270 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1271 .await
1272 .unwrap();
1273 worktree_a
1274 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1275 .await;
1276 let project_id = project_a
1277 .update(&mut cx_a, |project, _| project.next_remote_id())
1278 .await;
1279 project_a
1280 .update(&mut cx_a, |project, cx| project.share(cx))
1281 .await
1282 .unwrap();
1283
1284 // Join that worktree as clients B and C.
1285 let project_b = Project::remote(
1286 project_id,
1287 client_b.clone(),
1288 client_b.user_store.clone(),
1289 lang_registry.clone(),
1290 fs.clone(),
1291 &mut cx_b.to_async(),
1292 )
1293 .await
1294 .unwrap();
1295 let project_c = Project::remote(
1296 project_id,
1297 client_c.clone(),
1298 client_c.user_store.clone(),
1299 lang_registry.clone(),
1300 fs.clone(),
1301 &mut cx_c.to_async(),
1302 )
1303 .await
1304 .unwrap();
1305
1306 // Open and edit a buffer as both guests B and C.
1307 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1308 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1309 let buffer_b = worktree_b
1310 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1311 .await
1312 .unwrap();
1313 let buffer_c = worktree_c
1314 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1315 .await
1316 .unwrap();
1317 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1318 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1319
1320 // Open and edit that buffer as the host.
1321 let buffer_a = worktree_a
1322 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1323 .await
1324 .unwrap();
1325
1326 buffer_a
1327 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1328 .await;
1329 buffer_a.update(&mut cx_a, |buf, cx| {
1330 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1331 });
1332
1333 // Wait for edits to propagate
1334 buffer_a
1335 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1336 .await;
1337 buffer_b
1338 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1339 .await;
1340 buffer_c
1341 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1342 .await;
1343
1344 // Edit the buffer as the host and concurrently save as guest B.
1345 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1346 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1347 save_b.await.unwrap();
1348 assert_eq!(
1349 fs.load("/a/file1".as_ref()).await.unwrap(),
1350 "hi-a, i-am-c, i-am-b, i-am-a"
1351 );
1352 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1353 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1354 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1355
1356 // Make changes on host's file system, see those changes on the guests.
1357 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1358 .await
1359 .unwrap();
1360 fs.insert_file(Path::new("/a/file4"), "4".into())
1361 .await
1362 .unwrap();
1363
1364 worktree_b
1365 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1366 .await;
1367 worktree_c
1368 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1369 .await;
1370 worktree_b.read_with(&cx_b, |tree, _| {
1371 assert_eq!(
1372 tree.paths()
1373 .map(|p| p.to_string_lossy())
1374 .collect::<Vec<_>>(),
1375 &[".zed.toml", "file1", "file3", "file4"]
1376 )
1377 });
1378 worktree_c.read_with(&cx_c, |tree, _| {
1379 assert_eq!(
1380 tree.paths()
1381 .map(|p| p.to_string_lossy())
1382 .collect::<Vec<_>>(),
1383 &[".zed.toml", "file1", "file3", "file4"]
1384 )
1385 });
1386 }
1387
1388 #[gpui::test]
1389 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1390 cx_a.foreground().forbid_parking();
1391 let lang_registry = Arc::new(LanguageRegistry::new());
1392 let fs = Arc::new(FakeFs::new());
1393
1394 // Connect to a server as 2 clients.
1395 let mut server = TestServer::start().await;
1396 let client_a = server.create_client(&mut cx_a, "user_a").await;
1397 let client_b = server.create_client(&mut cx_b, "user_b").await;
1398
1399 // Share a project as client A
1400 fs.insert_tree(
1401 "/dir",
1402 json!({
1403 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1404 "a.txt": "a-contents",
1405 }),
1406 )
1407 .await;
1408
1409 let project_a = cx_a.update(|cx| {
1410 Project::local(
1411 client_a.clone(),
1412 client_a.user_store.clone(),
1413 lang_registry.clone(),
1414 fs.clone(),
1415 cx,
1416 )
1417 });
1418 let worktree_a = project_a
1419 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1420 .await
1421 .unwrap();
1422 worktree_a
1423 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1424 .await;
1425 let project_id = project_a
1426 .update(&mut cx_a, |project, _| project.next_remote_id())
1427 .await;
1428 project_a
1429 .update(&mut cx_a, |project, cx| project.share(cx))
1430 .await
1431 .unwrap();
1432
1433 // Join that project as client B
1434 let project_b = Project::remote(
1435 project_id,
1436 client_b.clone(),
1437 client_b.user_store.clone(),
1438 lang_registry.clone(),
1439 fs.clone(),
1440 &mut cx_b.to_async(),
1441 )
1442 .await
1443 .unwrap();
1444 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1445
1446 // Open a buffer as client B
1447 let buffer_b = worktree_b
1448 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1449 .await
1450 .unwrap();
1451 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1452
1453 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1454 buffer_b.read_with(&cx_b, |buf, _| {
1455 assert!(buf.is_dirty());
1456 assert!(!buf.has_conflict());
1457 });
1458
1459 buffer_b
1460 .update(&mut cx_b, |buf, cx| buf.save(cx))
1461 .unwrap()
1462 .await
1463 .unwrap();
1464 worktree_b
1465 .condition(&cx_b, |_, cx| {
1466 buffer_b.read(cx).file().unwrap().mtime() != mtime
1467 })
1468 .await;
1469 buffer_b.read_with(&cx_b, |buf, _| {
1470 assert!(!buf.is_dirty());
1471 assert!(!buf.has_conflict());
1472 });
1473
1474 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1475 buffer_b.read_with(&cx_b, |buf, _| {
1476 assert!(buf.is_dirty());
1477 assert!(!buf.has_conflict());
1478 });
1479 }
1480
1481 #[gpui::test]
1482 async fn test_editing_while_guest_opens_buffer(
1483 mut cx_a: TestAppContext,
1484 mut cx_b: TestAppContext,
1485 ) {
1486 cx_a.foreground().forbid_parking();
1487 let lang_registry = Arc::new(LanguageRegistry::new());
1488 let fs = Arc::new(FakeFs::new());
1489
1490 // Connect to a server as 2 clients.
1491 let mut server = TestServer::start().await;
1492 let client_a = server.create_client(&mut cx_a, "user_a").await;
1493 let client_b = server.create_client(&mut cx_b, "user_b").await;
1494
1495 // Share a project as client A
1496 fs.insert_tree(
1497 "/dir",
1498 json!({
1499 ".zed.toml": r#"collaborators = ["user_b"]"#,
1500 "a.txt": "a-contents",
1501 }),
1502 )
1503 .await;
1504 let project_a = cx_a.update(|cx| {
1505 Project::local(
1506 client_a.clone(),
1507 client_a.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 cx,
1511 )
1512 });
1513 let worktree_a = project_a
1514 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1515 .await
1516 .unwrap();
1517 worktree_a
1518 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1519 .await;
1520 let project_id = project_a
1521 .update(&mut cx_a, |project, _| project.next_remote_id())
1522 .await;
1523 project_a
1524 .update(&mut cx_a, |project, cx| project.share(cx))
1525 .await
1526 .unwrap();
1527
1528 // Join that project as client B
1529 let project_b = Project::remote(
1530 project_id,
1531 client_b.clone(),
1532 client_b.user_store.clone(),
1533 lang_registry.clone(),
1534 fs.clone(),
1535 &mut cx_b.to_async(),
1536 )
1537 .await
1538 .unwrap();
1539 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1540
1541 // Open a buffer as client A
1542 let buffer_a = worktree_a
1543 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1544 .await
1545 .unwrap();
1546
1547 // Start opening the same buffer as client B
1548 let buffer_b = cx_b
1549 .background()
1550 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1551 task::yield_now().await;
1552
1553 // Edit the buffer as client A while client B is still opening it.
1554 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1555
1556 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1557 let buffer_b = buffer_b.await.unwrap();
1558 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1559 }
1560
1561 #[gpui::test]
1562 async fn test_leaving_worktree_while_opening_buffer(
1563 mut cx_a: TestAppContext,
1564 mut cx_b: TestAppContext,
1565 ) {
1566 cx_a.foreground().forbid_parking();
1567 let lang_registry = Arc::new(LanguageRegistry::new());
1568 let fs = Arc::new(FakeFs::new());
1569
1570 // Connect to a server as 2 clients.
1571 let mut server = TestServer::start().await;
1572 let client_a = server.create_client(&mut cx_a, "user_a").await;
1573 let client_b = server.create_client(&mut cx_b, "user_b").await;
1574
1575 // Share a project as client A
1576 fs.insert_tree(
1577 "/dir",
1578 json!({
1579 ".zed.toml": r#"collaborators = ["user_b"]"#,
1580 "a.txt": "a-contents",
1581 }),
1582 )
1583 .await;
1584 let project_a = cx_a.update(|cx| {
1585 Project::local(
1586 client_a.clone(),
1587 client_a.user_store.clone(),
1588 lang_registry.clone(),
1589 fs.clone(),
1590 cx,
1591 )
1592 });
1593 let worktree_a = project_a
1594 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1595 .await
1596 .unwrap();
1597 worktree_a
1598 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1599 .await;
1600 let project_id = project_a
1601 .update(&mut cx_a, |project, _| project.next_remote_id())
1602 .await;
1603 project_a
1604 .update(&mut cx_a, |project, cx| project.share(cx))
1605 .await
1606 .unwrap();
1607
1608 // Join that project as client B
1609 let project_b = Project::remote(
1610 project_id,
1611 client_b.clone(),
1612 client_b.user_store.clone(),
1613 lang_registry.clone(),
1614 fs.clone(),
1615 &mut cx_b.to_async(),
1616 )
1617 .await
1618 .unwrap();
1619 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1620
1621 // See that a guest has joined as client A.
1622 project_a
1623 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1624 .await;
1625
1626 // Begin opening a buffer as client B, but leave the project before the open completes.
1627 let buffer_b = cx_b
1628 .background()
1629 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1630 cx_b.update(|_| drop(worktree_b));
1631 drop(buffer_b);
1632
1633 // See that the guest has left.
1634 project_a
1635 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1636 .await;
1637 }
1638
1639 #[gpui::test]
1640 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1641 cx_a.foreground().forbid_parking();
1642 let lang_registry = Arc::new(LanguageRegistry::new());
1643 let fs = Arc::new(FakeFs::new());
1644
1645 // Connect to a server as 2 clients.
1646 let mut server = TestServer::start().await;
1647 let client_a = server.create_client(&mut cx_a, "user_a").await;
1648 let client_b = server.create_client(&mut cx_b, "user_b").await;
1649
1650 // Share a project as client A
1651 fs.insert_tree(
1652 "/a",
1653 json!({
1654 ".zed.toml": r#"collaborators = ["user_b"]"#,
1655 "a.txt": "a-contents",
1656 "b.txt": "b-contents",
1657 }),
1658 )
1659 .await;
1660 let project_a = cx_a.update(|cx| {
1661 Project::local(
1662 client_a.clone(),
1663 client_a.user_store.clone(),
1664 lang_registry.clone(),
1665 fs.clone(),
1666 cx,
1667 )
1668 });
1669 let worktree_a = project_a
1670 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1671 .await
1672 .unwrap();
1673 worktree_a
1674 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675 .await;
1676 let project_id = project_a
1677 .update(&mut cx_a, |project, _| project.next_remote_id())
1678 .await;
1679 project_a
1680 .update(&mut cx_a, |project, cx| project.share(cx))
1681 .await
1682 .unwrap();
1683
1684 // Join that project as client B
1685 let _project_b = Project::remote(
1686 project_id,
1687 client_b.clone(),
1688 client_b.user_store.clone(),
1689 lang_registry.clone(),
1690 fs.clone(),
1691 &mut cx_b.to_async(),
1692 )
1693 .await
1694 .unwrap();
1695
1696 // See that a guest has joined as client A.
1697 project_a
1698 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1699 .await;
1700
1701 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1702 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1703 project_a
1704 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1705 .await;
1706 }
1707
1708 #[gpui::test]
1709 async fn test_collaborating_with_diagnostics(
1710 mut cx_a: TestAppContext,
1711 mut cx_b: TestAppContext,
1712 ) {
1713 cx_a.foreground().forbid_parking();
1714 let mut lang_registry = Arc::new(LanguageRegistry::new());
1715 let fs = Arc::new(FakeFs::new());
1716
1717 // Set up a fake language server.
1718 let (language_server_config, mut fake_language_server) =
1719 LanguageServerConfig::fake(cx_a.background()).await;
1720 Arc::get_mut(&mut lang_registry)
1721 .unwrap()
1722 .add(Arc::new(Language::new(
1723 LanguageConfig {
1724 name: "Rust".to_string(),
1725 path_suffixes: vec!["rs".to_string()],
1726 language_server: Some(language_server_config),
1727 ..Default::default()
1728 },
1729 Some(tree_sitter_rust::language()),
1730 )));
1731
1732 // Connect to a server as 2 clients.
1733 let mut server = TestServer::start().await;
1734 let client_a = server.create_client(&mut cx_a, "user_a").await;
1735 let client_b = server.create_client(&mut cx_b, "user_b").await;
1736
1737 // Share a project as client A
1738 fs.insert_tree(
1739 "/a",
1740 json!({
1741 ".zed.toml": r#"collaborators = ["user_b"]"#,
1742 "a.rs": "let one = two",
1743 "other.rs": "",
1744 }),
1745 )
1746 .await;
1747 let project_a = cx_a.update(|cx| {
1748 Project::local(
1749 client_a.clone(),
1750 client_a.user_store.clone(),
1751 lang_registry.clone(),
1752 fs.clone(),
1753 cx,
1754 )
1755 });
1756 let worktree_a = project_a
1757 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1758 .await
1759 .unwrap();
1760 worktree_a
1761 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1762 .await;
1763 let project_id = project_a
1764 .update(&mut cx_a, |project, _| project.next_remote_id())
1765 .await;
1766 project_a
1767 .update(&mut cx_a, |project, cx| project.share(cx))
1768 .await
1769 .unwrap();
1770
1771 // Cause the language server to start.
1772 let _ = cx_a
1773 .background()
1774 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1775 worktree.open_buffer("other.rs", cx)
1776 }))
1777 .await
1778 .unwrap();
1779
1780 // Simulate a language server reporting errors for a file.
1781 fake_language_server
1782 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1783 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1784 version: None,
1785 diagnostics: vec![
1786 lsp::Diagnostic {
1787 severity: Some(lsp::DiagnosticSeverity::ERROR),
1788 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1789 message: "message 1".to_string(),
1790 ..Default::default()
1791 },
1792 lsp::Diagnostic {
1793 severity: Some(lsp::DiagnosticSeverity::WARNING),
1794 range: lsp::Range::new(
1795 lsp::Position::new(0, 10),
1796 lsp::Position::new(0, 13),
1797 ),
1798 message: "message 2".to_string(),
1799 ..Default::default()
1800 },
1801 ],
1802 })
1803 .await;
1804
1805 // Join the worktree as client B.
1806 let project_b = Project::remote(
1807 project_id,
1808 client_b.clone(),
1809 client_b.user_store.clone(),
1810 lang_registry.clone(),
1811 fs.clone(),
1812 &mut cx_b.to_async(),
1813 )
1814 .await
1815 .unwrap();
1816 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1817
1818 // Open the file with the errors.
1819 let buffer_b = cx_b
1820 .background()
1821 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1822 .await
1823 .unwrap();
1824
1825 buffer_b.read_with(&cx_b, |buffer, _| {
1826 assert_eq!(
1827 buffer
1828 .snapshot()
1829 .diagnostics_in_range::<_, Point>(0..buffer.len())
1830 .collect::<Vec<_>>(),
1831 &[
1832 DiagnosticEntry {
1833 range: Point::new(0, 4)..Point::new(0, 7),
1834 diagnostic: Diagnostic {
1835 group_id: 0,
1836 message: "message 1".to_string(),
1837 severity: lsp::DiagnosticSeverity::ERROR,
1838 is_primary: true,
1839 ..Default::default()
1840 }
1841 },
1842 DiagnosticEntry {
1843 range: Point::new(0, 10)..Point::new(0, 13),
1844 diagnostic: Diagnostic {
1845 group_id: 1,
1846 severity: lsp::DiagnosticSeverity::WARNING,
1847 message: "message 2".to_string(),
1848 is_primary: true,
1849 ..Default::default()
1850 }
1851 }
1852 ]
1853 );
1854 });
1855 }
1856
1857 #[gpui::test]
1858 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1859 cx_a.foreground().forbid_parking();
1860
1861 // Connect to a server as 2 clients.
1862 let mut server = TestServer::start().await;
1863 let client_a = server.create_client(&mut cx_a, "user_a").await;
1864 let client_b = server.create_client(&mut cx_b, "user_b").await;
1865
1866 // Create an org that includes these 2 users.
1867 let db = &server.app_state.db;
1868 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1869 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1870 .await
1871 .unwrap();
1872 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1873 .await
1874 .unwrap();
1875
1876 // Create a channel that includes all the users.
1877 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1878 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1879 .await
1880 .unwrap();
1881 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1882 .await
1883 .unwrap();
1884 db.create_channel_message(
1885 channel_id,
1886 client_b.current_user_id(&cx_b),
1887 "hello A, it's B.",
1888 OffsetDateTime::now_utc(),
1889 1,
1890 )
1891 .await
1892 .unwrap();
1893
1894 let channels_a = cx_a
1895 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1896 channels_a
1897 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1898 .await;
1899 channels_a.read_with(&cx_a, |list, _| {
1900 assert_eq!(
1901 list.available_channels().unwrap(),
1902 &[ChannelDetails {
1903 id: channel_id.to_proto(),
1904 name: "test-channel".to_string()
1905 }]
1906 )
1907 });
1908 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1909 this.get_channel(channel_id.to_proto(), cx).unwrap()
1910 });
1911 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1912 channel_a
1913 .condition(&cx_a, |channel, _| {
1914 channel_messages(channel)
1915 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1916 })
1917 .await;
1918
1919 let channels_b = cx_b
1920 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1921 channels_b
1922 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1923 .await;
1924 channels_b.read_with(&cx_b, |list, _| {
1925 assert_eq!(
1926 list.available_channels().unwrap(),
1927 &[ChannelDetails {
1928 id: channel_id.to_proto(),
1929 name: "test-channel".to_string()
1930 }]
1931 )
1932 });
1933
1934 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1935 this.get_channel(channel_id.to_proto(), cx).unwrap()
1936 });
1937 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1938 channel_b
1939 .condition(&cx_b, |channel, _| {
1940 channel_messages(channel)
1941 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1942 })
1943 .await;
1944
1945 channel_a
1946 .update(&mut cx_a, |channel, cx| {
1947 channel
1948 .send_message("oh, hi B.".to_string(), cx)
1949 .unwrap()
1950 .detach();
1951 let task = channel.send_message("sup".to_string(), cx).unwrap();
1952 assert_eq!(
1953 channel_messages(channel),
1954 &[
1955 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1956 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1957 ("user_a".to_string(), "sup".to_string(), true)
1958 ]
1959 );
1960 task
1961 })
1962 .await
1963 .unwrap();
1964
1965 channel_b
1966 .condition(&cx_b, |channel, _| {
1967 channel_messages(channel)
1968 == [
1969 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1970 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1971 ("user_a".to_string(), "sup".to_string(), false),
1972 ]
1973 })
1974 .await;
1975
1976 assert_eq!(
1977 server
1978 .state()
1979 .await
1980 .channel(channel_id)
1981 .unwrap()
1982 .connection_ids
1983 .len(),
1984 2
1985 );
1986 cx_b.update(|_| drop(channel_b));
1987 server
1988 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1989 .await;
1990
1991 cx_a.update(|_| drop(channel_a));
1992 server
1993 .condition(|state| state.channel(channel_id).is_none())
1994 .await;
1995 }
1996
1997 #[gpui::test]
1998 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1999 cx_a.foreground().forbid_parking();
2000
2001 let mut server = TestServer::start().await;
2002 let client_a = server.create_client(&mut cx_a, "user_a").await;
2003
2004 let db = &server.app_state.db;
2005 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2006 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2007 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2008 .await
2009 .unwrap();
2010 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2011 .await
2012 .unwrap();
2013
2014 let channels_a = cx_a
2015 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2016 channels_a
2017 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2018 .await;
2019 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2020 this.get_channel(channel_id.to_proto(), cx).unwrap()
2021 });
2022
2023 // Messages aren't allowed to be too long.
2024 channel_a
2025 .update(&mut cx_a, |channel, cx| {
2026 let long_body = "this is long.\n".repeat(1024);
2027 channel.send_message(long_body, cx).unwrap()
2028 })
2029 .await
2030 .unwrap_err();
2031
2032 // Messages aren't allowed to be blank.
2033 channel_a.update(&mut cx_a, |channel, cx| {
2034 channel.send_message(String::new(), cx).unwrap_err()
2035 });
2036
2037 // Leading and trailing whitespace are trimmed.
2038 channel_a
2039 .update(&mut cx_a, |channel, cx| {
2040 channel
2041 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2042 .unwrap()
2043 })
2044 .await
2045 .unwrap();
2046 assert_eq!(
2047 db.get_channel_messages(channel_id, 10, None)
2048 .await
2049 .unwrap()
2050 .iter()
2051 .map(|m| &m.body)
2052 .collect::<Vec<_>>(),
2053 &["surrounded by whitespace"]
2054 );
2055 }
2056
2057 #[gpui::test]
2058 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2059 cx_a.foreground().forbid_parking();
2060
2061 // Connect to a server as 2 clients.
2062 let mut server = TestServer::start().await;
2063 let client_a = server.create_client(&mut cx_a, "user_a").await;
2064 let client_b = server.create_client(&mut cx_b, "user_b").await;
2065 let mut status_b = client_b.status();
2066
2067 // Create an org that includes these 2 users.
2068 let db = &server.app_state.db;
2069 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2070 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2071 .await
2072 .unwrap();
2073 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2074 .await
2075 .unwrap();
2076
2077 // Create a channel that includes all the users.
2078 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2079 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2080 .await
2081 .unwrap();
2082 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2083 .await
2084 .unwrap();
2085 db.create_channel_message(
2086 channel_id,
2087 client_b.current_user_id(&cx_b),
2088 "hello A, it's B.",
2089 OffsetDateTime::now_utc(),
2090 2,
2091 )
2092 .await
2093 .unwrap();
2094
2095 let channels_a = cx_a
2096 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2097 channels_a
2098 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2099 .await;
2100
2101 channels_a.read_with(&cx_a, |list, _| {
2102 assert_eq!(
2103 list.available_channels().unwrap(),
2104 &[ChannelDetails {
2105 id: channel_id.to_proto(),
2106 name: "test-channel".to_string()
2107 }]
2108 )
2109 });
2110 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2111 this.get_channel(channel_id.to_proto(), cx).unwrap()
2112 });
2113 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2114 channel_a
2115 .condition(&cx_a, |channel, _| {
2116 channel_messages(channel)
2117 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2118 })
2119 .await;
2120
2121 let channels_b = cx_b
2122 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2123 channels_b
2124 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2125 .await;
2126 channels_b.read_with(&cx_b, |list, _| {
2127 assert_eq!(
2128 list.available_channels().unwrap(),
2129 &[ChannelDetails {
2130 id: channel_id.to_proto(),
2131 name: "test-channel".to_string()
2132 }]
2133 )
2134 });
2135
2136 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2137 this.get_channel(channel_id.to_proto(), cx).unwrap()
2138 });
2139 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2140 channel_b
2141 .condition(&cx_b, |channel, _| {
2142 channel_messages(channel)
2143 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2144 })
2145 .await;
2146
2147 // Disconnect client B, ensuring we can still access its cached channel data.
2148 server.forbid_connections();
2149 server.disconnect_client(client_b.current_user_id(&cx_b));
2150 while !matches!(
2151 status_b.recv().await,
2152 Some(client::Status::ReconnectionError { .. })
2153 ) {}
2154
2155 channels_b.read_with(&cx_b, |channels, _| {
2156 assert_eq!(
2157 channels.available_channels().unwrap(),
2158 [ChannelDetails {
2159 id: channel_id.to_proto(),
2160 name: "test-channel".to_string()
2161 }]
2162 )
2163 });
2164 channel_b.read_with(&cx_b, |channel, _| {
2165 assert_eq!(
2166 channel_messages(channel),
2167 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2168 )
2169 });
2170
2171 // Send a message from client B while it is disconnected.
2172 channel_b
2173 .update(&mut cx_b, |channel, cx| {
2174 let task = channel
2175 .send_message("can you see this?".to_string(), cx)
2176 .unwrap();
2177 assert_eq!(
2178 channel_messages(channel),
2179 &[
2180 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2181 ("user_b".to_string(), "can you see this?".to_string(), true)
2182 ]
2183 );
2184 task
2185 })
2186 .await
2187 .unwrap_err();
2188
2189 // Send a message from client A while B is disconnected.
2190 channel_a
2191 .update(&mut cx_a, |channel, cx| {
2192 channel
2193 .send_message("oh, hi B.".to_string(), cx)
2194 .unwrap()
2195 .detach();
2196 let task = channel.send_message("sup".to_string(), cx).unwrap();
2197 assert_eq!(
2198 channel_messages(channel),
2199 &[
2200 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2201 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2202 ("user_a".to_string(), "sup".to_string(), true)
2203 ]
2204 );
2205 task
2206 })
2207 .await
2208 .unwrap();
2209
2210 // Give client B a chance to reconnect.
2211 server.allow_connections();
2212 cx_b.foreground().advance_clock(Duration::from_secs(10));
2213
2214 // Verify that B sees the new messages upon reconnection, as well as the message client B
2215 // sent while offline.
2216 channel_b
2217 .condition(&cx_b, |channel, _| {
2218 channel_messages(channel)
2219 == [
2220 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2221 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2222 ("user_a".to_string(), "sup".to_string(), false),
2223 ("user_b".to_string(), "can you see this?".to_string(), false),
2224 ]
2225 })
2226 .await;
2227
2228 // Ensure client A and B can communicate normally after reconnection.
2229 channel_a
2230 .update(&mut cx_a, |channel, cx| {
2231 channel.send_message("you online?".to_string(), cx).unwrap()
2232 })
2233 .await
2234 .unwrap();
2235 channel_b
2236 .condition(&cx_b, |channel, _| {
2237 channel_messages(channel)
2238 == [
2239 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2240 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2241 ("user_a".to_string(), "sup".to_string(), false),
2242 ("user_b".to_string(), "can you see this?".to_string(), false),
2243 ("user_a".to_string(), "you online?".to_string(), false),
2244 ]
2245 })
2246 .await;
2247
2248 channel_b
2249 .update(&mut cx_b, |channel, cx| {
2250 channel.send_message("yep".to_string(), cx).unwrap()
2251 })
2252 .await
2253 .unwrap();
2254 channel_a
2255 .condition(&cx_a, |channel, _| {
2256 channel_messages(channel)
2257 == [
2258 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2259 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2260 ("user_a".to_string(), "sup".to_string(), false),
2261 ("user_b".to_string(), "can you see this?".to_string(), false),
2262 ("user_a".to_string(), "you online?".to_string(), false),
2263 ("user_b".to_string(), "yep".to_string(), false),
2264 ]
2265 })
2266 .await;
2267 }
2268
2269 #[gpui::test]
2270 async fn test_contacts(
2271 mut cx_a: TestAppContext,
2272 mut cx_b: TestAppContext,
2273 mut cx_c: TestAppContext,
2274 ) {
2275 cx_a.foreground().forbid_parking();
2276 let lang_registry = Arc::new(LanguageRegistry::new());
2277 let fs = Arc::new(FakeFs::new());
2278
2279 // Connect to a server as 3 clients.
2280 let mut server = TestServer::start().await;
2281 let client_a = server.create_client(&mut cx_a, "user_a").await;
2282 let client_b = server.create_client(&mut cx_b, "user_b").await;
2283 let client_c = server.create_client(&mut cx_c, "user_c").await;
2284
2285 // Share a worktree as client A.
2286 fs.insert_tree(
2287 "/a",
2288 json!({
2289 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2290 }),
2291 )
2292 .await;
2293
2294 let project_a = cx_a.update(|cx| {
2295 Project::local(
2296 client_a.clone(),
2297 client_a.user_store.clone(),
2298 lang_registry.clone(),
2299 fs.clone(),
2300 cx,
2301 )
2302 });
2303 let worktree_a = project_a
2304 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2305 .await
2306 .unwrap();
2307 worktree_a
2308 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309 .await;
2310
2311 client_a
2312 .user_store
2313 .condition(&cx_a, |user_store, _| {
2314 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2315 })
2316 .await;
2317 client_b
2318 .user_store
2319 .condition(&cx_b, |user_store, _| {
2320 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2321 })
2322 .await;
2323 client_c
2324 .user_store
2325 .condition(&cx_c, |user_store, _| {
2326 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2327 })
2328 .await;
2329
2330 let project_id = project_a
2331 .update(&mut cx_a, |project, _| project.next_remote_id())
2332 .await;
2333 project_a
2334 .update(&mut cx_a, |project, cx| project.share(cx))
2335 .await
2336 .unwrap();
2337
2338 let _project_b = Project::remote(
2339 project_id,
2340 client_b.clone(),
2341 client_b.user_store.clone(),
2342 lang_registry.clone(),
2343 fs.clone(),
2344 &mut cx_b.to_async(),
2345 )
2346 .await
2347 .unwrap();
2348
2349 client_a
2350 .user_store
2351 .condition(&cx_a, |user_store, _| {
2352 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2353 })
2354 .await;
2355 client_b
2356 .user_store
2357 .condition(&cx_b, |user_store, _| {
2358 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2359 })
2360 .await;
2361 client_c
2362 .user_store
2363 .condition(&cx_c, |user_store, _| {
2364 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2365 })
2366 .await;
2367
2368 project_a
2369 .condition(&cx_a, |project, _| {
2370 project.collaborators().contains_key(&client_b.peer_id)
2371 })
2372 .await;
2373
2374 cx_a.update(move |_| drop(project_a));
2375 client_a
2376 .user_store
2377 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2378 .await;
2379 client_b
2380 .user_store
2381 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2382 .await;
2383 client_c
2384 .user_store
2385 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2386 .await;
2387
2388 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2389 user_store
2390 .contacts()
2391 .iter()
2392 .map(|contact| {
2393 let worktrees = contact
2394 .projects
2395 .iter()
2396 .map(|p| {
2397 (
2398 p.worktree_root_names[0].as_str(),
2399 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2400 )
2401 })
2402 .collect();
2403 (contact.user.github_login.as_str(), worktrees)
2404 })
2405 .collect()
2406 }
2407 }
2408
2409 struct TestServer {
2410 peer: Arc<Peer>,
2411 app_state: Arc<AppState>,
2412 server: Arc<Server>,
2413 notifications: mpsc::Receiver<()>,
2414 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2415 forbid_connections: Arc<AtomicBool>,
2416 _test_db: TestDb,
2417 }
2418
2419 impl TestServer {
2420 async fn start() -> Self {
2421 let test_db = TestDb::new();
2422 let app_state = Self::build_app_state(&test_db).await;
2423 let peer = Peer::new();
2424 let notifications = mpsc::channel(128);
2425 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2426 Self {
2427 peer,
2428 app_state,
2429 server,
2430 notifications: notifications.1,
2431 connection_killers: Default::default(),
2432 forbid_connections: Default::default(),
2433 _test_db: test_db,
2434 }
2435 }
2436
2437 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2438 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2439 let client_name = name.to_string();
2440 let mut client = Client::new();
2441 let server = self.server.clone();
2442 let connection_killers = self.connection_killers.clone();
2443 let forbid_connections = self.forbid_connections.clone();
2444 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2445
2446 Arc::get_mut(&mut client)
2447 .unwrap()
2448 .override_authenticate(move |cx| {
2449 cx.spawn(|_| async move {
2450 let access_token = "the-token".to_string();
2451 Ok(Credentials {
2452 user_id: user_id.0 as u64,
2453 access_token,
2454 })
2455 })
2456 })
2457 .override_establish_connection(move |credentials, cx| {
2458 assert_eq!(credentials.user_id, user_id.0 as u64);
2459 assert_eq!(credentials.access_token, "the-token");
2460
2461 let server = server.clone();
2462 let connection_killers = connection_killers.clone();
2463 let forbid_connections = forbid_connections.clone();
2464 let client_name = client_name.clone();
2465 let connection_id_tx = connection_id_tx.clone();
2466 cx.spawn(move |cx| async move {
2467 if forbid_connections.load(SeqCst) {
2468 Err(EstablishConnectionError::other(anyhow!(
2469 "server is forbidding connections"
2470 )))
2471 } else {
2472 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2473 connection_killers.lock().insert(user_id, kill_conn);
2474 cx.background()
2475 .spawn(server.handle_connection(
2476 server_conn,
2477 client_name,
2478 user_id,
2479 Some(connection_id_tx),
2480 ))
2481 .detach();
2482 Ok(client_conn)
2483 }
2484 })
2485 });
2486
2487 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2488 client
2489 .authenticate_and_connect(&cx.to_async())
2490 .await
2491 .unwrap();
2492
2493 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2494 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2495 let mut authed_user =
2496 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2497 while authed_user.recv().await.unwrap().is_none() {}
2498
2499 TestClient {
2500 client,
2501 peer_id,
2502 user_store,
2503 }
2504 }
2505
2506 fn disconnect_client(&self, user_id: UserId) {
2507 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2508 let _ = kill_conn.try_send(Some(()));
2509 }
2510 }
2511
2512 fn forbid_connections(&self) {
2513 self.forbid_connections.store(true, SeqCst);
2514 }
2515
2516 fn allow_connections(&self) {
2517 self.forbid_connections.store(false, SeqCst);
2518 }
2519
2520 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2521 let mut config = Config::default();
2522 config.session_secret = "a".repeat(32);
2523 config.database_url = test_db.url.clone();
2524 let github_client = github::AppClient::test();
2525 Arc::new(AppState {
2526 db: test_db.db().clone(),
2527 handlebars: Default::default(),
2528 auth_client: auth::build_client("", ""),
2529 repo_client: github::RepoClient::test(&github_client),
2530 github_client,
2531 config,
2532 })
2533 }
2534
2535 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2536 self.server.store.read()
2537 }
2538
2539 async fn condition<F>(&mut self, mut predicate: F)
2540 where
2541 F: FnMut(&Store) -> bool,
2542 {
2543 async_std::future::timeout(Duration::from_millis(500), async {
2544 while !(predicate)(&*self.server.store.read()) {
2545 self.notifications.recv().await;
2546 }
2547 })
2548 .await
2549 .expect("condition timed out");
2550 }
2551 }
2552
2553 impl Drop for TestServer {
2554 fn drop(&mut self) {
2555 task::block_on(self.peer.reset());
2556 }
2557 }
2558
2559 struct TestClient {
2560 client: Arc<Client>,
2561 pub peer_id: PeerId,
2562 pub user_store: ModelHandle<UserStore>,
2563 }
2564
2565 impl Deref for TestClient {
2566 type Target = Arc<Client>;
2567
2568 fn deref(&self) -> &Self::Target {
2569 &self.client
2570 }
2571 }
2572
2573 impl TestClient {
2574 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2575 UserId::from_proto(
2576 self.user_store
2577 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2578 )
2579 }
2580 }
2581
2582 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2583 channel
2584 .messages()
2585 .cursor::<()>()
2586 .map(|m| {
2587 (
2588 m.sender.github_login.clone(),
2589 m.body.clone(),
2590 m.is_pending(),
2591 )
2592 })
2593 .collect()
2594 }
2595
/// A view with no state that renders an empty element.
struct EmptyView;
2597
2598 impl gpui::Entity for EmptyView {
2599 type Event = ();
2600 }
2601
2602 impl gpui::View for EmptyView {
2603 fn ui_name() -> &'static str {
2604 "empty view"
2605 }
2606
2607 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2608 gpui::Element::boxed(gpui::elements::Empty)
2609 }
2610 }
2611}