1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased handler for one kind of incoming RPC message.
///
/// A handler receives the server and the boxed, still-untyped envelope, and
/// returns a boxed future resolving to `tide::Result<()>`. Handlers are built
/// by `Server::add_handler` and stored in `Server::handlers`, keyed by the
/// `TypeId` of the message type they accept.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server state shared by every client connection.
pub struct Server {
    /// RPC peer used to send, respond to, and forward messages.
    peer: Arc<Peer>,
    /// Lock-guarded in-memory store; accessed through `state` / `state_mut`.
    store: RwLock<Store>,
    /// Application-wide state; used in this file to reach the database.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel on which a `()` is sent after each dispatched message.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error message used when a request refers to a project that cannot be found.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::open_buffer)
75 .add_handler(Server::close_buffer)
76 .add_handler(Server::update_buffer)
77 .add_handler(Server::buffer_saved)
78 .add_handler(Server::save_buffer)
79 .add_handler(Server::get_channels)
80 .add_handler(Server::get_users)
81 .add_handler(Server::join_channel)
82 .add_handler(Server::leave_channel)
83 .add_handler(Server::send_channel_message)
84 .add_handler(Server::get_channel_messages);
85
86 Arc::new(server)
87 }
88
89 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
90 where
91 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
92 Fut: 'static + Send + Future<Output = tide::Result<()>>,
93 M: EnvelopedMessage,
94 {
95 let prev_handler = self.handlers.insert(
96 TypeId::of::<M>(),
97 Box::new(move |server, envelope| {
98 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
99 (handler)(server, *envelope).boxed()
100 }),
101 );
102 if prev_handler.is_some() {
103 panic!("registered a handler for the same message twice");
104 }
105 self
106 }
107
    /// Drives a single client connection from registration until sign-out.
    ///
    /// The returned future registers `connection` with the peer, optionally
    /// reports the new connection id through `send_connection_id`, records the
    /// connection for `user_id` in the store, and then dispatches incoming
    /// messages to the registered handlers until either the incoming stream
    /// ends or the connection's I/O future completes. The connection is signed
    /// out before the future resolves.
    ///
    /// `addr` is only used in log messages.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failure to report the connection id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            // A failed contacts update is logged but does not abort the connection.
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.recv().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in declaration order, so a
                // ready message is taken before completion of the I/O future.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so messages of one
                                // connection are handled one at a time.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Sent whether the handler succeeded or failed;
                                // send errors are ignored.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // `None` from the receiver: no more incoming messages.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
