1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _, prelude::Stream as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
/// RPC server: owns the peer, the in-memory store and the table of message handlers.
pub struct Server {
    /// Peer used to send, respond to and forward RPC messages.
    peer: Arc<Peer>,
    /// In-memory store behind a read/write lock; accessed through `state` / `state_mut`.
    store: RwLock<Store>,
    /// Shared application state; the handlers use its `db` handle.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after a registered handler has run.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Page size passed to the database when fetching channel messages.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a request refers to a project that cannot be found.
// `'static` is implied for string constants, so the explicit lifetime is dropped.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::open_buffer)
78 .add_handler(Server::close_buffer)
79 .add_handler(Server::update_buffer)
80 .add_handler(Server::buffer_saved)
81 .add_handler(Server::save_buffer)
82 .add_handler(Server::get_channels)
83 .add_handler(Server::get_users)
84 .add_handler(Server::join_channel)
85 .add_handler(Server::leave_channel)
86 .add_handler(Server::send_channel_message)
87 .add_handler(Server::get_channel_messages);
88
89 Arc::new(server)
90 }
91
92 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
93 where
94 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
95 Fut: 'static + Send + Future<Output = tide::Result<()>>,
96 M: EnvelopedMessage,
97 {
98 let prev_handler = self.handlers.insert(
99 TypeId::of::<M>(),
100 Box::new(move |server, envelope| {
101 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
102 (handler)(server, *envelope).boxed()
103 }),
104 );
105 if prev_handler.is_some() {
106 panic!("registered a handler for the same message twice");
107 }
108 self
109 }
110
111 pub fn handle_connection(
112 self: &Arc<Self>,
113 connection: Connection,
114 addr: String,
115 user_id: UserId,
116 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
117 ) -> impl Future<Output = ()> {
118 let mut this = self.clone();
119 async move {
120 let (connection_id, handle_io, mut incoming_rx) =
121 this.peer.add_connection(connection).await;
122
123 if let Some(send_connection_id) = send_connection_id.as_mut() {
124 let _ = send_connection_id.send(connection_id).await;
125 }
126
127 this.state_mut().add_connection(connection_id, user_id);
128 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
129 log::error!("error updating contacts for {:?}: {}", user_id, err);
130 }
131
132 let handle_io = handle_io.fuse();
133 futures::pin_mut!(handle_io);
134 loop {
135 let next_message = incoming_rx.recv().fuse();
136 futures::pin_mut!(next_message);
137 futures::select_biased! {
138 message = next_message => {
139 if let Some(message) = message {
140 let start_time = Instant::now();
141 log::info!("RPC message received: {}", message.payload_type_name());
142 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
143 if let Err(err) = (handler)(this.clone(), message).await {
144 log::error!("error handling message: {:?}", err);
145 } else {
146 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
147 }
148
149 if let Some(mut notifications) = this.notifications.clone() {
150 let _ = notifications.send(()).await;
151 }
152 } else {
153 log::warn!("unhandled message: {}", message.payload_type_name());
154 }
155 } else {
156 log::info!("rpc connection closed {:?}", addr);
157 break;
158 }
159 }
160 handle_io = handle_io => {
161 if let Err(err) = handle_io {
162 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
163 }
164 break;
165 }
166 }
167 }
168
169 if let Err(err) = this.sign_out(connection_id).await {
170 log::error!("error signing out connection {:?} - {:?}", addr, err);
171 }
172 }
173 }
174
175 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
176 self.peer.disconnect(connection_id).await;
177 let removed_connection = self.state_mut().remove_connection(connection_id)?;
178
179 for (project_id, project) in removed_connection.hosted_projects {
180 if let Some(share) = project.share {
181 broadcast(
182 connection_id,
183 share.guests.keys().copied().collect(),
184 |conn_id| {
185 self.peer
186 .send(conn_id, proto::UnshareProject { project_id })
187 },
188 )
189 .await?;
190 }
191 }
192
193 for (project_id, peer_ids) in removed_connection.guest_project_ids {
194 broadcast(connection_id, peer_ids, |conn_id| {
195 self.peer.send(
196 conn_id,
197 proto::RemoveProjectCollaborator {
198 project_id,
199 peer_id: connection_id.0,
200 },
201 )
202 })
203 .await?;
204 }
205
206 self.update_contacts_for_users(removed_connection.contact_ids.iter())
207 .await?;
208
209 Ok(())
210 }
211
212 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
213 self.peer.respond(request.receipt(), proto::Ack {}).await?;
214 Ok(())
215 }
216
217 async fn register_project(
218 mut self: Arc<Server>,
219 request: TypedEnvelope<proto::RegisterProject>,
220 ) -> tide::Result<()> {
221 let project_id = {
222 let mut state = self.state_mut();
223 let user_id = state.user_id_for_connection(request.sender_id)?;
224 state.register_project(request.sender_id, user_id)
225 };
226 self.peer
227 .respond(
228 request.receipt(),
229 proto::RegisterProjectResponse { project_id },
230 )
231 .await?;
232 Ok(())
233 }
234
235 async fn unregister_project(
236 mut self: Arc<Server>,
237 request: TypedEnvelope<proto::UnregisterProject>,
238 ) -> tide::Result<()> {
239 let project = self
240 .state_mut()
241 .unregister_project(request.payload.project_id, request.sender_id)
242 .ok_or_else(|| anyhow!("no such project"))?;
243 self.update_contacts_for_users(project.authorized_user_ids().iter())
244 .await?;
245 Ok(())
246 }
247
248 async fn share_project(
249 mut self: Arc<Server>,
250 request: TypedEnvelope<proto::ShareProject>,
251 ) -> tide::Result<()> {
252 self.state_mut()
253 .share_project(request.payload.project_id, request.sender_id);
254 self.peer.respond(request.receipt(), proto::Ack {}).await?;
255 Ok(())
256 }
257
258 async fn unshare_project(
259 mut self: Arc<Server>,
260 request: TypedEnvelope<proto::UnshareProject>,
261 ) -> tide::Result<()> {
262 let project_id = request.payload.project_id;
263 let project = self
264 .state_mut()
265 .unshare_project(project_id, request.sender_id)?;
266
267 broadcast(request.sender_id, project.connection_ids, |conn_id| {
268 self.peer
269 .send(conn_id, proto::UnshareProject { project_id })
270 })
271 .await?;
272 self.update_contacts_for_users(&project.authorized_user_ids)
273 .await?;
274
275 Ok(())
276 }
277
278 async fn join_project(
279 mut self: Arc<Server>,
280 request: TypedEnvelope<proto::JoinProject>,
281 ) -> tide::Result<()> {
282 let project_id = request.payload.project_id;
283
284 let user_id = self.state().user_id_for_connection(request.sender_id)?;
285 let response_data = self
286 .state_mut()
287 .join_project(request.sender_id, user_id, project_id)
288 .and_then(|joined| {
289 let share = joined.project.share()?;
290 let peer_count = share.guests.len();
291 let mut collaborators = Vec::with_capacity(peer_count);
292 collaborators.push(proto::Collaborator {
293 peer_id: joined.project.host_connection_id.0,
294 replica_id: 0,
295 user_id: joined.project.host_user_id.to_proto(),
296 });
297 let worktrees = joined
298 .project
299 .worktrees
300 .iter()
301 .filter_map(|(id, worktree)| {
302 worktree.share.as_ref().map(|share| proto::Worktree {
303 id: *id,
304 root_name: worktree.root_name.clone(),
305 entries: share.entries.values().cloned().collect(),
306 diagnostic_summaries: share
307 .diagnostic_summaries
308 .values()
309 .cloned()
310 .collect(),
311 })
312 })
313 .collect();
314 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
315 if *peer_conn_id != request.sender_id {
316 collaborators.push(proto::Collaborator {
317 peer_id: peer_conn_id.0,
318 replica_id: *peer_replica_id as u32,
319 user_id: peer_user_id.to_proto(),
320 });
321 }
322 }
323 let response = proto::JoinProjectResponse {
324 worktrees,
325 replica_id: joined.replica_id as u32,
326 collaborators,
327 };
328 let connection_ids = joined.project.connection_ids();
329 let contact_user_ids = joined.project.authorized_user_ids();
330 Ok((response, connection_ids, contact_user_ids))
331 });
332
333 match response_data {
334 Ok((response, connection_ids, contact_user_ids)) => {
335 broadcast(request.sender_id, connection_ids, |conn_id| {
336 self.peer.send(
337 conn_id,
338 proto::AddProjectCollaborator {
339 project_id: project_id,
340 collaborator: Some(proto::Collaborator {
341 peer_id: request.sender_id.0,
342 replica_id: response.replica_id,
343 user_id: user_id.to_proto(),
344 }),
345 },
346 )
347 })
348 .await?;
349 self.peer.respond(request.receipt(), response).await?;
350 self.update_contacts_for_users(&contact_user_ids).await?;
351 }
352 Err(error) => {
353 self.peer
354 .respond_with_error(
355 request.receipt(),
356 proto::Error {
357 message: error.to_string(),
358 },
359 )
360 .await?;
361 }
362 }
363
364 Ok(())
365 }
366
367 async fn leave_project(
368 mut self: Arc<Server>,
369 request: TypedEnvelope<proto::LeaveProject>,
370 ) -> tide::Result<()> {
371 let sender_id = request.sender_id;
372 let project_id = request.payload.project_id;
373 let worktree = self.state_mut().leave_project(sender_id, project_id);
374 if let Some(worktree) = worktree {
375 broadcast(sender_id, worktree.connection_ids, |conn_id| {
376 self.peer.send(
377 conn_id,
378 proto::RemoveProjectCollaborator {
379 project_id,
380 peer_id: sender_id.0,
381 },
382 )
383 })
384 .await?;
385 self.update_contacts_for_users(&worktree.authorized_user_ids)
386 .await?;
387 }
388 Ok(())
389 }
390
391 async fn register_worktree(
392 mut self: Arc<Server>,
393 request: TypedEnvelope<proto::RegisterWorktree>,
394 ) -> tide::Result<()> {
395 let receipt = request.receipt();
396 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
397
398 let mut contact_user_ids = HashSet::default();
399 contact_user_ids.insert(host_user_id);
400 for github_login in request.payload.authorized_logins {
401 match self.app_state.db.create_user(&github_login, false).await {
402 Ok(contact_user_id) => {
403 contact_user_ids.insert(contact_user_id);
404 }
405 Err(err) => {
406 let message = err.to_string();
407 self.peer
408 .respond_with_error(receipt, proto::Error { message })
409 .await?;
410 return Ok(());
411 }
412 }
413 }
414
415 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
416 let ok = self.state_mut().register_worktree(
417 request.payload.project_id,
418 request.payload.worktree_id,
419 Worktree {
420 authorized_user_ids: contact_user_ids.clone(),
421 root_name: request.payload.root_name,
422 share: None,
423 },
424 );
425
426 if ok {
427 self.peer.respond(receipt, proto::Ack {}).await?;
428 self.update_contacts_for_users(&contact_user_ids).await?;
429 } else {
430 self.peer
431 .respond_with_error(
432 receipt,
433 proto::Error {
434 message: NO_SUCH_PROJECT.to_string(),
435 },
436 )
437 .await?;
438 }
439
440 Ok(())
441 }
442
443 async fn unregister_worktree(
444 mut self: Arc<Server>,
445 request: TypedEnvelope<proto::UnregisterWorktree>,
446 ) -> tide::Result<()> {
447 let project_id = request.payload.project_id;
448 let worktree_id = request.payload.worktree_id;
449 let (worktree, guest_connection_ids) =
450 self.state_mut()
451 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
452
453 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
454 self.peer.send(
455 conn_id,
456 proto::UnregisterWorktree {
457 project_id,
458 worktree_id,
459 },
460 )
461 })
462 .await?;
463 self.update_contacts_for_users(&worktree.authorized_user_ids)
464 .await?;
465 Ok(())
466 }
467
468 async fn share_worktree(
469 mut self: Arc<Server>,
470 mut request: TypedEnvelope<proto::ShareWorktree>,
471 ) -> tide::Result<()> {
472 let worktree = request
473 .payload
474 .worktree
475 .as_mut()
476 .ok_or_else(|| anyhow!("missing worktree"))?;
477 let entries = mem::take(&mut worktree.entries)
478 .into_iter()
479 .map(|entry| (entry.id, entry))
480 .collect();
481
482 let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
483 .into_iter()
484 .map(|summary| (PathBuf::from(summary.path.clone()), summary))
485 .collect();
486
487 let contact_user_ids = self.state_mut().share_worktree(
488 request.payload.project_id,
489 worktree.id,
490 request.sender_id,
491 entries,
492 diagnostic_summaries,
493 );
494 if let Some(contact_user_ids) = contact_user_ids {
495 self.peer.respond(request.receipt(), proto::Ack {}).await?;
496 self.update_contacts_for_users(&contact_user_ids).await?;
497 } else {
498 self.peer
499 .respond_with_error(
500 request.receipt(),
501 proto::Error {
502 message: "no such worktree".to_string(),
503 },
504 )
505 .await?;
506 }
507 Ok(())
508 }
509
510 async fn update_worktree(
511 mut self: Arc<Server>,
512 request: TypedEnvelope<proto::UpdateWorktree>,
513 ) -> tide::Result<()> {
514 let connection_ids = self
515 .state_mut()
516 .update_worktree(
517 request.sender_id,
518 request.payload.project_id,
519 request.payload.worktree_id,
520 &request.payload.removed_entries,
521 &request.payload.updated_entries,
522 )
523 .ok_or_else(|| anyhow!("no such worktree"))?;
524
525 broadcast(request.sender_id, connection_ids, |connection_id| {
526 self.peer
527 .forward_send(request.sender_id, connection_id, request.payload.clone())
528 })
529 .await?;
530
531 Ok(())
532 }
533
534 async fn update_diagnostic_summary(
535 mut self: Arc<Server>,
536 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
537 ) -> tide::Result<()> {
538 let receiver_ids = request
539 .payload
540 .summary
541 .clone()
542 .and_then(|summary| {
543 self.state_mut().update_diagnostic_summary(
544 request.payload.project_id,
545 request.payload.worktree_id,
546 request.sender_id,
547 summary,
548 )
549 })
550 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
551
552 broadcast(request.sender_id, receiver_ids, |connection_id| {
553 self.peer
554 .forward_send(request.sender_id, connection_id, request.payload.clone())
555 })
556 .await?;
557 Ok(())
558 }
559
560 async fn disk_based_diagnostics_updating(
561 self: Arc<Server>,
562 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
563 ) -> tide::Result<()> {
564 let receiver_ids = self
565 .state()
566 .project_connection_ids(request.payload.project_id, request.sender_id)
567 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
568 broadcast(request.sender_id, receiver_ids, |connection_id| {
569 self.peer
570 .forward_send(request.sender_id, connection_id, request.payload.clone())
571 })
572 .await?;
573 Ok(())
574 }
575
576 async fn disk_based_diagnostics_updated(
577 self: Arc<Server>,
578 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
579 ) -> tide::Result<()> {
580 let receiver_ids = self
581 .state()
582 .project_connection_ids(request.payload.project_id, request.sender_id)
583 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
584 broadcast(request.sender_id, receiver_ids, |connection_id| {
585 self.peer
586 .forward_send(request.sender_id, connection_id, request.payload.clone())
587 })
588 .await?;
589 Ok(())
590 }
591
592 async fn open_buffer(
593 self: Arc<Server>,
594 request: TypedEnvelope<proto::OpenBuffer>,
595 ) -> tide::Result<()> {
596 let receipt = request.receipt();
597 let host_connection_id = self
598 .state()
599 .read_project(request.payload.project_id, request.sender_id)
600 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
601 .host_connection_id;
602 let response = self
603 .peer
604 .forward_request(request.sender_id, host_connection_id, request.payload)
605 .await?;
606 self.peer.respond(receipt, response).await?;
607 Ok(())
608 }
609
610 async fn close_buffer(
611 self: Arc<Server>,
612 request: TypedEnvelope<proto::CloseBuffer>,
613 ) -> tide::Result<()> {
614 let host_connection_id = self
615 .state()
616 .read_project(request.payload.project_id, request.sender_id)
617 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
618 .host_connection_id;
619 self.peer
620 .forward_send(request.sender_id, host_connection_id, request.payload)
621 .await?;
622 Ok(())
623 }
624
625 async fn save_buffer(
626 self: Arc<Server>,
627 request: TypedEnvelope<proto::SaveBuffer>,
628 ) -> tide::Result<()> {
629 let host;
630 let guests;
631 {
632 let state = self.state();
633 let project = state
634 .read_project(request.payload.project_id, request.sender_id)
635 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
636 host = project.host_connection_id;
637 guests = project.guest_connection_ids()
638 }
639
640 let sender = request.sender_id;
641 let receipt = request.receipt();
642 let response = self
643 .peer
644 .forward_request(sender, host, request.payload.clone())
645 .await?;
646
647 broadcast(host, guests, |conn_id| {
648 let response = response.clone();
649 let peer = &self.peer;
650 async move {
651 if conn_id == sender {
652 peer.respond(receipt, response).await
653 } else {
654 peer.forward_send(host, conn_id, response).await
655 }
656 }
657 })
658 .await?;
659
660 Ok(())
661 }
662
663 async fn update_buffer(
664 self: Arc<Server>,
665 request: TypedEnvelope<proto::UpdateBuffer>,
666 ) -> tide::Result<()> {
667 let receiver_ids = self
668 .state()
669 .project_connection_ids(request.payload.project_id, request.sender_id)
670 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
671 broadcast(request.sender_id, receiver_ids, |connection_id| {
672 self.peer
673 .forward_send(request.sender_id, connection_id, request.payload.clone())
674 })
675 .await?;
676 self.peer.respond(request.receipt(), proto::Ack {}).await?;
677 Ok(())
678 }
679
680 async fn buffer_saved(
681 self: Arc<Server>,
682 request: TypedEnvelope<proto::BufferSaved>,
683 ) -> tide::Result<()> {
684 let receiver_ids = self
685 .state()
686 .project_connection_ids(request.payload.project_id, request.sender_id)
687 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
688 broadcast(request.sender_id, receiver_ids, |connection_id| {
689 self.peer
690 .forward_send(request.sender_id, connection_id, request.payload.clone())
691 })
692 .await?;
693 Ok(())
694 }
695
696 async fn get_channels(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::GetChannels>,
699 ) -> tide::Result<()> {
700 let user_id = self.state().user_id_for_connection(request.sender_id)?;
701 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
702 self.peer
703 .respond(
704 request.receipt(),
705 proto::GetChannelsResponse {
706 channels: channels
707 .into_iter()
708 .map(|chan| proto::Channel {
709 id: chan.id.to_proto(),
710 name: chan.name,
711 })
712 .collect(),
713 },
714 )
715 .await?;
716 Ok(())
717 }
718
719 async fn get_users(
720 self: Arc<Server>,
721 request: TypedEnvelope<proto::GetUsers>,
722 ) -> tide::Result<()> {
723 let receipt = request.receipt();
724 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
725 let users = self
726 .app_state
727 .db
728 .get_users_by_ids(user_ids)
729 .await?
730 .into_iter()
731 .map(|user| proto::User {
732 id: user.id.to_proto(),
733 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
734 github_login: user.github_login,
735 })
736 .collect();
737 self.peer
738 .respond(receipt, proto::GetUsersResponse { users })
739 .await?;
740 Ok(())
741 }
742
743 async fn update_contacts_for_users<'a>(
744 self: &Arc<Server>,
745 user_ids: impl IntoIterator<Item = &'a UserId>,
746 ) -> tide::Result<()> {
747 let mut send_futures = Vec::new();
748
749 {
750 let state = self.state();
751 for user_id in user_ids {
752 let contacts = state.contacts_for_user(*user_id);
753 for connection_id in state.connection_ids_for_user(*user_id) {
754 send_futures.push(self.peer.send(
755 connection_id,
756 proto::UpdateContacts {
757 contacts: contacts.clone(),
758 },
759 ));
760 }
761 }
762 }
763 futures::future::try_join_all(send_futures).await?;
764
765 Ok(())
766 }
767
768 async fn join_channel(
769 mut self: Arc<Self>,
770 request: TypedEnvelope<proto::JoinChannel>,
771 ) -> tide::Result<()> {
772 let user_id = self.state().user_id_for_connection(request.sender_id)?;
773 let channel_id = ChannelId::from_proto(request.payload.channel_id);
774 if !self
775 .app_state
776 .db
777 .can_user_access_channel(user_id, channel_id)
778 .await?
779 {
780 Err(anyhow!("access denied"))?;
781 }
782
783 self.state_mut().join_channel(request.sender_id, channel_id);
784 let messages = self
785 .app_state
786 .db
787 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
788 .await?
789 .into_iter()
790 .map(|msg| proto::ChannelMessage {
791 id: msg.id.to_proto(),
792 body: msg.body,
793 timestamp: msg.sent_at.unix_timestamp() as u64,
794 sender_id: msg.sender_id.to_proto(),
795 nonce: Some(msg.nonce.as_u128().into()),
796 })
797 .collect::<Vec<_>>();
798 self.peer
799 .respond(
800 request.receipt(),
801 proto::JoinChannelResponse {
802 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
803 messages,
804 },
805 )
806 .await?;
807 Ok(())
808 }
809
810 async fn leave_channel(
811 mut self: Arc<Self>,
812 request: TypedEnvelope<proto::LeaveChannel>,
813 ) -> tide::Result<()> {
814 let user_id = self.state().user_id_for_connection(request.sender_id)?;
815 let channel_id = ChannelId::from_proto(request.payload.channel_id);
816 if !self
817 .app_state
818 .db
819 .can_user_access_channel(user_id, channel_id)
820 .await?
821 {
822 Err(anyhow!("access denied"))?;
823 }
824
825 self.state_mut()
826 .leave_channel(request.sender_id, channel_id);
827
828 Ok(())
829 }
830
831 async fn send_channel_message(
832 self: Arc<Self>,
833 request: TypedEnvelope<proto::SendChannelMessage>,
834 ) -> tide::Result<()> {
835 let receipt = request.receipt();
836 let channel_id = ChannelId::from_proto(request.payload.channel_id);
837 let user_id;
838 let connection_ids;
839 {
840 let state = self.state();
841 user_id = state.user_id_for_connection(request.sender_id)?;
842 if let Some(ids) = state.channel_connection_ids(channel_id) {
843 connection_ids = ids;
844 } else {
845 return Ok(());
846 }
847 }
848
849 // Validate the message body.
850 let body = request.payload.body.trim().to_string();
851 if body.len() > MAX_MESSAGE_LEN {
852 self.peer
853 .respond_with_error(
854 receipt,
855 proto::Error {
856 message: "message is too long".to_string(),
857 },
858 )
859 .await?;
860 return Ok(());
861 }
862 if body.is_empty() {
863 self.peer
864 .respond_with_error(
865 receipt,
866 proto::Error {
867 message: "message can't be blank".to_string(),
868 },
869 )
870 .await?;
871 return Ok(());
872 }
873
874 let timestamp = OffsetDateTime::now_utc();
875 let nonce = if let Some(nonce) = request.payload.nonce {
876 nonce
877 } else {
878 self.peer
879 .respond_with_error(
880 receipt,
881 proto::Error {
882 message: "nonce can't be blank".to_string(),
883 },
884 )
885 .await?;
886 return Ok(());
887 };
888
889 let message_id = self
890 .app_state
891 .db
892 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
893 .await?
894 .to_proto();
895 let message = proto::ChannelMessage {
896 sender_id: user_id.to_proto(),
897 id: message_id,
898 body,
899 timestamp: timestamp.unix_timestamp() as u64,
900 nonce: Some(nonce),
901 };
902 broadcast(request.sender_id, connection_ids, |conn_id| {
903 self.peer.send(
904 conn_id,
905 proto::ChannelMessageSent {
906 channel_id: channel_id.to_proto(),
907 message: Some(message.clone()),
908 },
909 )
910 })
911 .await?;
912 self.peer
913 .respond(
914 receipt,
915 proto::SendChannelMessageResponse {
916 message: Some(message),
917 },
918 )
919 .await?;
920 Ok(())
921 }
922
923 async fn get_channel_messages(
924 self: Arc<Self>,
925 request: TypedEnvelope<proto::GetChannelMessages>,
926 ) -> tide::Result<()> {
927 let user_id = self.state().user_id_for_connection(request.sender_id)?;
928 let channel_id = ChannelId::from_proto(request.payload.channel_id);
929 if !self
930 .app_state
931 .db
932 .can_user_access_channel(user_id, channel_id)
933 .await?
934 {
935 Err(anyhow!("access denied"))?;
936 }
937
938 let messages = self
939 .app_state
940 .db
941 .get_channel_messages(
942 channel_id,
943 MESSAGE_COUNT_PER_PAGE,
944 Some(MessageId::from_proto(request.payload.before_message_id)),
945 )
946 .await?
947 .into_iter()
948 .map(|msg| proto::ChannelMessage {
949 id: msg.id.to_proto(),
950 body: msg.body,
951 timestamp: msg.sent_at.unix_timestamp() as u64,
952 sender_id: msg.sender_id.to_proto(),
953 nonce: Some(msg.nonce.as_u128().into()),
954 })
955 .collect::<Vec<_>>();
956 self.peer
957 .respond(
958 request.receipt(),
959 proto::GetChannelMessagesResponse {
960 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
961 messages,
962 },
963 )
964 .await?;
965 Ok(())
966 }
967
968 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
969 self.store.read()
970 }
971
972 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
973 self.store.write()
974 }
975}
976
977pub async fn broadcast<F, T>(
978 sender_id: ConnectionId,
979 receiver_ids: Vec<ConnectionId>,
980 mut f: F,
981) -> anyhow::Result<()>
982where
983 F: FnMut(ConnectionId) -> T,
984 T: Future<Output = anyhow::Result<()>>,
985{
986 let futures = receiver_ids
987 .into_iter()
988 .filter(|id| *id != sender_id)
989 .map(|id| f(id));
990 futures::future::try_join_all(futures).await?;
991 Ok(())
992}
993
994pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
995 let server = Server::new(app.state().clone(), rpc.clone(), None);
996 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
997 let server = server.clone();
998 async move {
999 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1000
1001 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1002 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1003 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1004 let client_protocol_version: Option<u32> = request
1005 .header("X-Zed-Protocol-Version")
1006 .and_then(|v| v.as_str().parse().ok());
1007
1008 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1009 return Ok(Response::new(StatusCode::UpgradeRequired));
1010 }
1011
1012 let header = match request.header("Sec-Websocket-Key") {
1013 Some(h) => h.as_str(),
1014 None => return Err(anyhow!("expected sec-websocket-key"))?,
1015 };
1016
1017 let user_id = process_auth_header(&request).await?;
1018
1019 let mut response = Response::new(StatusCode::SwitchingProtocols);
1020 response.insert_header(UPGRADE, "websocket");
1021 response.insert_header(CONNECTION, "Upgrade");
1022 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1023 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1024 response.insert_header("Sec-Websocket-Version", "13");
1025
1026 let http_res: &mut tide::http::Response = response.as_mut();
1027 let upgrade_receiver = http_res.recv_upgrade().await;
1028 let addr = request.remote().unwrap_or("unknown").to_string();
1029 task::spawn(async move {
1030 if let Some(stream) = upgrade_receiver.await {
1031 server
1032 .handle_connection(
1033 Connection::new(
1034 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1035 ),
1036 addr,
1037 user_id,
1038 None,
1039 )
1040 .await;
1041 }
1042 });
1043
1044 Ok(response)
1045 }
1046 });
1047}
1048
1049fn header_contains_ignore_case<T>(
1050 request: &tide::Request<T>,
1051 header_name: HeaderName,
1052 value: &str,
1053) -> bool {
1054 request
1055 .header(header_name)
1056 .map(|h| {
1057 h.as_str()
1058 .split(',')
1059 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1060 })
1061 .unwrap_or(false)
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067 use crate::{
1068 auth,
1069 db::{tests::TestDb, UserId},
1070 github, AppState, Config,
1071 };
1072 use ::rpc::Peer;
1073 use async_std::task;
1074 use gpui::{executor, ModelHandle, TestAppContext};
1075 use parking_lot::Mutex;
1076 use postage::{mpsc, watch};
1077 use rpc::PeerId;
1078 use serde_json::json;
1079 use sqlx::types::time::OffsetDateTime;
1080 use std::{
1081 ops::Deref,
1082 path::Path,
1083 rc::Rc,
1084 sync::{
1085 atomic::{AtomicBool, Ordering::SeqCst},
1086 Arc,
1087 },
1088 time::Duration,
1089 };
1090 use zed::{
1091 client::{
1092 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1093 EstablishConnectionError, UserStore,
1094 },
1095 editor::{Editor, EditorSettings, Input, MultiBuffer},
1096 fs::{FakeFs, Fs as _},
1097 language::{
1098 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1099 LanguageRegistry, LanguageServerConfig, Point,
1100 },
1101 lsp,
1102 project::{DiagnosticSummary, Project, ProjectPath},
1103 };
1104
1105 #[gpui::test]
1106 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1107 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1108 let lang_registry = Arc::new(LanguageRegistry::new());
1109 let fs = Arc::new(FakeFs::new());
1110 cx_a.foreground().forbid_parking();
1111
1112 // Connect to a server as 2 clients.
1113 let mut server = TestServer::start(cx_a.foreground()).await;
1114 let client_a = server.create_client(&mut cx_a, "user_a").await;
1115 let client_b = server.create_client(&mut cx_b, "user_b").await;
1116
1117 // Share a project as client A
1118 fs.insert_tree(
1119 "/a",
1120 json!({
1121 ".zed.toml": r#"collaborators = ["user_b"]"#,
1122 "a.txt": "a-contents",
1123 "b.txt": "b-contents",
1124 }),
1125 )
1126 .await;
1127 let project_a = cx_a.update(|cx| {
1128 Project::local(
1129 client_a.clone(),
1130 client_a.user_store.clone(),
1131 lang_registry.clone(),
1132 fs.clone(),
1133 cx,
1134 )
1135 });
1136 let worktree_a = project_a
1137 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1138 .await
1139 .unwrap();
1140 worktree_a
1141 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1142 .await;
1143 let project_id = project_a
1144 .update(&mut cx_a, |project, _| project.next_remote_id())
1145 .await;
1146 project_a
1147 .update(&mut cx_a, |project, cx| project.share(cx))
1148 .await
1149 .unwrap();
1150
1151 // Join that project as client B
1152 let project_b = Project::remote(
1153 project_id,
1154 client_b.clone(),
1155 client_b.user_store.clone(),
1156 lang_registry.clone(),
1157 fs.clone(),
1158 &mut cx_b.to_async(),
1159 )
1160 .await
1161 .unwrap();
1162 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1163
1164 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1165 assert_eq!(
1166 project
1167 .collaborators()
1168 .get(&client_a.peer_id)
1169 .unwrap()
1170 .user
1171 .github_login,
1172 "user_a"
1173 );
1174 project.replica_id()
1175 });
1176 project_a
1177 .condition(&cx_a, |tree, _| {
1178 tree.collaborators()
1179 .get(&client_b.peer_id)
1180 .map_or(false, |collaborator| {
1181 collaborator.replica_id == replica_id_b
1182 && collaborator.user.github_login == "user_b"
1183 })
1184 })
1185 .await;
1186
1187 // Open the same file as client B and client A.
1188 let buffer_b = worktree_b
1189 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
1190 .await
1191 .unwrap();
1192 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1193 buffer_b.read_with(&cx_b, |buf, cx| {
1194 assert_eq!(buf.read(cx).text(), "b-contents")
1195 });
1196 worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
1197 let buffer_a = worktree_a
1198 .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
1199 .await
1200 .unwrap();
1201
1202 let editor_b = cx_b.add_view(window_b, |cx| {
1203 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1204 });
1205 // TODO
1206 // // Create a selection set as client B and see that selection set as client A.
1207 // buffer_a
1208 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1209 // .await;
1210
1211 // Edit the buffer as client B and see that edit as client A.
1212 editor_b.update(&mut cx_b, |editor, cx| {
1213 editor.handle_input(&Input("ok, ".into()), cx)
1214 });
1215 buffer_a
1216 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1217 .await;
1218
1219 // TODO
1220 // // Remove the selection set as client B, see those selections disappear as client A.
1221 cx_b.update(move |_| drop(editor_b));
1222 // buffer_a
1223 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1224 // .await;
1225
1226 // Close the buffer as client A, see that the buffer is closed.
1227 cx_a.update(move |_| drop(buffer_a));
1228 worktree_a
1229 .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
1230 .await;
1231
1232 // Dropping the client B's project removes client B from client A's collaborators.
1233 cx_b.update(move |_| drop(project_b));
1234 project_a
1235 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1236 .await;
1237 }
1238
1239 #[gpui::test]
1240 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1241 let lang_registry = Arc::new(LanguageRegistry::new());
1242 let fs = Arc::new(FakeFs::new());
1243 cx_a.foreground().forbid_parking();
1244
1245 // Connect to a server as 2 clients.
1246 let mut server = TestServer::start(cx_a.foreground()).await;
1247 let client_a = server.create_client(&mut cx_a, "user_a").await;
1248 let client_b = server.create_client(&mut cx_b, "user_b").await;
1249
1250 // Share a project as client A
1251 fs.insert_tree(
1252 "/a",
1253 json!({
1254 ".zed.toml": r#"collaborators = ["user_b"]"#,
1255 "a.txt": "a-contents",
1256 "b.txt": "b-contents",
1257 }),
1258 )
1259 .await;
1260 let project_a = cx_a.update(|cx| {
1261 Project::local(
1262 client_a.clone(),
1263 client_a.user_store.clone(),
1264 lang_registry.clone(),
1265 fs.clone(),
1266 cx,
1267 )
1268 });
1269 let worktree_a = project_a
1270 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1271 .await
1272 .unwrap();
1273 worktree_a
1274 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1275 .await;
1276 let project_id = project_a
1277 .update(&mut cx_a, |project, _| project.next_remote_id())
1278 .await;
1279 project_a
1280 .update(&mut cx_a, |project, cx| project.share(cx))
1281 .await
1282 .unwrap();
1283
1284 // Join that project as client B
1285 let project_b = Project::remote(
1286 project_id,
1287 client_b.clone(),
1288 client_b.user_store.clone(),
1289 lang_registry.clone(),
1290 fs.clone(),
1291 &mut cx_b.to_async(),
1292 )
1293 .await
1294 .unwrap();
1295
1296 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1297 worktree_b
1298 .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
1299 .await
1300 .unwrap();
1301
1302 project_a
1303 .update(&mut cx_a, |project, cx| project.unshare(cx))
1304 .await
1305 .unwrap();
1306 project_b
1307 .condition(&mut cx_b, |project, _| project.is_read_only())
1308 .await;
1309 }
1310
1311 #[gpui::test]
1312 async fn test_propagate_saves_and_fs_changes(
1313 mut cx_a: TestAppContext,
1314 mut cx_b: TestAppContext,
1315 mut cx_c: TestAppContext,
1316 ) {
1317 let lang_registry = Arc::new(LanguageRegistry::new());
1318 let fs = Arc::new(FakeFs::new());
1319 cx_a.foreground().forbid_parking();
1320
1321 // Connect to a server as 3 clients.
1322 let mut server = TestServer::start(cx_a.foreground()).await;
1323 let client_a = server.create_client(&mut cx_a, "user_a").await;
1324 let client_b = server.create_client(&mut cx_b, "user_b").await;
1325 let client_c = server.create_client(&mut cx_c, "user_c").await;
1326
1327 // Share a worktree as client A.
1328 fs.insert_tree(
1329 "/a",
1330 json!({
1331 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1332 "file1": "",
1333 "file2": ""
1334 }),
1335 )
1336 .await;
1337 let project_a = cx_a.update(|cx| {
1338 Project::local(
1339 client_a.clone(),
1340 client_a.user_store.clone(),
1341 lang_registry.clone(),
1342 fs.clone(),
1343 cx,
1344 )
1345 });
1346 let worktree_a = project_a
1347 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1348 .await
1349 .unwrap();
1350 worktree_a
1351 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1352 .await;
1353 let project_id = project_a
1354 .update(&mut cx_a, |project, _| project.next_remote_id())
1355 .await;
1356 project_a
1357 .update(&mut cx_a, |project, cx| project.share(cx))
1358 .await
1359 .unwrap();
1360
1361 // Join that worktree as clients B and C.
1362 let project_b = Project::remote(
1363 project_id,
1364 client_b.clone(),
1365 client_b.user_store.clone(),
1366 lang_registry.clone(),
1367 fs.clone(),
1368 &mut cx_b.to_async(),
1369 )
1370 .await
1371 .unwrap();
1372 let project_c = Project::remote(
1373 project_id,
1374 client_c.clone(),
1375 client_c.user_store.clone(),
1376 lang_registry.clone(),
1377 fs.clone(),
1378 &mut cx_c.to_async(),
1379 )
1380 .await
1381 .unwrap();
1382
1383 // Open and edit a buffer as both guests B and C.
1384 let worktree_b = project_b.read_with(&cx_b, |p, _| p.worktrees()[0].clone());
1385 let worktree_c = project_c.read_with(&cx_c, |p, _| p.worktrees()[0].clone());
1386 let buffer_b = worktree_b
1387 .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
1388 .await
1389 .unwrap();
1390 let buffer_c = worktree_c
1391 .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
1392 .await
1393 .unwrap();
1394 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1395 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1396
1397 // Open and edit that buffer as the host.
1398 let buffer_a = worktree_a
1399 .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
1400 .await
1401 .unwrap();
1402
1403 buffer_a
1404 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1405 .await;
1406 buffer_a.update(&mut cx_a, |buf, cx| {
1407 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1408 });
1409
1410 // Wait for edits to propagate
1411 buffer_a
1412 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1413 .await;
1414 buffer_b
1415 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1416 .await;
1417 buffer_c
1418 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1419 .await;
1420
1421 // Edit the buffer as the host and concurrently save as guest B.
1422 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx).unwrap());
1423 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1424 save_b.await.unwrap();
1425 assert_eq!(
1426 fs.load("/a/file1".as_ref()).await.unwrap(),
1427 "hi-a, i-am-c, i-am-b, i-am-a"
1428 );
1429 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1430 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1431 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1432
1433 // Make changes on host's file system, see those changes on the guests.
1434 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
1435 .await
1436 .unwrap();
1437 fs.insert_file(Path::new("/a/file4"), "4".into())
1438 .await
1439 .unwrap();
1440
1441 worktree_b
1442 .condition(&cx_b, |tree, _| tree.file_count() == 4)
1443 .await;
1444 worktree_c
1445 .condition(&cx_c, |tree, _| tree.file_count() == 4)
1446 .await;
1447 worktree_b.read_with(&cx_b, |tree, _| {
1448 assert_eq!(
1449 tree.paths()
1450 .map(|p| p.to_string_lossy())
1451 .collect::<Vec<_>>(),
1452 &[".zed.toml", "file1", "file3", "file4"]
1453 )
1454 });
1455 worktree_c.read_with(&cx_c, |tree, _| {
1456 assert_eq!(
1457 tree.paths()
1458 .map(|p| p.to_string_lossy())
1459 .collect::<Vec<_>>(),
1460 &[".zed.toml", "file1", "file3", "file4"]
1461 )
1462 });
1463 }
1464
1465 #[gpui::test]
1466 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1467 cx_a.foreground().forbid_parking();
1468 let lang_registry = Arc::new(LanguageRegistry::new());
1469 let fs = Arc::new(FakeFs::new());
1470
1471 // Connect to a server as 2 clients.
1472 let mut server = TestServer::start(cx_a.foreground()).await;
1473 let client_a = server.create_client(&mut cx_a, "user_a").await;
1474 let client_b = server.create_client(&mut cx_b, "user_b").await;
1475
1476 // Share a project as client A
1477 fs.insert_tree(
1478 "/dir",
1479 json!({
1480 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1481 "a.txt": "a-contents",
1482 }),
1483 )
1484 .await;
1485
1486 let project_a = cx_a.update(|cx| {
1487 Project::local(
1488 client_a.clone(),
1489 client_a.user_store.clone(),
1490 lang_registry.clone(),
1491 fs.clone(),
1492 cx,
1493 )
1494 });
1495 let worktree_a = project_a
1496 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1497 .await
1498 .unwrap();
1499 worktree_a
1500 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1501 .await;
1502 let project_id = project_a
1503 .update(&mut cx_a, |project, _| project.next_remote_id())
1504 .await;
1505 project_a
1506 .update(&mut cx_a, |project, cx| project.share(cx))
1507 .await
1508 .unwrap();
1509
1510 // Join that project as client B
1511 let project_b = Project::remote(
1512 project_id,
1513 client_b.clone(),
1514 client_b.user_store.clone(),
1515 lang_registry.clone(),
1516 fs.clone(),
1517 &mut cx_b.to_async(),
1518 )
1519 .await
1520 .unwrap();
1521 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1522
1523 // Open a buffer as client B
1524 let buffer_b = worktree_b
1525 .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
1526 .await
1527 .unwrap();
1528 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1529
1530 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1531 buffer_b.read_with(&cx_b, |buf, _| {
1532 assert!(buf.is_dirty());
1533 assert!(!buf.has_conflict());
1534 });
1535
1536 buffer_b
1537 .update(&mut cx_b, |buf, cx| buf.save(cx))
1538 .unwrap()
1539 .await
1540 .unwrap();
1541 worktree_b
1542 .condition(&cx_b, |_, cx| {
1543 buffer_b.read(cx).file().unwrap().mtime() != mtime
1544 })
1545 .await;
1546 buffer_b.read_with(&cx_b, |buf, _| {
1547 assert!(!buf.is_dirty());
1548 assert!(!buf.has_conflict());
1549 });
1550
1551 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1552 buffer_b.read_with(&cx_b, |buf, _| {
1553 assert!(buf.is_dirty());
1554 assert!(!buf.has_conflict());
1555 });
1556 }
1557
1558 #[gpui::test]
1559 async fn test_editing_while_guest_opens_buffer(
1560 mut cx_a: TestAppContext,
1561 mut cx_b: TestAppContext,
1562 ) {
1563 cx_a.foreground().forbid_parking();
1564 let lang_registry = Arc::new(LanguageRegistry::new());
1565 let fs = Arc::new(FakeFs::new());
1566
1567 // Connect to a server as 2 clients.
1568 let mut server = TestServer::start(cx_a.foreground()).await;
1569 let client_a = server.create_client(&mut cx_a, "user_a").await;
1570 let client_b = server.create_client(&mut cx_b, "user_b").await;
1571
1572 // Share a project as client A
1573 fs.insert_tree(
1574 "/dir",
1575 json!({
1576 ".zed.toml": r#"collaborators = ["user_b"]"#,
1577 "a.txt": "a-contents",
1578 }),
1579 )
1580 .await;
1581 let project_a = cx_a.update(|cx| {
1582 Project::local(
1583 client_a.clone(),
1584 client_a.user_store.clone(),
1585 lang_registry.clone(),
1586 fs.clone(),
1587 cx,
1588 )
1589 });
1590 let worktree_a = project_a
1591 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1592 .await
1593 .unwrap();
1594 worktree_a
1595 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1596 .await;
1597 let project_id = project_a
1598 .update(&mut cx_a, |project, _| project.next_remote_id())
1599 .await;
1600 project_a
1601 .update(&mut cx_a, |project, cx| project.share(cx))
1602 .await
1603 .unwrap();
1604
1605 // Join that project as client B
1606 let project_b = Project::remote(
1607 project_id,
1608 client_b.clone(),
1609 client_b.user_store.clone(),
1610 lang_registry.clone(),
1611 fs.clone(),
1612 &mut cx_b.to_async(),
1613 )
1614 .await
1615 .unwrap();
1616 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1617
1618 // Open a buffer as client A
1619 let buffer_a = worktree_a
1620 .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
1621 .await
1622 .unwrap();
1623
1624 // Start opening the same buffer as client B
1625 let buffer_b = cx_b
1626 .background()
1627 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1628 task::yield_now().await;
1629
1630 // Edit the buffer as client A while client B is still opening it.
1631 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1632
1633 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1634 let buffer_b = buffer_b.await.unwrap();
1635 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1636 }
1637
1638 #[gpui::test]
1639 async fn test_leaving_worktree_while_opening_buffer(
1640 mut cx_a: TestAppContext,
1641 mut cx_b: TestAppContext,
1642 ) {
1643 cx_a.foreground().forbid_parking();
1644 let lang_registry = Arc::new(LanguageRegistry::new());
1645 let fs = Arc::new(FakeFs::new());
1646
1647 // Connect to a server as 2 clients.
1648 let mut server = TestServer::start(cx_a.foreground()).await;
1649 let client_a = server.create_client(&mut cx_a, "user_a").await;
1650 let client_b = server.create_client(&mut cx_b, "user_b").await;
1651
1652 // Share a project as client A
1653 fs.insert_tree(
1654 "/dir",
1655 json!({
1656 ".zed.toml": r#"collaborators = ["user_b"]"#,
1657 "a.txt": "a-contents",
1658 }),
1659 )
1660 .await;
1661 let project_a = cx_a.update(|cx| {
1662 Project::local(
1663 client_a.clone(),
1664 client_a.user_store.clone(),
1665 lang_registry.clone(),
1666 fs.clone(),
1667 cx,
1668 )
1669 });
1670 let worktree_a = project_a
1671 .update(&mut cx_a, |p, cx| p.add_local_worktree("/dir", cx))
1672 .await
1673 .unwrap();
1674 worktree_a
1675 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1676 .await;
1677 let project_id = project_a
1678 .update(&mut cx_a, |project, _| project.next_remote_id())
1679 .await;
1680 project_a
1681 .update(&mut cx_a, |project, cx| project.share(cx))
1682 .await
1683 .unwrap();
1684
1685 // Join that project as client B
1686 let project_b = Project::remote(
1687 project_id,
1688 client_b.clone(),
1689 client_b.user_store.clone(),
1690 lang_registry.clone(),
1691 fs.clone(),
1692 &mut cx_b.to_async(),
1693 )
1694 .await
1695 .unwrap();
1696 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1697
1698 // See that a guest has joined as client A.
1699 project_a
1700 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1701 .await;
1702
1703 // Begin opening a buffer as client B, but leave the project before the open completes.
1704 let buffer_b = cx_b
1705 .background()
1706 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
1707 cx_b.update(|_| drop(project_b));
1708 drop(buffer_b);
1709
1710 // See that the guest has left.
1711 project_a
1712 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1713 .await;
1714 }
1715
1716 #[gpui::test]
1717 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1718 cx_a.foreground().forbid_parking();
1719 let lang_registry = Arc::new(LanguageRegistry::new());
1720 let fs = Arc::new(FakeFs::new());
1721
1722 // Connect to a server as 2 clients.
1723 let mut server = TestServer::start(cx_a.foreground()).await;
1724 let client_a = server.create_client(&mut cx_a, "user_a").await;
1725 let client_b = server.create_client(&mut cx_b, "user_b").await;
1726
1727 // Share a project as client A
1728 fs.insert_tree(
1729 "/a",
1730 json!({
1731 ".zed.toml": r#"collaborators = ["user_b"]"#,
1732 "a.txt": "a-contents",
1733 "b.txt": "b-contents",
1734 }),
1735 )
1736 .await;
1737 let project_a = cx_a.update(|cx| {
1738 Project::local(
1739 client_a.clone(),
1740 client_a.user_store.clone(),
1741 lang_registry.clone(),
1742 fs.clone(),
1743 cx,
1744 )
1745 });
1746 let worktree_a = project_a
1747 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1748 .await
1749 .unwrap();
1750 worktree_a
1751 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1752 .await;
1753 let project_id = project_a
1754 .update(&mut cx_a, |project, _| project.next_remote_id())
1755 .await;
1756 project_a
1757 .update(&mut cx_a, |project, cx| project.share(cx))
1758 .await
1759 .unwrap();
1760
1761 // Join that project as client B
1762 let _project_b = Project::remote(
1763 project_id,
1764 client_b.clone(),
1765 client_b.user_store.clone(),
1766 lang_registry.clone(),
1767 fs.clone(),
1768 &mut cx_b.to_async(),
1769 )
1770 .await
1771 .unwrap();
1772
1773 // See that a guest has joined as client A.
1774 project_a
1775 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1776 .await;
1777
1778 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1779 client_b.disconnect(&cx_b.to_async()).await.unwrap();
1780 project_a
1781 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1782 .await;
1783 }
1784
1785 #[gpui::test]
1786 async fn test_collaborating_with_diagnostics(
1787 mut cx_a: TestAppContext,
1788 mut cx_b: TestAppContext,
1789 ) {
1790 cx_a.foreground().forbid_parking();
1791 let mut lang_registry = Arc::new(LanguageRegistry::new());
1792 let fs = Arc::new(FakeFs::new());
1793
1794 // Set up a fake language server.
1795 let (language_server_config, mut fake_language_server) =
1796 LanguageServerConfig::fake(cx_a.background()).await;
1797 Arc::get_mut(&mut lang_registry)
1798 .unwrap()
1799 .add(Arc::new(Language::new(
1800 LanguageConfig {
1801 name: "Rust".to_string(),
1802 path_suffixes: vec!["rs".to_string()],
1803 language_server: Some(language_server_config),
1804 ..Default::default()
1805 },
1806 Some(tree_sitter_rust::language()),
1807 )));
1808
1809 // Connect to a server as 2 clients.
1810 let mut server = TestServer::start(cx_a.foreground()).await;
1811 let client_a = server.create_client(&mut cx_a, "user_a").await;
1812 let client_b = server.create_client(&mut cx_b, "user_b").await;
1813
1814 // Share a project as client A
1815 fs.insert_tree(
1816 "/a",
1817 json!({
1818 ".zed.toml": r#"collaborators = ["user_b"]"#,
1819 "a.rs": "let one = two",
1820 "other.rs": "",
1821 }),
1822 )
1823 .await;
1824 let project_a = cx_a.update(|cx| {
1825 Project::local(
1826 client_a.clone(),
1827 client_a.user_store.clone(),
1828 lang_registry.clone(),
1829 fs.clone(),
1830 cx,
1831 )
1832 });
1833 let worktree_a = project_a
1834 .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
1835 .await
1836 .unwrap();
1837 worktree_a
1838 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1839 .await;
1840 let project_id = project_a
1841 .update(&mut cx_a, |project, _| project.next_remote_id())
1842 .await;
1843 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1844 project_a
1845 .update(&mut cx_a, |project, cx| project.share(cx))
1846 .await
1847 .unwrap();
1848
1849 // Cause the language server to start.
1850 let _ = cx_a
1851 .background()
1852 .spawn(worktree_a.update(&mut cx_a, |worktree, cx| {
1853 worktree.open_buffer("other.rs", cx)
1854 }))
1855 .await
1856 .unwrap();
1857
1858 // Simulate a language server reporting errors for a file.
1859 fake_language_server
1860 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1861 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1862 version: None,
1863 diagnostics: vec![lsp::Diagnostic {
1864 severity: Some(lsp::DiagnosticSeverity::ERROR),
1865 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1866 message: "message 1".to_string(),
1867 ..Default::default()
1868 }],
1869 })
1870 .await;
1871
1872 // Wait for server to see the diagnostics update.
1873 server
1874 .condition(|store| {
1875 let worktree = store
1876 .project(project_id)
1877 .unwrap()
1878 .worktrees
1879 .get(&worktree_id.to_proto())
1880 .unwrap();
1881
1882 !worktree
1883 .share
1884 .as_ref()
1885 .unwrap()
1886 .diagnostic_summaries
1887 .is_empty()
1888 })
1889 .await;
1890
1891 // Join the worktree as client B.
1892 let project_b = Project::remote(
1893 project_id,
1894 client_b.clone(),
1895 client_b.user_store.clone(),
1896 lang_registry.clone(),
1897 fs.clone(),
1898 &mut cx_b.to_async(),
1899 )
1900 .await
1901 .unwrap();
1902
1903 project_b.read_with(&cx_b, |project, cx| {
1904 assert_eq!(
1905 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1906 &[(
1907 ProjectPath {
1908 worktree_id,
1909 path: Arc::from(Path::new("a.rs")),
1910 },
1911 DiagnosticSummary {
1912 error_count: 1,
1913 warning_count: 0,
1914 ..Default::default()
1915 },
1916 )]
1917 )
1918 });
1919
1920 // Simulate a language server reporting more errors for a file.
1921 fake_language_server
1922 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1923 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1924 version: None,
1925 diagnostics: vec![
1926 lsp::Diagnostic {
1927 severity: Some(lsp::DiagnosticSeverity::ERROR),
1928 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1929 message: "message 1".to_string(),
1930 ..Default::default()
1931 },
1932 lsp::Diagnostic {
1933 severity: Some(lsp::DiagnosticSeverity::WARNING),
1934 range: lsp::Range::new(
1935 lsp::Position::new(0, 10),
1936 lsp::Position::new(0, 13),
1937 ),
1938 message: "message 2".to_string(),
1939 ..Default::default()
1940 },
1941 ],
1942 })
1943 .await;
1944
1945 // Client b gets the updated summaries
1946 project_b
1947 .condition(&cx_b, |project, cx| {
1948 project.diagnostic_summaries(cx).collect::<Vec<_>>()
1949 == &[(
1950 ProjectPath {
1951 worktree_id,
1952 path: Arc::from(Path::new("a.rs")),
1953 },
1954 DiagnosticSummary {
1955 error_count: 1,
1956 warning_count: 1,
1957 ..Default::default()
1958 },
1959 )]
1960 })
1961 .await;
1962
1963 // Open the file with the errors on client B. They should be present.
1964 let worktree_b = project_b.update(&mut cx_b, |p, _| p.worktrees()[0].clone());
1965 let buffer_b = cx_b
1966 .background()
1967 .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
1968 .await
1969 .unwrap();
1970
1971 buffer_b.read_with(&cx_b, |buffer, _| {
1972 assert_eq!(
1973 buffer
1974 .snapshot()
1975 .diagnostics_in_range::<_, Point>(0..buffer.len())
1976 .map(|entry| entry)
1977 .collect::<Vec<_>>(),
1978 &[
1979 DiagnosticEntry {
1980 range: Point::new(0, 4)..Point::new(0, 7),
1981 diagnostic: Diagnostic {
1982 group_id: 0,
1983 message: "message 1".to_string(),
1984 severity: lsp::DiagnosticSeverity::ERROR,
1985 is_primary: true,
1986 ..Default::default()
1987 }
1988 },
1989 DiagnosticEntry {
1990 range: Point::new(0, 10)..Point::new(0, 13),
1991 diagnostic: Diagnostic {
1992 group_id: 1,
1993 severity: lsp::DiagnosticSeverity::WARNING,
1994 message: "message 2".to_string(),
1995 is_primary: true,
1996 ..Default::default()
1997 }
1998 }
1999 ]
2000 );
2001 });
2002 }
2003
2004 #[gpui::test]
2005 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2006 cx_a.foreground().forbid_parking();
2007
2008 // Connect to a server as 2 clients.
2009 let mut server = TestServer::start(cx_a.foreground()).await;
2010 let client_a = server.create_client(&mut cx_a, "user_a").await;
2011 let client_b = server.create_client(&mut cx_b, "user_b").await;
2012
2013 // Create an org that includes these 2 users.
2014 let db = &server.app_state.db;
2015 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2016 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2017 .await
2018 .unwrap();
2019 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2020 .await
2021 .unwrap();
2022
2023 // Create a channel that includes all the users.
2024 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2025 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2026 .await
2027 .unwrap();
2028 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2029 .await
2030 .unwrap();
2031 db.create_channel_message(
2032 channel_id,
2033 client_b.current_user_id(&cx_b),
2034 "hello A, it's B.",
2035 OffsetDateTime::now_utc(),
2036 1,
2037 )
2038 .await
2039 .unwrap();
2040
2041 let channels_a = cx_a
2042 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2043 channels_a
2044 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2045 .await;
2046 channels_a.read_with(&cx_a, |list, _| {
2047 assert_eq!(
2048 list.available_channels().unwrap(),
2049 &[ChannelDetails {
2050 id: channel_id.to_proto(),
2051 name: "test-channel".to_string()
2052 }]
2053 )
2054 });
2055 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2056 this.get_channel(channel_id.to_proto(), cx).unwrap()
2057 });
2058 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2059 channel_a
2060 .condition(&cx_a, |channel, _| {
2061 channel_messages(channel)
2062 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2063 })
2064 .await;
2065
2066 let channels_b = cx_b
2067 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2068 channels_b
2069 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2070 .await;
2071 channels_b.read_with(&cx_b, |list, _| {
2072 assert_eq!(
2073 list.available_channels().unwrap(),
2074 &[ChannelDetails {
2075 id: channel_id.to_proto(),
2076 name: "test-channel".to_string()
2077 }]
2078 )
2079 });
2080
2081 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2082 this.get_channel(channel_id.to_proto(), cx).unwrap()
2083 });
2084 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2085 channel_b
2086 .condition(&cx_b, |channel, _| {
2087 channel_messages(channel)
2088 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2089 })
2090 .await;
2091
2092 channel_a
2093 .update(&mut cx_a, |channel, cx| {
2094 channel
2095 .send_message("oh, hi B.".to_string(), cx)
2096 .unwrap()
2097 .detach();
2098 let task = channel.send_message("sup".to_string(), cx).unwrap();
2099 assert_eq!(
2100 channel_messages(channel),
2101 &[
2102 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2103 ("user_a".to_string(), "oh, hi B.".to_string(), true),
2104 ("user_a".to_string(), "sup".to_string(), true)
2105 ]
2106 );
2107 task
2108 })
2109 .await
2110 .unwrap();
2111
2112 channel_b
2113 .condition(&cx_b, |channel, _| {
2114 channel_messages(channel)
2115 == [
2116 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2117 ("user_a".to_string(), "oh, hi B.".to_string(), false),
2118 ("user_a".to_string(), "sup".to_string(), false),
2119 ]
2120 })
2121 .await;
2122
2123 assert_eq!(
2124 server
2125 .state()
2126 .await
2127 .channel(channel_id)
2128 .unwrap()
2129 .connection_ids
2130 .len(),
2131 2
2132 );
2133 cx_b.update(|_| drop(channel_b));
2134 server
2135 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2136 .await;
2137
2138 cx_a.update(|_| drop(channel_a));
2139 server
2140 .condition(|state| state.channel(channel_id).is_none())
2141 .await;
2142 }
2143
2144 #[gpui::test]
2145 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2146 cx_a.foreground().forbid_parking();
2147
2148 let mut server = TestServer::start(cx_a.foreground()).await;
2149 let client_a = server.create_client(&mut cx_a, "user_a").await;
2150
2151 let db = &server.app_state.db;
2152 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2153 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2154 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2155 .await
2156 .unwrap();
2157 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2158 .await
2159 .unwrap();
2160
2161 let channels_a = cx_a
2162 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2163 channels_a
2164 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2165 .await;
2166 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2167 this.get_channel(channel_id.to_proto(), cx).unwrap()
2168 });
2169
2170 // Messages aren't allowed to be too long.
2171 channel_a
2172 .update(&mut cx_a, |channel, cx| {
2173 let long_body = "this is long.\n".repeat(1024);
2174 channel.send_message(long_body, cx).unwrap()
2175 })
2176 .await
2177 .unwrap_err();
2178
2179 // Messages aren't allowed to be blank.
2180 channel_a.update(&mut cx_a, |channel, cx| {
2181 channel.send_message(String::new(), cx).unwrap_err()
2182 });
2183
2184 // Leading and trailing whitespace are trimmed.
2185 channel_a
2186 .update(&mut cx_a, |channel, cx| {
2187 channel
2188 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2189 .unwrap()
2190 })
2191 .await
2192 .unwrap();
2193 assert_eq!(
2194 db.get_channel_messages(channel_id, 10, None)
2195 .await
2196 .unwrap()
2197 .iter()
2198 .map(|m| &m.body)
2199 .collect::<Vec<_>>(),
2200 &["surrounded by whitespace"]
2201 );
2202 }
2203
    // Exercises chat across a dropped connection between two clients (A and B):
    // cached channel data stays readable while B is offline, a message B sends
    // while offline first fails but is visible as non-pending after reconnection,
    // and both clients can exchange messages normally afterwards.
    //
    // Each expected message is a `(sender login, body, is_pending)` tuple, as
    // produced by the `channel_messages` helper.
    #[gpui::test]
    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        // Status stream for client B, polled below to detect the failed reconnection.
        let mut status_b = client_b.status();

        // Create an org that includes these 2 users.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();

        // Create a channel that includes all the users.
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();
        // Seed the channel directly in the database with one message authored by B.
        // NOTE(review): the trailing `2` is presumably a nonce — confirm against
        // `create_channel_message`.
        db.create_channel_message(
            channel_id,
            client_b.current_user_id(&cx_b),
            "hello A, it's B.",
            OffsetDateTime::now_utc(),
            2,
        )
        .await
        .unwrap();

        // Client A: wait for its channel list to load and check that it contains
        // exactly the test channel.
        let channels_a = cx_a
            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;

        channels_a.read_with(&cx_a, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        // A freshly obtained channel starts with no messages; the seeded message
        // shows up once the condition below is satisfied.
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client B: the same channel-list and initial-message checks as for client A.
        let channels_b = cx_b
            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
        channels_b
            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
            .await;
        channels_b.read_with(&cx_b, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });

        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Disconnect client B, ensuring we can still access its cached channel data.
        // New connections are forbidden first, so B's reconnection attempts fail;
        // keep draining status updates until a `ReconnectionError` is observed.
        server.forbid_connections();
        server.disconnect_client(client_b.current_user_id(&cx_b));
        while !matches!(
            status_b.recv().await,
            Some(client::Status::ReconnectionError { .. })
        ) {}

        channels_b.read_with(&cx_b, |channels, _| {
            assert_eq!(
                channels.available_channels().unwrap(),
                [ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        channel_b.read_with(&cx_b, |channel, _| {
            assert_eq!(
                channel_messages(channel),
                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            )
        });

        // Send a message from client B while it is disconnected.
        // The message is listed as pending right away, and the send task itself
        // is expected to resolve to an error.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                let task = channel
                    .send_message("can you see this?".to_string(), cx)
                    .unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap_err();

        // Send a message from client A while B is disconnected.
        // Two messages are sent: the first task is detached, the second is awaited.
        // Both are pending immediately after being queued.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("oh, hi B.".to_string(), cx)
                    .unwrap()
                    .detach();
                let task = channel.send_message("sup".to_string(), cx).unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
                        ("user_a".to_string(), "sup".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap();

        // Give client B a chance to reconnect.
        // NOTE(review): advancing B's clock by 10s presumably lets the client's
        // reconnection timer fire — confirm against the client's retry logic.
        server.allow_connections();
        cx_b.foreground().advance_clock(Duration::from_secs(10));

        // Verify that B sees the new messages upon reconnection, as well as the message client B
        // sent while offline.
        // B's offline message is expected after A's two messages and is no longer pending.
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                    ]
            })
            .await;

        // Ensure client A and B can communicate normally after reconnection.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel.send_message("you online?".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                    ]
            })
            .await;

        // And in the other direction: B replies, A eventually sees the full history.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                channel.send_message("yep".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                        ("user_b".to_string(), "yep".to_string(), false),
                    ]
            })
            .await;
    }
