1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// A type-erased handler for one kind of incoming RPC message.
///
/// Handlers are stored in `Server::handlers`. Each one takes a handle to the server plus the
/// boxed envelope it should process and returns a boxed future yielding `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server state shared by every connection handled through `handle_connection`.
pub struct Server {
    /// Peer used to send, forward, and respond to RPC messages.
    peer: Arc<Peer>,
    /// In-memory `Store` (connections, projects, channels), guarded by a read-write lock.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel on which `handle_connection` sends `()` after running a handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error message used when a request refers to a project the store cannot resolve.
// `'static` is implied for references in constants, so it is not spelled out.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updated)
76 .add_handler(Server::open_buffer)
77 .add_handler(Server::close_buffer)
78 .add_handler(Server::update_buffer)
79 .add_handler(Server::buffer_saved)
80 .add_handler(Server::save_buffer)
81 .add_handler(Server::get_channels)
82 .add_handler(Server::get_users)
83 .add_handler(Server::join_channel)
84 .add_handler(Server::leave_channel)
85 .add_handler(Server::send_channel_message)
86 .add_handler(Server::get_channel_messages);
87
88 Arc::new(server)
89 }
90
91 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
92 where
93 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
94 Fut: 'static + Send + Future<Output = tide::Result<()>>,
95 M: EnvelopedMessage,
96 {
97 let prev_handler = self.handlers.insert(
98 TypeId::of::<M>(),
99 Box::new(move |server, envelope| {
100 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
101 (handler)(server, *envelope).boxed()
102 }),
103 );
104 if prev_handler.is_some() {
105 panic!("registered a handler for the same message twice");
106 }
107 self
108 }
109
110 pub fn handle_connection(
111 self: &Arc<Self>,
112 connection: Connection,
113 addr: String,
114 user_id: UserId,
115 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
116 ) -> impl Future<Output = ()> {
117 let mut this = self.clone();
118 async move {
119 let (connection_id, handle_io, mut incoming_rx) =
120 this.peer.add_connection(connection).await;
121
122 if let Some(send_connection_id) = send_connection_id.as_mut() {
123 let _ = send_connection_id.send(connection_id).await;
124 }
125
126 this.state_mut().add_connection(connection_id, user_id);
127 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
128 log::error!("error updating contacts for {:?}: {}", user_id, err);
129 }
130
131 let handle_io = handle_io.fuse();
132 futures::pin_mut!(handle_io);
133 loop {
134 let next_message = incoming_rx.recv().fuse();
135 futures::pin_mut!(next_message);
136 futures::select_biased! {
137 message = next_message => {
138 if let Some(message) = message {
139 let start_time = Instant::now();
140 log::info!("RPC message received: {}", message.payload_type_name());
141 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
142 if let Err(err) = (handler)(this.clone(), message).await {
143 log::error!("error handling message: {:?}", err);
144 } else {
145 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
146 }
147
148 if let Some(mut notifications) = this.notifications.clone() {
149 let _ = notifications.send(()).await;
150 }
151 } else {
152 log::warn!("unhandled message: {}", message.payload_type_name());
153 }
154 } else {
155 log::info!("rpc connection closed {:?}", addr);
156 break;
157 }
158 }
159 handle_io = handle_io => {
160 if let Err(err) = handle_io {
161 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
162 }
163 break;
164 }
165 }
166 }
167
168 if let Err(err) = this.sign_out(connection_id).await {
169 log::error!("error signing out connection {:?} - {:?}", addr, err);
170 }
171 }
172 }
173
174 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
175 self.peer.disconnect(connection_id).await;
176 let removed_connection = self.state_mut().remove_connection(connection_id)?;
177
178 for (project_id, project) in removed_connection.hosted_projects {
179 if let Some(share) = project.share {
180 broadcast(
181 connection_id,
182 share.guests.keys().copied().collect(),
183 |conn_id| {
184 self.peer
185 .send(conn_id, proto::UnshareProject { project_id })
186 },
187 )
188 .await?;
189 }
190 }
191
192 for (project_id, peer_ids) in removed_connection.guest_project_ids {
193 broadcast(connection_id, peer_ids, |conn_id| {
194 self.peer.send(
195 conn_id,
196 proto::RemoveProjectCollaborator {
197 project_id,
198 peer_id: connection_id.0,
199 },
200 )
201 })
202 .await?;
203 }
204
205 self.update_contacts_for_users(removed_connection.contact_ids.iter())
206 .await?;
207
208 Ok(())
209 }
210
211 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
212 self.peer.respond(request.receipt(), proto::Ack {}).await?;
213 Ok(())
214 }
215
216 async fn register_project(
217 mut self: Arc<Server>,
218 request: TypedEnvelope<proto::RegisterProject>,
219 ) -> tide::Result<()> {
220 let project_id = {
221 let mut state = self.state_mut();
222 let user_id = state.user_id_for_connection(request.sender_id)?;
223 state.register_project(request.sender_id, user_id)
224 };
225 self.peer
226 .respond(
227 request.receipt(),
228 proto::RegisterProjectResponse { project_id },
229 )
230 .await?;
231 Ok(())
232 }
233
234 async fn unregister_project(
235 mut self: Arc<Server>,
236 request: TypedEnvelope<proto::UnregisterProject>,
237 ) -> tide::Result<()> {
238 let project = self
239 .state_mut()
240 .unregister_project(request.payload.project_id, request.sender_id)
241 .ok_or_else(|| anyhow!("no such project"))?;
242 self.update_contacts_for_users(project.authorized_user_ids().iter())
243 .await?;
244 Ok(())
245 }
246
247 async fn share_project(
248 mut self: Arc<Server>,
249 request: TypedEnvelope<proto::ShareProject>,
250 ) -> tide::Result<()> {
251 self.state_mut()
252 .share_project(request.payload.project_id, request.sender_id);
253 self.peer.respond(request.receipt(), proto::Ack {}).await?;
254 Ok(())
255 }
256
257 async fn unshare_project(
258 mut self: Arc<Server>,
259 request: TypedEnvelope<proto::UnshareProject>,
260 ) -> tide::Result<()> {
261 let project_id = request.payload.project_id;
262 let project = self
263 .state_mut()
264 .unshare_project(project_id, request.sender_id)?;
265
266 broadcast(request.sender_id, project.connection_ids, |conn_id| {
267 self.peer
268 .send(conn_id, proto::UnshareProject { project_id })
269 })
270 .await?;
271 self.update_contacts_for_users(&project.authorized_user_ids)
272 .await?;
273
274 Ok(())
275 }
276
277 async fn join_project(
278 mut self: Arc<Server>,
279 request: TypedEnvelope<proto::JoinProject>,
280 ) -> tide::Result<()> {
281 let project_id = request.payload.project_id;
282
283 let user_id = self.state().user_id_for_connection(request.sender_id)?;
284 let response_data = self
285 .state_mut()
286 .join_project(request.sender_id, user_id, project_id)
287 .and_then(|joined| {
288 let share = joined.project.share()?;
289 let peer_count = share.guests.len();
290 let mut collaborators = Vec::with_capacity(peer_count);
291 collaborators.push(proto::Collaborator {
292 peer_id: joined.project.host_connection_id.0,
293 replica_id: 0,
294 user_id: joined.project.host_user_id.to_proto(),
295 });
296 let worktrees = joined
297 .project
298 .worktrees
299 .iter()
300 .filter_map(|(id, worktree)| {
301 worktree.share.as_ref().map(|share| proto::Worktree {
302 id: *id,
303 root_name: worktree.root_name.clone(),
304 entries: share.entries.values().cloned().collect(),
305 diagnostic_summaries: share
306 .diagnostic_summaries
307 .values()
308 .cloned()
309 .collect(),
310 })
311 })
312 .collect();
313 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
314 if *peer_conn_id != request.sender_id {
315 collaborators.push(proto::Collaborator {
316 peer_id: peer_conn_id.0,
317 replica_id: *peer_replica_id as u32,
318 user_id: peer_user_id.to_proto(),
319 });
320 }
321 }
322 let response = proto::JoinProjectResponse {
323 worktrees,
324 replica_id: joined.replica_id as u32,
325 collaborators,
326 };
327 let connection_ids = joined.project.connection_ids();
328 let contact_user_ids = joined.project.authorized_user_ids();
329 Ok((response, connection_ids, contact_user_ids))
330 });
331
332 match response_data {
333 Ok((response, connection_ids, contact_user_ids)) => {
334 broadcast(request.sender_id, connection_ids, |conn_id| {
335 self.peer.send(
336 conn_id,
337 proto::AddProjectCollaborator {
338 project_id: project_id,
339 collaborator: Some(proto::Collaborator {
340 peer_id: request.sender_id.0,
341 replica_id: response.replica_id,
342 user_id: user_id.to_proto(),
343 }),
344 },
345 )
346 })
347 .await?;
348 self.peer.respond(request.receipt(), response).await?;
349 self.update_contacts_for_users(&contact_user_ids).await?;
350 }
351 Err(error) => {
352 self.peer
353 .respond_with_error(
354 request.receipt(),
355 proto::Error {
356 message: error.to_string(),
357 },
358 )
359 .await?;
360 }
361 }
362
363 Ok(())
364 }
365
366 async fn leave_project(
367 mut self: Arc<Server>,
368 request: TypedEnvelope<proto::LeaveProject>,
369 ) -> tide::Result<()> {
370 let sender_id = request.sender_id;
371 let project_id = request.payload.project_id;
372 let worktree = self.state_mut().leave_project(sender_id, project_id);
373 if let Some(worktree) = worktree {
374 broadcast(sender_id, worktree.connection_ids, |conn_id| {
375 self.peer.send(
376 conn_id,
377 proto::RemoveProjectCollaborator {
378 project_id,
379 peer_id: sender_id.0,
380 },
381 )
382 })
383 .await?;
384 self.update_contacts_for_users(&worktree.authorized_user_ids)
385 .await?;
386 }
387 Ok(())
388 }
389
390 async fn register_worktree(
391 mut self: Arc<Server>,
392 request: TypedEnvelope<proto::RegisterWorktree>,
393 ) -> tide::Result<()> {
394 let receipt = request.receipt();
395 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
396
397 let mut contact_user_ids = HashSet::default();
398 contact_user_ids.insert(host_user_id);
399 for github_login in request.payload.authorized_logins {
400 match self.app_state.db.create_user(&github_login, false).await {
401 Ok(contact_user_id) => {
402 contact_user_ids.insert(contact_user_id);
403 }
404 Err(err) => {
405 let message = err.to_string();
406 self.peer
407 .respond_with_error(receipt, proto::Error { message })
408 .await?;
409 return Ok(());
410 }
411 }
412 }
413
414 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
415 let ok = self.state_mut().register_worktree(
416 request.payload.project_id,
417 request.payload.worktree_id,
418 Worktree {
419 authorized_user_ids: contact_user_ids.clone(),
420 root_name: request.payload.root_name,
421 share: None,
422 },
423 );
424
425 if ok {
426 self.peer.respond(receipt, proto::Ack {}).await?;
427 self.update_contacts_for_users(&contact_user_ids).await?;
428 } else {
429 self.peer
430 .respond_with_error(
431 receipt,
432 proto::Error {
433 message: NO_SUCH_PROJECT.to_string(),
434 },
435 )
436 .await?;
437 }
438
439 Ok(())
440 }
441
442 async fn unregister_worktree(
443 mut self: Arc<Server>,
444 request: TypedEnvelope<proto::UnregisterWorktree>,
445 ) -> tide::Result<()> {
446 let project_id = request.payload.project_id;
447 let worktree_id = request.payload.worktree_id;
448 let (worktree, guest_connection_ids) =
449 self.state_mut()
450 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
451
452 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
453 self.peer.send(
454 conn_id,
455 proto::UnregisterWorktree {
456 project_id,
457 worktree_id,
458 },
459 )
460 })
461 .await?;
462 self.update_contacts_for_users(&worktree.authorized_user_ids)
463 .await?;
464 Ok(())
465 }
466
467 async fn share_worktree(
468 mut self: Arc<Server>,
469 mut request: TypedEnvelope<proto::ShareWorktree>,
470 ) -> tide::Result<()> {
471 let worktree = request
472 .payload
473 .worktree
474 .as_mut()
475 .ok_or_else(|| anyhow!("missing worktree"))?;
476 let entries = mem::take(&mut worktree.entries)
477 .into_iter()
478 .map(|entry| (entry.id, entry))
479 .collect();
480
481 let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
482 .into_iter()
483 .map(|summary| (PathBuf::from(summary.path.clone()), summary))
484 .collect();
485
486 let contact_user_ids = self.state_mut().share_worktree(
487 request.payload.project_id,
488 worktree.id,
489 request.sender_id,
490 entries,
491 diagnostic_summaries,
492 );
493 if let Some(contact_user_ids) = contact_user_ids {
494 self.peer.respond(request.receipt(), proto::Ack {}).await?;
495 self.update_contacts_for_users(&contact_user_ids).await?;
496 } else {
497 self.peer
498 .respond_with_error(
499 request.receipt(),
500 proto::Error {
501 message: "no such worktree".to_string(),
502 },
503 )
504 .await?;
505 }
506 Ok(())
507 }
508
509 async fn update_worktree(
510 mut self: Arc<Server>,
511 request: TypedEnvelope<proto::UpdateWorktree>,
512 ) -> tide::Result<()> {
513 let connection_ids = self
514 .state_mut()
515 .update_worktree(
516 request.sender_id,
517 request.payload.project_id,
518 request.payload.worktree_id,
519 &request.payload.removed_entries,
520 &request.payload.updated_entries,
521 )
522 .ok_or_else(|| anyhow!("no such worktree"))?;
523
524 broadcast(request.sender_id, connection_ids, |connection_id| {
525 self.peer
526 .forward_send(request.sender_id, connection_id, request.payload.clone())
527 })
528 .await?;
529
530 Ok(())
531 }
532
533 async fn update_diagnostic_summary(
534 mut self: Arc<Server>,
535 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
536 ) -> tide::Result<()> {
537 let receiver_ids = request
538 .payload
539 .summary
540 .clone()
541 .and_then(|summary| {
542 self.state_mut().update_diagnostic_summary(
543 request.payload.project_id,
544 request.payload.worktree_id,
545 request.sender_id,
546 summary,
547 )
548 })
549 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
550
551 broadcast(request.sender_id, receiver_ids, |connection_id| {
552 self.peer
553 .forward_send(request.sender_id, connection_id, request.payload.clone())
554 })
555 .await?;
556 Ok(())
557 }
558
559 async fn disk_based_diagnostics_updated(
560 self: Arc<Server>,
561 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
562 ) -> tide::Result<()> {
563 let receiver_ids = self
564 .state()
565 .project_connection_ids(request.payload.project_id, request.sender_id)
566 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
567 broadcast(request.sender_id, receiver_ids, |connection_id| {
568 self.peer
569 .forward_send(request.sender_id, connection_id, request.payload.clone())
570 })
571 .await?;
572 Ok(())
573 }
574
575 async fn open_buffer(
576 self: Arc<Server>,
577 request: TypedEnvelope<proto::OpenBuffer>,
578 ) -> tide::Result<()> {
579 let receipt = request.receipt();
580 let host_connection_id = self
581 .state()
582 .read_project(request.payload.project_id, request.sender_id)
583 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
584 .host_connection_id;
585 let response = self
586 .peer
587 .forward_request(request.sender_id, host_connection_id, request.payload)
588 .await?;
589 self.peer.respond(receipt, response).await?;
590 Ok(())
591 }
592
593 async fn close_buffer(
594 self: Arc<Server>,
595 request: TypedEnvelope<proto::CloseBuffer>,
596 ) -> tide::Result<()> {
597 let host_connection_id = self
598 .state()
599 .read_project(request.payload.project_id, request.sender_id)
600 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
601 .host_connection_id;
602 self.peer
603 .forward_send(request.sender_id, host_connection_id, request.payload)
604 .await?;
605 Ok(())
606 }
607
608 async fn save_buffer(
609 self: Arc<Server>,
610 request: TypedEnvelope<proto::SaveBuffer>,
611 ) -> tide::Result<()> {
612 let host;
613 let guests;
614 {
615 let state = self.state();
616 let project = state
617 .read_project(request.payload.project_id, request.sender_id)
618 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
619 host = project.host_connection_id;
620 guests = project.guest_connection_ids()
621 }
622
623 let sender = request.sender_id;
624 let receipt = request.receipt();
625 let response = self
626 .peer
627 .forward_request(sender, host, request.payload.clone())
628 .await?;
629
630 broadcast(host, guests, |conn_id| {
631 let response = response.clone();
632 let peer = &self.peer;
633 async move {
634 if conn_id == sender {
635 peer.respond(receipt, response).await
636 } else {
637 peer.forward_send(host, conn_id, response).await
638 }
639 }
640 })
641 .await?;
642
643 Ok(())
644 }
645
646 async fn update_buffer(
647 self: Arc<Server>,
648 request: TypedEnvelope<proto::UpdateBuffer>,
649 ) -> tide::Result<()> {
650 let receiver_ids = self
651 .state()
652 .project_connection_ids(request.payload.project_id, request.sender_id)
653 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
654 broadcast(request.sender_id, receiver_ids, |connection_id| {
655 self.peer
656 .forward_send(request.sender_id, connection_id, request.payload.clone())
657 })
658 .await?;
659 self.peer.respond(request.receipt(), proto::Ack {}).await?;
660 Ok(())
661 }
662
663 async fn buffer_saved(
664 self: Arc<Server>,
665 request: TypedEnvelope<proto::BufferSaved>,
666 ) -> tide::Result<()> {
667 let receiver_ids = self
668 .state()
669 .project_connection_ids(request.payload.project_id, request.sender_id)
670 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
671 broadcast(request.sender_id, receiver_ids, |connection_id| {
672 self.peer
673 .forward_send(request.sender_id, connection_id, request.payload.clone())
674 })
675 .await?;
676 Ok(())
677 }
678
679 async fn get_channels(
680 self: Arc<Server>,
681 request: TypedEnvelope<proto::GetChannels>,
682 ) -> tide::Result<()> {
683 let user_id = self.state().user_id_for_connection(request.sender_id)?;
684 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
685 self.peer
686 .respond(
687 request.receipt(),
688 proto::GetChannelsResponse {
689 channels: channels
690 .into_iter()
691 .map(|chan| proto::Channel {
692 id: chan.id.to_proto(),
693 name: chan.name,
694 })
695 .collect(),
696 },
697 )
698 .await?;
699 Ok(())
700 }
701
702 async fn get_users(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::GetUsers>,
705 ) -> tide::Result<()> {
706 let receipt = request.receipt();
707 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
708 let users = self
709 .app_state
710 .db
711 .get_users_by_ids(user_ids)
712 .await?
713 .into_iter()
714 .map(|user| proto::User {
715 id: user.id.to_proto(),
716 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
717 github_login: user.github_login,
718 })
719 .collect();
720 self.peer
721 .respond(receipt, proto::GetUsersResponse { users })
722 .await?;
723 Ok(())
724 }
725
726 async fn update_contacts_for_users<'a>(
727 self: &Arc<Server>,
728 user_ids: impl IntoIterator<Item = &'a UserId>,
729 ) -> tide::Result<()> {
730 let mut send_futures = Vec::new();
731
732 {
733 let state = self.state();
734 for user_id in user_ids {
735 let contacts = state.contacts_for_user(*user_id);
736 for connection_id in state.connection_ids_for_user(*user_id) {
737 send_futures.push(self.peer.send(
738 connection_id,
739 proto::UpdateContacts {
740 contacts: contacts.clone(),
741 },
742 ));
743 }
744 }
745 }
746 futures::future::try_join_all(send_futures).await?;
747
748 Ok(())
749 }
750
751 async fn join_channel(
752 mut self: Arc<Self>,
753 request: TypedEnvelope<proto::JoinChannel>,
754 ) -> tide::Result<()> {
755 let user_id = self.state().user_id_for_connection(request.sender_id)?;
756 let channel_id = ChannelId::from_proto(request.payload.channel_id);
757 if !self
758 .app_state
759 .db
760 .can_user_access_channel(user_id, channel_id)
761 .await?
762 {
763 Err(anyhow!("access denied"))?;
764 }
765
766 self.state_mut().join_channel(request.sender_id, channel_id);
767 let messages = self
768 .app_state
769 .db
770 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
771 .await?
772 .into_iter()
773 .map(|msg| proto::ChannelMessage {
774 id: msg.id.to_proto(),
775 body: msg.body,
776 timestamp: msg.sent_at.unix_timestamp() as u64,
777 sender_id: msg.sender_id.to_proto(),
778 nonce: Some(msg.nonce.as_u128().into()),
779 })
780 .collect::<Vec<_>>();
781 self.peer
782 .respond(
783 request.receipt(),
784 proto::JoinChannelResponse {
785 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
786 messages,
787 },
788 )
789 .await?;
790 Ok(())
791 }
792
793 async fn leave_channel(
794 mut self: Arc<Self>,
795 request: TypedEnvelope<proto::LeaveChannel>,
796 ) -> tide::Result<()> {
797 let user_id = self.state().user_id_for_connection(request.sender_id)?;
798 let channel_id = ChannelId::from_proto(request.payload.channel_id);
799 if !self
800 .app_state
801 .db
802 .can_user_access_channel(user_id, channel_id)
803 .await?
804 {
805 Err(anyhow!("access denied"))?;
806 }
807
808 self.state_mut()
809 .leave_channel(request.sender_id, channel_id);
810
811 Ok(())
812 }
813
814 async fn send_channel_message(
815 self: Arc<Self>,
816 request: TypedEnvelope<proto::SendChannelMessage>,
817 ) -> tide::Result<()> {
818 let receipt = request.receipt();
819 let channel_id = ChannelId::from_proto(request.payload.channel_id);
820 let user_id;
821 let connection_ids;
822 {
823 let state = self.state();
824 user_id = state.user_id_for_connection(request.sender_id)?;
825 if let Some(ids) = state.channel_connection_ids(channel_id) {
826 connection_ids = ids;
827 } else {
828 return Ok(());
829 }
830 }
831
832 // Validate the message body.
833 let body = request.payload.body.trim().to_string();
834 if body.len() > MAX_MESSAGE_LEN {
835 self.peer
836 .respond_with_error(
837 receipt,
838 proto::Error {
839 message: "message is too long".to_string(),
840 },
841 )
842 .await?;
843 return Ok(());
844 }
845 if body.is_empty() {
846 self.peer
847 .respond_with_error(
848 receipt,
849 proto::Error {
850 message: "message can't be blank".to_string(),
851 },
852 )
853 .await?;
854 return Ok(());
855 }
856
857 let timestamp = OffsetDateTime::now_utc();
858 let nonce = if let Some(nonce) = request.payload.nonce {
859 nonce
860 } else {
861 self.peer
862 .respond_with_error(
863 receipt,
864 proto::Error {
865 message: "nonce can't be blank".to_string(),
866 },
867 )
868 .await?;
869 return Ok(());
870 };
871
872 let message_id = self
873 .app_state
874 .db
875 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
876 .await?
877 .to_proto();
878 let message = proto::ChannelMessage {
879 sender_id: user_id.to_proto(),
880 id: message_id,
881 body,
882 timestamp: timestamp.unix_timestamp() as u64,
883 nonce: Some(nonce),
884 };
885 broadcast(request.sender_id, connection_ids, |conn_id| {
886 self.peer.send(
887 conn_id,
888 proto::ChannelMessageSent {
889 channel_id: channel_id.to_proto(),
890 message: Some(message.clone()),
891 },
892 )
893 })
894 .await?;
895 self.peer
896 .respond(
897 receipt,
898 proto::SendChannelMessageResponse {
899 message: Some(message),
900 },
901 )
902 .await?;
903 Ok(())
904 }
905
906 async fn get_channel_messages(
907 self: Arc<Self>,
908 request: TypedEnvelope<proto::GetChannelMessages>,
909 ) -> tide::Result<()> {
910 let user_id = self.state().user_id_for_connection(request.sender_id)?;
911 let channel_id = ChannelId::from_proto(request.payload.channel_id);
912 if !self
913 .app_state
914 .db
915 .can_user_access_channel(user_id, channel_id)
916 .await?
917 {
918 Err(anyhow!("access denied"))?;
919 }
920
921 let messages = self
922 .app_state
923 .db
924 .get_channel_messages(
925 channel_id,
926 MESSAGE_COUNT_PER_PAGE,
927 Some(MessageId::from_proto(request.payload.before_message_id)),
928 )
929 .await?
930 .into_iter()
931 .map(|msg| proto::ChannelMessage {
932 id: msg.id.to_proto(),
933 body: msg.body,
934 timestamp: msg.sent_at.unix_timestamp() as u64,
935 sender_id: msg.sender_id.to_proto(),
936 nonce: Some(msg.nonce.as_u128().into()),
937 })
938 .collect::<Vec<_>>();
939 self.peer
940 .respond(
941 request.receipt(),
942 proto::GetChannelMessagesResponse {
943 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
944 messages,
945 },
946 )
947 .await?;
948 Ok(())
949 }
950
951 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
952 self.store.read()
953 }
954
955 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
956 self.store.write()
957 }
958}
959
960pub async fn broadcast<F, T>(
961 sender_id: ConnectionId,
962 receiver_ids: Vec<ConnectionId>,
963 mut f: F,
964) -> anyhow::Result<()>
965where
966 F: FnMut(ConnectionId) -> T,
967 T: Future<Output = anyhow::Result<()>>,
968{
969 let futures = receiver_ids
970 .into_iter()
971 .filter(|id| *id != sender_id)
972 .map(|id| f(id));
973 futures::future::try_join_all(futures).await?;
974 Ok(())
975}
976
977pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
978 let server = Server::new(app.state().clone(), rpc.clone(), None);
979 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
980 let server = server.clone();
981 async move {
982 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
983
984 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
985 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
986 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
987 let client_protocol_version: Option<u32> = request
988 .header("X-Zed-Protocol-Version")
989 .and_then(|v| v.as_str().parse().ok());
990
991 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
992 return Ok(Response::new(StatusCode::UpgradeRequired));
993 }
994
995 let header = match request.header("Sec-Websocket-Key") {
996 Some(h) => h.as_str(),
997 None => return Err(anyhow!("expected sec-websocket-key"))?,
998 };
999
1000 let user_id = process_auth_header(&request).await?;
1001
1002 let mut response = Response::new(StatusCode::SwitchingProtocols);
1003 response.insert_header(UPGRADE, "websocket");
1004 response.insert_header(CONNECTION, "Upgrade");
1005 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1006 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1007 response.insert_header("Sec-Websocket-Version", "13");
1008
1009 let http_res: &mut tide::http::Response = response.as_mut();
1010 let upgrade_receiver = http_res.recv_upgrade().await;
1011 let addr = request.remote().unwrap_or("unknown").to_string();
1012 task::spawn(async move {
1013 if let Some(stream) = upgrade_receiver.await {
1014 server
1015 .handle_connection(
1016 Connection::new(
1017 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1018 ),
1019 addr,
1020 user_id,
1021 None,
1022 )
1023 .await;
1024 }
1025 });
1026
1027 Ok(response)
1028 }
1029 });
1030}
1031
1032fn header_contains_ignore_case<T>(
1033 request: &tide::Request<T>,
1034 header_name: HeaderName,
1035 value: &str,
1036) -> bool {
1037 request
1038 .header(header_name)
1039 .map(|h| {
1040 h.as_str()
1041 .split(',')
1042 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1043 })
1044 .unwrap_or(false)
1045}
1046
1047#[cfg(test)]
1048mod tests {
1049 use super::*;
1050 use crate::{
1051 auth,
1052 db::{tests::TestDb, UserId},
1053 github, AppState, Config,
1054 };
1055 use ::rpc::Peer;
1056 use async_std::task;
1057 use gpui::{ModelHandle, TestAppContext};
1058 use parking_lot::Mutex;
1059 use postage::{mpsc, watch};
1060 use rpc::PeerId;
1061 use serde_json::json;
1062 use sqlx::types::time::OffsetDateTime;
1063 use std::{
1064 ops::Deref,
1065 path::Path,
1066 sync::{
1067 atomic::{AtomicBool, Ordering::SeqCst},
1068 Arc,
1069 },
1070 time::Duration,
1071 };
1072 use zed::{
1073 client::{
1074 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1075 EstablishConnectionError, UserStore,
1076 },
1077 editor::{Editor, EditorSettings, Input, MultiBuffer},
1078 fs::{FakeFs, Fs as _},
1079 language::{
1080 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1081 LanguageRegistry, LanguageServerConfig, Point,
1082 },
1083 lsp,
1084 project::{DiagnosticSummary, Project, ProjectPath},
1085 };
1086
1087 #[gpui::test]
1088 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1089 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1090 let lang_registry = Arc::new(LanguageRegistry::new());
1091 let fs = Arc::new(FakeFs::new());
1092 cx_a.foreground().forbid_parking();
1093
1094 // Connect to a server as 2 clients.
1095 let mut server = TestServer::start().await;
1096 let client_a = server.create_client(&mut cx_a, "user_a").await;
1097 let client_b = server.create_client(&mut cx_b, "user_b").await;
1098
1099 // Share a project as client A
1100 fs.insert_tree(
1101 "/a",
1102 json!({
1103 ".zed.toml": r#"collaborators = ["user_b"]"#,
1104 "a.txt": "a-contents",
1105 "b.txt": "b-contents",
1106 }),
1107 )
1108 .await;
1109 let project_a = cx_a.update(|cx| {
1110 Project::local(
1111 client_a.clone(),
1112 client_a.user_store.clone(),
1113 lang_registry.clone(),
1114 fs.clone(),
1115 cx,
1116 )
1117 });
1118 let worktree_a = project_a
1119 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1120 .await
1121 .unwrap();
1122 worktree_a
1123 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1124 .await;
1125 let project_id = project_a
1126 .update(&mut cx_a, |project, _| project.next_remote_id())
1127 .await;
1128 project_a
1129 .update(&mut cx_a, |project, cx| project.share(cx))
1130 .await
1131 .unwrap();
1132
1133 // Join that project as client B
1134 let project_b = Project::remote(
1135 project_id,
1136 client_b.clone(),
1137 client_b.user_store.clone(),
1138 lang_registry.clone(),
1139 fs.clone(),
1140 &mut cx_b.to_async(),
1141 )
1142 .await
1143 .unwrap();
1144 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1145
1146 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1147 assert_eq!(
1148 project
1149 .collaborators()
1150 .get(&client_a.peer_id)
1151 .unwrap()
1152 .user
1153 .github_login,
1154 "user_a"
1155 );
1156 project.replica_id()
1157 });
1158 project_a
1159 .condition(&cx_a, |tree, _| {
1160 tree.collaborators()
1161 .get(&client_b.peer_id)
1162 .map_or(false, |collaborator| {
1163 collaborator.replica_id == replica_id_b
1164 && collaborator.user.github_login == "user_b"
1165 })
1166 })
1167 .await;
1168
1169 // Open the same file as client B and client A.
1170 let buffer_b = worktree_b
1171 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1172 .await
1173 .unwrap();
1174 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1175 buffer_b.read_with(&cx_b, |buf, cx| {
1176 assert_eq!(buf.read(cx).text(), "b-contents")
1177 });
1178 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1179 let buffer_a = worktree_a
1180 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1181 .await
1182 .unwrap();
1183
1184 let editor_b = cx_b.add_view(window_b, |cx| {
1185 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1186 });
1187 // TODO
1188 // // Create a selection set as client B and see that selection set as client A.
1189 // buffer_a
1190 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1191 // .await;
1192
1193 // Edit the buffer as client B and see that edit as client A.
1194 editor_b.update(&mut cx_b, |editor, cx| {
1195 editor.handle_input(&Input("ok, ".into()), cx)
1196 });
1197 buffer_a
1198 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1199 .await;
1200
1201 // TODO
1202 // // Remove the selection set as client B, see those selections disappear as client A.
1203 cx_b.update(move |_| drop(editor_b));
1204 // buffer_a
1205 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1206 // .await;
1207
1208 // Close the buffer as client A, see that the buffer is closed.
1209 cx_a.update(move |_| drop(buffer_a));
1210 worktree_a
1211 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1212 .await;
1213
1214 // Dropping the client B's project removes client B from client A's collaborators.
1215 cx_b.update(move |_| drop(project_b));
1216 project_a
1217 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1218 .await;
1219 }
1220
1221 #[gpui::test]
1222 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1223 let lang_registry = Arc::new(LanguageRegistry::new());
1224 let fs = Arc::new(FakeFs::new());
1225 cx_a.foreground().forbid_parking();
1226
1227 // Connect to a server as 2 clients.
1228 let mut server = TestServer::start().await;
1229 let client_a = server.create_client(&mut cx_a, "user_a").await;
1230 let client_b = server.create_client(&mut cx_b, "user_b").await;
1231
1232 // Share a project as client A
1233 fs.insert_tree(
1234 "/a",
1235 json!({
1236 ".zed.toml": r#"collaborators = ["user_b"]"#,
1237 "a.txt": "a-contents",
1238 "b.txt": "b-contents",
1239 }),
1240 )
1241 .await;
1242 let project_a = cx_a.update(|cx| {
1243 Project::local(
1244 client_a.clone(),
1245 client_a.user_store.clone(),
1246 lang_registry.clone(),
1247 fs.clone(),
1248 cx,
1249 )
1250 });
1251 let worktree_a = project_a
1252 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1253 .await
1254 .unwrap();
1255 worktree_a
1256 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1257 .await;
1258 let project_id = project_a
1259 .update(&mut cx_a, |project, _| project.next_remote_id())
1260 .await;
1261 project_a
1262 .update(&mut cx_a, |project, cx| project.share(cx))
1263 .await
1264 .unwrap();
1265
1266 // Join that project as client B
1267 let project_b = Project::remote(
1268 project_id,
1269 client_b.clone(),
1270 client_b.user_store.clone(),
1271 lang_registry.clone(),
1272 fs.clone(),
1273 &mut cx_b.to_async(),
1274 )
1275 .await
1276 .unwrap();
1277
1278 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1279 worktree_b
1280 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1281 .await
1282 .unwrap();
1283
1284 project_a
1285 .update(&mut cx_a, |project, cx| project.unshare(cx))
1286 .await
1287 .unwrap();
1288 project_b
1289 .condition(&mut cx_b, |project, _| project.is_read_only())
1290 .await;
1291 }
1292
1293 #[gpui::test]
1294 async fn test_propagate_saves_and_fs_changes(
1295 mut cx_a: TestAppContext,
1296 mut cx_b: TestAppContext,
1297 mut cx_c: TestAppContext,
1298 ) {
1299 let lang_registry = Arc::new(LanguageRegistry::new());
1300 let fs = Arc::new(FakeFs::new());
1301 cx_a.foreground().forbid_parking();
1302
1303 // Connect to a server as 3 clients.
1304 let mut server = TestServer::start().await;
1305 let client_a = server.create_client(&mut cx_a, "user_a").await;
1306 let client_b = server.create_client(&mut cx_b, "user_b").await;
1307 let client_c = server.create_client(&mut cx_c, "user_c").await;
1308
1309 // Share a worktree as client A.
1310 fs.insert_tree(
1311 "/a",
1312 json!({
1313 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1314 "file1": "",
1315 "file2": ""
1316 }),
1317 )
1318 .await;
1319 let project_a = cx_a.update(|cx| {
1320 Project::local(
1321 client_a.clone(),
1322 client_a.user_store.clone(),
1323 lang_registry.clone(),
1324 fs.clone(),
1325 cx,
1326 )
1327 });
1328 let worktree_a = project_a
1329 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1330 .await
1331 .unwrap();
1332 worktree_a
1333 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1334 .await;
1335 let project_id = project_a
1336 .update(&mut cx_a, |project, _| project.next_remote_id())
1337 .await;
1338 project_a
1339 .update(&mut cx_a, |project, cx| project.share(cx))
1340 .await
1341 .unwrap();
1342
1343 // Join that worktree as clients B and C.
1344 let project_b = Project::remote(
1345 project_id,
1346 client_b.clone(),
1347 client_b.user_store.clone(),
1348 lang_registry.clone(),
1349 fs.clone(),
1350 &mut cx_b.to_async(),
1351 )
1352 .await
1353 .unwrap();
1354 let project_c = Project::remote(
1355 project_id,
1356 client_c.clone(),
1357 client_c.user_store.clone(),
1358 lang_registry.clone(),
1359 fs.clone(),
1360 &mut cx_c.to_async(),
1361 )
1362 .await
1363 .unwrap();
1364
1365 // Open and edit a buffer as both guests B and C.
1366 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1367 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1368 let buffer_b = worktree_b
1369 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1370 .await
1371 .unwrap();
1372 let buffer_c = worktree_c
1373 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1374 .await
1375 .unwrap();
1376 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1377 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1378
1379 // Open and edit that buffer as the host.
1380 let buffer_a = worktree_a
1381 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1382 .await
1383 .unwrap();
1384
1385 buffer_a
1386 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1387 .await;
1388 buffer_a.update(&mut cx_a, |buf, cx| {
1389 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1390 });
1391
1392 // Wait for edits to propagate
1393 buffer_a
1394 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1395 .await;
1396 buffer_b
1397 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1398 .await;
1399 buffer_c
1400 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1401 .await;
1402
1403 // Edit the buffer as the host and concurrently save as guest B.
1404 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1405 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1406 save_b.await.unwrap();
1407 assert_eq!(
1408 fs.load("/a/file1".as_ref()).await.unwrap(),
1409 "hi-a, i-am-c, i-am-b, i-am-a"
1410 );
1411 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1412 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1413 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1414
1415 // Make changes on host's file system, see those changes on the guests.
1416 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1417 .await
1418 .unwrap();
1419 fs.insert_file(Path::new("/a/file4"), "4".into())
1420 .await
1421 .unwrap();
1422
1423 worktree_b
1424 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1425 .await;
1426 worktree_c
1427 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1428 .await;
1429 worktree_b.read_with(&cx_b, |tree, _| {
1430 assert_eq!(
1431 tree.paths()
1432 .map(|p| p.to_string_lossy())
1433 .collect::<Vec<_>>(),
1434 &[".zed.toml", "file1", "file3", "file4"]
1435 )
1436 });
1437 worktree_c.read_with(&cx_c, |tree, _| {
1438 assert_eq!(
1439 tree.paths()
1440 .map(|p| p.to_string_lossy())
1441 .collect::<Vec<_>>(),
1442 &[".zed.toml", "file1", "file3", "file4"]
1443 )
1444 });
1445 }
1446
1447 #[gpui::test]
1448 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1449 cx_a.foreground().forbid_parking();
1450 let lang_registry = Arc::new(LanguageRegistry::new());
1451 let fs = Arc::new(FakeFs::new());
1452
1453 // Connect to a server as 2 clients.
1454 let mut server = TestServer::start().await;
1455 let client_a = server.create_client(&mut cx_a, "user_a").await;
1456 let client_b = server.create_client(&mut cx_b, "user_b").await;
1457
1458 // Share a project as client A
1459 fs.insert_tree(
1460 "/dir",
1461 json!({
1462 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1463 "a.txt": "a-contents",
1464 }),
1465 )
1466 .await;
1467
1468 let project_a = cx_a.update(|cx| {
1469 Project::local(
1470 client_a.clone(),
1471 client_a.user_store.clone(),
1472 lang_registry.clone(),
1473 fs.clone(),
1474 cx,
1475 )
1476 });
1477 let worktree_a = project_a
1478 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1479 .await
1480 .unwrap();
1481 worktree_a
1482 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1483 .await;
1484 let project_id = project_a
1485 .update(&mut cx_a, |project, _| project.next_remote_id())
1486 .await;
1487 project_a
1488 .update(&mut cx_a, |project, cx| project.share(cx))
1489 .await
1490 .unwrap();
1491
1492 // Join that project as client B
1493 let project_b = Project::remote(
1494 project_id,
1495 client_b.clone(),
1496 client_b.user_store.clone(),
1497 lang_registry.clone(),
1498 fs.clone(),
1499 &mut cx_b.to_async(),
1500 )
1501 .await
1502 .unwrap();
1503 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1504
1505 // Open a buffer as client B
1506 let buffer_b = worktree_b
1507 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1508 .await
1509 .unwrap();
1510 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1511
1512 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1513 buffer_b.read_with(&cx_b, |buf, _| {
1514 assert!(buf.is_dirty());
1515 assert!(!buf.has_conflict());
1516 });
1517
1518 buffer_b
1519 .update(&mut cx_b, |buf, cx| buf.save(cx))
1520 .unwrap()
1521 .await
1522 .unwrap();
1523 worktree_b
1524 .condition(&cx_b, |_, cx| {
1525 buffer_b.read(cx).file().unwrap().mtime() != mtime
1526 })
1527 .await;
1528 buffer_b.read_with(&cx_b, |buf, _| {
1529 assert!(!buf.is_dirty());
1530 assert!(!buf.has_conflict());
1531 });
1532
1533 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1534 buffer_b.read_with(&cx_b, |buf, _| {
1535 assert!(buf.is_dirty());
1536 assert!(!buf.has_conflict());
1537 });
1538 }
1539
1540 #[gpui::test]
1541 async fn test_editing_while_guest_opens_buffer(
1542 mut cx_a: TestAppContext,
1543 mut cx_b: TestAppContext,
1544 ) {
1545 cx_a.foreground().forbid_parking();
1546 let lang_registry = Arc::new(LanguageRegistry::new());
1547 let fs = Arc::new(FakeFs::new());
1548
1549 // Connect to a server as 2 clients.
1550 let mut server = TestServer::start().await;
1551 let client_a = server.create_client(&mut cx_a, "user_a").await;
1552 let client_b = server.create_client(&mut cx_b, "user_b").await;
1553
1554 // Share a project as client A
1555 fs.insert_tree(
1556 "/dir",
1557 json!({
1558 ".zed.toml": r#"collaborators = ["user_b"]"#,
1559 "a.txt": "a-contents",
1560 }),
1561 )
1562 .await;
1563 let project_a = cx_a.update(|cx| {
1564 Project::local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 lang_registry.clone(),
1568 fs.clone(),
1569 cx,
1570 )
1571 });
1572 let worktree_a = project_a
1573 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1574 .await
1575 .unwrap();
1576 worktree_a
1577 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1578 .await;
1579 let project_id = project_a
1580 .update(&mut cx_a, |project, _| project.next_remote_id())
1581 .await;
1582 project_a
1583 .update(&mut cx_a, |project, cx| project.share(cx))
1584 .await
1585 .unwrap();
1586
1587 // Join that project as client B
1588 let project_b = Project::remote(
1589 project_id,
1590 client_b.clone(),
1591 client_b.user_store.clone(),
1592 lang_registry.clone(),
1593 fs.clone(),
1594 &mut cx_b.to_async(),
1595 )
1596 .await
1597 .unwrap();
1598 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1599
1600 // Open a buffer as client A
1601 let buffer_a = worktree_a
1602 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1603 .await
1604 .unwrap();
1605
1606 // Start opening the same buffer as client B
1607 let buffer_b = cx_b
1608 .background()
1609 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1610 task::yield_now().await;
1611
1612 // Edit the buffer as client A while client B is still opening it.
1613 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1614
1615 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1616 let buffer_b = buffer_b.await.unwrap();
1617 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1618 }
1619
1620 #[gpui::test]
1621 async fn test_leaving_worktree_while_opening_buffer(
1622 mut cx_a: TestAppContext,
1623 mut cx_b: TestAppContext,
1624 ) {
1625 cx_a.foreground().forbid_parking();
1626 let lang_registry = Arc::new(LanguageRegistry::new());
1627 let fs = Arc::new(FakeFs::new());
1628
1629 // Connect to a server as 2 clients.
1630 let mut server = TestServer::start().await;
1631 let client_a = server.create_client(&mut cx_a, "user_a").await;
1632 let client_b = server.create_client(&mut cx_b, "user_b").await;
1633
1634 // Share a project as client A
1635 fs.insert_tree(
1636 "/dir",
1637 json!({
1638 ".zed.toml": r#"collaborators = ["user_b"]"#,
1639 "a.txt": "a-contents",
1640 }),
1641 )
1642 .await;
1643 let project_a = cx_a.update(|cx| {
1644 Project::local(
1645 client_a.clone(),
1646 client_a.user_store.clone(),
1647 lang_registry.clone(),
1648 fs.clone(),
1649 cx,
1650 )
1651 });
1652 let worktree_a = project_a
1653 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1654 .await
1655 .unwrap();
1656 worktree_a
1657 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1658 .await;
1659 let project_id = project_a
1660 .update(&mut cx_a, |project, _| project.next_remote_id())
1661 .await;
1662 project_a
1663 .update(&mut cx_a, |project, cx| project.share(cx))
1664 .await
1665 .unwrap();
1666
1667 // Join that project as client B
1668 let project_b = Project::remote(
1669 project_id,
1670 client_b.clone(),
1671 client_b.user_store.clone(),
1672 lang_registry.clone(),
1673 fs.clone(),
1674 &mut cx_b.to_async(),
1675 )
1676 .await
1677 .unwrap();
1678 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1679
1680 // See that a guest has joined as client A.
1681 project_a
1682 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1683 .await;
1684
1685 // Begin opening a buffer as client B, but leave the project before the open completes.
1686 let buffer_b = cx_b
1687 .background()
1688 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1689 cx_b.update(|_| drop(project_b));
1690 drop(buffer_b);
1691
1692 // See that the guest has left.
1693 project_a
1694 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1695 .await;
1696 }
1697
1698 #[gpui::test]
1699 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1700 cx_a.foreground().forbid_parking();
1701 let lang_registry = Arc::new(LanguageRegistry::new());
1702 let fs = Arc::new(FakeFs::new());
1703
1704 // Connect to a server as 2 clients.
1705 let mut server = TestServer::start().await;
1706 let client_a = server.create_client(&mut cx_a, "user_a").await;
1707 let client_b = server.create_client(&mut cx_b, "user_b").await;
1708
1709 // Share a project as client A
1710 fs.insert_tree(
1711 "/a",
1712 json!({
1713 ".zed.toml": r#"collaborators = ["user_b"]"#,
1714 "a.txt": "a-contents",
1715 "b.txt": "b-contents",
1716 }),
1717 )
1718 .await;
1719 let project_a = cx_a.update(|cx| {
1720 Project::local(
1721 client_a.clone(),
1722 client_a.user_store.clone(),
1723 lang_registry.clone(),
1724 fs.clone(),
1725 cx,
1726 )
1727 });
1728 let worktree_a = project_a
1729 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1730 .await
1731 .unwrap();
1732 worktree_a
1733 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1734 .await;
1735 let project_id = project_a
1736 .update(&mut cx_a, |project, _| project.next_remote_id())
1737 .await;
1738 project_a
1739 .update(&mut cx_a, |project, cx| project.share(cx))
1740 .await
1741 .unwrap();
1742
1743 // Join that project as client B
1744 let _project_b = Project::remote(
1745 project_id,
1746 client_b.clone(),
1747 client_b.user_store.clone(),
1748 lang_registry.clone(),
1749 fs.clone(),
1750 &mut cx_b.to_async(),
1751 )
1752 .await
1753 .unwrap();
1754
1755 // See that a guest has joined as client A.
1756 project_a
1757 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1758 .await;
1759
1760 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1761 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1762 project_a
1763 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1764 .await;
1765 }
1766
1767 #[gpui::test]
1768 async fn test_collaborating_with_diagnostics(
1769 mut cx_a: TestAppContext,
1770 mut cx_b: TestAppContext,
1771 ) {
1772 cx_a.foreground().forbid_parking();
1773 let mut lang_registry = Arc::new(LanguageRegistry::new());
1774 let fs = Arc::new(FakeFs::new());
1775
1776 // Set up a fake language server.
1777 let (language_server_config, mut fake_language_server) =
1778 LanguageServerConfig::fake(cx_a.background()).await;
1779 Arc::get_mut(&mut lang_registry)
1780 .unwrap()
1781 .add(Arc::new(Language::new(
1782 LanguageConfig {
1783 name: "Rust".to_string(),
1784 path_suffixes: vec!["rs".to_string()],
1785 language_server: Some(language_server_config),
1786 ..Default::default()
1787 },
1788 Some(tree_sitter_rust::language()),
1789 )));
1790
1791 // Connect to a server as 2 clients.
1792 let mut server = TestServer::start().await;
1793 let client_a = server.create_client(&mut cx_a, "user_a").await;
1794 let client_b = server.create_client(&mut cx_b, "user_b").await;
1795
1796 // Share a project as client A
1797 fs.insert_tree(
1798 "/a",
1799 json!({
1800 ".zed.toml": r#"collaborators = ["user_b"]"#,
1801 "a.rs": "let one = two",
1802 "other.rs": "",
1803 }),
1804 )
1805 .await;
1806 let project_a = cx_a.update(|cx| {
1807 Project::local(
1808 client_a.clone(),
1809 client_a.user_store.clone(),
1810 lang_registry.clone(),
1811 fs.clone(),
1812 cx,
1813 )
1814 });
1815 let worktree_a = project_a
1816 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1817 .await
1818 .unwrap();
1819 worktree_a
1820 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1821 .await;
1822 let project_id = project_a
1823 .update(&mut cx_a, |project, _| project.next_remote_id())
1824 .await;
1825 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1826 project_a
1827 .update(&mut cx_a, |project, cx| project.share(cx))
1828 .await
1829 .unwrap();
1830
1831 // Cause the language server to start.
1832 let _ = cx_a
1833 .background()
1834 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1835 worktree.open_buffer("other.rs", cx)
1836 }))
1837 .await
1838 .unwrap();
1839
1840 // Simulate a language server reporting errors for a file.
1841 fake_language_server
1842 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1843 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1844 version: None,
1845 diagnostics: vec![lsp::Diagnostic {
1846 severity: Some(lsp::DiagnosticSeverity::ERROR),
1847 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1848 message: "message 1".to_string(),
1849 ..Default::default()
1850 }],
1851 })
1852 .await;
1853
1854 // Wait for server to see the diagnostics update.
1855 server
1856 .condition(|store| {
1857 let worktree = store
1858 .project(project_id)
1859 .unwrap()
1860 .worktrees
1861 .get(&worktree_id.to_proto())
1862 .unwrap();
1863
1864 !worktree
1865 .share
1866 .as_ref()
1867 .unwrap()
1868 .diagnostic_summaries
1869 .is_empty()
1870 })
1871 .await;
1872
1873 // Join the worktree as client B.
1874 let project_b = Project::remote(
1875 project_id,
1876 client_b.clone(),
1877 client_b.user_store.clone(),
1878 lang_registry.clone(),
1879 fs.clone(),
1880 &mut cx_b.to_async(),
1881 )
1882 .await
1883 .unwrap();
1884
1885 project_b.read_with(&cx_b, |project, cx| {
1886 assert_eq!(
1887 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1888 &[(
1889 ProjectPath {
1890 worktree_id,
1891 path: Arc::from(Path::new("a.rs")),
1892 },
1893 DiagnosticSummary {
1894 error_count: 1,
1895 warning_count: 0,
1896 ..Default::default()
1897 },
1898 )]
1899 )
1900 });
1901
1902 // Simulate a language server reporting more errors for a file.
1903 fake_language_server
1904 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1905 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1906 version: None,
1907 diagnostics: vec![
1908 lsp::Diagnostic {
1909 severity: Some(lsp::DiagnosticSeverity::ERROR),
1910 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1911 message: "message 1".to_string(),
1912 ..Default::default()
1913 },
1914 lsp::Diagnostic {
1915 severity: Some(lsp::DiagnosticSeverity::WARNING),
1916 range: lsp::Range::new(
1917 lsp::Position::new(0, 10),
1918 lsp::Position::new(0, 13),
1919 ),
1920 message: "message 2".to_string(),
1921 ..Default::default()
1922 },
1923 ],
1924 })
1925 .await;
1926
1927 // Client b gets the updated summaries
1928 project_b
1929 .condition(&cx_b, |project, cx| {
1930 project.diagnostic_summaries(cx).collect::<Vec<_>>()
1931 == &[(
1932 ProjectPath {
1933 worktree_id,
1934 path: Arc::from(Path::new("a.rs")),
1935 },
1936 DiagnosticSummary {
1937 error_count: 1,
1938 warning_count: 1,
1939 ..Default::default()
1940 },
1941 )]
1942 })
1943 .await;
1944
1945 // Open the file with the errors on client B. They should be present.
1946 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1947 let buffer_b = cx_b
1948 .background()
1949 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1950 .await
1951 .unwrap();
1952
1953 buffer_b.read_with(&cx_b, |buffer, _| {
1954 assert_eq!(
1955 buffer
1956 .snapshot()
1957 .diagnostics_in_range::<_, Point>(0..buffer.len())
1958 .map(|entry| entry)
1959 .collect::<Vec<_>>(),
1960 &[
1961 DiagnosticEntry {
1962 range: Point::new(0, 4)..Point::new(0, 7),
1963 diagnostic: Diagnostic {
1964 group_id: 0,
1965 message: "message 1".to_string(),
1966 severity: lsp::DiagnosticSeverity::ERROR,
1967 is_primary: true,
1968 ..Default::default()
1969 }
1970 },
1971 DiagnosticEntry {
1972 range: Point::new(0, 10)..Point::new(0, 13),
1973 diagnostic: Diagnostic {
1974 group_id: 1,
1975 severity: lsp::DiagnosticSeverity::WARNING,
1976 message: "message 2".to_string(),
1977 is_primary: true,
1978 ..Default::default()
1979 }
1980 }
1981 ]
1982 );
1983 });
1984 }
1985
1986 #[gpui::test]
1987 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1988 cx_a.foreground().forbid_parking();
1989
1990 // Connect to a server as 2 clients.
1991 let mut server = TestServer::start().await;
1992 let client_a = server.create_client(&mut cx_a, "user_a").await;
1993 let client_b = server.create_client(&mut cx_b, "user_b").await;
1994
1995 // Create an org that includes these 2 users.
1996 let db = &server.app_state.db;
1997 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
1998 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
1999 .await
2000 .unwrap();
2001 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2002 .await
2003 .unwrap();
2004
2005 // Create a channel that includes all the users.
2006 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2007 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2008 .await
2009 .unwrap();
2010 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2011 .await
2012 .unwrap();
2013 db.create_channel_message(
2014 channel_id,
2015 client_b.current_user_id(&cx_b),
2016 "hello A, it's B.",
2017 OffsetDateTime::now_utc(),
2018 1,
2019 )
2020 .await
2021 .unwrap();
2022
2023 let channels_a = cx_a
2024 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2025 channels_a
2026 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2027 .await;
2028 channels_a.read_with(&cx_a, |list, _| {
2029 assert_eq!(
2030 list.available_channels().unwrap(),
2031 &[ChannelDetails {
2032 id: channel_id.to_proto(),
2033 name: "test-channel".to_string()
2034 }]
2035 )
2036 });
2037 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2038 this.get_channel(channel_id.to_proto(), cx).unwrap()
2039 });
2040 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2041 channel_a
2042 .condition(&cx_a, |channel, _| {
2043 channel_messages(channel)
2044 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2045 })
2046 .await;
2047
2048 let channels_b = cx_b
2049 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2050 channels_b
2051 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2052 .await;
2053 channels_b.read_with(&cx_b, |list, _| {
2054 assert_eq!(
2055 list.available_channels().unwrap(),
2056 &[ChannelDetails {
2057 id: channel_id.to_proto(),
2058 name: "test-channel".to_string()
2059 }]
2060 )
2061 });
2062
2063 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2064 this.get_channel(channel_id.to_proto(), cx).unwrap()
2065 });
2066 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2067 channel_b
2068 .condition(&cx_b, |channel, _| {
2069 channel_messages(channel)
2070 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2071 })
2072 .await;
2073
2074 channel_a
2075 .update(&mut cx_a, |channel, cx| {
2076 channel
2077 .send_message("oh, hi B.".to_string(), cx)
2078 .unwrap()
2079 .detach();
2080 let task = channel.send_message("sup".to_string(), cx).unwrap();
2081 assert_eq!(
2082 channel_messages(channel),
2083 &[
2084 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2085 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2086 ("user_a".to_string(), "sup".to_string(), true)
2087 ]
2088 );
2089 task
2090 })
2091 .await
2092 .unwrap();
2093
2094 channel_b
2095 .condition(&cx_b, |channel, _| {
2096 channel_messages(channel)
2097 == [
2098 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2099 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2100 ("user_a".to_string(), "sup".to_string(), false),
2101 ]
2102 })
2103 .await;
2104
2105 assert_eq!(
2106 server
2107 .state()
2108 .await
2109 .channel(channel_id)
2110 .unwrap()
2111 .connection_ids
2112 .len(),
2113 2
2114 );
2115 cx_b.update(|_| drop(channel_b));
2116 server
2117 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2118 .await;
2119
2120 cx_a.update(|_| drop(channel_a));
2121 server
2122 .condition(|state| state.channel(channel_id).is_none())
2123 .await;
2124 }
2125
2126 #[gpui::test]
2127 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2128 cx_a.foreground().forbid_parking();
2129
2130 let mut server = TestServer::start().await;
2131 let client_a = server.create_client(&mut cx_a, "user_a").await;
2132
2133 let db = &server.app_state.db;
2134 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2135 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2136 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2137 .await
2138 .unwrap();
2139 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2140 .await
2141 .unwrap();
2142
2143 let channels_a = cx_a
2144 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2145 channels_a
2146 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2147 .await;
2148 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2149 this.get_channel(channel_id.to_proto(), cx).unwrap()
2150 });
2151
2152 // Messages aren't allowed to be too long.
2153 channel_a
2154 .update(&mut cx_a, |channel, cx| {
2155 let long_body = "this is long.\n".repeat(1024);
2156 channel.send_message(long_body, cx).unwrap()
2157 })
2158 .await
2159 .unwrap_err();
2160
2161 // Messages aren't allowed to be blank.
2162 channel_a.update(&mut cx_a, |channel, cx| {
2163 channel.send_message(String::new(), cx).unwrap_err()
2164 });
2165
2166 // Leading and trailing whitespace are trimmed.
2167 channel_a
2168 .update(&mut cx_a, |channel, cx| {
2169 channel
2170 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2171 .unwrap()
2172 })
2173 .await
2174 .unwrap();
2175 assert_eq!(
2176 db.get_channel_messages(channel_id, 10, None)
2177 .await
2178 .unwrap()
2179 .iter()
2180 .map(|m| &m.body)
2181 .collect::<Vec<_>>(),
2182 &["surrounded by whitespace"]
2183 );
2184 }
2185
2186 #[gpui::test]
2187 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2188 cx_a.foreground().forbid_parking();
2189
2190 // Connect to a server as 2 clients.
2191 let mut server = TestServer::start().await;
2192 let client_a = server.create_client(&mut cx_a, "user_a").await;
2193 let client_b = server.create_client(&mut cx_b, "user_b").await;
2194 let mut status_b = client_b.status();
2195
2196 // Create an org that includes these 2 users.
2197 let db = &server.app_state.db;
2198 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2199 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2200 .await
2201 .unwrap();
2202 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2203 .await
2204 .unwrap();
2205
2206 // Create a channel that includes all the users.
2207 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2208 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2209 .await
2210 .unwrap();
2211 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2212 .await
2213 .unwrap();
2214 db.create_channel_message(
2215 channel_id,
2216 client_b.current_user_id(&cx_b),
2217 "hello A, it's B.",
2218 OffsetDateTime::now_utc(),
2219 2,
2220 )
2221 .await
2222 .unwrap();
2223
2224 let channels_a = cx_a
2225 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2226 channels_a
2227 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2228 .await;
2229
2230 channels_a.read_with(&cx_a, |list, _| {
2231 assert_eq!(
2232 list.available_channels().unwrap(),
2233 &[ChannelDetails {
2234 id: channel_id.to_proto(),
2235 name: "test-channel".to_string()
2236 }]
2237 )
2238 });
2239 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2240 this.get_channel(channel_id.to_proto(), cx).unwrap()
2241 });
2242 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2243 channel_a
2244 .condition(&cx_a, |channel, _| {
2245 channel_messages(channel)
2246 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2247 })
2248 .await;
2249
2250 let channels_b = cx_b
2251 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2252 channels_b
2253 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2254 .await;
2255 channels_b.read_with(&cx_b, |list, _| {
2256 assert_eq!(
2257 list.available_channels().unwrap(),
2258 &[ChannelDetails {
2259 id: channel_id.to_proto(),
2260 name: "test-channel".to_string()
2261 }]
2262 )
2263 });
2264
2265 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2266 this.get_channel(channel_id.to_proto(), cx).unwrap()
2267 });
2268 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2269 channel_b
2270 .condition(&cx_b, |channel, _| {
2271 channel_messages(channel)
2272 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2273 })
2274 .await;
2275
2276 // Disconnect client B, ensuring we can still access its cached channel data.
2277 server.forbid_connections();
2278 server.disconnect_client(client_b.current_user_id(&cx_b));
2279 while !matches!(
2280 status_b.recv().await,
2281 Some(client::Status::ReconnectionError { .. })
2282 ) {}
2283
2284 channels_b.read_with(&cx_b, |channels, _| {
2285 assert_eq!(
2286 channels.available_channels().unwrap(),
2287 [ChannelDetails {
2288 id: channel_id.to_proto(),
2289 name: "test-channel".to_string()
2290 }]
2291 )
2292 });
2293 channel_b.read_with(&cx_b, |channel, _| {
2294 assert_eq!(
2295 channel_messages(channel),
2296 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2297 )
2298 });
2299
2300 // Send a message from client B while it is disconnected.
2301 channel_b
2302 .update(&mut cx_b, |channel, cx| {
2303 let task = channel
2304 .send_message("can you see this?".to_string(), cx)
2305 .unwrap();
2306 assert_eq!(
2307 channel_messages(channel),
2308 &[
2309 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2310 ("user_b".to_string(), "can you see this?".to_string(), true)
2311 ]
2312 );
2313 task
2314 })
2315 .await
2316 .unwrap_err();
2317
2318 // Send a message from client A while B is disconnected.
2319 channel_a
2320 .update(&mut cx_a, |channel, cx| {
2321 channel
2322 .send_message("oh, hi B.".to_string(), cx)
2323 .unwrap()
2324 .detach();
2325 let task = channel.send_message("sup".to_string(), cx).unwrap();
2326 assert_eq!(
2327 channel_messages(channel),
2328 &[
2329 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2330 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2331 ("user_a".to_string(), "sup".to_string(), true)
2332 ]
2333 );
2334 task
2335 })
2336 .await
2337 .unwrap();
2338
2339 // Give client B a chance to reconnect.
2340 server.allow_connections();
2341 cx_b.foreground().advance_clock(Duration::from_secs(10));
2342
2343 // Verify that B sees the new messages upon reconnection, as well as the message client B
2344 // sent while offline.
2345 channel_b
2346 .condition(&cx_b, |channel, _| {
2347 channel_messages(channel)
2348 == [
2349 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2350 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2351 ("user_a".to_string(), "sup".to_string(), false),
2352 ("user_b".to_string(), "can you see this?".to_string(), false),
2353 ]
2354 })
2355 .await;
2356
2357 // Ensure client A and B can communicate normally after reconnection.
2358 channel_a
2359 .update(&mut cx_a, |channel, cx| {
2360 channel.send_message("you online?".to_string(), cx).unwrap()
2361 })
2362 .await
2363 .unwrap();
2364 channel_b
2365 .condition(&cx_b, |channel, _| {
2366 channel_messages(channel)
2367 == [
2368 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2369 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2370 ("user_a".to_string(), "sup".to_string(), false),
2371 ("user_b".to_string(), "can you see this?".to_string(), false),
2372 ("user_a".to_string(), "you online?".to_string(), false),
2373 ]
2374 })
2375 .await;
2376
2377 channel_b
2378 .update(&mut cx_b, |channel, cx| {
2379 channel.send_message("yep".to_string(), cx).unwrap()
2380 })
2381 .await
2382 .unwrap();
2383 channel_a
2384 .condition(&cx_a, |channel, _| {
2385 channel_messages(channel)
2386 == [
2387 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2388 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2389 ("user_a".to_string(), "sup".to_string(), false),
2390 ("user_b".to_string(), "can you see this?".to_string(), false),
2391 ("user_a".to_string(), "you online?".to_string(), false),
2392 ("user_b".to_string(), "yep".to_string(), false),
2393 ]
2394 })
2395 .await;
2396 }
2397
2398 #[gpui::test]
2399 async fn test_contacts(
2400 mut cx_a: TestAppContext,
2401 mut cx_b: TestAppContext,
2402 mut cx_c: TestAppContext,
2403 ) {
2404 cx_a.foreground().forbid_parking();
2405 let lang_registry = Arc::new(LanguageRegistry::new());
2406 let fs = Arc::new(FakeFs::new());
2407
2408 // Connect to a server as 3 clients.
2409 let mut server = TestServer::start().await;
2410 let client_a = server.create_client(&mut cx_a, "user_a").await;
2411 let client_b = server.create_client(&mut cx_b, "user_b").await;
2412 let client_c = server.create_client(&mut cx_c, "user_c").await;
2413
2414 // Share a worktree as client A.
2415 fs.insert_tree(
2416 "/a",
2417 json!({
2418 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2419 }),
2420 )
2421 .await;
2422
2423 let project_a = cx_a.update(|cx| {
2424 Project::local(
2425 client_a.clone(),
2426 client_a.user_store.clone(),
2427 lang_registry.clone(),
2428 fs.clone(),
2429 cx,
2430 )
2431 });
2432 let worktree_a = project_a
2433 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
2434 .await
2435 .unwrap();
2436 worktree_a
2437 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438 .await;
2439
2440 client_a
2441 .user_store
2442 .condition(&cx_a, |user_store, _| {
2443 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2444 })
2445 .await;
2446 client_b
2447 .user_store
2448 .condition(&cx_b, |user_store, _| {
2449 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2450 })
2451 .await;
2452 client_c
2453 .user_store
2454 .condition(&cx_c, |user_store, _| {
2455 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
2456 })
2457 .await;
2458
2459 let project_id = project_a
2460 .update(&mut cx_a, |project, _| project.next_remote_id())
2461 .await;
2462 project_a
2463 .update(&mut cx_a, |project, cx| project.share(cx))
2464 .await
2465 .unwrap();
2466
2467 let _project_b = Project::remote(
2468 project_id,
2469 client_b.clone(),
2470 client_b.user_store.clone(),
2471 lang_registry.clone(),
2472 fs.clone(),
2473 &mut cx_b.to_async(),
2474 )
2475 .await
2476 .unwrap();
2477
2478 client_a
2479 .user_store
2480 .condition(&cx_a, |user_store, _| {
2481 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2482 })
2483 .await;
2484 client_b
2485 .user_store
2486 .condition(&cx_b, |user_store, _| {
2487 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2488 })
2489 .await;
2490 client_c
2491 .user_store
2492 .condition(&cx_c, |user_store, _| {
2493 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
2494 })
2495 .await;
2496
2497 project_a
2498 .condition(&cx_a, |project, _| {
2499 project.collaborators().contains_key(&client_b.peer_id)
2500 })
2501 .await;
2502
2503 cx_a.update(move |_| drop(project_a));
2504 client_a
2505 .user_store
2506 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
2507 .await;
2508 client_b
2509 .user_store
2510 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
2511 .await;
2512 client_c
2513 .user_store
2514 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
2515 .await;
2516
2517 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
2518 user_store
2519 .contacts()
2520 .iter()
2521 .map(|contact| {
2522 let worktrees = contact
2523 .projects
2524 .iter()
2525 .map(|p| {
2526 (
2527 p.worktree_root_names[0].as_str(),
2528 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
2529 )
2530 })
2531 .collect();
2532 (contact.user.github_login.as_str(), worktrees)
2533 })
2534 .collect()
2535 }
2536 }
2537
2538 struct TestServer {
2539 peer: Arc<Peer>,
2540 app_state: Arc<AppState>,
2541 server: Arc<Server>,
2542 notifications: mpsc::Receiver<()>,
2543 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
2544 forbid_connections: Arc<AtomicBool>,
2545 _test_db: TestDb,
2546 }
2547
2548 impl TestServer {
2549 async fn start() -> Self {
2550 let test_db = TestDb::new();
2551 let app_state = Self::build_app_state(&test_db).await;
2552 let peer = Peer::new();
2553 let notifications = mpsc::channel(128);
2554 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2555 Self {
2556 peer,
2557 app_state,
2558 server,
2559 notifications: notifications.1,
2560 connection_killers: Default::default(),
2561 forbid_connections: Default::default(),
2562 _test_db: test_db,
2563 }
2564 }
2565
2566 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2567 let http = FakeHttpClient::with_404_response();
2568 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2569 let client_name = name.to_string();
2570 let mut client = Client::new(http.clone());
2571 let server = self.server.clone();
2572 let connection_killers = self.connection_killers.clone();
2573 let forbid_connections = self.forbid_connections.clone();
2574 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2575
2576 Arc::get_mut(&mut client)
2577 .unwrap()
2578 .override_authenticate(move |cx| {
2579 cx.spawn(|_| async move {
2580 let access_token = "the-token".to_string();
2581 Ok(Credentials {
2582 user_id: user_id.0 as u64,
2583 access_token,
2584 })
2585 })
2586 })
2587 .override_establish_connection(move |credentials, cx| {
2588 assert_eq!(credentials.user_id, user_id.0 as u64);
2589 assert_eq!(credentials.access_token, "the-token");
2590
2591 let server = server.clone();
2592 let connection_killers = connection_killers.clone();
2593 let forbid_connections = forbid_connections.clone();
2594 let client_name = client_name.clone();
2595 let connection_id_tx = connection_id_tx.clone();
2596 cx.spawn(move |cx| async move {
2597 if forbid_connections.load(SeqCst) {
2598 Err(EstablishConnectionError::other(anyhow!(
2599 "server is forbidding connections"
2600 )))
2601 } else {
2602 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2603 connection_killers.lock().insert(user_id, kill_conn);
2604 cx.background()
2605 .spawn(server.handle_connection(
2606 server_conn,
2607 client_name,
2608 user_id,
2609 Some(connection_id_tx),
2610 ))
2611 .detach();
2612 Ok(client_conn)
2613 }
2614 })
2615 });
2616
2617 client
2618 .authenticate_and_connect(&cx.to_async())
2619 .await
2620 .unwrap();
2621
2622 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2623 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2624 let mut authed_user =
2625 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2626 while authed_user.recv().await.unwrap().is_none() {}
2627
2628 TestClient {
2629 client,
2630 peer_id,
2631 user_store,
2632 }
2633 }
2634
2635 fn disconnect_client(&self, user_id: UserId) {
2636 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2637 let _ = kill_conn.try_send(Some(()));
2638 }
2639 }
2640
2641 fn forbid_connections(&self) {
2642 self.forbid_connections.store(true, SeqCst);
2643 }
2644
2645 fn allow_connections(&self) {
2646 self.forbid_connections.store(false, SeqCst);
2647 }
2648
2649 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2650 let mut config = Config::default();
2651 config.session_secret = "a".repeat(32);
2652 config.database_url = test_db.url.clone();
2653 let github_client = github::AppClient::test();
2654 Arc::new(AppState {
2655 db: test_db.db().clone(),
2656 handlebars: Default::default(),
2657 auth_client: auth::build_client("", ""),
2658 repo_client: github::RepoClient::test(&github_client),
2659 github_client,
2660 config,
2661 })
2662 }
2663
2664 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2665 self.server.store.read()
2666 }
2667
2668 async fn condition<F>(&mut self, mut predicate: F)
2669 where
2670 F: FnMut(&Store) -> bool,
2671 {
2672 async_std::future::timeout(Duration::from_millis(500), async {
2673 while !(predicate)(&*self.server.store.read()) {
2674 self.notifications.recv().await;
2675 }
2676 })
2677 .await
2678 .expect("condition timed out");
2679 }
2680 }
2681
2682 impl Drop for TestServer {
2683 fn drop(&mut self) {
2684 task::block_on(self.peer.reset());
2685 }
2686 }
2687
2688 struct TestClient {
2689 client: Arc<Client>,
2690 pub peer_id: PeerId,
2691 pub user_store: ModelHandle<UserStore>,
2692 }
2693
2694 impl Deref for TestClient {
2695 type Target = Arc<Client>;
2696
2697 fn deref(&self) -> &Self::Target {
2698 &self.client
2699 }
2700 }
2701
2702 impl TestClient {
2703 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2704 UserId::from_proto(
2705 self.user_store
2706 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2707 )
2708 }
2709 }
2710
2711 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2712 channel
2713 .messages()
2714 .cursor::<()>()
2715 .map(|m| {
2716 (
2717 m.sender.github_login.clone(),
2718 m.body.clone(),
2719 m.is_pending(),
2720 )
2721 })
2722 .collect()
2723 }
2724
/// A stateless view that renders an empty element.
struct EmptyView;
2726
2727 impl gpui::Entity for EmptyView {
2728 type Event = ();
2729 }
2730
2731 impl gpui::View for EmptyView {
2732 fn ui_name() -> &'static str {
2733 "empty view"
2734 }
2735
2736 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2737 gpui::Element::boxed(gpui::elements::Empty)
2738 }
2739 }
2740}