171
172 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
173 self.peer.disconnect(connection_id).await;
174 let removed_connection = self.state_mut().remove_connection(connection_id)?;
175
176 for (project_id, project) in removed_connection.hosted_projects {
177 if let Some(share) = project.share {
178 broadcast(
179 connection_id,
180 share.guests.keys().copied().collect(),
181 |conn_id| {
182 self.peer
183 .send(conn_id, proto::UnshareProject { project_id })
184 },
185 )
186 .await?;
187 }
188 }
189
190 for (project_id, peer_ids) in removed_connection.guest_project_ids {
191 broadcast(connection_id, peer_ids, |conn_id| {
192 self.peer.send(
193 conn_id,
194 proto::RemoveProjectCollaborator {
195 project_id,
196 peer_id: connection_id.0,
197 },
198 )
199 })
200 .await?;
201 }
202
203 self.update_contacts_for_users(removed_connection.contact_ids.iter())
204 .await?;
205
206 Ok(())
207 }
208
209 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
210 self.peer.respond(request.receipt(), proto::Ack {}).await?;
211 Ok(())
212 }
213
214 async fn register_project(
215 mut self: Arc<Server>,
216 request: TypedEnvelope<proto::RegisterProject>,
217 ) -> tide::Result<()> {
218 let project_id = {
219 let mut state = self.state_mut();
220 let user_id = state.user_id_for_connection(request.sender_id)?;
221 state.register_project(request.sender_id, user_id)
222 };
223 self.peer
224 .respond(
225 request.receipt(),
226 proto::RegisterProjectResponse { project_id },
227 )
228 .await?;
229 Ok(())
230 }
231
232 async fn unregister_project(
233 mut self: Arc<Server>,
234 request: TypedEnvelope<proto::UnregisterProject>,
235 ) -> tide::Result<()> {
236 self.state_mut()
237 .unregister_project(request.payload.project_id, request.sender_id);
238 Ok(())
239 }
240
241 async fn share_project(
242 mut self: Arc<Server>,
243 request: TypedEnvelope<proto::ShareProject>,
244 ) -> tide::Result<()> {
245 self.state_mut()
246 .share_project(request.payload.project_id, request.sender_id);
247 self.peer.respond(request.receipt(), proto::Ack {}).await?;
248 Ok(())
249 }
250
251 async fn unshare_project(
252 mut self: Arc<Server>,
253 request: TypedEnvelope<proto::UnshareProject>,
254 ) -> tide::Result<()> {
255 let project_id = request.payload.project_id;
256 let project = self
257 .state_mut()
258 .unshare_project(project_id, request.sender_id)?;
259
260 broadcast(request.sender_id, project.connection_ids, |conn_id| {
261 self.peer
262 .send(conn_id, proto::UnshareProject { project_id })
263 })
264 .await?;
265 self.update_contacts_for_users(&project.authorized_user_ids)
266 .await?;
267
268 Ok(())
269 }
270
271 async fn join_project(
272 mut self: Arc<Server>,
273 request: TypedEnvelope<proto::JoinProject>,
274 ) -> tide::Result<()> {
275 let project_id = request.payload.project_id;
276
277 let user_id = self.state().user_id_for_connection(request.sender_id)?;
278 let response_data = self
279 .state_mut()
280 .join_project(request.sender_id, user_id, project_id)
281 .and_then(|joined| {
282 let share = joined.project.share()?;
283 let peer_count = share.guests.len();
284 let mut collaborators = Vec::with_capacity(peer_count);
285 collaborators.push(proto::Collaborator {
286 peer_id: joined.project.host_connection_id.0,
287 replica_id: 0,
288 user_id: joined.project.host_user_id.to_proto(),
289 });
290 let worktrees = joined
291 .project
292 .worktrees
293 .iter()
294 .filter_map(|(id, worktree)| {
295 worktree.share.as_ref().map(|share| proto::Worktree {
296 id: *id,
297 root_name: worktree.root_name.clone(),
298 entries: share.entries.values().cloned().collect(),
299 })
300 })
301 .collect();
302 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
303 if *peer_conn_id != request.sender_id {
304 collaborators.push(proto::Collaborator {
305 peer_id: peer_conn_id.0,
306 replica_id: *peer_replica_id as u32,
307 user_id: peer_user_id.to_proto(),
308 });
309 }
310 }
311 let response = proto::JoinProjectResponse {
312 worktrees,
313 replica_id: joined.replica_id as u32,
314 collaborators,
315 };
316 let connection_ids = joined.project.connection_ids();
317 let contact_user_ids = joined.project.authorized_user_ids();
318 Ok((response, connection_ids, contact_user_ids))
319 });
320
321 match response_data {
322 Ok((response, connection_ids, contact_user_ids)) => {
323 broadcast(request.sender_id, connection_ids, |conn_id| {
324 self.peer.send(
325 conn_id,
326 proto::AddProjectCollaborator {
327 project_id: project_id,
328 collaborator: Some(proto::Collaborator {
329 peer_id: request.sender_id.0,
330 replica_id: response.replica_id,
331 user_id: user_id.to_proto(),
332 }),
333 },
334 )
335 })
336 .await?;
337 self.peer.respond(request.receipt(), response).await?;
338 self.update_contacts_for_users(&contact_user_ids).await?;
339 }
340 Err(error) => {
341 self.peer
342 .respond_with_error(
343 request.receipt(),
344 proto::Error {
345 message: error.to_string(),
346 },
347 )
348 .await?;
349 }
350 }
351
352 Ok(())
353 }
354
355 async fn leave_project(
356 mut self: Arc<Server>,
357 request: TypedEnvelope<proto::LeaveProject>,
358 ) -> tide::Result<()> {
359 let sender_id = request.sender_id;
360 let project_id = request.payload.project_id;
361 let worktree = self.state_mut().leave_project(sender_id, project_id);
362 if let Some(worktree) = worktree {
363 broadcast(sender_id, worktree.connection_ids, |conn_id| {
364 self.peer.send(
365 conn_id,
366 proto::RemoveProjectCollaborator {
367 project_id,
368 peer_id: sender_id.0,
369 },
370 )
371 })
372 .await?;
373 self.update_contacts_for_users(&worktree.authorized_user_ids)
374 .await?;
375 }
376 Ok(())
377 }
378
379 async fn register_worktree(
380 mut self: Arc<Server>,
381 request: TypedEnvelope<proto::RegisterWorktree>,
382 ) -> tide::Result<()> {
383 let receipt = request.receipt();
384 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
385
386 let mut contact_user_ids = HashSet::default();
387 contact_user_ids.insert(host_user_id);
388 for github_login in request.payload.authorized_logins {
389 match self.app_state.db.create_user(&github_login, false).await {
390 Ok(contact_user_id) => {
391 contact_user_ids.insert(contact_user_id);
392 }
393 Err(err) => {
394 let message = err.to_string();
395 self.peer
396 .respond_with_error(receipt, proto::Error { message })
397 .await?;
398 return Ok(());
399 }
400 }
401 }
402
403 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
404 let ok = self.state_mut().register_worktree(
405 request.payload.project_id,
406 request.payload.worktree_id,
407 Worktree {
408 authorized_user_ids: contact_user_ids.clone(),
409 root_name: request.payload.root_name,
410 share: None,
411 },
412 );
413
414 if ok {
415 self.peer.respond(receipt, proto::Ack {}).await?;
416 self.update_contacts_for_users(&contact_user_ids).await?;
417 } else {
418 self.peer
419 .respond_with_error(
420 receipt,
421 proto::Error {
422 message: NO_SUCH_PROJECT.to_string(),
423 },
424 )
425 .await?;
426 }
427
428 Ok(())
429 }
430
431 async fn unregister_worktree(
432 mut self: Arc<Server>,
433 request: TypedEnvelope<proto::UnregisterWorktree>,
434 ) -> tide::Result<()> {
435 let project_id = request.payload.project_id;
436 let worktree_id = request.payload.worktree_id;
437 let (worktree, guest_connection_ids) =
438 self.state_mut()
439 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
440
441 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
442 self.peer.send(
443 conn_id,
444 proto::UnregisterWorktree {
445 project_id,
446 worktree_id,
447 },
448 )
449 })
450 .await?;
451 self.update_contacts_for_users(&worktree.authorized_user_ids)
452 .await?;
453 Ok(())
454 }
455
456 async fn share_worktree(
457 mut self: Arc<Server>,
458 mut request: TypedEnvelope<proto::ShareWorktree>,
459 ) -> tide::Result<()> {
460 let worktree = request
461 .payload
462 .worktree
463 .as_mut()
464 .ok_or_else(|| anyhow!("missing worktree"))?;
465 let entries = mem::take(&mut worktree.entries)
466 .into_iter()
467 .map(|entry| (entry.id, entry))
468 .collect();
469
470 let contact_user_ids = self.state_mut().share_worktree(
471 request.payload.project_id,
472 worktree.id,
473 request.sender_id,
474 entries,
475 );
476 if let Some(contact_user_ids) = contact_user_ids {
477 self.peer.respond(request.receipt(), proto::Ack {}).await?;
478 self.update_contacts_for_users(&contact_user_ids).await?;
479 } else {
480 self.peer
481 .respond_with_error(
482 request.receipt(),
483 proto::Error {
484 message: "no such worktree".to_string(),
485 },
486 )
487 .await?;
488 }
489 Ok(())
490 }
491
492 async fn update_worktree(
493 mut self: Arc<Server>,
494 request: TypedEnvelope<proto::UpdateWorktree>,
495 ) -> tide::Result<()> {
496 let connection_ids = self
497 .state_mut()
498 .update_worktree(
499 request.sender_id,
500 request.payload.project_id,
501 request.payload.worktree_id,
502 &request.payload.removed_entries,
503 &request.payload.updated_entries,
504 )
505 .ok_or_else(|| anyhow!("no such worktree"))?;
506
507 broadcast(request.sender_id, connection_ids, |connection_id| {
508 self.peer
509 .forward_send(request.sender_id, connection_id, request.payload.clone())
510 })
511 .await?;
512
513 Ok(())
514 }
515
516 async fn open_buffer(
517 self: Arc<Server>,
518 request: TypedEnvelope<proto::OpenBuffer>,
519 ) -> tide::Result<()> {
520 let receipt = request.receipt();
521 let host_connection_id = self
522 .state()
523 .read_project(request.payload.project_id, request.sender_id)
524 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
525 .host_connection_id;
526 let response = self
527 .peer
528 .forward_request(request.sender_id, host_connection_id, request.payload)
529 .await?;
530 self.peer.respond(receipt, response).await?;
531 Ok(())
532 }
533
534 async fn close_buffer(
535 self: Arc<Server>,
536 request: TypedEnvelope<proto::CloseBuffer>,
537 ) -> tide::Result<()> {
538 let host_connection_id = self
539 .state()
540 .read_project(request.payload.project_id, request.sender_id)
541 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
542 .host_connection_id;
543 self.peer
544 .forward_send(request.sender_id, host_connection_id, request.payload)
545 .await?;
546 Ok(())
547 }
548
549 async fn save_buffer(
550 self: Arc<Server>,
551 request: TypedEnvelope<proto::SaveBuffer>,
552 ) -> tide::Result<()> {
553 let host;
554 let guests;
555 {
556 let state = self.state();
557 let project = state
558 .read_project(request.payload.project_id, request.sender_id)
559 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
560 host = project.host_connection_id;
561 guests = project.guest_connection_ids()
562 }
563
564 let sender = request.sender_id;
565 let receipt = request.receipt();
566 let response = self
567 .peer
568 .forward_request(sender, host, request.payload.clone())
569 .await?;
570
571 broadcast(host, guests, |conn_id| {
572 let response = response.clone();
573 let peer = &self.peer;
574 async move {
575 if conn_id == sender {
576 peer.respond(receipt, response).await
577 } else {
578 peer.forward_send(host, conn_id, response).await
579 }
580 }
581 })
582 .await?;
583
584 Ok(())
585 }
586
587 async fn update_buffer(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::UpdateBuffer>,
590 ) -> tide::Result<()> {
591 let receiver_ids = self
592 .state()
593 .project_connection_ids(request.payload.project_id, request.sender_id)
594 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
595 broadcast(request.sender_id, receiver_ids, |connection_id| {
596 self.peer
597 .forward_send(request.sender_id, connection_id, request.payload.clone())
598 })
599 .await?;
600 self.peer.respond(request.receipt(), proto::Ack {}).await?;
601 Ok(())
602 }
603
604 async fn buffer_saved(
605 self: Arc<Server>,
606 request: TypedEnvelope<proto::BufferSaved>,
607 ) -> tide::Result<()> {
608 let receiver_ids = self
609 .state()
610 .project_connection_ids(request.payload.project_id, request.sender_id)
611 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
612 broadcast(request.sender_id, receiver_ids, |connection_id| {
613 self.peer
614 .forward_send(request.sender_id, connection_id, request.payload.clone())
615 })
616 .await?;
617 Ok(())
618 }
619
620 async fn get_channels(
621 self: Arc<Server>,
622 request: TypedEnvelope<proto::GetChannels>,
623 ) -> tide::Result<()> {
624 let user_id = self.state().user_id_for_connection(request.sender_id)?;
625 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
626 self.peer
627 .respond(
628 request.receipt(),
629 proto::GetChannelsResponse {
630 channels: channels
631 .into_iter()
632 .map(|chan| proto::Channel {
633 id: chan.id.to_proto(),
634 name: chan.name,
635 })
636 .collect(),
637 },
638 )
639 .await?;
640 Ok(())
641 }
642
643 async fn get_users(
644 self: Arc<Server>,
645 request: TypedEnvelope<proto::GetUsers>,
646 ) -> tide::Result<()> {
647 let receipt = request.receipt();
648 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
649 let users = self
650 .app_state
651 .db
652 .get_users_by_ids(user_ids)
653 .await?
654 .into_iter()
655 .map(|user| proto::User {
656 id: user.id.to_proto(),
657 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
658 github_login: user.github_login,
659 })
660 .collect();
661 self.peer
662 .respond(receipt, proto::GetUsersResponse { users })
663 .await?;
664 Ok(())
665 }
666
667 async fn update_contacts_for_users<'a>(
668 self: &Arc<Server>,
669 user_ids: impl IntoIterator<Item = &'a UserId>,
670 ) -> tide::Result<()> {
671 let mut send_futures = Vec::new();
672
673 {
674 let state = self.state();
675 for user_id in user_ids {
676 let contacts = state.contacts_for_user(*user_id);
677 for connection_id in state.connection_ids_for_user(*user_id) {
678 send_futures.push(self.peer.send(
679 connection_id,
680 proto::UpdateContacts {
681 contacts: contacts.clone(),
682 },
683 ));
684 }
685 }
686 }
687 futures::future::try_join_all(send_futures).await?;
688
689 Ok(())
690 }
691
692 async fn join_channel(
693 mut self: Arc<Self>,
694 request: TypedEnvelope<proto::JoinChannel>,
695 ) -> tide::Result<()> {
696 let user_id = self.state().user_id_for_connection(request.sender_id)?;
697 let channel_id = ChannelId::from_proto(request.payload.channel_id);
698 if !self
699 .app_state
700 .db
701 .can_user_access_channel(user_id, channel_id)
702 .await?
703 {
704 Err(anyhow!("access denied"))?;
705 }
706
707 self.state_mut().join_channel(request.sender_id, channel_id);
708 let messages = self
709 .app_state
710 .db
711 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
712 .await?
713 .into_iter()
714 .map(|msg| proto::ChannelMessage {
715 id: msg.id.to_proto(),
716 body: msg.body,
717 timestamp: msg.sent_at.unix_timestamp() as u64,
718 sender_id: msg.sender_id.to_proto(),
719 nonce: Some(msg.nonce.as_u128().into()),
720 })
721 .collect::<Vec<_>>();
722 self.peer
723 .respond(
724 request.receipt(),
725 proto::JoinChannelResponse {
726 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
727 messages,
728 },
729 )
730 .await?;
731 Ok(())
732 }
733
734 async fn leave_channel(
735 mut self: Arc<Self>,
736 request: TypedEnvelope<proto::LeaveChannel>,
737 ) -> tide::Result<()> {
738 let user_id = self.state().user_id_for_connection(request.sender_id)?;
739 let channel_id = ChannelId::from_proto(request.payload.channel_id);
740 if !self
741 .app_state
742 .db
743 .can_user_access_channel(user_id, channel_id)
744 .await?
745 {
746 Err(anyhow!("access denied"))?;
747 }
748
749 self.state_mut()
750 .leave_channel(request.sender_id, channel_id);
751
752 Ok(())
753 }
754
755 async fn send_channel_message(
756 self: Arc<Self>,
757 request: TypedEnvelope<proto::SendChannelMessage>,
758 ) -> tide::Result<()> {
759 let receipt = request.receipt();
760 let channel_id = ChannelId::from_proto(request.payload.channel_id);
761 let user_id;
762 let connection_ids;
763 {
764 let state = self.state();
765 user_id = state.user_id_for_connection(request.sender_id)?;
766 if let Some(ids) = state.channel_connection_ids(channel_id) {
767 connection_ids = ids;
768 } else {
769 return Ok(());
770 }
771 }
772
773 // Validate the message body.
774 let body = request.payload.body.trim().to_string();
775 if body.len() > MAX_MESSAGE_LEN {
776 self.peer
777 .respond_with_error(
778 receipt,
779 proto::Error {
780 message: "message is too long".to_string(),
781 },
782 )
783 .await?;
784 return Ok(());
785 }
786 if body.is_empty() {
787 self.peer
788 .respond_with_error(
789 receipt,
790 proto::Error {
791 message: "message can't be blank".to_string(),
792 },
793 )
794 .await?;
795 return Ok(());
796 }
797
798 let timestamp = OffsetDateTime::now_utc();
799 let nonce = if let Some(nonce) = request.payload.nonce {
800 nonce
801 } else {
802 self.peer
803 .respond_with_error(
804 receipt,
805 proto::Error {
806 message: "nonce can't be blank".to_string(),
807 },
808 )
809 .await?;
810 return Ok(());
811 };
812
813 let message_id = self
814 .app_state
815 .db
816 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
817 .await?
818 .to_proto();
819 let message = proto::ChannelMessage {
820 sender_id: user_id.to_proto(),
821 id: message_id,
822 body,
823 timestamp: timestamp.unix_timestamp() as u64,
824 nonce: Some(nonce),
825 };
826 broadcast(request.sender_id, connection_ids, |conn_id| {
827 self.peer.send(
828 conn_id,
829 proto::ChannelMessageSent {
830 channel_id: channel_id.to_proto(),
831 message: Some(message.clone()),
832 },
833 )
834 })
835 .await?;
836 self.peer
837 .respond(
838 receipt,
839 proto::SendChannelMessageResponse {
840 message: Some(message),
841 },
842 )
843 .await?;
844 Ok(())
845 }
846
847 async fn get_channel_messages(
848 self: Arc<Self>,
849 request: TypedEnvelope<proto::GetChannelMessages>,
850 ) -> tide::Result<()> {
851 let user_id = self.state().user_id_for_connection(request.sender_id)?;
852 let channel_id = ChannelId::from_proto(request.payload.channel_id);
853 if !self
854 .app_state
855 .db
856 .can_user_access_channel(user_id, channel_id)
857 .await?
858 {
859 Err(anyhow!("access denied"))?;
860 }
861
862 let messages = self
863 .app_state
864 .db
865 .get_channel_messages(
866 channel_id,
867 MESSAGE_COUNT_PER_PAGE,
868 Some(MessageId::from_proto(request.payload.before_message_id)),
869 )
870 .await?
871 .into_iter()
872 .map(|msg| proto::ChannelMessage {
873 id: msg.id.to_proto(),
874 body: msg.body,
875 timestamp: msg.sent_at.unix_timestamp() as u64,
876 sender_id: msg.sender_id.to_proto(),
877 nonce: Some(msg.nonce.as_u128().into()),
878 })
879 .collect::<Vec<_>>();
880 self.peer
881 .respond(
882 request.receipt(),
883 proto::GetChannelMessagesResponse {
884 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
885 messages,
886 },
887 )
888 .await?;
889 Ok(())
890 }
891
892 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
893 self.store.read()
894 }
895
896 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
897 self.store.write()
898 }
899}
900
901pub async fn broadcast<F, T>(
902 sender_id: ConnectionId,
903 receiver_ids: Vec<ConnectionId>,
904 mut f: F,
905) -> anyhow::Result<()>
906where
907 F: FnMut(ConnectionId) -> T,
908 T: Future<Output = anyhow::Result<()>>,
909{
910 let futures = receiver_ids
911 .into_iter()
912 .filter(|id| *id != sender_id)
913 .map(|id| f(id));
914 futures::future::try_join_all(futures).await?;
915 Ok(())
916}
917
/// Mounts the `/rpc` WebSocket endpoint on `app`.
///
/// A single `Server` is created up front and shared by all requests. Each GET
/// request to `/rpc` must ask for a WebSocket upgrade and carry an
/// `X-Zed-Protocol-Version` header equal to `rpc::PROTOCOL_VERSION`; otherwise
/// it is answered with `UpgradeRequired`. After the auth header has been
/// processed, the handler answers with `SwitchingProtocols` and spawns a task
/// that runs `Server::handle_connection` on the upgraded stream.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // GUID appended to the client's key when computing the
            // `Sec-Websocket-Accept` value of the WebSocket handshake.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which
            // never equals the expected version below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64(SHA-1(client key followed by the GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // The connection is served on its own task so that the handshake
            // response can be returned from this handler.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