2415
    // Checks that the contact lists of three clients (A, B, C) track a project
    // owned by A: first with no guests, then with B listed as a guest after B
    // opens the project remotely, and finally empty once A drops the project.
    //
    // Contact lists are compared through the nested `contacts` helper, which
    // flattens them to `(user login, [(first worktree root name, [guest logins])])`.
    #[gpui::test]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        // NOTE(review): the `collaborators` entry in `.zed.toml` is presumably what
        // makes the project visible to user_b and user_c — confirm.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let worktree_a = project_a
            .update(&mut cx_a, |p, cx| p.add_local_worktree("/a", cx))
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Every client should now list user_a with one project rooted at "a" and
        // no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Obtain the project's remote id (used below to open it from client B),
        // then share the project.
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Open the shared project as client B. The handle is bound (rather than
        // discarded) so that it stays alive for the rest of the test.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Every client should now see user_b listed as a guest of the project.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Wait until A's project has registered B's peer as a collaborator.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Drop A's project handle inside an update; afterwards every client's
        // contact list is expected to become empty.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens a user store's contacts into borrowed strings for comparison:
        // one `(github login, projects)` entry per contact, where each project is
        // `(name of its first worktree root, github logins of its guests)`.
        // Indexing with `[0]` panics if a project has no worktree root names.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
2555
    /// Test harness that owns a `Server` instance together with the state the
    /// tests need to create clients, sever their connections, and wait on
    /// server-side conditions.
    struct TestServer {
        /// Peer handed to `Server::new`; it is reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`; tests reach the database through it.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor handle; `condition` brackets its waits with
        /// `start_waiting` / `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to `Server::new`;
        /// `condition` re-checks its predicate each time something arrives here.
        notifications: mpsc::Receiver<()>,
        /// One kill handle per connected user, inserted when a connection is
        /// established and removed/fired by `disconnect_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, attempts to establish a connection fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Held only to keep the test database alive as long as the server;
        /// never read after construction.
        _test_db: TestDb,
    }
2566
2567 impl TestServer {
2568 async fn start(foreground: Rc<executor::Foreground>) -> Self {
2569 let test_db = TestDb::new();
2570 let app_state = Self::build_app_state(&test_db).await;
2571 let peer = Peer::new();
2572 let notifications = mpsc::channel(128);
2573 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2574 Self {
2575 peer,
2576 app_state,
2577 server,
2578 foreground,
2579 notifications: notifications.1,
2580 connection_killers: Default::default(),
2581 forbid_connections: Default::default(),
2582 _test_db: test_db,
2583 }
2584 }
2585
2586 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2587 let http = FakeHttpClient::with_404_response();
2588 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2589 let client_name = name.to_string();
2590 let mut client = Client::new(http.clone());
2591 let server = self.server.clone();
2592 let connection_killers = self.connection_killers.clone();
2593 let forbid_connections = self.forbid_connections.clone();
2594 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2595
2596 Arc::get_mut(&mut client)
2597 .unwrap()
2598 .override_authenticate(move |cx| {
2599 cx.spawn(|_| async move {
2600 let access_token = "the-token".to_string();
2601 Ok(Credentials {
2602 user_id: user_id.0 as u64,
2603 access_token,
2604 })
2605 })
2606 })
2607 .override_establish_connection(move |credentials, cx| {
2608 assert_eq!(credentials.user_id, user_id.0 as u64);
2609 assert_eq!(credentials.access_token, "the-token");
2610
2611 let server = server.clone();
2612 let connection_killers = connection_killers.clone();
2613 let forbid_connections = forbid_connections.clone();
2614 let client_name = client_name.clone();
2615 let connection_id_tx = connection_id_tx.clone();
2616 cx.spawn(move |cx| async move {
2617 if forbid_connections.load(SeqCst) {
2618 Err(EstablishConnectionError::other(anyhow!(
2619 "server is forbidding connections"
2620 )))
2621 } else {
2622 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2623 connection_killers.lock().insert(user_id, kill_conn);
2624 cx.background()
2625 .spawn(server.handle_connection(
2626 server_conn,
2627 client_name,
2628 user_id,
2629 Some(connection_id_tx),
2630 ))
2631 .detach();
2632 Ok(client_conn)
2633 }
2634 })
2635 });
2636
2637 client
2638 .authenticate_and_connect(&cx.to_async())
2639 .await
2640 .unwrap();
2641
2642 let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
2643 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2644 let mut authed_user =
2645 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2646 while authed_user.recv().await.unwrap().is_none() {}
2647
2648 TestClient {
2649 client,
2650 peer_id,
2651 user_store,
2652 }
2653 }
2654
2655 fn disconnect_client(&self, user_id: UserId) {
2656 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2657 let _ = kill_conn.try_send(Some(()));
2658 }
2659 }
2660
2661 fn forbid_connections(&self) {
2662 self.forbid_connections.store(true, SeqCst);
2663 }
2664
2665 fn allow_connections(&self) {
2666 self.forbid_connections.store(false, SeqCst);
2667 }
2668
2669 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2670 let mut config = Config::default();
2671 config.session_secret = "a".repeat(32);
2672 config.database_url = test_db.url.clone();
2673 let github_client = github::AppClient::test();
2674 Arc::new(AppState {
2675 db: test_db.db().clone(),
2676 handlebars: Default::default(),
2677 auth_client: auth::build_client("", ""),
2678 repo_client: github::RepoClient::test(&github_client),
2679 github_client,
2680 config,
2681 })
2682 }
2683
2684 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2685 self.server.store.read()
2686 }
2687
2688 async fn condition<F>(&mut self, mut predicate: F)
2689 where
2690 F: FnMut(&Store) -> bool,
2691 {
2692 async_std::future::timeout(Duration::from_millis(500), async {
2693 while !(predicate)(&*self.server.store.read()) {
2694 self.foreground.start_waiting();
2695 self.notifications.recv().await;
2696 self.foreground.finish_waiting();
2697 }
2698 })
2699 .await
2700 .expect("condition timed out");
2701 }
2702 }
2703
2704 impl Drop for TestServer {
2705 fn drop(&mut self) {
2706 task::block_on(self.peer.reset());
2707 }
2708 }
2709
    /// A connected client as produced by `TestServer::create_client`, bundled
    /// with the identifiers and models the tests inspect.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
    }
2715
2716 impl Deref for TestClient {
2717 type Target = Arc<Client>;
2718
2719 fn deref(&self) -> &Self::Target {
2720 &self.client
2721 }
2722 }
2723
2724 impl TestClient {
2725 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2726 UserId::from_proto(
2727 self.user_store
2728 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2729 )
2730 }
2731 }
2732
2733 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2734 channel
2735 .messages()
2736 .cursor::<()>()
2737 .map(|m| {
2738 (
2739 m.sender.github_login.clone(),
2740 m.body.clone(),
2741 m.is_pending(),
2742 )
2743 })
2744 .collect()
2745 }
2746
    /// A stateless view whose `render` produces an empty element.
    struct EmptyView;
2748
    impl gpui::Entity for EmptyView {
        // The event type is the unit type: this view has no event payload.
        type Event = ();
    }
2752
2753 impl gpui::View for EmptyView {
2754 fn ui_name() -> &'static str {
2755 "empty view"
2756 }
2757
2758 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2759 gpui::Element::boxed(gpui::elements::Empty)
2760 }
2761 }
2762}