1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, mem, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
/// RPC server that routes typed messages from connected peers to registered handlers.
pub struct Server {
    // Peer used to send messages and to respond to / forward requests.
    peer: Arc<Peer>,
    // Shared in-memory state (connections, projects, worktrees, channels), guarded by a
    // read/write lock.
    store: RwLock<Store>,
    // Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    // Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    // Optional channel that receives `()` after each message that had a registered handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum accepted length of a (trimmed) channel message body, in bytes.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a request refers to a project that cannot be found.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::open_buffer)
78 .add_handler(Server::close_buffer)
79 .add_handler(Server::update_buffer)
80 .add_handler(Server::update_buffer_file)
81 .add_handler(Server::buffer_saved)
82 .add_handler(Server::save_buffer)
83 .add_handler(Server::format_buffer)
84 .add_handler(Server::get_channels)
85 .add_handler(Server::get_users)
86 .add_handler(Server::join_channel)
87 .add_handler(Server::leave_channel)
88 .add_handler(Server::send_channel_message)
89 .add_handler(Server::get_channel_messages);
90
91 Arc::new(server)
92 }
93
94 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
95 where
96 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
97 Fut: 'static + Send + Future<Output = tide::Result<()>>,
98 M: EnvelopedMessage,
99 {
100 let prev_handler = self.handlers.insert(
101 TypeId::of::<M>(),
102 Box::new(move |server, envelope| {
103 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
104 (handler)(server, *envelope).boxed()
105 }),
106 );
107 if prev_handler.is_some() {
108 panic!("registered a handler for the same message twice");
109 }
110 self
111 }
112
113 pub fn handle_connection(
114 self: &Arc<Self>,
115 connection: Connection,
116 addr: String,
117 user_id: UserId,
118 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
119 ) -> impl Future<Output = ()> {
120 let mut this = self.clone();
121 async move {
122 let (connection_id, handle_io, mut incoming_rx) =
123 this.peer.add_connection(connection).await;
124
125 if let Some(send_connection_id) = send_connection_id.as_mut() {
126 let _ = send_connection_id.send(connection_id).await;
127 }
128
129 this.state_mut().add_connection(connection_id, user_id);
130 if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
131 log::error!("error updating contacts for {:?}: {}", user_id, err);
132 }
133
134 let handle_io = handle_io.fuse();
135 futures::pin_mut!(handle_io);
136 loop {
137 let next_message = incoming_rx.next().fuse();
138 futures::pin_mut!(next_message);
139 futures::select_biased! {
140 message = next_message => {
141 if let Some(message) = message {
142 let start_time = Instant::now();
143 log::info!("RPC message received: {}", message.payload_type_name());
144 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
145 if let Err(err) = (handler)(this.clone(), message).await {
146 log::error!("error handling message: {:?}", err);
147 } else {
148 log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
149 }
150
151 if let Some(mut notifications) = this.notifications.clone() {
152 let _ = notifications.send(()).await;
153 }
154 } else {
155 log::warn!("unhandled message: {}", message.payload_type_name());
156 }
157 } else {
158 log::info!("rpc connection closed {:?}", addr);
159 break;
160 }
161 }
162 handle_io = handle_io => {
163 if let Err(err) = handle_io {
164 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
165 }
166 break;
167 }
168 }
169 }
170
171 if let Err(err) = this.sign_out(connection_id).await {
172 log::error!("error signing out connection {:?} - {:?}", addr, err);
173 }
174 }
175 }
176
177 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
178 self.peer.disconnect(connection_id);
179 let removed_connection = self.state_mut().remove_connection(connection_id)?;
180
181 for (project_id, project) in removed_connection.hosted_projects {
182 if let Some(share) = project.share {
183 broadcast(
184 connection_id,
185 share.guests.keys().copied().collect(),
186 |conn_id| {
187 self.peer
188 .send(conn_id, proto::UnshareProject { project_id })
189 },
190 )
191 .await?;
192 }
193 }
194
195 for (project_id, peer_ids) in removed_connection.guest_project_ids {
196 broadcast(connection_id, peer_ids, |conn_id| {
197 self.peer.send(
198 conn_id,
199 proto::RemoveProjectCollaborator {
200 project_id,
201 peer_id: connection_id.0,
202 },
203 )
204 })
205 .await?;
206 }
207
208 self.update_contacts_for_users(removed_connection.contact_ids.iter())
209 .await?;
210
211 Ok(())
212 }
213
214 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
215 self.peer.respond(request.receipt(), proto::Ack {}).await?;
216 Ok(())
217 }
218
219 async fn register_project(
220 mut self: Arc<Server>,
221 request: TypedEnvelope<proto::RegisterProject>,
222 ) -> tide::Result<()> {
223 let project_id = {
224 let mut state = self.state_mut();
225 let user_id = state.user_id_for_connection(request.sender_id)?;
226 state.register_project(request.sender_id, user_id)
227 };
228 self.peer
229 .respond(
230 request.receipt(),
231 proto::RegisterProjectResponse { project_id },
232 )
233 .await?;
234 Ok(())
235 }
236
237 async fn unregister_project(
238 mut self: Arc<Server>,
239 request: TypedEnvelope<proto::UnregisterProject>,
240 ) -> tide::Result<()> {
241 let project = self
242 .state_mut()
243 .unregister_project(request.payload.project_id, request.sender_id)
244 .ok_or_else(|| anyhow!("no such project"))?;
245 self.update_contacts_for_users(project.authorized_user_ids().iter())
246 .await?;
247 Ok(())
248 }
249
250 async fn share_project(
251 mut self: Arc<Server>,
252 request: TypedEnvelope<proto::ShareProject>,
253 ) -> tide::Result<()> {
254 self.state_mut()
255 .share_project(request.payload.project_id, request.sender_id);
256 self.peer.respond(request.receipt(), proto::Ack {}).await?;
257 Ok(())
258 }
259
260 async fn unshare_project(
261 mut self: Arc<Server>,
262 request: TypedEnvelope<proto::UnshareProject>,
263 ) -> tide::Result<()> {
264 let project_id = request.payload.project_id;
265 let project = self
266 .state_mut()
267 .unshare_project(project_id, request.sender_id)?;
268
269 broadcast(request.sender_id, project.connection_ids, |conn_id| {
270 self.peer
271 .send(conn_id, proto::UnshareProject { project_id })
272 })
273 .await?;
274 self.update_contacts_for_users(&project.authorized_user_ids)
275 .await?;
276
277 Ok(())
278 }
279
280 async fn join_project(
281 mut self: Arc<Server>,
282 request: TypedEnvelope<proto::JoinProject>,
283 ) -> tide::Result<()> {
284 let project_id = request.payload.project_id;
285
286 let user_id = self.state().user_id_for_connection(request.sender_id)?;
287 let response_data = self
288 .state_mut()
289 .join_project(request.sender_id, user_id, project_id)
290 .and_then(|joined| {
291 let share = joined.project.share()?;
292 let peer_count = share.guests.len();
293 let mut collaborators = Vec::with_capacity(peer_count);
294 collaborators.push(proto::Collaborator {
295 peer_id: joined.project.host_connection_id.0,
296 replica_id: 0,
297 user_id: joined.project.host_user_id.to_proto(),
298 });
299 let worktrees = joined
300 .project
301 .worktrees
302 .iter()
303 .filter_map(|(id, worktree)| {
304 worktree.share.as_ref().map(|share| proto::Worktree {
305 id: *id,
306 root_name: worktree.root_name.clone(),
307 entries: share.entries.values().cloned().collect(),
308 diagnostic_summaries: share
309 .diagnostic_summaries
310 .values()
311 .cloned()
312 .collect(),
313 weak: worktree.weak,
314 })
315 })
316 .collect();
317 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
318 if *peer_conn_id != request.sender_id {
319 collaborators.push(proto::Collaborator {
320 peer_id: peer_conn_id.0,
321 replica_id: *peer_replica_id as u32,
322 user_id: peer_user_id.to_proto(),
323 });
324 }
325 }
326 let response = proto::JoinProjectResponse {
327 worktrees,
328 replica_id: joined.replica_id as u32,
329 collaborators,
330 };
331 let connection_ids = joined.project.connection_ids();
332 let contact_user_ids = joined.project.authorized_user_ids();
333 Ok((response, connection_ids, contact_user_ids))
334 });
335
336 match response_data {
337 Ok((response, connection_ids, contact_user_ids)) => {
338 broadcast(request.sender_id, connection_ids, |conn_id| {
339 self.peer.send(
340 conn_id,
341 proto::AddProjectCollaborator {
342 project_id: project_id,
343 collaborator: Some(proto::Collaborator {
344 peer_id: request.sender_id.0,
345 replica_id: response.replica_id,
346 user_id: user_id.to_proto(),
347 }),
348 },
349 )
350 })
351 .await?;
352 self.peer.respond(request.receipt(), response).await?;
353 self.update_contacts_for_users(&contact_user_ids).await?;
354 }
355 Err(error) => {
356 self.peer
357 .respond_with_error(
358 request.receipt(),
359 proto::Error {
360 message: error.to_string(),
361 },
362 )
363 .await?;
364 }
365 }
366
367 Ok(())
368 }
369
370 async fn leave_project(
371 mut self: Arc<Server>,
372 request: TypedEnvelope<proto::LeaveProject>,
373 ) -> tide::Result<()> {
374 let sender_id = request.sender_id;
375 let project_id = request.payload.project_id;
376 let worktree = self.state_mut().leave_project(sender_id, project_id);
377 if let Some(worktree) = worktree {
378 broadcast(sender_id, worktree.connection_ids, |conn_id| {
379 self.peer.send(
380 conn_id,
381 proto::RemoveProjectCollaborator {
382 project_id,
383 peer_id: sender_id.0,
384 },
385 )
386 })
387 .await?;
388 self.update_contacts_for_users(&worktree.authorized_user_ids)
389 .await?;
390 }
391 Ok(())
392 }
393
394 async fn register_worktree(
395 mut self: Arc<Server>,
396 request: TypedEnvelope<proto::RegisterWorktree>,
397 ) -> tide::Result<()> {
398 let receipt = request.receipt();
399 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
400
401 let mut contact_user_ids = HashSet::default();
402 contact_user_ids.insert(host_user_id);
403 for github_login in request.payload.authorized_logins {
404 match self.app_state.db.create_user(&github_login, false).await {
405 Ok(contact_user_id) => {
406 contact_user_ids.insert(contact_user_id);
407 }
408 Err(err) => {
409 let message = err.to_string();
410 self.peer
411 .respond_with_error(receipt, proto::Error { message })
412 .await?;
413 return Ok(());
414 }
415 }
416 }
417
418 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
419 let ok = self.state_mut().register_worktree(
420 request.payload.project_id,
421 request.payload.worktree_id,
422 Worktree {
423 authorized_user_ids: contact_user_ids.clone(),
424 root_name: request.payload.root_name,
425 share: None,
426 weak: false,
427 },
428 );
429
430 if ok {
431 self.peer.respond(receipt, proto::Ack {}).await?;
432 self.update_contacts_for_users(&contact_user_ids).await?;
433 } else {
434 self.peer
435 .respond_with_error(
436 receipt,
437 proto::Error {
438 message: NO_SUCH_PROJECT.to_string(),
439 },
440 )
441 .await?;
442 }
443
444 Ok(())
445 }
446
447 async fn unregister_worktree(
448 mut self: Arc<Server>,
449 request: TypedEnvelope<proto::UnregisterWorktree>,
450 ) -> tide::Result<()> {
451 let project_id = request.payload.project_id;
452 let worktree_id = request.payload.worktree_id;
453 let (worktree, guest_connection_ids) =
454 self.state_mut()
455 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
456
457 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
458 self.peer.send(
459 conn_id,
460 proto::UnregisterWorktree {
461 project_id,
462 worktree_id,
463 },
464 )
465 })
466 .await?;
467 self.update_contacts_for_users(&worktree.authorized_user_ids)
468 .await?;
469 Ok(())
470 }
471
472 async fn share_worktree(
473 mut self: Arc<Server>,
474 mut request: TypedEnvelope<proto::ShareWorktree>,
475 ) -> tide::Result<()> {
476 let worktree = request
477 .payload
478 .worktree
479 .as_mut()
480 .ok_or_else(|| anyhow!("missing worktree"))?;
481 let entries = mem::take(&mut worktree.entries)
482 .into_iter()
483 .map(|entry| (entry.id, entry))
484 .collect();
485
486 let diagnostic_summaries = mem::take(&mut worktree.diagnostic_summaries)
487 .into_iter()
488 .map(|summary| (PathBuf::from(summary.path.clone()), summary))
489 .collect();
490
491 let contact_user_ids = self.state_mut().share_worktree(
492 request.payload.project_id,
493 worktree.id,
494 request.sender_id,
495 entries,
496 diagnostic_summaries,
497 );
498 if let Some(contact_user_ids) = contact_user_ids {
499 self.peer.respond(request.receipt(), proto::Ack {}).await?;
500 self.update_contacts_for_users(&contact_user_ids).await?;
501 } else {
502 self.peer
503 .respond_with_error(
504 request.receipt(),
505 proto::Error {
506 message: "no such worktree".to_string(),
507 },
508 )
509 .await?;
510 }
511 Ok(())
512 }
513
514 async fn update_worktree(
515 mut self: Arc<Server>,
516 request: TypedEnvelope<proto::UpdateWorktree>,
517 ) -> tide::Result<()> {
518 let connection_ids = self
519 .state_mut()
520 .update_worktree(
521 request.sender_id,
522 request.payload.project_id,
523 request.payload.worktree_id,
524 &request.payload.removed_entries,
525 &request.payload.updated_entries,
526 )
527 .ok_or_else(|| anyhow!("no such worktree"))?;
528
529 broadcast(request.sender_id, connection_ids, |connection_id| {
530 self.peer
531 .forward_send(request.sender_id, connection_id, request.payload.clone())
532 })
533 .await?;
534
535 Ok(())
536 }
537
538 async fn update_diagnostic_summary(
539 mut self: Arc<Server>,
540 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
541 ) -> tide::Result<()> {
542 let receiver_ids = request
543 .payload
544 .summary
545 .clone()
546 .and_then(|summary| {
547 self.state_mut().update_diagnostic_summary(
548 request.payload.project_id,
549 request.payload.worktree_id,
550 request.sender_id,
551 summary,
552 )
553 })
554 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
555
556 broadcast(request.sender_id, receiver_ids, |connection_id| {
557 self.peer
558 .forward_send(request.sender_id, connection_id, request.payload.clone())
559 })
560 .await?;
561 Ok(())
562 }
563
564 async fn disk_based_diagnostics_updating(
565 self: Arc<Server>,
566 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
567 ) -> tide::Result<()> {
568 let receiver_ids = self
569 .state()
570 .project_connection_ids(request.payload.project_id, request.sender_id)
571 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
572 broadcast(request.sender_id, receiver_ids, |connection_id| {
573 self.peer
574 .forward_send(request.sender_id, connection_id, request.payload.clone())
575 })
576 .await?;
577 Ok(())
578 }
579
580 async fn disk_based_diagnostics_updated(
581 self: Arc<Server>,
582 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
583 ) -> tide::Result<()> {
584 let receiver_ids = self
585 .state()
586 .project_connection_ids(request.payload.project_id, request.sender_id)
587 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
588 broadcast(request.sender_id, receiver_ids, |connection_id| {
589 self.peer
590 .forward_send(request.sender_id, connection_id, request.payload.clone())
591 })
592 .await?;
593 Ok(())
594 }
595
596 async fn open_buffer(
597 self: Arc<Server>,
598 request: TypedEnvelope<proto::OpenBuffer>,
599 ) -> tide::Result<()> {
600 let receipt = request.receipt();
601 let host_connection_id = self
602 .state()
603 .read_project(request.payload.project_id, request.sender_id)
604 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
605 .host_connection_id;
606 let response = self
607 .peer
608 .forward_request(request.sender_id, host_connection_id, request.payload)
609 .await?;
610 self.peer.respond(receipt, response).await?;
611 Ok(())
612 }
613
614 async fn close_buffer(
615 self: Arc<Server>,
616 request: TypedEnvelope<proto::CloseBuffer>,
617 ) -> tide::Result<()> {
618 let host_connection_id = self
619 .state()
620 .read_project(request.payload.project_id, request.sender_id)
621 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
622 .host_connection_id;
623 self.peer
624 .forward_send(request.sender_id, host_connection_id, request.payload)
625 .await?;
626 Ok(())
627 }
628
629 async fn save_buffer(
630 self: Arc<Server>,
631 request: TypedEnvelope<proto::SaveBuffer>,
632 ) -> tide::Result<()> {
633 let host;
634 let guests;
635 {
636 let state = self.state();
637 let project = state
638 .read_project(request.payload.project_id, request.sender_id)
639 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
640 host = project.host_connection_id;
641 guests = project.guest_connection_ids()
642 }
643
644 let sender = request.sender_id;
645 let receipt = request.receipt();
646 let response = self
647 .peer
648 .forward_request(sender, host, request.payload.clone())
649 .await?;
650
651 broadcast(host, guests, |conn_id| {
652 let response = response.clone();
653 let peer = &self.peer;
654 async move {
655 if conn_id == sender {
656 peer.respond(receipt, response).await
657 } else {
658 peer.forward_send(host, conn_id, response).await
659 }
660 }
661 })
662 .await?;
663
664 Ok(())
665 }
666
667 async fn format_buffer(
668 self: Arc<Server>,
669 request: TypedEnvelope<proto::FormatBuffer>,
670 ) -> tide::Result<()> {
671 let host;
672 {
673 let state = self.state();
674 let project = state
675 .read_project(request.payload.project_id, request.sender_id)
676 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
677 host = project.host_connection_id;
678 }
679
680 let sender = request.sender_id;
681 let receipt = request.receipt();
682 let response = self
683 .peer
684 .forward_request(sender, host, request.payload.clone())
685 .await?;
686 self.peer.respond(receipt, response).await?;
687
688 Ok(())
689 }
690
691 async fn update_buffer(
692 self: Arc<Server>,
693 request: TypedEnvelope<proto::UpdateBuffer>,
694 ) -> tide::Result<()> {
695 let receiver_ids = self
696 .state()
697 .project_connection_ids(request.payload.project_id, request.sender_id)
698 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
699 broadcast(request.sender_id, receiver_ids, |connection_id| {
700 self.peer
701 .forward_send(request.sender_id, connection_id, request.payload.clone())
702 })
703 .await?;
704 self.peer.respond(request.receipt(), proto::Ack {}).await?;
705 Ok(())
706 }
707
708 async fn update_buffer_file(
709 self: Arc<Server>,
710 request: TypedEnvelope<proto::UpdateBufferFile>,
711 ) -> tide::Result<()> {
712 let receiver_ids = self
713 .state()
714 .project_connection_ids(request.payload.project_id, request.sender_id)
715 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
716 broadcast(request.sender_id, receiver_ids, |connection_id| {
717 self.peer
718 .forward_send(request.sender_id, connection_id, request.payload.clone())
719 })
720 .await?;
721 Ok(())
722 }
723
724 async fn buffer_saved(
725 self: Arc<Server>,
726 request: TypedEnvelope<proto::BufferSaved>,
727 ) -> tide::Result<()> {
728 let receiver_ids = self
729 .state()
730 .project_connection_ids(request.payload.project_id, request.sender_id)
731 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
732 broadcast(request.sender_id, receiver_ids, |connection_id| {
733 self.peer
734 .forward_send(request.sender_id, connection_id, request.payload.clone())
735 })
736 .await?;
737 Ok(())
738 }
739
740 async fn get_channels(
741 self: Arc<Server>,
742 request: TypedEnvelope<proto::GetChannels>,
743 ) -> tide::Result<()> {
744 let user_id = self.state().user_id_for_connection(request.sender_id)?;
745 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
746 self.peer
747 .respond(
748 request.receipt(),
749 proto::GetChannelsResponse {
750 channels: channels
751 .into_iter()
752 .map(|chan| proto::Channel {
753 id: chan.id.to_proto(),
754 name: chan.name,
755 })
756 .collect(),
757 },
758 )
759 .await?;
760 Ok(())
761 }
762
763 async fn get_users(
764 self: Arc<Server>,
765 request: TypedEnvelope<proto::GetUsers>,
766 ) -> tide::Result<()> {
767 let receipt = request.receipt();
768 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
769 let users = self
770 .app_state
771 .db
772 .get_users_by_ids(user_ids)
773 .await?
774 .into_iter()
775 .map(|user| proto::User {
776 id: user.id.to_proto(),
777 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
778 github_login: user.github_login,
779 })
780 .collect();
781 self.peer
782 .respond(receipt, proto::GetUsersResponse { users })
783 .await?;
784 Ok(())
785 }
786
787 async fn update_contacts_for_users<'a>(
788 self: &Arc<Server>,
789 user_ids: impl IntoIterator<Item = &'a UserId>,
790 ) -> tide::Result<()> {
791 let mut send_futures = Vec::new();
792
793 {
794 let state = self.state();
795 for user_id in user_ids {
796 let contacts = state.contacts_for_user(*user_id);
797 for connection_id in state.connection_ids_for_user(*user_id) {
798 send_futures.push(self.peer.send(
799 connection_id,
800 proto::UpdateContacts {
801 contacts: contacts.clone(),
802 },
803 ));
804 }
805 }
806 }
807 futures::future::try_join_all(send_futures).await?;
808
809 Ok(())
810 }
811
812 async fn join_channel(
813 mut self: Arc<Self>,
814 request: TypedEnvelope<proto::JoinChannel>,
815 ) -> tide::Result<()> {
816 let user_id = self.state().user_id_for_connection(request.sender_id)?;
817 let channel_id = ChannelId::from_proto(request.payload.channel_id);
818 if !self
819 .app_state
820 .db
821 .can_user_access_channel(user_id, channel_id)
822 .await?
823 {
824 Err(anyhow!("access denied"))?;
825 }
826
827 self.state_mut().join_channel(request.sender_id, channel_id);
828 let messages = self
829 .app_state
830 .db
831 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
832 .await?
833 .into_iter()
834 .map(|msg| proto::ChannelMessage {
835 id: msg.id.to_proto(),
836 body: msg.body,
837 timestamp: msg.sent_at.unix_timestamp() as u64,
838 sender_id: msg.sender_id.to_proto(),
839 nonce: Some(msg.nonce.as_u128().into()),
840 })
841 .collect::<Vec<_>>();
842 self.peer
843 .respond(
844 request.receipt(),
845 proto::JoinChannelResponse {
846 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
847 messages,
848 },
849 )
850 .await?;
851 Ok(())
852 }
853
854 async fn leave_channel(
855 mut self: Arc<Self>,
856 request: TypedEnvelope<proto::LeaveChannel>,
857 ) -> tide::Result<()> {
858 let user_id = self.state().user_id_for_connection(request.sender_id)?;
859 let channel_id = ChannelId::from_proto(request.payload.channel_id);
860 if !self
861 .app_state
862 .db
863 .can_user_access_channel(user_id, channel_id)
864 .await?
865 {
866 Err(anyhow!("access denied"))?;
867 }
868
869 self.state_mut()
870 .leave_channel(request.sender_id, channel_id);
871
872 Ok(())
873 }
874
875 async fn send_channel_message(
876 self: Arc<Self>,
877 request: TypedEnvelope<proto::SendChannelMessage>,
878 ) -> tide::Result<()> {
879 let receipt = request.receipt();
880 let channel_id = ChannelId::from_proto(request.payload.channel_id);
881 let user_id;
882 let connection_ids;
883 {
884 let state = self.state();
885 user_id = state.user_id_for_connection(request.sender_id)?;
886 if let Some(ids) = state.channel_connection_ids(channel_id) {
887 connection_ids = ids;
888 } else {
889 return Ok(());
890 }
891 }
892
893 // Validate the message body.
894 let body = request.payload.body.trim().to_string();
895 if body.len() > MAX_MESSAGE_LEN {
896 self.peer
897 .respond_with_error(
898 receipt,
899 proto::Error {
900 message: "message is too long".to_string(),
901 },
902 )
903 .await?;
904 return Ok(());
905 }
906 if body.is_empty() {
907 self.peer
908 .respond_with_error(
909 receipt,
910 proto::Error {
911 message: "message can't be blank".to_string(),
912 },
913 )
914 .await?;
915 return Ok(());
916 }
917
918 let timestamp = OffsetDateTime::now_utc();
919 let nonce = if let Some(nonce) = request.payload.nonce {
920 nonce
921 } else {
922 self.peer
923 .respond_with_error(
924 receipt,
925 proto::Error {
926 message: "nonce can't be blank".to_string(),
927 },
928 )
929 .await?;
930 return Ok(());
931 };
932
933 let message_id = self
934 .app_state
935 .db
936 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
937 .await?
938 .to_proto();
939 let message = proto::ChannelMessage {
940 sender_id: user_id.to_proto(),
941 id: message_id,
942 body,
943 timestamp: timestamp.unix_timestamp() as u64,
944 nonce: Some(nonce),
945 };
946 broadcast(request.sender_id, connection_ids, |conn_id| {
947 self.peer.send(
948 conn_id,
949 proto::ChannelMessageSent {
950 channel_id: channel_id.to_proto(),
951 message: Some(message.clone()),
952 },
953 )
954 })
955 .await?;
956 self.peer
957 .respond(
958 receipt,
959 proto::SendChannelMessageResponse {
960 message: Some(message),
961 },
962 )
963 .await?;
964 Ok(())
965 }
966
967 async fn get_channel_messages(
968 self: Arc<Self>,
969 request: TypedEnvelope<proto::GetChannelMessages>,
970 ) -> tide::Result<()> {
971 let user_id = self.state().user_id_for_connection(request.sender_id)?;
972 let channel_id = ChannelId::from_proto(request.payload.channel_id);
973 if !self
974 .app_state
975 .db
976 .can_user_access_channel(user_id, channel_id)
977 .await?
978 {
979 Err(anyhow!("access denied"))?;
980 }
981
982 let messages = self
983 .app_state
984 .db
985 .get_channel_messages(
986 channel_id,
987 MESSAGE_COUNT_PER_PAGE,
988 Some(MessageId::from_proto(request.payload.before_message_id)),
989 )
990 .await?
991 .into_iter()
992 .map(|msg| proto::ChannelMessage {
993 id: msg.id.to_proto(),
994 body: msg.body,
995 timestamp: msg.sent_at.unix_timestamp() as u64,
996 sender_id: msg.sender_id.to_proto(),
997 nonce: Some(msg.nonce.as_u128().into()),
998 })
999 .collect::<Vec<_>>();
1000 self.peer
1001 .respond(
1002 request.receipt(),
1003 proto::GetChannelMessagesResponse {
1004 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1005 messages,
1006 },
1007 )
1008 .await?;
1009 Ok(())
1010 }
1011
1012 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1013 self.store.read()
1014 }
1015
1016 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1017 self.store.write()
1018 }
1019}
1020
1021pub async fn broadcast<F, T>(
1022 sender_id: ConnectionId,
1023 receiver_ids: Vec<ConnectionId>,
1024 mut f: F,
1025) -> anyhow::Result<()>
1026where
1027 F: FnMut(ConnectionId) -> T,
1028 T: Future<Output = anyhow::Result<()>>,
1029{
1030 let futures = receiver_ids
1031 .into_iter()
1032 .filter(|id| *id != sender_id)
1033 .map(|id| f(id));
1034 futures::future::try_join_all(futures).await?;
1035 Ok(())
1036}
1037
1038pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1039 let server = Server::new(app.state().clone(), rpc.clone(), None);
1040 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1041 let server = server.clone();
1042 async move {
1043 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1044
1045 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1046 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1047 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1048 let client_protocol_version: Option<u32> = request
1049 .header("X-Zed-Protocol-Version")
1050 .and_then(|v| v.as_str().parse().ok());
1051
1052 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1053 return Ok(Response::new(StatusCode::UpgradeRequired));
1054 }
1055
1056 let header = match request.header("Sec-Websocket-Key") {
1057 Some(h) => h.as_str(),
1058 None => return Err(anyhow!("expected sec-websocket-key"))?,
1059 };
1060
1061 let user_id = process_auth_header(&request).await?;
1062
1063 let mut response = Response::new(StatusCode::SwitchingProtocols);
1064 response.insert_header(UPGRADE, "websocket");
1065 response.insert_header(CONNECTION, "Upgrade");
1066 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1067 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1068 response.insert_header("Sec-Websocket-Version", "13");
1069
1070 let http_res: &mut tide::http::Response = response.as_mut();
1071 let upgrade_receiver = http_res.recv_upgrade().await;
1072 let addr = request.remote().unwrap_or("unknown").to_string();
1073 task::spawn(async move {
1074 if let Some(stream) = upgrade_receiver.await {
1075 server
1076 .handle_connection(
1077 Connection::new(
1078 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1079 ),
1080 addr,
1081 user_id,
1082 None,
1083 )
1084 .await;
1085 }
1086 });
1087
1088 Ok(response)
1089 }
1090 });
1091}
1092
1093fn header_contains_ignore_case<T>(
1094 request: &tide::Request<T>,
1095 header_name: HeaderName,
1096 value: &str,
1097) -> bool {
1098 request
1099 .header(header_name)
1100 .map(|h| {
1101 h.as_str()
1102 .split(',')
1103 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1104 })
1105 .unwrap_or(false)
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111 use crate::{
1112 auth,
1113 db::{tests::TestDb, UserId},
1114 github, AppState, Config,
1115 };
1116 use ::rpc::Peer;
1117 use async_std::task;
1118 use gpui::{executor, ModelHandle, TestAppContext};
1119 use parking_lot::Mutex;
1120 use postage::{mpsc, watch};
1121 use rpc::PeerId;
1122 use serde_json::json;
1123 use sqlx::types::time::OffsetDateTime;
1124 use std::{
1125 ops::Deref,
1126 path::Path,
1127 rc::Rc,
1128 sync::{
1129 atomic::{AtomicBool, Ordering::SeqCst},
1130 Arc,
1131 },
1132 time::Duration,
1133 };
1134 use zed::{
1135 client::{
1136 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1137 EstablishConnectionError, UserStore,
1138 },
1139 editor::{Editor, EditorSettings, Input, MultiBuffer},
1140 fs::{FakeFs, Fs as _},
1141 language::{
1142 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1143 LanguageRegistry, LanguageServerConfig, Point,
1144 },
1145 lsp,
1146 project::{DiagnosticSummary, Project, ProjectPath},
1147 };
1148
    // End-to-end check of the basic sharing flow between a host (client A) and a
    // guest (client B):
    //   1. A shares a local project rooted at "/a" and B joins it remotely.
    //   2. Each side sees the other in its collaborator list.
    //   3. B opens "b.txt", edits it through an editor, and A observes the edit.
    //   4. Dropping A's buffer closes it on the host, and dropping B's project
    //      removes B from A's collaborators.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // The guest must already list the host as a collaborator; remember the
        // guest's replica id so the host's view of it can be checked below.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host lists the guest with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host is expected to report the buffer as open already, before it
        // opens the buffer itself just below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1287
    // Checks that a host can unshare a project and later share it again:
    //   1. A shares "/a"; B joins and opens "a.txt".
    //   2. A unshares: B's project becomes read-only and A's worktree reports
    //      that it is no longer shared.
    //   3. A shares again and client B can join a second time (`project_c`) and
    //      open the same file.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer only has to succeed here; the returned handle is discarded.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project should turn read-only once the host has unshared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Same client (B) joins again under the same project id.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1384
    // Three-client scenario (host A, guests B and C) covering:
    //   1. Concurrent edits from both guests and the host converging to the
    //      same text in all three buffers.
    //   2. A guest-initiated save racing with a host edit: the file on disk ends
    //      up with the host's edit included and no buffer is left dirty.
    //   3. Renames and insertions on the host's file system showing up in every
    //      worktree and in the `file()` path of the open buffers.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        // Each guest project is expected to expose at least one worktree; take the first.
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for the host to have both guest edits, then append at the end of the buffer.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first but only awaited after the host's edit, and
        // the file on disk is expected to contain that edit.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B must be clean immediately; C is only required to become clean eventually.
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
            .await
            .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected: ".zed.toml", "file1-renamed", "file3" and "file4".
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1570
    // Checks the dirty/conflict flags of a guest's buffer around a save:
    //   - after a local edit the buffer is dirty but not in conflict;
    //   - after saving (and once the file's mtime has changed) it is clean and
    //     not in conflict;
    //   - a further edit makes it dirty again, still without a conflict.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): "user_c" is listed as a collaborator below, but this test
        // never creates a client for it — confirm whether that is intentional.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the mtime before saving so the post-save update can be detected.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        // Wait until the guest's view of the file carries a different mtime than before the save.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing after the save must dirty the buffer without reporting a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1663
    // Checks that an edit made by the host while a guest is still in the middle
    // of opening the same buffer is not lost: once the guest's open completes,
    // its buffer text must converge to the host's text.
    #[gpui::test]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        // Presumably yields so the spawned open can make progress before the
        // host edits — TODO confirm against the executor's scheduling.
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Capture the host's text, then require the guest's buffer to reach it.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1743
1744 #[gpui::test]
1745 async fn test_leaving_worktree_while_opening_buffer(
1746 mut cx_a: TestAppContext,
1747 mut cx_b: TestAppContext,
1748 ) {
1749 cx_a.foreground().forbid_parking();
1750 let lang_registry = Arc::new(LanguageRegistry::new());
1751 let fs = Arc::new(FakeFs::new());
1752
1753 // Connect to a server as 2 clients.
1754 let mut server = TestServer::start(cx_a.foreground()).await;
1755 let client_a = server.create_client(&mut cx_a, "user_a").await;
1756 let client_b = server.create_client(&mut cx_b, "user_b").await;
1757
1758 // Share a project as client A
1759 fs.insert_tree(
1760 "/dir",
1761 json!({
1762 ".zed.toml": r#"collaborators = ["user_b"]"#,
1763 "a.txt": "a-contents",
1764 }),
1765 )
1766 .await;
1767 let project_a = cx_a.update(|cx| {
1768 Project::local(
1769 client_a.clone(),
1770 client_a.user_store.clone(),
1771 lang_registry.clone(),
1772 fs.clone(),
1773 cx,
1774 )
1775 });
1776 let (worktree_a, _) = project_a
1777 .update(&mut cx_a, |p, cx| {
1778 p.find_or_create_local_worktree("/dir", false, cx)
1779 })
1780 .await
1781 .unwrap();
1782 worktree_a
1783 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1784 .await;
1785 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1786 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1787 project_a
1788 .update(&mut cx_a, |p, cx| p.share(cx))
1789 .await
1790 .unwrap();
1791
1792 // Join that project as client B
1793 let project_b = Project::remote(
1794 project_id,
1795 client_b.clone(),
1796 client_b.user_store.clone(),
1797 lang_registry.clone(),
1798 fs.clone(),
1799 &mut cx_b.to_async(),
1800 )
1801 .await
1802 .unwrap();
1803
1804 // See that a guest has joined as client A.
1805 project_a
1806 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1807 .await;
1808
1809 // Begin opening a buffer as client B, but leave the project before the open completes.
1810 let buffer_b = cx_b
1811 .background()
1812 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1813 cx_b.update(|_| drop(project_b));
1814 drop(buffer_b);
1815
1816 // See that the guest has left.
1817 project_a
1818 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1819 .await;
1820 }
1821
1822 #[gpui::test]
1823 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1824 cx_a.foreground().forbid_parking();
1825 let lang_registry = Arc::new(LanguageRegistry::new());
1826 let fs = Arc::new(FakeFs::new());
1827
1828 // Connect to a server as 2 clients.
1829 let mut server = TestServer::start(cx_a.foreground()).await;
1830 let client_a = server.create_client(&mut cx_a, "user_a").await;
1831 let client_b = server.create_client(&mut cx_b, "user_b").await;
1832
1833 // Share a project as client A
1834 fs.insert_tree(
1835 "/a",
1836 json!({
1837 ".zed.toml": r#"collaborators = ["user_b"]"#,
1838 "a.txt": "a-contents",
1839 "b.txt": "b-contents",
1840 }),
1841 )
1842 .await;
1843 let project_a = cx_a.update(|cx| {
1844 Project::local(
1845 client_a.clone(),
1846 client_a.user_store.clone(),
1847 lang_registry.clone(),
1848 fs.clone(),
1849 cx,
1850 )
1851 });
1852 let (worktree_a, _) = project_a
1853 .update(&mut cx_a, |p, cx| {
1854 p.find_or_create_local_worktree("/a", false, cx)
1855 })
1856 .await
1857 .unwrap();
1858 worktree_a
1859 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1860 .await;
1861 let project_id = project_a
1862 .update(&mut cx_a, |project, _| project.next_remote_id())
1863 .await;
1864 project_a
1865 .update(&mut cx_a, |project, cx| project.share(cx))
1866 .await
1867 .unwrap();
1868
1869 // Join that project as client B
1870 let _project_b = Project::remote(
1871 project_id,
1872 client_b.clone(),
1873 client_b.user_store.clone(),
1874 lang_registry.clone(),
1875 fs.clone(),
1876 &mut cx_b.to_async(),
1877 )
1878 .await
1879 .unwrap();
1880
1881 // See that a guest has joined as client A.
1882 project_a
1883 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1884 .await;
1885
1886 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1887 client_b.disconnect(&cx_b.to_async()).unwrap();
1888 project_a
1889 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1890 .await;
1891 }
1892
1893 #[gpui::test]
1894 async fn test_collaborating_with_diagnostics(
1895 mut cx_a: TestAppContext,
1896 mut cx_b: TestAppContext,
1897 ) {
1898 cx_a.foreground().forbid_parking();
1899 let mut lang_registry = Arc::new(LanguageRegistry::new());
1900 let fs = Arc::new(FakeFs::new());
1901
1902 // Set up a fake language server.
1903 let (language_server_config, mut fake_language_server) =
1904 LanguageServerConfig::fake(cx_a.background()).await;
1905 Arc::get_mut(&mut lang_registry)
1906 .unwrap()
1907 .add(Arc::new(Language::new(
1908 LanguageConfig {
1909 name: "Rust".to_string(),
1910 path_suffixes: vec!["rs".to_string()],
1911 language_server: Some(language_server_config),
1912 ..Default::default()
1913 },
1914 Some(tree_sitter_rust::language()),
1915 )));
1916
1917 // Connect to a server as 2 clients.
1918 let mut server = TestServer::start(cx_a.foreground()).await;
1919 let client_a = server.create_client(&mut cx_a, "user_a").await;
1920 let client_b = server.create_client(&mut cx_b, "user_b").await;
1921
1922 // Share a project as client A
1923 fs.insert_tree(
1924 "/a",
1925 json!({
1926 ".zed.toml": r#"collaborators = ["user_b"]"#,
1927 "a.rs": "let one = two",
1928 "other.rs": "",
1929 }),
1930 )
1931 .await;
1932 let project_a = cx_a.update(|cx| {
1933 Project::local(
1934 client_a.clone(),
1935 client_a.user_store.clone(),
1936 lang_registry.clone(),
1937 fs.clone(),
1938 cx,
1939 )
1940 });
1941 let (worktree_a, _) = project_a
1942 .update(&mut cx_a, |p, cx| {
1943 p.find_or_create_local_worktree("/a", false, cx)
1944 })
1945 .await
1946 .unwrap();
1947 worktree_a
1948 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1949 .await;
1950 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1951 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1952 project_a
1953 .update(&mut cx_a, |p, cx| p.share(cx))
1954 .await
1955 .unwrap();
1956
1957 // Cause the language server to start.
1958 let _ = cx_a
1959 .background()
1960 .spawn(project_a.update(&mut cx_a, |project, cx| {
1961 project.open_buffer(
1962 ProjectPath {
1963 worktree_id,
1964 path: Path::new("other.rs").into(),
1965 },
1966 cx,
1967 )
1968 }))
1969 .await
1970 .unwrap();
1971
1972 // Simulate a language server reporting errors for a file.
1973 fake_language_server
1974 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1975 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1976 version: None,
1977 diagnostics: vec![lsp::Diagnostic {
1978 severity: Some(lsp::DiagnosticSeverity::ERROR),
1979 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1980 message: "message 1".to_string(),
1981 ..Default::default()
1982 }],
1983 })
1984 .await;
1985
1986 // Wait for server to see the diagnostics update.
1987 server
1988 .condition(|store| {
1989 let worktree = store
1990 .project(project_id)
1991 .unwrap()
1992 .worktrees
1993 .get(&worktree_id.to_proto())
1994 .unwrap();
1995
1996 !worktree
1997 .share
1998 .as_ref()
1999 .unwrap()
2000 .diagnostic_summaries
2001 .is_empty()
2002 })
2003 .await;
2004
2005 // Join the worktree as client B.
2006 let project_b = Project::remote(
2007 project_id,
2008 client_b.clone(),
2009 client_b.user_store.clone(),
2010 lang_registry.clone(),
2011 fs.clone(),
2012 &mut cx_b.to_async(),
2013 )
2014 .await
2015 .unwrap();
2016
2017 project_b.read_with(&cx_b, |project, cx| {
2018 assert_eq!(
2019 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2020 &[(
2021 ProjectPath {
2022 worktree_id,
2023 path: Arc::from(Path::new("a.rs")),
2024 },
2025 DiagnosticSummary {
2026 error_count: 1,
2027 warning_count: 0,
2028 ..Default::default()
2029 },
2030 )]
2031 )
2032 });
2033
2034 // Simulate a language server reporting more errors for a file.
2035 fake_language_server
2036 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2037 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2038 version: None,
2039 diagnostics: vec![
2040 lsp::Diagnostic {
2041 severity: Some(lsp::DiagnosticSeverity::ERROR),
2042 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2043 message: "message 1".to_string(),
2044 ..Default::default()
2045 },
2046 lsp::Diagnostic {
2047 severity: Some(lsp::DiagnosticSeverity::WARNING),
2048 range: lsp::Range::new(
2049 lsp::Position::new(0, 10),
2050 lsp::Position::new(0, 13),
2051 ),
2052 message: "message 2".to_string(),
2053 ..Default::default()
2054 },
2055 ],
2056 })
2057 .await;
2058
2059 // Client b gets the updated summaries
2060 project_b
2061 .condition(&cx_b, |project, cx| {
2062 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2063 == &[(
2064 ProjectPath {
2065 worktree_id,
2066 path: Arc::from(Path::new("a.rs")),
2067 },
2068 DiagnosticSummary {
2069 error_count: 1,
2070 warning_count: 1,
2071 ..Default::default()
2072 },
2073 )]
2074 })
2075 .await;
2076
2077 // Open the file with the errors on client B. They should be present.
2078 let buffer_b = cx_b
2079 .background()
2080 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2081 .await
2082 .unwrap();
2083
2084 buffer_b.read_with(&cx_b, |buffer, _| {
2085 assert_eq!(
2086 buffer
2087 .snapshot()
2088 .diagnostics_in_range::<_, Point>(0..buffer.len())
2089 .map(|entry| entry)
2090 .collect::<Vec<_>>(),
2091 &[
2092 DiagnosticEntry {
2093 range: Point::new(0, 4)..Point::new(0, 7),
2094 diagnostic: Diagnostic {
2095 group_id: 0,
2096 message: "message 1".to_string(),
2097 severity: lsp::DiagnosticSeverity::ERROR,
2098 is_primary: true,
2099 ..Default::default()
2100 }
2101 },
2102 DiagnosticEntry {
2103 range: Point::new(0, 10)..Point::new(0, 13),
2104 diagnostic: Diagnostic {
2105 group_id: 1,
2106 severity: lsp::DiagnosticSeverity::WARNING,
2107 message: "message 2".to_string(),
2108 is_primary: true,
2109 ..Default::default()
2110 }
2111 }
2112 ]
2113 );
2114 });
2115 }
2116
    // Checks that a guest can format a buffer through the language server
    // configured on the host's side of the test:
    //   1. A fake language server is registered for ".rs" files.
    //   2. A shares "/a"; B joins and opens "a.rs".
    //   3. B requests formatting; the fake server receives a `Formatting`
    //      request and answers with two insertions.
    //   4. B's buffer text must reflect both insertions.
    #[gpui::test]
    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new());

        // Set up a fake language server.
        let (language_server_config, mut fake_language_server) =
            LanguageServerConfig::fake(cx_a.background()).await;
        // `Arc::get_mut` only succeeds while this is the sole handle, so the
        // language has to be added before the registry is cloned below.
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".to_string(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "let one = two",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Let the initial scan of the local worktree finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file to be formatted on client B.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        // Kick off formatting first; it is only awaited after the fake server has
        // received the request and sent its response.
        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
        let (request_id, _) = fake_language_server
            .receive_request::<lsp::request::Formatting>()
            .await;
        // Respond with two empty-range edits (pure insertions) on line 0:
        // "h" at column 4 and "y" at column 7.
        fake_language_server
            .respond(
                request_id,
                Some(vec![
                    lsp::TextEdit {
                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
                        new_text: "h".to_string(),
                    },
                    lsp::TextEdit {
                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
                        new_text: "y".to_string(),
                    },
                ]),
            )
            .await;
        format.await.unwrap();
        // "let one = two" with both insertions applied.
        assert_eq!(
            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
            "let honey = two"
        );
    }
