1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased handler for one kind of RPC message.
///
/// It receives the server and a boxed, dynamically typed envelope, and returns a
/// boxed future resolving to `tide::Result<()>`. `Server::add_handler` wraps typed
/// handlers into this shape so they can be stored in a single map.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server state shared by every client connection.
pub struct Server {
    /// Used to add and disconnect connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory store, accessed through `state()` and `state_mut()`.
    store: RwLock<Store>,
    /// Application state; the handlers use its `db`.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the message type.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after a message has been run through its handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Number of channel messages requested from the database per page; a page shorter
/// than this marks the end of the history (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a chat message body after trimming.
const MAX_MESSAGE_LEN: usize = 1024;
/// Error text used when a request refers to a project that cannot be resolved.
const NO_SUCH_PROJECT: &str = "no such project";
47
48impl Server {
49 pub fn new(
50 app_state: Arc<AppState>,
51 peer: Arc<Peer>,
52 notifications: Option<mpsc::Sender<()>>,
53 ) -> Arc<Self> {
54 let mut server = Self {
55 peer,
56 app_state,
57 store: Default::default(),
58 handlers: Default::default(),
59 notifications,
60 };
61
62 server
63 .add_handler(Server::ping)
64 .add_handler(Server::register_project)
65 .add_handler(Server::unregister_project)
66 .add_handler(Server::share_project)
67 .add_handler(Server::unshare_project)
68 .add_handler(Server::join_project)
69 .add_handler(Server::leave_project)
70 .add_handler(Server::register_worktree)
71 .add_handler(Server::unregister_worktree)
72 .add_handler(Server::share_worktree)
73 .add_handler(Server::update_worktree)
74 .add_handler(Server::update_diagnostic_summary)
75 .add_handler(Server::disk_based_diagnostics_updating)
76 .add_handler(Server::disk_based_diagnostics_updated)
77 .add_handler(Server::get_definition)
78 .add_handler(Server::open_buffer)
79 .add_handler(Server::close_buffer)
80 .add_handler(Server::update_buffer)
81 .add_handler(Server::update_buffer_file)
82 .add_handler(Server::buffer_reloaded)
83 .add_handler(Server::buffer_saved)
84 .add_handler(Server::save_buffer)
85 .add_handler(Server::format_buffer)
86 .add_handler(Server::get_completions)
87 .add_handler(Server::apply_additional_edits_for_completion)
88 .add_handler(Server::get_channels)
89 .add_handler(Server::get_users)
90 .add_handler(Server::join_channel)
91 .add_handler(Server::leave_channel)
92 .add_handler(Server::send_channel_message)
93 .add_handler(Server::get_channel_messages);
94
95 Arc::new(server)
96 }
97
98 fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
99 where
100 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
101 Fut: 'static + Send + Future<Output = tide::Result<()>>,
102 M: EnvelopedMessage,
103 {
104 let prev_handler = self.handlers.insert(
105 TypeId::of::<M>(),
106 Box::new(move |server, envelope| {
107 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
108 (handler)(server, *envelope).boxed()
109 }),
110 );
111 if prev_handler.is_some() {
112 panic!("registered a handler for the same message twice");
113 }
114 self
115 }
116
    /// Returns a future that registers `connection` with the peer and services it
    /// until it closes.
    ///
    /// The returned future:
    /// 1. adds the connection to the peer and, if `send_connection_id` is given,
    ///    sends the new `ConnectionId` on it (a failed send is ignored);
    /// 2. records the connection for `user_id` in the store and pushes updated
    ///    contacts to that user (a failure is only logged);
    /// 3. loops, dispatching each incoming message to the handler registered for its
    ///    payload type, until the incoming stream ends or the I/O future completes;
    /// 4. signs the connection out (a failure is only logged).
    ///
    /// `addr` is used only in log messages.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        // Own a handle to the server so the returned future does not borrow `self`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so a
                // ready message is taken before the I/O future is checked.
                futures::select_biased! {
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            log::info!("RPC message received: {}", message.payload_type_name());
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // Handler errors are logged and do not end the connection.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("error handling message: {:?}", err);
                                } else {
                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
                                }

                                // Sent whether or not the handler succeeded.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", message.payload_type_name());
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                    handle_io = handle_io => {
                        if let Err(err) = handle_io {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
180
181 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
182 self.peer.disconnect(connection_id);
183 let removed_connection = self.state_mut().remove_connection(connection_id)?;
184
185 for (project_id, project) in removed_connection.hosted_projects {
186 if let Some(share) = project.share {
187 broadcast(
188 connection_id,
189 share.guests.keys().copied().collect(),
190 |conn_id| {
191 self.peer
192 .send(conn_id, proto::UnshareProject { project_id })
193 },
194 )
195 .await?;
196 }
197 }
198
199 for (project_id, peer_ids) in removed_connection.guest_project_ids {
200 broadcast(connection_id, peer_ids, |conn_id| {
201 self.peer.send(
202 conn_id,
203 proto::RemoveProjectCollaborator {
204 project_id,
205 peer_id: connection_id.0,
206 },
207 )
208 })
209 .await?;
210 }
211
212 self.update_contacts_for_users(removed_connection.contact_ids.iter())
213 .await?;
214
215 Ok(())
216 }
217
218 async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
219 self.peer.respond(request.receipt(), proto::Ack {}).await?;
220 Ok(())
221 }
222
223 async fn register_project(
224 mut self: Arc<Server>,
225 request: TypedEnvelope<proto::RegisterProject>,
226 ) -> tide::Result<()> {
227 let project_id = {
228 let mut state = self.state_mut();
229 let user_id = state.user_id_for_connection(request.sender_id)?;
230 state.register_project(request.sender_id, user_id)
231 };
232 self.peer
233 .respond(
234 request.receipt(),
235 proto::RegisterProjectResponse { project_id },
236 )
237 .await?;
238 Ok(())
239 }
240
241 async fn unregister_project(
242 mut self: Arc<Server>,
243 request: TypedEnvelope<proto::UnregisterProject>,
244 ) -> tide::Result<()> {
245 let project = self
246 .state_mut()
247 .unregister_project(request.payload.project_id, request.sender_id)
248 .ok_or_else(|| anyhow!("no such project"))?;
249 self.update_contacts_for_users(project.authorized_user_ids().iter())
250 .await?;
251 Ok(())
252 }
253
254 async fn share_project(
255 mut self: Arc<Server>,
256 request: TypedEnvelope<proto::ShareProject>,
257 ) -> tide::Result<()> {
258 self.state_mut()
259 .share_project(request.payload.project_id, request.sender_id);
260 self.peer.respond(request.receipt(), proto::Ack {}).await?;
261 Ok(())
262 }
263
264 async fn unshare_project(
265 mut self: Arc<Server>,
266 request: TypedEnvelope<proto::UnshareProject>,
267 ) -> tide::Result<()> {
268 let project_id = request.payload.project_id;
269 let project = self
270 .state_mut()
271 .unshare_project(project_id, request.sender_id)?;
272
273 broadcast(request.sender_id, project.connection_ids, |conn_id| {
274 self.peer
275 .send(conn_id, proto::UnshareProject { project_id })
276 })
277 .await?;
278 self.update_contacts_for_users(&project.authorized_user_ids)
279 .await?;
280
281 Ok(())
282 }
283
    /// Adds the sender to a project as a guest.
    ///
    /// On success the project's other connections are sent `AddProjectCollaborator`,
    /// the sender receives a `JoinProjectResponse` (shared worktrees, its replica id
    /// and the existing collaborators), and the project's authorized users get
    /// updated contacts. If joining fails, the sender receives an error response
    /// carrying the error's message.
    async fn join_project(
        mut self: Arc<Server>,
        request: TypedEnvelope<proto::JoinProject>,
    ) -> tide::Result<()> {
        let project_id = request.payload.project_id;

        let user_id = self.state().user_id_for_connection(request.sender_id)?;
        // The whole response is assembled inside this one statement, while the
        // temporary write guard returned by `state_mut()` is still alive.
        let response_data = self
            .state_mut()
            .join_project(request.sender_id, user_id, project_id)
            .and_then(|joined| {
                let share = joined.project.share()?;
                let peer_count = share.guests.len();
                let mut collaborators = Vec::with_capacity(peer_count);
                // The host is listed first, with replica id 0.
                collaborators.push(proto::Collaborator {
                    peer_id: joined.project.host_connection_id.0,
                    replica_id: 0,
                    user_id: joined.project.host_user_id.to_proto(),
                });
                // Worktrees without a share are left out of the response.
                let worktrees = joined
                    .project
                    .worktrees
                    .iter()
                    .filter_map(|(id, worktree)| {
                        worktree.share.as_ref().map(|share| proto::Worktree {
                            id: *id,
                            root_name: worktree.root_name.clone(),
                            entries: share.entries.values().cloned().collect(),
                            diagnostic_summaries: share
                                .diagnostic_summaries
                                .values()
                                .cloned()
                                .collect(),
                            weak: worktree.weak,
                        })
                    })
                    .collect();
                // Every guest other than the sender is listed as a collaborator.
                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
                    if *peer_conn_id != request.sender_id {
                        collaborators.push(proto::Collaborator {
                            peer_id: peer_conn_id.0,
                            replica_id: *peer_replica_id as u32,
                            user_id: peer_user_id.to_proto(),
                        });
                    }
                }
                let response = proto::JoinProjectResponse {
                    worktrees,
                    replica_id: joined.replica_id as u32,
                    collaborators,
                };
                let connection_ids = joined.project.connection_ids();
                let contact_user_ids = joined.project.authorized_user_ids();
                Ok((response, connection_ids, contact_user_ids))
            });

        match response_data {
            Ok((response, connection_ids, contact_user_ids)) => {
                // Announce the newcomer first; `broadcast` skips the sender itself.
                broadcast(request.sender_id, connection_ids, |conn_id| {
                    self.peer.send(
                        conn_id,
                        proto::AddProjectCollaborator {
                            project_id,
                            collaborator: Some(proto::Collaborator {
                                peer_id: request.sender_id.0,
                                replica_id: response.replica_id,
                                user_id: user_id.to_proto(),
                            }),
                        },
                    )
                })
                .await?;
                self.peer.respond(request.receipt(), response).await?;
                self.update_contacts_for_users(&contact_user_ids).await?;
            }
            Err(error) => {
                self.peer
                    .respond_with_error(
                        request.receipt(),
                        proto::Error {
                            message: error.to_string(),
                        },
                    )
                    .await?;
            }
        }

        Ok(())
    }
373
374 async fn leave_project(
375 mut self: Arc<Server>,
376 request: TypedEnvelope<proto::LeaveProject>,
377 ) -> tide::Result<()> {
378 let sender_id = request.sender_id;
379 let project_id = request.payload.project_id;
380 let worktree = self.state_mut().leave_project(sender_id, project_id);
381 if let Some(worktree) = worktree {
382 broadcast(sender_id, worktree.connection_ids, |conn_id| {
383 self.peer.send(
384 conn_id,
385 proto::RemoveProjectCollaborator {
386 project_id,
387 peer_id: sender_id.0,
388 },
389 )
390 })
391 .await?;
392 self.update_contacts_for_users(&worktree.authorized_user_ids)
393 .await?;
394 }
395 Ok(())
396 }
397
398 async fn register_worktree(
399 mut self: Arc<Server>,
400 request: TypedEnvelope<proto::RegisterWorktree>,
401 ) -> tide::Result<()> {
402 let receipt = request.receipt();
403 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
404
405 let mut contact_user_ids = HashSet::default();
406 contact_user_ids.insert(host_user_id);
407 for github_login in request.payload.authorized_logins {
408 match self.app_state.db.create_user(&github_login, false).await {
409 Ok(contact_user_id) => {
410 contact_user_ids.insert(contact_user_id);
411 }
412 Err(err) => {
413 let message = err.to_string();
414 self.peer
415 .respond_with_error(receipt, proto::Error { message })
416 .await?;
417 return Ok(());
418 }
419 }
420 }
421
422 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
423 let ok = self.state_mut().register_worktree(
424 request.payload.project_id,
425 request.payload.worktree_id,
426 Worktree {
427 authorized_user_ids: contact_user_ids.clone(),
428 root_name: request.payload.root_name,
429 share: None,
430 weak: false,
431 },
432 );
433
434 if ok {
435 self.peer.respond(receipt, proto::Ack {}).await?;
436 self.update_contacts_for_users(&contact_user_ids).await?;
437 } else {
438 self.peer
439 .respond_with_error(
440 receipt,
441 proto::Error {
442 message: NO_SUCH_PROJECT.to_string(),
443 },
444 )
445 .await?;
446 }
447
448 Ok(())
449 }
450
451 async fn unregister_worktree(
452 mut self: Arc<Server>,
453 request: TypedEnvelope<proto::UnregisterWorktree>,
454 ) -> tide::Result<()> {
455 let project_id = request.payload.project_id;
456 let worktree_id = request.payload.worktree_id;
457 let (worktree, guest_connection_ids) =
458 self.state_mut()
459 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
460
461 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
462 self.peer.send(
463 conn_id,
464 proto::UnregisterWorktree {
465 project_id,
466 worktree_id,
467 },
468 )
469 })
470 .await?;
471 self.update_contacts_for_users(&worktree.authorized_user_ids)
472 .await?;
473 Ok(())
474 }
475
476 async fn share_worktree(
477 mut self: Arc<Server>,
478 mut request: TypedEnvelope<proto::ShareWorktree>,
479 ) -> tide::Result<()> {
480 let worktree = request
481 .payload
482 .worktree
483 .as_mut()
484 .ok_or_else(|| anyhow!("missing worktree"))?;
485 let entries = worktree
486 .entries
487 .iter()
488 .map(|entry| (entry.id, entry.clone()))
489 .collect();
490 let diagnostic_summaries = worktree
491 .diagnostic_summaries
492 .iter()
493 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
494 .collect();
495
496 let shared_worktree = self.state_mut().share_worktree(
497 request.payload.project_id,
498 worktree.id,
499 request.sender_id,
500 entries,
501 diagnostic_summaries,
502 );
503 if let Some(shared_worktree) = shared_worktree {
504 broadcast(
505 request.sender_id,
506 shared_worktree.connection_ids,
507 |connection_id| {
508 self.peer.forward_send(
509 request.sender_id,
510 connection_id,
511 request.payload.clone(),
512 )
513 },
514 )
515 .await?;
516 self.peer.respond(request.receipt(), proto::Ack {}).await?;
517 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
518 .await?;
519 } else {
520 self.peer
521 .respond_with_error(
522 request.receipt(),
523 proto::Error {
524 message: "no such worktree".to_string(),
525 },
526 )
527 .await?;
528 }
529 Ok(())
530 }
531
532 async fn update_worktree(
533 mut self: Arc<Server>,
534 request: TypedEnvelope<proto::UpdateWorktree>,
535 ) -> tide::Result<()> {
536 let connection_ids = self
537 .state_mut()
538 .update_worktree(
539 request.sender_id,
540 request.payload.project_id,
541 request.payload.worktree_id,
542 &request.payload.removed_entries,
543 &request.payload.updated_entries,
544 )
545 .ok_or_else(|| anyhow!("no such worktree"))?;
546
547 broadcast(request.sender_id, connection_ids, |connection_id| {
548 self.peer
549 .forward_send(request.sender_id, connection_id, request.payload.clone())
550 })
551 .await?;
552
553 Ok(())
554 }
555
556 async fn update_diagnostic_summary(
557 mut self: Arc<Server>,
558 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
559 ) -> tide::Result<()> {
560 let receiver_ids = request
561 .payload
562 .summary
563 .clone()
564 .and_then(|summary| {
565 self.state_mut().update_diagnostic_summary(
566 request.payload.project_id,
567 request.payload.worktree_id,
568 request.sender_id,
569 summary,
570 )
571 })
572 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
573
574 broadcast(request.sender_id, receiver_ids, |connection_id| {
575 self.peer
576 .forward_send(request.sender_id, connection_id, request.payload.clone())
577 })
578 .await?;
579 Ok(())
580 }
581
582 async fn disk_based_diagnostics_updating(
583 self: Arc<Server>,
584 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
585 ) -> tide::Result<()> {
586 let receiver_ids = self
587 .state()
588 .project_connection_ids(request.payload.project_id, request.sender_id)
589 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
590 broadcast(request.sender_id, receiver_ids, |connection_id| {
591 self.peer
592 .forward_send(request.sender_id, connection_id, request.payload.clone())
593 })
594 .await?;
595 Ok(())
596 }
597
598 async fn disk_based_diagnostics_updated(
599 self: Arc<Server>,
600 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
601 ) -> tide::Result<()> {
602 let receiver_ids = self
603 .state()
604 .project_connection_ids(request.payload.project_id, request.sender_id)
605 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
606 broadcast(request.sender_id, receiver_ids, |connection_id| {
607 self.peer
608 .forward_send(request.sender_id, connection_id, request.payload.clone())
609 })
610 .await?;
611 Ok(())
612 }
613
614 async fn get_definition(
615 self: Arc<Server>,
616 request: TypedEnvelope<proto::GetDefinition>,
617 ) -> tide::Result<()> {
618 let receipt = request.receipt();
619 let host_connection_id = self
620 .state()
621 .read_project(request.payload.project_id, request.sender_id)
622 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
623 .host_connection_id;
624 let response = self
625 .peer
626 .forward_request(request.sender_id, host_connection_id, request.payload)
627 .await?;
628 self.peer.respond(receipt, response).await?;
629 Ok(())
630 }
631
632 async fn open_buffer(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::OpenBuffer>,
635 ) -> tide::Result<()> {
636 let receipt = request.receipt();
637 let host_connection_id = self
638 .state()
639 .read_project(request.payload.project_id, request.sender_id)
640 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
641 .host_connection_id;
642 let response = self
643 .peer
644 .forward_request(request.sender_id, host_connection_id, request.payload)
645 .await?;
646 self.peer.respond(receipt, response).await?;
647 Ok(())
648 }
649
650 async fn close_buffer(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::CloseBuffer>,
653 ) -> tide::Result<()> {
654 let host_connection_id = self
655 .state()
656 .read_project(request.payload.project_id, request.sender_id)
657 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
658 .host_connection_id;
659 self.peer
660 .forward_send(request.sender_id, host_connection_id, request.payload)
661 .await?;
662 Ok(())
663 }
664
665 async fn save_buffer(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::SaveBuffer>,
668 ) -> tide::Result<()> {
669 let host;
670 let guests;
671 {
672 let state = self.state();
673 let project = state
674 .read_project(request.payload.project_id, request.sender_id)
675 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
676 host = project.host_connection_id;
677 guests = project.guest_connection_ids()
678 }
679
680 let sender = request.sender_id;
681 let receipt = request.receipt();
682 let response = self
683 .peer
684 .forward_request(sender, host, request.payload.clone())
685 .await?;
686
687 broadcast(host, guests, |conn_id| {
688 let response = response.clone();
689 let peer = &self.peer;
690 async move {
691 if conn_id == sender {
692 peer.respond(receipt, response).await
693 } else {
694 peer.forward_send(host, conn_id, response).await
695 }
696 }
697 })
698 .await?;
699
700 Ok(())
701 }
702
703 async fn format_buffer(
704 self: Arc<Server>,
705 request: TypedEnvelope<proto::FormatBuffer>,
706 ) -> tide::Result<()> {
707 let host;
708 {
709 let state = self.state();
710 let project = state
711 .read_project(request.payload.project_id, request.sender_id)
712 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
713 host = project.host_connection_id;
714 }
715
716 let sender = request.sender_id;
717 let receipt = request.receipt();
718 let response = self
719 .peer
720 .forward_request(sender, host, request.payload.clone())
721 .await?;
722 self.peer.respond(receipt, response).await?;
723
724 Ok(())
725 }
726
727 async fn get_completions(
728 self: Arc<Server>,
729 request: TypedEnvelope<proto::GetCompletions>,
730 ) -> tide::Result<()> {
731 let host;
732 {
733 let state = self.state();
734 let project = state
735 .read_project(request.payload.project_id, request.sender_id)
736 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
737 host = project.host_connection_id;
738 }
739
740 let sender = request.sender_id;
741 let receipt = request.receipt();
742 let response = self
743 .peer
744 .forward_request(sender, host, request.payload.clone())
745 .await?;
746 self.peer.respond(receipt, response).await?;
747
748 Ok(())
749 }
750
751 async fn apply_additional_edits_for_completion(
752 self: Arc<Server>,
753 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
754 ) -> tide::Result<()> {
755 let host;
756 {
757 let state = self.state();
758 let project = state
759 .read_project(request.payload.project_id, request.sender_id)
760 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
761 host = project.host_connection_id;
762 }
763
764 let sender = request.sender_id;
765 let receipt = request.receipt();
766 let response = self
767 .peer
768 .forward_request(sender, host, request.payload.clone())
769 .await?;
770 self.peer.respond(receipt, response).await?;
771
772 Ok(())
773 }
774
775 async fn update_buffer(
776 self: Arc<Server>,
777 request: TypedEnvelope<proto::UpdateBuffer>,
778 ) -> tide::Result<()> {
779 let receiver_ids = self
780 .state()
781 .project_connection_ids(request.payload.project_id, request.sender_id)
782 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
783 broadcast(request.sender_id, receiver_ids, |connection_id| {
784 self.peer
785 .forward_send(request.sender_id, connection_id, request.payload.clone())
786 })
787 .await?;
788 self.peer.respond(request.receipt(), proto::Ack {}).await?;
789 Ok(())
790 }
791
792 async fn update_buffer_file(
793 self: Arc<Server>,
794 request: TypedEnvelope<proto::UpdateBufferFile>,
795 ) -> tide::Result<()> {
796 let receiver_ids = self
797 .state()
798 .project_connection_ids(request.payload.project_id, request.sender_id)
799 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
800 broadcast(request.sender_id, receiver_ids, |connection_id| {
801 self.peer
802 .forward_send(request.sender_id, connection_id, request.payload.clone())
803 })
804 .await?;
805 Ok(())
806 }
807
808 async fn buffer_reloaded(
809 self: Arc<Server>,
810 request: TypedEnvelope<proto::BufferReloaded>,
811 ) -> tide::Result<()> {
812 let receiver_ids = self
813 .state()
814 .project_connection_ids(request.payload.project_id, request.sender_id)
815 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
816 broadcast(request.sender_id, receiver_ids, |connection_id| {
817 self.peer
818 .forward_send(request.sender_id, connection_id, request.payload.clone())
819 })
820 .await?;
821 Ok(())
822 }
823
824 async fn buffer_saved(
825 self: Arc<Server>,
826 request: TypedEnvelope<proto::BufferSaved>,
827 ) -> tide::Result<()> {
828 let receiver_ids = self
829 .state()
830 .project_connection_ids(request.payload.project_id, request.sender_id)
831 .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
832 broadcast(request.sender_id, receiver_ids, |connection_id| {
833 self.peer
834 .forward_send(request.sender_id, connection_id, request.payload.clone())
835 })
836 .await?;
837 Ok(())
838 }
839
840 async fn get_channels(
841 self: Arc<Server>,
842 request: TypedEnvelope<proto::GetChannels>,
843 ) -> tide::Result<()> {
844 let user_id = self.state().user_id_for_connection(request.sender_id)?;
845 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
846 self.peer
847 .respond(
848 request.receipt(),
849 proto::GetChannelsResponse {
850 channels: channels
851 .into_iter()
852 .map(|chan| proto::Channel {
853 id: chan.id.to_proto(),
854 name: chan.name,
855 })
856 .collect(),
857 },
858 )
859 .await?;
860 Ok(())
861 }
862
863 async fn get_users(
864 self: Arc<Server>,
865 request: TypedEnvelope<proto::GetUsers>,
866 ) -> tide::Result<()> {
867 let receipt = request.receipt();
868 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
869 let users = self
870 .app_state
871 .db
872 .get_users_by_ids(user_ids)
873 .await?
874 .into_iter()
875 .map(|user| proto::User {
876 id: user.id.to_proto(),
877 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
878 github_login: user.github_login,
879 })
880 .collect();
881 self.peer
882 .respond(receipt, proto::GetUsersResponse { users })
883 .await?;
884 Ok(())
885 }
886
887 async fn update_contacts_for_users<'a>(
888 self: &Arc<Server>,
889 user_ids: impl IntoIterator<Item = &'a UserId>,
890 ) -> tide::Result<()> {
891 let mut send_futures = Vec::new();
892
893 {
894 let state = self.state();
895 for user_id in user_ids {
896 let contacts = state.contacts_for_user(*user_id);
897 for connection_id in state.connection_ids_for_user(*user_id) {
898 send_futures.push(self.peer.send(
899 connection_id,
900 proto::UpdateContacts {
901 contacts: contacts.clone(),
902 },
903 ));
904 }
905 }
906 }
907 futures::future::try_join_all(send_futures).await?;
908
909 Ok(())
910 }
911
912 async fn join_channel(
913 mut self: Arc<Self>,
914 request: TypedEnvelope<proto::JoinChannel>,
915 ) -> tide::Result<()> {
916 let user_id = self.state().user_id_for_connection(request.sender_id)?;
917 let channel_id = ChannelId::from_proto(request.payload.channel_id);
918 if !self
919 .app_state
920 .db
921 .can_user_access_channel(user_id, channel_id)
922 .await?
923 {
924 Err(anyhow!("access denied"))?;
925 }
926
927 self.state_mut().join_channel(request.sender_id, channel_id);
928 let messages = self
929 .app_state
930 .db
931 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
932 .await?
933 .into_iter()
934 .map(|msg| proto::ChannelMessage {
935 id: msg.id.to_proto(),
936 body: msg.body,
937 timestamp: msg.sent_at.unix_timestamp() as u64,
938 sender_id: msg.sender_id.to_proto(),
939 nonce: Some(msg.nonce.as_u128().into()),
940 })
941 .collect::<Vec<_>>();
942 self.peer
943 .respond(
944 request.receipt(),
945 proto::JoinChannelResponse {
946 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
947 messages,
948 },
949 )
950 .await?;
951 Ok(())
952 }
953
954 async fn leave_channel(
955 mut self: Arc<Self>,
956 request: TypedEnvelope<proto::LeaveChannel>,
957 ) -> tide::Result<()> {
958 let user_id = self.state().user_id_for_connection(request.sender_id)?;
959 let channel_id = ChannelId::from_proto(request.payload.channel_id);
960 if !self
961 .app_state
962 .db
963 .can_user_access_channel(user_id, channel_id)
964 .await?
965 {
966 Err(anyhow!("access denied"))?;
967 }
968
969 self.state_mut()
970 .leave_channel(request.sender_id, channel_id);
971
972 Ok(())
973 }
974
975 async fn send_channel_message(
976 self: Arc<Self>,
977 request: TypedEnvelope<proto::SendChannelMessage>,
978 ) -> tide::Result<()> {
979 let receipt = request.receipt();
980 let channel_id = ChannelId::from_proto(request.payload.channel_id);
981 let user_id;
982 let connection_ids;
983 {
984 let state = self.state();
985 user_id = state.user_id_for_connection(request.sender_id)?;
986 if let Some(ids) = state.channel_connection_ids(channel_id) {
987 connection_ids = ids;
988 } else {
989 return Ok(());
990 }
991 }
992
993 // Validate the message body.
994 let body = request.payload.body.trim().to_string();
995 if body.len() > MAX_MESSAGE_LEN {
996 self.peer
997 .respond_with_error(
998 receipt,
999 proto::Error {
1000 message: "message is too long".to_string(),
1001 },
1002 )
1003 .await?;
1004 return Ok(());
1005 }
1006 if body.is_empty() {
1007 self.peer
1008 .respond_with_error(
1009 receipt,
1010 proto::Error {
1011 message: "message can't be blank".to_string(),
1012 },
1013 )
1014 .await?;
1015 return Ok(());
1016 }
1017
1018 let timestamp = OffsetDateTime::now_utc();
1019 let nonce = if let Some(nonce) = request.payload.nonce {
1020 nonce
1021 } else {
1022 self.peer
1023 .respond_with_error(
1024 receipt,
1025 proto::Error {
1026 message: "nonce can't be blank".to_string(),
1027 },
1028 )
1029 .await?;
1030 return Ok(());
1031 };
1032
1033 let message_id = self
1034 .app_state
1035 .db
1036 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1037 .await?
1038 .to_proto();
1039 let message = proto::ChannelMessage {
1040 sender_id: user_id.to_proto(),
1041 id: message_id,
1042 body,
1043 timestamp: timestamp.unix_timestamp() as u64,
1044 nonce: Some(nonce),
1045 };
1046 broadcast(request.sender_id, connection_ids, |conn_id| {
1047 self.peer.send(
1048 conn_id,
1049 proto::ChannelMessageSent {
1050 channel_id: channel_id.to_proto(),
1051 message: Some(message.clone()),
1052 },
1053 )
1054 })
1055 .await?;
1056 self.peer
1057 .respond(
1058 receipt,
1059 proto::SendChannelMessageResponse {
1060 message: Some(message),
1061 },
1062 )
1063 .await?;
1064 Ok(())
1065 }
1066
1067 async fn get_channel_messages(
1068 self: Arc<Self>,
1069 request: TypedEnvelope<proto::GetChannelMessages>,
1070 ) -> tide::Result<()> {
1071 let user_id = self.state().user_id_for_connection(request.sender_id)?;
1072 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1073 if !self
1074 .app_state
1075 .db
1076 .can_user_access_channel(user_id, channel_id)
1077 .await?
1078 {
1079 Err(anyhow!("access denied"))?;
1080 }
1081
1082 let messages = self
1083 .app_state
1084 .db
1085 .get_channel_messages(
1086 channel_id,
1087 MESSAGE_COUNT_PER_PAGE,
1088 Some(MessageId::from_proto(request.payload.before_message_id)),
1089 )
1090 .await?
1091 .into_iter()
1092 .map(|msg| proto::ChannelMessage {
1093 id: msg.id.to_proto(),
1094 body: msg.body,
1095 timestamp: msg.sent_at.unix_timestamp() as u64,
1096 sender_id: msg.sender_id.to_proto(),
1097 nonce: Some(msg.nonce.as_u128().into()),
1098 })
1099 .collect::<Vec<_>>();
1100 self.peer
1101 .respond(
1102 request.receipt(),
1103 proto::GetChannelMessagesResponse {
1104 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1105 messages,
1106 },
1107 )
1108 .await?;
1109 Ok(())
1110 }
1111
1112 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1113 self.store.read()
1114 }
1115
1116 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1117 self.store.write()
1118 }
1119}
1120
1121pub async fn broadcast<F, T>(
1122 sender_id: ConnectionId,
1123 receiver_ids: Vec<ConnectionId>,
1124 mut f: F,
1125) -> anyhow::Result<()>
1126where
1127 F: FnMut(ConnectionId) -> T,
1128 T: Future<Output = anyhow::Result<()>>,
1129{
1130 let futures = receiver_ids
1131 .into_iter()
1132 .filter(|id| *id != sender_id)
1133 .map(|id| f(id));
1134 futures::future::try_join_all(futures).await?;
1135 Ok(())
1136}
1137
/// Mounts the `/rpc` WebSocket endpoint on `app`.
///
/// A single `Server` is created up front and shared by all requests. Each GET to
/// `/rpc` must ask for a WebSocket upgrade and carry an `X-Zed-Protocol-Version`
/// header equal to `rpc::PROTOCOL_VERSION`; otherwise it is answered with
/// `UpgradeRequired`. A request without `Sec-Websocket-Key`, or one that fails
/// authentication, results in an error. Accepted requests get a
/// `SwitchingProtocols` response, and the upgraded stream is served by
/// `Server::handle_connection` on a spawned task.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // GUID that the WebSocket handshake (RFC 6455) appends to the client's key.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header becomes `None` and is rejected below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client's key followed by the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // Serve the connection on its own task so the response can be returned now.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1192
1193fn header_contains_ignore_case<T>(
1194 request: &tide::Request<T>,
1195 header_name: HeaderName,
1196 value: &str,
1197) -> bool {
1198 request
1199 .header(header_name)
1200 .map(|h| {
1201 h.as_str()
1202 .split(',')
1203 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1204 })
1205 .unwrap_or(false)
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211 use crate::{
1212 auth,
1213 db::{tests::TestDb, UserId},
1214 github, AppState, Config,
1215 };
1216 use ::rpc::Peer;
1217 use async_std::task;
1218 use gpui::{executor, ModelHandle, TestAppContext};
1219 use parking_lot::Mutex;
1220 use postage::{mpsc, watch};
1221 use rand::prelude::*;
1222 use rpc::PeerId;
1223 use serde_json::json;
1224 use sqlx::types::time::OffsetDateTime;
1225 use std::{
1226 ops::Deref,
1227 path::Path,
1228 rc::Rc,
1229 sync::{
1230 atomic::{AtomicBool, Ordering::SeqCst},
1231 Arc,
1232 },
1233 time::Duration,
1234 };
1235 use zed::{
1236 client::{
1237 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1238 EstablishConnectionError, UserStore,
1239 },
1240 editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1241 fs::{FakeFs, Fs as _},
1242 language::{
1243 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1244 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1245 },
1246 lsp,
1247 project::{DiagnosticSummary, Project, ProjectPath},
1248 };
1249
    // End-to-end sharing scenario between a host (client A) and a guest (client B):
    // A shares a project, B joins it, both open `b.txt`, an edit typed in B's editor
    // shows up in A's buffer, dropping A's buffer closes it in A's project, and
    // dropping B's project removes B from A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must already see A as a collaborator; remember B's replica id so that
        // A's view of B can be checked against it.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The guest opening the file must have caused the host to open it as well.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1388
    // Verifies the share -> unshare -> share-again cycle: after the host unshares,
    // the guest's project becomes read-only and the host's worktree is no longer
    // shared; after the host shares again, a guest can join under the same project id
    // and open a buffer.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer must succeed while the project is shared.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest observes the unshare as its project turning read-only.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B joins a second time, reusing the same `project_id`.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1485
    // Three-party scenario (host A, guests B and C) checking that:
    // - concurrent edits from both guests and the host converge to the same text,
    // - a save issued by a guest while the host is editing writes the host's latest
    //   text to disk and clears the dirty flag on every replica,
    // - renames and new files on the host's file system show up in the guests'
    //   worktrees and in the file paths of already-open buffers.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0, producing concurrent edits.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for both guest edits to reach the host before the host appends its own.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk must include the host's edit that raced with the save.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        // C did not take part in the save, so wait for the clean state to reach it.
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected: `.zed.toml`, `file1-renamed`, `file3` and `file4`.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1675
    // Verifies that a guest's buffer never reports a conflict across an
    // edit -> save -> edit sequence: it is dirty without conflict after the first
    // edit, clean without conflict once the save's new mtime has arrived, and dirty
    // without conflict again after a further edit.
    #[gpui::test]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the file's mtime so the effect of the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        // Wait until the guest's buffer file reports an mtime different from the one
        // recorded before the save.
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing after the save must make the buffer dirty but not conflicted.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1768
    // Verifies that when a file changes on the host's file system, a guest's
    // unmodified buffer for that file picks up the new contents and ends up neither
    // dirty nor conflicted.
    #[gpui::test]
    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Bound to a local so the guest's worktree handle stays alive for the rest of the test.
        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Change the file directly on the (fake) file system, bypassing any buffer.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest's buffer must reload the new text and stay clean.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1850
    // Races a host edit against a guest opening the same buffer: the guest starts
    // opening `a.txt` on the background executor, the host edits the buffer before
    // the open is awaited, and the guest's buffer must still converge to the host's
    // text. The test is configured with `iterations = 100` (presumably to exercise
    // different interleavings -- confirm against the `gpui::test` macro).
    #[gpui::test(iterations = 100)]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        // Yield once so other tasks get a chance to run before the host edits.
        task::yield_now().await;

        // Edit the buffer as client A while client B is still opening it.
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));

        // Whatever the interleaving, B's buffer must end up with A's text.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1930