972
973fn header_contains_ignore_case<T>(
974 request: &tide::Request<T>,
975 header_name: HeaderName,
976 value: &str,
977) -> bool {
978 request
979 .header(header_name)
980 .map(|h| {
981 h.as_str()
982 .split(',')
983 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
984 })
985 .unwrap_or(false)
986}
987
988#[cfg(test)]
989mod tests {
990 use super::*;
991 use crate::{
992 auth,
993 db::{tests::TestDb, UserId},
994 github, AppState, Config,
995 };
996 use ::rpc::Peer;
997 use async_std::task;
998 use gpui::{ModelHandle, TestAppContext};
999 use parking_lot::Mutex;
1000 use postage::{mpsc, watch};
1001 use rpc::PeerId;
1002 use serde_json::json;
1003 use sqlx::types::time::OffsetDateTime;
1004 use std::{
1005 ops::Deref,
1006 path::Path,
1007 sync::{
1008 atomic::{AtomicBool, Ordering::SeqCst},
1009 Arc,
1010 },
1011 time::Duration,
1012 };
1013 use zed::{
1014 client::{
1015 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1016 EstablishConnectionError, UserStore,
1017 },
1018 editor::{Editor, EditorSettings, Input, MultiBuffer},
1019 fs::{FakeFs, Fs as _},
1020 language::{
1021 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1022 LanguageRegistry, LanguageServerConfig, Point,
1023 },
1024 lsp,
1025 project::Project,
1026 };
1027
    /// End-to-end test of sharing a project between two clients.
    ///
    /// Client A creates and shares a local project; client B joins it. The
    /// test then checks that each side sees the other as a collaborator, that
    /// a buffer opened by B has the host's contents, that an edit made by B
    /// reaches A, that closing A's buffer is observed on A's worktree, and
    /// that dropping B's project empties A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start().await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());

        // Client B must see client A as a collaborator, and client A must
        // eventually see client B under the replica id B was assigned.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = worktree_b
            .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
        let buffer_a = worktree_a
            .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });
        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        worktree_a
            .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1161