2221
// End-to-end chat test with two clients sharing one channel: each client loads the channel
// list and the existing message history, client A sends two messages that client B then
// receives, and the server's per-channel connection bookkeeping is checked as the clients
// drop their channel handles.
#[gpui::test]
async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user B, written straight to the database.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        1,
    )
    .await
    .unwrap();

    // Client A: wait for the channel list to load and check it contains the test channel.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(&mut cx_a, |list, _| list.available_channels().is_some())
        .await;
    channels_a.read_with(&cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    let channel_a = channels_a.update(&mut cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    // The freshly obtained channel has no messages yet; the seeded message shows up later.
    // The third tuple element produced by `channel_messages` is the message's pending flag.
    channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B: same checks as for client A.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(&mut cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(&cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(&mut cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Send two messages from A. Both are listed locally right away, flagged as pending;
    // the first send is detached and only the second send's task is awaited.
    channel_a
        .update(&mut cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // B eventually sees both of A's messages, in order and not pending.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                ]
        })
        .await;

    // The server's channel state lists two connections while both handles are alive.
    assert_eq!(
        server
            .state()
            .await
            .channel(channel_id)
            .unwrap()
            .connection_ids
            .len(),
        2
    );
    // Dropping B's channel handle should leave a single connection on the channel.
    cx_b.update(|_| drop(channel_b));
    server
        .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
        .await;

    // Once A's handle is dropped as well, the server no longer has state for the channel.
    cx_a.update(|_| drop(channel_a));
    server
        .condition(|state| state.channel(channel_id).is_none())
        .await;
}
2361
2362 #[gpui::test]
2363 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2364 cx_a.foreground().forbid_parking();
2365
2366 let mut server = TestServer::start(cx_a.foreground()).await;
2367 let client_a = server.create_client(&mut cx_a, "user_a").await;
2368
2369 let db = &server.app_state.db;
2370 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2371 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2372 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2373 .await
2374 .unwrap();
2375 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2376 .await
2377 .unwrap();
2378
2379 let channels_a = cx_a
2380 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2381 channels_a
2382 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2383 .await;
2384 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2385 this.get_channel(channel_id.to_proto(), cx).unwrap()
2386 });
2387
2388 // Messages aren't allowed to be too long.
2389 channel_a
2390 .update(&mut cx_a, |channel, cx| {
2391 let long_body = "this is long.\n".repeat(1024);
2392 channel.send_message(long_body, cx).unwrap()
2393 })
2394 .await
2395 .unwrap_err();
2396
2397 // Messages aren't allowed to be blank.
2398 channel_a.update(&mut cx_a, |channel, cx| {
2399 channel.send_message(String::new(), cx).unwrap_err()
2400 });
2401
2402 // Leading and trailing whitespace are trimmed.
2403 channel_a
2404 .update(&mut cx_a, |channel, cx| {
2405 channel
2406 .send_message("\n surrounded by whitespace \n".to_string(), cx)
2407 .unwrap()
2408 })
2409 .await
2410 .unwrap();
2411 assert_eq!(
2412 db.get_channel_messages(channel_id, 10, None)
2413 .await
2414 .unwrap()
2415 .iter()
2416 .map(|m| &m.body)
2417 .collect::<Vec<_>>(),
2418 &["surrounded by whitespace"]
2419 );
2420 }
2421
// Verifies chat behavior across a dropped connection: client B keeps its cached channel data
// while offline, a message B sends while offline fails to send but stays in B's local list,
// and after reconnecting B sees A's messages followed by its own offline message. Finally,
// both clients exchange messages normally again.
#[gpui::test]
async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;
    // Watch B's connection status so the test can wait for the reconnection failure below.
    let mut status_b = client_b.status();

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user B, written straight to the database.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        2,
    )
    .await
    .unwrap();

    // Client A: load the channel list, open the channel, and wait for the seeded message.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(&mut cx_a, |list, _| list.available_channels().is_some())
        .await;

    channels_a.read_with(&cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    let channel_a = channels_a.update(&mut cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B: same setup and checks as for client A.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(&mut cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(&cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(&mut cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Disconnect client B, ensuring we can still access its cached channel data.
    // Connections are forbidden first so that B cannot reconnect yet; the loop then drains
    // status updates until B reports a reconnection error.
    server.forbid_connections();
    server.disconnect_client(client_b.current_user_id(&cx_b));
    while !matches!(
        status_b.next().await,
        Some(client::Status::ReconnectionError { .. })
    ) {}

    channels_b.read_with(&cx_b, |channels, _| {
        assert_eq!(
            channels.available_channels().unwrap(),
            [ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    channel_b.read_with(&cx_b, |channel, _| {
        assert_eq!(
            channel_messages(channel),
            [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        )
    });

    // Send a message from client B while it is disconnected.
    // The message is listed locally as pending, and awaiting the send yields an error.
    channel_b
        .update(&mut cx_b, |channel, cx| {
            let task = channel
                .send_message("can you see this?".to_string(), cx)
                .unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap_err();

    // Send a message from client A while B is disconnected.
    channel_a
        .update(&mut cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // Give client B a chance to reconnect.
    // NOTE(review): the 10s clock advance presumably exceeds the client's reconnect delay —
    // confirm against the client's reconnection logic.
    server.allow_connections();
    cx_b.foreground().advance_clock(Duration::from_secs(10));

    // Verify that B sees the new messages upon reconnection, as well as the message client B
    // sent while offline.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                ]
        })
        .await;

    // Ensure client A and B can communicate normally after reconnection.
    channel_a
        .update(&mut cx_a, |channel, cx| {
            channel.send_message("you online?".to_string(), cx).unwrap()
        })
        .await
        .unwrap();
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                    ("user_a".to_string(), "you online?".to_string(), false),
                ]
        })
        .await;

    // And in the other direction: a message from B reaches A, whose history also contains
    // the message B originally sent while offline.
    channel_b
        .update(&mut cx_b, |channel, cx| {
            channel.send_message("yep".to_string(), cx).unwrap()
        })
        .await
        .unwrap();
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                    ("user_a".to_string(), "you online?".to_string(), false),
                    ("user_b".to_string(), "yep".to_string(), false),
                ]
        })
        .await;
}
2633
// Checks that the contact lists of three clients track client A's project: the project is
// listed (with no guests) after A adds a worktree, lists user_b as a guest after B joins the
// shared project, and disappears from every contact list once A drops the project.
#[gpui::test]
async fn test_contacts(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
    mut cx_c: TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new());

    // Connect to a server as 3 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;
    let client_c = server.create_client(&mut cx_c, "user_c").await;

    // Share a worktree as client A.
    // The worktree's `.zed.toml` names user_b and user_c as collaborators.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
        }),
    )
    .await;

    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", false, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;

    // All three users should now see user_a with project "a" and no guests.
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;

    // Share the project so that client B can join it by its remote id.
    let project_id = project_a
        .update(&mut cx_a, |project, _| project.next_remote_id())
        .await;
    project_a
        .update(&mut cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // The handle is kept in a binding (not used further) so it stays alive for the checks below.
    let _project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // After B joins, every user should see user_b listed as a guest of project "a".
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;

    // Wait until A's project lists B's peer id among its collaborators before dropping it.
    project_a
        .condition(&cx_a, |project, _| {
            project.collaborators().contains_key(&client_b.peer_id)
        })
        .await;

    // Dropping A's project should empty the contact list for all three users.
    cx_a.update(move |_| drop(project_a));
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
        .await;

    // Flattens the user store's contacts into
    // `(contact login, [(first worktree root name, [guest logins])])` tuples so they can be
    // compared against literals. Indexes `worktree_root_names[0]`, so it panics for a project
    // that has no worktree root names.
    fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
        user_store
            .contacts()
            .iter()
            .map(|contact| {
                let worktrees = contact
                    .projects
                    .iter()
                    .map(|p| {
                        (
                            p.worktree_root_names[0].as_str(),
                            p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                        )
                    })
                    .collect();
                (contact.user.github_login.as_str(), worktrees)
            })
            .collect()
    }
}
2775
/// Test harness that runs a `Server` in-process and lets tests attach simulated clients to it
/// over in-memory connections.
struct TestServer {
    /// Peer handed to `Server::new`; it is reset when the harness is dropped.
    peer: Arc<Peer>,
    /// Application state built by `build_app_state`; tests reach the database through it.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets its wait for a notification with
    /// `start_waiting` / `finish_waiting` calls on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is passed to `Server::new`.
    notifications: mpsc::Receiver<()>,
    /// Kill switch for each user's in-memory connection, consumed by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// While `true`, connection attempts made by test clients fail with an error.
    forbid_connections: Arc<AtomicBool>,
    /// The test database backing `app_state`. Never read after construction; presumably held
    /// only to keep the database alive for the lifetime of the harness — TODO confirm.
    _test_db: TestDb,
}
2786
2787 impl TestServer {
2788 async fn start(foreground: Rc<executor::Foreground>) -> Self {
2789 let test_db = TestDb::new();
2790 let app_state = Self::build_app_state(&test_db).await;
2791 let peer = Peer::new();
2792 let notifications = mpsc::channel(128);
2793 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
2794 Self {
2795 peer,
2796 app_state,
2797 server,
2798 foreground,
2799 notifications: notifications.1,
2800 connection_killers: Default::default(),
2801 forbid_connections: Default::default(),
2802 _test_db: test_db,
2803 }
2804 }
2805
2806 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
2807 let http = FakeHttpClient::with_404_response();
2808 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
2809 let client_name = name.to_string();
2810 let mut client = Client::new(http.clone());
2811 let server = self.server.clone();
2812 let connection_killers = self.connection_killers.clone();
2813 let forbid_connections = self.forbid_connections.clone();
2814 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
2815
2816 Arc::get_mut(&mut client)
2817 .unwrap()
2818 .override_authenticate(move |cx| {
2819 cx.spawn(|_| async move {
2820 let access_token = "the-token".to_string();
2821 Ok(Credentials {
2822 user_id: user_id.0 as u64,
2823 access_token,
2824 })
2825 })
2826 })
2827 .override_establish_connection(move |credentials, cx| {
2828 assert_eq!(credentials.user_id, user_id.0 as u64);
2829 assert_eq!(credentials.access_token, "the-token");
2830
2831 let server = server.clone();
2832 let connection_killers = connection_killers.clone();
2833 let forbid_connections = forbid_connections.clone();
2834 let client_name = client_name.clone();
2835 let connection_id_tx = connection_id_tx.clone();
2836 cx.spawn(move |cx| async move {
2837 if forbid_connections.load(SeqCst) {
2838 Err(EstablishConnectionError::other(anyhow!(
2839 "server is forbidding connections"
2840 )))
2841 } else {
2842 let (client_conn, server_conn, kill_conn) = Connection::in_memory();
2843 connection_killers.lock().insert(user_id, kill_conn);
2844 cx.background()
2845 .spawn(server.handle_connection(
2846 server_conn,
2847 client_name,
2848 user_id,
2849 Some(connection_id_tx),
2850 ))
2851 .detach();
2852 Ok(client_conn)
2853 }
2854 })
2855 });
2856
2857 client
2858 .authenticate_and_connect(&cx.to_async())
2859 .await
2860 .unwrap();
2861
2862 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
2863 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
2864 let mut authed_user =
2865 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
2866 while authed_user.next().await.unwrap().is_none() {}
2867
2868 TestClient {
2869 client,
2870 peer_id,
2871 user_store,
2872 }
2873 }
2874
2875 fn disconnect_client(&self, user_id: UserId) {
2876 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
2877 let _ = kill_conn.try_send(Some(()));
2878 }
2879 }
2880
2881 fn forbid_connections(&self) {
2882 self.forbid_connections.store(true, SeqCst);
2883 }
2884
2885 fn allow_connections(&self) {
2886 self.forbid_connections.store(false, SeqCst);
2887 }
2888
2889 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
2890 let mut config = Config::default();
2891 config.session_secret = "a".repeat(32);
2892 config.database_url = test_db.url.clone();
2893 let github_client = github::AppClient::test();
2894 Arc::new(AppState {
2895 db: test_db.db().clone(),
2896 handlebars: Default::default(),
2897 auth_client: auth::build_client("", ""),
2898 repo_client: github::RepoClient::test(&github_client),
2899 github_client,
2900 config,
2901 })
2902 }
2903
2904 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
2905 self.server.store.read()
2906 }
2907
2908 async fn condition<F>(&mut self, mut predicate: F)
2909 where
2910 F: FnMut(&Store) -> bool,
2911 {
2912 async_std::future::timeout(Duration::from_millis(500), async {
2913 while !(predicate)(&*self.server.store.read()) {
2914 self.foreground.start_waiting();
2915 self.notifications.next().await;
2916 self.foreground.finish_waiting();
2917 }
2918 })
2919 .await
2920 .expect("condition timed out");
2921 }
2922 }
2923
2924 impl Drop for TestServer {
2925 fn drop(&mut self) {
2926 self.peer.reset();
2927 }
2928 }
2929
/// A connected test client bundled with the identifiers tests use to refer to it.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store created for this client in `TestServer::create_client`.
    pub user_store: ModelHandle<UserStore>,
}
2935
2936 impl Deref for TestClient {
2937 type Target = Arc<Client>;
2938
2939 fn deref(&self) -> &Self::Target {
2940 &self.client
2941 }
2942 }
2943
2944 impl TestClient {
2945 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
2946 UserId::from_proto(
2947 self.user_store
2948 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
2949 )
2950 }
2951 }
2952
2953 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
2954 channel
2955 .messages()
2956 .cursor::<()>()
2957 .map(|m| {
2958 (
2959 m.sender.github_login.clone(),
2960 m.body.clone(),
2961 m.is_pending(),
2962 )
2963 })
2964 .collect()
2965 }
2966
/// Stateless view for tests; its `gpui::View` implementation renders an empty element.
struct EmptyView;
2968
// `EmptyView` uses the unit type as its event type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
2972
2973 impl gpui::View for EmptyView {
2974 fn ui_name() -> &'static str {
2975 "empty view"
2976 }
2977
2978 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
2979 gpui::Element::boxed(gpui::elements::Empty)
2980 }
2981 }
2982}