1931 #[gpui::test]
1932 async fn test_leaving_worktree_while_opening_buffer(
1933 mut cx_a: TestAppContext,
1934 mut cx_b: TestAppContext,
1935 ) {
1936 cx_a.foreground().forbid_parking();
1937 let lang_registry = Arc::new(LanguageRegistry::new());
1938 let fs = Arc::new(FakeFs::new(cx_a.background()));
1939
1940 // Connect to a server as 2 clients.
1941 let mut server = TestServer::start(cx_a.foreground()).await;
1942 let client_a = server.create_client(&mut cx_a, "user_a").await;
1943 let client_b = server.create_client(&mut cx_b, "user_b").await;
1944
1945 // Share a project as client A
1946 fs.insert_tree(
1947 "/dir",
1948 json!({
1949 ".zed.toml": r#"collaborators = ["user_b"]"#,
1950 "a.txt": "a-contents",
1951 }),
1952 )
1953 .await;
1954 let project_a = cx_a.update(|cx| {
1955 Project::local(
1956 client_a.clone(),
1957 client_a.user_store.clone(),
1958 lang_registry.clone(),
1959 fs.clone(),
1960 cx,
1961 )
1962 });
1963 let (worktree_a, _) = project_a
1964 .update(&mut cx_a, |p, cx| {
1965 p.find_or_create_local_worktree("/dir", false, cx)
1966 })
1967 .await
1968 .unwrap();
1969 worktree_a
1970 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1971 .await;
1972 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1973 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1974 project_a
1975 .update(&mut cx_a, |p, cx| p.share(cx))
1976 .await
1977 .unwrap();
1978
1979 // Join that project as client B
1980 let project_b = Project::remote(
1981 project_id,
1982 client_b.clone(),
1983 client_b.user_store.clone(),
1984 lang_registry.clone(),
1985 fs.clone(),
1986 &mut cx_b.to_async(),
1987 )
1988 .await
1989 .unwrap();
1990
1991 // See that a guest has joined as client A.
1992 project_a
1993 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1994 .await;
1995
1996 // Begin opening a buffer as client B, but leave the project before the open completes.
1997 let buffer_b = cx_b
1998 .background()
1999 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2000 cx_b.update(|_| drop(project_b));
2001 drop(buffer_b);
2002
2003 // See that the guest has left.
2004 project_a
2005 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2006 .await;
2007 }
2008
2009 #[gpui::test]
2010 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2011 cx_a.foreground().forbid_parking();
2012 let lang_registry = Arc::new(LanguageRegistry::new());
2013 let fs = Arc::new(FakeFs::new(cx_a.background()));
2014
2015 // Connect to a server as 2 clients.
2016 let mut server = TestServer::start(cx_a.foreground()).await;
2017 let client_a = server.create_client(&mut cx_a, "user_a").await;
2018 let client_b = server.create_client(&mut cx_b, "user_b").await;
2019
2020 // Share a project as client A
2021 fs.insert_tree(
2022 "/a",
2023 json!({
2024 ".zed.toml": r#"collaborators = ["user_b"]"#,
2025 "a.txt": "a-contents",
2026 "b.txt": "b-contents",
2027 }),
2028 )
2029 .await;
2030 let project_a = cx_a.update(|cx| {
2031 Project::local(
2032 client_a.clone(),
2033 client_a.user_store.clone(),
2034 lang_registry.clone(),
2035 fs.clone(),
2036 cx,
2037 )
2038 });
2039 let (worktree_a, _) = project_a
2040 .update(&mut cx_a, |p, cx| {
2041 p.find_or_create_local_worktree("/a", false, cx)
2042 })
2043 .await
2044 .unwrap();
2045 worktree_a
2046 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2047 .await;
2048 let project_id = project_a
2049 .update(&mut cx_a, |project, _| project.next_remote_id())
2050 .await;
2051 project_a
2052 .update(&mut cx_a, |project, cx| project.share(cx))
2053 .await
2054 .unwrap();
2055
2056 // Join that project as client B
2057 let _project_b = Project::remote(
2058 project_id,
2059 client_b.clone(),
2060 client_b.user_store.clone(),
2061 lang_registry.clone(),
2062 fs.clone(),
2063 &mut cx_b.to_async(),
2064 )
2065 .await
2066 .unwrap();
2067
2068 // See that a guest has joined as client A.
2069 project_a
2070 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2071 .await;
2072
2073 // Drop client B's connection and ensure client A observes client B leaving the worktree.
2074 client_b.disconnect(&cx_b.to_async()).unwrap();
2075 project_a
2076 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2077 .await;
2078 }
2079
2080 #[gpui::test]
2081 async fn test_collaborating_with_diagnostics(
2082 mut cx_a: TestAppContext,
2083 mut cx_b: TestAppContext,
2084 ) {
2085 cx_a.foreground().forbid_parking();
2086 let mut lang_registry = Arc::new(LanguageRegistry::new());
2087 let fs = Arc::new(FakeFs::new(cx_a.background()));
2088
2089 // Set up a fake language server.
2090 let (language_server_config, mut fake_language_server) =
2091 LanguageServerConfig::fake(cx_a.background()).await;
2092 Arc::get_mut(&mut lang_registry)
2093 .unwrap()
2094 .add(Arc::new(Language::new(
2095 LanguageConfig {
2096 name: "Rust".to_string(),
2097 path_suffixes: vec!["rs".to_string()],
2098 language_server: Some(language_server_config),
2099 ..Default::default()
2100 },
2101 Some(tree_sitter_rust::language()),
2102 )));
2103
2104 // Connect to a server as 2 clients.
2105 let mut server = TestServer::start(cx_a.foreground()).await;
2106 let client_a = server.create_client(&mut cx_a, "user_a").await;
2107 let client_b = server.create_client(&mut cx_b, "user_b").await;
2108
2109 // Share a project as client A
2110 fs.insert_tree(
2111 "/a",
2112 json!({
2113 ".zed.toml": r#"collaborators = ["user_b"]"#,
2114 "a.rs": "let one = two",
2115 "other.rs": "",
2116 }),
2117 )
2118 .await;
2119 let project_a = cx_a.update(|cx| {
2120 Project::local(
2121 client_a.clone(),
2122 client_a.user_store.clone(),
2123 lang_registry.clone(),
2124 fs.clone(),
2125 cx,
2126 )
2127 });
2128 let (worktree_a, _) = project_a
2129 .update(&mut cx_a, |p, cx| {
2130 p.find_or_create_local_worktree("/a", false, cx)
2131 })
2132 .await
2133 .unwrap();
2134 worktree_a
2135 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2136 .await;
2137 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2138 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2139 project_a
2140 .update(&mut cx_a, |p, cx| p.share(cx))
2141 .await
2142 .unwrap();
2143
2144 // Cause the language server to start.
2145 let _ = cx_a
2146 .background()
2147 .spawn(project_a.update(&mut cx_a, |project, cx| {
2148 project.open_buffer(
2149 ProjectPath {
2150 worktree_id,
2151 path: Path::new("other.rs").into(),
2152 },
2153 cx,
2154 )
2155 }))
2156 .await
2157 .unwrap();
2158
2159 // Simulate a language server reporting errors for a file.
2160 fake_language_server
2161 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2162 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2163 version: None,
2164 diagnostics: vec![lsp::Diagnostic {
2165 severity: Some(lsp::DiagnosticSeverity::ERROR),
2166 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2167 message: "message 1".to_string(),
2168 ..Default::default()
2169 }],
2170 })
2171 .await;
2172
2173 // Wait for server to see the diagnostics update.
2174 server
2175 .condition(|store| {
2176 let worktree = store
2177 .project(project_id)
2178 .unwrap()
2179 .worktrees
2180 .get(&worktree_id.to_proto())
2181 .unwrap();
2182
2183 !worktree
2184 .share
2185 .as_ref()
2186 .unwrap()
2187 .diagnostic_summaries
2188 .is_empty()
2189 })
2190 .await;
2191
2192 // Join the worktree as client B.
2193 let project_b = Project::remote(
2194 project_id,
2195 client_b.clone(),
2196 client_b.user_store.clone(),
2197 lang_registry.clone(),
2198 fs.clone(),
2199 &mut cx_b.to_async(),
2200 )
2201 .await
2202 .unwrap();
2203
2204 project_b.read_with(&cx_b, |project, cx| {
2205 assert_eq!(
2206 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2207 &[(
2208 ProjectPath {
2209 worktree_id,
2210 path: Arc::from(Path::new("a.rs")),
2211 },
2212 DiagnosticSummary {
2213 error_count: 1,
2214 warning_count: 0,
2215 ..Default::default()
2216 },
2217 )]
2218 )
2219 });
2220
2221 // Simulate a language server reporting more errors for a file.
2222 fake_language_server
2223 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2224 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2225 version: None,
2226 diagnostics: vec![
2227 lsp::Diagnostic {
2228 severity: Some(lsp::DiagnosticSeverity::ERROR),
2229 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2230 message: "message 1".to_string(),
2231 ..Default::default()
2232 },
2233 lsp::Diagnostic {
2234 severity: Some(lsp::DiagnosticSeverity::WARNING),
2235 range: lsp::Range::new(
2236 lsp::Position::new(0, 10),
2237 lsp::Position::new(0, 13),
2238 ),
2239 message: "message 2".to_string(),
2240 ..Default::default()
2241 },
2242 ],
2243 })
2244 .await;
2245
2246 // Client b gets the updated summaries
2247 project_b
2248 .condition(&cx_b, |project, cx| {
2249 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2250 == &[(
2251 ProjectPath {
2252 worktree_id,
2253 path: Arc::from(Path::new("a.rs")),
2254 },
2255 DiagnosticSummary {
2256 error_count: 1,
2257 warning_count: 1,
2258 ..Default::default()
2259 },
2260 )]
2261 })
2262 .await;
2263
2264 // Open the file with the errors on client B. They should be present.
2265 let buffer_b = cx_b
2266 .background()
2267 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2268 .await
2269 .unwrap();
2270
2271 buffer_b.read_with(&cx_b, |buffer, _| {
2272 assert_eq!(
2273 buffer
2274 .snapshot()
2275 .diagnostics_in_range::<_, Point>(0..buffer.len())
2276 .map(|entry| entry)
2277 .collect::<Vec<_>>(),
2278 &[
2279 DiagnosticEntry {
2280 range: Point::new(0, 4)..Point::new(0, 7),
2281 diagnostic: Diagnostic {
2282 group_id: 0,
2283 message: "message 1".to_string(),
2284 severity: lsp::DiagnosticSeverity::ERROR,
2285 is_primary: true,
2286 ..Default::default()
2287 }
2288 },
2289 DiagnosticEntry {
2290 range: Point::new(0, 10)..Point::new(0, 13),
2291 diagnostic: Diagnostic {
2292 group_id: 1,
2293 severity: lsp::DiagnosticSeverity::WARNING,
2294 message: "message 2".to_string(),
2295 is_primary: true,
2296 ..Default::default()
2297 }
2298 }
2299 ]
2300 );
2301 });
2302 }
2303
2304 #[gpui::test]
2305 async fn test_collaborating_with_completion(
2306 mut cx_a: TestAppContext,
2307 mut cx_b: TestAppContext,
2308 ) {
2309 cx_a.foreground().forbid_parking();
2310 let mut lang_registry = Arc::new(LanguageRegistry::new());
2311 let fs = Arc::new(FakeFs::new(cx_a.background()));
2312
2313 // Set up a fake language server.
2314 let (language_server_config, mut fake_language_server) =
2315 LanguageServerConfig::fake_with_capabilities(
2316 lsp::ServerCapabilities {
2317 completion_provider: Some(lsp::CompletionOptions {
2318 trigger_characters: Some(vec![".".to_string()]),
2319 ..Default::default()
2320 }),
2321 ..Default::default()
2322 },
2323 cx_a.background(),
2324 )
2325 .await;
2326 Arc::get_mut(&mut lang_registry)
2327 .unwrap()
2328 .add(Arc::new(Language::new(
2329 LanguageConfig {
2330 name: "Rust".to_string(),
2331 path_suffixes: vec!["rs".to_string()],
2332 language_server: Some(language_server_config),
2333 ..Default::default()
2334 },
2335 Some(tree_sitter_rust::language()),
2336 )));
2337
2338 // Connect to a server as 2 clients.
2339 let mut server = TestServer::start(cx_a.foreground()).await;
2340 let client_a = server.create_client(&mut cx_a, "user_a").await;
2341 let client_b = server.create_client(&mut cx_b, "user_b").await;
2342
2343 // Share a project as client A
2344 fs.insert_tree(
2345 "/a",
2346 json!({
2347 ".zed.toml": r#"collaborators = ["user_b"]"#,
2348 "main.rs": "fn main() { a }",
2349 "other.rs": "",
2350 }),
2351 )
2352 .await;
2353 let project_a = cx_a.update(|cx| {
2354 Project::local(
2355 client_a.clone(),
2356 client_a.user_store.clone(),
2357 lang_registry.clone(),
2358 fs.clone(),
2359 cx,
2360 )
2361 });
2362 let (worktree_a, _) = project_a
2363 .update(&mut cx_a, |p, cx| {
2364 p.find_or_create_local_worktree("/a", false, cx)
2365 })
2366 .await
2367 .unwrap();
2368 worktree_a
2369 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2370 .await;
2371 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2372 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2373 project_a
2374 .update(&mut cx_a, |p, cx| p.share(cx))
2375 .await
2376 .unwrap();
2377
2378 // Join the worktree as client B.
2379 let project_b = Project::remote(
2380 project_id,
2381 client_b.clone(),
2382 client_b.user_store.clone(),
2383 lang_registry.clone(),
2384 fs.clone(),
2385 &mut cx_b.to_async(),
2386 )
2387 .await
2388 .unwrap();
2389
2390 // Open a file in an editor as the guest.
2391 let buffer_b = project_b
2392 .update(&mut cx_b, |p, cx| {
2393 p.open_buffer((worktree_id, "main.rs"), cx)
2394 })
2395 .await
2396 .unwrap();
2397 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2398 let editor_b = cx_b.add_view(window_b, |cx| {
2399 Editor::for_buffer(
2400 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2401 Arc::new(|cx| EditorSettings::test(cx)),
2402 cx,
2403 )
2404 });
2405
2406 // Type a completion trigger character as the guest.
2407 editor_b.update(&mut cx_b, |editor, cx| {
2408 editor.select_ranges([13..13], None, cx);
2409 editor.handle_input(&Input(".".into()), cx);
2410 cx.focus(&editor_b);
2411 });
2412
2413 // Receive a completion request as the host's language server.
2414 let (request_id, params) = fake_language_server
2415 .receive_request::<lsp::request::Completion>()
2416 .await;
2417 assert_eq!(
2418 params.text_document_position.text_document.uri,
2419 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2420 );
2421 assert_eq!(
2422 params.text_document_position.position,
2423 lsp::Position::new(0, 14),
2424 );
2425
2426 // Return some completions from the host's language server.
2427 fake_language_server
2428 .respond(
2429 request_id,
2430 Some(lsp::CompletionResponse::Array(vec![
2431 lsp::CompletionItem {
2432 label: "first_method(…)".into(),
2433 detail: Some("fn(&mut self, B) -> C".into()),
2434 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2435 new_text: "first_method($1)".to_string(),
2436 range: lsp::Range::new(
2437 lsp::Position::new(0, 14),
2438 lsp::Position::new(0, 14),
2439 ),
2440 })),
2441 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442 ..Default::default()
2443 },
2444 lsp::CompletionItem {
2445 label: "second_method(…)".into(),
2446 detail: Some("fn(&mut self, C) -> D<E>".into()),
2447 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2448 new_text: "second_method()".to_string(),
2449 range: lsp::Range::new(
2450 lsp::Position::new(0, 14),
2451 lsp::Position::new(0, 14),
2452 ),
2453 })),
2454 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2455 ..Default::default()
2456 },
2457 ])),
2458 )
2459 .await;
2460
2461 // Open the buffer on the host.
2462 let buffer_a = project_a
2463 .update(&mut cx_a, |p, cx| {
2464 p.open_buffer((worktree_id, "main.rs"), cx)
2465 })
2466 .await
2467 .unwrap();
2468 buffer_a
2469 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2470 .await;
2471
2472 // Confirm a completion on the guest.
2473 editor_b.next_notification(&cx_b).await;
2474 editor_b.update(&mut cx_b, |editor, cx| {
2475 assert!(editor.showing_context_menu());
2476 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2477 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2478 });
2479
2480 buffer_a
2481 .condition(&cx_a, |buffer, _| {
2482 buffer.text() == "fn main() { a.first_method() }"
2483 })
2484 .await;
2485
2486 // Receive a request resolve the selected completion on the host's language server.
2487 let (request_id, params) = fake_language_server
2488 .receive_request::<lsp::request::ResolveCompletionItem>()
2489 .await;
2490 assert_eq!(params.label, "first_method(…)");
2491
2492 // Return a resolved completion from the host's language server.
2493 // The resolved completion has an additional text edit.
2494 fake_language_server
2495 .respond(
2496 request_id,
2497 lsp::CompletionItem {
2498 label: "first_method(…)".into(),
2499 detail: Some("fn(&mut self, B) -> C".into()),
2500 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2501 new_text: "first_method($1)".to_string(),
2502 range: lsp::Range::new(
2503 lsp::Position::new(0, 14),
2504 lsp::Position::new(0, 14),
2505 ),
2506 })),
2507 additional_text_edits: Some(vec![lsp::TextEdit {
2508 new_text: "use d::SomeTrait;\n".to_string(),
2509 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2510 }]),
2511 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2512 ..Default::default()
2513 },
2514 )
2515 .await;
2516
2517 // The additional edit is applied.
2518 buffer_b
2519 .condition(&cx_b, |buffer, _| {
2520 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2521 })
2522 .await;
2523 assert_eq!(
2524 buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2525 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2526 );
2527 }
2528
2529 #[gpui::test]
2530 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2531 cx_a.foreground().forbid_parking();
2532 let mut lang_registry = Arc::new(LanguageRegistry::new());
2533 let fs = Arc::new(FakeFs::new(cx_a.background()));
2534
2535 // Set up a fake language server.
2536 let (language_server_config, mut fake_language_server) =
2537 LanguageServerConfig::fake(cx_a.background()).await;
2538 Arc::get_mut(&mut lang_registry)
2539 .unwrap()
2540 .add(Arc::new(Language::new(
2541 LanguageConfig {
2542 name: "Rust".to_string(),
2543 path_suffixes: vec!["rs".to_string()],
2544 language_server: Some(language_server_config),
2545 ..Default::default()
2546 },
2547 Some(tree_sitter_rust::language()),
2548 )));
2549
2550 // Connect to a server as 2 clients.
2551 let mut server = TestServer::start(cx_a.foreground()).await;
2552 let client_a = server.create_client(&mut cx_a, "user_a").await;
2553 let client_b = server.create_client(&mut cx_b, "user_b").await;
2554
2555 // Share a project as client A
2556 fs.insert_tree(
2557 "/a",
2558 json!({
2559 ".zed.toml": r#"collaborators = ["user_b"]"#,
2560 "a.rs": "let one = two",
2561 }),
2562 )
2563 .await;
2564 let project_a = cx_a.update(|cx| {
2565 Project::local(
2566 client_a.clone(),
2567 client_a.user_store.clone(),
2568 lang_registry.clone(),
2569 fs.clone(),
2570 cx,
2571 )
2572 });
2573 let (worktree_a, _) = project_a
2574 .update(&mut cx_a, |p, cx| {
2575 p.find_or_create_local_worktree("/a", false, cx)
2576 })
2577 .await
2578 .unwrap();
2579 worktree_a
2580 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2581 .await;
2582 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2583 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2584 project_a
2585 .update(&mut cx_a, |p, cx| p.share(cx))
2586 .await
2587 .unwrap();
2588
2589 // Join the worktree as client B.
2590 let project_b = Project::remote(
2591 project_id,
2592 client_b.clone(),
2593 client_b.user_store.clone(),
2594 lang_registry.clone(),
2595 fs.clone(),
2596 &mut cx_b.to_async(),
2597 )
2598 .await
2599 .unwrap();
2600
2601 let buffer_b = cx_b
2602 .background()
2603 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2604 .await
2605 .unwrap();
2606
2607 let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2608 let (request_id, _) = fake_language_server
2609 .receive_request::<lsp::request::Formatting>()
2610 .await;
2611 fake_language_server
2612 .respond(
2613 request_id,
2614 Some(vec![
2615 lsp::TextEdit {
2616 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2617 new_text: "h".to_string(),
2618 },
2619 lsp::TextEdit {
2620 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2621 new_text: "y".to_string(),
2622 },
2623 ]),
2624 )
2625 .await;
2626 format.await.unwrap();
2627 assert_eq!(
2628 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2629 "let honey = two"
2630 );
2631 }
2632
2633 #[gpui::test]
2634 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2635 cx_a.foreground().forbid_parking();
2636 let mut lang_registry = Arc::new(LanguageRegistry::new());
2637 let fs = Arc::new(FakeFs::new(cx_a.background()));
2638 fs.insert_tree(
2639 "/root-1",
2640 json!({
2641 ".zed.toml": r#"collaborators = ["user_b"]"#,
2642 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2643 }),
2644 )
2645 .await;
2646 fs.insert_tree(
2647 "/root-2",
2648 json!({
2649 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2650 }),
2651 )
2652 .await;
2653
2654 // Set up a fake language server.
2655 let (language_server_config, mut fake_language_server) =
2656 LanguageServerConfig::fake(cx_a.background()).await;
2657 Arc::get_mut(&mut lang_registry)
2658 .unwrap()
2659 .add(Arc::new(Language::new(
2660 LanguageConfig {
2661 name: "Rust".to_string(),
2662 path_suffixes: vec!["rs".to_string()],
2663 language_server: Some(language_server_config),
2664 ..Default::default()
2665 },
2666 Some(tree_sitter_rust::language()),
2667 )));
2668
2669 // Connect to a server as 2 clients.
2670 let mut server = TestServer::start(cx_a.foreground()).await;
2671 let client_a = server.create_client(&mut cx_a, "user_a").await;
2672 let client_b = server.create_client(&mut cx_b, "user_b").await;
2673
2674 // Share a project as client A
2675 let project_a = cx_a.update(|cx| {
2676 Project::local(
2677 client_a.clone(),
2678 client_a.user_store.clone(),
2679 lang_registry.clone(),
2680 fs.clone(),
2681 cx,
2682 )
2683 });
2684 let (worktree_a, _) = project_a
2685 .update(&mut cx_a, |p, cx| {
2686 p.find_or_create_local_worktree("/root-1", false, cx)
2687 })
2688 .await
2689 .unwrap();
2690 worktree_a
2691 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2692 .await;
2693 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2694 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2695 project_a
2696 .update(&mut cx_a, |p, cx| p.share(cx))
2697 .await
2698 .unwrap();
2699
2700 // Join the worktree as client B.
2701 let project_b = Project::remote(
2702 project_id,
2703 client_b.clone(),
2704 client_b.user_store.clone(),
2705 lang_registry.clone(),
2706 fs.clone(),
2707 &mut cx_b.to_async(),
2708 )
2709 .await
2710 .unwrap();
2711
2712 // Open the file to be formatted on client B.
2713 let buffer_b = cx_b
2714 .background()
2715 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2716 .await
2717 .unwrap();
2718
2719 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2720 let (request_id, _) = fake_language_server
2721 .receive_request::<lsp::request::GotoDefinition>()
2722 .await;
2723 fake_language_server
2724 .respond(
2725 request_id,
2726 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2727 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2728 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2729 ))),
2730 )
2731 .await;
2732 let definitions_1 = definitions_1.await.unwrap();
2733 cx_b.read(|cx| {
2734 assert_eq!(definitions_1.len(), 1);
2735 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2736 let target_buffer = definitions_1[0].target_buffer.read(cx);
2737 assert_eq!(
2738 target_buffer.text(),
2739 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2740 );
2741 assert_eq!(
2742 definitions_1[0].target_range.to_point(target_buffer),
2743 Point::new(0, 6)..Point::new(0, 9)
2744 );
2745 });
2746
2747 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2748 // the previous call to `definition`.
2749 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2750 let (request_id, _) = fake_language_server
2751 .receive_request::<lsp::request::GotoDefinition>()
2752 .await;
2753 fake_language_server
2754 .respond(
2755 request_id,
2756 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2757 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2758 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2759 ))),
2760 )
2761 .await;
2762 let definitions_2 = definitions_2.await.unwrap();
2763 cx_b.read(|cx| {
2764 assert_eq!(definitions_2.len(), 1);
2765 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2766 let target_buffer = definitions_2[0].target_buffer.read(cx);
2767 assert_eq!(
2768 target_buffer.text(),
2769 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2770 );
2771 assert_eq!(
2772 definitions_2[0].target_range.to_point(target_buffer),
2773 Point::new(1, 6)..Point::new(1, 11)
2774 );
2775 });
2776 assert_eq!(
2777 definitions_1[0].target_buffer,
2778 definitions_2[0].target_buffer
2779 );
2780
2781 cx_b.update(|_| {
2782 drop(definitions_1);
2783 drop(definitions_2);
2784 });
2785 project_b
2786 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2787 .await;
2788 }
2789
2790 #[gpui::test]
2791 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2792 mut cx_a: TestAppContext,
2793 mut cx_b: TestAppContext,
2794 mut rng: StdRng,
2795 ) {
2796 cx_a.foreground().forbid_parking();
2797 let mut lang_registry = Arc::new(LanguageRegistry::new());
2798 let fs = Arc::new(FakeFs::new(cx_a.background()));
2799 fs.insert_tree(
2800 "/root",
2801 json!({
2802 ".zed.toml": r#"collaborators = ["user_b"]"#,
2803 "a.rs": "const ONE: usize = b::TWO;",
2804 "b.rs": "const TWO: usize = 2",
2805 }),
2806 )
2807 .await;
2808
2809 // Set up a fake language server.
2810 let (language_server_config, mut fake_language_server) =
2811 LanguageServerConfig::fake(cx_a.background()).await;
2812 Arc::get_mut(&mut lang_registry)
2813 .unwrap()
2814 .add(Arc::new(Language::new(
2815 LanguageConfig {
2816 name: "Rust".to_string(),
2817 path_suffixes: vec!["rs".to_string()],
2818 language_server: Some(language_server_config),
2819 ..Default::default()
2820 },
2821 Some(tree_sitter_rust::language()),
2822 )));
2823
2824 // Connect to a server as 2 clients.
2825 let mut server = TestServer::start(cx_a.foreground()).await;
2826 let client_a = server.create_client(&mut cx_a, "user_a").await;
2827 let client_b = server.create_client(&mut cx_b, "user_b").await;
2828
2829 // Share a project as client A
2830 let project_a = cx_a.update(|cx| {
2831 Project::local(
2832 client_a.clone(),
2833 client_a.user_store.clone(),
2834 lang_registry.clone(),
2835 fs.clone(),
2836 cx,
2837 )
2838 });
2839 let (worktree_a, _) = project_a
2840 .update(&mut cx_a, |p, cx| {
2841 p.find_or_create_local_worktree("/root", false, cx)
2842 })
2843 .await
2844 .unwrap();
2845 worktree_a
2846 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2847 .await;
2848 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2849 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2850 project_a
2851 .update(&mut cx_a, |p, cx| p.share(cx))
2852 .await
2853 .unwrap();
2854
2855 // Join the worktree as client B.
2856 let project_b = Project::remote(
2857 project_id,
2858 client_b.clone(),
2859 client_b.user_store.clone(),
2860 lang_registry.clone(),
2861 fs.clone(),
2862 &mut cx_b.to_async(),
2863 )
2864 .await
2865 .unwrap();
2866
2867 let buffer_b1 = cx_b
2868 .background()
2869 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2870 .await
2871 .unwrap();
2872
2873 let definitions;
2874 let buffer_b2;
2875 if rng.gen() {
2876 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2877 buffer_b2 =
2878 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2879 } else {
2880 buffer_b2 =
2881 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2882 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2883 }
2884
2885 let (request_id, _) = fake_language_server
2886 .receive_request::<lsp::request::GotoDefinition>()
2887 .await;
2888 fake_language_server
2889 .respond(
2890 request_id,
2891 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2892 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2893 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2894 ))),
2895 )
2896 .await;
2897
2898 let buffer_b2 = buffer_b2.await.unwrap();
2899 let definitions = definitions.await.unwrap();
2900 assert_eq!(definitions.len(), 1);
2901 assert_eq!(definitions[0].target_buffer, buffer_b2);
2902 }
2903
2904 #[gpui::test]
2905 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2906 cx_a.foreground().forbid_parking();
2907
2908 // Connect to a server as 2 clients.
2909 let mut server = TestServer::start(cx_a.foreground()).await;
2910 let client_a = server.create_client(&mut cx_a, "user_a").await;
2911 let client_b = server.create_client(&mut cx_b, "user_b").await;
2912
2913 // Create an org that includes these 2 users.
2914 let db = &server.app_state.db;
2915 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2916 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2917 .await
2918 .unwrap();
2919 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2920 .await
2921 .unwrap();
2922
2923 // Create a channel that includes all the users.
2924 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2925 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2926 .await
2927 .unwrap();
2928 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2929 .await
2930 .unwrap();
2931 db.create_channel_message(
2932 channel_id,
2933 client_b.current_user_id(&cx_b),
2934 "hello A, it's B.",
2935 OffsetDateTime::now_utc(),
2936 1,
2937 )
2938 .await
2939 .unwrap();
2940
2941 let channels_a = cx_a
2942 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2943 channels_a
2944 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2945 .await;
2946 channels_a.read_with(&cx_a, |list, _| {
2947 assert_eq!(
2948 list.available_channels().unwrap(),
2949 &[ChannelDetails {
2950 id: channel_id.to_proto(),
2951 name: "test-channel".to_string()
2952 }]
2953 )
2954 });
2955 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2956 this.get_channel(channel_id.to_proto(), cx).unwrap()
2957 });
2958 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2959 channel_a
2960 .condition(&cx_a, |channel, _| {
2961 channel_messages(channel)
2962 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2963 })
2964 .await;
2965
2966 let channels_b = cx_b
2967 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2968 channels_b
2969 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2970 .await;
2971 channels_b.read_with(&cx_b, |list, _| {
2972 assert_eq!(
2973 list.available_channels().unwrap(),
2974 &[ChannelDetails {
2975 id: channel_id.to_proto(),
2976 name: "test-channel".to_string()
2977 }]
2978 )
2979 });
2980
2981 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2982 this.get_channel(channel_id.to_proto(), cx).unwrap()
2983 });
2984 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2985 channel_b
2986 .condition(&cx_b, |channel, _| {
2987 channel_messages(channel)
2988 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2989 })
2990 .await;
2991
2992 channel_a
2993 .update(&mut cx_a, |channel, cx| {
2994 channel
2995 .send_message("oh, hi B.".to_string(), cx)
2996 .unwrap()
2997 .detach();
2998 let task = channel.send_message("sup".to_string(), cx).unwrap();
2999 assert_eq!(
3000 channel_messages(channel),
3001 &[
3002 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3003 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3004 ("user_a".to_string(), "sup".to_string(), true)
3005 ]
3006 );
3007 task
3008 })
3009 .await
3010 .unwrap();
3011
3012 channel_b
3013 .condition(&cx_b, |channel, _| {
3014 channel_messages(channel)
3015 == [
3016 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3017 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3018 ("user_a".to_string(), "sup".to_string(), false),
3019 ]
3020 })
3021 .await;
3022
3023 assert_eq!(
3024 server
3025 .state()
3026 .await
3027 .channel(channel_id)
3028 .unwrap()
3029 .connection_ids
3030 .len(),
3031 2
3032 );
3033 cx_b.update(|_| drop(channel_b));
3034 server
3035 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3036 .await;
3037
3038 cx_a.update(|_| drop(channel_a));
3039 server
3040 .condition(|state| state.channel(channel_id).is_none())
3041 .await;
3042 }
3043
3044 #[gpui::test]
3045 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3046 cx_a.foreground().forbid_parking();
3047
3048 let mut server = TestServer::start(cx_a.foreground()).await;
3049 let client_a = server.create_client(&mut cx_a, "user_a").await;
3050
3051 let db = &server.app_state.db;
3052 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3053 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3054 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3055 .await
3056 .unwrap();
3057 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3058 .await
3059 .unwrap();
3060
3061 let channels_a = cx_a
3062 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3063 channels_a
3064 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3065 .await;
3066 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3067 this.get_channel(channel_id.to_proto(), cx).unwrap()
3068 });
3069
3070 // Messages aren't allowed to be too long.
3071 channel_a
3072 .update(&mut cx_a, |channel, cx| {
3073 let long_body = "this is long.\n".repeat(1024);
3074 channel.send_message(long_body, cx).unwrap()
3075 })
3076 .await
3077 .unwrap_err();
3078
3079 // Messages aren't allowed to be blank.
3080 channel_a.update(&mut cx_a, |channel, cx| {
3081 channel.send_message(String::new(), cx).unwrap_err()
3082 });
3083
3084 // Leading and trailing whitespace are trimmed.
3085 channel_a
3086 .update(&mut cx_a, |channel, cx| {
3087 channel
3088 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3089 .unwrap()
3090 })
3091 .await
3092 .unwrap();
3093 assert_eq!(
3094 db.get_channel_messages(channel_id, 10, None)
3095 .await
3096 .unwrap()
3097 .iter()
3098 .map(|m| &m.body)
3099 .collect::<Vec<_>>(),
3100 &["surrounded by whitespace"]
3101 );
3102 }
3103
3104 #[gpui::test]
3105 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3106 cx_a.foreground().forbid_parking();
3107
3108 // Connect to a server as 2 clients.
3109 let mut server = TestServer::start(cx_a.foreground()).await;
3110 let client_a = server.create_client(&mut cx_a, "user_a").await;
3111 let client_b = server.create_client(&mut cx_b, "user_b").await;
3112 let mut status_b = client_b.status();
3113
3114 // Create an org that includes these 2 users.
3115 let db = &server.app_state.db;
3116 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3117 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3118 .await
3119 .unwrap();
3120 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3121 .await
3122 .unwrap();
3123
3124 // Create a channel that includes all the users.
3125 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3126 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3127 .await
3128 .unwrap();
3129 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3130 .await
3131 .unwrap();
3132 db.create_channel_message(
3133 channel_id,
3134 client_b.current_user_id(&cx_b),
3135 "hello A, it's B.",
3136 OffsetDateTime::now_utc(),
3137 2,
3138 )
3139 .await
3140 .unwrap();
3141
3142 let channels_a = cx_a
3143 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3144 channels_a
3145 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3146 .await;
3147
3148 channels_a.read_with(&cx_a, |list, _| {
3149 assert_eq!(
3150 list.available_channels().unwrap(),
3151 &[ChannelDetails {
3152 id: channel_id.to_proto(),
3153 name: "test-channel".to_string()
3154 }]
3155 )
3156 });
3157 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3158 this.get_channel(channel_id.to_proto(), cx).unwrap()
3159 });
3160 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3161 channel_a
3162 .condition(&cx_a, |channel, _| {
3163 channel_messages(channel)
3164 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3165 })
3166 .await;
3167
3168 let channels_b = cx_b
3169 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3170 channels_b
3171 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3172 .await;
3173 channels_b.read_with(&cx_b, |list, _| {
3174 assert_eq!(
3175 list.available_channels().unwrap(),
3176 &[ChannelDetails {
3177 id: channel_id.to_proto(),
3178 name: "test-channel".to_string()
3179 }]
3180 )
3181 });
3182
3183 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3184 this.get_channel(channel_id.to_proto(), cx).unwrap()
3185 });
3186 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3187 channel_b
3188 .condition(&cx_b, |channel, _| {
3189 channel_messages(channel)
3190 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3191 })
3192 .await;
3193
3194 // Disconnect client B, ensuring we can still access its cached channel data.
3195 server.forbid_connections();
3196 server.disconnect_client(client_b.current_user_id(&cx_b));
3197 while !matches!(
3198 status_b.next().await,
3199 Some(client::Status::ReconnectionError { .. })
3200 ) {}
3201
3202 channels_b.read_with(&cx_b, |channels, _| {
3203 assert_eq!(
3204 channels.available_channels().unwrap(),
3205 [ChannelDetails {
3206 id: channel_id.to_proto(),
3207 name: "test-channel".to_string()
3208 }]
3209 )
3210 });
3211 channel_b.read_with(&cx_b, |channel, _| {
3212 assert_eq!(
3213 channel_messages(channel),
3214 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3215 )
3216 });
3217
3218 // Send a message from client B while it is disconnected.
3219 channel_b
3220 .update(&mut cx_b, |channel, cx| {
3221 let task = channel
3222 .send_message("can you see this?".to_string(), cx)
3223 .unwrap();
3224 assert_eq!(
3225 channel_messages(channel),
3226 &[
3227 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3228 ("user_b".to_string(), "can you see this?".to_string(), true)
3229 ]
3230 );
3231 task
3232 })
3233 .await
3234 .unwrap_err();
3235
3236 // Send a message from client A while B is disconnected.
3237 channel_a
3238 .update(&mut cx_a, |channel, cx| {
3239 channel
3240 .send_message("oh, hi B.".to_string(), cx)
3241 .unwrap()
3242 .detach();
3243 let task = channel.send_message("sup".to_string(), cx).unwrap();
3244 assert_eq!(
3245 channel_messages(channel),
3246 &[
3247 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3248 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3249 ("user_a".to_string(), "sup".to_string(), true)
3250 ]
3251 );
3252 task
3253 })
3254 .await
3255 .unwrap();
3256
3257 // Give client B a chance to reconnect.
3258 server.allow_connections();
3259 cx_b.foreground().advance_clock(Duration::from_secs(10));
3260
3261 // Verify that B sees the new messages upon reconnection, as well as the message client B
3262 // sent while offline.
3263 channel_b
3264 .condition(&cx_b, |channel, _| {
3265 channel_messages(channel)
3266 == [
3267 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3268 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3269 ("user_a".to_string(), "sup".to_string(), false),
3270 ("user_b".to_string(), "can you see this?".to_string(), false),
3271 ]
3272 })
3273 .await;
3274
3275 // Ensure client A and B can communicate normally after reconnection.
3276 channel_a
3277 .update(&mut cx_a, |channel, cx| {
3278 channel.send_message("you online?".to_string(), cx).unwrap()
3279 })
3280 .await
3281 .unwrap();
3282 channel_b
3283 .condition(&cx_b, |channel, _| {
3284 channel_messages(channel)
3285 == [
3286 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3287 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3288 ("user_a".to_string(), "sup".to_string(), false),
3289 ("user_b".to_string(), "can you see this?".to_string(), false),
3290 ("user_a".to_string(), "you online?".to_string(), false),
3291 ]
3292 })
3293 .await;
3294
3295 channel_b
3296 .update(&mut cx_b, |channel, cx| {
3297 channel.send_message("yep".to_string(), cx).unwrap()
3298 })
3299 .await
3300 .unwrap();
3301 channel_a
3302 .condition(&cx_a, |channel, _| {
3303 channel_messages(channel)
3304 == [
3305 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3306 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3307 ("user_a".to_string(), "sup".to_string(), false),
3308 ("user_b".to_string(), "can you see this?".to_string(), false),
3309 ("user_a".to_string(), "you online?".to_string(), false),
3310 ("user_b".to_string(), "yep".to_string(), false),
3311 ]
3312 })
3313 .await;
3314 }
3315
// Exercises the contact list observed by three connected clients (user_a, user_b, user_c)
// while client A creates a local project, shares it, client B joins it, and client A
// finally drops the project.
#[gpui::test]
async fn test_contacts(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
    mut cx_c: TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));

    // Connect to a server as 3 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;
    let client_c = server.create_client(&mut cx_c, "user_c").await;

    // Populate the fake filesystem with a directory "/a" whose `.zed.toml` names user_b and
    // user_c as collaborators. The project is not shared yet at this point; that happens
    // further down via `project.share`.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
        }),
    )
    .await;

    // Create a local project for client A and add "/a" to it as a worktree.
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", false, cx)
        })
        .await
        .unwrap();
    // Wait for the initial scan of the local worktree to finish before checking contacts.
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;

    // Each of the three clients should eventually list user_a as a contact with a single
    // project rooted at "a" and no guests.
    // NOTE(review): B and C presumably see this because `.zed.toml` lists them as
    // collaborators; confirm against the server's contact logic.
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
        })
        .await;

    // Obtain the project's remote id, then share the project from client A.
    let project_id = project_a
        .update(&mut cx_a, |project, _| project.next_remote_id())
        .await;
    project_a
        .update(&mut cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Open the shared project from client B. The handle is bound (rather than discarded)
    // so it stays alive for the rest of the test.
    let _project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Now every client should see user_b listed as a guest of user_a's project "a".
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| {
            contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
        })
        .await;

    // Wait until client A's project itself records client B's peer id as a collaborator.
    project_a
        .condition(&cx_a, |project, _| {
            project.collaborators().contains_key(&client_b.peer_id)
        })
        .await;

    // Drop client A's handle to the project; afterwards every client's contact list is
    // expected to become empty.
    cx_a.update(move |_| drop(project_a));
    client_a
        .user_store
        .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_b
        .user_store
        .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
        .await;
    client_c
        .user_store
        .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
        .await;

    // Flattens a user store's contacts into plain, comparable data:
    // one `(contact login, [(first worktree root name, [guest logins])])` entry per contact.
    // Only the first worktree root name of each project is reported; the `[0]` index
    // assumes every project has at least one root name.
    fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
        user_store
            .contacts()
            .iter()
            .map(|contact| {
                let worktrees = contact
                    .projects
                    .iter()
                    .map(|p| {
                        (
                            p.worktree_root_names[0].as_str(),
                            p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                        )
                    })
                    .collect();
                (contact.user.github_login.as_str(), worktrees)
            })
            .collect()
    }
}
3457
/// Test harness that owns an in-process `Server` together with the handles tests use to
/// create clients, sever or forbid connections, and wait on server state.
struct TestServer {
    /// Peer passed to `Server::new`; `reset` is called on it when the harness is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state`; its `db` is used to create users.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets its waits with
    /// `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`.
    /// `condition` awaits it between predicate checks.
    notifications: mpsc::Receiver<()>,
    /// Per-user senders stored each time a connection is established;
    /// `disconnect_client` removes the entry and sends `Some(())` on it.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// When `true`, attempts to establish a connection fail with an error.
    forbid_connections: Arc<AtomicBool>,
    /// Test database the app state was built from. It is never read after construction;
    /// presumably held so the database outlives the server (confirm against `TestDb`).
    _test_db: TestDb,
}
3468
3469 impl TestServer {
3470 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3471 let test_db = TestDb::new();
3472 let app_state = Self::build_app_state(&test_db).await;
3473 let peer = Peer::new();
3474 let notifications = mpsc::channel(128);
3475 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3476 Self {
3477 peer,
3478 app_state,
3479 server,
3480 foreground,
3481 notifications: notifications.1,
3482 connection_killers: Default::default(),
3483 forbid_connections: Default::default(),
3484 _test_db: test_db,
3485 }
3486 }
3487
3488 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3489 let http = FakeHttpClient::with_404_response();
3490 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3491 let client_name = name.to_string();
3492 let mut client = Client::new(http.clone());
3493 let server = self.server.clone();
3494 let connection_killers = self.connection_killers.clone();
3495 let forbid_connections = self.forbid_connections.clone();
3496 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3497
3498 Arc::get_mut(&mut client)
3499 .unwrap()
3500 .override_authenticate(move |cx| {
3501 cx.spawn(|_| async move {
3502 let access_token = "the-token".to_string();
3503 Ok(Credentials {
3504 user_id: user_id.0 as u64,
3505 access_token,
3506 })
3507 })
3508 })
3509 .override_establish_connection(move |credentials, cx| {
3510 assert_eq!(credentials.user_id, user_id.0 as u64);
3511 assert_eq!(credentials.access_token, "the-token");
3512
3513 let server = server.clone();
3514 let connection_killers = connection_killers.clone();
3515 let forbid_connections = forbid_connections.clone();
3516 let client_name = client_name.clone();
3517 let connection_id_tx = connection_id_tx.clone();
3518 cx.spawn(move |cx| async move {
3519 if forbid_connections.load(SeqCst) {
3520 Err(EstablishConnectionError::other(anyhow!(
3521 "server is forbidding connections"
3522 )))
3523 } else {
3524 let (client_conn, server_conn, kill_conn) =
3525 Connection::in_memory(cx.background());
3526 connection_killers.lock().insert(user_id, kill_conn);
3527 cx.background()
3528 .spawn(server.handle_connection(
3529 server_conn,
3530 client_name,
3531 user_id,
3532 Some(connection_id_tx),
3533 ))
3534 .detach();
3535 Ok(client_conn)
3536 }
3537 })
3538 });
3539
3540 client
3541 .authenticate_and_connect(&cx.to_async())
3542 .await
3543 .unwrap();
3544
3545 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3546 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3547 let mut authed_user =
3548 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3549 while authed_user.next().await.unwrap().is_none() {}
3550
3551 TestClient {
3552 client,
3553 peer_id,
3554 user_store,
3555 }
3556 }
3557
3558 fn disconnect_client(&self, user_id: UserId) {
3559 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3560 let _ = kill_conn.try_send(Some(()));
3561 }
3562 }
3563
3564 fn forbid_connections(&self) {
3565 self.forbid_connections.store(true, SeqCst);
3566 }
3567
3568 fn allow_connections(&self) {
3569 self.forbid_connections.store(false, SeqCst);
3570 }
3571
3572 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3573 let mut config = Config::default();
3574 config.session_secret = "a".repeat(32);
3575 config.database_url = test_db.url.clone();
3576 let github_client = github::AppClient::test();
3577 Arc::new(AppState {
3578 db: test_db.db().clone(),
3579 handlebars: Default::default(),
3580 auth_client: auth::build_client("", ""),
3581 repo_client: github::RepoClient::test(&github_client),
3582 github_client,
3583 config,
3584 })
3585 }
3586
3587 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3588 self.server.store.read()
3589 }
3590
3591 async fn condition<F>(&mut self, mut predicate: F)
3592 where
3593 F: FnMut(&Store) -> bool,
3594 {
3595 async_std::future::timeout(Duration::from_millis(500), async {
3596 while !(predicate)(&*self.server.store.read()) {
3597 self.foreground.start_waiting();
3598 self.notifications.next().await;
3599 self.foreground.finish_waiting();
3600 }
3601 })
3602 .await
3603 .expect("condition timed out");
3604 }
3605 }
3606
3607 impl Drop for TestServer {
3608 fn drop(&mut self) {
3609 self.peer.reset();
3610 }
3611 }
3612
/// A connected test client as returned by `TestServer::create_client`.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store model created for this client.
    pub user_store: ModelHandle<UserStore>,
}
3618
3619 impl Deref for TestClient {
3620 type Target = Arc<Client>;
3621
3622 fn deref(&self) -> &Self::Target {
3623 &self.client
3624 }
3625 }
3626
3627 impl TestClient {
3628 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3629 UserId::from_proto(
3630 self.user_store
3631 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3632 )
3633 }
3634 }
3635
3636 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3637 channel
3638 .messages()
3639 .cursor::<()>()
3640 .map(|m| {
3641 (
3642 m.sender.github_login.clone(),
3643 m.body.clone(),
3644 m.is_pending(),
3645 )
3646 })
3647 .collect()
3648 }
3649
/// Minimal view with no state; it renders an empty element.
struct EmptyView;
3651
/// `EmptyView` declares `()` as its event type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
3655
3656 impl gpui::View for EmptyView {
3657 fn ui_name() -> &'static str {
3658 "empty view"
3659 }
3660
3661 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3662 gpui::Element::boxed(gpui::elements::Empty)
3663 }
3664 }
3665}