1162 #[gpui::test]
1163 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1164 cx_b.update(zed::contacts_panel::init);
1165 let lang_registry = Arc::new(LanguageRegistry::new());
1166 let fs = Arc::new(FakeFs::new());
1167 cx_a.foreground().forbid_parking();
1168
1169 // Connect to a server as 2 clients.
1170 let mut server = TestServer::start().await;
1171 let client_a = server.create_client(&mut cx_a, "user_a").await;
1172 let client_b = server.create_client(&mut cx_b, "user_b").await;
1173
1174 // Share a project as client A
1175 fs.insert_tree(
1176 "/a",
1177 json!({
1178 ".zed.toml": r#"collaborators = ["user_b"]"#,
1179 "a.txt": "a-contents",
1180 "b.txt": "b-contents",
1181 }),
1182 )
1183 .await;
1184 let project_a = cx_a.update(|cx| {
1185 Project::local(
1186 client_a.clone(),
1187 client_a.user_store.clone(),
1188 lang_registry.clone(),
1189 fs.clone(),
1190 cx,
1191 )
1192 });
1193 let worktree_a = project_a
1194 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1195 .await
1196 .unwrap();
1197 worktree_a
1198 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1199 .await;
1200 let project_id = project_a
1201 .update(&mut cx_a, |project, _| project.next_remote_id())
1202 .await;
1203 project_a
1204 .update(&mut cx_a, |project, cx| project.share(cx))
1205 .await
1206 .unwrap();
1207
1208 // Join that project as client B
1209 let project_b = Project::remote(
1210 project_id,
1211 client_b.clone(),
1212 client_b.user_store.clone(),
1213 lang_registry.clone(),
1214 fs.clone(),
1215 &mut cx_b.to_async(),
1216 )
1217 .await
1218 .unwrap();
1219
1220 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1221 worktree_b
1222 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1223 .await
1224 .unwrap();
1225
1226 project_a
1227 .update(&mut cx_a, |project, cx| project.unshare(cx))
1228 .await
1229 .unwrap();
1230 project_b
1231 .condition(&mut cx_b, |project, _| project.is_read_only())
1232 .await;
1233 }
1234
1235 #[gpui::test]
1236 async fn test_propagate_saves_and_fs_changes(
1237 mut cx_a: TestAppContext,
1238 mut cx_b: TestAppContext,
1239 mut cx_c: TestAppContext,
1240 ) {
1241 let lang_registry = Arc::new(LanguageRegistry::new());
1242 let fs = Arc::new(FakeFs::new());
1243 cx_a.foreground().forbid_parking();
1244
1245 // Connect to a server as 3 clients.
1246 let mut server = TestServer::start().await;
1247 let client_a = server.create_client(&mut cx_a, "user_a").await;
1248 let client_b = server.create_client(&mut cx_b, "user_b").await;
1249 let client_c = server.create_client(&mut cx_c, "user_c").await;
1250
1251 // Share a worktree as client A.
1252 fs.insert_tree(
1253 "/a",
1254 json!({
1255 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1256 "file1": "",
1257 "file2": ""
1258 }),
1259 )
1260 .await;
1261 let project_a = cx_a.update(|cx| {
1262 Project::local(
1263 client_a.clone(),
1264 client_a.user_store.clone(),
1265 lang_registry.clone(),
1266 fs.clone(),
1267 cx,
1268 )
1269 });
1270 let worktree_a = project_a
1271 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1272 .await
1273 .unwrap();
1274 worktree_a
1275 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1276 .await;
1277 let project_id = project_a
1278 .update(&mut cx_a, |project, _| project.next_remote_id())
1279 .await;
1280 project_a
1281 .update(&mut cx_a, |project, cx| project.share(cx))
1282 .await
1283 .unwrap();
1284
1285 // Join that worktree as clients B and C.
1286 let project_b = Project::remote(
1287 project_id,
1288 client_b.clone(),
1289 client_b.user_store.clone(),
1290 lang_registry.clone(),
1291 fs.clone(),
1292 &mut cx_b.to_async(),
1293 )
1294 .await
1295 .unwrap();
1296 let project_c = Project::remote(
1297 project_id,
1298 client_c.clone(),
1299 client_c.user_store.clone(),
1300 lang_registry.clone(),
1301 fs.clone(),
1302 &mut cx_c.to_async(),
1303 )
1304 .await
1305 .unwrap();
1306
1307 // Open and edit a buffer as both guests B and C.
1308 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1309 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1310 let buffer_b = worktree_b
1311 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1312 .await
1313 .unwrap();
1314 let buffer_c = worktree_c
1315 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1316 .await
1317 .unwrap();
1318 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1319 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1320
1321 // Open and edit that buffer as the host.
1322 let buffer_a = worktree_a
1323 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1324 .await
1325 .unwrap();
1326
1327 buffer_a
1328 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1329 .await;
1330 buffer_a.update(&mut cx_a, |buf, cx| {
1331 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1332 });
1333
1334 // Wait for edits to propagate
1335 buffer_a
1336 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1337 .await;
1338 buffer_b
1339 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1340 .await;
1341 buffer_c
1342 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1343 .await;
1344
1345 // Edit the buffer as the host and concurrently save as guest B.
1346 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1347 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1348 save_b.await.unwrap();
1349 assert_eq!(
1350 fs.load("/a/file1".as_ref()).await.unwrap(),
1351 "hi-a, i-am-c, i-am-b, i-am-a"
1352 );
1353 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1354 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1355 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1356
1357 // Make changes on host's file system, see those changes on the guests.
1358 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1359 .await
1360 .unwrap();
1361 fs.insert_file(Path::new("/a/file4"), "4".into())
1362 .await
1363 .unwrap();
1364
1365 worktree_b
1366 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1367 .await;
1368 worktree_c
1369 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1370 .await;
1371 worktree_b.read_with(&cx_b, |tree, _| {
1372 assert_eq!(
1373 tree.paths()
1374 .map(|p| p.to_string_lossy())
1375 .collect::<Vec<_>>(),
1376 &[".zed.toml", "file1", "file3", "file4"]
1377 )
1378 });
1379 worktree_c.read_with(&cx_c, |tree, _| {
1380 assert_eq!(
1381 tree.paths()
1382 .map(|p| p.to_string_lossy())
1383 .collect::<Vec<_>>(),
1384 &[".zed.toml", "file1", "file3", "file4"]
1385 )
1386 });
1387 }
1388
1389 #[gpui::test]
1390 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1391 cx_a.foreground().forbid_parking();
1392 let lang_registry = Arc::new(LanguageRegistry::new());
1393 let fs = Arc::new(FakeFs::new());
1394
1395 // Connect to a server as 2 clients.
1396 let mut server = TestServer::start().await;
1397 let client_a = server.create_client(&mut cx_a, "user_a").await;
1398 let client_b = server.create_client(&mut cx_b, "user_b").await;
1399
1400 // Share a project as client A
1401 fs.insert_tree(
1402 "/dir",
1403 json!({
1404 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1405 "a.txt": "a-contents",
1406 }),
1407 )
1408 .await;
1409
1410 let project_a = cx_a.update(|cx| {
1411 Project::local(
1412 client_a.clone(),
1413 client_a.user_store.clone(),
1414 lang_registry.clone(),
1415 fs.clone(),
1416 cx,
1417 )
1418 });
1419 let worktree_a = project_a
1420 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1421 .await
1422 .unwrap();
1423 worktree_a
1424 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1425 .await;
1426 let project_id = project_a
1427 .update(&mut cx_a, |project, _| project.next_remote_id())
1428 .await;
1429 project_a
1430 .update(&mut cx_a, |project, cx| project.share(cx))
1431 .await
1432 .unwrap();
1433
1434 // Join that project as client B
1435 let project_b = Project::remote(
1436 project_id,
1437 client_b.clone(),
1438 client_b.user_store.clone(),
1439 lang_registry.clone(),
1440 fs.clone(),
1441 &mut cx_b.to_async(),
1442 )
1443 .await
1444 .unwrap();
1445 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1446
1447 // Open a buffer as client B
1448 let buffer_b = worktree_b
1449 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1450 .await
1451 .unwrap();
1452 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1453
1454 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1455 buffer_b.read_with(&cx_b, |buf, _| {
1456 assert!(buf.is_dirty());
1457 assert!(!buf.has_conflict());
1458 });
1459
1460 buffer_b
1461 .update(&mut cx_b, |buf, cx| buf.save(cx))
1462 .unwrap()
1463 .await
1464 .unwrap();
1465 worktree_b
1466 .condition(&cx_b, |_, cx| {
1467 buffer_b.read(cx).file().unwrap().mtime() != mtime
1468 })
1469 .await;
1470 buffer_b.read_with(&cx_b, |buf, _| {
1471 assert!(!buf.is_dirty());
1472 assert!(!buf.has_conflict());
1473 });
1474
1475 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1476 buffer_b.read_with(&cx_b, |buf, _| {
1477 assert!(buf.is_dirty());
1478 assert!(!buf.has_conflict());
1479 });
1480 }
1481
1482 #[gpui::test]
1483 async fn test_editing_while_guest_opens_buffer(
1484 mut cx_a: TestAppContext,
1485 mut cx_b: TestAppContext,
1486 ) {
1487 cx_a.foreground().forbid_parking();
1488 let lang_registry = Arc::new(LanguageRegistry::new());
1489 let fs = Arc::new(FakeFs::new());
1490
1491 // Connect to a server as 2 clients.
1492 let mut server = TestServer::start().await;
1493 let client_a = server.create_client(&mut cx_a, "user_a").await;
1494 let client_b = server.create_client(&mut cx_b, "user_b").await;
1495
1496 // Share a project as client A
1497 fs.insert_tree(
1498 "/dir",
1499 json!({
1500 ".zed.toml": r#"collaborators = ["user_b"]"#,
1501 "a.txt": "a-contents",
1502 }),
1503 )
1504 .await;
1505 let project_a = cx_a.update(|cx| {
1506 Project::local(
1507 client_a.clone(),
1508 client_a.user_store.clone(),
1509 lang_registry.clone(),
1510 fs.clone(),
1511 cx,
1512 )
1513 });
1514 let worktree_a = project_a
1515 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1516 .await
1517 .unwrap();
1518 worktree_a
1519 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1520 .await;
1521 let project_id = project_a
1522 .update(&mut cx_a, |project, _| project.next_remote_id())
1523 .await;
1524 project_a
1525 .update(&mut cx_a, |project, cx| project.share(cx))
1526 .await
1527 .unwrap();
1528
1529 // Join that project as client B
1530 let project_b = Project::remote(
1531 project_id,
1532 client_b.clone(),
1533 client_b.user_store.clone(),
1534 lang_registry.clone(),
1535 fs.clone(),
1536 &mut cx_b.to_async(),
1537 )
1538 .await
1539 .unwrap();
1540 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1541
1542 // Open a buffer as client A
1543 let buffer_a = worktree_a
1544 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1545 .await
1546 .unwrap();
1547
1548 // Start opening the same buffer as client B
1549 let buffer_b = cx_b
1550 .background()
1551 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1552 task::yield_now().await;
1553
1554 // Edit the buffer as client A while client B is still opening it.
1555 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1556
1557 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1558 let buffer_b = buffer_b.await.unwrap();
1559 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1560 }
1561
1562 #[gpui::test]
1563 async fn test_leaving_worktree_while_opening_buffer(
1564 mut cx_a: TestAppContext,
1565 mut cx_b: TestAppContext,
1566 ) {
1567 cx_a.foreground().forbid_parking();
1568 let lang_registry = Arc::new(LanguageRegistry::new());
1569 let fs = Arc::new(FakeFs::new());
1570
1571 // Connect to a server as 2 clients.
1572 let mut server = TestServer::start().await;
1573 let client_a = server.create_client(&mut cx_a, "user_a").await;
1574 let client_b = server.create_client(&mut cx_b, "user_b").await;
1575
1576 // Share a project as client A
1577 fs.insert_tree(
1578 "/dir",
1579 json!({
1580 ".zed.toml": r#"collaborators = ["user_b"]"#,
1581 "a.txt": "a-contents",
1582 }),
1583 )
1584 .await;
1585 let project_a = cx_a.update(|cx| {
1586 Project::local(
1587 client_a.clone(),
1588 client_a.user_store.clone(),
1589 lang_registry.clone(),
1590 fs.clone(),
1591 cx,
1592 )
1593 });
1594 let worktree_a = project_a
1595 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1596 .await
1597 .unwrap();
1598 worktree_a
1599 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1600 .await;
1601 let project_id = project_a
1602 .update(&mut cx_a, |project, _| project.next_remote_id())
1603 .await;
1604 project_a
1605 .update(&mut cx_a, |project, cx| project.share(cx))
1606 .await
1607 .unwrap();
1608
1609 // Join that project as client B
1610 let project_b = Project::remote(
1611 project_id,
1612 client_b.clone(),
1613 client_b.user_store.clone(),
1614 lang_registry.clone(),
1615 fs.clone(),
1616 &mut cx_b.to_async(),
1617 )
1618 .await
1619 .unwrap();
1620 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1621
1622 // See that a guest has joined as client A.
1623 project_a
1624 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1625 .await;
1626
1627 // Begin opening a buffer as client B, but leave the project before the open completes.
1628 let buffer_b = cx_b
1629 .background()
1630 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1631 cx_b.update(|_| drop(worktree_b));
1632 drop(buffer_b);
1633
1634 // See that the guest has left.
1635 project_a
1636 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1637 .await;
1638 }
1639
1640 #[gpui::test]
1641 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1642 cx_a.foreground().forbid_parking();
1643 let lang_registry = Arc::new(LanguageRegistry::new());
1644 let fs = Arc::new(FakeFs::new());
1645
1646 // Connect to a server as 2 clients.
1647 let mut server = TestServer::start().await;
1648 let client_a = server.create_client(&mut cx_a, "user_a").await;
1649 let client_b = server.create_client(&mut cx_b, "user_b").await;
1650
1651 // Share a project as client A
1652 fs.insert_tree(
1653 "/a",
1654 json!({
1655 ".zed.toml": r#"collaborators = ["user_b"]"#,
1656 "a.txt": "a-contents",
1657 "b.txt": "b-contents",
1658 }),
1659 )
1660 .await;
1661 let project_a = cx_a.update(|cx| {
1662 Project::local(
1663 client_a.clone(),
1664 client_a.user_store.clone(),
1665 lang_registry.clone(),
1666 fs.clone(),
1667 cx,
1668 )
1669 });
1670 let worktree_a = project_a
1671 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1672 .await
1673 .unwrap();
1674 worktree_a
1675 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1676 .await;
1677 let project_id = project_a
1678 .update(&mut cx_a, |project, _| project.next_remote_id())
1679 .await;
1680 project_a
1681 .update(&mut cx_a, |project, cx| project.share(cx))
1682 .await
1683 .unwrap();
1684
1685 // Join that project as client B
1686 let _project_b = Project::remote(
1687 project_id,
1688 client_b.clone(),
1689 client_b.user_store.clone(),
1690 lang_registry.clone(),
1691 fs.clone(),
1692 &mut cx_b.to_async(),
1693 )
1694 .await
1695 .unwrap();
1696
1697 // See that a guest has joined as client A.
1698 project_a
1699 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1700 .await;
1701
1702 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1703 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1704 project_a
1705 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1706 .await;
1707 }
1708
1709 #[gpui::test]
1710 async fn test_collaborating_with_diagnostics(
1711 mut cx_a: TestAppContext,
1712 mut cx_b: TestAppContext,
1713 ) {
1714 cx_a.foreground().forbid_parking();
1715 let mut lang_registry = Arc::new(LanguageRegistry::new());
1716 let fs = Arc::new(FakeFs::new());
1717
1718 // Set up a fake language server.
1719 let (language_server_config, mut fake_language_server) =
1720 LanguageServerConfig::fake(cx_a.background()).await;
1721 Arc::get_mut(&mut lang_registry)
1722 .unwrap()
1723 .add(Arc::new(Language::new(
1724 LanguageConfig {
1725 name: "Rust".to_string(),
1726 path_suffixes: vec!["rs".to_string()],
1727 language_server: Some(language_server_config),
1728 ..Default::default()
1729 },
1730 Some(tree_sitter_rust::language()),
1731 )));
1732
1733 // Connect to a server as 2 clients.
1734 let mut server = TestServer::start().await;
1735 let client_a = server.create_client(&mut cx_a, "user_a").await;
1736 let client_b = server.create_client(&mut cx_b, "user_b").await;
1737
1738 // Share a project as client A
1739 fs.insert_tree(
1740 "/a",
1741 json!({
1742 ".zed.toml": r#"collaborators = ["user_b"]"#,
1743 "a.rs": "let one = two",
1744 "other.rs": "",
1745 }),
1746 )
1747 .await;
1748 let project_a = cx_a.update(|cx| {
1749 Project::local(
1750 client_a.clone(),
1751 client_a.user_store.clone(),
1752 lang_registry.clone(),
1753 fs.clone(),
1754 cx,
1755 )
1756 });
1757 let worktree_a = project_a
1758 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1759 .await
1760 .unwrap();
1761 worktree_a
1762 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763 .await;
1764 let project_id = project_a
1765 .update(&mut cx_a, |project, _| project.next_remote_id())
1766 .await;
1767 project_a
1768 .update(&mut cx_a, |project, cx| project.share(cx))
1769 .await
1770 .unwrap();
1771
1772 // Cause the language server to start.
1773 let _ = cx_a
1774 .background()
1775 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1776 worktree.open_buffer("other.rs", cx)
1777 }))
1778 .await
1779 .unwrap();
1780
1781 // Simulate a language server reporting errors for a file.
1782 fake_language_server
1783 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1784 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1785 version: None,
1786 diagnostics: vec![
1787 lsp::Diagnostic {
1788 severity: Some(lsp::DiagnosticSeverity::ERROR),
1789 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1790 message: "message 1".to_string(),
1791 ..Default::default()
1792 },
1793 lsp::Diagnostic {
1794 severity: Some(lsp::DiagnosticSeverity::WARNING),
1795 range: lsp::Range::new(
1796 lsp::Position::new(0, 10),
1797 lsp::Position::new(0, 13),
1798 ),
1799 message: "message 2".to_string(),
1800 ..Default::default()
1801 },
1802 ],
1803 })
1804 .await;
1805
1806 // Join the worktree as client B.
1807 let project_b = Project::remote(
1808 project_id,
1809 client_b.clone(),
1810 client_b.user_store.clone(),
1811 lang_registry.clone(),
1812 fs.clone(),
1813 &mut cx_b.to_async(),
1814 )
1815 .await
1816 .unwrap();
1817 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1818
1819 // Open the file with the errors.
1820 let buffer_b = cx_b
1821 .background()
1822 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1823 .await
1824 .unwrap();
1825
1826 buffer_b.read_with(&cx_b, |buffer, _| {
1827 assert_eq!(
1828 buffer
1829 .snapshot()
1830 .diagnostics_in_range::<_, Point>(0..buffer.len())
1831 .collect::<Vec<_>>(),
1832 &[
1833 DiagnosticEntry {
1834 range: Point::new(0, 4)..Point::new(0, 7),
1835 diagnostic: Diagnostic {
1836 group_id: 0,
1837 message: "message 1".to_string(),
1838 severity: lsp::DiagnosticSeverity::ERROR,
1839 is_primary: true,
1840 ..Default::default()
1841 }
1842 },
1843 DiagnosticEntry {
1844 range: Point::new(0, 10)..Point::new(0, 13),
1845 diagnostic: Diagnostic {
1846 group_id: 1,
1847 severity: lsp::DiagnosticSeverity::WARNING,
1848 message: "message 2".to_string(),
1849 is_primary: true,
1850 ..Default::default()
1851 }
1852 }
1853 ]
1854 );
1855 });
1856 }
1857
1858 #[gpui::test]
1859 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1860 cx_a.foreground().forbid_parking();
1861
1862 // Connect to a server as 2 clients.
1863 let mut server = TestServer::start().await;
1864 let client_a = server.create_client(&mut cx_a, "user_a").await;
1865 let client_b = server.create_client(&mut cx_b, "user_b").await;
1866
1867 // Create an org that includes these 2 users.
1868 let db = &server.app_state.db;
1869 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1870 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1871 .await
1872 .unwrap();
1873 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1874 .await
1875 .unwrap();
1876
1877 // Create a channel that includes all the users.
1878 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1879 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1880 .await
1881 .unwrap();
1882 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1883 .await
1884 .unwrap();
1885 db.create_channel_message(
1886 channel_id,
1887 client_b.current_user_id(&cx_b),
1888 "hello A, it's B.",
1889 OffsetDateTime::now_utc(),
1890 1,
1891 )
1892 .await
1893 .unwrap();
1894
1895 let channels_a = cx_a
1896 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1897 channels_a
1898 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1899 .await;
1900 channels_a.read_with(&cx_a, |list, _| {
1901 assert_eq!(
1902 list.available_channels().unwrap(),
1903 &[ChannelDetails {
1904 id: channel_id.to_proto(),
1905 name: "test-channel".to_string()
1906 }]
1907 )
1908 });
1909 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1910 this.get_channel(channel_id.to_proto(), cx).unwrap()
1911 });
1912 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1913 channel_a
1914 .condition(&cx_a, |channel, _| {
1915 channel_messages(channel)
1916 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1917 })
1918 .await;
1919
1920 let channels_b = cx_b
1921 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1922 channels_b
1923 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1924 .await;
1925 channels_b.read_with(&cx_b, |list, _| {
1926 assert_eq!(
1927 list.available_channels().unwrap(),
1928 &[ChannelDetails {
1929 id: channel_id.to_proto(),
1930 name: "test-channel".to_string()
1931 }]
1932 )
1933 });
1934
1935 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1936 this.get_channel(channel_id.to_proto(), cx).unwrap()
1937 });
1938 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1939 channel_b
1940 .condition(&cx_b, |channel, _| {
1941 channel_messages(channel)
1942 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1943 })
1944 .await;
1945
1946 channel_a
1947 .update(&mut cx_a, |channel, cx| {
1948 channel
1949 .send_message("oh, hi B.".to_string(), cx)
1950 .unwrap()
1951 .detach();
1952 let task = channel.send_message("sup".to_string(), cx).unwrap();
1953 assert_eq!(
1954 channel_messages(channel),
1955 &[
1956 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1957 ("user_a".to_string(), "oh, hi B.".to_string(), true),
1958 ("user_a".to_string(), "sup".to_string(), true)
1959 ]
1960 );
1961 task
1962 })
1963 .await
1964 .unwrap();
1965
1966 channel_b
1967 .condition(&cx_b, |channel, _| {
1968 channel_messages(channel)
1969 == [
1970 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
1971 ("user_a".to_string(), "oh, hi B.".to_string(), false),
1972 ("user_a".to_string(), "sup".to_string(), false),
1973 ]
1974 })
1975 .await;
1976
1977 assert_eq!(
1978 server
1979 .state()
1980 .await
1981 .channel(channel_id)
1982 .unwrap()
1983 .connection_ids
1984 .len(),
1985 2
1986 );
1987 cx_b.update(|_| drop(channel_b));
1988 server
1989 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
1990 .await;
1991
1992 cx_a.update(|_| drop(channel_a));
1993 server
1994 .condition(|state| state.channel(channel_id).is_none())
1995 .await;
1996 }
1997
1998 #[gpui::test]
1999 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2000 cx_a.foreground().forbid_parking();
2001
2002 let mut server = TestServer::start().await;
2003 let client_a = server.create_client(&mut cx_a, "user_a").await;
2004
2005 let db = &server.app_state.db;
2006 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2007 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2008 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2009 .await
2010 .unwrap();
2011 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2012 .await
2013 .unwrap();
2014
2015 let channels_a = cx_a
2016 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2017 channels_a
2018 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2019 .await;
2020 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2021 this.get_channel(channel_id.to_proto(), cx).unwrap()
2022 });
2023
2024 // Messages aren't allowed to be too long.
2025 channel_a
2026 .update(&mut cx_a, |channel, cx| {
2027 let long_body = "this is long.\n".repeat(1024);
2028 channel.send_message(long_body, cx).unwrap()
2029 })
2030 .await
2031 .unwrap_err();
2032
2033 // Messages aren't allowed to be blank.
2034 channel_a.update(&mut cx_a, |channel, cx| {
2035 channel.send_message(String::new(), cx).unwrap_err()
2036 });
2037
2038 // Leading and trailing whitespace are trimmed.
2039 channel_a
2040 .update(&mut cx_a, |channel, cx| {
2041 channel
2042 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2043 .unwrap()
2044 })
2045 .await
2046 .unwrap();
2047 assert_eq!(
2048 db.get_channel_messages(channel_id, 10, None)
2049 .await
2050 .unwrap()
2051 .iter()
2052 .map(|m| &m.body)
2053 .collect::<Vec<_>>(),
2054 &["surrounded by whitespace"]
2055 );
2056 }
2057
2058 #[gpui::test]
2059 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2060 cx_a.foreground().forbid_parking();
2061
2062 // Connect to a server as 2 clients.
2063 let mut server = TestServer::start().await;
2064 let client_a = server.create_client(&mut cx_a, "user_a").await;
2065 let client_b = server.create_client(&mut cx_b, "user_b").await;
2066 let mut status_b = client_b.status();
2067
2068 // Create an org that includes these 2 users.
2069 let db = &server.app_state.db;
2070 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2071 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2072 .await
2073 .unwrap();
2074 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2075 .await
2076 .unwrap();
2077
2078 // Create a channel that includes all the users.
2079 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2080 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2081 .await
2082 .unwrap();
2083 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2084 .await
2085 .unwrap();
2086 db.create_channel_message(
2087 channel_id,
2088 client_b.current_user_id(&cx_b),
2089 "hello A, it's B.",
2090 OffsetDateTime::now_utc(),
2091 2,
2092 )
2093 .await
2094 .unwrap();
2095
2096 let channels_a = cx_a
2097 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2098 channels_a
2099 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2100 .await;
2101
2102 channels_a.read_with(&cx_a, |list, _| {
2103 assert_eq!(
2104 list.available_channels().unwrap(),
2105 &[ChannelDetails {
2106 id: channel_id.to_proto(),
2107 name: "test-channel".to_string()
2108 }]
2109 )
2110 });
2111 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2112 this.get_channel(channel_id.to_proto(), cx).unwrap()
2113 });
2114 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2115 channel_a
2116 .condition(&cx_a, |channel, _| {
2117 channel_messages(channel)
2118 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2119 })
2120 .await;
2121
2122 let channels_b = cx_b
2123 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2124 channels_b
2125 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2126 .await;
2127 channels_b.read_with(&cx_b, |list, _| {
2128 assert_eq!(
2129 list.available_channels().unwrap(),
2130 &[ChannelDetails {
2131 id: channel_id.to_proto(),
2132 name: "test-channel".to_string()
2133 }]
2134 )
2135 });
2136
2137 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2138 this.get_channel(channel_id.to_proto(), cx).unwrap()
2139 });
2140 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2141 channel_b
2142 .condition(&cx_b, |channel, _| {
2143 channel_messages(channel)
2144 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2145 })
2146 .await;
2147
2148 // Disconnect client B, ensuring we can still access its cached channel data.
2149 server.forbid_connections();
2150 server.disconnect_client(client_b.current_user_id(&cx_b));
2151 while !matches!(
2152 status_b.recv().await,
2153 Some(client::Status::ReconnectionError { .. })
2154 ) {}
2155
2156 channels_b.read_with(&cx_b, |channels, _| {
2157 assert_eq!(
2158 channels.available_channels().unwrap(),
2159 [ChannelDetails {
2160 id: channel_id.to_proto(),
2161 name: "test-channel".to_string()
2162 }]
2163 )
2164 });
2165 channel_b.read_with(&cx_b, |channel, _| {
2166 assert_eq!(
2167 channel_messages(channel),
2168 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2169 )
2170 });
2171
2172 // Send a message from client B while it is disconnected.
2173 channel_b
2174 .update(&mut cx_b, |channel, cx| {
2175 let task = channel
2176 .send_message("can you see this?".to_string(), cx)
2177 .unwrap();
2178 assert_eq!(
2179 channel_messages(channel),
2180 &[
2181 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2182 ("user_b".to_string(), "can you see this?".to_string(), true)
2183 ]
2184 );
2185 task
2186 })
2187 .await
2188 .unwrap_err();
2189
2190 // Send a message from client A while B is disconnected.
2191 channel_a
2192 .update(&mut cx_a, |channel, cx| {
2193 channel
2194 .send_message("oh, hi B.".to_string(), cx)
2195 .unwrap()
2196 .detach();
2197 let task = channel.send_message("sup".to_string(), cx).unwrap();
2198 assert_eq!(
2199 channel_messages(channel),
2200 &[
2201 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2202 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2203 ("user_a".to_string(), "sup".to_string(), true)
2204 ]
2205 );
2206 task
2207 })
2208 .await
2209 .unwrap();
2210
2211 // Give client B a chance to reconnect.
2212 server.allow_connections();
2213 cx_b.foreground().advance_clock(Duration::from_secs(10));
2214
2215 // Verify that B sees the new messages upon reconnection, as well as the message client B
2216 // sent while offline.
2217 channel_b
2218 .condition(&cx_b, |channel, _| {
2219 channel_messages(channel)
2220 == [
2221 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2222 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2223 ("user_a".to_string(), "sup".to_string(), false),
2224 ("user_b".to_string(), "can you see this?".to_string(), false),
2225 ]
2226 })
2227 .await;
2228
2229 // Ensure client A and B can communicate normally after reconnection.
2230 channel_a
2231 .update(&mut cx_a, |channel, cx| {
2232 channel.send_message("you online?".to_string(), cx).unwrap()
2233 })
2234 .await
2235 .unwrap();
2236 channel_b
2237 .condition(&cx_b, |channel, _| {
2238 channel_messages(channel)
2239 == [
2240 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2241 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2242 ("user_a".to_string(), "sup".to_string(), false),
2243 ("user_b".to_string(), "can you see this?".to_string(), false),
2244 ("user_a".to_string(), "you online?".to_string(), false),
2245 ]
2246 })
2247 .await;
2248
2249 channel_b
2250 .update(&mut cx_b, |channel, cx| {
2251 channel.send_message("yep".to_string(), cx).unwrap()
2252 })
2253 .await
2254 .unwrap();
2255 channel_a
2256 .condition(&cx_a, |channel, _| {
2257 channel_messages(channel)
2258 == [
2259 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2260 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2261 ("user_a".to_string(), "sup".to_string(), false),
2262 ("user_b".to_string(), "can you see this?".to_string(), false),
2263 ("user_a".to_string(), "you online?".to_string(), false),
2264 ("user_b".to_string(), "yep".to_string(), false),
2265 ]
2266 })
2267 .await;
2268 }
2269
// Checks the contact lists that three connected clients observe while client A
// registers a worktree, shares its project, is joined by client B, and finally
// drops the project. At each stage every client's `UserStore` is expected to
// converge on the same contact list.
#[gpui::test]
async fn test_contacts(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
    mut cx_c: TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new());

    // Connect to a server as 3 clients.
    let mut server = TestServer::start().await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;
    let client_c = server.create_client(&mut cx_c, "user_c").await;

    // Share a worktree as client A.
    // The `.zed.toml` at the worktree root names user_b and user_c as collaborators.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
        }),
    )
    .await;

    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let worktree_a = project_a
        .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
        .await
        .unwrap();
    // Wait for the local worktree's scan to finish before checking contacts.
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;

    // Before the project is shared, all three clients should see user_a with
    // project "a" and no guests.
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;

    // Share the project as client A, then open it remotely as client B.
    let project_id = project_a
        .update(&mut cx_a, |project, _| project.next_remote_id())
        .await;
    project_a
        .update(&mut cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // The binding keeps the remote project handle alive for the rest of the test.
    let _project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Once B has joined, all three clients should see user_b listed as a guest
    // of project "a".
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;

    // Wait until A's project itself lists B's peer id among its collaborators.
    project_a
        .condition(&cx_a, |project, _| {
            project.collaborators().contains_key(&client_b.peer_id)
        })
        .await;

    // Drop A's project handle; every client's contact list should then become empty.
    cx_a.update(move |_| drop(project_a));
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
        .await;

    // Flattens the store's contacts into
    // `(github login, [(first worktree root name, [guest github logins])])`
    // so they can be compared against literals.
    // Indexes `worktree_root_names[0]`, so it panics on a project with no root names.
    fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
        user_store
            .contacts()
            .iter()
            .map(|contact| {
                let worktrees = contact
                    .projects
                    .iter()
                    .map(|p| {
                        (
                            p.worktree_root_names[0].as_str(),
                            p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                        )
                    })
                    .collect();
                (contact.user.github_login.as_str(), worktrees)
            })
            .collect()
    }
}
2409
/// In-process server fixture used by the tests in this module.
struct TestServer {
    /// RPC peer handed to `Server::new`; reset when the fixture is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state`.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Receiving half of the channel whose sender is passed to `Server::new`;
    /// `condition` waits on it between predicate checks.
    notifications: mpsc::Receiver<()>,
    /// Per-user kill switches for in-memory connections, inserted by
    /// `create_client` and consumed by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// When `true`, new connection attempts made by test clients fail.
    forbid_connections: Arc<AtomicBool>,
    /// Never read after construction; presumably held so the test database
    /// outlives the server (TODO confirm against `TestDb`).
    _test_db: TestDb,
}
2419
2420 impl TestServer {
2421 async fn start() -> Self {
2422 let test_db = TestDb::new();
2423 let app_state = Self::build_app_state(&test_db).await;
2424 let peer = Peer::new();
2425 let notifications = mpsc::channel(128);
2426 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2427 Self {
2428 peer,
2429 app_state,
2430 server,
2431 notifications: notifications.1,
2432 connection_killers: Default::default(),
2433 forbid_connections: Default::default(),
2434 _test_db: test_db,
2435 }
2436 }
2437
2438 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2439 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2440 let client_name = name.to_string();
2441 let mut client = Client::new();
2442 let server = self.server.clone();
2443 let connection_killers = self.connection_killers.clone();
2444 let forbid_connections = self.forbid_connections.clone();
2445 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2446
2447 Arc::get_mut(&mut client)
2448 .unwrap()
2449 .override_authenticate(move |cx| {
2450 cx.spawn(|_| async move {
2451 let access_token = "the-token".to_string();
2452 Ok(Credentials {
2453 user_id: user_id.0 as u64,
2454 access_token,
2455 })
2456 })
2457 })
2458 .override_establish_connection(move |credentials, cx| {
2459 assert_eq!(credentials.user_id, user_id.0 as u64);
2460 assert_eq!(credentials.access_token, "the-token");
2461
2462 let server = server.clone();
2463 let connection_killers = connection_killers.clone();
2464 let forbid_connections = forbid_connections.clone();
2465 let client_name = client_name.clone();
2466 let connection_id_tx = connection_id_tx.clone();
2467 cx.spawn(move |cx| async move {
2468 if forbid_connections.load(SeqCst) {
2469 Err(EstablishConnectionError::other(anyhow!(
2470 "server is forbidding connections"
2471 )))
2472 } else {
2473 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2474 connection_killers.lock().insert(user_id, kill_conn);
2475 cx.background()
2476 .spawn(server.handle_connection(
2477 server_conn,
2478 client_name,
2479 user_id,
2480 Some(connection_id_tx),
2481 ))
2482 .detach();
2483 Ok(client_conn)
2484 }
2485 })
2486 });
2487
2488 let http = FakeHttpClient::new(|_| async move { Ok(surf::http::Response::new(404)) });
2489 client
2490 .authenticate_and_connect(&cx.to_async())
2491 .await
2492 .unwrap();
2493
2494 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2495 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2496 let mut authed_user =
2497 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2498 while authed_user.recv().await.unwrap().is_none() {}
2499
2500 TestClient {
2501 client,
2502 peer_id,
2503 user_store,
2504 }
2505 }
2506
2507 fn disconnect_client(&self, user_id: UserId) {
2508 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2509 let _ = kill_conn.try_send(Some(()));
2510 }
2511 }
2512
2513 fn forbid_connections(&self) {
2514 self.forbid_connections.store(true, SeqCst);
2515 }
2516
2517 fn allow_connections(&self) {
2518 self.forbid_connections.store(false, SeqCst);
2519 }
2520
2521 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2522 let mut config = Config::default();
2523 config.session_secret = "a".repeat(32);
2524 config.database_url = test_db.url.clone();
2525 let github_client = github::AppClient::test();
2526 Arc::new(AppState {
2527 db: test_db.db().clone(),
2528 handlebars: Default::default(),
2529 auth_client: auth::build_client("", ""),
2530 repo_client: github::RepoClient::test(&github_client),
2531 github_client,
2532 config,
2533 })
2534 }
2535
2536 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2537 self.server.store.read()
2538 }
2539
2540 async fn condition<F>(&mut self, mut predicate: F)
2541 where
2542 F: FnMut(&Store) -> bool,
2543 {
2544 async_std::future::timeout(Duration::from_millis(500), async {
2545 while !(predicate)(&*self.server.store.read()) {
2546 self.notifications.recv().await;
2547 }
2548 })
2549 .await
2550 .expect("condition timed out");
2551 }
2552 }
2553
2554 impl Drop for TestServer {
2555 fn drop(&mut self) {
2556 task::block_on(self.peer.reset());
2557 }
2558 }
2559
/// A connected test client, as returned by `TestServer::create_client`.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store created on top of `client`.
    pub user_store: ModelHandle<UserStore>,
}
2565
2566 impl Deref for TestClient {
2567 type Target = Arc<Client>;
2568
2569 fn deref(&self) -> &Self::Target {
2570 &self.client
2571 }
2572 }
2573
2574 impl TestClient {
2575 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2576 UserId::from_proto(
2577 self.user_store
2578 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2579 )
2580 }
2581 }
2582
2583 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2584 channel
2585 .messages()
2586 .cursor::<()>()
2587 .map(|m| {
2588 (
2589 m.sender.github_login.clone(),
2590 m.body.clone(),
2591 m.is_pending(),
2592 )
2593 })
2594 .collect()
2595 }
2596
/// Stateless view used by tests; its `render` produces an empty element.
struct EmptyView;
2598
impl gpui::Entity for EmptyView {
    // The event type is the unit type, so events carry no data.
    type Event = ();
}
2602
2603 impl gpui::View for EmptyView {
2604 fn ui_name() -> &'static str {
2605 "empty view"
2606 }
2607
2608 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2609 gpui::Element::boxed(gpui::elements::Empty)
2610 }
2611 }
2612}