1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, User, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{
27 channel::mpsc,
28 future::{self, BoxFuture},
29 FutureExt, SinkExt, StreamExt, TryStreamExt,
30};
31use lazy_static::lazy_static;
32use rpc::{
33 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
34 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
35};
36use serde::{Serialize, Serializer};
37use std::{
38 any::TypeId,
39 future::Future,
40 marker::PhantomData,
41 net::SocketAddr,
42 ops::{Deref, DerefMut},
43 rc::Rc,
44 sync::{
45 atomic::{AtomicBool, Ordering::SeqCst},
46 Arc,
47 },
48 time::Duration,
49};
50use store::{Store, Worktree};
51use time::OffsetDateTime;
52use tokio::{
53 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
54 time::Sleep,
55};
56use tower::ServiceBuilder;
57use tracing::{info_span, instrument, Instrument};
58
59type MessageHandler =
60 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
61
62struct Response<R> {
63 server: Arc<Server>,
64 receipt: Receipt<R>,
65 responded: Arc<AtomicBool>,
66}
67
68impl<R: RequestMessage> Response<R> {
69 fn send(self, payload: R::Response) -> Result<()> {
70 self.responded.store(true, SeqCst);
71 self.server.peer.respond(self.receipt, payload)?;
72 Ok(())
73 }
74
75 fn into_receipt(self) -> Receipt<R> {
76 self.responded.store(true, SeqCst);
77 self.receipt
78 }
79}
80
81pub struct Server {
82 peer: Arc<Peer>,
83 store: RwLock<Store>,
84 app_state: Arc<AppState>,
85 handlers: HashMap<TypeId, MessageHandler>,
86 notifications: Option<mpsc::UnboundedSender<()>>,
87}
88
89
90pub trait Executor: Send + Clone {
91 type Sleep: Send + Future;
92 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
93 fn sleep(&self, duration: Duration) -> Self::Sleep;
94}
95
96#[derive(Clone)]
97pub struct RealExecutor;
98
99const MESSAGE_COUNT_PER_PAGE: usize = 100;
100const MAX_MESSAGE_LEN: usize = 1024;
101
102struct StoreReadGuard<'a> {
103 guard: RwLockReadGuard<'a, Store>,
104 _not_send: PhantomData<Rc<()>>,
105}
106
107struct StoreWriteGuard<'a> {
108 guard: RwLockWriteGuard<'a, Store>,
109 _not_send: PhantomData<Rc<()>>,
110}
111
112#[derive(Serialize)]
113pub struct ServerSnapshot<'a> {
114 peer: &'a Peer,
115 #[serde(serialize_with = "serialize_deref")]
116 store: RwLockReadGuard<'a, Store>,
117}
118
119pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
120where
121 S: Serializer,
122 T: Deref<Target = U>,
123 U: Serialize
124{
125 Serialize::serialize(value.deref(), serializer)
126}
127
128
129impl Server {
130 pub fn new(
131 app_state: Arc<AppState>,
132 notifications: Option<mpsc::UnboundedSender<()>>,
133 ) -> Arc<Self> {
134 let mut server = Self {
135 peer: Peer::new(),
136 app_state,
137 store: Default::default(),
138 handlers: Default::default(),
139 notifications,
140 };
141
142 server
143 .add_request_handler(Server::ping)
144 .add_request_handler(Server::register_project)
145 .add_message_handler(Server::unregister_project)
146 .add_request_handler(Server::join_project)
147 .add_message_handler(Server::leave_project)
148 .add_message_handler(Server::respond_to_join_project_request)
149 .add_request_handler(Server::register_worktree)
150 .add_message_handler(Server::unregister_worktree)
151 .add_request_handler(Server::update_worktree)
152 .add_message_handler(Server::start_language_server)
153 .add_message_handler(Server::update_language_server)
154 .add_message_handler(Server::update_diagnostic_summary)
155 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
156 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
157 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
158 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
159 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
160 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
161 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
162 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
163 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
164 .add_request_handler(
165 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
166 )
167 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
168 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
169 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
170 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
171 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
172 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
173 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
174 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
175 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
176 .add_request_handler(Server::update_buffer)
177 .add_message_handler(Server::update_buffer_file)
178 .add_message_handler(Server::buffer_reloaded)
179 .add_message_handler(Server::buffer_saved)
180 .add_request_handler(Server::save_buffer)
181 .add_request_handler(Server::get_channels)
182 .add_request_handler(Server::get_users)
183 .add_request_handler(Server::fuzzy_search_users)
184 .add_request_handler(Server::request_contact)
185 .add_request_handler(Server::remove_contact)
186 .add_request_handler(Server::respond_to_contact_request)
187 .add_request_handler(Server::join_channel)
188 .add_message_handler(Server::leave_channel)
189 .add_request_handler(Server::send_channel_message)
190 .add_request_handler(Server::follow)
191 .add_message_handler(Server::unfollow)
192 .add_message_handler(Server::update_followers)
193 .add_request_handler(Server::get_channel_messages);
194
195 Arc::new(server)
196 }
197
198 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
199 where
200 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
201 Fut: 'static + Send + Future<Output = Result<()>>,
202 M: EnvelopedMessage,
203 {
204 let prev_handler = self.handlers.insert(
205 TypeId::of::<M>(),
206 Box::new(move |server, envelope| {
207 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
208 let span = info_span!(
209 "handle message",
210 payload_type = envelope.payload_type_name(),
211 payload = format!("{:?}", envelope.payload).as_str(),
212 );
213 let future = (handler)(server, *envelope);
214 async move {
215 if let Err(error) = future.await {
216 tracing::error!(%error, "error handling message");
217 }
218 }
219 .instrument(span)
220 .boxed()
221 }),
222 );
223 if prev_handler.is_some() {
224 panic!("registered a handler for the same message twice");
225 }
226 self
227 }
228
229 /// Handle a request while holding a lock to the store. This is useful when we're registering
230 /// a connection but we want to respond on the connection before anybody else can send on it.
231 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
232 where
233 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
234 Fut: Send + Future<Output = Result<()>>,
235 M: RequestMessage,
236 {
237 let handler = Arc::new(handler);
238 self.add_message_handler(move |server, envelope| {
239 let receipt = envelope.receipt();
240 let handler = handler.clone();
241 async move {
242 let responded = Arc::new(AtomicBool::default());
243 let response = Response {
244 server: server.clone(),
245 responded: responded.clone(),
246 receipt: envelope.receipt(),
247 };
248 match (handler)(server.clone(), envelope, response).await {
249 Ok(()) => {
250 if responded.load(std::sync::atomic::Ordering::SeqCst) {
251 Ok(())
252 } else {
253 Err(anyhow!("handler did not send a response"))?
254 }
255 }
256 Err(error) => {
257 server.peer.respond_with_error(
258 receipt,
259 proto::Error {
260 message: error.to_string(),
261 },
262 )?;
263 Err(error)
264 }
265 }
266 }
267 })
268 }
269
270 pub fn handle_connection<E: Executor>(
271 self: &Arc<Self>,
272 connection: Connection,
273 address: String,
274 user: User,
275 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
276 executor: E,
277 ) -> impl Future<Output = Result<()>> {
278 let mut this = self.clone();
279 let user_id = user.id;
280 let login = user.github_login;
281 let span = info_span!("handle connection", %user_id, %login, %address);
282 async move {
283 let (connection_id, handle_io, mut incoming_rx) = this
284 .peer
285 .add_connection(connection, {
286 let executor = executor.clone();
287 move |duration| {
288 let timer = executor.sleep(duration);
289 async move {
290 timer.await;
291 }
292 }
293 })
294 .await;
295
296 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
297
298 if let Some(send_connection_id) = send_connection_id.as_mut() {
299 let _ = send_connection_id.send(connection_id).await;
300 }
301
302 if !user.connected_once {
303 this.peer.send(connection_id, proto::ShowContacts {})?;
304 this.app_state.db.set_user_connected_once(user_id, true).await?;
305 }
306
307 let (contacts, invite_code) = future::try_join(
308 this.app_state.db.get_contacts(user_id),
309 this.app_state.db.get_invite_code_for_user(user_id)
310 ).await?;
311
312 {
313 let mut store = this.store_mut().await;
314 store.add_connection(connection_id, user_id);
315 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
316
317 if let Some((code, count)) = invite_code {
318 this.peer.send(connection_id, proto::UpdateInviteInfo {
319 url: format!("{}{}", this.app_state.invite_link_prefix, code),
320 count,
321 })?;
322 }
323 }
324 this.update_user_contacts(user_id).await?;
325
326 let handle_io = handle_io.fuse();
327 futures::pin_mut!(handle_io);
328 loop {
329 let next_message = incoming_rx.next().fuse();
330 futures::pin_mut!(next_message);
331 futures::select_biased! {
332 result = handle_io => {
333 if let Err(error) = result {
334 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
335 }
336 break;
337 }
338 message = next_message => {
339 if let Some(message) = message {
340 let type_name = message.payload_type_name();
341 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
342 async {
343 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
344 let notifications = this.notifications.clone();
345 let is_background = message.is_background();
346 let handle_message = (handler)(this.clone(), message);
347 let handle_message = async move {
348 handle_message.await;
349 if let Some(mut notifications) = notifications {
350 let _ = notifications.send(()).await;
351 }
352 };
353 if is_background {
354 executor.spawn_detached(handle_message);
355 } else {
356 handle_message.await;
357 }
358 } else {
359 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
360 }
361 }.instrument(span).await;
362 } else {
363 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
364 break;
365 }
366 }
367 }
368 }
369
370 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
371 if let Err(error) = this.sign_out(connection_id).await {
372 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
373 }
374
375 Ok(())
376 }.instrument(span)
377 }
378
379 #[instrument(skip(self), err)]
380 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
381 self.peer.disconnect(connection_id);
382
383 let removed_user_id = {
384 let mut store = self.store_mut().await;
385 let removed_connection = store.remove_connection(connection_id)?;
386
387 for (project_id, project) in removed_connection.hosted_projects {
388 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
389 self.peer
390 .send(conn_id, proto::UnregisterProject { project_id })
391 });
392
393 for (_, receipts) in project.join_requests {
394 for receipt in receipts {
395 self.peer.respond(
396 receipt,
397 proto::JoinProjectResponse {
398 variant: Some(proto::join_project_response::Variant::Decline(
399 proto::join_project_response::Decline {
400 reason: proto::join_project_response::decline::Reason::WentOffline as i32
401 },
402 )),
403 },
404 )?;
405 }
406 }
407 }
408
409 for project_id in removed_connection.guest_project_ids {
410 if let Some(project) = store.project(project_id).trace_err() {
411 broadcast(connection_id, project.connection_ids(), |conn_id| {
412 self.peer.send(
413 conn_id,
414 proto::RemoveProjectCollaborator {
415 project_id,
416 peer_id: connection_id.0,
417 },
418 )
419 });
420 if project.guests.is_empty() {
421 self.peer
422 .send(
423 project.host_connection_id,
424 proto::ProjectUnshared { project_id },
425 )
426 .trace_err();
427 }
428 }
429 }
430
431 removed_connection.user_id
432 };
433
434 self.update_user_contacts(removed_user_id).await?;
435
436 Ok(())
437 }
438
439 pub async fn invite_code_redeemed(self: &Arc<Self>, code: &str, invitee_id: UserId) -> Result<()> {
440 let user = self.app_state.db.get_user_for_invite_code(code).await?;
441 let store = self.store().await;
442 let invitee_contact = store.contact_for_user(invitee_id, true);
443 for connection_id in store.connection_ids_for_user(user.id) {
444 self.peer.send(connection_id, proto::UpdateContacts {
445 contacts: vec![invitee_contact.clone()],
446 ..Default::default()
447 })?;
448 self.peer.send(connection_id, proto::UpdateInviteInfo {
449 url: format!("{}{}", self.app_state.invite_link_prefix, code),
450 count: user.invite_count as u32,
451 })?;
452 }
453 Ok(())
454 }
455
456 async fn ping(
457 self: Arc<Server>,
458 _: TypedEnvelope<proto::Ping>,
459 response: Response<proto::Ping>,
460 ) -> Result<()> {
461 response.send(proto::Ack {})?;
462 Ok(())
463 }
464
465 async fn register_project(
466 self: Arc<Server>,
467 request: TypedEnvelope<proto::RegisterProject>,
468 response: Response<proto::RegisterProject>,
469 ) -> Result<()> {
470 let user_id;
471 let project_id;
472 {
473 let mut state = self.store_mut().await;
474 user_id = state.user_id_for_connection(request.sender_id)?;
475 project_id = state.register_project(request.sender_id, user_id);
476 };
477 self.update_user_contacts(user_id).await?;
478 response.send(proto::RegisterProjectResponse { project_id })?;
479 Ok(())
480 }
481
482 async fn unregister_project(
483 self: Arc<Server>,
484 request: TypedEnvelope<proto::UnregisterProject>,
485 ) -> Result<()> {
486 let (user_id, project) = {
487 let mut state = self.store_mut().await;
488 let project =
489 state.unregister_project(request.payload.project_id, request.sender_id)?;
490 (state.user_id_for_connection(request.sender_id)?, project)
491 };
492
493 broadcast(
494 request.sender_id,
495 project.guests.keys().copied(),
496 |conn_id| {
497 self.peer.send(
498 conn_id,
499 proto::UnregisterProject {
500 project_id: request.payload.project_id,
501 },
502 )
503 },
504 );
505 for (_, receipts) in project.join_requests {
506 for receipt in receipts {
507 self.peer.respond(
508 receipt,
509 proto::JoinProjectResponse {
510 variant: Some(proto::join_project_response::Variant::Decline(
511 proto::join_project_response::Decline {
512 reason: proto::join_project_response::decline::Reason::Closed
513 as i32,
514 },
515 )),
516 },
517 )?;
518 }
519 }
520
521 self.update_user_contacts(user_id).await?;
522 Ok(())
523 }
524
525 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
526 let contacts = self.app_state.db.get_contacts(user_id).await?;
527 let store = self.store().await;
528 let updated_contact = store.contact_for_user(user_id, false);
529 for contact in contacts {
530 if let db::Contact::Accepted {
531 user_id: contact_user_id,
532 ..
533 } = contact
534 {
535 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
536 self.peer
537 .send(
538 contact_conn_id,
539 proto::UpdateContacts {
540 contacts: vec![updated_contact.clone()],
541 remove_contacts: Default::default(),
542 incoming_requests: Default::default(),
543 remove_incoming_requests: Default::default(),
544 outgoing_requests: Default::default(),
545 remove_outgoing_requests: Default::default(),
546 },
547 )
548 .trace_err();
549 }
550 }
551 }
552 Ok(())
553 }
554
555 async fn join_project(
556 self: Arc<Server>,
557 request: TypedEnvelope<proto::JoinProject>,
558 response: Response<proto::JoinProject>,
559 ) -> Result<()> {
560 let project_id = request.payload.project_id;
561 let host_user_id;
562 let guest_user_id;
563 let host_connection_id;
564 {
565 let state = self.store().await;
566 let project = state.project(project_id)?;
567 host_user_id = project.host_user_id;
568 host_connection_id = project.host_connection_id;
569 guest_user_id = state.user_id_for_connection(request.sender_id)?;
570 };
571
572 let has_contact = self
573 .app_state
574 .db
575 .has_contact(guest_user_id, host_user_id)
576 .await?;
577 if !has_contact {
578 return Err(anyhow!("no such project"))?;
579 }
580
581 self.store_mut().await.request_join_project(
582 guest_user_id,
583 project_id,
584 response.into_receipt(),
585 )?;
586 self.peer.send(
587 host_connection_id,
588 proto::RequestJoinProject {
589 project_id,
590 requester_id: guest_user_id.to_proto(),
591 },
592 )?;
593 Ok(())
594 }
595
596 async fn respond_to_join_project_request(
597 self: Arc<Server>,
598 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
599 ) -> Result<()> {
600 let host_user_id;
601
602 {
603 let mut state = self.store_mut().await;
604 let project_id = request.payload.project_id;
605 let project = state.project(project_id)?;
606 if project.host_connection_id != request.sender_id {
607 Err(anyhow!("no such connection"))?;
608 }
609
610 host_user_id = project.host_user_id;
611 let guest_user_id = UserId::from_proto(request.payload.requester_id);
612
613 if !request.payload.allow {
614 let receipts = state
615 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
616 .ok_or_else(|| anyhow!("no such request"))?;
617 for receipt in receipts {
618 self.peer.respond(
619 receipt,
620 proto::JoinProjectResponse {
621 variant: Some(proto::join_project_response::Variant::Decline(
622 proto::join_project_response::Decline {
623 reason: proto::join_project_response::decline::Reason::Declined
624 as i32,
625 },
626 )),
627 },
628 )?;
629 }
630 return Ok(());
631 }
632
633 let (receipts_with_replica_ids, project) = state
634 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
635 .ok_or_else(|| anyhow!("no such request"))?;
636
637 let peer_count = project.guests.len();
638 let mut collaborators = Vec::with_capacity(peer_count);
639 collaborators.push(proto::Collaborator {
640 peer_id: project.host_connection_id.0,
641 replica_id: 0,
642 user_id: project.host_user_id.to_proto(),
643 });
644 let worktrees = project
645 .worktrees
646 .iter()
647 .filter_map(|(id, shared_worktree)| {
648 let worktree = project.worktrees.get(&id)?;
649 Some(proto::Worktree {
650 id: *id,
651 root_name: worktree.root_name.clone(),
652 entries: shared_worktree.entries.values().cloned().collect(),
653 diagnostic_summaries: shared_worktree
654 .diagnostic_summaries
655 .values()
656 .cloned()
657 .collect(),
658 visible: worktree.visible,
659 scan_id: shared_worktree.scan_id,
660 })
661 })
662 .collect::<Vec<_>>();
663
664 // Add all guests other than the requesting user's own connections as collaborators
665 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
666 if receipts_with_replica_ids
667 .iter()
668 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
669 {
670 collaborators.push(proto::Collaborator {
671 peer_id: peer_conn_id.0,
672 replica_id: *peer_replica_id as u32,
673 user_id: peer_user_id.to_proto(),
674 });
675 }
676 }
677
678 for conn_id in project.connection_ids() {
679 for (receipt, replica_id) in &receipts_with_replica_ids {
680 if conn_id != receipt.sender_id {
681 self.peer.send(
682 conn_id,
683 proto::AddProjectCollaborator {
684 project_id,
685 collaborator: Some(proto::Collaborator {
686 peer_id: receipt.sender_id.0,
687 replica_id: *replica_id as u32,
688 user_id: guest_user_id.to_proto(),
689 }),
690 },
691 )?;
692 }
693 }
694 }
695
696 for (receipt, replica_id) in receipts_with_replica_ids {
697 self.peer.respond(
698 receipt,
699 proto::JoinProjectResponse {
700 variant: Some(proto::join_project_response::Variant::Accept(
701 proto::join_project_response::Accept {
702 worktrees: worktrees.clone(),
703 replica_id: replica_id as u32,
704 collaborators: collaborators.clone(),
705 language_servers: project.language_servers.clone(),
706 },
707 )),
708 },
709 )?;
710 }
711 }
712
713 self.update_user_contacts(host_user_id).await?;
714 Ok(())
715 }
716
717 async fn leave_project(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::LeaveProject>,
720 ) -> Result<()> {
721 let sender_id = request.sender_id;
722 let project_id = request.payload.project_id;
723 let project;
724 {
725 let mut store = self.store_mut().await;
726 project = store.leave_project(sender_id, project_id)?;
727
728 if project.remove_collaborator {
729 broadcast(sender_id, project.connection_ids, |conn_id| {
730 self.peer.send(
731 conn_id,
732 proto::RemoveProjectCollaborator {
733 project_id,
734 peer_id: sender_id.0,
735 },
736 )
737 });
738 }
739
740 if let Some(requester_id) = project.cancel_request {
741 self.peer.send(
742 project.host_connection_id,
743 proto::JoinProjectRequestCancelled {
744 project_id,
745 requester_id: requester_id.to_proto(),
746 },
747 )?;
748 }
749
750 if project.unshare {
751 self.peer.send(
752 project.host_connection_id,
753 proto::ProjectUnshared { project_id },
754 )?;
755 }
756 }
757 self.update_user_contacts(project.host_user_id).await?;
758 Ok(())
759 }
760
761 async fn register_worktree(
762 self: Arc<Server>,
763 request: TypedEnvelope<proto::RegisterWorktree>,
764 response: Response<proto::RegisterWorktree>,
765 ) -> Result<()> {
766 let host_user_id;
767 {
768 let mut state = self.store_mut().await;
769 host_user_id = state.user_id_for_connection(request.sender_id)?;
770
771 let guest_connection_ids = state
772 .read_project(request.payload.project_id, request.sender_id)?
773 .guest_connection_ids();
774 state.register_worktree(
775 request.payload.project_id,
776 request.payload.worktree_id,
777 request.sender_id,
778 Worktree {
779 root_name: request.payload.root_name.clone(),
780 visible: request.payload.visible,
781 ..Default::default()
782 },
783 )?;
784
785 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
786 self.peer
787 .forward_send(request.sender_id, connection_id, request.payload.clone())
788 });
789 }
790 self.update_user_contacts(host_user_id).await?;
791 response.send(proto::Ack {})?;
792 Ok(())
793 }
794
795 async fn unregister_worktree(
796 self: Arc<Server>,
797 request: TypedEnvelope<proto::UnregisterWorktree>,
798 ) -> Result<()> {
799 let host_user_id;
800 let project_id = request.payload.project_id;
801 let worktree_id = request.payload.worktree_id;
802 {
803 let mut state = self.store_mut().await;
804 let (_, guest_connection_ids) =
805 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
806 host_user_id = state.user_id_for_connection(request.sender_id)?;
807 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
808 self.peer.send(
809 conn_id,
810 proto::UnregisterWorktree {
811 project_id,
812 worktree_id,
813 },
814 )
815 });
816 }
817 self.update_user_contacts(host_user_id).await?;
818 Ok(())
819 }
820
821 async fn update_worktree(
822 self: Arc<Server>,
823 request: TypedEnvelope<proto::UpdateWorktree>,
824 response: Response<proto::UpdateWorktree>,
825 ) -> Result<()> {
826 let connection_ids = self.store_mut().await.update_worktree(
827 request.sender_id,
828 request.payload.project_id,
829 request.payload.worktree_id,
830 &request.payload.removed_entries,
831 &request.payload.updated_entries,
832 request.payload.scan_id,
833 )?;
834
835 broadcast(request.sender_id, connection_ids, |connection_id| {
836 self.peer
837 .forward_send(request.sender_id, connection_id, request.payload.clone())
838 });
839 response.send(proto::Ack {})?;
840 Ok(())
841 }
842
843 async fn update_diagnostic_summary(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
846 ) -> Result<()> {
847 let summary = request
848 .payload
849 .summary
850 .clone()
851 .ok_or_else(|| anyhow!("invalid summary"))?;
852 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
853 request.payload.project_id,
854 request.payload.worktree_id,
855 request.sender_id,
856 summary,
857 )?;
858
859 broadcast(request.sender_id, receiver_ids, |connection_id| {
860 self.peer
861 .forward_send(request.sender_id, connection_id, request.payload.clone())
862 });
863 Ok(())
864 }
865
866 async fn start_language_server(
867 self: Arc<Server>,
868 request: TypedEnvelope<proto::StartLanguageServer>,
869 ) -> Result<()> {
870 let receiver_ids = self.store_mut().await.start_language_server(
871 request.payload.project_id,
872 request.sender_id,
873 request
874 .payload
875 .server
876 .clone()
877 .ok_or_else(|| anyhow!("invalid language server"))?,
878 )?;
879 broadcast(request.sender_id, receiver_ids, |connection_id| {
880 self.peer
881 .forward_send(request.sender_id, connection_id, request.payload.clone())
882 });
883 Ok(())
884 }
885
886 async fn update_language_server(
887 self: Arc<Server>,
888 request: TypedEnvelope<proto::UpdateLanguageServer>,
889 ) -> Result<()> {
890 let receiver_ids = self
891 .store()
892 .await
893 .project_connection_ids(request.payload.project_id, request.sender_id)?;
894 broadcast(request.sender_id, receiver_ids, |connection_id| {
895 self.peer
896 .forward_send(request.sender_id, connection_id, request.payload.clone())
897 });
898 Ok(())
899 }
900
901 async fn forward_project_request<T>(
902 self: Arc<Server>,
903 request: TypedEnvelope<T>,
904 response: Response<T>,
905 ) -> Result<()>
906 where
907 T: EntityMessage + RequestMessage,
908 {
909 let host_connection_id = self
910 .store()
911 .await
912 .read_project(request.payload.remote_entity_id(), request.sender_id)?
913 .host_connection_id;
914
915 response.send(
916 self.peer
917 .forward_request(request.sender_id, host_connection_id, request.payload)
918 .await?,
919 )?;
920 Ok(())
921 }
922
923 async fn save_buffer(
924 self: Arc<Server>,
925 request: TypedEnvelope<proto::SaveBuffer>,
926 response: Response<proto::SaveBuffer>,
927 ) -> Result<()> {
928 let host = self
929 .store()
930 .await
931 .read_project(request.payload.project_id, request.sender_id)?
932 .host_connection_id;
933 let response_payload = self
934 .peer
935 .forward_request(request.sender_id, host, request.payload.clone())
936 .await?;
937
938 let mut guests = self
939 .store()
940 .await
941 .read_project(request.payload.project_id, request.sender_id)?
942 .connection_ids();
943 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
944 broadcast(host, guests, |conn_id| {
945 self.peer
946 .forward_send(host, conn_id, response_payload.clone())
947 });
948 response.send(response_payload)?;
949 Ok(())
950 }
951
952 async fn update_buffer(
953 self: Arc<Server>,
954 request: TypedEnvelope<proto::UpdateBuffer>,
955 response: Response<proto::UpdateBuffer>,
956 ) -> Result<()> {
957 let receiver_ids = self
958 .store()
959 .await
960 .project_connection_ids(request.payload.project_id, request.sender_id)?;
961 broadcast(request.sender_id, receiver_ids, |connection_id| {
962 self.peer
963 .forward_send(request.sender_id, connection_id, request.payload.clone())
964 });
965 response.send(proto::Ack {})?;
966 Ok(())
967 }
968
969 async fn update_buffer_file(
970 self: Arc<Server>,
971 request: TypedEnvelope<proto::UpdateBufferFile>,
972 ) -> Result<()> {
973 let receiver_ids = self
974 .store()
975 .await
976 .project_connection_ids(request.payload.project_id, request.sender_id)?;
977 broadcast(request.sender_id, receiver_ids, |connection_id| {
978 self.peer
979 .forward_send(request.sender_id, connection_id, request.payload.clone())
980 });
981 Ok(())
982 }
983
984 async fn buffer_reloaded(
985 self: Arc<Server>,
986 request: TypedEnvelope<proto::BufferReloaded>,
987 ) -> Result<()> {
988 let receiver_ids = self
989 .store()
990 .await
991 .project_connection_ids(request.payload.project_id, request.sender_id)?;
992 broadcast(request.sender_id, receiver_ids, |connection_id| {
993 self.peer
994 .forward_send(request.sender_id, connection_id, request.payload.clone())
995 });
996 Ok(())
997 }
998
999 async fn buffer_saved(
1000 self: Arc<Server>,
1001 request: TypedEnvelope<proto::BufferSaved>,
1002 ) -> Result<()> {
1003 let receiver_ids = self
1004 .store()
1005 .await
1006 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1007 broadcast(request.sender_id, receiver_ids, |connection_id| {
1008 self.peer
1009 .forward_send(request.sender_id, connection_id, request.payload.clone())
1010 });
1011 Ok(())
1012 }
1013
1014 async fn follow(
1015 self: Arc<Self>,
1016 request: TypedEnvelope<proto::Follow>,
1017 response: Response<proto::Follow>,
1018 ) -> Result<()> {
1019 let leader_id = ConnectionId(request.payload.leader_id);
1020 let follower_id = request.sender_id;
1021 if !self
1022 .store()
1023 .await
1024 .project_connection_ids(request.payload.project_id, follower_id)?
1025 .contains(&leader_id)
1026 {
1027 Err(anyhow!("no such peer"))?;
1028 }
1029 let mut response_payload = self
1030 .peer
1031 .forward_request(request.sender_id, leader_id, request.payload)
1032 .await?;
1033 response_payload
1034 .views
1035 .retain(|view| view.leader_id != Some(follower_id.0));
1036 response.send(response_payload)?;
1037 Ok(())
1038 }
1039
1040 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
1041 let leader_id = ConnectionId(request.payload.leader_id);
1042 if !self
1043 .store()
1044 .await
1045 .project_connection_ids(request.payload.project_id, request.sender_id)?
1046 .contains(&leader_id)
1047 {
1048 Err(anyhow!("no such peer"))?;
1049 }
1050 self.peer
1051 .forward_send(request.sender_id, leader_id, request.payload)?;
1052 Ok(())
1053 }
1054
1055 async fn update_followers(
1056 self: Arc<Self>,
1057 request: TypedEnvelope<proto::UpdateFollowers>,
1058 ) -> Result<()> {
1059 let connection_ids = self
1060 .store()
1061 .await
1062 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1063 let leader_id = request
1064 .payload
1065 .variant
1066 .as_ref()
1067 .and_then(|variant| match variant {
1068 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1069 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1070 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1071 });
1072 for follower_id in &request.payload.follower_ids {
1073 let follower_id = ConnectionId(*follower_id);
1074 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1075 self.peer
1076 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 async fn get_channels(
1083 self: Arc<Server>,
1084 request: TypedEnvelope<proto::GetChannels>,
1085 response: Response<proto::GetChannels>,
1086 ) -> Result<()> {
1087 let user_id = self
1088 .store()
1089 .await
1090 .user_id_for_connection(request.sender_id)?;
1091 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1092 response.send(proto::GetChannelsResponse {
1093 channels: channels
1094 .into_iter()
1095 .map(|chan| proto::Channel {
1096 id: chan.id.to_proto(),
1097 name: chan.name,
1098 })
1099 .collect(),
1100 })?;
1101 Ok(())
1102 }
1103
1104 async fn get_users(
1105 self: Arc<Server>,
1106 request: TypedEnvelope<proto::GetUsers>,
1107 response: Response<proto::GetUsers>,
1108 ) -> Result<()> {
1109 let user_ids = request
1110 .payload
1111 .user_ids
1112 .into_iter()
1113 .map(UserId::from_proto)
1114 .collect();
1115 let users = self
1116 .app_state
1117 .db
1118 .get_users_by_ids(user_ids)
1119 .await?
1120 .into_iter()
1121 .map(|user| proto::User {
1122 id: user.id.to_proto(),
1123 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1124 github_login: user.github_login,
1125 })
1126 .collect();
1127 response.send(proto::UsersResponse { users })?;
1128 Ok(())
1129 }
1130
1131 async fn fuzzy_search_users(
1132 self: Arc<Server>,
1133 request: TypedEnvelope<proto::FuzzySearchUsers>,
1134 response: Response<proto::FuzzySearchUsers>,
1135 ) -> Result<()> {
1136 let user_id = self
1137 .store()
1138 .await
1139 .user_id_for_connection(request.sender_id)?;
1140 let query = request.payload.query;
1141 let db = &self.app_state.db;
1142 let users = match query.len() {
1143 0 => vec![],
1144 1 | 2 => db
1145 .get_user_by_github_login(&query)
1146 .await?
1147 .into_iter()
1148 .collect(),
1149 _ => db.fuzzy_search_users(&query, 10).await?,
1150 };
1151 let users = users
1152 .into_iter()
1153 .filter(|user| user.id != user_id)
1154 .map(|user| proto::User {
1155 id: user.id.to_proto(),
1156 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1157 github_login: user.github_login,
1158 })
1159 .collect();
1160 response.send(proto::UsersResponse { users })?;
1161 Ok(())
1162 }
1163
1164 async fn request_contact(
1165 self: Arc<Server>,
1166 request: TypedEnvelope<proto::RequestContact>,
1167 response: Response<proto::RequestContact>,
1168 ) -> Result<()> {
1169 let requester_id = self
1170 .store()
1171 .await
1172 .user_id_for_connection(request.sender_id)?;
1173 let responder_id = UserId::from_proto(request.payload.responder_id);
1174 if requester_id == responder_id {
1175 return Err(anyhow!("cannot add yourself as a contact"))?;
1176 }
1177
1178 self.app_state
1179 .db
1180 .send_contact_request(requester_id, responder_id)
1181 .await?;
1182
1183 // Update outgoing contact requests of requester
1184 let mut update = proto::UpdateContacts::default();
1185 update.outgoing_requests.push(responder_id.to_proto());
1186 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1187 self.peer.send(connection_id, update.clone())?;
1188 }
1189
1190 // Update incoming contact requests of responder
1191 let mut update = proto::UpdateContacts::default();
1192 update
1193 .incoming_requests
1194 .push(proto::IncomingContactRequest {
1195 requester_id: requester_id.to_proto(),
1196 should_notify: true,
1197 });
1198 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1199 self.peer.send(connection_id, update.clone())?;
1200 }
1201
1202 response.send(proto::Ack {})?;
1203 Ok(())
1204 }
1205
1206 async fn respond_to_contact_request(
1207 self: Arc<Server>,
1208 request: TypedEnvelope<proto::RespondToContactRequest>,
1209 response: Response<proto::RespondToContactRequest>,
1210 ) -> Result<()> {
1211 let responder_id = self
1212 .store()
1213 .await
1214 .user_id_for_connection(request.sender_id)?;
1215 let requester_id = UserId::from_proto(request.payload.requester_id);
1216 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1217 self.app_state
1218 .db
1219 .dismiss_contact_notification(responder_id, requester_id)
1220 .await?;
1221 } else {
1222 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1223 self.app_state
1224 .db
1225 .respond_to_contact_request(responder_id, requester_id, accept)
1226 .await?;
1227
1228 let store = self.store().await;
1229 // Update responder with new contact
1230 let mut update = proto::UpdateContacts::default();
1231 if accept {
1232 update
1233 .contacts
1234 .push(store.contact_for_user(requester_id, false));
1235 }
1236 update
1237 .remove_incoming_requests
1238 .push(requester_id.to_proto());
1239 for connection_id in store.connection_ids_for_user(responder_id) {
1240 self.peer.send(connection_id, update.clone())?;
1241 }
1242
1243 // Update requester with new contact
1244 let mut update = proto::UpdateContacts::default();
1245 if accept {
1246 update
1247 .contacts
1248 .push(store.contact_for_user(responder_id, true));
1249 }
1250 update
1251 .remove_outgoing_requests
1252 .push(responder_id.to_proto());
1253 for connection_id in store.connection_ids_for_user(requester_id) {
1254 self.peer.send(connection_id, update.clone())?;
1255 }
1256 }
1257
1258 response.send(proto::Ack {})?;
1259 Ok(())
1260 }
1261
1262 async fn remove_contact(
1263 self: Arc<Server>,
1264 request: TypedEnvelope<proto::RemoveContact>,
1265 response: Response<proto::RemoveContact>,
1266 ) -> Result<()> {
1267 let requester_id = self
1268 .store()
1269 .await
1270 .user_id_for_connection(request.sender_id)?;
1271 let responder_id = UserId::from_proto(request.payload.user_id);
1272 self.app_state
1273 .db
1274 .remove_contact(requester_id, responder_id)
1275 .await?;
1276
1277 // Update outgoing contact requests of requester
1278 let mut update = proto::UpdateContacts::default();
1279 update
1280 .remove_outgoing_requests
1281 .push(responder_id.to_proto());
1282 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1283 self.peer.send(connection_id, update.clone())?;
1284 }
1285
1286 // Update incoming contact requests of responder
1287 let mut update = proto::UpdateContacts::default();
1288 update
1289 .remove_incoming_requests
1290 .push(requester_id.to_proto());
1291 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1292 self.peer.send(connection_id, update.clone())?;
1293 }
1294
1295 response.send(proto::Ack {})?;
1296 Ok(())
1297 }
1298
1299 async fn join_channel(
1300 self: Arc<Self>,
1301 request: TypedEnvelope<proto::JoinChannel>,
1302 response: Response<proto::JoinChannel>,
1303 ) -> Result<()> {
1304 let user_id = self
1305 .store()
1306 .await
1307 .user_id_for_connection(request.sender_id)?;
1308 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1309 if !self
1310 .app_state
1311 .db
1312 .can_user_access_channel(user_id, channel_id)
1313 .await?
1314 {
1315 Err(anyhow!("access denied"))?;
1316 }
1317
1318 self.store_mut()
1319 .await
1320 .join_channel(request.sender_id, channel_id);
1321 let messages = self
1322 .app_state
1323 .db
1324 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1325 .await?
1326 .into_iter()
1327 .map(|msg| proto::ChannelMessage {
1328 id: msg.id.to_proto(),
1329 body: msg.body,
1330 timestamp: msg.sent_at.unix_timestamp() as u64,
1331 sender_id: msg.sender_id.to_proto(),
1332 nonce: Some(msg.nonce.as_u128().into()),
1333 })
1334 .collect::<Vec<_>>();
1335 response.send(proto::JoinChannelResponse {
1336 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1337 messages,
1338 })?;
1339 Ok(())
1340 }
1341
1342 async fn leave_channel(
1343 self: Arc<Self>,
1344 request: TypedEnvelope<proto::LeaveChannel>,
1345 ) -> Result<()> {
1346 let user_id = self
1347 .store()
1348 .await
1349 .user_id_for_connection(request.sender_id)?;
1350 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1351 if !self
1352 .app_state
1353 .db
1354 .can_user_access_channel(user_id, channel_id)
1355 .await?
1356 {
1357 Err(anyhow!("access denied"))?;
1358 }
1359
1360 self.store_mut()
1361 .await
1362 .leave_channel(request.sender_id, channel_id);
1363
1364 Ok(())
1365 }
1366
1367 async fn send_channel_message(
1368 self: Arc<Self>,
1369 request: TypedEnvelope<proto::SendChannelMessage>,
1370 response: Response<proto::SendChannelMessage>,
1371 ) -> Result<()> {
1372 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1373 let user_id;
1374 let connection_ids;
1375 {
1376 let state = self.store().await;
1377 user_id = state.user_id_for_connection(request.sender_id)?;
1378 connection_ids = state.channel_connection_ids(channel_id)?;
1379 }
1380
1381 // Validate the message body.
1382 let body = request.payload.body.trim().to_string();
1383 if body.len() > MAX_MESSAGE_LEN {
1384 return Err(anyhow!("message is too long"))?;
1385 }
1386 if body.is_empty() {
1387 return Err(anyhow!("message can't be blank"))?;
1388 }
1389
1390 let timestamp = OffsetDateTime::now_utc();
1391 let nonce = request
1392 .payload
1393 .nonce
1394 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1395
1396 let message_id = self
1397 .app_state
1398 .db
1399 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1400 .await?
1401 .to_proto();
1402 let message = proto::ChannelMessage {
1403 sender_id: user_id.to_proto(),
1404 id: message_id,
1405 body,
1406 timestamp: timestamp.unix_timestamp() as u64,
1407 nonce: Some(nonce),
1408 };
1409 broadcast(request.sender_id, connection_ids, |conn_id| {
1410 self.peer.send(
1411 conn_id,
1412 proto::ChannelMessageSent {
1413 channel_id: channel_id.to_proto(),
1414 message: Some(message.clone()),
1415 },
1416 )
1417 });
1418 response.send(proto::SendChannelMessageResponse {
1419 message: Some(message),
1420 })?;
1421 Ok(())
1422 }
1423
1424 async fn get_channel_messages(
1425 self: Arc<Self>,
1426 request: TypedEnvelope<proto::GetChannelMessages>,
1427 response: Response<proto::GetChannelMessages>,
1428 ) -> Result<()> {
1429 let user_id = self
1430 .store()
1431 .await
1432 .user_id_for_connection(request.sender_id)?;
1433 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1434 if !self
1435 .app_state
1436 .db
1437 .can_user_access_channel(user_id, channel_id)
1438 .await?
1439 {
1440 Err(anyhow!("access denied"))?;
1441 }
1442
1443 let messages = self
1444 .app_state
1445 .db
1446 .get_channel_messages(
1447 channel_id,
1448 MESSAGE_COUNT_PER_PAGE,
1449 Some(MessageId::from_proto(request.payload.before_message_id)),
1450 )
1451 .await?
1452 .into_iter()
1453 .map(|msg| proto::ChannelMessage {
1454 id: msg.id.to_proto(),
1455 body: msg.body,
1456 timestamp: msg.sent_at.unix_timestamp() as u64,
1457 sender_id: msg.sender_id.to_proto(),
1458 nonce: Some(msg.nonce.as_u128().into()),
1459 })
1460 .collect::<Vec<_>>();
1461 response.send(proto::GetChannelMessagesResponse {
1462 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1463 messages,
1464 })?;
1465 Ok(())
1466 }
1467
1468 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1469 #[cfg(test)]
1470 tokio::task::yield_now().await;
1471 let guard = self.store.read().await;
1472 #[cfg(test)]
1473 tokio::task::yield_now().await;
1474 StoreReadGuard {
1475 guard,
1476 _not_send: PhantomData,
1477 }
1478 }
1479
1480 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1481 #[cfg(test)]
1482 tokio::task::yield_now().await;
1483 let guard = self.store.write().await;
1484 #[cfg(test)]
1485 tokio::task::yield_now().await;
1486 StoreWriteGuard {
1487 guard,
1488 _not_send: PhantomData,
1489 }
1490 }
1491
1492 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
1493 ServerSnapshot {
1494 store: self.store.read().await,
1495 peer: &self.peer
1496 }
1497 }
1498}
1499
1500impl<'a> Deref for StoreReadGuard<'a> {
1501 type Target = Store;
1502
1503 fn deref(&self) -> &Self::Target {
1504 &*self.guard
1505 }
1506}
1507
1508impl<'a> Deref for StoreWriteGuard<'a> {
1509 type Target = Store;
1510
1511 fn deref(&self) -> &Self::Target {
1512 &*self.guard
1513 }
1514}
1515
1516impl<'a> DerefMut for StoreWriteGuard<'a> {
1517 fn deref_mut(&mut self) -> &mut Self::Target {
1518 &mut *self.guard
1519 }
1520}
1521
1522impl<'a> Drop for StoreWriteGuard<'a> {
1523 fn drop(&mut self) {
1524 #[cfg(test)]
1525 self.check_invariants();
1526
1527 let metrics = self.metrics();
1528 tracing::info!(
1529 connections = metrics.connections,
1530 registered_projects = metrics.registered_projects,
1531 shared_projects = metrics.shared_projects,
1532 "metrics"
1533 );
1534 }
1535}
1536
1537impl Executor for RealExecutor {
1538 type Sleep = Sleep;
1539
1540 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1541 tokio::task::spawn(future);
1542 }
1543
1544 fn sleep(&self, duration: Duration) -> Self::Sleep {
1545 tokio::time::sleep(duration)
1546 }
1547}
1548
1549fn broadcast<F>(
1550 sender_id: ConnectionId,
1551 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1552 mut f: F,
1553) where
1554 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1555{
1556 for receiver_id in receiver_ids {
1557 if receiver_id != sender_id {
1558 f(receiver_id).trace_err();
1559 }
1560 }
1561}
1562
1563lazy_static! {
1564 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1565}
1566
1567pub struct ProtocolVersion(u32);
1568
1569impl Header for ProtocolVersion {
1570 fn name() -> &'static HeaderName {
1571 &ZED_PROTOCOL_VERSION
1572 }
1573
1574 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1575 where
1576 Self: Sized,
1577 I: Iterator<Item = &'i axum::http::HeaderValue>,
1578 {
1579 let version = values
1580 .next()
1581 .ok_or_else(|| axum::headers::Error::invalid())?
1582 .to_str()
1583 .map_err(|_| axum::headers::Error::invalid())?
1584 .parse()
1585 .map_err(|_| axum::headers::Error::invalid())?;
1586 Ok(Self(version))
1587 }
1588
1589 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1590 values.extend([self.0.to_string().parse().unwrap()]);
1591 }
1592}
1593
1594pub fn routes(server: Arc<Server>) -> Router<Body> {
1595 Router::new()
1596 .route("/rpc", get(handle_websocket_request))
1597 .layer(
1598 ServiceBuilder::new()
1599 .layer(Extension(server.app_state.clone()))
1600 .layer(middleware::from_fn(auth::validate_header))
1601 .layer(Extension(server)),
1602 )
1603}
1604
1605pub async fn handle_websocket_request(
1606 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1607 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1608 Extension(server): Extension<Arc<Server>>,
1609 Extension(user): Extension<User>,
1610 ws: WebSocketUpgrade,
1611) -> axum::response::Response {
1612 if protocol_version != rpc::PROTOCOL_VERSION {
1613 return (
1614 StatusCode::UPGRADE_REQUIRED,
1615 "client must be upgraded".to_string(),
1616 )
1617 .into_response();
1618 }
1619 let socket_address = socket_address.to_string();
1620 ws.on_upgrade(move |socket| {
1621 use util::ResultExt;
1622 let socket = socket
1623 .map_ok(to_tungstenite_message)
1624 .err_into()
1625 .with(|message| async move { Ok(to_axum_message(message)) });
1626 let connection = Connection::new(Box::pin(socket));
1627 async move {
1628 server
1629 .handle_connection(connection, socket_address, user, None, RealExecutor)
1630 .await
1631 .log_err();
1632 }
1633 })
1634}
1635
1636fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1637 match message {
1638 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1639 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1640 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1641 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1642 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1643 code: frame.code.into(),
1644 reason: frame.reason,
1645 })),
1646 }
1647}
1648
1649fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1650 match message {
1651 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1652 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1653 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1654 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1655 AxumMessage::Close(frame) => {
1656 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1657 code: frame.code.into(),
1658 reason: frame.reason,
1659 }))
1660 }
1661 }
1662}
1663
1664pub trait ResultExt {
1665 type Ok;
1666
1667 fn trace_err(self) -> Option<Self::Ok>;
1668}
1669
1670impl<T, E> ResultExt for Result<T, E>
1671where
1672 E: std::fmt::Debug,
1673{
1674 type Ok = T;
1675
1676 fn trace_err(self) -> Option<T> {
1677 match self {
1678 Ok(value) => Some(value),
1679 Err(error) => {
1680 tracing::error!("{:?}", error);
1681 None
1682 }
1683 }
1684 }
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689 use super::*;
1690 use crate::{
1691 db::{tests::TestDb, UserId},
1692 AppState,
1693 };
1694 use ::rpc::Peer;
1695 use client::{
1696 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1697 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1698 };
1699 use collections::{BTreeMap, HashSet};
1700 use editor::{
1701 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1702 ToOffset, ToggleCodeActions, Undo,
1703 };
1704 use gpui::{
1705 executor::{self, Deterministic},
1706 geometry::vector::vec2f,
1707 ModelHandle, Task, TestAppContext, ViewHandle,
1708 };
1709 use language::{
1710 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1711 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1712 };
1713 use lsp::{self, FakeLanguageServer};
1714 use parking_lot::Mutex;
1715 use project::{
1716 fs::{FakeFs, Fs as _},
1717 search::SearchQuery,
1718 worktree::WorktreeHandle,
1719 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1720 };
1721 use rand::prelude::*;
1722 use rpc::PeerId;
1723 use serde_json::json;
1724 use settings::Settings;
1725 use sqlx::types::time::OffsetDateTime;
1726 use std::{
1727 cell::RefCell,
1728 env,
1729 ops::Deref,
1730 path::{Path, PathBuf},
1731 rc::Rc,
1732 sync::{
1733 atomic::{AtomicBool, Ordering::SeqCst},
1734 Arc,
1735 },
1736 time::Duration,
1737 };
1738 use theme::ThemeRegistry;
1739 use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
1740
1741 #[cfg(test)]
1742 #[ctor::ctor]
1743 fn init_logger() {
1744 if std::env::var("RUST_LOG").is_ok() {
1745 env_logger::init();
1746 }
1747 }
1748
1749 #[gpui::test(iterations = 10)]
1750 async fn test_share_project(
1751 deterministic: Arc<Deterministic>,
1752 cx_a: &mut TestAppContext,
1753 cx_b: &mut TestAppContext,
1754 cx_b2: &mut TestAppContext,
1755 ) {
1756 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1757 let lang_registry = Arc::new(LanguageRegistry::test());
1758 let fs = FakeFs::new(cx_a.background());
1759 cx_a.foreground().forbid_parking();
1760
1761 // Connect to a server as 2 clients.
1762 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1763 let client_a = server.create_client(cx_a, "user_a").await;
1764 let mut client_b = server.create_client(cx_b, "user_b").await;
1765 server
1766 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1767 .await;
1768
1769 // Share a project as client A
1770 fs.insert_tree(
1771 "/a",
1772 json!({
1773 ".gitignore": "ignored-dir",
1774 "a.txt": "a-contents",
1775 "b.txt": "b-contents",
1776 "ignored-dir": {
1777 "c.txt": "",
1778 "d.txt": "",
1779 }
1780 }),
1781 )
1782 .await;
1783 let project_a = cx_a.update(|cx| {
1784 Project::local(
1785 client_a.clone(),
1786 client_a.user_store.clone(),
1787 lang_registry.clone(),
1788 fs.clone(),
1789 cx,
1790 )
1791 });
1792 let project_id = project_a
1793 .read_with(cx_a, |project, _| project.next_remote_id())
1794 .await;
1795 let (worktree_a, _) = project_a
1796 .update(cx_a, |p, cx| {
1797 p.find_or_create_local_worktree("/a", true, cx)
1798 })
1799 .await
1800 .unwrap();
1801 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1802 worktree_a
1803 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1804 .await;
1805
1806 // Join that project as client B
1807 let client_b_peer_id = client_b.peer_id;
1808 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1809 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1810 assert_eq!(
1811 project
1812 .collaborators()
1813 .get(&client_a.peer_id)
1814 .unwrap()
1815 .user
1816 .github_login,
1817 "user_a"
1818 );
1819 project.replica_id()
1820 });
1821
1822 deterministic.run_until_parked();
1823 project_a.read_with(cx_a, |project, _| {
1824 let client_b_collaborator = project.collaborators().get(&client_b_peer_id).unwrap();
1825 assert_eq!(client_b_collaborator.replica_id, replica_id_b);
1826 assert_eq!(client_b_collaborator.user.github_login, "user_b");
1827 });
1828 project_b.read_with(cx_b, |project, cx| {
1829 let worktree = project.worktrees(cx).next().unwrap().read(cx);
1830 assert_eq!(
1831 worktree.paths().map(AsRef::as_ref).collect::<Vec<_>>(),
1832 [
1833 Path::new(".gitignore"),
1834 Path::new("a.txt"),
1835 Path::new("b.txt"),
1836 Path::new("ignored-dir"),
1837 Path::new("ignored-dir/c.txt"),
1838 Path::new("ignored-dir/d.txt"),
1839 ]
1840 );
1841 });
1842
1843 // Open the same file as client B and client A.
1844 let buffer_b = project_b
1845 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1846 .await
1847 .unwrap();
1848 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1849 project_a.read_with(cx_a, |project, cx| {
1850 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1851 });
1852 let buffer_a = project_a
1853 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1854 .await
1855 .unwrap();
1856
1857 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1858
1859 // TODO
1860 // // Create a selection set as client B and see that selection set as client A.
1861 // buffer_a
1862 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1863 // .await;
1864
1865 // Edit the buffer as client B and see that edit as client A.
1866 editor_b.update(cx_b, |editor, cx| {
1867 editor.handle_input(&Input("ok, ".into()), cx)
1868 });
1869 buffer_a
1870 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1871 .await;
1872
1873 // TODO
1874 // // Remove the selection set as client B, see those selections disappear as client A.
1875 cx_b.update(move |_| drop(editor_b));
1876 // buffer_a
1877 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1878 // .await;
1879
1880 // Client B can join again on a different window because they are already a participant.
1881 let client_b2 = server.create_client(cx_b2, "user_b").await;
1882 let project_b2 = Project::remote(
1883 project_id,
1884 client_b2.client.clone(),
1885 client_b2.user_store.clone(),
1886 lang_registry.clone(),
1887 FakeFs::new(cx_b2.background()),
1888 &mut cx_b2.to_async(),
1889 )
1890 .await
1891 .unwrap();
1892 deterministic.run_until_parked();
1893 project_a.read_with(cx_a, |project, _| {
1894 assert_eq!(project.collaborators().len(), 2);
1895 });
1896 project_b.read_with(cx_b, |project, _| {
1897 assert_eq!(project.collaborators().len(), 2);
1898 });
1899 project_b2.read_with(cx_b2, |project, _| {
1900 assert_eq!(project.collaborators().len(), 2);
1901 });
1902
1903 // Dropping client B's first project removes only that from client A's collaborators.
1904 cx_b.update(move |_| {
1905 drop(client_b.project.take());
1906 drop(project_b);
1907 });
1908 deterministic.run_until_parked();
1909 project_a.read_with(cx_a, |project, _| {
1910 assert_eq!(project.collaborators().len(), 1);
1911 });
1912 project_b2.read_with(cx_b2, |project, _| {
1913 assert_eq!(project.collaborators().len(), 1);
1914 });
1915 }
1916
1917 #[gpui::test(iterations = 10)]
1918 async fn test_unshare_project(
1919 deterministic: Arc<Deterministic>,
1920 cx_a: &mut TestAppContext,
1921 cx_b: &mut TestAppContext,
1922 ) {
1923 let lang_registry = Arc::new(LanguageRegistry::test());
1924 let fs = FakeFs::new(cx_a.background());
1925 cx_a.foreground().forbid_parking();
1926
1927 // Connect to a server as 2 clients.
1928 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1929 let client_a = server.create_client(cx_a, "user_a").await;
1930 let mut client_b = server.create_client(cx_b, "user_b").await;
1931 server
1932 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1933 .await;
1934
1935 // Share a project as client A
1936 fs.insert_tree(
1937 "/a",
1938 json!({
1939 "a.txt": "a-contents",
1940 "b.txt": "b-contents",
1941 }),
1942 )
1943 .await;
1944 let project_a = cx_a.update(|cx| {
1945 Project::local(
1946 client_a.clone(),
1947 client_a.user_store.clone(),
1948 lang_registry.clone(),
1949 fs.clone(),
1950 cx,
1951 )
1952 });
1953 let (worktree_a, _) = project_a
1954 .update(cx_a, |p, cx| {
1955 p.find_or_create_local_worktree("/a", true, cx)
1956 })
1957 .await
1958 .unwrap();
1959 worktree_a
1960 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1961 .await;
1962 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1963
1964 // Join that project as client B
1965 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1966 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1967 project_b
1968 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1969 .await
1970 .unwrap();
1971
1972 // When client B leaves the project, it gets automatically unshared.
1973 cx_b.update(|_| {
1974 drop(client_b.project.take());
1975 drop(project_b);
1976 });
1977 deterministic.run_until_parked();
1978 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1979
1980 // When client B joins again, the project gets re-shared.
1981 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1982 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1983 project_b2
1984 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1985 .await
1986 .unwrap();
1987
1988 // When client A (the host) leaves, the project gets unshared and guests are notified.
1989 cx_a.update(|_| drop(project_a));
1990 deterministic.run_until_parked();
1991 project_b2.read_with(cx_b, |project, _| {
1992 assert!(project.is_read_only());
1993 assert!(project.collaborators().is_empty());
1994 });
1995 }
1996
1997 #[gpui::test(iterations = 10)]
1998 async fn test_host_disconnect(
1999 deterministic: Arc<Deterministic>,
2000 cx_a: &mut TestAppContext,
2001 cx_b: &mut TestAppContext,
2002 cx_c: &mut TestAppContext,
2003 ) {
2004 let lang_registry = Arc::new(LanguageRegistry::test());
2005 let fs = FakeFs::new(cx_a.background());
2006 cx_a.foreground().forbid_parking();
2007
2008 // Connect to a server as 3 clients.
2009 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2010 let client_a = server.create_client(cx_a, "user_a").await;
2011 let mut client_b = server.create_client(cx_b, "user_b").await;
2012 let client_c = server.create_client(cx_c, "user_c").await;
2013 server
2014 .make_contacts(vec![
2015 (&client_a, cx_a),
2016 (&client_b, cx_b),
2017 (&client_c, cx_c),
2018 ])
2019 .await;
2020
2021 // Share a project as client A
2022 fs.insert_tree(
2023 "/a",
2024 json!({
2025 "a.txt": "a-contents",
2026 "b.txt": "b-contents",
2027 }),
2028 )
2029 .await;
2030 let project_a = cx_a.update(|cx| {
2031 Project::local(
2032 client_a.clone(),
2033 client_a.user_store.clone(),
2034 lang_registry.clone(),
2035 fs.clone(),
2036 cx,
2037 )
2038 });
2039 let project_id = project_a
2040 .read_with(cx_a, |project, _| project.next_remote_id())
2041 .await;
2042 let (worktree_a, _) = project_a
2043 .update(cx_a, |p, cx| {
2044 p.find_or_create_local_worktree("/a", true, cx)
2045 })
2046 .await
2047 .unwrap();
2048 worktree_a
2049 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2050 .await;
2051 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2052
2053 // Join that project as client B
2054 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2055 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2056 project_b
2057 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2058 .await
2059 .unwrap();
2060
2061 // Request to join that project as client C
2062 let project_c = cx_c.spawn(|mut cx| {
2063 let client = client_c.client.clone();
2064 let user_store = client_c.user_store.clone();
2065 let lang_registry = lang_registry.clone();
2066 async move {
2067 Project::remote(
2068 project_id,
2069 client,
2070 user_store,
2071 lang_registry.clone(),
2072 FakeFs::new(cx.background()),
2073 &mut cx,
2074 )
2075 .await
2076 }
2077 });
2078 deterministic.run_until_parked();
2079
2080 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
2081 server.disconnect_client(client_a.current_user_id(cx_a));
2082 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2083 project_a
2084 .condition(cx_a, |project, _| project.collaborators().is_empty())
2085 .await;
2086 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
2087 project_b
2088 .condition(cx_b, |project, _| project.is_read_only())
2089 .await;
2090 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
2091 cx_b.update(|_| {
2092 drop(project_b);
2093 });
2094 assert!(matches!(
2095 project_c.await.unwrap_err(),
2096 project::JoinProjectError::HostWentOffline
2097 ));
2098
2099 // Ensure guests can still join.
2100 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2101 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2102 project_b2
2103 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2104 .await
2105 .unwrap();
2106 }
2107
2108 #[gpui::test(iterations = 10)]
2109 async fn test_decline_join_request(
2110 deterministic: Arc<Deterministic>,
2111 cx_a: &mut TestAppContext,
2112 cx_b: &mut TestAppContext,
2113 ) {
2114 let lang_registry = Arc::new(LanguageRegistry::test());
2115 let fs = FakeFs::new(cx_a.background());
2116 cx_a.foreground().forbid_parking();
2117
2118 // Connect to a server as 2 clients.
2119 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2120 let client_a = server.create_client(cx_a, "user_a").await;
2121 let client_b = server.create_client(cx_b, "user_b").await;
2122 server
2123 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2124 .await;
2125
2126 // Share a project as client A
2127 fs.insert_tree("/a", json!({})).await;
2128 let project_a = cx_a.update(|cx| {
2129 Project::local(
2130 client_a.clone(),
2131 client_a.user_store.clone(),
2132 lang_registry.clone(),
2133 fs.clone(),
2134 cx,
2135 )
2136 });
2137 let project_id = project_a
2138 .read_with(cx_a, |project, _| project.next_remote_id())
2139 .await;
2140 let (worktree_a, _) = project_a
2141 .update(cx_a, |p, cx| {
2142 p.find_or_create_local_worktree("/a", true, cx)
2143 })
2144 .await
2145 .unwrap();
2146 worktree_a
2147 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2148 .await;
2149
2150 // Request to join that project as client B
2151 let project_b = cx_b.spawn(|mut cx| {
2152 let client = client_b.client.clone();
2153 let user_store = client_b.user_store.clone();
2154 let lang_registry = lang_registry.clone();
2155 async move {
2156 Project::remote(
2157 project_id,
2158 client,
2159 user_store,
2160 lang_registry.clone(),
2161 FakeFs::new(cx.background()),
2162 &mut cx,
2163 )
2164 .await
2165 }
2166 });
2167 deterministic.run_until_parked();
2168 project_a.update(cx_a, |project, cx| {
2169 project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2170 });
2171 assert!(matches!(
2172 project_b.await.unwrap_err(),
2173 project::JoinProjectError::HostDeclined
2174 ));
2175
2176 // Request to join the project again as client B
2177 let project_b = cx_b.spawn(|mut cx| {
2178 let client = client_b.client.clone();
2179 let user_store = client_b.user_store.clone();
2180 let lang_registry = lang_registry.clone();
2181 async move {
2182 Project::remote(
2183 project_id,
2184 client,
2185 user_store,
2186 lang_registry.clone(),
2187 FakeFs::new(cx.background()),
2188 &mut cx,
2189 )
2190 .await
2191 }
2192 });
2193
2194 // Close the project on the host
2195 deterministic.run_until_parked();
2196 cx_a.update(|_| drop(project_a));
2197 deterministic.run_until_parked();
2198 assert!(matches!(
2199 project_b.await.unwrap_err(),
2200 project::JoinProjectError::HostClosedProject
2201 ));
2202 }
2203
2204 #[gpui::test(iterations = 10)]
2205 async fn test_cancel_join_request(
2206 deterministic: Arc<Deterministic>,
2207 cx_a: &mut TestAppContext,
2208 cx_b: &mut TestAppContext,
2209 ) {
2210 let lang_registry = Arc::new(LanguageRegistry::test());
2211 let fs = FakeFs::new(cx_a.background());
2212 cx_a.foreground().forbid_parking();
2213
2214 // Connect to a server as 2 clients.
2215 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2216 let client_a = server.create_client(cx_a, "user_a").await;
2217 let client_b = server.create_client(cx_b, "user_b").await;
2218 server
2219 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2220 .await;
2221
2222 // Share a project as client A
2223 fs.insert_tree("/a", json!({})).await;
2224 let project_a = cx_a.update(|cx| {
2225 Project::local(
2226 client_a.clone(),
2227 client_a.user_store.clone(),
2228 lang_registry.clone(),
2229 fs.clone(),
2230 cx,
2231 )
2232 });
2233 let project_id = project_a
2234 .read_with(cx_a, |project, _| project.next_remote_id())
2235 .await;
2236
2237 let user_b = client_a
2238 .user_store
2239 .update(cx_a, |store, cx| {
2240 store.fetch_user(client_b.user_id().unwrap(), cx)
2241 })
2242 .await
2243 .unwrap();
2244
2245 let (worktree_a, _) = project_a
2246 .update(cx_a, |p, cx| {
2247 p.find_or_create_local_worktree("/a", true, cx)
2248 })
2249 .await
2250 .unwrap();
2251 worktree_a
2252 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2253 .await;
2254
2255 let project_a_events = Rc::new(RefCell::new(Vec::new()));
2256 project_a.update(cx_a, {
2257 let project_a_events = project_a_events.clone();
2258 move |_, cx| {
2259 cx.subscribe(&cx.handle(), move |_, _, event, _| {
2260 project_a_events.borrow_mut().push(event.clone());
2261 })
2262 .detach();
2263 }
2264 });
2265
2266 // Request to join that project as client B
2267 let project_b = cx_b.spawn(|mut cx| {
2268 let client = client_b.client.clone();
2269 let user_store = client_b.user_store.clone();
2270 let lang_registry = lang_registry.clone();
2271 async move {
2272 Project::remote(
2273 project_id,
2274 client,
2275 user_store,
2276 lang_registry.clone(),
2277 FakeFs::new(cx.background()),
2278 &mut cx,
2279 )
2280 .await
2281 }
2282 });
2283 deterministic.run_until_parked();
2284 assert_eq!(
2285 &*project_a_events.borrow(),
2286 &[project::Event::ContactRequestedJoin(user_b.clone())]
2287 );
2288 project_a_events.borrow_mut().clear();
2289
2290 // Cancel the join request by leaving the project
2291 client_b
2292 .client
2293 .send(proto::LeaveProject { project_id })
2294 .unwrap();
2295 drop(project_b);
2296
2297 deterministic.run_until_parked();
2298 assert_eq!(
2299 &*project_a_events.borrow(),
2300 &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2301 );
2302 }
2303
2304 #[gpui::test(iterations = 10)]
2305 async fn test_propagate_saves_and_fs_changes(
2306 cx_a: &mut TestAppContext,
2307 cx_b: &mut TestAppContext,
2308 cx_c: &mut TestAppContext,
2309 ) {
2310 let lang_registry = Arc::new(LanguageRegistry::test());
2311 let fs = FakeFs::new(cx_a.background());
2312 cx_a.foreground().forbid_parking();
2313
2314 // Connect to a server as 3 clients.
2315 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2316 let client_a = server.create_client(cx_a, "user_a").await;
2317 let mut client_b = server.create_client(cx_b, "user_b").await;
2318 let mut client_c = server.create_client(cx_c, "user_c").await;
2319 server
2320 .make_contacts(vec![
2321 (&client_a, cx_a),
2322 (&client_b, cx_b),
2323 (&client_c, cx_c),
2324 ])
2325 .await;
2326
2327 // Share a worktree as client A.
2328 fs.insert_tree(
2329 "/a",
2330 json!({
2331 "file1": "",
2332 "file2": ""
2333 }),
2334 )
2335 .await;
2336 let project_a = cx_a.update(|cx| {
2337 Project::local(
2338 client_a.clone(),
2339 client_a.user_store.clone(),
2340 lang_registry.clone(),
2341 fs.clone(),
2342 cx,
2343 )
2344 });
2345 let (worktree_a, _) = project_a
2346 .update(cx_a, |p, cx| {
2347 p.find_or_create_local_worktree("/a", true, cx)
2348 })
2349 .await
2350 .unwrap();
2351 worktree_a
2352 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2353 .await;
2354 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2355
2356 // Join that worktree as clients B and C.
2357 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2358 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2359 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2360 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2361
2362 // Open and edit a buffer as both guests B and C.
2363 let buffer_b = project_b
2364 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2365 .await
2366 .unwrap();
2367 let buffer_c = project_c
2368 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2369 .await
2370 .unwrap();
2371 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2372 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2373
2374 // Open and edit that buffer as the host.
2375 let buffer_a = project_a
2376 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2377 .await
2378 .unwrap();
2379
2380 buffer_a
2381 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2382 .await;
2383 buffer_a.update(cx_a, |buf, cx| {
2384 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2385 });
2386
2387 // Wait for edits to propagate
2388 buffer_a
2389 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2390 .await;
2391 buffer_b
2392 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2393 .await;
2394 buffer_c
2395 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2396 .await;
2397
2398 // Edit the buffer as the host and concurrently save as guest B.
2399 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2400 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2401 save_b.await.unwrap();
2402 assert_eq!(
2403 fs.load("/a/file1".as_ref()).await.unwrap(),
2404 "hi-a, i-am-c, i-am-b, i-am-a"
2405 );
2406 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2407 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2408 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2409
2410 worktree_a.flush_fs_events(cx_a).await;
2411
2412 // Make changes on host's file system, see those changes on guest worktrees.
2413 fs.rename(
2414 "/a/file1".as_ref(),
2415 "/a/file1-renamed".as_ref(),
2416 Default::default(),
2417 )
2418 .await
2419 .unwrap();
2420
2421 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2422 .await
2423 .unwrap();
2424 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2425
2426 worktree_a
2427 .condition(&cx_a, |tree, _| {
2428 tree.paths()
2429 .map(|p| p.to_string_lossy())
2430 .collect::<Vec<_>>()
2431 == ["file1-renamed", "file3", "file4"]
2432 })
2433 .await;
2434 worktree_b
2435 .condition(&cx_b, |tree, _| {
2436 tree.paths()
2437 .map(|p| p.to_string_lossy())
2438 .collect::<Vec<_>>()
2439 == ["file1-renamed", "file3", "file4"]
2440 })
2441 .await;
2442 worktree_c
2443 .condition(&cx_c, |tree, _| {
2444 tree.paths()
2445 .map(|p| p.to_string_lossy())
2446 .collect::<Vec<_>>()
2447 == ["file1-renamed", "file3", "file4"]
2448 })
2449 .await;
2450
2451 // Ensure buffer files are updated as well.
2452 buffer_a
2453 .condition(&cx_a, |buf, _| {
2454 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2455 })
2456 .await;
2457 buffer_b
2458 .condition(&cx_b, |buf, _| {
2459 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2460 })
2461 .await;
2462 buffer_c
2463 .condition(&cx_c, |buf, _| {
2464 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2465 })
2466 .await;
2467 }
2468
2469 #[gpui::test(iterations = 10)]
2470 async fn test_fs_operations(
2471 executor: Arc<Deterministic>,
2472 cx_a: &mut TestAppContext,
2473 cx_b: &mut TestAppContext,
2474 ) {
2475 executor.forbid_parking();
2476 let fs = FakeFs::new(cx_a.background());
2477
2478 // Connect to a server as 2 clients.
2479 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2480 let mut client_a = server.create_client(cx_a, "user_a").await;
2481 let mut client_b = server.create_client(cx_b, "user_b").await;
2482 server
2483 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2484 .await;
2485
2486 // Share a project as client A
2487 fs.insert_tree(
2488 "/dir",
2489 json!({
2490 "a.txt": "a-contents",
2491 "b.txt": "b-contents",
2492 }),
2493 )
2494 .await;
2495
2496 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2497 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2498
2499 let worktree_a =
2500 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2501 let worktree_b =
2502 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2503
2504 let entry = project_b
2505 .update(cx_b, |project, cx| {
2506 project
2507 .create_entry((worktree_id, "c.txt"), false, cx)
2508 .unwrap()
2509 })
2510 .await
2511 .unwrap();
2512 worktree_a.read_with(cx_a, |worktree, _| {
2513 assert_eq!(
2514 worktree
2515 .paths()
2516 .map(|p| p.to_string_lossy())
2517 .collect::<Vec<_>>(),
2518 ["a.txt", "b.txt", "c.txt"]
2519 );
2520 });
2521 worktree_b.read_with(cx_b, |worktree, _| {
2522 assert_eq!(
2523 worktree
2524 .paths()
2525 .map(|p| p.to_string_lossy())
2526 .collect::<Vec<_>>(),
2527 ["a.txt", "b.txt", "c.txt"]
2528 );
2529 });
2530
2531 project_b
2532 .update(cx_b, |project, cx| {
2533 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2534 })
2535 .unwrap()
2536 .await
2537 .unwrap();
2538 worktree_a.read_with(cx_a, |worktree, _| {
2539 assert_eq!(
2540 worktree
2541 .paths()
2542 .map(|p| p.to_string_lossy())
2543 .collect::<Vec<_>>(),
2544 ["a.txt", "b.txt", "d.txt"]
2545 );
2546 });
2547 worktree_b.read_with(cx_b, |worktree, _| {
2548 assert_eq!(
2549 worktree
2550 .paths()
2551 .map(|p| p.to_string_lossy())
2552 .collect::<Vec<_>>(),
2553 ["a.txt", "b.txt", "d.txt"]
2554 );
2555 });
2556
2557 let dir_entry = project_b
2558 .update(cx_b, |project, cx| {
2559 project
2560 .create_entry((worktree_id, "DIR"), true, cx)
2561 .unwrap()
2562 })
2563 .await
2564 .unwrap();
2565 worktree_a.read_with(cx_a, |worktree, _| {
2566 assert_eq!(
2567 worktree
2568 .paths()
2569 .map(|p| p.to_string_lossy())
2570 .collect::<Vec<_>>(),
2571 ["DIR", "a.txt", "b.txt", "d.txt"]
2572 );
2573 });
2574 worktree_b.read_with(cx_b, |worktree, _| {
2575 assert_eq!(
2576 worktree
2577 .paths()
2578 .map(|p| p.to_string_lossy())
2579 .collect::<Vec<_>>(),
2580 ["DIR", "a.txt", "b.txt", "d.txt"]
2581 );
2582 });
2583
2584 project_b
2585 .update(cx_b, |project, cx| {
2586 project.delete_entry(dir_entry.id, cx).unwrap()
2587 })
2588 .await
2589 .unwrap();
2590 worktree_a.read_with(cx_a, |worktree, _| {
2591 assert_eq!(
2592 worktree
2593 .paths()
2594 .map(|p| p.to_string_lossy())
2595 .collect::<Vec<_>>(),
2596 ["a.txt", "b.txt", "d.txt"]
2597 );
2598 });
2599 worktree_b.read_with(cx_b, |worktree, _| {
2600 assert_eq!(
2601 worktree
2602 .paths()
2603 .map(|p| p.to_string_lossy())
2604 .collect::<Vec<_>>(),
2605 ["a.txt", "b.txt", "d.txt"]
2606 );
2607 });
2608
2609 project_b
2610 .update(cx_b, |project, cx| {
2611 project.delete_entry(entry.id, cx).unwrap()
2612 })
2613 .await
2614 .unwrap();
2615 worktree_a.read_with(cx_a, |worktree, _| {
2616 assert_eq!(
2617 worktree
2618 .paths()
2619 .map(|p| p.to_string_lossy())
2620 .collect::<Vec<_>>(),
2621 ["a.txt", "b.txt"]
2622 );
2623 });
2624 worktree_b.read_with(cx_b, |worktree, _| {
2625 assert_eq!(
2626 worktree
2627 .paths()
2628 .map(|p| p.to_string_lossy())
2629 .collect::<Vec<_>>(),
2630 ["a.txt", "b.txt"]
2631 );
2632 });
2633 }
2634
2635 #[gpui::test(iterations = 10)]
2636 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2637 cx_a.foreground().forbid_parking();
2638 let lang_registry = Arc::new(LanguageRegistry::test());
2639 let fs = FakeFs::new(cx_a.background());
2640
2641 // Connect to a server as 2 clients.
2642 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2643 let client_a = server.create_client(cx_a, "user_a").await;
2644 let mut client_b = server.create_client(cx_b, "user_b").await;
2645 server
2646 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2647 .await;
2648
2649 // Share a project as client A
2650 fs.insert_tree(
2651 "/dir",
2652 json!({
2653 "a.txt": "a-contents",
2654 }),
2655 )
2656 .await;
2657
2658 let project_a = cx_a.update(|cx| {
2659 Project::local(
2660 client_a.clone(),
2661 client_a.user_store.clone(),
2662 lang_registry.clone(),
2663 fs.clone(),
2664 cx,
2665 )
2666 });
2667 let (worktree_a, _) = project_a
2668 .update(cx_a, |p, cx| {
2669 p.find_or_create_local_worktree("/dir", true, cx)
2670 })
2671 .await
2672 .unwrap();
2673 worktree_a
2674 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2675 .await;
2676 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2677
2678 // Join that project as client B
2679 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2680
2681 // Open a buffer as client B
2682 let buffer_b = project_b
2683 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2684 .await
2685 .unwrap();
2686
2687 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2688 buffer_b.read_with(cx_b, |buf, _| {
2689 assert!(buf.is_dirty());
2690 assert!(!buf.has_conflict());
2691 });
2692
2693 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2694 buffer_b
2695 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2696 .await;
2697 buffer_b.read_with(cx_b, |buf, _| {
2698 assert!(!buf.has_conflict());
2699 });
2700
2701 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2702 buffer_b.read_with(cx_b, |buf, _| {
2703 assert!(buf.is_dirty());
2704 assert!(!buf.has_conflict());
2705 });
2706 }
2707
2708 #[gpui::test(iterations = 10)]
2709 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2710 cx_a.foreground().forbid_parking();
2711 let lang_registry = Arc::new(LanguageRegistry::test());
2712 let fs = FakeFs::new(cx_a.background());
2713
2714 // Connect to a server as 2 clients.
2715 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2716 let client_a = server.create_client(cx_a, "user_a").await;
2717 let mut client_b = server.create_client(cx_b, "user_b").await;
2718 server
2719 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2720 .await;
2721
2722 // Share a project as client A
2723 fs.insert_tree(
2724 "/dir",
2725 json!({
2726 "a.txt": "a-contents",
2727 }),
2728 )
2729 .await;
2730
2731 let project_a = cx_a.update(|cx| {
2732 Project::local(
2733 client_a.clone(),
2734 client_a.user_store.clone(),
2735 lang_registry.clone(),
2736 fs.clone(),
2737 cx,
2738 )
2739 });
2740 let (worktree_a, _) = project_a
2741 .update(cx_a, |p, cx| {
2742 p.find_or_create_local_worktree("/dir", true, cx)
2743 })
2744 .await
2745 .unwrap();
2746 worktree_a
2747 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2748 .await;
2749 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2750
2751 // Join that project as client B
2752 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2753 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2754
2755 // Open a buffer as client B
2756 let buffer_b = project_b
2757 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2758 .await
2759 .unwrap();
2760 buffer_b.read_with(cx_b, |buf, _| {
2761 assert!(!buf.is_dirty());
2762 assert!(!buf.has_conflict());
2763 });
2764
2765 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2766 .await
2767 .unwrap();
2768 buffer_b
2769 .condition(&cx_b, |buf, _| {
2770 buf.text() == "new contents" && !buf.is_dirty()
2771 })
2772 .await;
2773 buffer_b.read_with(cx_b, |buf, _| {
2774 assert!(!buf.has_conflict());
2775 });
2776 }
2777
2778 #[gpui::test(iterations = 10)]
2779 async fn test_editing_while_guest_opens_buffer(
2780 cx_a: &mut TestAppContext,
2781 cx_b: &mut TestAppContext,
2782 ) {
2783 cx_a.foreground().forbid_parking();
2784 let lang_registry = Arc::new(LanguageRegistry::test());
2785 let fs = FakeFs::new(cx_a.background());
2786
2787 // Connect to a server as 2 clients.
2788 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2789 let client_a = server.create_client(cx_a, "user_a").await;
2790 let mut client_b = server.create_client(cx_b, "user_b").await;
2791 server
2792 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2793 .await;
2794
2795 // Share a project as client A
2796 fs.insert_tree(
2797 "/dir",
2798 json!({
2799 "a.txt": "a-contents",
2800 }),
2801 )
2802 .await;
2803 let project_a = cx_a.update(|cx| {
2804 Project::local(
2805 client_a.clone(),
2806 client_a.user_store.clone(),
2807 lang_registry.clone(),
2808 fs.clone(),
2809 cx,
2810 )
2811 });
2812 let (worktree_a, _) = project_a
2813 .update(cx_a, |p, cx| {
2814 p.find_or_create_local_worktree("/dir", true, cx)
2815 })
2816 .await
2817 .unwrap();
2818 worktree_a
2819 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2820 .await;
2821 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2822
2823 // Join that project as client B
2824 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2825
2826 // Open a buffer as client A
2827 let buffer_a = project_a
2828 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2829 .await
2830 .unwrap();
2831
2832 // Start opening the same buffer as client B
2833 let buffer_b = cx_b
2834 .background()
2835 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2836
2837 // Edit the buffer as client A while client B is still opening it.
2838 cx_b.background().simulate_random_delay().await;
2839 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2840 cx_b.background().simulate_random_delay().await;
2841 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2842
2843 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2844 let buffer_b = buffer_b.await.unwrap();
2845 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2846 }
2847
2848 #[gpui::test(iterations = 10)]
2849 async fn test_leaving_worktree_while_opening_buffer(
2850 cx_a: &mut TestAppContext,
2851 cx_b: &mut TestAppContext,
2852 ) {
2853 cx_a.foreground().forbid_parking();
2854 let lang_registry = Arc::new(LanguageRegistry::test());
2855 let fs = FakeFs::new(cx_a.background());
2856
2857 // Connect to a server as 2 clients.
2858 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2859 let client_a = server.create_client(cx_a, "user_a").await;
2860 let mut client_b = server.create_client(cx_b, "user_b").await;
2861 server
2862 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2863 .await;
2864
2865 // Share a project as client A
2866 fs.insert_tree(
2867 "/dir",
2868 json!({
2869 "a.txt": "a-contents",
2870 }),
2871 )
2872 .await;
2873 let project_a = cx_a.update(|cx| {
2874 Project::local(
2875 client_a.clone(),
2876 client_a.user_store.clone(),
2877 lang_registry.clone(),
2878 fs.clone(),
2879 cx,
2880 )
2881 });
2882 let (worktree_a, _) = project_a
2883 .update(cx_a, |p, cx| {
2884 p.find_or_create_local_worktree("/dir", true, cx)
2885 })
2886 .await
2887 .unwrap();
2888 worktree_a
2889 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2890 .await;
2891 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2892
2893 // Join that project as client B
2894 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2895
2896 // See that a guest has joined as client A.
2897 project_a
2898 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2899 .await;
2900
2901 // Begin opening a buffer as client B, but leave the project before the open completes.
2902 let buffer_b = cx_b
2903 .background()
2904 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2905 cx_b.update(|_| {
2906 drop(client_b.project.take());
2907 drop(project_b);
2908 });
2909 drop(buffer_b);
2910
2911 // See that the guest has left.
2912 project_a
2913 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2914 .await;
2915 }
2916
2917 #[gpui::test(iterations = 10)]
2918 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2919 cx_a.foreground().forbid_parking();
2920 let lang_registry = Arc::new(LanguageRegistry::test());
2921 let fs = FakeFs::new(cx_a.background());
2922
2923 // Connect to a server as 2 clients.
2924 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2925 let client_a = server.create_client(cx_a, "user_a").await;
2926 let mut client_b = server.create_client(cx_b, "user_b").await;
2927 server
2928 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2929 .await;
2930
2931 // Share a project as client A
2932 fs.insert_tree(
2933 "/a",
2934 json!({
2935 "a.txt": "a-contents",
2936 "b.txt": "b-contents",
2937 }),
2938 )
2939 .await;
2940 let project_a = cx_a.update(|cx| {
2941 Project::local(
2942 client_a.clone(),
2943 client_a.user_store.clone(),
2944 lang_registry.clone(),
2945 fs.clone(),
2946 cx,
2947 )
2948 });
2949 let (worktree_a, _) = project_a
2950 .update(cx_a, |p, cx| {
2951 p.find_or_create_local_worktree("/a", true, cx)
2952 })
2953 .await
2954 .unwrap();
2955 worktree_a
2956 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2957 .await;
2958
2959 // Join that project as client B
2960 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2961
2962 // Client A sees that a guest has joined.
2963 project_a
2964 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2965 .await;
2966
2967 // Drop client B's connection and ensure client A observes client B leaving the project.
2968 client_b.disconnect(&cx_b.to_async()).unwrap();
2969 project_a
2970 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2971 .await;
2972
2973 // Rejoin the project as client B
2974 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2975
2976 // Client A sees that a guest has re-joined.
2977 project_a
2978 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2979 .await;
2980
2981 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2982 client_b.wait_for_current_user(cx_b).await;
2983 server.disconnect_client(client_b.current_user_id(cx_b));
2984 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2985 project_a
2986 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2987 .await;
2988 }
2989
2990 #[gpui::test(iterations = 10)]
2991 async fn test_collaborating_with_diagnostics(
2992 deterministic: Arc<Deterministic>,
2993 cx_a: &mut TestAppContext,
2994 cx_b: &mut TestAppContext,
2995 cx_c: &mut TestAppContext,
2996 ) {
2997 deterministic.forbid_parking();
2998 let lang_registry = Arc::new(LanguageRegistry::test());
2999 let fs = FakeFs::new(cx_a.background());
3000
3001 // Set up a fake language server.
3002 let mut language = Language::new(
3003 LanguageConfig {
3004 name: "Rust".into(),
3005 path_suffixes: vec!["rs".to_string()],
3006 ..Default::default()
3007 },
3008 Some(tree_sitter_rust::language()),
3009 );
3010 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3011 lang_registry.add(Arc::new(language));
3012
3013 // Connect to a server as 2 clients.
3014 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3015 let client_a = server.create_client(cx_a, "user_a").await;
3016 let mut client_b = server.create_client(cx_b, "user_b").await;
3017 let mut client_c = server.create_client(cx_c, "user_c").await;
3018 server
3019 .make_contacts(vec![
3020 (&client_a, cx_a),
3021 (&client_b, cx_b),
3022 (&client_c, cx_c),
3023 ])
3024 .await;
3025
3026 // Share a project as client A
3027 fs.insert_tree(
3028 "/a",
3029 json!({
3030 "a.rs": "let one = two",
3031 "other.rs": "",
3032 }),
3033 )
3034 .await;
3035 let project_a = cx_a.update(|cx| {
3036 Project::local(
3037 client_a.clone(),
3038 client_a.user_store.clone(),
3039 lang_registry.clone(),
3040 fs.clone(),
3041 cx,
3042 )
3043 });
3044 let (worktree_a, _) = project_a
3045 .update(cx_a, |p, cx| {
3046 p.find_or_create_local_worktree("/a", true, cx)
3047 })
3048 .await
3049 .unwrap();
3050 worktree_a
3051 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3052 .await;
3053 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3054 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3055
3056 // Cause the language server to start.
3057 let _buffer = cx_a
3058 .background()
3059 .spawn(project_a.update(cx_a, |project, cx| {
3060 project.open_buffer(
3061 ProjectPath {
3062 worktree_id,
3063 path: Path::new("other.rs").into(),
3064 },
3065 cx,
3066 )
3067 }))
3068 .await
3069 .unwrap();
3070
3071 // Join the worktree as client B.
3072 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3073
3074 // Simulate a language server reporting errors for a file.
3075 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3076 fake_language_server
3077 .receive_notification::<lsp::notification::DidOpenTextDocument>()
3078 .await;
3079 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3080 lsp::PublishDiagnosticsParams {
3081 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3082 version: None,
3083 diagnostics: vec![lsp::Diagnostic {
3084 severity: Some(lsp::DiagnosticSeverity::ERROR),
3085 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3086 message: "message 1".to_string(),
3087 ..Default::default()
3088 }],
3089 },
3090 );
3091
3092 // Wait for server to see the diagnostics update.
3093 deterministic.run_until_parked();
3094 {
3095 let store = server.store.read().await;
3096 let project = store.project(project_id).unwrap();
3097 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3098 assert!(!worktree.diagnostic_summaries.is_empty());
3099 }
3100
3101 // Ensure client B observes the new diagnostics.
3102 project_b.read_with(cx_b, |project, cx| {
3103 assert_eq!(
3104 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3105 &[(
3106 ProjectPath {
3107 worktree_id,
3108 path: Arc::from(Path::new("a.rs")),
3109 },
3110 DiagnosticSummary {
3111 error_count: 1,
3112 warning_count: 0,
3113 ..Default::default()
3114 },
3115 )]
3116 )
3117 });
3118
3119 // Join project as client C and observe the diagnostics.
3120 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3121 project_c.read_with(cx_c, |project, cx| {
3122 assert_eq!(
3123 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3124 &[(
3125 ProjectPath {
3126 worktree_id,
3127 path: Arc::from(Path::new("a.rs")),
3128 },
3129 DiagnosticSummary {
3130 error_count: 1,
3131 warning_count: 0,
3132 ..Default::default()
3133 },
3134 )]
3135 )
3136 });
3137
3138 // Simulate a language server reporting more errors for a file.
3139 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3140 lsp::PublishDiagnosticsParams {
3141 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3142 version: None,
3143 diagnostics: vec![
3144 lsp::Diagnostic {
3145 severity: Some(lsp::DiagnosticSeverity::ERROR),
3146 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3147 message: "message 1".to_string(),
3148 ..Default::default()
3149 },
3150 lsp::Diagnostic {
3151 severity: Some(lsp::DiagnosticSeverity::WARNING),
3152 range: lsp::Range::new(
3153 lsp::Position::new(0, 10),
3154 lsp::Position::new(0, 13),
3155 ),
3156 message: "message 2".to_string(),
3157 ..Default::default()
3158 },
3159 ],
3160 },
3161 );
3162
3163 // Clients B and C get the updated summaries
3164 deterministic.run_until_parked();
3165 project_b.read_with(cx_b, |project, cx| {
3166 assert_eq!(
3167 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3168 [(
3169 ProjectPath {
3170 worktree_id,
3171 path: Arc::from(Path::new("a.rs")),
3172 },
3173 DiagnosticSummary {
3174 error_count: 1,
3175 warning_count: 1,
3176 ..Default::default()
3177 },
3178 )]
3179 );
3180 });
3181 project_c.read_with(cx_c, |project, cx| {
3182 assert_eq!(
3183 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3184 [(
3185 ProjectPath {
3186 worktree_id,
3187 path: Arc::from(Path::new("a.rs")),
3188 },
3189 DiagnosticSummary {
3190 error_count: 1,
3191 warning_count: 1,
3192 ..Default::default()
3193 },
3194 )]
3195 );
3196 });
3197
3198 // Open the file with the errors on client B. They should be present.
3199 let buffer_b = cx_b
3200 .background()
3201 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3202 .await
3203 .unwrap();
3204
3205 buffer_b.read_with(cx_b, |buffer, _| {
3206 assert_eq!(
3207 buffer
3208 .snapshot()
3209 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3210 .map(|entry| entry)
3211 .collect::<Vec<_>>(),
3212 &[
3213 DiagnosticEntry {
3214 range: Point::new(0, 4)..Point::new(0, 7),
3215 diagnostic: Diagnostic {
3216 group_id: 0,
3217 message: "message 1".to_string(),
3218 severity: lsp::DiagnosticSeverity::ERROR,
3219 is_primary: true,
3220 ..Default::default()
3221 }
3222 },
3223 DiagnosticEntry {
3224 range: Point::new(0, 10)..Point::new(0, 13),
3225 diagnostic: Diagnostic {
3226 group_id: 1,
3227 severity: lsp::DiagnosticSeverity::WARNING,
3228 message: "message 2".to_string(),
3229 is_primary: true,
3230 ..Default::default()
3231 }
3232 }
3233 ]
3234 );
3235 });
3236
3237 // Simulate a language server reporting no errors for a file.
3238 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3239 lsp::PublishDiagnosticsParams {
3240 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3241 version: None,
3242 diagnostics: vec![],
3243 },
3244 );
3245 deterministic.run_until_parked();
3246 project_a.read_with(cx_a, |project, cx| {
3247 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3248 });
3249 project_b.read_with(cx_b, |project, cx| {
3250 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3251 });
3252 project_c.read_with(cx_c, |project, cx| {
3253 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3254 });
3255 }
3256
3257 #[gpui::test(iterations = 10)]
3258 async fn test_collaborating_with_completion(
3259 cx_a: &mut TestAppContext,
3260 cx_b: &mut TestAppContext,
3261 ) {
3262 cx_a.foreground().forbid_parking();
3263 let lang_registry = Arc::new(LanguageRegistry::test());
3264 let fs = FakeFs::new(cx_a.background());
3265
3266 // Set up a fake language server.
3267 let mut language = Language::new(
3268 LanguageConfig {
3269 name: "Rust".into(),
3270 path_suffixes: vec!["rs".to_string()],
3271 ..Default::default()
3272 },
3273 Some(tree_sitter_rust::language()),
3274 );
3275 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3276 capabilities: lsp::ServerCapabilities {
3277 completion_provider: Some(lsp::CompletionOptions {
3278 trigger_characters: Some(vec![".".to_string()]),
3279 ..Default::default()
3280 }),
3281 ..Default::default()
3282 },
3283 ..Default::default()
3284 });
3285 lang_registry.add(Arc::new(language));
3286
3287 // Connect to a server as 2 clients.
3288 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3289 let client_a = server.create_client(cx_a, "user_a").await;
3290 let mut client_b = server.create_client(cx_b, "user_b").await;
3291 server
3292 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3293 .await;
3294
3295 // Share a project as client A
3296 fs.insert_tree(
3297 "/a",
3298 json!({
3299 "main.rs": "fn main() { a }",
3300 "other.rs": "",
3301 }),
3302 )
3303 .await;
3304 let project_a = cx_a.update(|cx| {
3305 Project::local(
3306 client_a.clone(),
3307 client_a.user_store.clone(),
3308 lang_registry.clone(),
3309 fs.clone(),
3310 cx,
3311 )
3312 });
3313 let (worktree_a, _) = project_a
3314 .update(cx_a, |p, cx| {
3315 p.find_or_create_local_worktree("/a", true, cx)
3316 })
3317 .await
3318 .unwrap();
3319 worktree_a
3320 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3321 .await;
3322 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3323
3324 // Join the worktree as client B.
3325 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3326
3327 // Open a file in an editor as the guest.
3328 let buffer_b = project_b
3329 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3330 .await
3331 .unwrap();
3332 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3333 let editor_b = cx_b.add_view(window_b, |cx| {
3334 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3335 });
3336
3337 let fake_language_server = fake_language_servers.next().await.unwrap();
3338 buffer_b
3339 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3340 .await;
3341
3342 // Type a completion trigger character as the guest.
3343 editor_b.update(cx_b, |editor, cx| {
3344 editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3345 editor.handle_input(&Input(".".into()), cx);
3346 cx.focus(&editor_b);
3347 });
3348
3349 // Receive a completion request as the host's language server.
3350 // Return some completions from the host's language server.
3351 cx_a.foreground().start_waiting();
3352 fake_language_server
3353 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3354 assert_eq!(
3355 params.text_document_position.text_document.uri,
3356 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3357 );
3358 assert_eq!(
3359 params.text_document_position.position,
3360 lsp::Position::new(0, 14),
3361 );
3362
3363 Ok(Some(lsp::CompletionResponse::Array(vec![
3364 lsp::CompletionItem {
3365 label: "first_method(…)".into(),
3366 detail: Some("fn(&mut self, B) -> C".into()),
3367 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3368 new_text: "first_method($1)".to_string(),
3369 range: lsp::Range::new(
3370 lsp::Position::new(0, 14),
3371 lsp::Position::new(0, 14),
3372 ),
3373 })),
3374 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3375 ..Default::default()
3376 },
3377 lsp::CompletionItem {
3378 label: "second_method(…)".into(),
3379 detail: Some("fn(&mut self, C) -> D<E>".into()),
3380 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3381 new_text: "second_method()".to_string(),
3382 range: lsp::Range::new(
3383 lsp::Position::new(0, 14),
3384 lsp::Position::new(0, 14),
3385 ),
3386 })),
3387 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3388 ..Default::default()
3389 },
3390 ])))
3391 })
3392 .next()
3393 .await
3394 .unwrap();
3395 cx_a.foreground().finish_waiting();
3396
3397 // Open the buffer on the host.
3398 let buffer_a = project_a
3399 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3400 .await
3401 .unwrap();
3402 buffer_a
3403 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3404 .await;
3405
3406 // Confirm a completion on the guest.
3407 editor_b
3408 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3409 .await;
3410 editor_b.update(cx_b, |editor, cx| {
3411 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3412 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3413 });
3414
3415 // Return a resolved completion from the host's language server.
3416 // The resolved completion has an additional text edit.
3417 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3418 |params, _| async move {
3419 assert_eq!(params.label, "first_method(…)");
3420 Ok(lsp::CompletionItem {
3421 label: "first_method(…)".into(),
3422 detail: Some("fn(&mut self, B) -> C".into()),
3423 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3424 new_text: "first_method($1)".to_string(),
3425 range: lsp::Range::new(
3426 lsp::Position::new(0, 14),
3427 lsp::Position::new(0, 14),
3428 ),
3429 })),
3430 additional_text_edits: Some(vec![lsp::TextEdit {
3431 new_text: "use d::SomeTrait;\n".to_string(),
3432 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3433 }]),
3434 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3435 ..Default::default()
3436 })
3437 },
3438 );
3439
3440 // The additional edit is applied.
3441 buffer_a
3442 .condition(&cx_a, |buffer, _| {
3443 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3444 })
3445 .await;
3446 buffer_b
3447 .condition(&cx_b, |buffer, _| {
3448 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3449 })
3450 .await;
3451 }
3452
3453 #[gpui::test(iterations = 10)]
3454 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3455 cx_a.foreground().forbid_parking();
3456 let lang_registry = Arc::new(LanguageRegistry::test());
3457 let fs = FakeFs::new(cx_a.background());
3458
3459 // Connect to a server as 2 clients.
3460 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3461 let client_a = server.create_client(cx_a, "user_a").await;
3462 let mut client_b = server.create_client(cx_b, "user_b").await;
3463 server
3464 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3465 .await;
3466
3467 // Share a project as client A
3468 fs.insert_tree(
3469 "/a",
3470 json!({
3471 "a.rs": "let one = 1;",
3472 }),
3473 )
3474 .await;
3475 let project_a = cx_a.update(|cx| {
3476 Project::local(
3477 client_a.clone(),
3478 client_a.user_store.clone(),
3479 lang_registry.clone(),
3480 fs.clone(),
3481 cx,
3482 )
3483 });
3484 let (worktree_a, _) = project_a
3485 .update(cx_a, |p, cx| {
3486 p.find_or_create_local_worktree("/a", true, cx)
3487 })
3488 .await
3489 .unwrap();
3490 worktree_a
3491 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3492 .await;
3493 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3494 let buffer_a = project_a
3495 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3496 .await
3497 .unwrap();
3498
3499 // Join the worktree as client B.
3500 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3501
3502 let buffer_b = cx_b
3503 .background()
3504 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3505 .await
3506 .unwrap();
3507 buffer_b.update(cx_b, |buffer, cx| {
3508 buffer.edit([(4..7, "six")], cx);
3509 buffer.edit([(10..11, "6")], cx);
3510 assert_eq!(buffer.text(), "let six = 6;");
3511 assert!(buffer.is_dirty());
3512 assert!(!buffer.has_conflict());
3513 });
3514 buffer_a
3515 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3516 .await;
3517
3518 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3519 .await
3520 .unwrap();
3521 buffer_a
3522 .condition(cx_a, |buffer, _| buffer.has_conflict())
3523 .await;
3524 buffer_b
3525 .condition(cx_b, |buffer, _| buffer.has_conflict())
3526 .await;
3527
3528 project_b
3529 .update(cx_b, |project, cx| {
3530 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3531 })
3532 .await
3533 .unwrap();
3534 buffer_a.read_with(cx_a, |buffer, _| {
3535 assert_eq!(buffer.text(), "let seven = 7;");
3536 assert!(!buffer.is_dirty());
3537 assert!(!buffer.has_conflict());
3538 });
3539 buffer_b.read_with(cx_b, |buffer, _| {
3540 assert_eq!(buffer.text(), "let seven = 7;");
3541 assert!(!buffer.is_dirty());
3542 assert!(!buffer.has_conflict());
3543 });
3544
3545 buffer_a.update(cx_a, |buffer, cx| {
3546 // Undoing on the host is a no-op when the reload was initiated by the guest.
3547 buffer.undo(cx);
3548 assert_eq!(buffer.text(), "let seven = 7;");
3549 assert!(!buffer.is_dirty());
3550 assert!(!buffer.has_conflict());
3551 });
3552 buffer_b.update(cx_b, |buffer, cx| {
3553 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3554 buffer.undo(cx);
3555 assert_eq!(buffer.text(), "let six = 6;");
3556 assert!(buffer.is_dirty());
3557 assert!(!buffer.has_conflict());
3558 });
3559 }
3560
3561 #[gpui::test(iterations = 10)]
3562 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3563 cx_a.foreground().forbid_parking();
3564 let lang_registry = Arc::new(LanguageRegistry::test());
3565 let fs = FakeFs::new(cx_a.background());
3566
3567 // Set up a fake language server.
3568 let mut language = Language::new(
3569 LanguageConfig {
3570 name: "Rust".into(),
3571 path_suffixes: vec!["rs".to_string()],
3572 ..Default::default()
3573 },
3574 Some(tree_sitter_rust::language()),
3575 );
3576 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3577 lang_registry.add(Arc::new(language));
3578
3579 // Connect to a server as 2 clients.
3580 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3581 let client_a = server.create_client(cx_a, "user_a").await;
3582 let mut client_b = server.create_client(cx_b, "user_b").await;
3583 server
3584 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3585 .await;
3586
3587 // Share a project as client A
3588 fs.insert_tree(
3589 "/a",
3590 json!({
3591 "a.rs": "let one = two",
3592 }),
3593 )
3594 .await;
3595 let project_a = cx_a.update(|cx| {
3596 Project::local(
3597 client_a.clone(),
3598 client_a.user_store.clone(),
3599 lang_registry.clone(),
3600 fs.clone(),
3601 cx,
3602 )
3603 });
3604 let (worktree_a, _) = project_a
3605 .update(cx_a, |p, cx| {
3606 p.find_or_create_local_worktree("/a", true, cx)
3607 })
3608 .await
3609 .unwrap();
3610 worktree_a
3611 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3612 .await;
3613 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3614
3615 // Join the project as client B.
3616 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3617
3618 let buffer_b = cx_b
3619 .background()
3620 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3621 .await
3622 .unwrap();
3623
3624 let fake_language_server = fake_language_servers.next().await.unwrap();
3625 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3626 Ok(Some(vec![
3627 lsp::TextEdit {
3628 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3629 new_text: "h".to_string(),
3630 },
3631 lsp::TextEdit {
3632 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3633 new_text: "y".to_string(),
3634 },
3635 ]))
3636 });
3637
3638 project_b
3639 .update(cx_b, |project, cx| {
3640 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3641 })
3642 .await
3643 .unwrap();
3644 assert_eq!(
3645 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3646 "let honey = two"
3647 );
3648 }
3649
3650 #[gpui::test(iterations = 10)]
3651 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3652 cx_a.foreground().forbid_parking();
3653 let lang_registry = Arc::new(LanguageRegistry::test());
3654 let fs = FakeFs::new(cx_a.background());
3655 fs.insert_tree(
3656 "/root-1",
3657 json!({
3658 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3659 }),
3660 )
3661 .await;
3662 fs.insert_tree(
3663 "/root-2",
3664 json!({
3665 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3666 }),
3667 )
3668 .await;
3669
3670 // Set up a fake language server.
3671 let mut language = Language::new(
3672 LanguageConfig {
3673 name: "Rust".into(),
3674 path_suffixes: vec!["rs".to_string()],
3675 ..Default::default()
3676 },
3677 Some(tree_sitter_rust::language()),
3678 );
3679 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3680 lang_registry.add(Arc::new(language));
3681
3682 // Connect to a server as 2 clients.
3683 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3684 let client_a = server.create_client(cx_a, "user_a").await;
3685 let mut client_b = server.create_client(cx_b, "user_b").await;
3686 server
3687 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3688 .await;
3689
3690 // Share a project as client A
3691 let project_a = cx_a.update(|cx| {
3692 Project::local(
3693 client_a.clone(),
3694 client_a.user_store.clone(),
3695 lang_registry.clone(),
3696 fs.clone(),
3697 cx,
3698 )
3699 });
3700 let (worktree_a, _) = project_a
3701 .update(cx_a, |p, cx| {
3702 p.find_or_create_local_worktree("/root-1", true, cx)
3703 })
3704 .await
3705 .unwrap();
3706 worktree_a
3707 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3708 .await;
3709 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3710
3711 // Join the project as client B.
3712 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3713
3714 // Open the file on client B.
3715 let buffer_b = cx_b
3716 .background()
3717 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3718 .await
3719 .unwrap();
3720
3721 // Request the definition of a symbol as the guest.
3722 let fake_language_server = fake_language_servers.next().await.unwrap();
3723 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3724 |_, _| async move {
3725 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3726 lsp::Location::new(
3727 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3728 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3729 ),
3730 )))
3731 },
3732 );
3733
3734 let definitions_1 = project_b
3735 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3736 .await
3737 .unwrap();
3738 cx_b.read(|cx| {
3739 assert_eq!(definitions_1.len(), 1);
3740 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3741 let target_buffer = definitions_1[0].buffer.read(cx);
3742 assert_eq!(
3743 target_buffer.text(),
3744 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3745 );
3746 assert_eq!(
3747 definitions_1[0].range.to_point(target_buffer),
3748 Point::new(0, 6)..Point::new(0, 9)
3749 );
3750 });
3751
3752 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3753 // the previous call to `definition`.
3754 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3755 |_, _| async move {
3756 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3757 lsp::Location::new(
3758 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3759 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3760 ),
3761 )))
3762 },
3763 );
3764
3765 let definitions_2 = project_b
3766 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3767 .await
3768 .unwrap();
3769 cx_b.read(|cx| {
3770 assert_eq!(definitions_2.len(), 1);
3771 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3772 let target_buffer = definitions_2[0].buffer.read(cx);
3773 assert_eq!(
3774 target_buffer.text(),
3775 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3776 );
3777 assert_eq!(
3778 definitions_2[0].range.to_point(target_buffer),
3779 Point::new(1, 6)..Point::new(1, 11)
3780 );
3781 });
3782 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3783 }
3784
3785 #[gpui::test(iterations = 10)]
3786 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3787 cx_a.foreground().forbid_parking();
3788 let lang_registry = Arc::new(LanguageRegistry::test());
3789 let fs = FakeFs::new(cx_a.background());
3790 fs.insert_tree(
3791 "/root-1",
3792 json!({
3793 "one.rs": "const ONE: usize = 1;",
3794 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3795 }),
3796 )
3797 .await;
3798 fs.insert_tree(
3799 "/root-2",
3800 json!({
3801 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3802 }),
3803 )
3804 .await;
3805
3806 // Set up a fake language server.
3807 let mut language = Language::new(
3808 LanguageConfig {
3809 name: "Rust".into(),
3810 path_suffixes: vec!["rs".to_string()],
3811 ..Default::default()
3812 },
3813 Some(tree_sitter_rust::language()),
3814 );
3815 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3816 lang_registry.add(Arc::new(language));
3817
3818 // Connect to a server as 2 clients.
3819 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3820 let client_a = server.create_client(cx_a, "user_a").await;
3821 let mut client_b = server.create_client(cx_b, "user_b").await;
3822 server
3823 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3824 .await;
3825
3826 // Share a project as client A
3827 let project_a = cx_a.update(|cx| {
3828 Project::local(
3829 client_a.clone(),
3830 client_a.user_store.clone(),
3831 lang_registry.clone(),
3832 fs.clone(),
3833 cx,
3834 )
3835 });
3836 let (worktree_a, _) = project_a
3837 .update(cx_a, |p, cx| {
3838 p.find_or_create_local_worktree("/root-1", true, cx)
3839 })
3840 .await
3841 .unwrap();
3842 worktree_a
3843 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3844 .await;
3845 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3846
3847 // Join the worktree as client B.
3848 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3849
3850 // Open the file on client B.
3851 let buffer_b = cx_b
3852 .background()
3853 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3854 .await
3855 .unwrap();
3856
3857 // Request references to a symbol as the guest.
3858 let fake_language_server = fake_language_servers.next().await.unwrap();
3859 fake_language_server.handle_request::<lsp::request::References, _, _>(
3860 |params, _| async move {
3861 assert_eq!(
3862 params.text_document_position.text_document.uri.as_str(),
3863 "file:///root-1/one.rs"
3864 );
3865 Ok(Some(vec![
3866 lsp::Location {
3867 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3868 range: lsp::Range::new(
3869 lsp::Position::new(0, 24),
3870 lsp::Position::new(0, 27),
3871 ),
3872 },
3873 lsp::Location {
3874 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3875 range: lsp::Range::new(
3876 lsp::Position::new(0, 35),
3877 lsp::Position::new(0, 38),
3878 ),
3879 },
3880 lsp::Location {
3881 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3882 range: lsp::Range::new(
3883 lsp::Position::new(0, 37),
3884 lsp::Position::new(0, 40),
3885 ),
3886 },
3887 ]))
3888 },
3889 );
3890
3891 let references = project_b
3892 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3893 .await
3894 .unwrap();
3895 cx_b.read(|cx| {
3896 assert_eq!(references.len(), 3);
3897 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3898
3899 let two_buffer = references[0].buffer.read(cx);
3900 let three_buffer = references[2].buffer.read(cx);
3901 assert_eq!(
3902 two_buffer.file().unwrap().path().as_ref(),
3903 Path::new("two.rs")
3904 );
3905 assert_eq!(references[1].buffer, references[0].buffer);
3906 assert_eq!(
3907 three_buffer.file().unwrap().full_path(cx),
3908 Path::new("three.rs")
3909 );
3910
3911 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3912 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3913 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3914 });
3915 }
3916
3917 #[gpui::test(iterations = 10)]
3918 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3919 cx_a.foreground().forbid_parking();
3920 let lang_registry = Arc::new(LanguageRegistry::test());
3921 let fs = FakeFs::new(cx_a.background());
3922 fs.insert_tree(
3923 "/root-1",
3924 json!({
3925 "a": "hello world",
3926 "b": "goodnight moon",
3927 "c": "a world of goo",
3928 "d": "world champion of clown world",
3929 }),
3930 )
3931 .await;
3932 fs.insert_tree(
3933 "/root-2",
3934 json!({
3935 "e": "disney world is fun",
3936 }),
3937 )
3938 .await;
3939
3940 // Connect to a server as 2 clients.
3941 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3942 let client_a = server.create_client(cx_a, "user_a").await;
3943 let mut client_b = server.create_client(cx_b, "user_b").await;
3944 server
3945 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3946 .await;
3947
3948 // Share a project as client A
3949 let project_a = cx_a.update(|cx| {
3950 Project::local(
3951 client_a.clone(),
3952 client_a.user_store.clone(),
3953 lang_registry.clone(),
3954 fs.clone(),
3955 cx,
3956 )
3957 });
3958
3959 let (worktree_1, _) = project_a
3960 .update(cx_a, |p, cx| {
3961 p.find_or_create_local_worktree("/root-1", true, cx)
3962 })
3963 .await
3964 .unwrap();
3965 worktree_1
3966 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3967 .await;
3968 let (worktree_2, _) = project_a
3969 .update(cx_a, |p, cx| {
3970 p.find_or_create_local_worktree("/root-2", true, cx)
3971 })
3972 .await
3973 .unwrap();
3974 worktree_2
3975 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3976 .await;
3977
3978 // Join the worktree as client B.
3979 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3980 let results = project_b
3981 .update(cx_b, |project, cx| {
3982 project.search(SearchQuery::text("world", false, false), cx)
3983 })
3984 .await
3985 .unwrap();
3986
3987 let mut ranges_by_path = results
3988 .into_iter()
3989 .map(|(buffer, ranges)| {
3990 buffer.read_with(cx_b, |buffer, cx| {
3991 let path = buffer.file().unwrap().full_path(cx);
3992 let offset_ranges = ranges
3993 .into_iter()
3994 .map(|range| range.to_offset(buffer))
3995 .collect::<Vec<_>>();
3996 (path, offset_ranges)
3997 })
3998 })
3999 .collect::<Vec<_>>();
4000 ranges_by_path.sort_by_key(|(path, _)| path.clone());
4001
4002 assert_eq!(
4003 ranges_by_path,
4004 &[
4005 (PathBuf::from("root-1/a"), vec![6..11]),
4006 (PathBuf::from("root-1/c"), vec![2..7]),
4007 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
4008 (PathBuf::from("root-2/e"), vec![7..12]),
4009 ]
4010 );
4011 }
4012
4013 #[gpui::test(iterations = 10)]
4014 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4015 cx_a.foreground().forbid_parking();
4016 let lang_registry = Arc::new(LanguageRegistry::test());
4017 let fs = FakeFs::new(cx_a.background());
4018 fs.insert_tree(
4019 "/root-1",
4020 json!({
4021 "main.rs": "fn double(number: i32) -> i32 { number + number }",
4022 }),
4023 )
4024 .await;
4025
4026 // Set up a fake language server.
4027 let mut language = Language::new(
4028 LanguageConfig {
4029 name: "Rust".into(),
4030 path_suffixes: vec!["rs".to_string()],
4031 ..Default::default()
4032 },
4033 Some(tree_sitter_rust::language()),
4034 );
4035 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4036 lang_registry.add(Arc::new(language));
4037
4038 // Connect to a server as 2 clients.
4039 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4040 let client_a = server.create_client(cx_a, "user_a").await;
4041 let mut client_b = server.create_client(cx_b, "user_b").await;
4042 server
4043 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4044 .await;
4045
4046 // Share a project as client A
4047 let project_a = cx_a.update(|cx| {
4048 Project::local(
4049 client_a.clone(),
4050 client_a.user_store.clone(),
4051 lang_registry.clone(),
4052 fs.clone(),
4053 cx,
4054 )
4055 });
4056 let (worktree_a, _) = project_a
4057 .update(cx_a, |p, cx| {
4058 p.find_or_create_local_worktree("/root-1", true, cx)
4059 })
4060 .await
4061 .unwrap();
4062 worktree_a
4063 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4064 .await;
4065 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4066
4067 // Join the worktree as client B.
4068 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4069
4070 // Open the file on client B.
4071 let buffer_b = cx_b
4072 .background()
4073 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
4074 .await
4075 .unwrap();
4076
4077 // Request document highlights as the guest.
4078 let fake_language_server = fake_language_servers.next().await.unwrap();
4079 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4080 |params, _| async move {
4081 assert_eq!(
4082 params
4083 .text_document_position_params
4084 .text_document
4085 .uri
4086 .as_str(),
4087 "file:///root-1/main.rs"
4088 );
4089 assert_eq!(
4090 params.text_document_position_params.position,
4091 lsp::Position::new(0, 34)
4092 );
4093 Ok(Some(vec![
4094 lsp::DocumentHighlight {
4095 kind: Some(lsp::DocumentHighlightKind::WRITE),
4096 range: lsp::Range::new(
4097 lsp::Position::new(0, 10),
4098 lsp::Position::new(0, 16),
4099 ),
4100 },
4101 lsp::DocumentHighlight {
4102 kind: Some(lsp::DocumentHighlightKind::READ),
4103 range: lsp::Range::new(
4104 lsp::Position::new(0, 32),
4105 lsp::Position::new(0, 38),
4106 ),
4107 },
4108 lsp::DocumentHighlight {
4109 kind: Some(lsp::DocumentHighlightKind::READ),
4110 range: lsp::Range::new(
4111 lsp::Position::new(0, 41),
4112 lsp::Position::new(0, 47),
4113 ),
4114 },
4115 ]))
4116 },
4117 );
4118
4119 let highlights = project_b
4120 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4121 .await
4122 .unwrap();
4123 buffer_b.read_with(cx_b, |buffer, _| {
4124 let snapshot = buffer.snapshot();
4125
4126 let highlights = highlights
4127 .into_iter()
4128 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4129 .collect::<Vec<_>>();
4130 assert_eq!(
4131 highlights,
4132 &[
4133 (lsp::DocumentHighlightKind::WRITE, 10..16),
4134 (lsp::DocumentHighlightKind::READ, 32..38),
4135 (lsp::DocumentHighlightKind::READ, 41..47)
4136 ]
4137 )
4138 });
4139 }
4140
4141 #[gpui::test(iterations = 10)]
4142 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4143 cx_a.foreground().forbid_parking();
4144 let lang_registry = Arc::new(LanguageRegistry::test());
4145 let fs = FakeFs::new(cx_a.background());
4146 fs.insert_tree(
4147 "/code",
4148 json!({
4149 "crate-1": {
4150 "one.rs": "const ONE: usize = 1;",
4151 },
4152 "crate-2": {
4153 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4154 },
4155 "private": {
4156 "passwords.txt": "the-password",
4157 }
4158 }),
4159 )
4160 .await;
4161
4162 // Set up a fake language server.
4163 let mut language = Language::new(
4164 LanguageConfig {
4165 name: "Rust".into(),
4166 path_suffixes: vec!["rs".to_string()],
4167 ..Default::default()
4168 },
4169 Some(tree_sitter_rust::language()),
4170 );
4171 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4172 lang_registry.add(Arc::new(language));
4173
4174 // Connect to a server as 2 clients.
4175 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4176 let client_a = server.create_client(cx_a, "user_a").await;
4177 let mut client_b = server.create_client(cx_b, "user_b").await;
4178 server
4179 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4180 .await;
4181
4182 // Share a project as client A
4183 let project_a = cx_a.update(|cx| {
4184 Project::local(
4185 client_a.clone(),
4186 client_a.user_store.clone(),
4187 lang_registry.clone(),
4188 fs.clone(),
4189 cx,
4190 )
4191 });
4192 let (worktree_a, _) = project_a
4193 .update(cx_a, |p, cx| {
4194 p.find_or_create_local_worktree("/code/crate-1", true, cx)
4195 })
4196 .await
4197 .unwrap();
4198 worktree_a
4199 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4200 .await;
4201 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4202
4203 // Join the worktree as client B.
4204 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4205
4206 // Cause the language server to start.
4207 let _buffer = cx_b
4208 .background()
4209 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4210 .await
4211 .unwrap();
4212
4213 let fake_language_server = fake_language_servers.next().await.unwrap();
4214 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4215 |_, _| async move {
4216 #[allow(deprecated)]
4217 Ok(Some(vec![lsp::SymbolInformation {
4218 name: "TWO".into(),
4219 location: lsp::Location {
4220 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4221 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4222 },
4223 kind: lsp::SymbolKind::CONSTANT,
4224 tags: None,
4225 container_name: None,
4226 deprecated: None,
4227 }]))
4228 },
4229 );
4230
4231 // Request the definition of a symbol as the guest.
4232 let symbols = project_b
4233 .update(cx_b, |p, cx| p.symbols("two", cx))
4234 .await
4235 .unwrap();
4236 assert_eq!(symbols.len(), 1);
4237 assert_eq!(symbols[0].name, "TWO");
4238
4239 // Open one of the returned symbols.
4240 let buffer_b_2 = project_b
4241 .update(cx_b, |project, cx| {
4242 project.open_buffer_for_symbol(&symbols[0], cx)
4243 })
4244 .await
4245 .unwrap();
4246 buffer_b_2.read_with(cx_b, |buffer, _| {
4247 assert_eq!(
4248 buffer.file().unwrap().path().as_ref(),
4249 Path::new("../crate-2/two.rs")
4250 );
4251 });
4252
4253 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4254 let mut fake_symbol = symbols[0].clone();
4255 fake_symbol.path = Path::new("/code/secrets").into();
4256 let error = project_b
4257 .update(cx_b, |project, cx| {
4258 project.open_buffer_for_symbol(&fake_symbol, cx)
4259 })
4260 .await
4261 .unwrap_err();
4262 assert!(error.to_string().contains("invalid symbol signature"));
4263 }
4264
4265 #[gpui::test(iterations = 10)]
4266 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4267 cx_a: &mut TestAppContext,
4268 cx_b: &mut TestAppContext,
4269 mut rng: StdRng,
4270 ) {
4271 cx_a.foreground().forbid_parking();
4272 let lang_registry = Arc::new(LanguageRegistry::test());
4273 let fs = FakeFs::new(cx_a.background());
4274 fs.insert_tree(
4275 "/root",
4276 json!({
4277 "a.rs": "const ONE: usize = b::TWO;",
4278 "b.rs": "const TWO: usize = 2",
4279 }),
4280 )
4281 .await;
4282
4283 // Set up a fake language server.
4284 let mut language = Language::new(
4285 LanguageConfig {
4286 name: "Rust".into(),
4287 path_suffixes: vec!["rs".to_string()],
4288 ..Default::default()
4289 },
4290 Some(tree_sitter_rust::language()),
4291 );
4292 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4293 lang_registry.add(Arc::new(language));
4294
4295 // Connect to a server as 2 clients.
4296 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4297 let client_a = server.create_client(cx_a, "user_a").await;
4298 let mut client_b = server.create_client(cx_b, "user_b").await;
4299 server
4300 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4301 .await;
4302
4303 // Share a project as client A
4304 let project_a = cx_a.update(|cx| {
4305 Project::local(
4306 client_a.clone(),
4307 client_a.user_store.clone(),
4308 lang_registry.clone(),
4309 fs.clone(),
4310 cx,
4311 )
4312 });
4313
4314 let (worktree_a, _) = project_a
4315 .update(cx_a, |p, cx| {
4316 p.find_or_create_local_worktree("/root", true, cx)
4317 })
4318 .await
4319 .unwrap();
4320 worktree_a
4321 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4322 .await;
4323 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4324
4325 // Join the project as client B.
4326 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4327
4328 let buffer_b1 = cx_b
4329 .background()
4330 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4331 .await
4332 .unwrap();
4333
4334 let fake_language_server = fake_language_servers.next().await.unwrap();
4335 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4336 |_, _| async move {
4337 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4338 lsp::Location::new(
4339 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4340 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4341 ),
4342 )))
4343 },
4344 );
4345
4346 let definitions;
4347 let buffer_b2;
4348 if rng.gen() {
4349 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4350 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4351 } else {
4352 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4353 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4354 }
4355
4356 let buffer_b2 = buffer_b2.await.unwrap();
4357 let definitions = definitions.await.unwrap();
4358 assert_eq!(definitions.len(), 1);
4359 assert_eq!(definitions[0].buffer, buffer_b2);
4360 }
4361
4362 #[gpui::test(iterations = 10)]
4363 async fn test_collaborating_with_code_actions(
4364 cx_a: &mut TestAppContext,
4365 cx_b: &mut TestAppContext,
4366 ) {
4367 cx_a.foreground().forbid_parking();
4368 let lang_registry = Arc::new(LanguageRegistry::test());
4369 let fs = FakeFs::new(cx_a.background());
4370 cx_b.update(|cx| editor::init(cx));
4371
4372 // Set up a fake language server.
4373 let mut language = Language::new(
4374 LanguageConfig {
4375 name: "Rust".into(),
4376 path_suffixes: vec!["rs".to_string()],
4377 ..Default::default()
4378 },
4379 Some(tree_sitter_rust::language()),
4380 );
4381 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4382 lang_registry.add(Arc::new(language));
4383
4384 // Connect to a server as 2 clients.
4385 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4386 let client_a = server.create_client(cx_a, "user_a").await;
4387 let mut client_b = server.create_client(cx_b, "user_b").await;
4388 server
4389 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4390 .await;
4391
4392 // Share a project as client A
4393 fs.insert_tree(
4394 "/a",
4395 json!({
4396 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4397 "other.rs": "pub fn foo() -> usize { 4 }",
4398 }),
4399 )
4400 .await;
4401 let project_a = cx_a.update(|cx| {
4402 Project::local(
4403 client_a.clone(),
4404 client_a.user_store.clone(),
4405 lang_registry.clone(),
4406 fs.clone(),
4407 cx,
4408 )
4409 });
4410 let (worktree_a, _) = project_a
4411 .update(cx_a, |p, cx| {
4412 p.find_or_create_local_worktree("/a", true, cx)
4413 })
4414 .await
4415 .unwrap();
4416 worktree_a
4417 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4418 .await;
4419 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4420
4421 // Join the project as client B.
4422 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4423 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4424 let editor_b = workspace_b
4425 .update(cx_b, |workspace, cx| {
4426 workspace.open_path((worktree_id, "main.rs"), true, cx)
4427 })
4428 .await
4429 .unwrap()
4430 .downcast::<Editor>()
4431 .unwrap();
4432
4433 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4434 fake_language_server
4435 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4436 assert_eq!(
4437 params.text_document.uri,
4438 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4439 );
4440 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4441 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4442 Ok(None)
4443 })
4444 .next()
4445 .await;
4446
4447 // Move cursor to a location that contains code actions.
4448 editor_b.update(cx_b, |editor, cx| {
4449 editor.change_selections(None, cx, |s| {
4450 s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4451 });
4452 cx.focus(&editor_b);
4453 });
4454
4455 fake_language_server
4456 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4457 assert_eq!(
4458 params.text_document.uri,
4459 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4460 );
4461 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4462 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4463
4464 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4465 lsp::CodeAction {
4466 title: "Inline into all callers".to_string(),
4467 edit: Some(lsp::WorkspaceEdit {
4468 changes: Some(
4469 [
4470 (
4471 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4472 vec![lsp::TextEdit::new(
4473 lsp::Range::new(
4474 lsp::Position::new(1, 22),
4475 lsp::Position::new(1, 34),
4476 ),
4477 "4".to_string(),
4478 )],
4479 ),
4480 (
4481 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4482 vec![lsp::TextEdit::new(
4483 lsp::Range::new(
4484 lsp::Position::new(0, 0),
4485 lsp::Position::new(0, 27),
4486 ),
4487 "".to_string(),
4488 )],
4489 ),
4490 ]
4491 .into_iter()
4492 .collect(),
4493 ),
4494 ..Default::default()
4495 }),
4496 data: Some(json!({
4497 "codeActionParams": {
4498 "range": {
4499 "start": {"line": 1, "column": 31},
4500 "end": {"line": 1, "column": 31},
4501 }
4502 }
4503 })),
4504 ..Default::default()
4505 },
4506 )]))
4507 })
4508 .next()
4509 .await;
4510
4511 // Toggle code actions and wait for them to display.
4512 editor_b.update(cx_b, |editor, cx| {
4513 editor.toggle_code_actions(
4514 &ToggleCodeActions {
4515 deployed_from_indicator: false,
4516 },
4517 cx,
4518 );
4519 });
4520 editor_b
4521 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4522 .await;
4523
4524 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4525
4526 // Confirming the code action will trigger a resolve request.
4527 let confirm_action = workspace_b
4528 .update(cx_b, |workspace, cx| {
4529 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4530 })
4531 .unwrap();
4532 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4533 |_, _| async move {
4534 Ok(lsp::CodeAction {
4535 title: "Inline into all callers".to_string(),
4536 edit: Some(lsp::WorkspaceEdit {
4537 changes: Some(
4538 [
4539 (
4540 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4541 vec![lsp::TextEdit::new(
4542 lsp::Range::new(
4543 lsp::Position::new(1, 22),
4544 lsp::Position::new(1, 34),
4545 ),
4546 "4".to_string(),
4547 )],
4548 ),
4549 (
4550 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4551 vec![lsp::TextEdit::new(
4552 lsp::Range::new(
4553 lsp::Position::new(0, 0),
4554 lsp::Position::new(0, 27),
4555 ),
4556 "".to_string(),
4557 )],
4558 ),
4559 ]
4560 .into_iter()
4561 .collect(),
4562 ),
4563 ..Default::default()
4564 }),
4565 ..Default::default()
4566 })
4567 },
4568 );
4569
4570 // After the action is confirmed, an editor containing both modified files is opened.
4571 confirm_action.await.unwrap();
4572 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4573 workspace
4574 .active_item(cx)
4575 .unwrap()
4576 .downcast::<Editor>()
4577 .unwrap()
4578 });
4579 code_action_editor.update(cx_b, |editor, cx| {
4580 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4581 editor.undo(&Undo, cx);
4582 assert_eq!(
4583 editor.text(cx),
4584 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4585 );
4586 editor.redo(&Redo, cx);
4587 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4588 });
4589 }
4590
4591 #[gpui::test(iterations = 10)]
4592 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4593 cx_a.foreground().forbid_parking();
4594 let lang_registry = Arc::new(LanguageRegistry::test());
4595 let fs = FakeFs::new(cx_a.background());
4596 cx_b.update(|cx| editor::init(cx));
4597
4598 // Set up a fake language server.
4599 let mut language = Language::new(
4600 LanguageConfig {
4601 name: "Rust".into(),
4602 path_suffixes: vec!["rs".to_string()],
4603 ..Default::default()
4604 },
4605 Some(tree_sitter_rust::language()),
4606 );
4607 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4608 capabilities: lsp::ServerCapabilities {
4609 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4610 prepare_provider: Some(true),
4611 work_done_progress_options: Default::default(),
4612 })),
4613 ..Default::default()
4614 },
4615 ..Default::default()
4616 });
4617 lang_registry.add(Arc::new(language));
4618
4619 // Connect to a server as 2 clients.
4620 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4621 let client_a = server.create_client(cx_a, "user_a").await;
4622 let mut client_b = server.create_client(cx_b, "user_b").await;
4623 server
4624 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4625 .await;
4626
4627 // Share a project as client A
4628 fs.insert_tree(
4629 "/dir",
4630 json!({
4631 "one.rs": "const ONE: usize = 1;",
4632 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4633 }),
4634 )
4635 .await;
4636 let project_a = cx_a.update(|cx| {
4637 Project::local(
4638 client_a.clone(),
4639 client_a.user_store.clone(),
4640 lang_registry.clone(),
4641 fs.clone(),
4642 cx,
4643 )
4644 });
4645 let (worktree_a, _) = project_a
4646 .update(cx_a, |p, cx| {
4647 p.find_or_create_local_worktree("/dir", true, cx)
4648 })
4649 .await
4650 .unwrap();
4651 worktree_a
4652 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4653 .await;
4654 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4655
4656 // Join the worktree as client B.
4657 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4658 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4659 let editor_b = workspace_b
4660 .update(cx_b, |workspace, cx| {
4661 workspace.open_path((worktree_id, "one.rs"), true, cx)
4662 })
4663 .await
4664 .unwrap()
4665 .downcast::<Editor>()
4666 .unwrap();
4667 let fake_language_server = fake_language_servers.next().await.unwrap();
4668
4669 // Move cursor to a location that can be renamed.
4670 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4671 editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4672 editor.rename(&Rename, cx).unwrap()
4673 });
4674
4675 fake_language_server
4676 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4677 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4678 assert_eq!(params.position, lsp::Position::new(0, 7));
4679 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4680 lsp::Position::new(0, 6),
4681 lsp::Position::new(0, 9),
4682 ))))
4683 })
4684 .next()
4685 .await
4686 .unwrap();
4687 prepare_rename.await.unwrap();
4688 editor_b.update(cx_b, |editor, cx| {
4689 let rename = editor.pending_rename().unwrap();
4690 let buffer = editor.buffer().read(cx).snapshot(cx);
4691 assert_eq!(
4692 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4693 6..9
4694 );
4695 rename.editor.update(cx, |rename_editor, cx| {
4696 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4697 rename_buffer.edit([(0..3, "THREE")], cx);
4698 });
4699 });
4700 });
4701
4702 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4703 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4704 });
4705 fake_language_server
4706 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4707 assert_eq!(
4708 params.text_document_position.text_document.uri.as_str(),
4709 "file:///dir/one.rs"
4710 );
4711 assert_eq!(
4712 params.text_document_position.position,
4713 lsp::Position::new(0, 6)
4714 );
4715 assert_eq!(params.new_name, "THREE");
4716 Ok(Some(lsp::WorkspaceEdit {
4717 changes: Some(
4718 [
4719 (
4720 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4721 vec![lsp::TextEdit::new(
4722 lsp::Range::new(
4723 lsp::Position::new(0, 6),
4724 lsp::Position::new(0, 9),
4725 ),
4726 "THREE".to_string(),
4727 )],
4728 ),
4729 (
4730 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4731 vec![
4732 lsp::TextEdit::new(
4733 lsp::Range::new(
4734 lsp::Position::new(0, 24),
4735 lsp::Position::new(0, 27),
4736 ),
4737 "THREE".to_string(),
4738 ),
4739 lsp::TextEdit::new(
4740 lsp::Range::new(
4741 lsp::Position::new(0, 35),
4742 lsp::Position::new(0, 38),
4743 ),
4744 "THREE".to_string(),
4745 ),
4746 ],
4747 ),
4748 ]
4749 .into_iter()
4750 .collect(),
4751 ),
4752 ..Default::default()
4753 }))
4754 })
4755 .next()
4756 .await
4757 .unwrap();
4758 confirm_rename.await.unwrap();
4759
4760 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4761 workspace
4762 .active_item(cx)
4763 .unwrap()
4764 .downcast::<Editor>()
4765 .unwrap()
4766 });
4767 rename_editor.update(cx_b, |editor, cx| {
4768 assert_eq!(
4769 editor.text(cx),
4770 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4771 );
4772 editor.undo(&Undo, cx);
4773 assert_eq!(
4774 editor.text(cx),
4775 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4776 );
4777 editor.redo(&Redo, cx);
4778 assert_eq!(
4779 editor.text(cx),
4780 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4781 );
4782 });
4783
4784 // Ensure temporary rename edits cannot be undone/redone.
4785 editor_b.update(cx_b, |editor, cx| {
4786 editor.undo(&Undo, cx);
4787 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4788 editor.undo(&Undo, cx);
4789 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4790 editor.redo(&Redo, cx);
4791 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4792 })
4793 }
4794
4795 #[gpui::test(iterations = 10)]
4796 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4797 cx_a.foreground().forbid_parking();
4798
4799 // Connect to a server as 2 clients.
4800 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4801 let client_a = server.create_client(cx_a, "user_a").await;
4802 let client_b = server.create_client(cx_b, "user_b").await;
4803
4804 // Create an org that includes these 2 users.
4805 let db = &server.app_state.db;
4806 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4807 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4808 .await
4809 .unwrap();
4810 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4811 .await
4812 .unwrap();
4813
4814 // Create a channel that includes all the users.
4815 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4816 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4817 .await
4818 .unwrap();
4819 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4820 .await
4821 .unwrap();
4822 db.create_channel_message(
4823 channel_id,
4824 client_b.current_user_id(&cx_b),
4825 "hello A, it's B.",
4826 OffsetDateTime::now_utc(),
4827 1,
4828 )
4829 .await
4830 .unwrap();
4831
4832 let channels_a = cx_a
4833 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4834 channels_a
4835 .condition(cx_a, |list, _| list.available_channels().is_some())
4836 .await;
4837 channels_a.read_with(cx_a, |list, _| {
4838 assert_eq!(
4839 list.available_channels().unwrap(),
4840 &[ChannelDetails {
4841 id: channel_id.to_proto(),
4842 name: "test-channel".to_string()
4843 }]
4844 )
4845 });
4846 let channel_a = channels_a.update(cx_a, |this, cx| {
4847 this.get_channel(channel_id.to_proto(), cx).unwrap()
4848 });
4849 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4850 channel_a
4851 .condition(&cx_a, |channel, _| {
4852 channel_messages(channel)
4853 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4854 })
4855 .await;
4856
4857 let channels_b = cx_b
4858 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4859 channels_b
4860 .condition(cx_b, |list, _| list.available_channels().is_some())
4861 .await;
4862 channels_b.read_with(cx_b, |list, _| {
4863 assert_eq!(
4864 list.available_channels().unwrap(),
4865 &[ChannelDetails {
4866 id: channel_id.to_proto(),
4867 name: "test-channel".to_string()
4868 }]
4869 )
4870 });
4871
4872 let channel_b = channels_b.update(cx_b, |this, cx| {
4873 this.get_channel(channel_id.to_proto(), cx).unwrap()
4874 });
4875 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4876 channel_b
4877 .condition(&cx_b, |channel, _| {
4878 channel_messages(channel)
4879 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4880 })
4881 .await;
4882
4883 channel_a
4884 .update(cx_a, |channel, cx| {
4885 channel
4886 .send_message("oh, hi B.".to_string(), cx)
4887 .unwrap()
4888 .detach();
4889 let task = channel.send_message("sup".to_string(), cx).unwrap();
4890 assert_eq!(
4891 channel_messages(channel),
4892 &[
4893 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4894 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4895 ("user_a".to_string(), "sup".to_string(), true)
4896 ]
4897 );
4898 task
4899 })
4900 .await
4901 .unwrap();
4902
4903 channel_b
4904 .condition(&cx_b, |channel, _| {
4905 channel_messages(channel)
4906 == [
4907 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4908 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4909 ("user_a".to_string(), "sup".to_string(), false),
4910 ]
4911 })
4912 .await;
4913
4914 assert_eq!(
4915 server
4916 .state()
4917 .await
4918 .channel(channel_id)
4919 .unwrap()
4920 .connection_ids
4921 .len(),
4922 2
4923 );
4924 cx_b.update(|_| drop(channel_b));
4925 server
4926 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4927 .await;
4928
4929 cx_a.update(|_| drop(channel_a));
4930 server
4931 .condition(|state| state.channel(channel_id).is_none())
4932 .await;
4933 }
4934
4935 #[gpui::test(iterations = 10)]
4936 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4937 cx_a.foreground().forbid_parking();
4938
4939 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4940 let client_a = server.create_client(cx_a, "user_a").await;
4941
4942 let db = &server.app_state.db;
4943 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4944 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4945 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4946 .await
4947 .unwrap();
4948 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4949 .await
4950 .unwrap();
4951
4952 let channels_a = cx_a
4953 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4954 channels_a
4955 .condition(cx_a, |list, _| list.available_channels().is_some())
4956 .await;
4957 let channel_a = channels_a.update(cx_a, |this, cx| {
4958 this.get_channel(channel_id.to_proto(), cx).unwrap()
4959 });
4960
4961 // Messages aren't allowed to be too long.
4962 channel_a
4963 .update(cx_a, |channel, cx| {
4964 let long_body = "this is long.\n".repeat(1024);
4965 channel.send_message(long_body, cx).unwrap()
4966 })
4967 .await
4968 .unwrap_err();
4969
4970 // Messages aren't allowed to be blank.
4971 channel_a.update(cx_a, |channel, cx| {
4972 channel.send_message(String::new(), cx).unwrap_err()
4973 });
4974
4975 // Leading and trailing whitespace are trimmed.
4976 channel_a
4977 .update(cx_a, |channel, cx| {
4978 channel
4979 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4980 .unwrap()
4981 })
4982 .await
4983 .unwrap();
4984 assert_eq!(
4985 db.get_channel_messages(channel_id, 10, None)
4986 .await
4987 .unwrap()
4988 .iter()
4989 .map(|m| &m.body)
4990 .collect::<Vec<_>>(),
4991 &["surrounded by whitespace"]
4992 );
4993 }
4994
4995 #[gpui::test(iterations = 10)]
4996 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4997 cx_a.foreground().forbid_parking();
4998
4999 // Connect to a server as 2 clients.
5000 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5001 let client_a = server.create_client(cx_a, "user_a").await;
5002 let client_b = server.create_client(cx_b, "user_b").await;
5003 let mut status_b = client_b.status();
5004
5005 // Create an org that includes these 2 users.
5006 let db = &server.app_state.db;
5007 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
5008 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
5009 .await
5010 .unwrap();
5011 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
5012 .await
5013 .unwrap();
5014
5015 // Create a channel that includes all the users.
5016 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
5017 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
5018 .await
5019 .unwrap();
5020 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
5021 .await
5022 .unwrap();
5023 db.create_channel_message(
5024 channel_id,
5025 client_b.current_user_id(&cx_b),
5026 "hello A, it's B.",
5027 OffsetDateTime::now_utc(),
5028 2,
5029 )
5030 .await
5031 .unwrap();
5032
5033 let channels_a = cx_a
5034 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
5035 channels_a
5036 .condition(cx_a, |list, _| list.available_channels().is_some())
5037 .await;
5038
5039 channels_a.read_with(cx_a, |list, _| {
5040 assert_eq!(
5041 list.available_channels().unwrap(),
5042 &[ChannelDetails {
5043 id: channel_id.to_proto(),
5044 name: "test-channel".to_string()
5045 }]
5046 )
5047 });
5048 let channel_a = channels_a.update(cx_a, |this, cx| {
5049 this.get_channel(channel_id.to_proto(), cx).unwrap()
5050 });
5051 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
5052 channel_a
5053 .condition(&cx_a, |channel, _| {
5054 channel_messages(channel)
5055 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5056 })
5057 .await;
5058
5059 let channels_b = cx_b
5060 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
5061 channels_b
5062 .condition(cx_b, |list, _| list.available_channels().is_some())
5063 .await;
5064 channels_b.read_with(cx_b, |list, _| {
5065 assert_eq!(
5066 list.available_channels().unwrap(),
5067 &[ChannelDetails {
5068 id: channel_id.to_proto(),
5069 name: "test-channel".to_string()
5070 }]
5071 )
5072 });
5073
5074 let channel_b = channels_b.update(cx_b, |this, cx| {
5075 this.get_channel(channel_id.to_proto(), cx).unwrap()
5076 });
5077 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5078 channel_b
5079 .condition(&cx_b, |channel, _| {
5080 channel_messages(channel)
5081 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5082 })
5083 .await;
5084
5085 // Disconnect client B, ensuring we can still access its cached channel data.
5086 server.forbid_connections();
5087 server.disconnect_client(client_b.current_user_id(&cx_b));
5088 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5089 while !matches!(
5090 status_b.next().await,
5091 Some(client::Status::ReconnectionError { .. })
5092 ) {}
5093
5094 channels_b.read_with(cx_b, |channels, _| {
5095 assert_eq!(
5096 channels.available_channels().unwrap(),
5097 [ChannelDetails {
5098 id: channel_id.to_proto(),
5099 name: "test-channel".to_string()
5100 }]
5101 )
5102 });
5103 channel_b.read_with(cx_b, |channel, _| {
5104 assert_eq!(
5105 channel_messages(channel),
5106 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5107 )
5108 });
5109
5110 // Send a message from client B while it is disconnected.
5111 channel_b
5112 .update(cx_b, |channel, cx| {
5113 let task = channel
5114 .send_message("can you see this?".to_string(), cx)
5115 .unwrap();
5116 assert_eq!(
5117 channel_messages(channel),
5118 &[
5119 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5120 ("user_b".to_string(), "can you see this?".to_string(), true)
5121 ]
5122 );
5123 task
5124 })
5125 .await
5126 .unwrap_err();
5127
5128 // Send a message from client A while B is disconnected.
5129 channel_a
5130 .update(cx_a, |channel, cx| {
5131 channel
5132 .send_message("oh, hi B.".to_string(), cx)
5133 .unwrap()
5134 .detach();
5135 let task = channel.send_message("sup".to_string(), cx).unwrap();
5136 assert_eq!(
5137 channel_messages(channel),
5138 &[
5139 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5140 ("user_a".to_string(), "oh, hi B.".to_string(), true),
5141 ("user_a".to_string(), "sup".to_string(), true)
5142 ]
5143 );
5144 task
5145 })
5146 .await
5147 .unwrap();
5148
5149 // Give client B a chance to reconnect.
5150 server.allow_connections();
5151 cx_b.foreground().advance_clock(Duration::from_secs(10));
5152
5153 // Verify that B sees the new messages upon reconnection, as well as the message client B
5154 // sent while offline.
5155 channel_b
5156 .condition(&cx_b, |channel, _| {
5157 channel_messages(channel)
5158 == [
5159 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5160 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5161 ("user_a".to_string(), "sup".to_string(), false),
5162 ("user_b".to_string(), "can you see this?".to_string(), false),
5163 ]
5164 })
5165 .await;
5166
5167 // Ensure client A and B can communicate normally after reconnection.
5168 channel_a
5169 .update(cx_a, |channel, cx| {
5170 channel.send_message("you online?".to_string(), cx).unwrap()
5171 })
5172 .await
5173 .unwrap();
5174 channel_b
5175 .condition(&cx_b, |channel, _| {
5176 channel_messages(channel)
5177 == [
5178 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5179 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5180 ("user_a".to_string(), "sup".to_string(), false),
5181 ("user_b".to_string(), "can you see this?".to_string(), false),
5182 ("user_a".to_string(), "you online?".to_string(), false),
5183 ]
5184 })
5185 .await;
5186
5187 channel_b
5188 .update(cx_b, |channel, cx| {
5189 channel.send_message("yep".to_string(), cx).unwrap()
5190 })
5191 .await
5192 .unwrap();
5193 channel_a
5194 .condition(&cx_a, |channel, _| {
5195 channel_messages(channel)
5196 == [
5197 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5198 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5199 ("user_a".to_string(), "sup".to_string(), false),
5200 ("user_b".to_string(), "can you see this?".to_string(), false),
5201 ("user_a".to_string(), "you online?".to_string(), false),
5202 ("user_b".to_string(), "yep".to_string(), false),
5203 ]
5204 })
5205 .await;
5206 }
5207
5208 #[gpui::test(iterations = 10)]
5209 async fn test_contacts(
5210 deterministic: Arc<Deterministic>,
5211 cx_a: &mut TestAppContext,
5212 cx_b: &mut TestAppContext,
5213 cx_c: &mut TestAppContext,
5214 ) {
5215 cx_a.foreground().forbid_parking();
5216
5217 // Connect to a server as 3 clients.
5218 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5219 let mut client_a = server.create_client(cx_a, "user_a").await;
5220 let mut client_b = server.create_client(cx_b, "user_b").await;
5221 let client_c = server.create_client(cx_c, "user_c").await;
5222 server
5223 .make_contacts(vec![
5224 (&client_a, cx_a),
5225 (&client_b, cx_b),
5226 (&client_c, cx_c),
5227 ])
5228 .await;
5229
5230 deterministic.run_until_parked();
5231 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5232 client.user_store.read_with(*cx, |store, _| {
5233 assert_eq!(
5234 contacts(store),
5235 [
5236 ("user_a", true, vec![]),
5237 ("user_b", true, vec![]),
5238 ("user_c", true, vec![])
5239 ],
5240 "{} has the wrong contacts",
5241 client.username
5242 )
5243 });
5244 }
5245
5246 // Share a project as client A.
5247 let fs = FakeFs::new(cx_a.background());
5248 fs.create_dir(Path::new("/a")).await.unwrap();
5249 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5250
5251 deterministic.run_until_parked();
5252 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5253 client.user_store.read_with(*cx, |store, _| {
5254 assert_eq!(
5255 contacts(store),
5256 [
5257 ("user_a", true, vec![("a", vec![])]),
5258 ("user_b", true, vec![]),
5259 ("user_c", true, vec![])
5260 ],
5261 "{} has the wrong contacts",
5262 client.username
5263 )
5264 });
5265 }
5266
5267 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5268
5269 deterministic.run_until_parked();
5270 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5271 client.user_store.read_with(*cx, |store, _| {
5272 assert_eq!(
5273 contacts(store),
5274 [
5275 ("user_a", true, vec![("a", vec!["user_b"])]),
5276 ("user_b", true, vec![]),
5277 ("user_c", true, vec![])
5278 ],
5279 "{} has the wrong contacts",
5280 client.username
5281 )
5282 });
5283 }
5284
5285 // Add a local project as client B
5286 let fs = FakeFs::new(cx_b.background());
5287 fs.create_dir(Path::new("/b")).await.unwrap();
5288 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5289
5290 deterministic.run_until_parked();
5291 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5292 client.user_store.read_with(*cx, |store, _| {
5293 assert_eq!(
5294 contacts(store),
5295 [
5296 ("user_a", true, vec![("a", vec!["user_b"])]),
5297 ("user_b", true, vec![("b", vec![])]),
5298 ("user_c", true, vec![])
5299 ],
5300 "{} has the wrong contacts",
5301 client.username
5302 )
5303 });
5304 }
5305
5306 project_a
5307 .condition(&cx_a, |project, _| {
5308 project.collaborators().contains_key(&client_b.peer_id)
5309 })
5310 .await;
5311
5312 client_a.project.take();
5313 cx_a.update(move |_| drop(project_a));
5314 deterministic.run_until_parked();
5315 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5316 client.user_store.read_with(*cx, |store, _| {
5317 assert_eq!(
5318 contacts(store),
5319 [
5320 ("user_a", true, vec![]),
5321 ("user_b", true, vec![("b", vec![])]),
5322 ("user_c", true, vec![])
5323 ],
5324 "{} has the wrong contacts",
5325 client.username
5326 )
5327 });
5328 }
5329
5330 server.disconnect_client(client_c.current_user_id(cx_c));
5331 server.forbid_connections();
5332 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5333 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5334 client.user_store.read_with(*cx, |store, _| {
5335 assert_eq!(
5336 contacts(store),
5337 [
5338 ("user_a", true, vec![]),
5339 ("user_b", true, vec![("b", vec![])]),
5340 ("user_c", false, vec![])
5341 ],
5342 "{} has the wrong contacts",
5343 client.username
5344 )
5345 });
5346 }
5347 client_c
5348 .user_store
5349 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5350
5351 server.allow_connections();
5352 client_c
5353 .authenticate_and_connect(false, &cx_c.to_async())
5354 .await
5355 .unwrap();
5356
5357 deterministic.run_until_parked();
5358 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5359 client.user_store.read_with(*cx, |store, _| {
5360 assert_eq!(
5361 contacts(store),
5362 [
5363 ("user_a", true, vec![]),
5364 ("user_b", true, vec![("b", vec![])]),
5365 ("user_c", true, vec![])
5366 ],
5367 "{} has the wrong contacts",
5368 client.username
5369 )
5370 });
5371 }
5372
5373 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5374 user_store
5375 .contacts()
5376 .iter()
5377 .map(|contact| {
5378 let projects = contact
5379 .projects
5380 .iter()
5381 .map(|p| {
5382 (
5383 p.worktree_root_names[0].as_str(),
5384 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5385 )
5386 })
5387 .collect();
5388 (contact.user.github_login.as_str(), contact.online, projects)
5389 })
5390 .collect()
5391 }
5392 }
5393
5394 #[gpui::test(iterations = 10)]
5395 async fn test_contact_requests(
5396 executor: Arc<Deterministic>,
5397 cx_a: &mut TestAppContext,
5398 cx_a2: &mut TestAppContext,
5399 cx_b: &mut TestAppContext,
5400 cx_b2: &mut TestAppContext,
5401 cx_c: &mut TestAppContext,
5402 cx_c2: &mut TestAppContext,
5403 ) {
5404 cx_a.foreground().forbid_parking();
5405
5406 // Connect to a server as 3 clients.
5407 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5408 let client_a = server.create_client(cx_a, "user_a").await;
5409 let client_a2 = server.create_client(cx_a2, "user_a").await;
5410 let client_b = server.create_client(cx_b, "user_b").await;
5411 let client_b2 = server.create_client(cx_b2, "user_b").await;
5412 let client_c = server.create_client(cx_c, "user_c").await;
5413 let client_c2 = server.create_client(cx_c2, "user_c").await;
5414
5415 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5416 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5417 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5418
5419 // User A and User C request that user B become their contact.
5420 client_a
5421 .user_store
5422 .update(cx_a, |store, cx| {
5423 store.request_contact(client_b.user_id().unwrap(), cx)
5424 })
5425 .await
5426 .unwrap();
5427 client_c
5428 .user_store
5429 .update(cx_c, |store, cx| {
5430 store.request_contact(client_b.user_id().unwrap(), cx)
5431 })
5432 .await
5433 .unwrap();
5434 executor.run_until_parked();
5435
5436 // All users see the pending request appear in all their clients.
5437 assert_eq!(
5438 client_a.summarize_contacts(&cx_a).outgoing_requests,
5439 &["user_b"]
5440 );
5441 assert_eq!(
5442 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5443 &["user_b"]
5444 );
5445 assert_eq!(
5446 client_b.summarize_contacts(&cx_b).incoming_requests,
5447 &["user_a", "user_c"]
5448 );
5449 assert_eq!(
5450 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5451 &["user_a", "user_c"]
5452 );
5453 assert_eq!(
5454 client_c.summarize_contacts(&cx_c).outgoing_requests,
5455 &["user_b"]
5456 );
5457 assert_eq!(
5458 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5459 &["user_b"]
5460 );
5461
5462 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5463 disconnect_and_reconnect(&client_a, cx_a).await;
5464 disconnect_and_reconnect(&client_b, cx_b).await;
5465 disconnect_and_reconnect(&client_c, cx_c).await;
5466 executor.run_until_parked();
5467 assert_eq!(
5468 client_a.summarize_contacts(&cx_a).outgoing_requests,
5469 &["user_b"]
5470 );
5471 assert_eq!(
5472 client_b.summarize_contacts(&cx_b).incoming_requests,
5473 &["user_a", "user_c"]
5474 );
5475 assert_eq!(
5476 client_c.summarize_contacts(&cx_c).outgoing_requests,
5477 &["user_b"]
5478 );
5479
5480 // User B accepts the request from user A.
5481 client_b
5482 .user_store
5483 .update(cx_b, |store, cx| {
5484 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5485 })
5486 .await
5487 .unwrap();
5488
5489 executor.run_until_parked();
5490
5491 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5492 let contacts_b = client_b.summarize_contacts(&cx_b);
5493 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5494 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5495 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5496 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5497 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5498
5499 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5500 let contacts_a = client_a.summarize_contacts(&cx_a);
5501 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5502 assert!(contacts_a.outgoing_requests.is_empty());
5503 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5504 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5505 assert!(contacts_a2.outgoing_requests.is_empty());
5506
5507 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5508 disconnect_and_reconnect(&client_a, cx_a).await;
5509 disconnect_and_reconnect(&client_b, cx_b).await;
5510 disconnect_and_reconnect(&client_c, cx_c).await;
5511 executor.run_until_parked();
5512 assert_eq!(
5513 client_a.summarize_contacts(&cx_a).current,
5514 &["user_a", "user_b"]
5515 );
5516 assert_eq!(
5517 client_b.summarize_contacts(&cx_b).current,
5518 &["user_a", "user_b"]
5519 );
5520 assert_eq!(
5521 client_b.summarize_contacts(&cx_b).incoming_requests,
5522 &["user_c"]
5523 );
5524 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5525 assert_eq!(
5526 client_c.summarize_contacts(&cx_c).outgoing_requests,
5527 &["user_b"]
5528 );
5529
5530 // User B rejects the request from user C.
5531 client_b
5532 .user_store
5533 .update(cx_b, |store, cx| {
5534 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5535 })
5536 .await
5537 .unwrap();
5538
5539 executor.run_until_parked();
5540
5541 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5542 let contacts_b = client_b.summarize_contacts(&cx_b);
5543 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5544 assert!(contacts_b.incoming_requests.is_empty());
5545 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5546 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5547 assert!(contacts_b2.incoming_requests.is_empty());
5548
5549 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5550 let contacts_c = client_c.summarize_contacts(&cx_c);
5551 assert_eq!(contacts_c.current, &["user_c"]);
5552 assert!(contacts_c.outgoing_requests.is_empty());
5553 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5554 assert_eq!(contacts_c2.current, &["user_c"]);
5555 assert!(contacts_c2.outgoing_requests.is_empty());
5556
5557 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5558 disconnect_and_reconnect(&client_a, cx_a).await;
5559 disconnect_and_reconnect(&client_b, cx_b).await;
5560 disconnect_and_reconnect(&client_c, cx_c).await;
5561 executor.run_until_parked();
5562 assert_eq!(
5563 client_a.summarize_contacts(&cx_a).current,
5564 &["user_a", "user_b"]
5565 );
5566 assert_eq!(
5567 client_b.summarize_contacts(&cx_b).current,
5568 &["user_a", "user_b"]
5569 );
5570 assert!(client_b
5571 .summarize_contacts(&cx_b)
5572 .incoming_requests
5573 .is_empty());
5574 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5575 assert!(client_c
5576 .summarize_contacts(&cx_c)
5577 .outgoing_requests
5578 .is_empty());
5579
5580 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5581 client.disconnect(&cx.to_async()).unwrap();
5582 client.clear_contacts(cx).await;
5583 client
5584 .authenticate_and_connect(false, &cx.to_async())
5585 .await
5586 .unwrap();
5587 }
5588 }
5589
5590 #[gpui::test(iterations = 10)]
5591 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5592 cx_a.foreground().forbid_parking();
5593 let fs = FakeFs::new(cx_a.background());
5594
5595 // 2 clients connect to a server.
5596 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5597 let mut client_a = server.create_client(cx_a, "user_a").await;
5598 let mut client_b = server.create_client(cx_b, "user_b").await;
5599 server
5600 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5601 .await;
5602 cx_a.update(editor::init);
5603 cx_b.update(editor::init);
5604
5605 // Client A shares a project.
5606 fs.insert_tree(
5607 "/a",
5608 json!({
5609 "1.txt": "one",
5610 "2.txt": "two",
5611 "3.txt": "three",
5612 }),
5613 )
5614 .await;
5615 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5616
5617 // Client B joins the project.
5618 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5619
5620 // Client A opens some editors.
5621 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5622 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5623 let editor_a1 = workspace_a
5624 .update(cx_a, |workspace, cx| {
5625 workspace.open_path((worktree_id, "1.txt"), true, cx)
5626 })
5627 .await
5628 .unwrap()
5629 .downcast::<Editor>()
5630 .unwrap();
5631 let editor_a2 = workspace_a
5632 .update(cx_a, |workspace, cx| {
5633 workspace.open_path((worktree_id, "2.txt"), true, cx)
5634 })
5635 .await
5636 .unwrap()
5637 .downcast::<Editor>()
5638 .unwrap();
5639
5640 // Client B opens an editor.
5641 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5642 let editor_b1 = workspace_b
5643 .update(cx_b, |workspace, cx| {
5644 workspace.open_path((worktree_id, "1.txt"), true, cx)
5645 })
5646 .await
5647 .unwrap()
5648 .downcast::<Editor>()
5649 .unwrap();
5650
5651 let client_a_id = project_b.read_with(cx_b, |project, _| {
5652 project.collaborators().values().next().unwrap().peer_id
5653 });
5654 let client_b_id = project_a.read_with(cx_a, |project, _| {
5655 project.collaborators().values().next().unwrap().peer_id
5656 });
5657
5658 // When client B starts following client A, all visible view states are replicated to client B.
5659 editor_a1.update(cx_a, |editor, cx| {
5660 editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5661 });
5662 editor_a2.update(cx_a, |editor, cx| {
5663 editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5664 });
5665 workspace_b
5666 .update(cx_b, |workspace, cx| {
5667 workspace
5668 .toggle_follow(&ToggleFollow(client_a_id), cx)
5669 .unwrap()
5670 })
5671 .await
5672 .unwrap();
5673
5674 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5675 workspace
5676 .active_item(cx)
5677 .unwrap()
5678 .downcast::<Editor>()
5679 .unwrap()
5680 });
5681 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5682 assert_eq!(
5683 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5684 Some((worktree_id, "2.txt").into())
5685 );
5686 assert_eq!(
5687 editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5688 vec![2..3]
5689 );
5690 assert_eq!(
5691 editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5692 vec![0..1]
5693 );
5694
5695 // When client A activates a different editor, client B does so as well.
5696 workspace_a.update(cx_a, |workspace, cx| {
5697 workspace.activate_item(&editor_a1, cx)
5698 });
5699 workspace_b
5700 .condition(cx_b, |workspace, cx| {
5701 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5702 })
5703 .await;
5704
5705 // When client A navigates back and forth, client B does so as well.
5706 workspace_a
5707 .update(cx_a, |workspace, cx| {
5708 workspace::Pane::go_back(workspace, None, cx)
5709 })
5710 .await;
5711 workspace_b
5712 .condition(cx_b, |workspace, cx| {
5713 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5714 })
5715 .await;
5716
5717 workspace_a
5718 .update(cx_a, |workspace, cx| {
5719 workspace::Pane::go_forward(workspace, None, cx)
5720 })
5721 .await;
5722 workspace_b
5723 .condition(cx_b, |workspace, cx| {
5724 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5725 })
5726 .await;
5727
5728 // Changes to client A's editor are reflected on client B.
5729 editor_a1.update(cx_a, |editor, cx| {
5730 editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5731 });
5732 editor_b1
5733 .condition(cx_b, |editor, cx| {
5734 editor.selections.ranges(cx) == vec![1..1, 2..2]
5735 })
5736 .await;
5737
5738 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5739 editor_b1
5740 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5741 .await;
5742
5743 editor_a1.update(cx_a, |editor, cx| {
5744 editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5745 editor.set_scroll_position(vec2f(0., 100.), cx);
5746 });
5747 editor_b1
5748 .condition(cx_b, |editor, cx| {
5749 editor.selections.ranges(cx) == vec![3..3]
5750 })
5751 .await;
5752
5753 // After unfollowing, client B stops receiving updates from client A.
5754 workspace_b.update(cx_b, |workspace, cx| {
5755 workspace.unfollow(&workspace.active_pane().clone(), cx)
5756 });
5757 workspace_a.update(cx_a, |workspace, cx| {
5758 workspace.activate_item(&editor_a2, cx)
5759 });
5760 cx_a.foreground().run_until_parked();
5761 assert_eq!(
5762 workspace_b.read_with(cx_b, |workspace, cx| workspace
5763 .active_item(cx)
5764 .unwrap()
5765 .id()),
5766 editor_b1.id()
5767 );
5768
5769 // Client A starts following client B.
5770 workspace_a
5771 .update(cx_a, |workspace, cx| {
5772 workspace
5773 .toggle_follow(&ToggleFollow(client_b_id), cx)
5774 .unwrap()
5775 })
5776 .await
5777 .unwrap();
5778 assert_eq!(
5779 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5780 Some(client_b_id)
5781 );
5782 assert_eq!(
5783 workspace_a.read_with(cx_a, |workspace, cx| workspace
5784 .active_item(cx)
5785 .unwrap()
5786 .id()),
5787 editor_a1.id()
5788 );
5789
5790 // Following interrupts when client B disconnects.
5791 client_b.disconnect(&cx_b.to_async()).unwrap();
5792 cx_a.foreground().run_until_parked();
5793 assert_eq!(
5794 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5795 None
5796 );
5797 }
5798
5799 #[gpui::test(iterations = 10)]
5800 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5801 cx_a.foreground().forbid_parking();
5802 let fs = FakeFs::new(cx_a.background());
5803
5804 // 2 clients connect to a server.
5805 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5806 let mut client_a = server.create_client(cx_a, "user_a").await;
5807 let mut client_b = server.create_client(cx_b, "user_b").await;
5808 server
5809 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5810 .await;
5811 cx_a.update(editor::init);
5812 cx_b.update(editor::init);
5813
5814 // Client A shares a project.
5815 fs.insert_tree(
5816 "/a",
5817 json!({
5818 "1.txt": "one",
5819 "2.txt": "two",
5820 "3.txt": "three",
5821 "4.txt": "four",
5822 }),
5823 )
5824 .await;
5825 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5826
5827 // Client B joins the project.
5828 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5829
5830 // Client A opens some editors.
5831 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5832 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5833 let _editor_a1 = workspace_a
5834 .update(cx_a, |workspace, cx| {
5835 workspace.open_path((worktree_id, "1.txt"), true, cx)
5836 })
5837 .await
5838 .unwrap()
5839 .downcast::<Editor>()
5840 .unwrap();
5841
5842 // Client B opens an editor.
5843 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5844 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5845 let _editor_b1 = workspace_b
5846 .update(cx_b, |workspace, cx| {
5847 workspace.open_path((worktree_id, "2.txt"), true, cx)
5848 })
5849 .await
5850 .unwrap()
5851 .downcast::<Editor>()
5852 .unwrap();
5853
5854 // Clients A and B follow each other in split panes
5855 workspace_a
5856 .update(cx_a, |workspace, cx| {
5857 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5858 assert_ne!(*workspace.active_pane(), pane_a1);
5859 });
5860 workspace_a
5861 .update(cx_a, |workspace, cx| {
5862 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5863 workspace
5864 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5865 .unwrap()
5866 })
5867 .await
5868 .unwrap();
5869 workspace_b
5870 .update(cx_b, |workspace, cx| {
5871 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5872 assert_ne!(*workspace.active_pane(), pane_b1);
5873 });
5874 workspace_b
5875 .update(cx_b, |workspace, cx| {
5876 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5877 workspace
5878 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5879 .unwrap()
5880 })
5881 .await
5882 .unwrap();
5883
5884 workspace_a
5885 .update(cx_a, |workspace, cx| {
5886 workspace.activate_next_pane(cx);
5887 assert_eq!(*workspace.active_pane(), pane_a1);
5888 workspace.open_path((worktree_id, "3.txt"), true, cx)
5889 })
5890 .await
5891 .unwrap();
5892 workspace_b
5893 .update(cx_b, |workspace, cx| {
5894 workspace.activate_next_pane(cx);
5895 assert_eq!(*workspace.active_pane(), pane_b1);
5896 workspace.open_path((worktree_id, "4.txt"), true, cx)
5897 })
5898 .await
5899 .unwrap();
5900 cx_a.foreground().run_until_parked();
5901
5902 // Ensure leader updates don't change the active pane of followers
5903 workspace_a.read_with(cx_a, |workspace, _| {
5904 assert_eq!(*workspace.active_pane(), pane_a1);
5905 });
5906 workspace_b.read_with(cx_b, |workspace, _| {
5907 assert_eq!(*workspace.active_pane(), pane_b1);
5908 });
5909
5910 // Ensure peers following each other doesn't cause an infinite loop.
5911 assert_eq!(
5912 workspace_a.read_with(cx_a, |workspace, cx| workspace
5913 .active_item(cx)
5914 .unwrap()
5915 .project_path(cx)),
5916 Some((worktree_id, "3.txt").into())
5917 );
5918 workspace_a.update(cx_a, |workspace, cx| {
5919 assert_eq!(
5920 workspace.active_item(cx).unwrap().project_path(cx),
5921 Some((worktree_id, "3.txt").into())
5922 );
5923 workspace.activate_next_pane(cx);
5924 assert_eq!(
5925 workspace.active_item(cx).unwrap().project_path(cx),
5926 Some((worktree_id, "4.txt").into())
5927 );
5928 });
5929 workspace_b.update(cx_b, |workspace, cx| {
5930 assert_eq!(
5931 workspace.active_item(cx).unwrap().project_path(cx),
5932 Some((worktree_id, "4.txt").into())
5933 );
5934 workspace.activate_next_pane(cx);
5935 assert_eq!(
5936 workspace.active_item(cx).unwrap().project_path(cx),
5937 Some((worktree_id, "3.txt").into())
5938 );
5939 });
5940 }
5941
5942 #[gpui::test(iterations = 10)]
5943 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5944 cx_a.foreground().forbid_parking();
5945 let fs = FakeFs::new(cx_a.background());
5946
5947 // 2 clients connect to a server.
5948 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5949 let mut client_a = server.create_client(cx_a, "user_a").await;
5950 let mut client_b = server.create_client(cx_b, "user_b").await;
5951 server
5952 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5953 .await;
5954 cx_a.update(editor::init);
5955 cx_b.update(editor::init);
5956
5957 // Client A shares a project.
5958 fs.insert_tree(
5959 "/a",
5960 json!({
5961 "1.txt": "one",
5962 "2.txt": "two",
5963 "3.txt": "three",
5964 }),
5965 )
5966 .await;
5967 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5968
5969 // Client B joins the project.
5970 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5971
5972 // Client A opens some editors.
5973 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5974 let _editor_a1 = workspace_a
5975 .update(cx_a, |workspace, cx| {
5976 workspace.open_path((worktree_id, "1.txt"), true, cx)
5977 })
5978 .await
5979 .unwrap()
5980 .downcast::<Editor>()
5981 .unwrap();
5982
5983 // Client B starts following client A.
5984 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5985 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5986 let leader_id = project_b.read_with(cx_b, |project, _| {
5987 project.collaborators().values().next().unwrap().peer_id
5988 });
5989 workspace_b
5990 .update(cx_b, |workspace, cx| {
5991 workspace
5992 .toggle_follow(&ToggleFollow(leader_id), cx)
5993 .unwrap()
5994 })
5995 .await
5996 .unwrap();
5997 assert_eq!(
5998 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5999 Some(leader_id)
6000 );
6001 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
6002 workspace
6003 .active_item(cx)
6004 .unwrap()
6005 .downcast::<Editor>()
6006 .unwrap()
6007 });
6008
6009 // When client B moves, it automatically stops following client A.
6010 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
6011 assert_eq!(
6012 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6013 None
6014 );
6015
6016 workspace_b
6017 .update(cx_b, |workspace, cx| {
6018 workspace
6019 .toggle_follow(&ToggleFollow(leader_id), cx)
6020 .unwrap()
6021 })
6022 .await
6023 .unwrap();
6024 assert_eq!(
6025 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6026 Some(leader_id)
6027 );
6028
6029 // When client B edits, it automatically stops following client A.
6030 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
6031 assert_eq!(
6032 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6033 None
6034 );
6035
6036 workspace_b
6037 .update(cx_b, |workspace, cx| {
6038 workspace
6039 .toggle_follow(&ToggleFollow(leader_id), cx)
6040 .unwrap()
6041 })
6042 .await
6043 .unwrap();
6044 assert_eq!(
6045 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6046 Some(leader_id)
6047 );
6048
6049 // When client B scrolls, it automatically stops following client A.
6050 editor_b2.update(cx_b, |editor, cx| {
6051 editor.set_scroll_position(vec2f(0., 3.), cx)
6052 });
6053 assert_eq!(
6054 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6055 None
6056 );
6057
6058 workspace_b
6059 .update(cx_b, |workspace, cx| {
6060 workspace
6061 .toggle_follow(&ToggleFollow(leader_id), cx)
6062 .unwrap()
6063 })
6064 .await
6065 .unwrap();
6066 assert_eq!(
6067 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6068 Some(leader_id)
6069 );
6070
6071 // When client B activates a different pane, it continues following client A in the original pane.
6072 workspace_b.update(cx_b, |workspace, cx| {
6073 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
6074 });
6075 assert_eq!(
6076 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6077 Some(leader_id)
6078 );
6079
6080 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6081 assert_eq!(
6082 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6083 Some(leader_id)
6084 );
6085
6086 // When client B activates a different item in the original pane, it automatically stops following client A.
6087 workspace_b
6088 .update(cx_b, |workspace, cx| {
6089 workspace.open_path((worktree_id, "2.txt"), true, cx)
6090 })
6091 .await
6092 .unwrap();
6093 assert_eq!(
6094 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6095 None
6096 );
6097 }
6098
6099 #[gpui::test(iterations = 100)]
6100 async fn test_random_collaboration(
6101 cx: &mut TestAppContext,
6102 deterministic: Arc<Deterministic>,
6103 rng: StdRng,
6104 ) {
6105 cx.foreground().forbid_parking();
6106 let max_peers = env::var("MAX_PEERS")
6107 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6108 .unwrap_or(5);
6109 assert!(max_peers <= 5);
6110
6111 let max_operations = env::var("OPERATIONS")
6112 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6113 .unwrap_or(10);
6114
6115 let rng = Arc::new(Mutex::new(rng));
6116
6117 let guest_lang_registry = Arc::new(LanguageRegistry::test());
6118 let host_language_registry = Arc::new(LanguageRegistry::test());
6119
6120 let fs = FakeFs::new(cx.background());
6121 fs.insert_tree("/_collab", json!({"init": ""})).await;
6122
6123 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6124 let db = server.app_state.db.clone();
6125 let host_user_id = db.create_user("host", None, false).await.unwrap();
6126 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6127 let guest_user_id = db.create_user(username, None, false).await.unwrap();
6128 server
6129 .app_state
6130 .db
6131 .send_contact_request(guest_user_id, host_user_id)
6132 .await
6133 .unwrap();
6134 server
6135 .app_state
6136 .db
6137 .respond_to_contact_request(host_user_id, guest_user_id, true)
6138 .await
6139 .unwrap();
6140 }
6141
6142 let mut clients = Vec::new();
6143 let mut user_ids = Vec::new();
6144 let mut op_start_signals = Vec::new();
6145
6146 let mut next_entity_id = 100000;
6147 let mut host_cx = TestAppContext::new(
6148 cx.foreground_platform(),
6149 cx.platform(),
6150 deterministic.build_foreground(next_entity_id),
6151 deterministic.build_background(),
6152 cx.font_cache(),
6153 cx.leak_detector(),
6154 next_entity_id,
6155 );
6156 let host = server.create_client(&mut host_cx, "host").await;
6157 let host_project = host_cx.update(|cx| {
6158 Project::local(
6159 host.client.clone(),
6160 host.user_store.clone(),
6161 host_language_registry.clone(),
6162 fs.clone(),
6163 cx,
6164 )
6165 });
6166 let host_project_id = host_project
6167 .update(&mut host_cx, |p, _| p.next_remote_id())
6168 .await;
6169
6170 let (collab_worktree, _) = host_project
6171 .update(&mut host_cx, |project, cx| {
6172 project.find_or_create_local_worktree("/_collab", true, cx)
6173 })
6174 .await
6175 .unwrap();
6176 collab_worktree
6177 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6178 .await;
6179
6180 // Set up fake language servers.
6181 let mut language = Language::new(
6182 LanguageConfig {
6183 name: "Rust".into(),
6184 path_suffixes: vec!["rs".to_string()],
6185 ..Default::default()
6186 },
6187 None,
6188 );
6189 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6190 name: "the-fake-language-server",
6191 capabilities: lsp::LanguageServer::full_capabilities(),
6192 initializer: Some(Box::new({
6193 let rng = rng.clone();
6194 let fs = fs.clone();
6195 let project = host_project.downgrade();
6196 move |fake_server: &mut FakeLanguageServer| {
6197 fake_server.handle_request::<lsp::request::Completion, _, _>(
6198 |_, _| async move {
6199 Ok(Some(lsp::CompletionResponse::Array(vec![
6200 lsp::CompletionItem {
6201 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6202 range: lsp::Range::new(
6203 lsp::Position::new(0, 0),
6204 lsp::Position::new(0, 0),
6205 ),
6206 new_text: "the-new-text".to_string(),
6207 })),
6208 ..Default::default()
6209 },
6210 ])))
6211 },
6212 );
6213
6214 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6215 |_, _| async move {
6216 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6217 lsp::CodeAction {
6218 title: "the-code-action".to_string(),
6219 ..Default::default()
6220 },
6221 )]))
6222 },
6223 );
6224
6225 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6226 |params, _| async move {
6227 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6228 params.position,
6229 params.position,
6230 ))))
6231 },
6232 );
6233
6234 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6235 let fs = fs.clone();
6236 let rng = rng.clone();
6237 move |_, _| {
6238 let fs = fs.clone();
6239 let rng = rng.clone();
6240 async move {
6241 let files = fs.files().await;
6242 let mut rng = rng.lock();
6243 let count = rng.gen_range::<usize, _>(1..3);
6244 let files = (0..count)
6245 .map(|_| files.choose(&mut *rng).unwrap())
6246 .collect::<Vec<_>>();
6247 log::info!("LSP: Returning definitions in files {:?}", &files);
6248 Ok(Some(lsp::GotoDefinitionResponse::Array(
6249 files
6250 .into_iter()
6251 .map(|file| lsp::Location {
6252 uri: lsp::Url::from_file_path(file).unwrap(),
6253 range: Default::default(),
6254 })
6255 .collect(),
6256 )))
6257 }
6258 }
6259 });
6260
6261 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6262 let rng = rng.clone();
6263 let project = project.clone();
6264 move |params, mut cx| {
6265 let highlights = if let Some(project) = project.upgrade(&cx) {
6266 project.update(&mut cx, |project, cx| {
6267 let path = params
6268 .text_document_position_params
6269 .text_document
6270 .uri
6271 .to_file_path()
6272 .unwrap();
6273 let (worktree, relative_path) =
6274 project.find_local_worktree(&path, cx)?;
6275 let project_path =
6276 ProjectPath::from((worktree.read(cx).id(), relative_path));
6277 let buffer =
6278 project.get_open_buffer(&project_path, cx)?.read(cx);
6279
6280 let mut highlights = Vec::new();
6281 let highlight_count = rng.lock().gen_range(1..=5);
6282 let mut prev_end = 0;
6283 for _ in 0..highlight_count {
6284 let range =
6285 buffer.random_byte_range(prev_end, &mut *rng.lock());
6286
6287 highlights.push(lsp::DocumentHighlight {
6288 range: range_to_lsp(range.to_point_utf16(buffer)),
6289 kind: Some(lsp::DocumentHighlightKind::READ),
6290 });
6291 prev_end = range.end;
6292 }
6293 Some(highlights)
6294 })
6295 } else {
6296 None
6297 };
6298 async move { Ok(highlights) }
6299 }
6300 });
6301 }
6302 })),
6303 ..Default::default()
6304 });
6305 host_language_registry.add(Arc::new(language));
6306
6307 let op_start_signal = futures::channel::mpsc::unbounded();
6308 user_ids.push(host.current_user_id(&host_cx));
6309 op_start_signals.push(op_start_signal.0);
6310 clients.push(host_cx.foreground().spawn(host.simulate_host(
6311 host_project,
6312 op_start_signal.1,
6313 rng.clone(),
6314 host_cx,
6315 )));
6316
6317 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6318 rng.lock().gen_range(0..max_operations)
6319 } else {
6320 max_operations
6321 };
6322 let mut available_guests = vec![
6323 "guest-1".to_string(),
6324 "guest-2".to_string(),
6325 "guest-3".to_string(),
6326 "guest-4".to_string(),
6327 ];
6328 let mut operations = 0;
6329 while operations < max_operations {
6330 if operations == disconnect_host_at {
6331 server.disconnect_client(user_ids[0]);
6332 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6333 drop(op_start_signals);
6334 let mut clients = futures::future::join_all(clients).await;
6335 cx.foreground().run_until_parked();
6336
6337 let (host, mut host_cx, host_err) = clients.remove(0);
6338 if let Some(host_err) = host_err {
6339 log::error!("host error - {:?}", host_err);
6340 }
6341 host.project
6342 .as_ref()
6343 .unwrap()
6344 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6345 for (guest, mut guest_cx, guest_err) in clients {
6346 if let Some(guest_err) = guest_err {
6347 log::error!("{} error - {:?}", guest.username, guest_err);
6348 }
6349
6350 let contacts = server
6351 .app_state
6352 .db
6353 .get_contacts(guest.current_user_id(&guest_cx))
6354 .await
6355 .unwrap();
6356 let contacts = server
6357 .store
6358 .read()
6359 .await
6360 .build_initial_contacts_update(contacts)
6361 .contacts;
6362 assert!(!contacts
6363 .iter()
6364 .flat_map(|contact| &contact.projects)
6365 .any(|project| project.id == host_project_id));
6366 guest
6367 .project
6368 .as_ref()
6369 .unwrap()
6370 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6371 guest_cx.update(|_| drop(guest));
6372 }
6373 host_cx.update(|_| drop(host));
6374
6375 return;
6376 }
6377
6378 let distribution = rng.lock().gen_range(0..100);
6379 match distribution {
6380 0..=19 if !available_guests.is_empty() => {
6381 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6382 let guest_username = available_guests.remove(guest_ix);
6383 log::info!("Adding new connection for {}", guest_username);
6384 next_entity_id += 100000;
6385 let mut guest_cx = TestAppContext::new(
6386 cx.foreground_platform(),
6387 cx.platform(),
6388 deterministic.build_foreground(next_entity_id),
6389 deterministic.build_background(),
6390 cx.font_cache(),
6391 cx.leak_detector(),
6392 next_entity_id,
6393 );
6394 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6395 let guest_project = Project::remote(
6396 host_project_id,
6397 guest.client.clone(),
6398 guest.user_store.clone(),
6399 guest_lang_registry.clone(),
6400 FakeFs::new(cx.background()),
6401 &mut guest_cx.to_async(),
6402 )
6403 .await
6404 .unwrap();
6405 let op_start_signal = futures::channel::mpsc::unbounded();
6406 user_ids.push(guest.current_user_id(&guest_cx));
6407 op_start_signals.push(op_start_signal.0);
6408 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6409 guest_username.clone(),
6410 guest_project,
6411 op_start_signal.1,
6412 rng.clone(),
6413 guest_cx,
6414 )));
6415
6416 log::info!("Added connection for {}", guest_username);
6417 operations += 1;
6418 }
6419 20..=29 if clients.len() > 1 => {
6420 let guest_ix = rng.lock().gen_range(1..clients.len());
6421 log::info!("Removing guest {}", user_ids[guest_ix]);
6422 let removed_guest_id = user_ids.remove(guest_ix);
6423 let guest = clients.remove(guest_ix);
6424 op_start_signals.remove(guest_ix);
6425 server.forbid_connections();
6426 server.disconnect_client(removed_guest_id);
6427 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6428 let (guest, mut guest_cx, guest_err) = guest.await;
6429 server.allow_connections();
6430
6431 if let Some(guest_err) = guest_err {
6432 log::error!("{} error - {:?}", guest.username, guest_err);
6433 }
6434 guest
6435 .project
6436 .as_ref()
6437 .unwrap()
6438 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6439 for user_id in &user_ids {
6440 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6441 let contacts = server
6442 .store
6443 .read()
6444 .await
6445 .build_initial_contacts_update(contacts)
6446 .contacts;
6447 for contact in contacts {
6448 if contact.online {
6449 assert_ne!(
6450 contact.user_id, removed_guest_id.0 as u64,
6451 "removed guest is still a contact of another peer"
6452 );
6453 }
6454 for project in contact.projects {
6455 for project_guest_id in project.guests {
6456 assert_ne!(
6457 project_guest_id, removed_guest_id.0 as u64,
6458 "removed guest appears as still participating on a project"
6459 );
6460 }
6461 }
6462 }
6463 }
6464
6465 log::info!("{} removed", guest.username);
6466 available_guests.push(guest.username.clone());
6467 guest_cx.update(|_| drop(guest));
6468
6469 operations += 1;
6470 }
6471 _ => {
6472 while operations < max_operations && rng.lock().gen_bool(0.7) {
6473 op_start_signals
6474 .choose(&mut *rng.lock())
6475 .unwrap()
6476 .unbounded_send(())
6477 .unwrap();
6478 operations += 1;
6479 }
6480
6481 if rng.lock().gen_bool(0.8) {
6482 cx.foreground().run_until_parked();
6483 }
6484 }
6485 }
6486 }
6487
6488 drop(op_start_signals);
6489 let mut clients = futures::future::join_all(clients).await;
6490 cx.foreground().run_until_parked();
6491
6492 let (host_client, mut host_cx, host_err) = clients.remove(0);
6493 if let Some(host_err) = host_err {
6494 panic!("host error - {:?}", host_err);
6495 }
6496 let host_project = host_client.project.as_ref().unwrap();
6497 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6498 project
6499 .worktrees(cx)
6500 .map(|worktree| {
6501 let snapshot = worktree.read(cx).snapshot();
6502 (snapshot.id(), snapshot)
6503 })
6504 .collect::<BTreeMap<_, _>>()
6505 });
6506
6507 host_client
6508 .project
6509 .as_ref()
6510 .unwrap()
6511 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6512
6513 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6514 if let Some(guest_err) = guest_err {
6515 panic!("{} error - {:?}", guest_client.username, guest_err);
6516 }
6517 let worktree_snapshots =
6518 guest_client
6519 .project
6520 .as_ref()
6521 .unwrap()
6522 .read_with(&guest_cx, |project, cx| {
6523 project
6524 .worktrees(cx)
6525 .map(|worktree| {
6526 let worktree = worktree.read(cx);
6527 (worktree.id(), worktree.snapshot())
6528 })
6529 .collect::<BTreeMap<_, _>>()
6530 });
6531
6532 assert_eq!(
6533 worktree_snapshots.keys().collect::<Vec<_>>(),
6534 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6535 "{} has different worktrees than the host",
6536 guest_client.username
6537 );
6538 for (id, host_snapshot) in &host_worktree_snapshots {
6539 let guest_snapshot = &worktree_snapshots[id];
6540 assert_eq!(
6541 guest_snapshot.root_name(),
6542 host_snapshot.root_name(),
6543 "{} has different root name than the host for worktree {}",
6544 guest_client.username,
6545 id
6546 );
6547 assert_eq!(
6548 guest_snapshot.entries(false).collect::<Vec<_>>(),
6549 host_snapshot.entries(false).collect::<Vec<_>>(),
6550 "{} has different snapshot than the host for worktree {}",
6551 guest_client.username,
6552 id
6553 );
6554 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6555 }
6556
6557 guest_client
6558 .project
6559 .as_ref()
6560 .unwrap()
6561 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6562
6563 for guest_buffer in &guest_client.buffers {
6564 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6565 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6566 project.buffer_for_id(buffer_id, cx).expect(&format!(
6567 "host does not have buffer for guest:{}, peer:{}, id:{}",
6568 guest_client.username, guest_client.peer_id, buffer_id
6569 ))
6570 });
6571 let path = host_buffer
6572 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6573
6574 assert_eq!(
6575 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6576 0,
6577 "{}, buffer {}, path {:?} has deferred operations",
6578 guest_client.username,
6579 buffer_id,
6580 path,
6581 );
6582 assert_eq!(
6583 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6584 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6585 "{}, buffer {}, path {:?}, differs from the host's buffer",
6586 guest_client.username,
6587 buffer_id,
6588 path
6589 );
6590 }
6591
6592 guest_cx.update(|_| drop(guest_client));
6593 }
6594
6595 host_cx.update(|_| drop(host_client));
6596 }
6597
6598 struct TestServer {
6599 peer: Arc<Peer>,
6600 app_state: Arc<AppState>,
6601 server: Arc<Server>,
6602 foreground: Rc<executor::Foreground>,
6603 notifications: mpsc::UnboundedReceiver<()>,
6604 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6605 forbid_connections: Arc<AtomicBool>,
6606 _test_db: TestDb,
6607 }
6608
6609 impl TestServer {
6610 async fn start(
6611 foreground: Rc<executor::Foreground>,
6612 background: Arc<executor::Background>,
6613 ) -> Self {
6614 let test_db = TestDb::fake(background);
6615 let app_state = Self::build_app_state(&test_db).await;
6616 let peer = Peer::new();
6617 let notifications = mpsc::unbounded();
6618 let server = Server::new(app_state.clone(), Some(notifications.0));
6619 Self {
6620 peer,
6621 app_state,
6622 server,
6623 foreground,
6624 notifications: notifications.1,
6625 connection_killers: Default::default(),
6626 forbid_connections: Default::default(),
6627 _test_db: test_db,
6628 }
6629 }
6630
6631 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6632 cx.update(|cx| {
6633 let settings = Settings::test(cx);
6634 cx.set_global(settings);
6635 });
6636
6637 let http = FakeHttpClient::with_404_response();
6638 let user_id =
6639 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6640 user.id
6641 } else {
6642 self.app_state.db.create_user(name, None, false).await.unwrap()
6643 };
6644 let client_name = name.to_string();
6645 let mut client = Client::new(http.clone());
6646 let server = self.server.clone();
6647 let db = self.app_state.db.clone();
6648 let connection_killers = self.connection_killers.clone();
6649 let forbid_connections = self.forbid_connections.clone();
6650 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6651
6652 Arc::get_mut(&mut client)
6653 .unwrap()
6654 .override_authenticate(move |cx| {
6655 cx.spawn(|_| async move {
6656 let access_token = "the-token".to_string();
6657 Ok(Credentials {
6658 user_id: user_id.0 as u64,
6659 access_token,
6660 })
6661 })
6662 })
6663 .override_establish_connection(move |credentials, cx| {
6664 assert_eq!(credentials.user_id, user_id.0 as u64);
6665 assert_eq!(credentials.access_token, "the-token");
6666
6667 let server = server.clone();
6668 let db = db.clone();
6669 let connection_killers = connection_killers.clone();
6670 let forbid_connections = forbid_connections.clone();
6671 let client_name = client_name.clone();
6672 let connection_id_tx = connection_id_tx.clone();
6673 cx.spawn(move |cx| async move {
6674 if forbid_connections.load(SeqCst) {
6675 Err(EstablishConnectionError::other(anyhow!(
6676 "server is forbidding connections"
6677 )))
6678 } else {
6679 let (client_conn, server_conn, killed) =
6680 Connection::in_memory(cx.background());
6681 connection_killers.lock().insert(user_id, killed);
6682 let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
6683 cx.background()
6684 .spawn(server.handle_connection(
6685 server_conn,
6686 client_name,
6687 user,
6688 Some(connection_id_tx),
6689 cx.background(),
6690 ))
6691 .detach();
6692 Ok(client_conn)
6693 }
6694 })
6695 });
6696
6697 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6698 let app_state = Arc::new(workspace::AppState {
6699 client: client.clone(),
6700 user_store: user_store.clone(),
6701 languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
6702 themes: ThemeRegistry::new((), cx.font_cache()),
6703 fs: FakeFs::new(cx.background()),
6704 build_window_options: || Default::default(),
6705 initialize_workspace: |_, _, _| unimplemented!(),
6706 });
6707
6708 Channel::init(&client);
6709 Project::init(&client);
6710 cx.update(|cx| workspace::init(app_state.clone(), cx));
6711
6712 client
6713 .authenticate_and_connect(false, &cx.to_async())
6714 .await
6715 .unwrap();
6716 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6717
6718 let client = TestClient {
6719 client,
6720 peer_id,
6721 username: name.to_string(),
6722 user_store,
6723 language_registry: Arc::new(LanguageRegistry::test()),
6724 project: Default::default(),
6725 buffers: Default::default(),
6726 };
6727 client.wait_for_current_user(cx).await;
6728 client
6729 }
6730
6731 fn disconnect_client(&self, user_id: UserId) {
6732 self.connection_killers
6733 .lock()
6734 .remove(&user_id)
6735 .unwrap()
6736 .store(true, SeqCst);
6737 }
6738
6739 fn forbid_connections(&self) {
6740 self.forbid_connections.store(true, SeqCst);
6741 }
6742
6743 fn allow_connections(&self) {
6744 self.forbid_connections.store(false, SeqCst);
6745 }
6746
6747 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6748 while let Some((client_a, cx_a)) = clients.pop() {
6749 for (client_b, cx_b) in &mut clients {
6750 client_a
6751 .user_store
6752 .update(cx_a, |store, cx| {
6753 store.request_contact(client_b.user_id().unwrap(), cx)
6754 })
6755 .await
6756 .unwrap();
6757 cx_a.foreground().run_until_parked();
6758 client_b
6759 .user_store
6760 .update(*cx_b, |store, cx| {
6761 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6762 })
6763 .await
6764 .unwrap();
6765 }
6766 }
6767 }
6768
6769 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6770 Arc::new(AppState {
6771 db: test_db.db().clone(),
6772 api_token: Default::default(),
6773 invite_link_prefix: Default::default(),
6774 })
6775 }
6776
6777 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6778 self.server.store.read().await
6779 }
6780
6781 async fn condition<F>(&mut self, mut predicate: F)
6782 where
6783 F: FnMut(&Store) -> bool,
6784 {
6785 assert!(
6786 self.foreground.parking_forbidden(),
6787 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6788 );
6789 while !(predicate)(&*self.server.store.read().await) {
6790 self.foreground.start_waiting();
6791 self.notifications.next().await;
6792 self.foreground.finish_waiting();
6793 }
6794 }
6795 }
6796
6797 impl Deref for TestServer {
6798 type Target = Server;
6799
6800 fn deref(&self) -> &Self::Target {
6801 &self.server
6802 }
6803 }
6804
6805 impl Drop for TestServer {
6806 fn drop(&mut self) {
6807 self.peer.reset();
6808 }
6809 }
6810
6811 struct TestClient {
6812 client: Arc<Client>,
6813 username: String,
6814 pub peer_id: PeerId,
6815 pub user_store: ModelHandle<UserStore>,
6816 language_registry: Arc<LanguageRegistry>,
6817 project: Option<ModelHandle<Project>>,
6818 buffers: HashSet<ModelHandle<language::Buffer>>,
6819 }
6820
6821 impl Deref for TestClient {
6822 type Target = Arc<Client>;
6823
6824 fn deref(&self) -> &Self::Target {
6825 &self.client
6826 }
6827 }
6828
6829 struct ContactsSummary {
6830 pub current: Vec<String>,
6831 pub outgoing_requests: Vec<String>,
6832 pub incoming_requests: Vec<String>,
6833 }
6834
6835 impl TestClient {
6836 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6837 UserId::from_proto(
6838 self.user_store
6839 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6840 )
6841 }
6842
6843 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6844 let mut authed_user = self
6845 .user_store
6846 .read_with(cx, |user_store, _| user_store.watch_current_user());
6847 while authed_user.next().await.unwrap().is_none() {}
6848 }
6849
6850 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6851 self.user_store
6852 .update(cx, |store, _| store.clear_contacts())
6853 .await;
6854 }
6855
6856 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6857 self.user_store.read_with(cx, |store, _| ContactsSummary {
6858 current: store
6859 .contacts()
6860 .iter()
6861 .map(|contact| contact.user.github_login.clone())
6862 .collect(),
6863 outgoing_requests: store
6864 .outgoing_contact_requests()
6865 .iter()
6866 .map(|user| user.github_login.clone())
6867 .collect(),
6868 incoming_requests: store
6869 .incoming_contact_requests()
6870 .iter()
6871 .map(|user| user.github_login.clone())
6872 .collect(),
6873 })
6874 }
6875
6876 async fn build_local_project(
6877 &mut self,
6878 fs: Arc<FakeFs>,
6879 root_path: impl AsRef<Path>,
6880 cx: &mut TestAppContext,
6881 ) -> (ModelHandle<Project>, WorktreeId) {
6882 let project = cx.update(|cx| {
6883 Project::local(
6884 self.client.clone(),
6885 self.user_store.clone(),
6886 self.language_registry.clone(),
6887 fs,
6888 cx,
6889 )
6890 });
6891 self.project = Some(project.clone());
6892 let (worktree, _) = project
6893 .update(cx, |p, cx| {
6894 p.find_or_create_local_worktree(root_path, true, cx)
6895 })
6896 .await
6897 .unwrap();
6898 worktree
6899 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6900 .await;
6901 project
6902 .update(cx, |project, _| project.next_remote_id())
6903 .await;
6904 (project, worktree.read_with(cx, |tree, _| tree.id()))
6905 }
6906
6907 async fn build_remote_project(
6908 &mut self,
6909 host_project: &ModelHandle<Project>,
6910 host_cx: &mut TestAppContext,
6911 guest_cx: &mut TestAppContext,
6912 ) -> ModelHandle<Project> {
6913 let host_project_id = host_project
6914 .read_with(host_cx, |project, _| project.next_remote_id())
6915 .await;
6916 let guest_user_id = self.user_id().unwrap();
6917 let languages =
6918 host_project.read_with(host_cx, |project, _| project.languages().clone());
6919 let project_b = guest_cx.spawn(|mut cx| {
6920 let user_store = self.user_store.clone();
6921 let guest_client = self.client.clone();
6922 async move {
6923 Project::remote(
6924 host_project_id,
6925 guest_client,
6926 user_store.clone(),
6927 languages,
6928 FakeFs::new(cx.background()),
6929 &mut cx,
6930 )
6931 .await
6932 .unwrap()
6933 }
6934 });
6935 host_cx.foreground().run_until_parked();
6936 host_project.update(host_cx, |project, cx| {
6937 project.respond_to_join_request(guest_user_id, true, cx)
6938 });
6939 let project = project_b.await;
6940 self.project = Some(project.clone());
6941 project
6942 }
6943
6944 fn build_workspace(
6945 &self,
6946 project: &ModelHandle<Project>,
6947 cx: &mut TestAppContext,
6948 ) -> ViewHandle<Workspace> {
6949 let (window_id, _) = cx.add_window(|_| EmptyView);
6950 cx.add_view(window_id, |cx| Workspace::new(project.clone(), cx))
6951 }
6952
6953 async fn simulate_host(
6954 mut self,
6955 project: ModelHandle<Project>,
6956 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6957 rng: Arc<Mutex<StdRng>>,
6958 mut cx: TestAppContext,
6959 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6960 async fn simulate_host_internal(
6961 client: &mut TestClient,
6962 project: ModelHandle<Project>,
6963 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6964 rng: Arc<Mutex<StdRng>>,
6965 cx: &mut TestAppContext,
6966 ) -> anyhow::Result<()> {
6967 let fs = project.read_with(cx, |project, _| project.fs().clone());
6968
6969 cx.update(|cx| {
6970 cx.subscribe(&project, move |project, event, cx| {
6971 if let project::Event::ContactRequestedJoin(user) = event {
6972 log::info!("Host: accepting join request from {}", user.github_login);
6973 project.update(cx, |project, cx| {
6974 project.respond_to_join_request(user.id, true, cx)
6975 });
6976 }
6977 })
6978 .detach();
6979 });
6980
6981 while op_start_signal.next().await.is_some() {
6982 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6983 let files = fs.as_fake().files().await;
6984 match distribution {
6985 0..=19 if !files.is_empty() => {
6986 let path = files.choose(&mut *rng.lock()).unwrap();
6987 let mut path = path.as_path();
6988 while let Some(parent_path) = path.parent() {
6989 path = parent_path;
6990 if rng.lock().gen() {
6991 break;
6992 }
6993 }
6994
6995 log::info!("Host: find/create local worktree {:?}", path);
6996 let find_or_create_worktree = project.update(cx, |project, cx| {
6997 project.find_or_create_local_worktree(path, true, cx)
6998 });
6999 if rng.lock().gen() {
7000 cx.background().spawn(find_or_create_worktree).detach();
7001 } else {
7002 find_or_create_worktree.await?;
7003 }
7004 }
7005 20..=79 if !files.is_empty() => {
7006 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
7007 let file = files.choose(&mut *rng.lock()).unwrap();
7008 let (worktree, path) = project
7009 .update(cx, |project, cx| {
7010 project.find_or_create_local_worktree(
7011 file.clone(),
7012 true,
7013 cx,
7014 )
7015 })
7016 .await?;
7017 let project_path =
7018 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
7019 log::info!(
7020 "Host: opening path {:?}, worktree {}, relative_path {:?}",
7021 file,
7022 project_path.0,
7023 project_path.1
7024 );
7025 let buffer = project
7026 .update(cx, |project, cx| project.open_buffer(project_path, cx))
7027 .await
7028 .unwrap();
7029 client.buffers.insert(buffer.clone());
7030 buffer
7031 } else {
7032 client
7033 .buffers
7034 .iter()
7035 .choose(&mut *rng.lock())
7036 .unwrap()
7037 .clone()
7038 };
7039
7040 if rng.lock().gen_bool(0.1) {
7041 cx.update(|cx| {
7042 log::info!(
7043 "Host: dropping buffer {:?}",
7044 buffer.read(cx).file().unwrap().full_path(cx)
7045 );
7046 client.buffers.remove(&buffer);
7047 drop(buffer);
7048 });
7049 } else {
7050 buffer.update(cx, |buffer, cx| {
7051 log::info!(
7052 "Host: updating buffer {:?} ({})",
7053 buffer.file().unwrap().full_path(cx),
7054 buffer.remote_id()
7055 );
7056
7057 if rng.lock().gen_bool(0.7) {
7058 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7059 } else {
7060 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7061 }
7062 });
7063 }
7064 }
7065 _ => loop {
7066 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
7067 let mut path = PathBuf::new();
7068 path.push("/");
7069 for _ in 0..path_component_count {
7070 let letter = rng.lock().gen_range(b'a'..=b'z');
7071 path.push(std::str::from_utf8(&[letter]).unwrap());
7072 }
7073 path.set_extension("rs");
7074 let parent_path = path.parent().unwrap();
7075
7076 log::info!("Host: creating file {:?}", path,);
7077
7078 if fs.create_dir(&parent_path).await.is_ok()
7079 && fs.create_file(&path, Default::default()).await.is_ok()
7080 {
7081 break;
7082 } else {
7083 log::info!("Host: cannot create file");
7084 }
7085 },
7086 }
7087
7088 cx.background().simulate_random_delay().await;
7089 }
7090
7091 Ok(())
7092 }
7093
7094 let result =
7095 simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
7096 .await;
7097 log::info!("Host done");
7098 self.project = Some(project);
7099 (self, cx, result.err())
7100 }
7101
7102 pub async fn simulate_guest(
7103 mut self,
7104 guest_username: String,
7105 project: ModelHandle<Project>,
7106 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7107 rng: Arc<Mutex<StdRng>>,
7108 mut cx: TestAppContext,
7109 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
7110 async fn simulate_guest_internal(
7111 client: &mut TestClient,
7112 guest_username: &str,
7113 project: ModelHandle<Project>,
7114 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7115 rng: Arc<Mutex<StdRng>>,
7116 cx: &mut TestAppContext,
7117 ) -> anyhow::Result<()> {
7118 while op_start_signal.next().await.is_some() {
7119 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
7120 let worktree = if let Some(worktree) =
7121 project.read_with(cx, |project, cx| {
7122 project
7123 .worktrees(&cx)
7124 .filter(|worktree| {
7125 let worktree = worktree.read(cx);
7126 worktree.is_visible()
7127 && worktree.entries(false).any(|e| e.is_file())
7128 })
7129 .choose(&mut *rng.lock())
7130 }) {
7131 worktree
7132 } else {
7133 cx.background().simulate_random_delay().await;
7134 continue;
7135 };
7136
7137 let (worktree_root_name, project_path) =
7138 worktree.read_with(cx, |worktree, _| {
7139 let entry = worktree
7140 .entries(false)
7141 .filter(|e| e.is_file())
7142 .choose(&mut *rng.lock())
7143 .unwrap();
7144 (
7145 worktree.root_name().to_string(),
7146 (worktree.id(), entry.path.clone()),
7147 )
7148 });
7149 log::info!(
7150 "{}: opening path {:?} in worktree {} ({})",
7151 guest_username,
7152 project_path.1,
7153 project_path.0,
7154 worktree_root_name,
7155 );
7156 let buffer = project
7157 .update(cx, |project, cx| {
7158 project.open_buffer(project_path.clone(), cx)
7159 })
7160 .await?;
7161 log::info!(
7162 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
7163 guest_username,
7164 project_path.1,
7165 project_path.0,
7166 worktree_root_name,
7167 buffer.read_with(cx, |buffer, _| buffer.remote_id())
7168 );
7169 client.buffers.insert(buffer.clone());
7170 buffer
7171 } else {
7172 client
7173 .buffers
7174 .iter()
7175 .choose(&mut *rng.lock())
7176 .unwrap()
7177 .clone()
7178 };
7179
7180 let choice = rng.lock().gen_range(0..100);
7181 match choice {
7182 0..=9 => {
7183 cx.update(|cx| {
7184 log::info!(
7185 "{}: dropping buffer {:?}",
7186 guest_username,
7187 buffer.read(cx).file().unwrap().full_path(cx)
7188 );
7189 client.buffers.remove(&buffer);
7190 drop(buffer);
7191 });
7192 }
7193 10..=19 => {
7194 let completions = project.update(cx, |project, cx| {
7195 log::info!(
7196 "{}: requesting completions for buffer {} ({:?})",
7197 guest_username,
7198 buffer.read(cx).remote_id(),
7199 buffer.read(cx).file().unwrap().full_path(cx)
7200 );
7201 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7202 project.completions(&buffer, offset, cx)
7203 });
7204 let completions = cx.background().spawn(async move {
7205 completions
7206 .await
7207 .map_err(|err| anyhow!("completions request failed: {:?}", err))
7208 });
7209 if rng.lock().gen_bool(0.3) {
7210 log::info!("{}: detaching completions request", guest_username);
7211 cx.update(|cx| completions.detach_and_log_err(cx));
7212 } else {
7213 completions.await?;
7214 }
7215 }
7216 20..=29 => {
7217 let code_actions = project.update(cx, |project, cx| {
7218 log::info!(
7219 "{}: requesting code actions for buffer {} ({:?})",
7220 guest_username,
7221 buffer.read(cx).remote_id(),
7222 buffer.read(cx).file().unwrap().full_path(cx)
7223 );
7224 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7225 project.code_actions(&buffer, range, cx)
7226 });
7227 let code_actions = cx.background().spawn(async move {
7228 code_actions.await.map_err(|err| {
7229 anyhow!("code actions request failed: {:?}", err)
7230 })
7231 });
7232 if rng.lock().gen_bool(0.3) {
7233 log::info!("{}: detaching code actions request", guest_username);
7234 cx.update(|cx| code_actions.detach_and_log_err(cx));
7235 } else {
7236 code_actions.await?;
7237 }
7238 }
7239 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7240 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7241 log::info!(
7242 "{}: saving buffer {} ({:?})",
7243 guest_username,
7244 buffer.remote_id(),
7245 buffer.file().unwrap().full_path(cx)
7246 );
7247 (buffer.version(), buffer.save(cx))
7248 });
7249 let save = cx.background().spawn(async move {
7250 let (saved_version, _) = save
7251 .await
7252 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7253 assert!(saved_version.observed_all(&requested_version));
7254 Ok::<_, anyhow::Error>(())
7255 });
7256 if rng.lock().gen_bool(0.3) {
7257 log::info!("{}: detaching save request", guest_username);
7258 cx.update(|cx| save.detach_and_log_err(cx));
7259 } else {
7260 save.await?;
7261 }
7262 }
7263 40..=44 => {
7264 let prepare_rename = project.update(cx, |project, cx| {
7265 log::info!(
7266 "{}: preparing rename for buffer {} ({:?})",
7267 guest_username,
7268 buffer.read(cx).remote_id(),
7269 buffer.read(cx).file().unwrap().full_path(cx)
7270 );
7271 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7272 project.prepare_rename(buffer, offset, cx)
7273 });
7274 let prepare_rename = cx.background().spawn(async move {
7275 prepare_rename.await.map_err(|err| {
7276 anyhow!("prepare rename request failed: {:?}", err)
7277 })
7278 });
7279 if rng.lock().gen_bool(0.3) {
7280 log::info!("{}: detaching prepare rename request", guest_username);
7281 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7282 } else {
7283 prepare_rename.await?;
7284 }
7285 }
7286 45..=49 => {
7287 let definitions = project.update(cx, |project, cx| {
7288 log::info!(
7289 "{}: requesting definitions for buffer {} ({:?})",
7290 guest_username,
7291 buffer.read(cx).remote_id(),
7292 buffer.read(cx).file().unwrap().full_path(cx)
7293 );
7294 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7295 project.definition(&buffer, offset, cx)
7296 });
7297 let definitions = cx.background().spawn(async move {
7298 definitions
7299 .await
7300 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7301 });
7302 if rng.lock().gen_bool(0.3) {
7303 log::info!("{}: detaching definitions request", guest_username);
7304 cx.update(|cx| definitions.detach_and_log_err(cx));
7305 } else {
7306 client
7307 .buffers
7308 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7309 }
7310 }
7311 50..=54 => {
7312 let highlights = project.update(cx, |project, cx| {
7313 log::info!(
7314 "{}: requesting highlights for buffer {} ({:?})",
7315 guest_username,
7316 buffer.read(cx).remote_id(),
7317 buffer.read(cx).file().unwrap().full_path(cx)
7318 );
7319 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7320 project.document_highlights(&buffer, offset, cx)
7321 });
7322 let highlights = cx.background().spawn(async move {
7323 highlights
7324 .await
7325 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7326 });
7327 if rng.lock().gen_bool(0.3) {
7328 log::info!("{}: detaching highlights request", guest_username);
7329 cx.update(|cx| highlights.detach_and_log_err(cx));
7330 } else {
7331 highlights.await?;
7332 }
7333 }
7334 55..=59 => {
7335 let search = project.update(cx, |project, cx| {
7336 let query = rng.lock().gen_range('a'..='z');
7337 log::info!("{}: project-wide search {:?}", guest_username, query);
7338 project.search(SearchQuery::text(query, false, false), cx)
7339 });
7340 let search = cx.background().spawn(async move {
7341 search
7342 .await
7343 .map_err(|err| anyhow!("search request failed: {:?}", err))
7344 });
7345 if rng.lock().gen_bool(0.3) {
7346 log::info!("{}: detaching search request", guest_username);
7347 cx.update(|cx| search.detach_and_log_err(cx));
7348 } else {
7349 client.buffers.extend(search.await?.into_keys());
7350 }
7351 }
7352 60..=69 => {
7353 let worktree = project
7354 .read_with(cx, |project, cx| {
7355 project
7356 .worktrees(&cx)
7357 .filter(|worktree| {
7358 let worktree = worktree.read(cx);
7359 worktree.is_visible()
7360 && worktree.entries(false).any(|e| e.is_file())
7361 && worktree
7362 .root_entry()
7363 .map_or(false, |e| e.is_dir())
7364 })
7365 .choose(&mut *rng.lock())
7366 })
7367 .unwrap();
7368 let (worktree_id, worktree_root_name) = worktree
7369 .read_with(cx, |worktree, _| {
7370 (worktree.id(), worktree.root_name().to_string())
7371 });
7372
7373 let mut new_name = String::new();
7374 for _ in 0..10 {
7375 let letter = rng.lock().gen_range('a'..='z');
7376 new_name.push(letter);
7377 }
7378 let mut new_path = PathBuf::new();
7379 new_path.push(new_name);
7380 new_path.set_extension("rs");
7381 log::info!(
7382 "{}: creating {:?} in worktree {} ({})",
7383 guest_username,
7384 new_path,
7385 worktree_id,
7386 worktree_root_name,
7387 );
7388 project
7389 .update(cx, |project, cx| {
7390 project.create_entry((worktree_id, new_path), false, cx)
7391 })
7392 .unwrap()
7393 .await?;
7394 }
7395 _ => {
7396 buffer.update(cx, |buffer, cx| {
7397 log::info!(
7398 "{}: updating buffer {} ({:?})",
7399 guest_username,
7400 buffer.remote_id(),
7401 buffer.file().unwrap().full_path(cx)
7402 );
7403 if rng.lock().gen_bool(0.7) {
7404 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7405 } else {
7406 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7407 }
7408 });
7409 }
7410 }
7411 cx.background().simulate_random_delay().await;
7412 }
7413 Ok(())
7414 }
7415
7416 let result = simulate_guest_internal(
7417 &mut self,
7418 &guest_username,
7419 project.clone(),
7420 op_start_signal,
7421 rng,
7422 &mut cx,
7423 )
7424 .await;
7425 log::info!("{}: done", guest_username);
7426
7427 self.project = Some(project);
7428 (self, cx, result.err())
7429 }
7430 }
7431
7432 impl Drop for TestClient {
7433 fn drop(&mut self) {
7434 self.client.tear_down();
7435 }
7436 }
7437
7438 impl Executor for Arc<gpui::executor::Background> {
7439 type Sleep = gpui::executor::Timer;
7440
7441 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7442 self.spawn(future).detach();
7443 }
7444
7445 fn sleep(&self, duration: Duration) -> Self::Sleep {
7446 self.as_ref().timer(duration)
7447 }
7448 }
7449
7450 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7451 channel
7452 .messages()
7453 .cursor::<()>()
7454 .map(|m| {
7455 (
7456 m.sender.github_login.clone(),
7457 m.body.clone(),
7458 m.is_pending(),
7459 )
7460 })
7461 .collect()
7462 }
7463
7464 struct EmptyView;
7465
7466 impl gpui::Entity for EmptyView {
7467 type Event = ();
7468 }
7469
7470 impl gpui::View for EmptyView {
7471 fn ui_name() -> &'static str {
7472 "empty view"
7473 }
7474
7475 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7476 gpui::Element::boxed(gpui::elements::Empty::new())
7477 }
7478 }
7479}