1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
/// RPC server that dispatches incoming messages to registered handlers and relays
/// project, worktree, buffer and channel traffic between connections.
pub struct Server {
    /// RPC peer used to send, forward and respond to messages on client connections.
    peer: Arc<Peer>,
    /// Server-side state, accessed through `state()` and `state_mut()`.
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after an incoming message has been run
    /// through its handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Page size used when loading channel messages; a page shorter than this marks the
/// history as `done` in the response.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound, in bytes (`str::len`), on the trimmed body of a channel message.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a request refers to a project the store cannot find.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updated)
76 .add_handler(Server::open_buffer)
77 .add_handler(Server::close_buffer)
78 .add_handler(Server::update_buffer)
79 .add_handler(Server::buffer_saved)
80 .add_handler(Server::save_buffer)
81 .add_handler(Server::get_channels)
82 .add_handler(Server::get_users)
83 .add_handler(Server::join_channel)
84 .add_handler(Server::leave_channel)
85 .add_handler(Server::send_channel_message)
86 .add_handler(Server::get_channel_messages);
87
88 Arc::new(server)
89 }
90
91 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
92 where
93 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
94 Fut: 'static + Send + Future<Output = tide::Result<()>>,
95 M: EnvelopedMessage,
96 {
97 let prev_handler = self.handlers.insert(
98 TypeId::of::<M>(),
99 Box::new(move |server, envelope| {
100 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
101 (handler)(server, *envelope).boxed()
102 }),
103 );
104 if prev_handler.is_some() {
105 panic!("registered a handler for the same message twice");
106 }
107 self
108 }
109
110 pub fn handle_connection(
111 self: &Arc<Self>,
112 connection: Connection,
113 addr: String,
114 user_id: UserId,
115 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
116 ) -> impl Future<Output = ()> {
117 let mut this = self.clone();
118 async move {
119 let (connection_id, handle_io, mut incoming_rx) =
120 this.peer.add_connection(connection).await;
121
122 if let Some(send_connection_id) = send_connection_id.as_mut() {
123 let _ = send_connection_id.send(connection_id).await;
124 }
125
126 this.state_mut().add_connection(connection_id, user_id);
127 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
128 log::error!("error updating contacts for {:?}: {}", user_id, err);
129 }
130
131 let handle_io = handle_io.fuse();
132 futures::pin_mut!(handle_io);
133 loop {
134 let next_message = incoming_rx.recv().fuse();
135 futures::pin_mut!(next_message);
136 futures::select_biased! {
137 message = next_message => {
138 if let Some(message) = message {
139 let start_time = Instant::now();
140 log::info!("RPC message received: {}", message.payload_type_name());
141 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
142 if let Err(err) = (handler)(this.clone(), message).await {
143 log::error!("error handling message: {:?}", err);
144 } else {
145 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
146 }
147
148 if let Some(mut notifications) = this.notifications.clone() {
149 let _ = notifications.send(()).await;
150 }
151 } else {
152 log::warn!("unhandled message: {}", message.payload_type_name());
153 }
154 } else {
155 log::info!("rpc connection closed {:?}", addr);
156 break;
157 }
158 }
159 handle_io = handle_io => {
160 if let Err(err) = handle_io {
161 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
162 }
163 break;
164 }
165 }
166 }
167
168 if let Err(err) = this.sign_out(connection_id).await {
169 log::error!("error signing out connection {:?} - {:?}", addr, err);
170 }
171 }
172 }
173
174 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
175 self.peer.disconnect(connection_id).await;
176 let removed_connection = self.state_mut().remove_connection(connection_id)?;
177
178 for (project_id, project) in removed_connection.hosted_projects {
179 if let Some(share) = project.share {
180 broadcast(
181 connection_id,
182 share.guests.keys().copied().collect(),
183 |conn_id| {
184 self.peer
185 .send(conn_id, proto::UnshareProject { project_id })
186 },
187 )
188 .await?;
189 }
190 }
191
192 for (project_id, peer_ids) in removed_connection.guest_project_ids {
193 broadcast(connection_id, peer_ids, |conn_id| {
194 self.peer.send(
195 conn_id,
196 proto::RemoveProjectCollaborator {
197 project_id,
198 peer_id: connection_id.0,
199 },
200 )
201 })
202 .await?;
203 }
204
205 self.update_contacts_for_users(removed_connection.contact_ids.iter())
206 .await?;
207
208 Ok(())
209 }
210
211 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
212 self.peer.respond(request.receipt(), proto::Ack {}).await?;
213 Ok(())
214 }
215
216 async fn register_project(
217 mut self: Arc<Server>,
218 request: TypedEnvelope<proto::RegisterProject>,
219 ) -> tide::Result<()> {
220 let project_id = {
221 let mut state = self.state_mut();
222 let user_id = state.user_id_for_connection(request.sender_id)?;
223 state.register_project(request.sender_id, user_id)
224 };
225 self.peer
226 .respond(
227 request.receipt(),
228 proto::RegisterProjectResponse { project_id },
229 )
230 .await?;
231 Ok(())
232 }
233
234 async fn unregister_project(
235 mut self: Arc<Server>,
236 request: TypedEnvelope<proto::UnregisterProject>,
237 ) -> tide::Result<()> {
238 let project = self
239 .state_mut()
240 .unregister_project(request.payload.project_id, request.sender_id)
241 .ok_or_else(|| anyhow!("no such project"))?;
242 self.update_contacts_for_users(project.authorized_user_ids().iter())
243 .await?;
244 Ok(())
245 }
246
247 async fn share_project(
248 mut self: Arc<Server>,
249 request: TypedEnvelope<proto::ShareProject>,
250 ) -> tide::Result<()> {
251 self.state_mut()
252 .share_project(request.payload.project_id, request.sender_id);
253 self.peer.respond(request.receipt(), proto::Ack {}).await?;
254 Ok(())
255 }
256
257 async fn unshare_project(
258 mut self: Arc<Server>,
259 request: TypedEnvelope<proto::UnshareProject>,
260 ) -> tide::Result<()> {
261 let project_id = request.payload.project_id;
262 let project = self
263 .state_mut()
264 .unshare_project(project_id, request.sender_id)?;
265
266 broadcast(request.sender_id, project.connection_ids, |conn_id| {
267 self.peer
268 .send(conn_id, proto::UnshareProject { project_id })
269 })
270 .await?;
271 self.update_contacts_for_users(&project.authorized_user_ids)
272 .await?;
273
274 Ok(())
275 }
276
277 async fn join_project(
278 mut self: Arc<Server>,
279 request: TypedEnvelope<proto::JoinProject>,
280 ) -> tide::Result<()> {
281 let project_id = request.payload.project_id;
282
283 let user_id = self.state().user_id_for_connection(request.sender_id)?;
284 let response_data = self
285 .state_mut()
286 .join_project(request.sender_id, user_id, project_id)
287 .and_then(|joined| {
288 let share = joined.project.share()?;
289 let peer_count = share.guests.len();
290 let mut collaborators = Vec::with_capacity(peer_count);
291 collaborators.push(proto::Collaborator {
292 peer_id: joined.project.host_connection_id.0,
293 replica_id: 0,
294 user_id: joined.project.host_user_id.to_proto(),
295 });
296 let worktrees = joined
297 .project
298 .worktrees
299 .iter()
300 .filter_map(|(id, worktree)| {
301 worktree.share.as_ref().map(|share| proto::Worktree {
302 id: *id,
303 root_name: worktree.root_name.clone(),
304 entries: share.entries.values().cloned().collect(),
305 })
306 })
307 .collect();
308 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
309 if *peer_conn_id != request.sender_id {
310 collaborators.push(proto::Collaborator {
311 peer_id: peer_conn_id.0,
312 replica_id: *peer_replica_id as u32,
313 user_id: peer_user_id.to_proto(),
314 });
315 }
316 }
317 let response = proto::JoinProjectResponse {
318 worktrees,
319 replica_id: joined.replica_id as u32,
320 collaborators,
321 };
322 let connection_ids = joined.project.connection_ids();
323 let contact_user_ids = joined.project.authorized_user_ids();
324 Ok((response, connection_ids, contact_user_ids))
325 });
326
327 match response_data {
328 Ok((response, connection_ids, contact_user_ids)) => {
329 broadcast(request.sender_id, connection_ids, |conn_id| {
330 self.peer.send(
331 conn_id,
332 proto::AddProjectCollaborator {
333 project_id: project_id,
334 collaborator: Some(proto::Collaborator {
335 peer_id: request.sender_id.0,
336 replica_id: response.replica_id,
337 user_id: user_id.to_proto(),
338 }),
339 },
340 )
341 })
342 .await?;
343 self.peer.respond(request.receipt(), response).await?;
344 self.update_contacts_for_users(&contact_user_ids).await?;
345 }
346 Err(error) => {
347 self.peer
348 .respond_with_error(
349 request.receipt(),
350 proto::Error {
351 message: error.to_string(),
352 },
353 )
354 .await?;
355 }
356 }
357
358 Ok(())
359 }
360
361 async fn leave_project(
362 mut self: Arc<Server>,
363 request: TypedEnvelope<proto::LeaveProject>,
364 ) -> tide::Result<()> {
365 let sender_id = request.sender_id;
366 let project_id = request.payload.project_id;
367 let worktree = self.state_mut().leave_project(sender_id, project_id);
368 if let Some(worktree) = worktree {
369 broadcast(sender_id, worktree.connection_ids, |conn_id| {
370 self.peer.send(
371 conn_id,
372 proto::RemoveProjectCollaborator {
373 project_id,
374 peer_id: sender_id.0,
375 },
376 )
377 })
378 .await?;
379 self.update_contacts_for_users(&worktree.authorized_user_ids)
380 .await?;
381 }
382 Ok(())
383 }
384
385 async fn register_worktree(
386 mut self: Arc<Server>,
387 request: TypedEnvelope<proto::RegisterWorktree>,
388 ) -> tide::Result<()> {
389 let receipt = request.receipt();
390 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
391
392 let mut contact_user_ids = HashSet::default();
393 contact_user_ids.insert(host_user_id);
394 for github_login in request.payload.authorized_logins {
395 match self.app_state.db.create_user(&github_login, false).await {
396 Ok(contact_user_id) => {
397 contact_user_ids.insert(contact_user_id);
398 }
399 Err(err) => {
400 let message = err.to_string();
401 self.peer
402 .respond_with_error(receipt, proto::Error { message })
403 .await?;
404 return Ok(());
405 }
406 }
407 }
408
409 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
410 let ok = self.state_mut().register_worktree(
411 request.payload.project_id,
412 request.payload.worktree_id,
413 Worktree {
414 authorized_user_ids: contact_user_ids.clone(),
415 root_name: request.payload.root_name,
416 share: None,
417 },
418 );
419
420 if ok {
421 self.peer.respond(receipt, proto::Ack {}).await?;
422 self.update_contacts_for_users(&contact_user_ids).await?;
423 } else {
424 self.peer
425 .respond_with_error(
426 receipt,
427 proto::Error {
428 message: NO_SUCH_PROJECT.to_string(),
429 },
430 )
431 .await?;
432 }
433
434 Ok(())
435 }
436
437 async fn unregister_worktree(
438 mut self: Arc<Server>,
439 request: TypedEnvelope<proto::UnregisterWorktree>,
440 ) -> tide::Result<()> {
441 let project_id = request.payload.project_id;
442 let worktree_id = request.payload.worktree_id;
443 let (worktree, guest_connection_ids) =
444 self.state_mut()
445 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
446
447 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
448 self.peer.send(
449 conn_id,
450 proto::UnregisterWorktree {
451 project_id,
452 worktree_id,
453 },
454 )
455 })
456 .await?;
457 self.update_contacts_for_users(&worktree.authorized_user_ids)
458 .await?;
459 Ok(())
460 }
461
462 async fn share_worktree(
463 mut self: Arc<Server>,
464 mut request: TypedEnvelope<proto::ShareWorktree>,
465 ) -> tide::Result<()> {
466 let worktree = request
467 .payload
468 .worktree
469 .as_mut()
470 .ok_or_else(|| anyhow!("missing worktree"))?;
471 let entries = mem::take(&mut worktree.entries)
472 .into_iter()
473 .map(|entry| (entry.id, entry))
474 .collect();
475
476 let contact_user_ids = self.state_mut().share_worktree(
477 request.payload.project_id,
478 worktree.id,
479 request.sender_id,
480 entries,
481 );
482 if let Some(contact_user_ids) = contact_user_ids {
483 self.peer.respond(request.receipt(), proto::Ack {}).await?;
484 self.update_contacts_for_users(&contact_user_ids).await?;
485 } else {
486 self.peer
487 .respond_with_error(
488 request.receipt(),
489 proto::Error {
490 message: "no such worktree".to_string(),
491 },
492 )
493 .await?;
494 }
495 Ok(())
496 }
497
498 async fn update_worktree(
499 mut self: Arc<Server>,
500 request: TypedEnvelope<proto::UpdateWorktree>,
501 ) -> tide::Result<()> {
502 let connection_ids = self
503 .state_mut()
504 .update_worktree(
505 request.sender_id,
506 request.payload.project_id,
507 request.payload.worktree_id,
508 &request.payload.removed_entries,
509 &request.payload.updated_entries,
510 )
511 .ok_or_else(|| anyhow!("no such worktree"))?;
512
513 broadcast(request.sender_id, connection_ids, |connection_id| {
514 self.peer
515 .forward_send(request.sender_id, connection_id, request.payload.clone())
516 })
517 .await?;
518
519 Ok(())
520 }
521
522 async fn update_diagnostic_summary(
523 self: Arc<Server>,
524 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
525 ) -> tide::Result<()> {
526 let receiver_ids = self
527 .state()
528 .project_connection_ids(request.payload.project_id, request.sender_id)
529 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
530 broadcast(request.sender_id, receiver_ids, |connection_id| {
531 self.peer
532 .forward_send(request.sender_id, connection_id, request.payload.clone())
533 })
534 .await?;
535 Ok(())
536 }
537
538 async fn disk_based_diagnostics_updated(
539 self: Arc<Server>,
540 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
541 ) -> tide::Result<()> {
542 let receiver_ids = self
543 .state()
544 .project_connection_ids(request.payload.project_id, request.sender_id)
545 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
546 broadcast(request.sender_id, receiver_ids, |connection_id| {
547 self.peer
548 .forward_send(request.sender_id, connection_id, request.payload.clone())
549 })
550 .await?;
551 Ok(())
552 }
553
554 async fn open_buffer(
555 self: Arc<Server>,
556 request: TypedEnvelope<proto::OpenBuffer>,
557 ) -> tide::Result<()> {
558 let receipt = request.receipt();
559 let host_connection_id = self
560 .state()
561 .read_project(request.payload.project_id, request.sender_id)
562 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
563 .host_connection_id;
564 let response = self
565 .peer
566 .forward_request(request.sender_id, host_connection_id, request.payload)
567 .await?;
568 self.peer.respond(receipt, response).await?;
569 Ok(())
570 }
571
572 async fn close_buffer(
573 self: Arc<Server>,
574 request: TypedEnvelope<proto::CloseBuffer>,
575 ) -> tide::Result<()> {
576 let host_connection_id = self
577 .state()
578 .read_project(request.payload.project_id, request.sender_id)
579 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
580 .host_connection_id;
581 self.peer
582 .forward_send(request.sender_id, host_connection_id, request.payload)
583 .await?;
584 Ok(())
585 }
586
587 async fn save_buffer(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::SaveBuffer>,
590 ) -> tide::Result<()> {
591 let host;
592 let guests;
593 {
594 let state = self.state();
595 let project = state
596 .read_project(request.payload.project_id, request.sender_id)
597 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
598 host = project.host_connection_id;
599 guests = project.guest_connection_ids()
600 }
601
602 let sender = request.sender_id;
603 let receipt = request.receipt();
604 let response = self
605 .peer
606 .forward_request(sender, host, request.payload.clone())
607 .await?;
608
609 broadcast(host, guests, |conn_id| {
610 let response = response.clone();
611 let peer = &self.peer;
612 async move {
613 if conn_id == sender {
614 peer.respond(receipt, response).await
615 } else {
616 peer.forward_send(host, conn_id, response).await
617 }
618 }
619 })
620 .await?;
621
622 Ok(())
623 }
624
625 async fn update_buffer(
626 self: Arc<Server>,
627 request: TypedEnvelope<proto::UpdateBuffer>,
628 ) -> tide::Result<()> {
629 let receiver_ids = self
630 .state()
631 .project_connection_ids(request.payload.project_id, request.sender_id)
632 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
633 broadcast(request.sender_id, receiver_ids, |connection_id| {
634 self.peer
635 .forward_send(request.sender_id, connection_id, request.payload.clone())
636 })
637 .await?;
638 self.peer.respond(request.receipt(), proto::Ack {}).await?;
639 Ok(())
640 }
641
642 async fn buffer_saved(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::BufferSaved>,
645 ) -> tide::Result<()> {
646 let receiver_ids = self
647 .state()
648 .project_connection_ids(request.payload.project_id, request.sender_id)
649 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
650 broadcast(request.sender_id, receiver_ids, |connection_id| {
651 self.peer
652 .forward_send(request.sender_id, connection_id, request.payload.clone())
653 })
654 .await?;
655 Ok(())
656 }
657
658 async fn get_channels(
659 self: Arc<Server>,
660 request: TypedEnvelope<proto::GetChannels>,
661 ) -> tide::Result<()> {
662 let user_id = self.state().user_id_for_connection(request.sender_id)?;
663 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
664 self.peer
665 .respond(
666 request.receipt(),
667 proto::GetChannelsResponse {
668 channels: channels
669 .into_iter()
670 .map(|chan| proto::Channel {
671 id: chan.id.to_proto(),
672 name: chan.name,
673 })
674 .collect(),
675 },
676 )
677 .await?;
678 Ok(())
679 }
680
681 async fn get_users(
682 self: Arc<Server>,
683 request: TypedEnvelope<proto::GetUsers>,
684 ) -> tide::Result<()> {
685 let receipt = request.receipt();
686 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
687 let users = self
688 .app_state
689 .db
690 .get_users_by_ids(user_ids)
691 .await?
692 .into_iter()
693 .map(|user| proto::User {
694 id: user.id.to_proto(),
695 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
696 github_login: user.github_login,
697 })
698 .collect();
699 self.peer
700 .respond(receipt, proto::GetUsersResponse { users })
701 .await?;
702 Ok(())
703 }
704
705 async fn update_contacts_for_users<'a>(
706 self: &Arc<Server>,
707 user_ids: impl IntoIterator<Item = &'a UserId>,
708 ) -> tide::Result<()> {
709 let mut send_futures = Vec::new();
710
711 {
712 let state = self.state();
713 for user_id in user_ids {
714 let contacts = state.contacts_for_user(*user_id);
715 for connection_id in state.connection_ids_for_user(*user_id) {
716 send_futures.push(self.peer.send(
717 connection_id,
718 proto::UpdateContacts {
719 contacts: contacts.clone(),
720 },
721 ));
722 }
723 }
724 }
725 futures::future::try_join_all(send_futures).await?;
726
727 Ok(())
728 }
729
730 async fn join_channel(
731 mut self: Arc<Self>,
732 request: TypedEnvelope<proto::JoinChannel>,
733 ) -> tide::Result<()> {
734 let user_id = self.state().user_id_for_connection(request.sender_id)?;
735 let channel_id = ChannelId::from_proto(request.payload.channel_id);
736 if !self
737 .app_state
738 .db
739 .can_user_access_channel(user_id, channel_id)
740 .await?
741 {
742 Err(anyhow!("access denied"))?;
743 }
744
745 self.state_mut().join_channel(request.sender_id, channel_id);
746 let messages = self
747 .app_state
748 .db
749 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
750 .await?
751 .into_iter()
752 .map(|msg| proto::ChannelMessage {
753 id: msg.id.to_proto(),
754 body: msg.body,
755 timestamp: msg.sent_at.unix_timestamp() as u64,
756 sender_id: msg.sender_id.to_proto(),
757 nonce: Some(msg.nonce.as_u128().into()),
758 })
759 .collect::<Vec<_>>();
760 self.peer
761 .respond(
762 request.receipt(),
763 proto::JoinChannelResponse {
764 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
765 messages,
766 },
767 )
768 .await?;
769 Ok(())
770 }
771
772 async fn leave_channel(
773 mut self: Arc<Self>,
774 request: TypedEnvelope<proto::LeaveChannel>,
775 ) -> tide::Result<()> {
776 let user_id = self.state().user_id_for_connection(request.sender_id)?;
777 let channel_id = ChannelId::from_proto(request.payload.channel_id);
778 if !self
779 .app_state
780 .db
781 .can_user_access_channel(user_id, channel_id)
782 .await?
783 {
784 Err(anyhow!("access denied"))?;
785 }
786
787 self.state_mut()
788 .leave_channel(request.sender_id, channel_id);
789
790 Ok(())
791 }
792
793 async fn send_channel_message(
794 self: Arc<Self>,
795 request: TypedEnvelope<proto::SendChannelMessage>,
796 ) -> tide::Result<()> {
797 let receipt = request.receipt();
798 let channel_id = ChannelId::from_proto(request.payload.channel_id);
799 let user_id;
800 let connection_ids;
801 {
802 let state = self.state();
803 user_id = state.user_id_for_connection(request.sender_id)?;
804 if let Some(ids) = state.channel_connection_ids(channel_id) {
805 connection_ids = ids;
806 } else {
807 return Ok(());
808 }
809 }
810
811 // Validate the message body.
812 let body = request.payload.body.trim().to_string();
813 if body.len() > MAX_MESSAGE_LEN {
814 self.peer
815 .respond_with_error(
816 receipt,
817 proto::Error {
818 message: "message is too long".to_string(),
819 },
820 )
821 .await?;
822 return Ok(());
823 }
824 if body.is_empty() {
825 self.peer
826 .respond_with_error(
827 receipt,
828 proto::Error {
829 message: "message can't be blank".to_string(),
830 },
831 )
832 .await?;
833 return Ok(());
834 }
835
836 let timestamp = OffsetDateTime::now_utc();
837 let nonce = if let Some(nonce) = request.payload.nonce {
838 nonce
839 } else {
840 self.peer
841 .respond_with_error(
842 receipt,
843 proto::Error {
844 message: "nonce can't be blank".to_string(),
845 },
846 )
847 .await?;
848 return Ok(());
849 };
850
851 let message_id = self
852 .app_state
853 .db
854 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
855 .await?
856 .to_proto();
857 let message = proto::ChannelMessage {
858 sender_id: user_id.to_proto(),
859 id: message_id,
860 body,
861 timestamp: timestamp.unix_timestamp() as u64,
862 nonce: Some(nonce),
863 };
864 broadcast(request.sender_id, connection_ids, |conn_id| {
865 self.peer.send(
866 conn_id,
867 proto::ChannelMessageSent {
868 channel_id: channel_id.to_proto(),
869 message: Some(message.clone()),
870 },
871 )
872 })
873 .await?;
874 self.peer
875 .respond(
876 receipt,
877 proto::SendChannelMessageResponse {
878 message: Some(message),
879 },
880 )
881 .await?;
882 Ok(())
883 }
884
885 async fn get_channel_messages(
886 self: Arc<Self>,
887 request: TypedEnvelope<proto::GetChannelMessages>,
888 ) -> tide::Result<()> {
889 let user_id = self.state().user_id_for_connection(request.sender_id)?;
890 let channel_id = ChannelId::from_proto(request.payload.channel_id);
891 if !self
892 .app_state
893 .db
894 .can_user_access_channel(user_id, channel_id)
895 .await?
896 {
897 Err(anyhow!("access denied"))?;
898 }
899
900 let messages = self
901 .app_state
902 .db
903 .get_channel_messages(
904 channel_id,
905 MESSAGE_COUNT_PER_PAGE,
906 Some(MessageId::from_proto(request.payload.before_message_id)),
907 )
908 .await?
909 .into_iter()
910 .map(|msg| proto::ChannelMessage {
911 id: msg.id.to_proto(),
912 body: msg.body,
913 timestamp: msg.sent_at.unix_timestamp() as u64,
914 sender_id: msg.sender_id.to_proto(),
915 nonce: Some(msg.nonce.as_u128().into()),
916 })
917 .collect::<Vec<_>>();
918 self.peer
919 .respond(
920 request.receipt(),
921 proto::GetChannelMessagesResponse {
922 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
923 messages,
924 },
925 )
926 .await?;
927 Ok(())
928 }
929
930 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
931 self.store.read()
932 }
933
934 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
935 self.store.write()
936 }
937}
938
939pub async fn broadcast<F, T>(
940 sender_id: ConnectionId,
941 receiver_ids: Vec<ConnectionId>,
942 mut f: F,
943) -> anyhow::Result<()>
944where
945 F: FnMut(ConnectionId) -> T,
946 T: Future<Output = anyhow::Result<()>>,
947{
948 let futures = receiver_ids
949 .into_iter()
950 .filter(|id| *id != sender_id)
951 .map(|id| f(id));
952 futures::future::try_join_all(futures).await?;
953 Ok(())
954}
955
956pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
957 let server = Server::new(app.state().clone(), rpc.clone(), None);
958 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
959 let server = server.clone();
960 async move {
961 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
962
963 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
964 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
965 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
966 let client_protocol_version: Option<u32> = request
967 .header("X-Zed-Protocol-Version")
968 .and_then(|v| v.as_str().parse().ok());
969
970 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
971 return Ok(Response::new(StatusCode::UpgradeRequired));
972 }
973
974 let header = match request.header("Sec-Websocket-Key") {
975 Some(h) => h.as_str(),
976 None => return Err(anyhow!("expected sec-websocket-key"))?,
977 };
978
979 let user_id = process_auth_header(&request).await?;
980
981 let mut response = Response::new(StatusCode::SwitchingProtocols);
982 response.insert_header(UPGRADE, "websocket");
983 response.insert_header(CONNECTION, "Upgrade");
984 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
985 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
986 response.insert_header("Sec-Websocket-Version", "13");
987
988 let http_res: &mut tide::http::Response = response.as_mut();
989 let upgrade_receiver = http_res.recv_upgrade().await;
990 let addr = request.remote().unwrap_or("unknown").to_string();
991 task::spawn(async move {
992 if let Some(stream) = upgrade_receiver.await {
993 server
994 .handle_connection(
995 Connection::new(
996 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
997 ),
998 addr,
999 user_id,
1000 None,
1001 )
1002 .await;
1003 }
1004 });
1005
1006 Ok(response)
1007 }
1008 });
1009}
1010
1011fn header_contains_ignore_case<T>(
1012 request: &tide::Request<T>,
1013 header_name: HeaderName,
1014 value: &str,
1015) -> bool {
1016 request
1017 .header(header_name)
1018 .map(|h| {
1019 h.as_str()
1020 .split(',')
1021 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1022 })
1023 .unwrap_or(false)
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use super::*;
1029 use crate::{
1030 auth,
1031 db::{tests::TestDb, UserId},
1032 github, AppState, Config,
1033 };
1034 use ::rpc::Peer;
1035 use async_std::task;
1036 use gpui::{ModelHandle, TestAppContext};
1037 use parking_lot::Mutex;
1038 use postage::{mpsc, watch};
1039 use rpc::PeerId;
1040 use serde_json::json;
1041 use sqlx::types::time::OffsetDateTime;
1042 use std::{
1043 ops::Deref,
1044 path::Path,
1045 sync::{
1046 atomic::{AtomicBool, Ordering::SeqCst},
1047 Arc,
1048 },
1049 time::Duration,
1050 };
1051 use zed::{
1052 client::{
1053 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1054 EstablishConnectionError, UserStore,
1055 },
1056 editor::{Editor, EditorSettings, Input, MultiBuffer},
1057 fs::{FakeFs, Fs as _},
1058 language::{
1059 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1060 LanguageRegistry, LanguageServerConfig, Point,
1061 },
1062 lsp,
1063 project::{DiagnosticSummary, Project, ProjectPath},
1064 };
1065
1066 #[gpui::test]
1067 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1068 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1069 let lang_registry = Arc::new(LanguageRegistry::new());
1070 let fs = Arc::new(FakeFs::new());
1071 cx_a.foreground().forbid_parking();
1072
1073 // Connect to a server as 2 clients.
1074 let mut server = TestServer::start().await;
1075 let client_a = server.create_client(&mut cx_a, "user_a").await;
1076 let client_b = server.create_client(&mut cx_b, "user_b").await;
1077
1078 // Share a project as client A
1079 fs.insert_tree(
1080 "/a",
1081 json!({
1082 ".zed.toml": r#"collaborators = ["user_b"]"#,
1083 "a.txt": "a-contents",
1084 "b.txt": "b-contents",
1085 }),
1086 )
1087 .await;
1088 let project_a = cx_a.update(|cx| {
1089 Project::local(
1090 client_a.clone(),
1091 client_a.user_store.clone(),
1092 lang_registry.clone(),
1093 fs.clone(),
1094 cx,
1095 )
1096 });
1097 let worktree_a = project_a
1098 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1099 .await
1100 .unwrap();
1101 worktree_a
1102 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1103 .await;
1104 let project_id = project_a
1105 .update(&mut cx_a, |project, _| project.next_remote_id())
1106 .await;
1107 project_a
1108 .update(&mut cx_a, |project, cx| project.share(cx))
1109 .await
1110 .unwrap();
1111
1112 // Join that project as client B
1113 let project_b = Project::remote(
1114 project_id,
1115 client_b.clone(),
1116 client_b.user_store.clone(),
1117 lang_registry.clone(),
1118 fs.clone(),
1119 &mut cx_b.to_async(),
1120 )
1121 .await
1122 .unwrap();
1123 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1124
1125 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1126 assert_eq!(
1127 project
1128 .collaborators()
1129 .get(&client_a.peer_id)
1130 .unwrap()
1131 .user
1132 .github_login,
1133 "user_a"
1134 );
1135 project.replica_id()
1136 });
1137 project_a
1138 .condition(&cx_a, |tree, _| {
1139 tree.collaborators()
1140 .get(&client_b.peer_id)
1141 .map_or(false, |collaborator| {
1142 collaborator.replica_id == replica_id_b
1143 && collaborator.user.github_login == "user_b"
1144 })
1145 })
1146 .await;
1147
1148 // Open the same file as client B and client A.
1149 let buffer_b = worktree_b
1150 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1151 .await
1152 .unwrap();
1153 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1154 buffer_b.read_with(&cx_b, |buf, cx| {
1155 assert_eq!(buf.read(cx).text(), "b-contents")
1156 });
1157 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1158 let buffer_a = worktree_a
1159 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1160 .await
1161 .unwrap();
1162
1163 let editor_b = cx_b.add_view(window_b, |cx| {
1164 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1165 });
1166 // TODO
1167 // // Create a selection set as client B and see that selection set as client A.
1168 // buffer_a
1169 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1170 // .await;
1171
1172 // Edit the buffer as client B and see that edit as client A.
1173 editor_b.update(&mut cx_b, |editor, cx| {
1174 editor.handle_input(&Input("ok, ".into()), cx)
1175 });
1176 buffer_a
1177 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1178 .await;
1179
1180 // TODO
1181 // // Remove the selection set as client B, see those selections disappear as client A.
1182 cx_b.update(move |_| drop(editor_b));
1183 // buffer_a
1184 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1185 // .await;
1186
1187 // Close the buffer as client A, see that the buffer is closed.
1188 cx_a.update(move |_| drop(buffer_a));
1189 worktree_a
1190 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1191 .await;
1192
1193 // Dropping the client B's project removes client B from client A's collaborators.
1194 cx_b.update(move |_| drop(project_b));
1195 project_a
1196 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1197 .await;
1198 }
1199
1200 #[gpui::test]
1201 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1202 let lang_registry = Arc::new(LanguageRegistry::new());
1203 let fs = Arc::new(FakeFs::new());
1204 cx_a.foreground().forbid_parking();
1205
1206 // Connect to a server as 2 clients.
1207 let mut server = TestServer::start().await;
1208 let client_a = server.create_client(&mut cx_a, "user_a").await;
1209 let client_b = server.create_client(&mut cx_b, "user_b").await;
1210
1211 // Share a project as client A
1212 fs.insert_tree(
1213 "/a",
1214 json!({
1215 ".zed.toml": r#"collaborators = ["user_b"]"#,
1216 "a.txt": "a-contents",
1217 "b.txt": "b-contents",
1218 }),
1219 )
1220 .await;
1221 let project_a = cx_a.update(|cx| {
1222 Project::local(
1223 client_a.clone(),
1224 client_a.user_store.clone(),
1225 lang_registry.clone(),
1226 fs.clone(),
1227 cx,
1228 )
1229 });
1230 let worktree_a = project_a
1231 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1232 .await
1233 .unwrap();
1234 worktree_a
1235 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1236 .await;
1237 let project_id = project_a
1238 .update(&mut cx_a, |project, _| project.next_remote_id())
1239 .await;
1240 project_a
1241 .update(&mut cx_a, |project, cx| project.share(cx))
1242 .await
1243 .unwrap();
1244
1245 // Join that project as client B
1246 let project_b = Project::remote(
1247 project_id,
1248 client_b.clone(),
1249 client_b.user_store.clone(),
1250 lang_registry.clone(),
1251 fs.clone(),
1252 &mut cx_b.to_async(),
1253 )
1254 .await
1255 .unwrap();
1256
1257 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1258 worktree_b
1259 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1260 .await
1261 .unwrap();
1262
1263 project_a
1264 .update(&mut cx_a, |project, cx| project.unshare(cx))
1265 .await
1266 .unwrap();
1267 project_b
1268 .condition(&mut cx_b, |project, _| project.is_read_only())
1269 .await;
1270 }
1271
1272 #[gpui::test]
1273 async fn test_propagate_saves_and_fs_changes(
1274 mut cx_a: TestAppContext,
1275 mut cx_b: TestAppContext,
1276 mut cx_c: TestAppContext,
1277 ) {
1278 let lang_registry = Arc::new(LanguageRegistry::new());
1279 let fs = Arc::new(FakeFs::new());
1280 cx_a.foreground().forbid_parking();
1281
1282 // Connect to a server as 3 clients.
1283 let mut server = TestServer::start().await;
1284 let client_a = server.create_client(&mut cx_a, "user_a").await;
1285 let client_b = server.create_client(&mut cx_b, "user_b").await;
1286 let client_c = server.create_client(&mut cx_c, "user_c").await;
1287
1288 // Share a worktree as client A.
1289 fs.insert_tree(
1290 "/a",
1291 json!({
1292 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1293 "file1": "",
1294 "file2": ""
1295 }),
1296 )
1297 .await;
1298 let project_a = cx_a.update(|cx| {
1299 Project::local(
1300 client_a.clone(),
1301 client_a.user_store.clone(),
1302 lang_registry.clone(),
1303 fs.clone(),
1304 cx,
1305 )
1306 });
1307 let worktree_a = project_a
1308 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1309 .await
1310 .unwrap();
1311 worktree_a
1312 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1313 .await;
1314 let project_id = project_a
1315 .update(&mut cx_a, |project, _| project.next_remote_id())
1316 .await;
1317 project_a
1318 .update(&mut cx_a, |project, cx| project.share(cx))
1319 .await
1320 .unwrap();
1321
1322 // Join that worktree as clients B and C.
1323 let project_b = Project::remote(
1324 project_id,
1325 client_b.clone(),
1326 client_b.user_store.clone(),
1327 lang_registry.clone(),
1328 fs.clone(),
1329 &mut cx_b.to_async(),
1330 )
1331 .await
1332 .unwrap();
1333 let project_c = Project::remote(
1334 project_id,
1335 client_c.clone(),
1336 client_c.user_store.clone(),
1337 lang_registry.clone(),
1338 fs.clone(),
1339 &mut cx_c.to_async(),
1340 )
1341 .await
1342 .unwrap();
1343
1344 // Open and edit a buffer as both guests B and C.
1345 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1346 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1347 let buffer_b = worktree_b
1348 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1349 .await
1350 .unwrap();
1351 let buffer_c = worktree_c
1352 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1353 .await
1354 .unwrap();
1355 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1356 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1357
1358 // Open and edit that buffer as the host.
1359 let buffer_a = worktree_a
1360 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1361 .await
1362 .unwrap();
1363
1364 buffer_a
1365 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1366 .await;
1367 buffer_a.update(&mut cx_a, |buf, cx| {
1368 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1369 });
1370
1371 // Wait for edits to propagate
1372 buffer_a
1373 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1374 .await;
1375 buffer_b
1376 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1377 .await;
1378 buffer_c
1379 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1380 .await;
1381
1382 // Edit the buffer as the host and concurrently save as guest B.
1383 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1384 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1385 save_b.await.unwrap();
1386 assert_eq!(
1387 fs.load("/a/file1".as_ref()).await.unwrap(),
1388 "hi-a, i-am-c, i-am-b, i-am-a"
1389 );
1390 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1391 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1392 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1393
1394 // Make changes on host's file system, see those changes on the guests.
1395 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1396 .await
1397 .unwrap();
1398 fs.insert_file(Path::new("/a/file4"), "4".into())
1399 .await
1400 .unwrap();
1401
1402 worktree_b
1403 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1404 .await;
1405 worktree_c
1406 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1407 .await;
1408 worktree_b.read_with(&cx_b, |tree, _| {
1409 assert_eq!(
1410 tree.paths()
1411 .map(|p| p.to_string_lossy())
1412 .collect::<Vec<_>>(),
1413 &[".zed.toml", "file1", "file3", "file4"]
1414 )
1415 });
1416 worktree_c.read_with(&cx_c, |tree, _| {
1417 assert_eq!(
1418 tree.paths()
1419 .map(|p| p.to_string_lossy())
1420 .collect::<Vec<_>>(),
1421 &[".zed.toml", "file1", "file3", "file4"]
1422 )
1423 });
1424 }
1425
1426 #[gpui::test]
1427 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1428 cx_a.foreground().forbid_parking();
1429 let lang_registry = Arc::new(LanguageRegistry::new());
1430 let fs = Arc::new(FakeFs::new());
1431
1432 // Connect to a server as 2 clients.
1433 let mut server = TestServer::start().await;
1434 let client_a = server.create_client(&mut cx_a, "user_a").await;
1435 let client_b = server.create_client(&mut cx_b, "user_b").await;
1436
1437 // Share a project as client A
1438 fs.insert_tree(
1439 "/dir",
1440 json!({
1441 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1442 "a.txt": "a-contents",
1443 }),
1444 )
1445 .await;
1446
1447 let project_a = cx_a.update(|cx| {
1448 Project::local(
1449 client_a.clone(),
1450 client_a.user_store.clone(),
1451 lang_registry.clone(),
1452 fs.clone(),
1453 cx,
1454 )
1455 });
1456 let worktree_a = project_a
1457 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1458 .await
1459 .unwrap();
1460 worktree_a
1461 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1462 .await;
1463 let project_id = project_a
1464 .update(&mut cx_a, |project, _| project.next_remote_id())
1465 .await;
1466 project_a
1467 .update(&mut cx_a, |project, cx| project.share(cx))
1468 .await
1469 .unwrap();
1470
1471 // Join that project as client B
1472 let project_b = Project::remote(
1473 project_id,
1474 client_b.clone(),
1475 client_b.user_store.clone(),
1476 lang_registry.clone(),
1477 fs.clone(),
1478 &mut cx_b.to_async(),
1479 )
1480 .await
1481 .unwrap();
1482 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1483
1484 // Open a buffer as client B
1485 let buffer_b = worktree_b
1486 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1487 .await
1488 .unwrap();
1489 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1490
1491 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1492 buffer_b.read_with(&cx_b, |buf, _| {
1493 assert!(buf.is_dirty());
1494 assert!(!buf.has_conflict());
1495 });
1496
1497 buffer_b
1498 .update(&mut cx_b, |buf, cx| buf.save(cx))
1499 .unwrap()
1500 .await
1501 .unwrap();
1502 worktree_b
1503 .condition(&cx_b, |_, cx| {
1504 buffer_b.read(cx).file().unwrap().mtime() != mtime
1505 })
1506 .await;
1507 buffer_b.read_with(&cx_b, |buf, _| {
1508 assert!(!buf.is_dirty());
1509 assert!(!buf.has_conflict());
1510 });
1511
1512 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1513 buffer_b.read_with(&cx_b, |buf, _| {
1514 assert!(buf.is_dirty());
1515 assert!(!buf.has_conflict());
1516 });
1517 }
1518
1519 #[gpui::test]
1520 async fn test_editing_while_guest_opens_buffer(
1521 mut cx_a: TestAppContext,
1522 mut cx_b: TestAppContext,
1523 ) {
1524 cx_a.foreground().forbid_parking();
1525 let lang_registry = Arc::new(LanguageRegistry::new());
1526 let fs = Arc::new(FakeFs::new());
1527
1528 // Connect to a server as 2 clients.
1529 let mut server = TestServer::start().await;
1530 let client_a = server.create_client(&mut cx_a, "user_a").await;
1531 let client_b = server.create_client(&mut cx_b, "user_b").await;
1532
1533 // Share a project as client A
1534 fs.insert_tree(
1535 "/dir",
1536 json!({
1537 ".zed.toml": r#"collaborators = ["user_b"]"#,
1538 "a.txt": "a-contents",
1539 }),
1540 )
1541 .await;
1542 let project_a = cx_a.update(|cx| {
1543 Project::local(
1544 client_a.clone(),
1545 client_a.user_store.clone(),
1546 lang_registry.clone(),
1547 fs.clone(),
1548 cx,
1549 )
1550 });
1551 let worktree_a = project_a
1552 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1553 .await
1554 .unwrap();
1555 worktree_a
1556 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1557 .await;
1558 let project_id = project_a
1559 .update(&mut cx_a, |project, _| project.next_remote_id())
1560 .await;
1561 project_a
1562 .update(&mut cx_a, |project, cx| project.share(cx))
1563 .await
1564 .unwrap();
1565
1566 // Join that project as client B
1567 let project_b = Project::remote(
1568 project_id,
1569 client_b.clone(),
1570 client_b.user_store.clone(),
1571 lang_registry.clone(),
1572 fs.clone(),
1573 &mut cx_b.to_async(),
1574 )
1575 .await
1576 .unwrap();
1577 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1578
1579 // Open a buffer as client A
1580 let buffer_a = worktree_a
1581 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1582 .await
1583 .unwrap();
1584
1585 // Start opening the same buffer as client B
1586 let buffer_b = cx_b
1587 .background()
1588 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1589 task::yield_now().await;
1590
1591 // Edit the buffer as client A while client B is still opening it.
1592 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1593
1594 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1595 let buffer_b = buffer_b.await.unwrap();
1596 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1597 }
1598
1599 #[gpui::test]
1600 async fn test_leaving_worktree_while_opening_buffer(
1601 mut cx_a: TestAppContext,
1602 mut cx_b: TestAppContext,
1603 ) {
1604 cx_a.foreground().forbid_parking();
1605 let lang_registry = Arc::new(LanguageRegistry::new());
1606 let fs = Arc::new(FakeFs::new());
1607
1608 // Connect to a server as 2 clients.
1609 let mut server = TestServer::start().await;
1610 let client_a = server.create_client(&mut cx_a, "user_a").await;
1611 let client_b = server.create_client(&mut cx_b, "user_b").await;
1612
1613 // Share a project as client A
1614 fs.insert_tree(
1615 "/dir",
1616 json!({
1617 ".zed.toml": r#"collaborators = ["user_b"]"#,
1618 "a.txt": "a-contents",
1619 }),
1620 )
1621 .await;
1622 let project_a = cx_a.update(|cx| {
1623 Project::local(
1624 client_a.clone(),
1625 client_a.user_store.clone(),
1626 lang_registry.clone(),
1627 fs.clone(),
1628 cx,
1629 )
1630 });
1631 let worktree_a = project_a
1632 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1633 .await
1634 .unwrap();
1635 worktree_a
1636 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1637 .await;
1638 let project_id = project_a
1639 .update(&mut cx_a, |project, _| project.next_remote_id())
1640 .await;
1641 project_a
1642 .update(&mut cx_a, |project, cx| project.share(cx))
1643 .await
1644 .unwrap();
1645
1646 // Join that project as client B
1647 let project_b = Project::remote(
1648 project_id,
1649 client_b.clone(),
1650 client_b.user_store.clone(),
1651 lang_registry.clone(),
1652 fs.clone(),
1653 &mut cx_b.to_async(),
1654 )
1655 .await
1656 .unwrap();
1657 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1658
1659 // See that a guest has joined as client A.
1660 project_a
1661 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1662 .await;
1663
1664 // Begin opening a buffer as client B, but leave the project before the open completes.
1665 let buffer_b = cx_b
1666 .background()
1667 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1668 cx_b.update(|_| drop(project_b));
1669 drop(buffer_b);
1670
1671 // See that the guest has left.
1672 project_a
1673 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1674 .await;
1675 }
1676
1677 #[gpui::test]
1678 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1679 cx_a.foreground().forbid_parking();
1680 let lang_registry = Arc::new(LanguageRegistry::new());
1681 let fs = Arc::new(FakeFs::new());
1682
1683 // Connect to a server as 2 clients.
1684 let mut server = TestServer::start().await;
1685 let client_a = server.create_client(&mut cx_a, "user_a").await;
1686 let client_b = server.create_client(&mut cx_b, "user_b").await;
1687
1688 // Share a project as client A
1689 fs.insert_tree(
1690 "/a",
1691 json!({
1692 ".zed.toml": r#"collaborators = ["user_b"]"#,
1693 "a.txt": "a-contents",
1694 "b.txt": "b-contents",
1695 }),
1696 )
1697 .await;
1698 let project_a = cx_a.update(|cx| {
1699 Project::local(
1700 client_a.clone(),
1701 client_a.user_store.clone(),
1702 lang_registry.clone(),
1703 fs.clone(),
1704 cx,
1705 )
1706 });
1707 let worktree_a = project_a
1708 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1709 .await
1710 .unwrap();
1711 worktree_a
1712 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1713 .await;
1714 let project_id = project_a
1715 .update(&mut cx_a, |project, _| project.next_remote_id())
1716 .await;
1717 project_a
1718 .update(&mut cx_a, |project, cx| project.share(cx))
1719 .await
1720 .unwrap();
1721
1722 // Join that project as client B
1723 let _project_b = Project::remote(
1724 project_id,
1725 client_b.clone(),
1726 client_b.user_store.clone(),
1727 lang_registry.clone(),
1728 fs.clone(),
1729 &mut cx_b.to_async(),
1730 )
1731 .await
1732 .unwrap();
1733
1734 // See that a guest has joined as client A.
1735 project_a
1736 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1737 .await;
1738
1739 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1740 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1741 project_a
1742 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1743 .await;
1744 }
1745
1746 #[gpui::test]
1747 async fn test_collaborating_with_diagnostics(
1748 mut cx_a: TestAppContext,
1749 mut cx_b: TestAppContext,
1750 ) {
1751 cx_a.foreground().forbid_parking();
1752 let mut lang_registry = Arc::new(LanguageRegistry::new());
1753 let fs = Arc::new(FakeFs::new());
1754
1755 // Set up a fake language server.
1756 let (language_server_config, mut fake_language_server) =
1757 LanguageServerConfig::fake(cx_a.background()).await;
1758 Arc::get_mut(&mut lang_registry)
1759 .unwrap()
1760 .add(Arc::new(Language::new(
1761 LanguageConfig {
1762 name: "Rust".to_string(),
1763 path_suffixes: vec!["rs".to_string()],
1764 language_server: Some(language_server_config),
1765 ..Default::default()
1766 },
1767 Some(tree_sitter_rust::language()),
1768 )));
1769
1770 // Connect to a server as 2 clients.
1771 let mut server = TestServer::start().await;
1772 let client_a = server.create_client(&mut cx_a, "user_a").await;
1773 let client_b = server.create_client(&mut cx_b, "user_b").await;
1774
1775 // Share a project as client A
1776 fs.insert_tree(
1777 "/a",
1778 json!({
1779 ".zed.toml": r#"collaborators = ["user_b"]"#,
1780 "a.rs": "let one = two",
1781 "other.rs": "",
1782 }),
1783 )
1784 .await;
1785 let project_a = cx_a.update(|cx| {
1786 Project::local(
1787 client_a.clone(),
1788 client_a.user_store.clone(),
1789 lang_registry.clone(),
1790 fs.clone(),
1791 cx,
1792 )
1793 });
1794 let worktree_a = project_a
1795 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1796 .await
1797 .unwrap();
1798 worktree_a
1799 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1800 .await;
1801 let project_id = project_a
1802 .update(&mut cx_a, |project, _| project.next_remote_id())
1803 .await;
1804 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1805 project_a
1806 .update(&mut cx_a, |project, cx| project.share(cx))
1807 .await
1808 .unwrap();
1809
1810 // Cause the language server to start.
1811 let _ = cx_a
1812 .background()
1813 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1814 worktree.open_buffer("other.rs", cx)
1815 }))
1816 .await
1817 .unwrap();
1818
1819 // Join the worktree as client B.
1820 let project_b = Project::remote(
1821 project_id,
1822 client_b.clone(),
1823 client_b.user_store.clone(),
1824 lang_registry.clone(),
1825 fs.clone(),
1826 &mut cx_b.to_async(),
1827 )
1828 .await
1829 .unwrap();
1830
1831 // Simulate a language server reporting errors for a file.
1832 fake_language_server
1833 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1834 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1835 version: None,
1836 diagnostics: vec![
1837 lsp::Diagnostic {
1838 severity: Some(lsp::DiagnosticSeverity::ERROR),
1839 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1840 message: "message 1".to_string(),
1841 ..Default::default()
1842 },
1843 lsp::Diagnostic {
1844 severity: Some(lsp::DiagnosticSeverity::WARNING),
1845 range: lsp::Range::new(
1846 lsp::Position::new(0, 10),
1847 lsp::Position::new(0, 13),
1848 ),
1849 message: "message 2".to_string(),
1850 ..Default::default()
1851 },
1852 ],
1853 })
1854 .await;
1855
1856 project_b
1857 .condition(&cx_b, |project, cx| {
1858 project.diagnostic_summaries(cx).collect::<Vec<_>>()
1859 == &[(
1860 ProjectPath {
1861 worktree_id,
1862 path: Arc::from(Path::new("a.rs")),
1863 },
1864 DiagnosticSummary {
1865 error_count: 1,
1866 warning_count: 1,
1867 ..Default::default()
1868 },
1869 )]
1870 })
1871 .await;
1872
1873 // Open the file with the errors.
1874 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1875 let buffer_b = cx_b
1876 .background()
1877 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1878 .await
1879 .unwrap();
1880
1881 buffer_b.read_with(&cx_b, |buffer, _| {
1882 assert_eq!(
1883 buffer
1884 .snapshot()
1885 .diagnostics_in_range::<_, Point>(0..buffer.len())
1886 .map(|entry| entry)
1887 .collect::<Vec<_>>(),
1888 &[
1889 DiagnosticEntry {
1890 range: Point::new(0, 4)..Point::new(0, 7),
1891 diagnostic: Diagnostic {
1892 group_id: 0,
1893 message: "message 1".to_string(),
1894 severity: lsp::DiagnosticSeverity::ERROR,
1895 is_primary: true,
1896 ..Default::default()
1897 }
1898 },
1899 DiagnosticEntry {
1900 range: Point::new(0, 10)..Point::new(0, 13),
1901 diagnostic: Diagnostic {
1902 group_id: 1,
1903 severity: lsp::DiagnosticSeverity::WARNING,
1904 message: "message 2".to_string(),
1905 is_primary: true,
1906 ..Default::default()
1907 }
1908 }
1909 ]
1910 );
1911 });
1912 }
1913
1914 #[gpui::test]
1915 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1916 cx_a.foreground().forbid_parking();
1917
1918 // Connect to a server as 2 clients.
1919 let mut server = TestServer::start().await;
1920 let client_a = server.create_client(&mut cx_a, "user_a").await;
1921 let client_b = server.create_client(&mut cx_b, "user_b").await;
1922
1923 // Create an org that includes these 2 users.
1924 let db = &server.app_state.db;
1925 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1926 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1927 .await
1928 .unwrap();
1929 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
1930 .await
1931 .unwrap();
1932
1933 // Create a channel that includes all the users.
1934 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
1935 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
1936 .await
1937 .unwrap();
1938 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
1939 .await
1940 .unwrap();
1941 db.create_channel_message(
1942 channel_id,
1943 client_b.current_user_id(&cx_b),
1944 "hello A, it's B.",
1945 OffsetDateTime::now_utc(),
1946 1,
1947 )
1948 .await
1949 .unwrap();
1950
1951 let channels_a = cx_a
1952 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
1953 channels_a
1954 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
1955 .await;
1956 channels_a.read_with(&cx_a, |list, _| {
1957 assert_eq!(
1958 list.available_channels().unwrap(),
1959 &[ChannelDetails {
1960 id: channel_id.to_proto(),
1961 name: "test-channel".to_string()
1962 }]
1963 )
1964 });
1965 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
1966 this.get_channel(channel_id.to_proto(), cx).unwrap()
1967 });
1968 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
1969 channel_a
1970 .condition(&cx_a, |channel, _| {
1971 channel_messages(channel)
1972 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1973 })
1974 .await;
1975
1976 let channels_b = cx_b
1977 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
1978 channels_b
1979 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
1980 .await;
1981 channels_b.read_with(&cx_b, |list, _| {
1982 assert_eq!(
1983 list.available_channels().unwrap(),
1984 &[ChannelDetails {
1985 id: channel_id.to_proto(),
1986 name: "test-channel".to_string()
1987 }]
1988 )
1989 });
1990
1991 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
1992 this.get_channel(channel_id.to_proto(), cx).unwrap()
1993 });
1994 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
1995 channel_b
1996 .condition(&cx_b, |channel, _| {
1997 channel_messages(channel)
1998 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
1999 })
2000 .await;
2001
2002 channel_a
2003 .update(&mut cx_a, |channel, cx| {
2004 channel
2005 .send_message("oh, hi B.".to_string(), cx)
2006 .unwrap()
2007 .detach();
2008 let task = channel.send_message("sup".to_string(), cx).unwrap();
2009 assert_eq!(
2010 channel_messages(channel),
2011 &[
2012 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2013 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2014 ("user_a".to_string(), "sup".to_string(), true)
2015 ]
2016 );
2017 task
2018 })
2019 .await
2020 .unwrap();
2021
2022 channel_b
2023 .condition(&cx_b, |channel, _| {
2024 channel_messages(channel)
2025 == [
2026 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2027 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2028 ("user_a".to_string(), "sup".to_string(), false),
2029 ]
2030 })
2031 .await;
2032
2033 assert_eq!(
2034 server
2035 .state()
2036 .await
2037 .channel(channel_id)
2038 .unwrap()
2039 .connection_ids
2040 .len(),
2041 2
2042 );
2043 cx_b.update(|_| drop(channel_b));
2044 server
2045 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2046 .await;
2047
2048 cx_a.update(|_| drop(channel_a));
2049 server
2050 .condition(|state| state.channel(channel_id).is_none())
2051 .await;
2052 }
2053
2054 #[gpui::test]
2055 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2056 cx_a.foreground().forbid_parking();
2057
2058 let mut server = TestServer::start().await;
2059 let client_a = server.create_client(&mut cx_a, "user_a").await;
2060
2061 let db = &server.app_state.db;
2062 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2063 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2064 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2065 .await
2066 .unwrap();
2067 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2068 .await
2069 .unwrap();
2070
2071 let channels_a = cx_a
2072 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2073 channels_a
2074 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2075 .await;
2076 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2077 this.get_channel(channel_id.to_proto(), cx).unwrap()
2078 });
2079
2080 // Messages aren't allowed to be too long.
2081 channel_a
2082 .update(&mut cx_a, |channel, cx| {
2083 let long_body = "this is long.\n".repeat(1024);
2084 channel.send_message(long_body, cx).unwrap()
2085 })
2086 .await
2087 .unwrap_err();
2088
2089 // Messages aren't allowed to be blank.
2090 channel_a.update(&mut cx_a, |channel, cx| {
2091 channel.send_message(String::new(), cx).unwrap_err()
2092 });
2093
2094 // Leading and trailing whitespace are trimmed.
2095 channel_a
2096 .update(&mut cx_a, |channel, cx| {
2097 channel
2098 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2099 .unwrap()
2100 })
2101 .await
2102 .unwrap();
2103 assert_eq!(
2104 db.get_channel_messages(channel_id, 10, None)
2105 .await
2106 .unwrap()
2107 .iter()
2108 .map(|m| &m.body)
2109 .collect::<Vec<_>>(),
2110 &["surrounded by whitespace"]
2111 );
2112 }
2113
2114 #[gpui::test]
2115 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2116 cx_a.foreground().forbid_parking();
2117
2118 // Connect to a server as 2 clients.
2119 let mut server = TestServer::start().await;
2120 let client_a = server.create_client(&mut cx_a, "user_a").await;
2121 let client_b = server.create_client(&mut cx_b, "user_b").await;
2122 let mut status_b = client_b.status();
2123
2124 // Create an org that includes these 2 users.
2125 let db = &server.app_state.db;
2126 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2127 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2128 .await
2129 .unwrap();
2130 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2131 .await
2132 .unwrap();
2133
2134 // Create a channel that includes all the users.
2135 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2136 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2137 .await
2138 .unwrap();
2139 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2140 .await
2141 .unwrap();
2142 db.create_channel_message(
2143 channel_id,
2144 client_b.current_user_id(&cx_b),
2145 "hello A, it's B.",
2146 OffsetDateTime::now_utc(),
2147 2,
2148 )
2149 .await
2150 .unwrap();
2151
2152 let channels_a = cx_a
2153 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2154 channels_a
2155 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2156 .await;
2157
2158 channels_a.read_with(&cx_a, |list, _| {
2159 assert_eq!(
2160 list.available_channels().unwrap(),
2161 &[ChannelDetails {
2162 id: channel_id.to_proto(),
2163 name: "test-channel".to_string()
2164 }]
2165 )
2166 });
2167 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2168 this.get_channel(channel_id.to_proto(), cx).unwrap()
2169 });
2170 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2171 channel_a
2172 .condition(&cx_a, |channel, _| {
2173 channel_messages(channel)
2174 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2175 })
2176 .await;
2177
2178 let channels_b = cx_b
2179 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2180 channels_b
2181 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2182 .await;
2183 channels_b.read_with(&cx_b, |list, _| {
2184 assert_eq!(
2185 list.available_channels().unwrap(),
2186 &[ChannelDetails {
2187 id: channel_id.to_proto(),
2188 name: "test-channel".to_string()
2189 }]
2190 )
2191 });
2192
2193 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2194 this.get_channel(channel_id.to_proto(), cx).unwrap()
2195 });
2196 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2197 channel_b
2198 .condition(&cx_b, |channel, _| {
2199 channel_messages(channel)
2200 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2201 })
2202 .await;
2203
2204 // Disconnect client B, ensuring we can still access its cached channel data.
2205 server.forbid_connections();
2206 server.disconnect_client(client_b.current_user_id(&cx_b));
2207 while !matches!(
2208 status_b.recv().await,
2209 Some(client::Status::ReconnectionError { .. })
2210 ) {}
2211
2212 channels_b.read_with(&cx_b, |channels, _| {
2213 assert_eq!(
2214 channels.available_channels().unwrap(),
2215 [ChannelDetails {
2216 id: channel_id.to_proto(),
2217 name: "test-channel".to_string()
2218 }]
2219 )
2220 });
2221 channel_b.read_with(&cx_b, |channel, _| {
2222 assert_eq!(
2223 channel_messages(channel),
2224 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2225 )
2226 });
2227
2228 // Send a message from client B while it is disconnected.
2229 channel_b
2230 .update(&mut cx_b, |channel, cx| {
2231 let task = channel
2232 .send_message("can you see this?".to_string(), cx)
2233 .unwrap();
2234 assert_eq!(
2235 channel_messages(channel),
2236 &[
2237 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2238 ("user_b".to_string(), "can you see this?".to_string(), true)
2239 ]
2240 );
2241 task
2242 })
2243 .await
2244 .unwrap_err();
2245
2246 // Send a message from client A while B is disconnected.
2247 channel_a
2248 .update(&mut cx_a, |channel, cx| {
2249 channel
2250 .send_message("oh, hi B.".to_string(), cx)
2251 .unwrap()
2252 .detach();
2253 let task = channel.send_message("sup".to_string(), cx).unwrap();
2254 assert_eq!(
2255 channel_messages(channel),
2256 &[
2257 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2258 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2259 ("user_a".to_string(), "sup".to_string(), true)
2260 ]
2261 );
2262 task
2263 })
2264 .await
2265 .unwrap();
2266
2267 // Give client B a chance to reconnect.
2268 server.allow_connections();
2269 cx_b.foreground().advance_clock(Duration::from_secs(10));
2270
2271 // Verify that B sees the new messages upon reconnection, as well as the message client B
2272 // sent while offline.
2273 channel_b
2274 .condition(&cx_b, |channel, _| {
2275 channel_messages(channel)
2276 == [
2277 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2278 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2279 ("user_a".to_string(), "sup".to_string(), false),
2280 ("user_b".to_string(), "can you see this?".to_string(), false),
2281 ]
2282 })
2283 .await;
2284
2285 // Ensure client A and B can communicate normally after reconnection.
2286 channel_a
2287 .update(&mut cx_a, |channel, cx| {
2288 channel.send_message("you online?".to_string(), cx).unwrap()
2289 })
2290 .await
2291 .unwrap();
2292 channel_b
2293 .condition(&cx_b, |channel, _| {
2294 channel_messages(channel)
2295 == [
2296 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2297 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2298 ("user_a".to_string(), "sup".to_string(), false),
2299 ("user_b".to_string(), "can you see this?".to_string(), false),
2300 ("user_a".to_string(), "you online?".to_string(), false),
2301 ]
2302 })
2303 .await;
2304
2305 channel_b
2306 .update(&mut cx_b, |channel, cx| {
2307 channel.send_message("yep".to_string(), cx).unwrap()
2308 })
2309 .await
2310 .unwrap();
2311 channel_a
2312 .condition(&cx_a, |channel, _| {
2313 channel_messages(channel)
2314 == [
2315 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2316 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2317 ("user_a".to_string(), "sup".to_string(), false),
2318 ("user_b".to_string(), "can you see this?".to_string(), false),
2319 ("user_a".to_string(), "you online?".to_string(), false),
2320 ("user_b".to_string(), "yep".to_string(), false),
2321 ]
2322 })
2323 .await;
2324 }
2325
2326 #[gpui::test]
2327 async fn test_contacts(
2328 mut cx_a: TestAppContext,
2329 mut cx_b: TestAppContext,
2330 mut cx_c: TestAppContext,
2331 ) {
2332 cx_a.foreground().forbid_parking();
2333 let lang_registry = Arc::new(LanguageRegistry::new());
2334 let fs = Arc::new(FakeFs::new());
2335
2336 // Connect to a server as 3 clients.
2337 let mut server = TestServer::start().await;
2338 let client_a = server.create_client(&mut cx_a, "user_a").await;
2339 let client_b = server.create_client(&mut cx_b, "user_b").await;
2340 let client_c = server.create_client(&mut cx_c, "user_c").await;
2341
2342 // Share a worktree as client A.
2343 fs.insert_tree(
2344 "/a",
2345 json!({
2346 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2347 }),
2348 )
2349 .await;
2350
2351 let project_a = cx_a.update(|cx| {
2352 Project::local(
2353 client_a.clone(),
2354 client_a.user_store.clone(),
2355 lang_registry.clone(),
2356 fs.clone(),
2357 cx,
2358 )
2359 });
2360 let worktree_a = project_a
2361 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2362 .await
2363 .unwrap();
2364 worktree_a
2365 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366 .await;
2367
2368 client_a
2369 .user_store
2370 .condition(&cx_a, |user_store, _| {
2371 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2372 })
2373 .await;
2374 client_b
2375 .user_store
2376 .condition(&cx_b, |user_store, _| {
2377 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2378 })
2379 .await;
2380 client_c
2381 .user_store
2382 .condition(&cx_c, |user_store, _| {
2383 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2384 })
2385 .await;
2386
2387 let project_id = project_a
2388 .update(&mut cx_a, |project, _| project.next_remote_id())
2389 .await;
2390 project_a
2391 .update(&mut cx_a, |project, cx| project.share(cx))
2392 .await
2393 .unwrap();
2394
2395 let _project_b = Project::remote(
2396 project_id,
2397 client_b.clone(),
2398 client_b.user_store.clone(),
2399 lang_registry.clone(),
2400 fs.clone(),
2401 &mut cx_b.to_async(),
2402 )
2403 .await
2404 .unwrap();
2405
2406 client_a
2407 .user_store
2408 .condition(&cx_a, |user_store, _| {
2409 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2410 })
2411 .await;
2412 client_b
2413 .user_store
2414 .condition(&cx_b, |user_store, _| {
2415 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2416 })
2417 .await;
2418 client_c
2419 .user_store
2420 .condition(&cx_c, |user_store, _| {
2421 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2422 })
2423 .await;
2424
2425 project_a
2426 .condition(&cx_a, |project, _| {
2427 project.collaborators().contains_key(&client_b.peer_id)
2428 })
2429 .await;
2430
2431 cx_a.update(move |_| drop(project_a));
2432 client_a
2433 .user_store
2434 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2435 .await;
2436 client_b
2437 .user_store
2438 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2439 .await;
2440 client_c
2441 .user_store
2442 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2443 .await;
2444
2445 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2446 user_store
2447 .contacts()
2448 .iter()
2449 .map(|contact| {
2450 let worktrees = contact
2451 .projects
2452 .iter()
2453 .map(|p| {
2454 (
2455 p.worktree_root_names[0].as_str(),
2456 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2457 )
2458 })
2459 .collect();
2460 (contact.user.github_login.as_str(), worktrees)
2461 })
2462 .collect()
2463 }
2464 }
2465
2466 struct TestServer {
2467 peer: Arc<Peer>,
2468 app_state: Arc<AppState>,
2469 server: Arc<Server>,
2470 notifications: mpsc::Receiver<()>,
2471 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2472 forbid_connections: Arc<AtomicBool>,
2473 _test_db: TestDb,
2474 }
2475
2476 impl TestServer {
2477 async fn start() -> Self {
2478 let test_db = TestDb::new();
2479 let app_state = Self::build_app_state(&test_db).await;
2480 let peer = Peer::new();
2481 let notifications = mpsc::channel(128);
2482 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2483 Self {
2484 peer,
2485 app_state,
2486 server,
2487 notifications: notifications.1,
2488 connection_killers: Default::default(),
2489 forbid_connections: Default::default(),
2490 _test_db: test_db,
2491 }
2492 }
2493
2494 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2495 let http = FakeHttpClient::with_404_response();
2496 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2497 let client_name = name.to_string();
2498 let mut client = Client::new(http.clone());
2499 let server = self.server.clone();
2500 let connection_killers = self.connection_killers.clone();
2501 let forbid_connections = self.forbid_connections.clone();
2502 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2503
2504 Arc::get_mut(&mut client)
2505 .unwrap()
2506 .override_authenticate(move |cx| {
2507 cx.spawn(|_| async move {
2508 let access_token = "the-token".to_string();
2509 Ok(Credentials {
2510 user_id: user_id.0 as u64,
2511 access_token,
2512 })
2513 })
2514 })
2515 .override_establish_connection(move |credentials, cx| {
2516 assert_eq!(credentials.user_id, user_id.0 as u64);
2517 assert_eq!(credentials.access_token, "the-token");
2518
2519 let server = server.clone();
2520 let connection_killers = connection_killers.clone();
2521 let forbid_connections = forbid_connections.clone();
2522 let client_name = client_name.clone();
2523 let connection_id_tx = connection_id_tx.clone();
2524 cx.spawn(move |cx| async move {
2525 if forbid_connections.load(SeqCst) {
2526 Err(EstablishConnectionError::other(anyhow!(
2527 "server is forbidding connections"
2528 )))
2529 } else {
2530 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2531 connection_killers.lock().insert(user_id, kill_conn);
2532 cx.background()
2533 .spawn(server.handle_connection(
2534 server_conn,
2535 client_name,
2536 user_id,
2537 Some(connection_id_tx),
2538 ))
2539 .detach();
2540 Ok(client_conn)
2541 }
2542 })
2543 });
2544
2545 client
2546 .authenticate_and_connect(&cx.to_async())
2547 .await
2548 .unwrap();
2549
2550 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2551 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2552 let mut authed_user =
2553 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2554 while authed_user.recv().await.unwrap().is_none() {}
2555
2556 TestClient {
2557 client,
2558 peer_id,
2559 user_store,
2560 }
2561 }
2562
2563 fn disconnect_client(&self, user_id: UserId) {
2564 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2565 let _ = kill_conn.try_send(Some(()));
2566 }
2567 }
2568
2569 fn forbid_connections(&self) {
2570 self.forbid_connections.store(true, SeqCst);
2571 }
2572
2573 fn allow_connections(&self) {
2574 self.forbid_connections.store(false, SeqCst);
2575 }
2576
2577 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2578 let mut config = Config::default();
2579 config.session_secret = "a".repeat(32);
2580 config.database_url = test_db.url.clone();
2581 let github_client = github::AppClient::test();
2582 Arc::new(AppState {
2583 db: test_db.db().clone(),
2584 handlebars: Default::default(),
2585 auth_client: auth::build_client("", ""),
2586 repo_client: github::RepoClient::test(&github_client),
2587 github_client,
2588 config,
2589 })
2590 }
2591
2592 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2593 self.server.store.read()
2594 }
2595
2596 async fn condition<F>(&mut self, mut predicate: F)
2597 where
2598 F: FnMut(&Store) -> bool,
2599 {
2600 async_std::future::timeout(Duration::from_millis(500), async {
2601 while !(predicate)(&*self.server.store.read()) {
2602 self.notifications.recv().await;
2603 }
2604 })
2605 .await
2606 .expect("condition timed out");
2607 }
2608 }
2609
2610 impl Drop for TestServer {
2611 fn drop(&mut self) {
2612 task::block_on(self.peer.reset());
2613 }
2614 }
2615
2616 struct TestClient {
2617 client: Arc<Client>,
2618 pub peer_id: PeerId,
2619 pub user_store: ModelHandle<UserStore>,
2620 }
2621
2622 impl Deref for TestClient {
2623 type Target = Arc<Client>;
2624
2625 fn deref(&self) -> &Self::Target {
2626 &self.client
2627 }
2628 }
2629
2630 impl TestClient {
2631 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2632 UserId::from_proto(
2633 self.user_store
2634 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2635 )
2636 }
2637 }
2638
2639 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2640 channel
2641 .messages()
2642 .cursor::<()>()
2643 .map(|m| {
2644 (
2645 m.sender.github_login.clone(),
2646 m.body.clone(),
2647 m.is_pending(),
2648 )
2649 })
2650 .collect()
2651 }
2652
2653 struct EmptyView;
2654
2655 impl gpui::Entity for EmptyView {
2656 type Event = ();
2657 }
2658
2659 impl gpui::View for EmptyView {
2660 fn ui_name() -> &'static str {
2661 "empty view"
2662 }
2663
2664 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2665 gpui::Element::boxed(gpui::elements::Empty)
2666 }
2667 }
2668}