1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44const MESSAGE_COUNT_PER_PAGE: usize = 100;
45const MAX_MESSAGE_LEN: usize = 1024;
46const NO_SUCH_PROJECT: &'static str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::open_buffer)
75 .add_handler(Server::close_buffer)
76 .add_handler(Server::update_buffer)
77 .add_handler(Server::buffer_saved)
78 .add_handler(Server::save_buffer)
79 .add_handler(Server::get_channels)
80 .add_handler(Server::get_users)
81 .add_handler(Server::join_channel)
82 .add_handler(Server::leave_channel)
83 .add_handler(Server::send_channel_message)
84 .add_handler(Server::get_channel_messages);
85
86 Arc::new(server)
87 }
88
89 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
90 where
91 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
92 Fut: 'static + Send + Future<Output = tide::Result<()>>,
93 M: EnvelopedMessage,
94 {
95 let prev_handler = self.handlers.insert(
96 TypeId::of::<M>(),
97 Box::new(move |server, envelope| {
98 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
99 (handler)(server, *envelope).boxed()
100 }),
101 );
102 if prev_handler.is_some() {
103 panic!("registered a handler for the same message twice");
104 }
105 self
106 }
107
108 pub fn handle_connection(
109 self: &Arc<Self>,
110 connection: Connection,
111 addr: String,
112 user_id: UserId,
113 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
114 ) -> impl Future<Output = ()> {
115 let mut this = self.clone();
116 async move {
117 let (connection_id, handle_io, mut incoming_rx) =
118 this.peer.add_connection(connection).await;
119
120 if let Some(send_connection_id) = send_connection_id.as_mut() {
121 let _ = send_connection_id.send(connection_id).await;
122 }
123
124 this.state_mut().add_connection(connection_id, user_id);
125 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
126 log::error!("error updating contacts for {:?}: {}", user_id, err);
127 }
128
129 let handle_io = handle_io.fuse();
130 futures::pin_mut!(handle_io);
131 loop {
132 let next_message = incoming_rx.recv().fuse();
133 futures::pin_mut!(next_message);
134 futures::select_biased! {
135 message = next_message => {
136 if let Some(message) = message {
137 let start_time = Instant::now();
138 log::info!("RPC message received: {}", message.payload_type_name());
139 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
140 if let Err(err) = (handler)(this.clone(), message).await {
141 log::error!("error handling message: {:?}", err);
142 } else {
143 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
144 }
145
146 if let Some(mut notifications) = this.notifications.clone() {
147 let _ = notifications.send(()).await;
148 }
149 } else {
150 log::warn!("unhandled message: {}", message.payload_type_name());
151 }
152 } else {
153 log::info!("rpc connection closed {:?}", addr);
154 break;
155 }
156 }
157 handle_io = handle_io => {
158 if let Err(err) = handle_io {
159 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
160 }
161 break;
162 }
163 }
164 }
165
166 if let Err(err) = this.sign_out(connection_id).await {
167 log::error!("error signing out connection {:?} - {:?}", addr, err);
168 }
169 }
170 }
171
172 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
173 self.peer.disconnect(connection_id).await;
174 let removed_connection = self.state_mut().remove_connection(connection_id)?;
175
176 for (project_id, project) in removed_connection.hosted_projects {
177 if let Some(share) = project.share {
178 broadcast(
179 connection_id,
180 share.guests.keys().copied().collect(),
181 |conn_id| {
182 self.peer
183 .send(conn_id, proto::UnshareProject { project_id })
184 },
185 )
186 .await?;
187 }
188 }
189
190 for (project_id, peer_ids) in removed_connection.guest_project_ids {
191 broadcast(connection_id, peer_ids, |conn_id| {
192 self.peer.send(
193 conn_id,
194 proto::RemoveProjectCollaborator {
195 project_id,
196 peer_id: connection_id.0,
197 },
198 )
199 })
200 .await?;
201 }
202
203 self.update_contacts_for_users(removed_connection.contact_ids.iter())
204 .await?;
205
206 Ok(())
207 }
208
209 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
210 self.peer.respond(request.receipt(), proto::Ack {}).await?;
211 Ok(())
212 }
213
214 async fn register_project(
215 mut self: Arc<Server>,
216 request: TypedEnvelope<proto::RegisterProject>,
217 ) -> tide::Result<()> {
218 let mut state = self.state_mut();
219 let user_id = state.user_id_for_connection(request.sender_id)?;
220 state.register_project(request.sender_id, user_id);
221 Ok(())
222 }
223
224 async fn unregister_project(
225 mut self: Arc<Server>,
226 request: TypedEnvelope<proto::UnregisterProject>,
227 ) -> tide::Result<()> {
228 self.state_mut()
229 .unregister_project(request.payload.project_id, request.sender_id);
230 Ok(())
231 }
232
233 async fn share_project(
234 mut self: Arc<Server>,
235 request: TypedEnvelope<proto::ShareProject>,
236 ) -> tide::Result<()> {
237 self.state_mut()
238 .share_project(request.payload.project_id, request.sender_id);
239 Ok(())
240 }
241
242 async fn unshare_project(
243 mut self: Arc<Server>,
244 request: TypedEnvelope<proto::UnshareProject>,
245 ) -> tide::Result<()> {
246 let project_id = request.payload.project_id;
247 let project = self
248 .state_mut()
249 .unshare_project(project_id, request.sender_id)?;
250
251 broadcast(request.sender_id, project.connection_ids, |conn_id| {
252 self.peer
253 .send(conn_id, proto::UnshareProject { project_id })
254 })
255 .await?;
256 self.update_contacts_for_users(&project.authorized_user_ids)
257 .await?;
258
259 Ok(())
260 }
261
262 async fn join_project(
263 mut self: Arc<Server>,
264 request: TypedEnvelope<proto::JoinProject>,
265 ) -> tide::Result<()> {
266 let project_id = request.payload.project_id;
267
268 let user_id = self.state().user_id_for_connection(request.sender_id)?;
269 let response_data = self
270 .state_mut()
271 .join_project(request.sender_id, user_id, project_id)
272 .and_then(|joined| {
273 let share = joined.project.share()?;
274 let peer_count = share.guests.len();
275 let mut collaborators = Vec::with_capacity(peer_count);
276 collaborators.push(proto::Collaborator {
277 peer_id: joined.project.host_connection_id.0,
278 replica_id: 0,
279 user_id: joined.project.host_user_id.to_proto(),
280 });
281 let worktrees = joined
282 .project
283 .worktrees
284 .values()
285 .filter_map(|worktree| {
286 worktree.share.as_ref().map(|share| proto::Worktree {
287 id: project_id,
288 root_name: worktree.root_name.clone(),
289 entries: share.entries.values().cloned().collect(),
290 })
291 })
292 .collect();
293 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
294 if *peer_conn_id != request.sender_id {
295 collaborators.push(proto::Collaborator {
296 peer_id: peer_conn_id.0,
297 replica_id: *peer_replica_id as u32,
298 user_id: peer_user_id.to_proto(),
299 });
300 }
301 }
302 let response = proto::JoinProjectResponse {
303 worktrees,
304 replica_id: joined.replica_id as u32,
305 collaborators,
306 };
307 let connection_ids = joined.project.connection_ids();
308 let contact_user_ids = joined.project.authorized_user_ids();
309 Ok((response, connection_ids, contact_user_ids))
310 });
311
312 match response_data {
313 Ok((response, connection_ids, contact_user_ids)) => {
314 broadcast(request.sender_id, connection_ids, |conn_id| {
315 self.peer.send(
316 conn_id,
317 proto::AddProjectCollaborator {
318 project_id: project_id,
319 collaborator: Some(proto::Collaborator {
320 peer_id: request.sender_id.0,
321 replica_id: response.replica_id,
322 user_id: user_id.to_proto(),
323 }),
324 },
325 )
326 })
327 .await?;
328 self.peer.respond(request.receipt(), response).await?;
329 self.update_contacts_for_users(&contact_user_ids).await?;
330 }
331 Err(error) => {
332 self.peer
333 .respond_with_error(
334 request.receipt(),
335 proto::Error {
336 message: error.to_string(),
337 },
338 )
339 .await?;
340 }
341 }
342
343 Ok(())
344 }
345
346 async fn leave_project(
347 mut self: Arc<Server>,
348 request: TypedEnvelope<proto::LeaveProject>,
349 ) -> tide::Result<()> {
350 let sender_id = request.sender_id;
351 let project_id = request.payload.project_id;
352 let worktree = self.state_mut().leave_project(sender_id, project_id);
353 if let Some(worktree) = worktree {
354 broadcast(sender_id, worktree.connection_ids, |conn_id| {
355 self.peer.send(
356 conn_id,
357 proto::RemoveProjectCollaborator {
358 project_id,
359 peer_id: sender_id.0,
360 },
361 )
362 })
363 .await?;
364 self.update_contacts_for_users(&worktree.authorized_user_ids)
365 .await?;
366 }
367 Ok(())
368 }
369
370 async fn register_worktree(
371 mut self: Arc<Server>,
372 request: TypedEnvelope<proto::RegisterWorktree>,
373 ) -> tide::Result<()> {
374 let receipt = request.receipt();
375 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
376
377 let mut contact_user_ids = HashSet::default();
378 contact_user_ids.insert(host_user_id);
379 for github_login in request.payload.authorized_logins {
380 match self.app_state.db.create_user(&github_login, false).await {
381 Ok(contact_user_id) => {
382 contact_user_ids.insert(contact_user_id);
383 }
384 Err(err) => {
385 let message = err.to_string();
386 self.peer
387 .respond_with_error(receipt, proto::Error { message })
388 .await?;
389 return Ok(());
390 }
391 }
392 }
393
394 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
395 let ok = self.state_mut().register_worktree(
396 request.payload.project_id,
397 request.payload.worktree_id,
398 Worktree {
399 authorized_user_ids: contact_user_ids.clone(),
400 root_name: request.payload.root_name,
401 share: None,
402 },
403 );
404
405 if ok {
406 self.peer.respond(receipt, proto::Ack {}).await?;
407 self.update_contacts_for_users(&contact_user_ids).await?;
408 } else {
409 self.peer
410 .respond_with_error(
411 receipt,
412 proto::Error {
413 message: NO_SUCH_PROJECT.to_string(),
414 },
415 )
416 .await?;
417 }
418
419 Ok(())
420 }
421
422 async fn unregister_worktree(
423 mut self: Arc<Server>,
424 request: TypedEnvelope<proto::UnregisterWorktree>,
425 ) -> tide::Result<()> {
426 let project_id = request.payload.project_id;
427 let worktree_id = request.payload.worktree_id;
428 let (worktree, guest_connection_ids) =
429 self.state_mut()
430 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
431
432 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
433 self.peer.send(
434 conn_id,
435 proto::UnregisterWorktree {
436 project_id,
437 worktree_id,
438 },
439 )
440 })
441 .await?;
442 self.update_contacts_for_users(&worktree.authorized_user_ids)
443 .await?;
444 Ok(())
445 }
446
447 async fn share_worktree(
448 mut self: Arc<Server>,
449 mut request: TypedEnvelope<proto::ShareWorktree>,
450 ) -> tide::Result<()> {
451 let worktree = request
452 .payload
453 .worktree
454 .as_mut()
455 .ok_or_else(|| anyhow!("missing worktree"))?;
456 let entries = mem::take(&mut worktree.entries)
457 .into_iter()
458 .map(|entry| (entry.id, entry))
459 .collect();
460
461 let contact_user_ids = self.state_mut().share_worktree(
462 request.payload.project_id,
463 worktree.id,
464 request.sender_id,
465 entries,
466 );
467 if let Some(contact_user_ids) = contact_user_ids {
468 self.peer.respond(request.receipt(), proto::Ack {}).await?;
469 self.update_contacts_for_users(&contact_user_ids).await?;
470 } else {
471 self.peer
472 .respond_with_error(
473 request.receipt(),
474 proto::Error {
475 message: "no such worktree".to_string(),
476 },
477 )
478 .await?;
479 }
480 Ok(())
481 }
482
483 async fn update_worktree(
484 mut self: Arc<Server>,
485 request: TypedEnvelope<proto::UpdateWorktree>,
486 ) -> tide::Result<()> {
487 let connection_ids = self
488 .state_mut()
489 .update_worktree(
490 request.sender_id,
491 request.payload.project_id,
492 request.payload.worktree_id,
493 &request.payload.removed_entries,
494 &request.payload.updated_entries,
495 )
496 .ok_or_else(|| anyhow!("no such worktree"))?;
497
498 broadcast(request.sender_id, connection_ids, |connection_id| {
499 self.peer
500 .forward_send(request.sender_id, connection_id, request.payload.clone())
501 })
502 .await?;
503
504 Ok(())
505 }
506
507 async fn open_buffer(
508 self: Arc<Server>,
509 request: TypedEnvelope<proto::OpenBuffer>,
510 ) -> tide::Result<()> {
511 let receipt = request.receipt();
512 let host_connection_id = self
513 .state()
514 .read_project(request.payload.project_id, request.sender_id)
515 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
516 .host_connection_id;
517 let response = self
518 .peer
519 .forward_request(request.sender_id, host_connection_id, request.payload)
520 .await?;
521 self.peer.respond(receipt, response).await?;
522 Ok(())
523 }
524
525 async fn close_buffer(
526 self: Arc<Server>,
527 request: TypedEnvelope<proto::CloseBuffer>,
528 ) -> tide::Result<()> {
529 let host_connection_id = self
530 .state()
531 .read_project(request.payload.project_id, request.sender_id)
532 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
533 .host_connection_id;
534 self.peer
535 .forward_send(request.sender_id, host_connection_id, request.payload)
536 .await?;
537 Ok(())
538 }
539
540 async fn save_buffer(
541 self: Arc<Server>,
542 request: TypedEnvelope<proto::SaveBuffer>,
543 ) -> tide::Result<()> {
544 let host;
545 let guests;
546 {
547 let state = self.state();
548 let project = state
549 .read_project(request.payload.project_id, request.sender_id)
550 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
551 host = project.host_connection_id;
552 guests = project.guest_connection_ids()
553 }
554
555 let sender = request.sender_id;
556 let receipt = request.receipt();
557 let response = self
558 .peer
559 .forward_request(sender, host, request.payload.clone())
560 .await?;
561
562 broadcast(host, guests, |conn_id| {
563 let response = response.clone();
564 let peer = &self.peer;
565 async move {
566 if conn_id == sender {
567 peer.respond(receipt, response).await
568 } else {
569 peer.forward_send(host, conn_id, response).await
570 }
571 }
572 })
573 .await?;
574
575 Ok(())
576 }
577
578 async fn update_buffer(
579 self: Arc<Server>,
580 request: TypedEnvelope<proto::UpdateBuffer>,
581 ) -> tide::Result<()> {
582 let receiver_ids = self
583 .state()
584 .project_connection_ids(request.payload.project_id, request.sender_id)
585 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
586 broadcast(request.sender_id, receiver_ids, |connection_id| {
587 self.peer
588 .forward_send(request.sender_id, connection_id, request.payload.clone())
589 })
590 .await?;
591 self.peer.respond(request.receipt(), proto::Ack {}).await?;
592 Ok(())
593 }
594
595 async fn buffer_saved(
596 self: Arc<Server>,
597 request: TypedEnvelope<proto::BufferSaved>,
598 ) -> tide::Result<()> {
599 let receiver_ids = self
600 .state()
601 .project_connection_ids(request.payload.project_id, request.sender_id)
602 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
603 broadcast(request.sender_id, receiver_ids, |connection_id| {
604 self.peer
605 .forward_send(request.sender_id, connection_id, request.payload.clone())
606 })
607 .await?;
608 Ok(())
609 }
610
611 async fn get_channels(
612 self: Arc<Server>,
613 request: TypedEnvelope<proto::GetChannels>,
614 ) -> tide::Result<()> {
615 let user_id = self.state().user_id_for_connection(request.sender_id)?;
616 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
617 self.peer
618 .respond(
619 request.receipt(),
620 proto::GetChannelsResponse {
621 channels: channels
622 .into_iter()
623 .map(|chan| proto::Channel {
624 id: chan.id.to_proto(),
625 name: chan.name,
626 })
627 .collect(),
628 },
629 )
630 .await?;
631 Ok(())
632 }
633
634 async fn get_users(
635 self: Arc<Server>,
636 request: TypedEnvelope<proto::GetUsers>,
637 ) -> tide::Result<()> {
638 let receipt = request.receipt();
639 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
640 let users = self
641 .app_state
642 .db
643 .get_users_by_ids(user_ids)
644 .await?
645 .into_iter()
646 .map(|user| proto::User {
647 id: user.id.to_proto(),
648 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
649 github_login: user.github_login,
650 })
651 .collect();
652 self.peer
653 .respond(receipt, proto::GetUsersResponse { users })
654 .await?;
655 Ok(())
656 }
657
658 async fn update_contacts_for_users<'a>(
659 self: &Arc<Server>,
660 user_ids: impl IntoIterator<Item = &'a UserId>,
661 ) -> tide::Result<()> {
662 let mut send_futures = Vec::new();
663
664 {
665 let state = self.state();
666 for user_id in user_ids {
667 let contacts = state.contacts_for_user(*user_id);
668 for connection_id in state.connection_ids_for_user(*user_id) {
669 send_futures.push(self.peer.send(
670 connection_id,
671 proto::UpdateContacts {
672 contacts: contacts.clone(),
673 },
674 ));
675 }
676 }
677 }
678 futures::future::try_join_all(send_futures).await?;
679
680 Ok(())
681 }
682
683 async fn join_channel(
684 mut self: Arc<Self>,
685 request: TypedEnvelope<proto::JoinChannel>,
686 ) -> tide::Result<()> {
687 let user_id = self.state().user_id_for_connection(request.sender_id)?;
688 let channel_id = ChannelId::from_proto(request.payload.channel_id);
689 if !self
690 .app_state
691 .db
692 .can_user_access_channel(user_id, channel_id)
693 .await?
694 {
695 Err(anyhow!("access denied"))?;
696 }
697
698 self.state_mut().join_channel(request.sender_id, channel_id);
699 let messages = self
700 .app_state
701 .db
702 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
703 .await?
704 .into_iter()
705 .map(|msg| proto::ChannelMessage {
706 id: msg.id.to_proto(),
707 body: msg.body,
708 timestamp: msg.sent_at.unix_timestamp() as u64,
709 sender_id: msg.sender_id.to_proto(),
710 nonce: Some(msg.nonce.as_u128().into()),
711 })
712 .collect::<Vec<_>>();
713 self.peer
714 .respond(
715 request.receipt(),
716 proto::JoinChannelResponse {
717 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
718 messages,
719 },
720 )
721 .await?;
722 Ok(())
723 }
724
725 async fn leave_channel(
726 mut self: Arc<Self>,
727 request: TypedEnvelope<proto::LeaveChannel>,
728 ) -> tide::Result<()> {
729 let user_id = self.state().user_id_for_connection(request.sender_id)?;
730 let channel_id = ChannelId::from_proto(request.payload.channel_id);
731 if !self
732 .app_state
733 .db
734 .can_user_access_channel(user_id, channel_id)
735 .await?
736 {
737 Err(anyhow!("access denied"))?;
738 }
739
740 self.state_mut()
741 .leave_channel(request.sender_id, channel_id);
742
743 Ok(())
744 }
745
746 async fn send_channel_message(
747 self: Arc<Self>,
748 request: TypedEnvelope<proto::SendChannelMessage>,
749 ) -> tide::Result<()> {
750 let receipt = request.receipt();
751 let channel_id = ChannelId::from_proto(request.payload.channel_id);
752 let user_id;
753 let connection_ids;
754 {
755 let state = self.state();
756 user_id = state.user_id_for_connection(request.sender_id)?;
757 if let Some(ids) = state.channel_connection_ids(channel_id) {
758 connection_ids = ids;
759 } else {
760 return Ok(());
761 }
762 }
763
764 // Validate the message body.
765 let body = request.payload.body.trim().to_string();
766 if body.len() > MAX_MESSAGE_LEN {
767 self.peer
768 .respond_with_error(
769 receipt,
770 proto::Error {
771 message: "message is too long".to_string(),
772 },
773 )
774 .await?;
775 return Ok(());
776 }
777 if body.is_empty() {
778 self.peer
779 .respond_with_error(
780 receipt,
781 proto::Error {
782 message: "message can't be blank".to_string(),
783 },
784 )
785 .await?;
786 return Ok(());
787 }
788
789 let timestamp = OffsetDateTime::now_utc();
790 let nonce = if let Some(nonce) = request.payload.nonce {
791 nonce
792 } else {
793 self.peer
794 .respond_with_error(
795 receipt,
796 proto::Error {
797 message: "nonce can't be blank".to_string(),
798 },
799 )
800 .await?;
801 return Ok(());
802 };
803
804 let message_id = self
805 .app_state
806 .db
807 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
808 .await?
809 .to_proto();
810 let message = proto::ChannelMessage {
811 sender_id: user_id.to_proto(),
812 id: message_id,
813 body,
814 timestamp: timestamp.unix_timestamp() as u64,
815 nonce: Some(nonce),
816 };
817 broadcast(request.sender_id, connection_ids, |conn_id| {
818 self.peer.send(
819 conn_id,
820 proto::ChannelMessageSent {
821 channel_id: channel_id.to_proto(),
822 message: Some(message.clone()),
823 },
824 )
825 })
826 .await?;
827 self.peer
828 .respond(
829 receipt,
830 proto::SendChannelMessageResponse {
831 message: Some(message),
832 },
833 )
834 .await?;
835 Ok(())
836 }
837
838 async fn get_channel_messages(
839 self: Arc<Self>,
840 request: TypedEnvelope<proto::GetChannelMessages>,
841 ) -> tide::Result<()> {
842 let user_id = self.state().user_id_for_connection(request.sender_id)?;
843 let channel_id = ChannelId::from_proto(request.payload.channel_id);
844 if !self
845 .app_state
846 .db
847 .can_user_access_channel(user_id, channel_id)
848 .await?
849 {
850 Err(anyhow!("access denied"))?;
851 }
852
853 let messages = self
854 .app_state
855 .db
856 .get_channel_messages(
857 channel_id,
858 MESSAGE_COUNT_PER_PAGE,
859 Some(MessageId::from_proto(request.payload.before_message_id)),
860 )
861 .await?
862 .into_iter()
863 .map(|msg| proto::ChannelMessage {
864 id: msg.id.to_proto(),
865 body: msg.body,
866 timestamp: msg.sent_at.unix_timestamp() as u64,
867 sender_id: msg.sender_id.to_proto(),
868 nonce: Some(msg.nonce.as_u128().into()),
869 })
870 .collect::<Vec<_>>();
871 self.peer
872 .respond(
873 request.receipt(),
874 proto::GetChannelMessagesResponse {
875 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
876 messages,
877 },
878 )
879 .await?;
880 Ok(())
881 }
882
883 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
884 self.store.read()
885 }
886
887 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
888 self.store.write()
889 }
890}
891
892pub async fn broadcast<F, T>(
893 sender_id: ConnectionId,
894 receiver_ids: Vec<ConnectionId>,
895 mut f: F,
896) -> anyhow::Result<()>
897where
898 F: FnMut(ConnectionId) -> T,
899 T: Future<Output = anyhow::Result<()>>,
900{
901 let futures = receiver_ids
902 .into_iter()
903 .filter(|id| *id != sender_id)
904 .map(|id| f(id));
905 futures::future::try_join_all(futures).await?;
906 Ok(())
907}
908
909pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
910 let server = Server::new(app.state().clone(), rpc.clone(), None);
911 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
912 let server = server.clone();
913 async move {
914 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
915
916 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
917 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
918 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
919 let client_protocol_version: Option<u32> = request
920 .header("X-Zed-Protocol-Version")
921 .and_then(|v| v.as_str().parse().ok());
922
923 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
924 return Ok(Response::new(StatusCode::UpgradeRequired));
925 }
926
927 let header = match request.header("Sec-Websocket-Key") {
928 Some(h) => h.as_str(),
929 None => return Err(anyhow!("expected sec-websocket-key"))?,
930 };
931
932 let user_id = process_auth_header(&request).await?;
933
934 let mut response = Response::new(StatusCode::SwitchingProtocols);
935 response.insert_header(UPGRADE, "websocket");
936 response.insert_header(CONNECTION, "Upgrade");
937 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
938 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
939 response.insert_header("Sec-Websocket-Version", "13");
940
941 let http_res: &mut tide::http::Response = response.as_mut();
942 let upgrade_receiver = http_res.recv_upgrade().await;
943 let addr = request.remote().unwrap_or("unknown").to_string();
944 task::spawn(async move {
945 if let Some(stream) = upgrade_receiver.await {
946 server
947 .handle_connection(
948 Connection::new(
949 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
950 ),
951 addr,
952 user_id,
953 None,
954 )
955 .await;
956 }
957 });
958
959 Ok(response)
960 }
961 });
962}
963
964fn header_contains_ignore_case<T>(
965 request: &tide::Request<T>,
966 header_name: HeaderName,
967 value: &str,
968) -> bool {
969 request
970 .header(header_name)
971 .map(|h| {
972 h.as_str()
973 .split(',')
974 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
975 })
976 .unwrap_or(false)
977}
978
979#[cfg(test)]
980mod tests {
981 use super::*;
982 use crate::{
983 auth,
984 db::{tests::TestDb, UserId},
985 github, AppState, Config,
986 };
987 use ::rpc::Peer;
988 use async_std::task;
989 use gpui::{ModelHandle, TestAppContext};
990 use parking_lot::Mutex;
991 use postage::{mpsc, watch};
992 use rpc::PeerId;
993 use serde_json::json;
994 use sqlx::types::time::OffsetDateTime;
995 use std::{
996 ops::Deref,
997 path::Path,
998 sync::{
999 atomic::{AtomicBool, Ordering::SeqCst},
1000 Arc,
1001 },
1002 time::Duration,
1003 };
1004 use zed::{
1005 client::{
1006 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1007 EstablishConnectionError, UserStore,
1008 },
1009 editor::{Editor, EditorSettings, Input, MultiBuffer},
1010 fs::{FakeFs, Fs as _},
1011 language::{
1012 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1013 LanguageRegistry, LanguageServerConfig, Point,
1014 },
1015 lsp,
1016 project::Project,
1017 };
1018
1019 #[gpui::test]
1020 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1021 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1022 let lang_registry = Arc::new(LanguageRegistry::new());
1023 let fs = Arc::new(FakeFs::new());
1024 cx_a.foreground().forbid_parking();
1025
1026 // Connect to a server as 2 clients.
1027 let mut server = TestServer::start().await;
1028 let client_a = server.create_client(&mut cx_a, "user_a").await;
1029 let client_b = server.create_client(&mut cx_b, "user_b").await;
1030
1031 // Share a project as client A
1032 fs.insert_tree(
1033 "/a",
1034 json!({
1035 ".zed.toml": r#"collaborators = ["user_b"]"#,
1036 "a.txt": "a-contents",
1037 "b.txt": "b-contents",
1038 }),
1039 )
1040 .await;
1041 let project_a = cx_a.update(|cx| {
1042 Project::local(
1043 client_a.clone(),
1044 client_a.user_store.clone(),
1045 lang_registry.clone(),
1046 fs.clone(),
1047 cx,
1048 )
1049 });
1050 let worktree_a = project_a
1051 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1052 .await
1053 .unwrap();
1054 worktree_a
1055 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1056 .await;
1057 let project_id = project_a
1058 .update(&mut cx_a, |project, _| project.next_remote_id())
1059 .await;
1060 project_a
1061 .update(&mut cx_a, |project, cx| project.share(cx))
1062 .await
1063 .unwrap();
1064
1065 // Join that project as client B, and see that a guest has joined as client A.
1066 let project_b = Project::remote(
1067 project_id,
1068 client_b.clone(),
1069 client_b.user_store.clone(),
1070 lang_registry.clone(),
1071 fs.clone(),
1072 &mut cx_b.to_async(),
1073 )
1074 .await
1075 .unwrap();
1076 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1077
1078 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1079 assert_eq!(
1080 project
1081 .collaborators()
1082 .get(&client_a.peer_id)
1083 .unwrap()
1084 .user
1085 .github_login,
1086 "user_a"
1087 );
1088 project.replica_id()
1089 });
1090 project_a
1091 .condition(&cx_a, |tree, _| {
1092 tree.collaborators()
1093 .get(&client_b.peer_id)
1094 .map_or(false, |collaborator| {
1095 collaborator.replica_id == replica_id_b
1096 && collaborator.user.github_login == "user_b"
1097 })
1098 })
1099 .await;
1100
1101 // Open the same file as client B and client A.
1102 let buffer_b = worktree_b
1103 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1104 .await
1105 .unwrap();
1106 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1107 buffer_b.read_with(&cx_b, |buf, cx| {
1108 assert_eq!(buf.read(cx).text(), "b-contents")
1109 });
1110 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1111 let buffer_a = worktree_a
1112 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1113 .await
1114 .unwrap();
1115
1116 let editor_b = cx_b.add_view(window_b, |cx| {
1117 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1118 });
1119 // TODO
1120 // // Create a selection set as client B and see that selection set as client A.
1121 // buffer_a
1122 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1123 // .await;
1124
1125 // Edit the buffer as client B and see that edit as client A.
1126 editor_b.update(&mut cx_b, |editor, cx| {
1127 editor.handle_input(&Input("ok, ".into()), cx)
1128 });
1129 buffer_a
1130 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1131 .await;
1132
1133 // TODO
1134 // // Remove the selection set as client B, see those selections disappear as client A.
1135 cx_b.update(move |_| drop(editor_b));
1136 // buffer_a
1137 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1138 // .await;
1139
1140 // Close the buffer as client A, see that the buffer is closed.
1141 cx_a.update(move |_| drop(buffer_a));
1142 worktree_a
1143 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1144 .await;
1145
1146 // Dropping the worktree removes client B from client A's collaborators.
1147 cx_b.update(move |_| drop(worktree_b));
1148 project_a
1149 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1150 .await;
1151 }
1152
1153 #[gpui::test]
1154 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1155 cx_b.update(zed::contacts_panel::init);
1156 let lang_registry = Arc::new(LanguageRegistry::new());
1157 let fs = Arc::new(FakeFs::new());
1158
1159 // Connect to a server as 2 clients.
1160 let mut server = TestServer::start().await;
1161 let client_a = server.create_client(&mut cx_a, "user_a").await;
1162 let client_b = server.create_client(&mut cx_b, "user_b").await;
1163 cx_a.foreground().forbid_parking();
1164
1165 // Share a project as client A
1166 fs.insert_tree(
1167 "/a",
1168 json!({
1169 ".zed.toml": r#"collaborators = ["user_b"]"#,
1170 "a.txt": "a-contents",
1171 "b.txt": "b-contents",
1172 }),
1173 )
1174 .await;
1175 let project_a = cx_a.update(|cx| {
1176 Project::local(
1177 client_a.clone(),
1178 client_a.user_store.clone(),
1179 lang_registry.clone(),
1180 fs.clone(),
1181 cx,
1182 )
1183 });
1184 let worktree_a = project_a
1185 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1186 .await
1187 .unwrap();
1188 worktree_a
1189 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1190 .await;
1191 let project_id = project_a
1192 .update(&mut cx_a, |project, _| project.next_remote_id())
1193 .await;
1194 project_a
1195 .update(&mut cx_a, |project, cx| project.share(cx))
1196 .await
1197 .unwrap();
1198
1199 // Join that project as client B
1200 let project_b = Project::remote(
1201 project_id,
1202 client_b.clone(),
1203 client_b.user_store.clone(),
1204 lang_registry.clone(),
1205 fs.clone(),
1206 &mut cx_b.to_async(),
1207 )
1208 .await
1209 .unwrap();
1210
1211 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1212 worktree_b
1213 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1214 .await
1215 .unwrap();
1216
1217 project_a
1218 .update(&mut cx_a, |project, cx| project.unshare(cx))
1219 .await
1220 .unwrap();
1221 project_b
1222 .condition(&mut cx_b, |project, _| project.is_read_only())
1223 .await;
1224 }
1225
1226 #[gpui::test]
1227 async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
1228 mut cx_a: TestAppContext,
1229 mut cx_b: TestAppContext,
1230 mut cx_c: TestAppContext,
1231 ) {
1232 cx_a.foreground().forbid_parking();
1233 let lang_registry = Arc::new(LanguageRegistry::new());
1234 let fs = Arc::new(FakeFs::new());
1235
1236 // Connect to a server as 3 clients.
1237 let mut server = TestServer::start().await;
1238 let client_a = server.create_client(&mut cx_a, "user_a").await;
1239 let client_b = server.create_client(&mut cx_b, "user_b").await;
1240 let client_c = server.create_client(&mut cx_c, "user_c").await;
1241
1242 // Share a worktree as client A.
1243 fs.insert_tree(
1244 "/a",
1245 json!({
1246 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1247 "file1": "",
1248 "file2": ""
1249 }),
1250 )
1251 .await;
1252 let project_a = cx_a.update(|cx| {
1253 Project::local(
1254 client_a.clone(),
1255 client_a.user_store.clone(),
1256 lang_registry.clone(),
1257 fs.clone(),
1258 cx,
1259 )
1260 });
1261 let worktree_a = project_a
1262 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1263 .await
1264 .unwrap();
1265 worktree_a
1266 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1267 .await;
1268 let project_id = project_a
1269 .update(&mut cx_a, |project, _| project.next_remote_id())
1270 .await;
1271 project_a
1272 .update(&mut cx_a, |project, cx| project.share(cx))
1273 .await
1274 .unwrap();
1275
1276 // Join that worktree as clients B and C.
1277 let project_b = Project::remote(
1278 project_id,
1279 client_b.clone(),
1280 client_b.user_store.clone(),
1281 lang_registry.clone(),
1282 fs.clone(),
1283 &mut cx_b.to_async(),
1284 )
1285 .await
1286 .unwrap();
1287 let project_c = Project::remote(
1288 project_id,
1289 client_c.clone(),
1290 client_c.user_store.clone(),
1291 lang_registry.clone(),
1292 fs.clone(),
1293 &mut cx_c.to_async(),
1294 )
1295 .await
1296 .unwrap();
1297
1298 // Open and edit a buffer as both guests B and C.
1299 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1300 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1301 let buffer_b = worktree_b
1302 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1303 .await
1304 .unwrap();
1305 let buffer_c = worktree_c
1306 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1307 .await
1308 .unwrap();
1309 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1310 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1311
1312 // Open and edit that buffer as the host.
1313 let buffer_a = worktree_a
1314 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1315 .await
1316 .unwrap();
1317
1318 buffer_a
1319 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1320 .await;
1321 buffer_a.update(&mut cx_a, |buf, cx| {
1322 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1323 });
1324
1325 // Wait for edits to propagate
1326 buffer_a
1327 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1328 .await;
1329 buffer_b
1330 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1331 .await;
1332 buffer_c
1333 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1334 .await;
1335
1336 // Edit the buffer as the host and concurrently save as guest B.
1337 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1338 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1339 save_b.await.unwrap();
1340 assert_eq!(
1341 fs.load("/a/file1".as_ref()).await.unwrap(),
1342 "hi-a, i-am-c, i-am-b, i-am-a"
1343 );
1344 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1345 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1346 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1347
1348 // Make changes on host's file system, see those changes on the guests.
1349 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1350 .await
1351 .unwrap();
1352 fs.insert_file(Path::new("/a/file4"), "4".into())
1353 .await
1354 .unwrap();
1355
1356 worktree_b
1357 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1358 .await;
1359 worktree_c
1360 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1361 .await;
1362 worktree_b.read_with(&cx_b, |tree, _| {
1363 assert_eq!(
1364 tree.paths()
1365 .map(|p| p.to_string_lossy())
1366 .collect::<Vec<_>>(),
1367 &[".zed.toml", "file1", "file3", "file4"]
1368 )
1369 });
1370 worktree_c.read_with(&cx_c, |tree, _| {
1371 assert_eq!(
1372 tree.paths()
1373 .map(|p| p.to_string_lossy())
1374 .collect::<Vec<_>>(),
1375 &[".zed.toml", "file1", "file3", "file4"]
1376 )
1377 });
1378 }
1379
1380 #[gpui::test]
1381 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1382 cx_a.foreground().forbid_parking();
1383 let lang_registry = Arc::new(LanguageRegistry::new());
1384 let fs = Arc::new(FakeFs::new());
1385
1386 // Connect to a server as 2 clients.
1387 let mut server = TestServer::start().await;
1388 let client_a = server.create_client(&mut cx_a, "user_a").await;
1389 let client_b = server.create_client(&mut cx_b, "user_b").await;
1390
1391 // Share a project as client A
1392 fs.insert_tree(
1393 "/dir",
1394 json!({
1395 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1396 "a.txt": "a-contents",
1397 }),
1398 )
1399 .await;
1400
1401 let project_a = cx_a.update(|cx| {
1402 Project::local(
1403 client_a.clone(),
1404 client_a.user_store.clone(),
1405 lang_registry.clone(),
1406 fs.clone(),
1407 cx,
1408 )
1409 });
1410 let worktree_a = project_a
1411 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1412 .await
1413 .unwrap();
1414 worktree_a
1415 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1416 .await;
1417 let project_id = project_a
1418 .update(&mut cx_a, |project, _| project.next_remote_id())
1419 .await;
1420 project_a
1421 .update(&mut cx_a, |project, cx| project.share(cx))
1422 .await
1423 .unwrap();
1424
1425 // Join that project as client B
1426 let project_b = Project::remote(
1427 project_id,
1428 client_b.clone(),
1429 client_b.user_store.clone(),
1430 lang_registry.clone(),
1431 fs.clone(),
1432 &mut cx_b.to_async(),
1433 )
1434 .await
1435 .unwrap();
1436 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1437
1438 // Open a buffer as client B
1439 let buffer_b = worktree_b
1440 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1441 .await
1442 .unwrap();
1443 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1444
1445 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1446 buffer_b.read_with(&cx_b, |buf, _| {
1447 assert!(buf.is_dirty());
1448 assert!(!buf.has_conflict());
1449 });
1450
1451 buffer_b
1452 .update(&mut cx_b, |buf, cx| buf.save(cx))
1453 .unwrap()
1454 .await
1455 .unwrap();
1456 worktree_b
1457 .condition(&cx_b, |_, cx| {
1458 buffer_b.read(cx).file().unwrap().mtime() != mtime
1459 })
1460 .await;
1461 buffer_b.read_with(&cx_b, |buf, _| {
1462 assert!(!buf.is_dirty());
1463 assert!(!buf.has_conflict());
1464 });
1465
1466 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1467 buffer_b.read_with(&cx_b, |buf, _| {
1468 assert!(buf.is_dirty());
1469 assert!(!buf.has_conflict());
1470 });
1471 }
1472
1473 #[gpui::test]
1474 async fn test_editing_while_guest_opens_buffer(
1475 mut cx_a: TestAppContext,
1476 mut cx_b: TestAppContext,
1477 ) {
1478 cx_a.foreground().forbid_parking();
1479 let lang_registry = Arc::new(LanguageRegistry::new());
1480 let fs = Arc::new(FakeFs::new());
1481
1482 // Connect to a server as 2 clients.
1483 let mut server = TestServer::start().await;
1484 let client_a = server.create_client(&mut cx_a, "user_a").await;
1485 let client_b = server.create_client(&mut cx_b, "user_b").await;
1486
1487 // Share a project as client A
1488 fs.insert_tree(
1489 "/dir",
1490 json!({
1491 ".zed.toml": r#"collaborators = ["user_b"]"#,
1492 "a.txt": "a-contents",
1493 }),
1494 )
1495 .await;
1496 let project_a = cx_a.update(|cx| {
1497 Project::local(
1498 client_a.clone(),
1499 client_a.user_store.clone(),
1500 lang_registry.clone(),
1501 fs.clone(),
1502 cx,
1503 )
1504 });
1505 let worktree_a = project_a
1506 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1507 .await
1508 .unwrap();
1509 worktree_a
1510 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1511 .await;
1512 let project_id = project_a
1513 .update(&mut cx_a, |project, _| project.next_remote_id())
1514 .await;
1515 project_a
1516 .update(&mut cx_a, |project, cx| project.share(cx))
1517 .await
1518 .unwrap();
1519
1520 // Join that project as client B
1521 let project_b = Project::remote(
1522 project_id,
1523 client_b.clone(),
1524 client_b.user_store.clone(),
1525 lang_registry.clone(),
1526 fs.clone(),
1527 &mut cx_b.to_async(),
1528 )
1529 .await
1530 .unwrap();
1531 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1532
1533 // Open a buffer as client A
1534 let buffer_a = worktree_a
1535 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1536 .await
1537 .unwrap();
1538
1539 // Start opening the same buffer as client B
1540 let buffer_b = cx_b
1541 .background()
1542 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1543 task::yield_now().await;
1544
1545 // Edit the buffer as client A while client B is still opening it.
1546 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1547
1548 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1549 let buffer_b = buffer_b.await.unwrap();
1550 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1551 }
1552
1553 #[gpui::test]
1554 async fn test_leaving_worktree_while_opening_buffer(
1555 mut cx_a: TestAppContext,
1556 mut cx_b: TestAppContext,
1557 ) {
1558 cx_a.foreground().forbid_parking();
1559 let lang_registry = Arc::new(LanguageRegistry::new());
1560 let fs = Arc::new(FakeFs::new());
1561
1562 // Connect to a server as 2 clients.
1563 let mut server = TestServer::start().await;
1564 let client_a = server.create_client(&mut cx_a, "user_a").await;
1565 let client_b = server.create_client(&mut cx_b, "user_b").await;
1566
1567 // Share a project as client A
1568 fs.insert_tree(
1569 "/dir",
1570 json!({
1571 ".zed.toml": r#"collaborators = ["user_b"]"#,
1572 "a.txt": "a-contents",
1573 }),
1574 )
1575 .await;
1576 let project_a = cx_a.update(|cx| {
1577 Project::local(
1578 client_a.clone(),
1579 client_a.user_store.clone(),
1580 lang_registry.clone(),
1581 fs.clone(),
1582 cx,
1583 )
1584 });
1585 let worktree_a = project_a
1586 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1587 .await
1588 .unwrap();
1589 worktree_a
1590 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591 .await;
1592 let project_id = project_a
1593 .update(&mut cx_a, |project, _| project.next_remote_id())
1594 .await;
1595 project_a
1596 .update(&mut cx_a, |project, cx| project.share(cx))
1597 .await
1598 .unwrap();
1599
1600 // Join that project as client B
1601 let project_b = Project::remote(
1602 project_id,
1603 client_b.clone(),
1604 client_b.user_store.clone(),
1605 lang_registry.clone(),
1606 fs.clone(),
1607 &mut cx_b.to_async(),
1608 )
1609 .await
1610 .unwrap();
1611 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1612
1613 // See that a guest has joined as client A.
1614 project_a
1615 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1616 .await;
1617
1618 // Begin opening a buffer as client B, but leave the project before the open completes.
1619 let buffer_b = cx_b
1620 .background()
1621 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1622 cx_b.update(|_| drop(worktree_b));
1623 drop(buffer_b);
1624
1625 // See that the guest has left.
1626 project_a
1627 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1628 .await;
1629 }
1630
1631 #[gpui::test]
1632 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1633 cx_a.foreground().forbid_parking();
1634 let lang_registry = Arc::new(LanguageRegistry::new());
1635 let fs = Arc::new(FakeFs::new());
1636
1637 // Connect to a server as 2 clients.
1638 let mut server = TestServer::start().await;
1639 let client_a = server.create_client(&mut cx_a, "user_a").await;
1640 let client_b = server.create_client(&mut cx_b, "user_b").await;
1641
1642 // Share a project as client A
1643 fs.insert_tree(
1644 "/a",
1645 json!({
1646 ".zed.toml": r#"collaborators = ["user_b"]"#,
1647 "a.txt": "a-contents",
1648 "b.txt": "b-contents",
1649 }),
1650 )
1651 .await;
1652 let project_a = cx_a.update(|cx| {
1653 Project::local(
1654 client_a.clone(),
1655 client_a.user_store.clone(),
1656 lang_registry.clone(),
1657 fs.clone(),
1658 cx,
1659 )
1660 });
1661 let worktree_a = project_a
1662 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1663 .await
1664 .unwrap();
1665 worktree_a
1666 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1667 .await;
1668 let project_id = project_a
1669 .update(&mut cx_a, |project, _| project.next_remote_id())
1670 .await;
1671 project_a
1672 .update(&mut cx_a, |project, cx| project.share(cx))
1673 .await
1674 .unwrap();
1675
1676 // Join that project as client B
1677 let _project_b = Project::remote(
1678 project_id,
1679 client_b.clone(),
1680 client_b.user_store.clone(),
1681 lang_registry.clone(),
1682 fs.clone(),
1683 &mut cx_b.to_async(),
1684 )
1685 .await
1686 .unwrap();
1687
1688 // See that a guest has joined as client A.
1689 project_a
1690 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1691 .await;
1692
1693 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1694 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1695 project_a
1696 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1697 .await;
1698 }
1699
1700 #[gpui::test]
1701 async fn test_collaborating_with_diagnostics(
1702 mut cx_a: TestAppContext,
1703 mut cx_b: TestAppContext,
1704 ) {
1705 cx_a.foreground().forbid_parking();
1706 let mut lang_registry = Arc::new(LanguageRegistry::new());
1707 let fs = Arc::new(FakeFs::new());
1708
1709 // Set up a fake language server.
1710 let (language_server_config, mut fake_language_server) =
1711 LanguageServerConfig::fake(cx_a.background()).await;
1712 Arc::get_mut(&mut lang_registry)
1713 .unwrap()
1714 .add(Arc::new(Language::new(
1715 LanguageConfig {
1716 name: "Rust".to_string(),
1717 path_suffixes: vec!["rs".to_string()],
1718 language_server: Some(language_server_config),
1719 ..Default::default()
1720 },
1721 Some(tree_sitter_rust::language()),
1722 )));
1723
1724 // Connect to a server as 2 clients.
1725 let mut server = TestServer::start().await;
1726 let client_a = server.create_client(&mut cx_a, "user_a").await;
1727 let client_b = server.create_client(&mut cx_b, "user_b").await;
1728
1729 // Share a project as client A
1730 fs.insert_tree(
1731 "/a",
1732 json!({
1733 ".zed.toml": r#"collaborators = ["user_b"]"#,
1734 "a.rs": "let one = two",
1735 "other.rs": "",
1736 }),
1737 )
1738 .await;
1739 let project_a = cx_a.update(|cx| {
1740 Project::local(
1741 client_a.clone(),
1742 client_a.user_store.clone(),
1743 lang_registry.clone(),
1744 fs.clone(),
1745 cx,
1746 )
1747 });
1748 let worktree_a = project_a
1749 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1750 .await
1751 .unwrap();
1752 worktree_a
1753 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1754 .await;
1755 let project_id = project_a
1756 .update(&mut cx_a, |project, _| project.next_remote_id())
1757 .await;
1758 project_a
1759 .update(&mut cx_a, |project, cx| project.share(cx))
1760 .await
1761 .unwrap();
1762
1763 // Cause the language server to start.
1764 let _ = cx_a
1765 .background()
1766 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1767 worktree.open_buffer("other.rs", cx)
1768 }))
1769 .await
1770 .unwrap();
1771
1772 // Simulate a language server reporting errors for a file.
1773 fake_language_server
1774 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1775 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1776 version: None,
1777 diagnostics: vec![
1778 lsp::Diagnostic {
1779 severity: Some(lsp::DiagnosticSeverity::ERROR),
1780 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1781 message: "message 1".to_string(),
1782 ..Default::default()
1783 },
1784 lsp::Diagnostic {
1785 severity: Some(lsp::DiagnosticSeverity::WARNING),
1786 range: lsp::Range::new(
1787 lsp::Position::new(0, 10),
1788 lsp::Position::new(0, 13),
1789 ),
1790 message: "message 2".to_string(),
1791 ..Default::default()
1792 },
1793 ],
1794 })
1795 .await;
1796
1797 // Join the worktree as client B.
1798 let project_b = Project::remote(
1799 project_id,
1800 client_b.clone(),
1801 client_b.user_store.clone(),
1802 lang_registry.clone(),
1803 fs.clone(),
1804 &mut cx_b.to_async(),
1805 )
1806 .await
1807 .unwrap();
1808 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1809
1810 // Open the file with the errors.
1811 let buffer_b = cx_b
1812 .background()
1813 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1814 .await
1815 .unwrap();
1816
1817 buffer_b.read_with(&cx_b, |buffer, _| {
1818 assert_eq!(
1819 buffer
1820 .snapshot()
1821 .diagnostics_in_range::<_, Point>(0..buffer.len())
1822 .collect::<Vec<_>>(),
1823 &[
1824 DiagnosticEntry {
1825 range: Point::new(0, 4)..Point::new(0, 7),
1826 diagnostic: Diagnostic {
1827 group_id: 0,
1828 message: "message 1".to_string(),
1829 severity: lsp::DiagnosticSeverity::ERROR,
1830 is_primary: true,
1831 ..Default::default()
1832 }
1833 },
1834 DiagnosticEntry {
1835 range: Point::new(0, 10)..Point::new(0, 13),
1836 diagnostic: Diagnostic {
1837 group_id: 1,
1838 severity: lsp::DiagnosticSeverity::WARNING,
1839 message: "message 2".to_string(),
1840 is_primary: true,
1841 ..Default::default()
1842 }
1843 }
1844 ]
1845 );
1846 });
1847 }
1848
1849 #[gpui::test]
1850 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1851 cx_a.foreground().forbid_parking();
1852
1853 // Connect to a server as 2 clients.
1854 let mut server = TestServer::start().await;
1855 let client_a = server.create_client(&mut cx_a, "user_a").await;
1856 let client_b = server.create_client(&mut cx_b, "user_b").await;
1857
1858 // Create an org that includes these 2 users.
1859 let db = &server.app_state.db;
1860 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1861 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1862 .await
1863 .unwrap();
1864 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1865 .await
1866 .unwrap();
1867
1868 // Create a channel that includes all the users.
1869 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1870 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1871 .await
1872 .unwrap();
1873 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1874 .await
1875 .unwrap();
1876 db.create_channel_message(
1877 channel_id,
1878 client_b.current_user_id(&cx_b),
1879 "hello A, it's B.",
1880 OffsetDateTime::now_utc(),
1881 1,
1882 )
1883 .await
1884 .unwrap();
1885
1886 let channels_a = cx_a
1887 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1888 channels_a
1889 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1890 .await;
1891 channels_a.read_with(&cx_a, |list, _| {
1892 assert_eq!(
1893 list.available_channels().unwrap(),
1894 &[ChannelDetails {
1895 id: channel_id.to_proto(),
1896 name: "test-channel".to_string()
1897 }]
1898 )
1899 });
1900 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1901 this.get_channel(channel_id.to_proto(), cx).unwrap()
1902 });
1903 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1904 channel_a
1905 .condition(&cx_a, |channel, _| {
1906 channel_messages(channel)
1907 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1908 })
1909 .await;
1910
1911 let channels_b = cx_b
1912 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1913 channels_b
1914 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1915 .await;
1916 channels_b.read_with(&cx_b, |list, _| {
1917 assert_eq!(
1918 list.available_channels().unwrap(),
1919 &[ChannelDetails {
1920 id: channel_id.to_proto(),
1921 name: "test-channel".to_string()
1922 }]
1923 )
1924 });
1925
1926 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1927 this.get_channel(channel_id.to_proto(), cx).unwrap()
1928 });
1929 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1930 channel_b
1931 .condition(&cx_b, |channel, _| {
1932 channel_messages(channel)
1933 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1934 })
1935 .await;
1936
1937 channel_a
1938 .update(&mut cx_a, |channel, cx| {
1939 channel
1940 .send_message("oh, hi B.".to_string(), cx)
1941 .unwrap()
1942 .detach();
1943 let task = channel.send_message("sup".to_string(), cx).unwrap();
1944 assert_eq!(
1945 channel_messages(channel),
1946 &[
1947 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1948 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1949 ("user_a".to_string(), "sup".to_string(), true)
1950 ]
1951 );
1952 task
1953 })
1954 .await
1955 .unwrap();
1956
1957 channel_b
1958 .condition(&cx_b, |channel, _| {
1959 channel_messages(channel)
1960 == [
1961 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1962 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1963 ("user_a".to_string(), "sup".to_string(), false),
1964 ]
1965 })
1966 .await;
1967
1968 assert_eq!(
1969 server
1970 .state()
1971 .await
1972 .channel(channel_id)
1973 .unwrap()
1974 .connection_ids
1975 .len(),
1976 2
1977 );
1978 cx_b.update(|_| drop(channel_b));
1979 server
1980 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1981 .await;
1982
1983 cx_a.update(|_| drop(channel_a));
1984 server
1985 .condition(|state| state.channel(channel_id).is_none())
1986 .await;
1987 }
1988
1989 #[gpui::test]
1990 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
1991 cx_a.foreground().forbid_parking();
1992
1993 let mut server = TestServer::start().await;
1994 let client_a = server.create_client(&mut cx_a, "user_a").await;
1995
1996 let db = &server.app_state.db;
1997 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1998 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1999 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2000 .await
2001 .unwrap();
2002 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2003 .await
2004 .unwrap();
2005
2006 let channels_a = cx_a
2007 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2008 channels_a
2009 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2010 .await;
2011 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2012 this.get_channel(channel_id.to_proto(), cx).unwrap()
2013 });
2014
2015 // Messages aren't allowed to be too long.
2016 channel_a
2017 .update(&mut cx_a, |channel, cx| {
2018 let long_body = "this is long.\n".repeat(1024);
2019 channel.send_message(long_body, cx).unwrap()
2020 })
2021 .await
2022 .unwrap_err();
2023
2024 // Messages aren't allowed to be blank.
2025 channel_a.update(&mut cx_a, |channel, cx| {
2026 channel.send_message(String::new(), cx).unwrap_err()
2027 });
2028
2029 // Leading and trailing whitespace are trimmed.
2030 channel_a
2031 .update(&mut cx_a, |channel, cx| {
2032 channel
2033 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2034 .unwrap()
2035 })
2036 .await
2037 .unwrap();
2038 assert_eq!(
2039 db.get_channel_messages(channel_id, 10, None)
2040 .await
2041 .unwrap()
2042 .iter()
2043 .map(|m| &m.body)
2044 .collect::<Vec<_>>(),
2045 &["surrounded by whitespace"]
2046 );
2047 }
2048
2049 #[gpui::test]
2050 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2051 cx_a.foreground().forbid_parking();
2052
2053 // Connect to a server as 2 clients.
2054 let mut server = TestServer::start().await;
2055 let client_a = server.create_client(&mut cx_a, "user_a").await;
2056 let client_b = server.create_client(&mut cx_b, "user_b").await;
2057 let mut status_b = client_b.status();
2058
2059 // Create an org that includes these 2 users.
2060 let db = &server.app_state.db;
2061 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2062 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2063 .await
2064 .unwrap();
2065 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2066 .await
2067 .unwrap();
2068
2069 // Create a channel that includes all the users.
2070 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2071 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2072 .await
2073 .unwrap();
2074 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2075 .await
2076 .unwrap();
2077 db.create_channel_message(
2078 channel_id,
2079 client_b.current_user_id(&cx_b),
2080 "hello A, it's B.",
2081 OffsetDateTime::now_utc(),
2082 2,
2083 )
2084 .await
2085 .unwrap();
2086
2087 let channels_a = cx_a
2088 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2089 channels_a
2090 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2091 .await;
2092
2093 channels_a.read_with(&cx_a, |list, _| {
2094 assert_eq!(
2095 list.available_channels().unwrap(),
2096 &[ChannelDetails {
2097 id: channel_id.to_proto(),
2098 name: "test-channel".to_string()
2099 }]
2100 )
2101 });
2102 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2103 this.get_channel(channel_id.to_proto(), cx).unwrap()
2104 });
2105 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2106 channel_a
2107 .condition(&cx_a, |channel, _| {
2108 channel_messages(channel)
2109 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2110 })
2111 .await;
2112
2113 let channels_b = cx_b
2114 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2115 channels_b
2116 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2117 .await;
2118 channels_b.read_with(&cx_b, |list, _| {
2119 assert_eq!(
2120 list.available_channels().unwrap(),
2121 &[ChannelDetails {
2122 id: channel_id.to_proto(),
2123 name: "test-channel".to_string()
2124 }]
2125 )
2126 });
2127
2128 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2129 this.get_channel(channel_id.to_proto(), cx).unwrap()
2130 });
2131 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2132 channel_b
2133 .condition(&cx_b, |channel, _| {
2134 channel_messages(channel)
2135 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2136 })
2137 .await;
2138
2139 // Disconnect client B, ensuring we can still access its cached channel data.
2140 server.forbid_connections();
2141 server.disconnect_client(client_b.current_user_id(&cx_b));
2142 while !matches!(
2143 status_b.recv().await,
2144 Some(client::Status::ReconnectionError { .. })
2145 ) {}
2146
2147 channels_b.read_with(&cx_b, |channels, _| {
2148 assert_eq!(
2149 channels.available_channels().unwrap(),
2150 [ChannelDetails {
2151 id: channel_id.to_proto(),
2152 name: "test-channel".to_string()
2153 }]
2154 )
2155 });
2156 channel_b.read_with(&cx_b, |channel, _| {
2157 assert_eq!(
2158 channel_messages(channel),
2159 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2160 )
2161 });
2162
2163 // Send a message from client B while it is disconnected.
2164 channel_b
2165 .update(&mut cx_b, |channel, cx| {
2166 let task = channel
2167 .send_message("can you see this?".to_string(), cx)
2168 .unwrap();
2169 assert_eq!(
2170 channel_messages(channel),
2171 &[
2172 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2173 ("user_b".to_string(), "can you see this?".to_string(), true)
2174 ]
2175 );
2176 task
2177 })
2178 .await
2179 .unwrap_err();
2180
2181 // Send a message from client A while B is disconnected.
2182 channel_a
2183 .update(&mut cx_a, |channel, cx| {
2184 channel
2185 .send_message("oh, hi B.".to_string(), cx)
2186 .unwrap()
2187 .detach();
2188 let task = channel.send_message("sup".to_string(), cx).unwrap();
2189 assert_eq!(
2190 channel_messages(channel),
2191 &[
2192 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2193 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2194 ("user_a".to_string(), "sup".to_string(), true)
2195 ]
2196 );
2197 task
2198 })
2199 .await
2200 .unwrap();
2201
2202 // Give client B a chance to reconnect.
2203 server.allow_connections();
2204 cx_b.foreground().advance_clock(Duration::from_secs(10));
2205
2206 // Verify that B sees the new messages upon reconnection, as well as the message client B
2207 // sent while offline.
2208 channel_b
2209 .condition(&cx_b, |channel, _| {
2210 channel_messages(channel)
2211 == [
2212 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2213 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2214 ("user_a".to_string(), "sup".to_string(), false),
2215 ("user_b".to_string(), "can you see this?".to_string(), false),
2216 ]
2217 })
2218 .await;
2219
2220 // Ensure client A and B can communicate normally after reconnection.
2221 channel_a
2222 .update(&mut cx_a, |channel, cx| {
2223 channel.send_message("you online?".to_string(), cx).unwrap()
2224 })
2225 .await
2226 .unwrap();
2227 channel_b
2228 .condition(&cx_b, |channel, _| {
2229 channel_messages(channel)
2230 == [
2231 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2232 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2233 ("user_a".to_string(), "sup".to_string(), false),
2234 ("user_b".to_string(), "can you see this?".to_string(), false),
2235 ("user_a".to_string(), "you online?".to_string(), false),
2236 ]
2237 })
2238 .await;
2239
2240 channel_b
2241 .update(&mut cx_b, |channel, cx| {
2242 channel.send_message("yep".to_string(), cx).unwrap()
2243 })
2244 .await
2245 .unwrap();
2246 channel_a
2247 .condition(&cx_a, |channel, _| {
2248 channel_messages(channel)
2249 == [
2250 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2251 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2252 ("user_a".to_string(), "sup".to_string(), false),
2253 ("user_b".to_string(), "can you see this?".to_string(), false),
2254 ("user_a".to_string(), "you online?".to_string(), false),
2255 ("user_b".to_string(), "yep".to_string(), false),
2256 ]
2257 })
2258 .await;
2259 }
2260
2261 #[gpui::test]
2262 async fn test_contacts(
2263 mut cx_a: TestAppContext,
2264 mut cx_b: TestAppContext,
2265 mut cx_c: TestAppContext,
2266 ) {
2267 cx_a.foreground().forbid_parking();
2268 let lang_registry = Arc::new(LanguageRegistry::new());
2269 let fs = Arc::new(FakeFs::new());
2270
2271 // Connect to a server as 3 clients.
2272 let mut server = TestServer::start().await;
2273 let client_a = server.create_client(&mut cx_a, "user_a").await;
2274 let client_b = server.create_client(&mut cx_b, "user_b").await;
2275 let client_c = server.create_client(&mut cx_c, "user_c").await;
2276
2277 // Share a worktree as client A.
2278 fs.insert_tree(
2279 "/a",
2280 json!({
2281 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2282 }),
2283 )
2284 .await;
2285
2286 let project_a = cx_a.update(|cx| {
2287 Project::local(
2288 client_a.clone(),
2289 client_a.user_store.clone(),
2290 lang_registry.clone(),
2291 fs.clone(),
2292 cx,
2293 )
2294 });
2295 let worktree_a = project_a
2296 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2297 .await
2298 .unwrap();
2299 worktree_a
2300 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2301 .await;
2302
2303 client_a
2304 .user_store
2305 .condition(&cx_a, |user_store, _| {
2306 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2307 })
2308 .await;
2309 client_b
2310 .user_store
2311 .condition(&cx_b, |user_store, _| {
2312 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2313 })
2314 .await;
2315 client_c
2316 .user_store
2317 .condition(&cx_c, |user_store, _| {
2318 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2319 })
2320 .await;
2321
2322 let project_id = project_a
2323 .update(&mut cx_a, |project, _| project.next_remote_id())
2324 .await;
2325 project_a
2326 .update(&mut cx_a, |project, cx| project.share(cx))
2327 .await
2328 .unwrap();
2329
2330 let _project_b = Project::remote(
2331 project_id,
2332 client_b.clone(),
2333 client_b.user_store.clone(),
2334 lang_registry.clone(),
2335 fs.clone(),
2336 &mut cx_b.to_async(),
2337 )
2338 .await
2339 .unwrap();
2340
2341 client_a
2342 .user_store
2343 .condition(&cx_a, |user_store, _| {
2344 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2345 })
2346 .await;
2347 client_b
2348 .user_store
2349 .condition(&cx_b, |user_store, _| {
2350 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2351 })
2352 .await;
2353 client_c
2354 .user_store
2355 .condition(&cx_c, |user_store, _| {
2356 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2357 })
2358 .await;
2359
2360 project_a
2361 .condition(&cx_a, |project, _| {
2362 project.collaborators().contains_key(&client_b.peer_id)
2363 })
2364 .await;
2365
2366 cx_a.update(move |_| drop(project_a));
2367 client_a
2368 .user_store
2369 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2370 .await;
2371 client_b
2372 .user_store
2373 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2374 .await;
2375 client_c
2376 .user_store
2377 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2378 .await;
2379
2380 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2381 user_store
2382 .contacts()
2383 .iter()
2384 .map(|contact| {
2385 let worktrees = contact
2386 .projects
2387 .iter()
2388 .map(|p| {
2389 (
2390 p.worktree_root_names[0].as_str(),
2391 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2392 )
2393 })
2394 .collect();
2395 (contact.user.github_login.as_str(), worktrees)
2396 })
2397 .collect()
2398 }
2399 }
2400
2401 struct TestServer {
2402 peer: Arc<Peer>,
2403 app_state: Arc<AppState>,
2404 server: Arc<Server>,
2405 notifications: mpsc::Receiver<()>,
2406 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2407 forbid_connections: Arc<AtomicBool>,
2408 _test_db: TestDb,
2409 }
2410
2411 impl TestServer {
2412 async fn start() -> Self {
2413 let test_db = TestDb::new();
2414 let app_state = Self::build_app_state(&test_db).await;
2415 let peer = Peer::new();
2416 let notifications = mpsc::channel(128);
2417 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2418 Self {
2419 peer,
2420 app_state,
2421 server,
2422 notifications: notifications.1,
2423 connection_killers: Default::default(),
2424 forbid_connections: Default::default(),
2425 _test_db: test_db,
2426 }
2427 }
2428
2429 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2430 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2431 let client_name = name.to_string();
2432 let mut client = Client::new();
2433 let server = self.server.clone();
2434 let connection_killers = self.connection_killers.clone();
2435 let forbid_connections = self.forbid_connections.clone();
2436 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2437
2438 Arc::get_mut(&mut client)
2439 .unwrap()
2440 .override_authenticate(move |cx| {
2441 cx.spawn(|_| async move {
2442 let access_token = "the-token".to_string();
2443 Ok(Credentials {
2444 user_id: user_id.0 as u64,
2445 access_token,
2446 })
2447 })
2448 })
2449 .override_establish_connection(move |credentials, cx| {
2450 assert_eq!(credentials.user_id, user_id.0 as u64);
2451 assert_eq!(credentials.access_token, "the-token");
2452
2453 let server = server.clone();
2454 let connection_killers = connection_killers.clone();
2455 let forbid_connections = forbid_connections.clone();
2456 let client_name = client_name.clone();
2457 let connection_id_tx = connection_id_tx.clone();
2458 cx.spawn(move |cx| async move {
2459 if forbid_connections.load(SeqCst) {
2460 Err(EstablishConnectionError::other(anyhow!(
2461 "server is forbidding connections"
2462 )))
2463 } else {
2464 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2465 connection_killers.lock().insert(user_id, kill_conn);
2466 cx.background()
2467 .spawn(server.handle_connection(
2468 server_conn,
2469 client_name,
2470 user_id,
2471 Some(connection_id_tx),
2472 ))
2473 .detach();
2474 Ok(client_conn)
2475 }
2476 })
2477 });
2478
2479 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2480 client
2481 .authenticate_and_connect(&cx.to_async())
2482 .await
2483 .unwrap();
2484
2485 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2486 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2487 let mut authed_user =
2488 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2489 while authed_user.recv().await.unwrap().is_none() {}
2490
2491 TestClient {
2492 client,
2493 peer_id,
2494 user_store,
2495 }
2496 }
2497
2498 fn disconnect_client(&self, user_id: UserId) {
2499 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2500 let _ = kill_conn.try_send(Some(()));
2501 }
2502 }
2503
2504 fn forbid_connections(&self) {
2505 self.forbid_connections.store(true, SeqCst);
2506 }
2507
2508 fn allow_connections(&self) {
2509 self.forbid_connections.store(false, SeqCst);
2510 }
2511
2512 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2513 let mut config = Config::default();
2514 config.session_secret = "a".repeat(32);
2515 config.database_url = test_db.url.clone();
2516 let github_client = github::AppClient::test();
2517 Arc::new(AppState {
2518 db: test_db.db().clone(),
2519 handlebars: Default::default(),
2520 auth_client: auth::build_client("", ""),
2521 repo_client: github::RepoClient::test(&github_client),
2522 github_client,
2523 config,
2524 })
2525 }
2526
2527 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2528 self.server.store.read()
2529 }
2530
2531 async fn condition<F>(&mut self, mut predicate: F)
2532 where
2533 F: FnMut(&Store) -> bool,
2534 {
2535 async_std::future::timeout(Duration::from_millis(500), async {
2536 while !(predicate)(&*self.server.store.read()) {
2537 self.notifications.recv().await;
2538 }
2539 })
2540 .await
2541 .expect("condition timed out");
2542 }
2543 }
2544
2545 impl Drop for TestServer {
2546 fn drop(&mut self) {
2547 task::block_on(self.peer.reset());
2548 }
2549 }
2550
2551 struct TestClient {
2552 client: Arc<Client>,
2553 pub peer_id: PeerId,
2554 pub user_store: ModelHandle<UserStore>,
2555 }
2556
2557 impl Deref for TestClient {
2558 type Target = Arc<Client>;
2559
2560 fn deref(&self) -> &Self::Target {
2561 &self.client
2562 }
2563 }
2564
2565 impl TestClient {
2566 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2567 UserId::from_proto(
2568 self.user_store
2569 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2570 )
2571 }
2572 }
2573
2574 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2575 channel
2576 .messages()
2577 .cursor::<()>()
2578 .map(|m| {
2579 (
2580 m.sender.github_login.clone(),
2581 m.body.clone(),
2582 m.is_pending(),
2583 )
2584 })
2585 .collect()
2586 }
2587
2588 struct EmptyView;
2589
2590 impl gpui::Entity for EmptyView {
2591 type Event = ();
2592 }
2593
2594 impl gpui::View for EmptyView {
2595 fn ui_name() -> &'static str {
2596 "empty view"
2597 }
2598
2599 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2600 gpui::Element::boxed(gpui::elements::Empty)
2601 }
2602 }
2603}