1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, User, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{
27 channel::mpsc,
28 future::{self, BoxFuture},
29 FutureExt, SinkExt, StreamExt, TryStreamExt,
30};
31use lazy_static::lazy_static;
32use rpc::{
33 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
34 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
35};
36use serde::{Serialize, Serializer};
37use std::{
38 any::TypeId,
39 future::Future,
40 marker::PhantomData,
41 net::SocketAddr,
42 ops::{Deref, DerefMut},
43 rc::Rc,
44 sync::{
45 atomic::{AtomicBool, Ordering::SeqCst},
46 Arc,
47 },
48 time::Duration,
49};
50use store::{Store, Worktree};
51use time::OffsetDateTime;
52use tokio::{
53 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
54 time::Sleep,
55};
56use tower::ServiceBuilder;
57use tracing::{info_span, instrument, Instrument};
58
/// Type-erased message handler as stored in `Server::handlers`.
///
/// A handler is called with the server and a boxed, dynamically typed envelope
/// and returns a boxed `'static` future that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
61
/// Handle passed to request handlers for answering a request of type `R`.
struct Response<R> {
    /// Server whose `peer` is used to deliver the response.
    server: Arc<Server>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` once a response has been sent or the receipt has been taken.
    responded: Arc<AtomicBool>,
}
67
68impl<R: RequestMessage> Response<R> {
69 fn send(self, payload: R::Response) -> Result<()> {
70 self.responded.store(true, SeqCst);
71 self.server.peer.respond(self.receipt, payload)?;
72 Ok(())
73 }
74
75 fn into_receipt(self) -> Receipt<R> {
76 self.responded.store(true, SeqCst);
77 self.receipt
78 }
79}
80
/// State shared by all connections handled by this RPC server.
pub struct Server {
    /// Peer used to send, respond to and forward messages on connections.
    peer: Arc<Peer>,
    /// In-memory store guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives `()` each time a message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
88
89
/// Abstraction over the task executor and timer used by `Server::handle_connection`.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Starts `future` without the caller awaiting its completion.
    /// NOTE(review): presumably runs it on a background task — confirm against the implementations.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a timer future for `duration`.
    /// NOTE(review): presumably completes once `duration` has elapsed — confirm against the implementations.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
95
/// Unit struct used as an executor handle.
/// NOTE(review): presumably the production `Executor` implementation; the impl is not
/// visible in this part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
98
// NOTE(review): neither constant is used in the visible part of the file; the
// descriptions below are inferred from the names — confirm against the channel handlers.
/// Presumably the number of channel messages returned per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Presumably the maximum accepted length of a channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
101
/// Read guard over the server's `Store`.
///
/// The `PhantomData<Rc<()>>` marker makes this type `!Send`.
/// NOTE(review): presumably so the guard cannot be kept alive across an `.await`
/// inside a future that must be `Send` — confirm.
struct StoreReadGuard<'a> {
    /// Underlying read guard of the store lock.
    guard: RwLockReadGuard<'a, Store>,
    /// Zero-sized marker that opts the guard out of `Send`.
    _not_send: PhantomData<Rc<()>>,
}
106
/// Write guard over the server's `Store`.
///
/// The `PhantomData<Rc<()>>` marker makes this type `!Send`.
/// NOTE(review): presumably so the guard cannot be kept alive across an `.await`
/// inside a future that must be `Send` — confirm.
struct StoreWriteGuard<'a> {
    /// Underlying write guard of the store lock.
    guard: RwLockWriteGuard<'a, Store>,
    /// Zero-sized marker that opts the guard out of `Send`.
    _not_send: PhantomData<Rc<()>>,
}
111
/// Serializable view of a server: its peer plus a read-locked store.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// Peer of the server being snapshotted.
    peer: &'a Peer,
    /// Read guard over the store; serialized through `serialize_deref`, i.e. as
    /// the `Store` the guard dereferences to.
    #[serde(serialize_with = "serialize_deref")]
    store: RwLockReadGuard<'a, Store>,
}
118
119pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
120where
121 S: Serializer,
122 T: Deref<Target = U>,
123 U: Serialize
124{
125 Serialize::serialize(value.deref(), serializer)
126}
127
128
129impl Server {
130 pub fn new(
131 app_state: Arc<AppState>,
132 notifications: Option<mpsc::UnboundedSender<()>>,
133 ) -> Arc<Self> {
134 let mut server = Self {
135 peer: Peer::new(),
136 app_state,
137 store: Default::default(),
138 handlers: Default::default(),
139 notifications,
140 };
141
142 server
143 .add_request_handler(Server::ping)
144 .add_request_handler(Server::register_project)
145 .add_message_handler(Server::unregister_project)
146 .add_request_handler(Server::join_project)
147 .add_message_handler(Server::leave_project)
148 .add_message_handler(Server::respond_to_join_project_request)
149 .add_request_handler(Server::register_worktree)
150 .add_message_handler(Server::unregister_worktree)
151 .add_request_handler(Server::update_worktree)
152 .add_message_handler(Server::start_language_server)
153 .add_message_handler(Server::update_language_server)
154 .add_message_handler(Server::update_diagnostic_summary)
155 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
156 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
157 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
158 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
159 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
160 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
161 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
162 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
163 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
164 .add_request_handler(
165 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
166 )
167 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
168 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
169 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
170 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
171 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
172 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
173 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
174 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
175 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
176 .add_request_handler(Server::update_buffer)
177 .add_message_handler(Server::update_buffer_file)
178 .add_message_handler(Server::buffer_reloaded)
179 .add_message_handler(Server::buffer_saved)
180 .add_request_handler(Server::save_buffer)
181 .add_request_handler(Server::get_channels)
182 .add_request_handler(Server::get_users)
183 .add_request_handler(Server::fuzzy_search_users)
184 .add_request_handler(Server::request_contact)
185 .add_request_handler(Server::remove_contact)
186 .add_request_handler(Server::respond_to_contact_request)
187 .add_request_handler(Server::join_channel)
188 .add_message_handler(Server::leave_channel)
189 .add_request_handler(Server::send_channel_message)
190 .add_request_handler(Server::follow)
191 .add_message_handler(Server::unfollow)
192 .add_message_handler(Server::update_followers)
193 .add_request_handler(Server::get_channel_messages);
194
195 Arc::new(server)
196 }
197
198 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
199 where
200 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
201 Fut: 'static + Send + Future<Output = Result<()>>,
202 M: EnvelopedMessage,
203 {
204 let prev_handler = self.handlers.insert(
205 TypeId::of::<M>(),
206 Box::new(move |server, envelope| {
207 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
208 let span = info_span!(
209 "handle message",
210 payload_type = envelope.payload_type_name(),
211 payload = format!("{:?}", envelope.payload).as_str(),
212 );
213 let future = (handler)(server, *envelope);
214 async move {
215 if let Err(error) = future.await {
216 tracing::error!(%error, "error handling message");
217 }
218 }
219 .instrument(span)
220 .boxed()
221 }),
222 );
223 if prev_handler.is_some() {
224 panic!("registered a handler for the same message twice");
225 }
226 self
227 }
228
229 /// Handle a request while holding a lock to the store. This is useful when we're registering
230 /// a connection but we want to respond on the connection before anybody else can send on it.
231 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
232 where
233 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
234 Fut: Send + Future<Output = Result<()>>,
235 M: RequestMessage,
236 {
237 let handler = Arc::new(handler);
238 self.add_message_handler(move |server, envelope| {
239 let receipt = envelope.receipt();
240 let handler = handler.clone();
241 async move {
242 let responded = Arc::new(AtomicBool::default());
243 let response = Response {
244 server: server.clone(),
245 responded: responded.clone(),
246 receipt: envelope.receipt(),
247 };
248 match (handler)(server.clone(), envelope, response).await {
249 Ok(()) => {
250 if responded.load(std::sync::atomic::Ordering::SeqCst) {
251 Ok(())
252 } else {
253 Err(anyhow!("handler did not send a response"))?
254 }
255 }
256 Err(error) => {
257 server.peer.respond_with_error(
258 receipt,
259 proto::Error {
260 message: error.to_string(),
261 },
262 )?;
263 Err(error)
264 }
265 }
266 }
267 })
268 }
269
/// Drives one client connection from registration until it closes.
///
/// The returned future (instrumented with a "handle connection" span):
/// 1. registers `connection` with the peer and, if `send_connection_id` is
///    provided, reports the new connection id through it;
/// 2. on the user's first connection, sends `ShowContacts` and records the
///    flag in the database;
/// 3. adds the connection to the store and sends the initial contacts update
///    and, if the user has an invite code, the invite info;
/// 4. dispatches incoming messages to the registered handlers until the I/O
///    future finishes or the incoming stream ends;
/// 5. signs the connection out, logging (not returning) any sign-out error.
///
/// Errors from steps 1–3 are returned; once the message loop has started the
/// future always resolves to `Ok(())`.
pub fn handle_connection<E: Executor>(
    self: &Arc<Self>,
    connection: Connection,
    address: String,
    user: User,
    mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
    executor: E,
) -> impl Future<Output = Result<()>> {
    let mut this = self.clone();
    let user_id = user.id;
    let login = user.github_login;
    let span = info_span!("handle connection", %user_id, %login, %address);
    async move {
        // The closure passed to the peer creates timers from the executor.
        let (connection_id, handle_io, mut incoming_rx) = this
            .peer
            .add_connection(connection, {
                let executor = executor.clone();
                move |duration| {
                    let timer = executor.sleep(duration);
                    async move {
                        timer.await;
                    }
                }
            })
            .await;

        tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");

        // A failure to report the connection id is deliberately ignored.
        if let Some(send_connection_id) = send_connection_id.as_mut() {
            let _ = send_connection_id.send(connection_id).await;
        }

        if !user.connected_once {
            this.peer.send(connection_id, proto::ShowContacts {})?;
            this.app_state.db.set_user_connected_once(user_id, true).await?;
        }

        // Both database queries run concurrently; the first error aborts the connection.
        let (contacts, invite_code) = future::try_join(
            this.app_state.db.get_contacts(user_id),
            this.app_state.db.get_invite_code_for_user(user_id)
        ).await?;

        {
            // The initial messages are sent while the store write lock is held;
            // the lock is released at the end of this block, before the next await.
            let mut store = this.store_mut().await;
            store.add_connection(connection_id, user_id);
            this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;

            if let Some((code, count)) = invite_code {
                this.peer.send(connection_id, proto::UpdateInviteInfo {
                    url: format!("{}{}", this.app_state.invite_link_prefix, code),
                    count,
                })?;
            }
        }
        this.update_user_contacts(user_id).await?;

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.next().fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` polls its branches in order, so completion of
            // the I/O future takes priority over a pending message.
            futures::select_biased! {
                result = handle_io => {
                    if let Err(error) = result {
                        tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                    }
                    break;
                }
                message = next_message => {
                    if let Some(message) = message {
                        let type_name = message.payload_type_name();
                        let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                        async {
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Notify the optional observer once the handler has finished.
                                let handle_message = async move {
                                    handle_message.await;
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages are spawned on the executor; all
                                // others are awaited here, so the next message of this
                                // connection is not read until the handler completes.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                            }
                        }.instrument(span).await;
                    } else {
                        tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                        break;
                    }
                }
            }
        }

        tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
        if let Err(error) = this.sign_out(connection_id).await {
            tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
        }

        Ok(())
    }.instrument(span)
}
378
/// Disconnects `connection_id` and cleans up everything that depended on it.
///
/// * Removes the connection from the store.
/// * For every project the connection hosted: tells the guests the project was
///   unregistered and declines all pending join requests with `WentOffline`.
/// * For every project the connection was a guest of: tells the remaining
///   connections the collaborator left and, if no guests remain, tells the
///   host the project is no longer shared.
/// * Finally pushes an updated contact entry for the removed user.
///
/// NOTE(review): the `?` on `remove_connection` and on `respond` returns early,
/// skipping the remaining notifications and the contacts update, whereas the
/// `ProjectUnshared` send below only logs via `trace_err` — confirm the early
/// return is intended.
#[instrument(skip(self), err)]
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
    self.peer.disconnect(connection_id);

    let removed_user_id = {
        // The write guard lives only inside this block and is released before
        // the `.await` on `update_user_contacts` below.
        let mut store = self.store_mut().await;
        let removed_connection = store.remove_connection(connection_id)?;

        for (project_id, project) in removed_connection.hosted_projects {
            broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
                self.peer
                    .send(conn_id, proto::UnregisterProject { project_id })
            });

            // Answer every pending join request so requesters are not left waiting.
            for (_, receipts) in project.join_requests {
                for receipt in receipts {
                    self.peer.respond(
                        receipt,
                        proto::JoinProjectResponse {
                            variant: Some(proto::join_project_response::Variant::Decline(
                                proto::join_project_response::Decline {
                                    reason: proto::join_project_response::decline::Reason::WentOffline as i32
                                },
                            )),
                        },
                    )?;
                }
            }
        }

        for project_id in removed_connection.guest_project_ids {
            // A project that can no longer be found is logged and skipped.
            if let Some(project) = store.project(project_id).trace_err() {
                broadcast(connection_id, project.connection_ids(), |conn_id| {
                    self.peer.send(
                        conn_id,
                        proto::RemoveProjectCollaborator {
                            project_id,
                            peer_id: connection_id.0,
                        },
                    )
                });
                if project.guests.is_empty() {
                    self.peer
                        .send(
                            project.host_connection_id,
                            proto::ProjectUnshared { project_id },
                        )
                        .trace_err();
                }
            }
        }

        removed_connection.user_id
    };

    self.update_user_contacts(removed_user_id).await?;

    Ok(())
}
438
439 pub async fn invite_code_redeemed(self: &Arc<Self>, code: &str, invitee_id: UserId) -> Result<()> {
440 let user = self.app_state.db.get_user_for_invite_code(code).await?;
441 let store = self.store().await;
442 let invitee_contact = store.contact_for_user(invitee_id, true);
443 for connection_id in store.connection_ids_for_user(user.id) {
444 self.peer.send(connection_id, proto::UpdateContacts {
445 contacts: vec![invitee_contact.clone()],
446 ..Default::default()
447 })?;
448 self.peer.send(connection_id, proto::UpdateInviteInfo {
449 url: format!("{}{}", self.app_state.invite_link_prefix, code),
450 count: user.invite_count as u32,
451 })?;
452 }
453 Ok(())
454 }
455
456 async fn ping(
457 self: Arc<Server>,
458 _: TypedEnvelope<proto::Ping>,
459 response: Response<proto::Ping>,
460 ) -> Result<()> {
461 response.send(proto::Ack {})?;
462 Ok(())
463 }
464
465 async fn register_project(
466 self: Arc<Server>,
467 request: TypedEnvelope<proto::RegisterProject>,
468 response: Response<proto::RegisterProject>,
469 ) -> Result<()> {
470 let user_id;
471 let project_id;
472 {
473 let mut state = self.store_mut().await;
474 user_id = state.user_id_for_connection(request.sender_id)?;
475 project_id = state.register_project(request.sender_id, user_id);
476 };
477 self.update_user_contacts(user_id).await?;
478 response.send(proto::RegisterProjectResponse { project_id })?;
479 Ok(())
480 }
481
482 async fn unregister_project(
483 self: Arc<Server>,
484 request: TypedEnvelope<proto::UnregisterProject>,
485 ) -> Result<()> {
486 let (user_id, project) = {
487 let mut state = self.store_mut().await;
488 let project =
489 state.unregister_project(request.payload.project_id, request.sender_id)?;
490 (state.user_id_for_connection(request.sender_id)?, project)
491 };
492
493 broadcast(
494 request.sender_id,
495 project.guests.keys().copied(),
496 |conn_id| {
497 self.peer.send(
498 conn_id,
499 proto::UnregisterProject {
500 project_id: request.payload.project_id,
501 },
502 )
503 },
504 );
505 for (_, receipts) in project.join_requests {
506 for receipt in receipts {
507 self.peer.respond(
508 receipt,
509 proto::JoinProjectResponse {
510 variant: Some(proto::join_project_response::Variant::Decline(
511 proto::join_project_response::Decline {
512 reason: proto::join_project_response::decline::Reason::Closed
513 as i32,
514 },
515 )),
516 },
517 )?;
518 }
519 }
520
521 self.update_user_contacts(user_id).await?;
522 Ok(())
523 }
524
525 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
526 let contacts = self.app_state.db.get_contacts(user_id).await?;
527 let store = self.store().await;
528 let updated_contact = store.contact_for_user(user_id, false);
529 for contact in contacts {
530 if let db::Contact::Accepted {
531 user_id: contact_user_id,
532 ..
533 } = contact
534 {
535 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
536 self.peer
537 .send(
538 contact_conn_id,
539 proto::UpdateContacts {
540 contacts: vec![updated_contact.clone()],
541 remove_contacts: Default::default(),
542 incoming_requests: Default::default(),
543 remove_incoming_requests: Default::default(),
544 outgoing_requests: Default::default(),
545 remove_outgoing_requests: Default::default(),
546 },
547 )
548 .trace_err();
549 }
550 }
551 }
552 Ok(())
553 }
554
555 async fn join_project(
556 self: Arc<Server>,
557 request: TypedEnvelope<proto::JoinProject>,
558 response: Response<proto::JoinProject>,
559 ) -> Result<()> {
560 let project_id = request.payload.project_id;
561 let host_user_id;
562 let guest_user_id;
563 let host_connection_id;
564 {
565 let state = self.store().await;
566 let project = state.project(project_id)?;
567 host_user_id = project.host_user_id;
568 host_connection_id = project.host_connection_id;
569 guest_user_id = state.user_id_for_connection(request.sender_id)?;
570 };
571
572 let has_contact = self
573 .app_state
574 .db
575 .has_contact(guest_user_id, host_user_id)
576 .await?;
577 if !has_contact {
578 return Err(anyhow!("no such project"))?;
579 }
580
581 self.store_mut().await.request_join_project(
582 guest_user_id,
583 project_id,
584 response.into_receipt(),
585 )?;
586 self.peer.send(
587 host_connection_id,
588 proto::RequestJoinProject {
589 project_id,
590 requester_id: guest_user_id.to_proto(),
591 },
592 )?;
593 Ok(())
594 }
595
596 async fn respond_to_join_project_request(
597 self: Arc<Server>,
598 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
599 ) -> Result<()> {
600 let host_user_id;
601
602 {
603 let mut state = self.store_mut().await;
604 let project_id = request.payload.project_id;
605 let project = state.project(project_id)?;
606 if project.host_connection_id != request.sender_id {
607 Err(anyhow!("no such connection"))?;
608 }
609
610 host_user_id = project.host_user_id;
611 let guest_user_id = UserId::from_proto(request.payload.requester_id);
612
613 if !request.payload.allow {
614 let receipts = state
615 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
616 .ok_or_else(|| anyhow!("no such request"))?;
617 for receipt in receipts {
618 self.peer.respond(
619 receipt,
620 proto::JoinProjectResponse {
621 variant: Some(proto::join_project_response::Variant::Decline(
622 proto::join_project_response::Decline {
623 reason: proto::join_project_response::decline::Reason::Declined
624 as i32,
625 },
626 )),
627 },
628 )?;
629 }
630 return Ok(());
631 }
632
633 let (receipts_with_replica_ids, project) = state
634 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
635 .ok_or_else(|| anyhow!("no such request"))?;
636
637 let peer_count = project.guests.len();
638 let mut collaborators = Vec::with_capacity(peer_count);
639 collaborators.push(proto::Collaborator {
640 peer_id: project.host_connection_id.0,
641 replica_id: 0,
642 user_id: project.host_user_id.to_proto(),
643 });
644 let worktrees = project
645 .worktrees
646 .iter()
647 .filter_map(|(id, shared_worktree)| {
648 let worktree = project.worktrees.get(&id)?;
649 Some(proto::Worktree {
650 id: *id,
651 root_name: worktree.root_name.clone(),
652 entries: shared_worktree.entries.values().cloned().collect(),
653 diagnostic_summaries: shared_worktree
654 .diagnostic_summaries
655 .values()
656 .cloned()
657 .collect(),
658 visible: worktree.visible,
659 scan_id: shared_worktree.scan_id,
660 })
661 })
662 .collect::<Vec<_>>();
663
664 // Add all guests other than the requesting user's own connections as collaborators
665 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
666 if receipts_with_replica_ids
667 .iter()
668 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
669 {
670 collaborators.push(proto::Collaborator {
671 peer_id: peer_conn_id.0,
672 replica_id: *peer_replica_id as u32,
673 user_id: peer_user_id.to_proto(),
674 });
675 }
676 }
677
678 for conn_id in project.connection_ids() {
679 for (receipt, replica_id) in &receipts_with_replica_ids {
680 if conn_id != receipt.sender_id {
681 self.peer.send(
682 conn_id,
683 proto::AddProjectCollaborator {
684 project_id,
685 collaborator: Some(proto::Collaborator {
686 peer_id: receipt.sender_id.0,
687 replica_id: *replica_id as u32,
688 user_id: guest_user_id.to_proto(),
689 }),
690 },
691 )?;
692 }
693 }
694 }
695
696 for (receipt, replica_id) in receipts_with_replica_ids {
697 self.peer.respond(
698 receipt,
699 proto::JoinProjectResponse {
700 variant: Some(proto::join_project_response::Variant::Accept(
701 proto::join_project_response::Accept {
702 worktrees: worktrees.clone(),
703 replica_id: replica_id as u32,
704 collaborators: collaborators.clone(),
705 language_servers: project.language_servers.clone(),
706 },
707 )),
708 },
709 )?;
710 }
711 }
712
713 self.update_user_contacts(host_user_id).await?;
714 Ok(())
715 }
716
717 async fn leave_project(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::LeaveProject>,
720 ) -> Result<()> {
721 let sender_id = request.sender_id;
722 let project_id = request.payload.project_id;
723 let project;
724 {
725 let mut store = self.store_mut().await;
726 project = store.leave_project(sender_id, project_id)?;
727
728 if project.remove_collaborator {
729 broadcast(sender_id, project.connection_ids, |conn_id| {
730 self.peer.send(
731 conn_id,
732 proto::RemoveProjectCollaborator {
733 project_id,
734 peer_id: sender_id.0,
735 },
736 )
737 });
738 }
739
740 if let Some(requester_id) = project.cancel_request {
741 self.peer.send(
742 project.host_connection_id,
743 proto::JoinProjectRequestCancelled {
744 project_id,
745 requester_id: requester_id.to_proto(),
746 },
747 )?;
748 }
749
750 if project.unshare {
751 self.peer.send(
752 project.host_connection_id,
753 proto::ProjectUnshared { project_id },
754 )?;
755 }
756 }
757 self.update_user_contacts(project.host_user_id).await?;
758 Ok(())
759 }
760
761 async fn register_worktree(
762 self: Arc<Server>,
763 request: TypedEnvelope<proto::RegisterWorktree>,
764 response: Response<proto::RegisterWorktree>,
765 ) -> Result<()> {
766 let host_user_id;
767 {
768 let mut state = self.store_mut().await;
769 host_user_id = state.user_id_for_connection(request.sender_id)?;
770
771 let guest_connection_ids = state
772 .read_project(request.payload.project_id, request.sender_id)?
773 .guest_connection_ids();
774 state.register_worktree(
775 request.payload.project_id,
776 request.payload.worktree_id,
777 request.sender_id,
778 Worktree {
779 root_name: request.payload.root_name.clone(),
780 visible: request.payload.visible,
781 ..Default::default()
782 },
783 )?;
784
785 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
786 self.peer
787 .forward_send(request.sender_id, connection_id, request.payload.clone())
788 });
789 }
790 self.update_user_contacts(host_user_id).await?;
791 response.send(proto::Ack {})?;
792 Ok(())
793 }
794
795 async fn unregister_worktree(
796 self: Arc<Server>,
797 request: TypedEnvelope<proto::UnregisterWorktree>,
798 ) -> Result<()> {
799 let host_user_id;
800 let project_id = request.payload.project_id;
801 let worktree_id = request.payload.worktree_id;
802 {
803 let mut state = self.store_mut().await;
804 let (_, guest_connection_ids) =
805 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
806 host_user_id = state.user_id_for_connection(request.sender_id)?;
807 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
808 self.peer.send(
809 conn_id,
810 proto::UnregisterWorktree {
811 project_id,
812 worktree_id,
813 },
814 )
815 });
816 }
817 self.update_user_contacts(host_user_id).await?;
818 Ok(())
819 }
820
821 async fn update_worktree(
822 self: Arc<Server>,
823 request: TypedEnvelope<proto::UpdateWorktree>,
824 response: Response<proto::UpdateWorktree>,
825 ) -> Result<()> {
826 let connection_ids = self.store_mut().await.update_worktree(
827 request.sender_id,
828 request.payload.project_id,
829 request.payload.worktree_id,
830 &request.payload.removed_entries,
831 &request.payload.updated_entries,
832 request.payload.scan_id,
833 )?;
834
835 broadcast(request.sender_id, connection_ids, |connection_id| {
836 self.peer
837 .forward_send(request.sender_id, connection_id, request.payload.clone())
838 });
839 response.send(proto::Ack {})?;
840 Ok(())
841 }
842
843 async fn update_diagnostic_summary(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
846 ) -> Result<()> {
847 let summary = request
848 .payload
849 .summary
850 .clone()
851 .ok_or_else(|| anyhow!("invalid summary"))?;
852 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
853 request.payload.project_id,
854 request.payload.worktree_id,
855 request.sender_id,
856 summary,
857 )?;
858
859 broadcast(request.sender_id, receiver_ids, |connection_id| {
860 self.peer
861 .forward_send(request.sender_id, connection_id, request.payload.clone())
862 });
863 Ok(())
864 }
865
866 async fn start_language_server(
867 self: Arc<Server>,
868 request: TypedEnvelope<proto::StartLanguageServer>,
869 ) -> Result<()> {
870 let receiver_ids = self.store_mut().await.start_language_server(
871 request.payload.project_id,
872 request.sender_id,
873 request
874 .payload
875 .server
876 .clone()
877 .ok_or_else(|| anyhow!("invalid language server"))?,
878 )?;
879 broadcast(request.sender_id, receiver_ids, |connection_id| {
880 self.peer
881 .forward_send(request.sender_id, connection_id, request.payload.clone())
882 });
883 Ok(())
884 }
885
886 async fn update_language_server(
887 self: Arc<Server>,
888 request: TypedEnvelope<proto::UpdateLanguageServer>,
889 ) -> Result<()> {
890 let receiver_ids = self
891 .store()
892 .await
893 .project_connection_ids(request.payload.project_id, request.sender_id)?;
894 broadcast(request.sender_id, receiver_ids, |connection_id| {
895 self.peer
896 .forward_send(request.sender_id, connection_id, request.payload.clone())
897 });
898 Ok(())
899 }
900
901 async fn forward_project_request<T>(
902 self: Arc<Server>,
903 request: TypedEnvelope<T>,
904 response: Response<T>,
905 ) -> Result<()>
906 where
907 T: EntityMessage + RequestMessage,
908 {
909 let host_connection_id = self
910 .store()
911 .await
912 .read_project(request.payload.remote_entity_id(), request.sender_id)?
913 .host_connection_id;
914
915 response.send(
916 self.peer
917 .forward_request(request.sender_id, host_connection_id, request.payload)
918 .await?,
919 )?;
920 Ok(())
921 }
922
923 async fn save_buffer(
924 self: Arc<Server>,
925 request: TypedEnvelope<proto::SaveBuffer>,
926 response: Response<proto::SaveBuffer>,
927 ) -> Result<()> {
928 let host = self
929 .store()
930 .await
931 .read_project(request.payload.project_id, request.sender_id)?
932 .host_connection_id;
933 let response_payload = self
934 .peer
935 .forward_request(request.sender_id, host, request.payload.clone())
936 .await?;
937
938 let mut guests = self
939 .store()
940 .await
941 .read_project(request.payload.project_id, request.sender_id)?
942 .connection_ids();
943 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
944 broadcast(host, guests, |conn_id| {
945 self.peer
946 .forward_send(host, conn_id, response_payload.clone())
947 });
948 response.send(response_payload)?;
949 Ok(())
950 }
951
952 async fn update_buffer(
953 self: Arc<Server>,
954 request: TypedEnvelope<proto::UpdateBuffer>,
955 response: Response<proto::UpdateBuffer>,
956 ) -> Result<()> {
957 let receiver_ids = self
958 .store()
959 .await
960 .project_connection_ids(request.payload.project_id, request.sender_id)?;
961 broadcast(request.sender_id, receiver_ids, |connection_id| {
962 self.peer
963 .forward_send(request.sender_id, connection_id, request.payload.clone())
964 });
965 response.send(proto::Ack {})?;
966 Ok(())
967 }
968
969 async fn update_buffer_file(
970 self: Arc<Server>,
971 request: TypedEnvelope<proto::UpdateBufferFile>,
972 ) -> Result<()> {
973 let receiver_ids = self
974 .store()
975 .await
976 .project_connection_ids(request.payload.project_id, request.sender_id)?;
977 broadcast(request.sender_id, receiver_ids, |connection_id| {
978 self.peer
979 .forward_send(request.sender_id, connection_id, request.payload.clone())
980 });
981 Ok(())
982 }
983
984 async fn buffer_reloaded(
985 self: Arc<Server>,
986 request: TypedEnvelope<proto::BufferReloaded>,
987 ) -> Result<()> {
988 let receiver_ids = self
989 .store()
990 .await
991 .project_connection_ids(request.payload.project_id, request.sender_id)?;
992 broadcast(request.sender_id, receiver_ids, |connection_id| {
993 self.peer
994 .forward_send(request.sender_id, connection_id, request.payload.clone())
995 });
996 Ok(())
997 }
998
999 async fn buffer_saved(
1000 self: Arc<Server>,
1001 request: TypedEnvelope<proto::BufferSaved>,
1002 ) -> Result<()> {
1003 let receiver_ids = self
1004 .store()
1005 .await
1006 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1007 broadcast(request.sender_id, receiver_ids, |connection_id| {
1008 self.peer
1009 .forward_send(request.sender_id, connection_id, request.payload.clone())
1010 });
1011 Ok(())
1012 }
1013
1014 async fn follow(
1015 self: Arc<Self>,
1016 request: TypedEnvelope<proto::Follow>,
1017 response: Response<proto::Follow>,
1018 ) -> Result<()> {
1019 let leader_id = ConnectionId(request.payload.leader_id);
1020 let follower_id = request.sender_id;
1021 if !self
1022 .store()
1023 .await
1024 .project_connection_ids(request.payload.project_id, follower_id)?
1025 .contains(&leader_id)
1026 {
1027 Err(anyhow!("no such peer"))?;
1028 }
1029 let mut response_payload = self
1030 .peer
1031 .forward_request(request.sender_id, leader_id, request.payload)
1032 .await?;
1033 response_payload
1034 .views
1035 .retain(|view| view.leader_id != Some(follower_id.0));
1036 response.send(response_payload)?;
1037 Ok(())
1038 }
1039
1040 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
1041 let leader_id = ConnectionId(request.payload.leader_id);
1042 if !self
1043 .store()
1044 .await
1045 .project_connection_ids(request.payload.project_id, request.sender_id)?
1046 .contains(&leader_id)
1047 {
1048 Err(anyhow!("no such peer"))?;
1049 }
1050 self.peer
1051 .forward_send(request.sender_id, leader_id, request.payload)?;
1052 Ok(())
1053 }
1054
1055 async fn update_followers(
1056 self: Arc<Self>,
1057 request: TypedEnvelope<proto::UpdateFollowers>,
1058 ) -> Result<()> {
1059 let connection_ids = self
1060 .store()
1061 .await
1062 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1063 let leader_id = request
1064 .payload
1065 .variant
1066 .as_ref()
1067 .and_then(|variant| match variant {
1068 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1069 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1070 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1071 });
1072 for follower_id in &request.payload.follower_ids {
1073 let follower_id = ConnectionId(*follower_id);
1074 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1075 self.peer
1076 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 async fn get_channels(
1083 self: Arc<Server>,
1084 request: TypedEnvelope<proto::GetChannels>,
1085 response: Response<proto::GetChannels>,
1086 ) -> Result<()> {
1087 let user_id = self
1088 .store()
1089 .await
1090 .user_id_for_connection(request.sender_id)?;
1091 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1092 response.send(proto::GetChannelsResponse {
1093 channels: channels
1094 .into_iter()
1095 .map(|chan| proto::Channel {
1096 id: chan.id.to_proto(),
1097 name: chan.name,
1098 })
1099 .collect(),
1100 })?;
1101 Ok(())
1102 }
1103
1104 async fn get_users(
1105 self: Arc<Server>,
1106 request: TypedEnvelope<proto::GetUsers>,
1107 response: Response<proto::GetUsers>,
1108 ) -> Result<()> {
1109 let user_ids = request
1110 .payload
1111 .user_ids
1112 .into_iter()
1113 .map(UserId::from_proto)
1114 .collect();
1115 let users = self
1116 .app_state
1117 .db
1118 .get_users_by_ids(user_ids)
1119 .await?
1120 .into_iter()
1121 .map(|user| proto::User {
1122 id: user.id.to_proto(),
1123 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1124 github_login: user.github_login,
1125 })
1126 .collect();
1127 response.send(proto::UsersResponse { users })?;
1128 Ok(())
1129 }
1130
1131 async fn fuzzy_search_users(
1132 self: Arc<Server>,
1133 request: TypedEnvelope<proto::FuzzySearchUsers>,
1134 response: Response<proto::FuzzySearchUsers>,
1135 ) -> Result<()> {
1136 let user_id = self
1137 .store()
1138 .await
1139 .user_id_for_connection(request.sender_id)?;
1140 let query = request.payload.query;
1141 let db = &self.app_state.db;
1142 let users = match query.len() {
1143 0 => vec![],
1144 1 | 2 => db
1145 .get_user_by_github_login(&query)
1146 .await?
1147 .into_iter()
1148 .collect(),
1149 _ => db.fuzzy_search_users(&query, 10).await?,
1150 };
1151 let users = users
1152 .into_iter()
1153 .filter(|user| user.id != user_id)
1154 .map(|user| proto::User {
1155 id: user.id.to_proto(),
1156 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1157 github_login: user.github_login,
1158 })
1159 .collect();
1160 response.send(proto::UsersResponse { users })?;
1161 Ok(())
1162 }
1163
1164 async fn request_contact(
1165 self: Arc<Server>,
1166 request: TypedEnvelope<proto::RequestContact>,
1167 response: Response<proto::RequestContact>,
1168 ) -> Result<()> {
1169 let requester_id = self
1170 .store()
1171 .await
1172 .user_id_for_connection(request.sender_id)?;
1173 let responder_id = UserId::from_proto(request.payload.responder_id);
1174 if requester_id == responder_id {
1175 return Err(anyhow!("cannot add yourself as a contact"))?;
1176 }
1177
1178 self.app_state
1179 .db
1180 .send_contact_request(requester_id, responder_id)
1181 .await?;
1182
1183 // Update outgoing contact requests of requester
1184 let mut update = proto::UpdateContacts::default();
1185 update.outgoing_requests.push(responder_id.to_proto());
1186 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1187 self.peer.send(connection_id, update.clone())?;
1188 }
1189
1190 // Update incoming contact requests of responder
1191 let mut update = proto::UpdateContacts::default();
1192 update
1193 .incoming_requests
1194 .push(proto::IncomingContactRequest {
1195 requester_id: requester_id.to_proto(),
1196 should_notify: true,
1197 });
1198 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1199 self.peer.send(connection_id, update.clone())?;
1200 }
1201
1202 response.send(proto::Ack {})?;
1203 Ok(())
1204 }
1205
1206 async fn respond_to_contact_request(
1207 self: Arc<Server>,
1208 request: TypedEnvelope<proto::RespondToContactRequest>,
1209 response: Response<proto::RespondToContactRequest>,
1210 ) -> Result<()> {
1211 let responder_id = self
1212 .store()
1213 .await
1214 .user_id_for_connection(request.sender_id)?;
1215 let requester_id = UserId::from_proto(request.payload.requester_id);
1216 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1217 self.app_state
1218 .db
1219 .dismiss_contact_notification(responder_id, requester_id)
1220 .await?;
1221 } else {
1222 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1223 self.app_state
1224 .db
1225 .respond_to_contact_request(responder_id, requester_id, accept)
1226 .await?;
1227
1228 let store = self.store().await;
1229 // Update responder with new contact
1230 let mut update = proto::UpdateContacts::default();
1231 if accept {
1232 update
1233 .contacts
1234 .push(store.contact_for_user(requester_id, false));
1235 }
1236 update
1237 .remove_incoming_requests
1238 .push(requester_id.to_proto());
1239 for connection_id in store.connection_ids_for_user(responder_id) {
1240 self.peer.send(connection_id, update.clone())?;
1241 }
1242
1243 // Update requester with new contact
1244 let mut update = proto::UpdateContacts::default();
1245 if accept {
1246 update
1247 .contacts
1248 .push(store.contact_for_user(responder_id, true));
1249 }
1250 update
1251 .remove_outgoing_requests
1252 .push(responder_id.to_proto());
1253 for connection_id in store.connection_ids_for_user(requester_id) {
1254 self.peer.send(connection_id, update.clone())?;
1255 }
1256 }
1257
1258 response.send(proto::Ack {})?;
1259 Ok(())
1260 }
1261
1262 async fn remove_contact(
1263 self: Arc<Server>,
1264 request: TypedEnvelope<proto::RemoveContact>,
1265 response: Response<proto::RemoveContact>,
1266 ) -> Result<()> {
1267 let requester_id = self
1268 .store()
1269 .await
1270 .user_id_for_connection(request.sender_id)?;
1271 let responder_id = UserId::from_proto(request.payload.user_id);
1272 self.app_state
1273 .db
1274 .remove_contact(requester_id, responder_id)
1275 .await?;
1276
1277 // Update outgoing contact requests of requester
1278 let mut update = proto::UpdateContacts::default();
1279 update
1280 .remove_outgoing_requests
1281 .push(responder_id.to_proto());
1282 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1283 self.peer.send(connection_id, update.clone())?;
1284 }
1285
1286 // Update incoming contact requests of responder
1287 let mut update = proto::UpdateContacts::default();
1288 update
1289 .remove_incoming_requests
1290 .push(requester_id.to_proto());
1291 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1292 self.peer.send(connection_id, update.clone())?;
1293 }
1294
1295 response.send(proto::Ack {})?;
1296 Ok(())
1297 }
1298
1299 async fn join_channel(
1300 self: Arc<Self>,
1301 request: TypedEnvelope<proto::JoinChannel>,
1302 response: Response<proto::JoinChannel>,
1303 ) -> Result<()> {
1304 let user_id = self
1305 .store()
1306 .await
1307 .user_id_for_connection(request.sender_id)?;
1308 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1309 if !self
1310 .app_state
1311 .db
1312 .can_user_access_channel(user_id, channel_id)
1313 .await?
1314 {
1315 Err(anyhow!("access denied"))?;
1316 }
1317
1318 self.store_mut()
1319 .await
1320 .join_channel(request.sender_id, channel_id);
1321 let messages = self
1322 .app_state
1323 .db
1324 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1325 .await?
1326 .into_iter()
1327 .map(|msg| proto::ChannelMessage {
1328 id: msg.id.to_proto(),
1329 body: msg.body,
1330 timestamp: msg.sent_at.unix_timestamp() as u64,
1331 sender_id: msg.sender_id.to_proto(),
1332 nonce: Some(msg.nonce.as_u128().into()),
1333 })
1334 .collect::<Vec<_>>();
1335 response.send(proto::JoinChannelResponse {
1336 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1337 messages,
1338 })?;
1339 Ok(())
1340 }
1341
1342 async fn leave_channel(
1343 self: Arc<Self>,
1344 request: TypedEnvelope<proto::LeaveChannel>,
1345 ) -> Result<()> {
1346 let user_id = self
1347 .store()
1348 .await
1349 .user_id_for_connection(request.sender_id)?;
1350 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1351 if !self
1352 .app_state
1353 .db
1354 .can_user_access_channel(user_id, channel_id)
1355 .await?
1356 {
1357 Err(anyhow!("access denied"))?;
1358 }
1359
1360 self.store_mut()
1361 .await
1362 .leave_channel(request.sender_id, channel_id);
1363
1364 Ok(())
1365 }
1366
1367 async fn send_channel_message(
1368 self: Arc<Self>,
1369 request: TypedEnvelope<proto::SendChannelMessage>,
1370 response: Response<proto::SendChannelMessage>,
1371 ) -> Result<()> {
1372 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1373 let user_id;
1374 let connection_ids;
1375 {
1376 let state = self.store().await;
1377 user_id = state.user_id_for_connection(request.sender_id)?;
1378 connection_ids = state.channel_connection_ids(channel_id)?;
1379 }
1380
1381 // Validate the message body.
1382 let body = request.payload.body.trim().to_string();
1383 if body.len() > MAX_MESSAGE_LEN {
1384 return Err(anyhow!("message is too long"))?;
1385 }
1386 if body.is_empty() {
1387 return Err(anyhow!("message can't be blank"))?;
1388 }
1389
1390 let timestamp = OffsetDateTime::now_utc();
1391 let nonce = request
1392 .payload
1393 .nonce
1394 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1395
1396 let message_id = self
1397 .app_state
1398 .db
1399 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1400 .await?
1401 .to_proto();
1402 let message = proto::ChannelMessage {
1403 sender_id: user_id.to_proto(),
1404 id: message_id,
1405 body,
1406 timestamp: timestamp.unix_timestamp() as u64,
1407 nonce: Some(nonce),
1408 };
1409 broadcast(request.sender_id, connection_ids, |conn_id| {
1410 self.peer.send(
1411 conn_id,
1412 proto::ChannelMessageSent {
1413 channel_id: channel_id.to_proto(),
1414 message: Some(message.clone()),
1415 },
1416 )
1417 });
1418 response.send(proto::SendChannelMessageResponse {
1419 message: Some(message),
1420 })?;
1421 Ok(())
1422 }
1423
1424 async fn get_channel_messages(
1425 self: Arc<Self>,
1426 request: TypedEnvelope<proto::GetChannelMessages>,
1427 response: Response<proto::GetChannelMessages>,
1428 ) -> Result<()> {
1429 let user_id = self
1430 .store()
1431 .await
1432 .user_id_for_connection(request.sender_id)?;
1433 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1434 if !self
1435 .app_state
1436 .db
1437 .can_user_access_channel(user_id, channel_id)
1438 .await?
1439 {
1440 Err(anyhow!("access denied"))?;
1441 }
1442
1443 let messages = self
1444 .app_state
1445 .db
1446 .get_channel_messages(
1447 channel_id,
1448 MESSAGE_COUNT_PER_PAGE,
1449 Some(MessageId::from_proto(request.payload.before_message_id)),
1450 )
1451 .await?
1452 .into_iter()
1453 .map(|msg| proto::ChannelMessage {
1454 id: msg.id.to_proto(),
1455 body: msg.body,
1456 timestamp: msg.sent_at.unix_timestamp() as u64,
1457 sender_id: msg.sender_id.to_proto(),
1458 nonce: Some(msg.nonce.as_u128().into()),
1459 })
1460 .collect::<Vec<_>>();
1461 response.send(proto::GetChannelMessagesResponse {
1462 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1463 messages,
1464 })?;
1465 Ok(())
1466 }
1467
/// Acquires a read lock on the server's `Store` and wraps it in a
/// `StoreReadGuard`.
///
/// In test builds the task additionally yields to the scheduler once before
/// and once after taking the lock.
/// NOTE(review): the yields presumably exist to expose more task
/// interleavings under test — confirm.
async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
    #[cfg(test)]
    tokio::task::yield_now().await;
    let guard = self.store.read().await;
    #[cfg(test)]
    tokio::task::yield_now().await;
    StoreReadGuard {
        guard,
        // NOTE(review): judging by the field name, this marker makes the
        // guard `!Send` — confirm against the struct definition.
        _not_send: PhantomData,
    }
}
1479
/// Acquires the write lock on the server's `Store` and wraps it in a
/// `StoreWriteGuard`.
///
/// In test builds the task additionally yields to the scheduler once before
/// and once after taking the lock.
/// NOTE(review): the yields presumably exist to expose more task
/// interleavings under test — confirm.
async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
    #[cfg(test)]
    tokio::task::yield_now().await;
    let guard = self.store.write().await;
    #[cfg(test)]
    tokio::task::yield_now().await;
    StoreWriteGuard {
        guard,
        // NOTE(review): judging by the field name, this marker makes the
        // guard `!Send` — confirm against the struct definition.
        _not_send: PhantomData,
    }
}
1491
1492 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
1493 ServerSnapshot {
1494 store: self.store.read().await,
1495 peer: &self.peer
1496 }
1497 }
1498}
1499
1500impl<'a> Deref for StoreReadGuard<'a> {
1501 type Target = Store;
1502
1503 fn deref(&self) -> &Self::Target {
1504 &*self.guard
1505 }
1506}
1507
1508impl<'a> Deref for StoreWriteGuard<'a> {
1509 type Target = Store;
1510
1511 fn deref(&self) -> &Self::Target {
1512 &*self.guard
1513 }
1514}
1515
1516impl<'a> DerefMut for StoreWriteGuard<'a> {
1517 fn deref_mut(&mut self) -> &mut Self::Target {
1518 &mut *self.guard
1519 }
1520}
1521
1522impl<'a> Drop for StoreWriteGuard<'a> {
1523 fn drop(&mut self) {
1524 #[cfg(test)]
1525 self.check_invariants();
1526
1527 let metrics = self.metrics();
1528 tracing::info!(
1529 connections = metrics.connections,
1530 registered_projects = metrics.registered_projects,
1531 shared_projects = metrics.shared_projects,
1532 "metrics"
1533 );
1534 }
1535}
1536
1537impl Executor for RealExecutor {
1538 type Sleep = Sleep;
1539
1540 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1541 tokio::task::spawn(future);
1542 }
1543
1544 fn sleep(&self, duration: Duration) -> Self::Sleep {
1545 tokio::time::sleep(duration)
1546 }
1547}
1548
1549fn broadcast<F>(
1550 sender_id: ConnectionId,
1551 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1552 mut f: F,
1553) where
1554 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1555{
1556 for receiver_id in receiver_ids {
1557 if receiver_id != sender_id {
1558 f(receiver_id).trace_err();
1559 }
1560 }
1561}
1562
// Name of the HTTP header that carries the client's RPC protocol version.
// It is returned by `ProtocolVersion`'s `Header::name` implementation.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1566
/// Typed representation of the `x-zed-protocol-version` request header: the
/// protocol version announced by a connecting client, as an unsigned integer.
pub struct ProtocolVersion(u32);
1568
1569impl Header for ProtocolVersion {
1570 fn name() -> &'static HeaderName {
1571 &ZED_PROTOCOL_VERSION
1572 }
1573
1574 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1575 where
1576 Self: Sized,
1577 I: Iterator<Item = &'i axum::http::HeaderValue>,
1578 {
1579 let version = values
1580 .next()
1581 .ok_or_else(|| axum::headers::Error::invalid())?
1582 .to_str()
1583 .map_err(|_| axum::headers::Error::invalid())?
1584 .parse()
1585 .map_err(|_| axum::headers::Error::invalid())?;
1586 Ok(Self(version))
1587 }
1588
1589 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1590 values.extend([self.0.to_string().parse().unwrap()]);
1591 }
1592}
1593
1594pub fn routes(server: Arc<Server>) -> Router<Body> {
1595 Router::new()
1596 .route("/rpc", get(handle_websocket_request))
1597 .layer(
1598 ServiceBuilder::new()
1599 .layer(Extension(server.app_state.clone()))
1600 .layer(middleware::from_fn(auth::validate_header))
1601 .layer(Extension(server)),
1602 )
1603}
1604
1605pub async fn handle_websocket_request(
1606 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1607 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1608 Extension(server): Extension<Arc<Server>>,
1609 Extension(user): Extension<User>,
1610 ws: WebSocketUpgrade,
1611) -> axum::response::Response {
1612 if protocol_version != rpc::PROTOCOL_VERSION {
1613 return (
1614 StatusCode::UPGRADE_REQUIRED,
1615 "client must be upgraded".to_string(),
1616 )
1617 .into_response();
1618 }
1619 let socket_address = socket_address.to_string();
1620 ws.on_upgrade(move |socket| {
1621 use util::ResultExt;
1622 let socket = socket
1623 .map_ok(to_tungstenite_message)
1624 .err_into()
1625 .with(|message| async move { Ok(to_axum_message(message)) });
1626 let connection = Connection::new(Box::pin(socket));
1627 async move {
1628 server
1629 .handle_connection(connection, socket_address, user, None, RealExecutor)
1630 .await
1631 .log_err();
1632 }
1633 })
1634}
1635
1636fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1637 match message {
1638 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1639 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1640 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1641 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1642 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1643 code: frame.code.into(),
1644 reason: frame.reason,
1645 })),
1646 }
1647}
1648
1649fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1650 match message {
1651 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1652 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1653 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1654 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1655 AxumMessage::Close(frame) => {
1656 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1657 code: frame.code.into(),
1658 reason: frame.reason,
1659 }))
1660 }
1661 }
1662}
1663
/// Extension trait for result-like values whose error should be logged
/// instead of propagated.
pub trait ResultExt {
    /// The success type carried by the implementor.
    type Ok;

    /// Consumes `self`, returning the success value if there is one.
    /// The implementation for `Result` in this file logs the error with
    /// `tracing::error!` and returns `None` on failure.
    fn trace_err(self) -> Option<Self::Ok>;
}
1669
1670impl<T, E> ResultExt for Result<T, E>
1671where
1672 E: std::fmt::Debug,
1673{
1674 type Ok = T;
1675
1676 fn trace_err(self) -> Option<T> {
1677 match self {
1678 Ok(value) => Some(value),
1679 Err(error) => {
1680 tracing::error!("{:?}", error);
1681 None
1682 }
1683 }
1684 }
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689 use super::*;
1690 use crate::{
1691 db::{tests::TestDb, UserId},
1692 AppState,
1693 };
1694 use ::rpc::Peer;
1695 use client::{
1696 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1697 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1698 };
1699 use collections::{BTreeMap, HashSet};
1700 use editor::{
1701 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1702 ToOffset, ToggleCodeActions, Undo,
1703 };
1704 use gpui::{
1705 executor::{self, Deterministic},
1706 geometry::vector::vec2f,
1707 ModelHandle, Task, TestAppContext, ViewHandle,
1708 };
1709 use language::{
1710 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1711 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1712 };
1713 use lsp::{self, FakeLanguageServer};
1714 use parking_lot::Mutex;
1715 use project::{
1716 fs::{FakeFs, Fs as _},
1717 search::SearchQuery,
1718 worktree::WorktreeHandle,
1719 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1720 };
1721 use rand::prelude::*;
1722 use rpc::PeerId;
1723 use serde_json::json;
1724 use settings::Settings;
1725 use sqlx::types::time::OffsetDateTime;
1726 use std::{
1727 cell::RefCell,
1728 env,
1729 ops::Deref,
1730 path::{Path, PathBuf},
1731 rc::Rc,
1732 sync::{
1733 atomic::{AtomicBool, Ordering::SeqCst},
1734 Arc,
1735 },
1736 time::Duration,
1737 };
1738 use theme::ThemeRegistry;
1739 use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
1740
/// Test-binary constructor: initializes `env_logger`, but only when the
/// `RUST_LOG` environment variable is set to a readable value.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    match std::env::var("RUST_LOG") {
        Ok(_) => env_logger::init(),
        Err(_) => {}
    }
}
1748
/// End-to-end test of sharing a project between two users.
///
/// Client A shares a local project; client B joins it and must see A as a
/// collaborator, the full worktree (including ignored files), and buffer
/// contents. An edit made by B must reach A's copy of the buffer. The same
/// user then joins a second time from another context (`cx_b2`), and
/// dropping B's first project must remove only that one collaborator.
#[gpui::test(iterations = 10)]
async fn test_share_project(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
    cx_b2: &mut TestAppContext,
) {
    let (window_b, _) = cx_b.add_window(|_| EmptyView);
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    server
        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
        .await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".gitignore": "ignored-dir",
            "a.txt": "a-contents",
            "b.txt": "b-contents",
            "ignored-dir": {
                "c.txt": "",
                "d.txt": "",
            }
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let project_id = project_a
        .read_with(cx_a, |project, _| project.next_remote_id())
        .await;
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;

    // Join that project as client B
    let client_b_peer_id = client_b.peer_id;
    let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    let replica_id_b = project_b.read_with(cx_b, |project, _| {
        assert_eq!(
            project
                .collaborators()
                .get(&client_a.peer_id)
                .unwrap()
                .user
                .github_login,
            "user_a"
        );
        project.replica_id()
    });

    // Once all pending work has run, the host must know about client B and
    // client B must see every worktree entry, ignored ones included.
    deterministic.run_until_parked();
    project_a.read_with(cx_a, |project, _| {
        let client_b_collaborator = project.collaborators().get(&client_b_peer_id).unwrap();
        assert_eq!(client_b_collaborator.replica_id, replica_id_b);
        assert_eq!(client_b_collaborator.user.github_login, "user_b");
    });
    project_b.read_with(cx_b, |project, cx| {
        let worktree = project.worktrees(cx).next().unwrap().read(cx);
        assert_eq!(
            worktree.paths().map(AsRef::as_ref).collect::<Vec<_>>(),
            [
                Path::new(".gitignore"),
                Path::new("a.txt"),
                Path::new("b.txt"),
                Path::new("ignored-dir"),
                Path::new("ignored-dir/c.txt"),
                Path::new("ignored-dir/d.txt"),
            ]
        );
    });

    // Open the same file as client B and client A.
    let buffer_b = project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
        .await
        .unwrap();
    buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
    project_a.read_with(cx_a, |project, cx| {
        assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
    });
    let buffer_a = project_a
        .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
        .await
        .unwrap();

    let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

    // TODO
    // // Create a selection set as client B and see that selection set as client A.
    // buffer_a
    //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
    //     .await;

    // Edit the buffer as client B and see that edit as client A.
    editor_b.update(cx_b, |editor, cx| {
        editor.handle_input(&Input("ok, ".into()), cx)
    });
    buffer_a
        .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
        .await;

    // TODO
    // // Remove the selection set as client B, see those selections disappear as client A.
    cx_b.update(move |_| drop(editor_b));
    // buffer_a
    //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
    //     .await;

    // Client B can join again on a different window because they are already a participant.
    let client_b2 = server.create_client(cx_b2, "user_b").await;
    let project_b2 = Project::remote(
        project_id,
        client_b2.client.clone(),
        client_b2.user_store.clone(),
        lang_registry.clone(),
        FakeFs::new(cx_b2.background()),
        &mut cx_b2.to_async(),
    )
    .await
    .unwrap();
    deterministic.run_until_parked();
    // Each of the three participants now sees the other two.
    project_a.read_with(cx_a, |project, _| {
        assert_eq!(project.collaborators().len(), 2);
    });
    project_b.read_with(cx_b, |project, _| {
        assert_eq!(project.collaborators().len(), 2);
    });
    project_b2.read_with(cx_b2, |project, _| {
        assert_eq!(project.collaborators().len(), 2);
    });

    // Dropping client B's first project removes only that from client A's collaborators.
    cx_b.update(move |_| {
        drop(client_b.project.take());
        drop(project_b);
    });
    deterministic.run_until_parked();
    project_a.read_with(cx_a, |project, _| {
        assert_eq!(project.collaborators().len(), 1);
    });
    project_b2.read_with(cx_b2, |project, _| {
        assert_eq!(project.collaborators().len(), 1);
    });
}
1916
/// Checks that a project's shared state follows its guests and host.
///
/// The worktree is shared while client B is in the project, becomes
/// unshared when B leaves, and is shared again when B rejoins. When the
/// host drops the project, the guest's copy becomes read-only with no
/// collaborators.
#[gpui::test(iterations = 10)]
async fn test_unshare_project(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    server
        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
        .await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

    // Join that project as client B
    let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
    project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();

    // When client B leaves the project, it gets automatically unshared.
    cx_b.update(|_| {
        drop(client_b.project.take());
        drop(project_b);
    });
    deterministic.run_until_parked();
    assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));

    // When client B joins again, the project gets re-shared.
    let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
    project_b2
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();

    // When client A (the host) leaves, the project gets unshared and guests are notified.
    cx_a.update(|_| drop(project_a));
    deterministic.run_until_parked();
    project_b2.read_with(cx_b, |project, _| {
        assert!(project.is_read_only());
        assert!(project.collaborators().is_empty());
    });
}
1996
/// Checks what happens when the host's connection is dropped.
///
/// With client B joined and client C's join request still pending, the
/// server disconnects client A. The host's project must lose its
/// collaborators and stop being shared, B's project must become read-only,
/// and C's pending join must fail with `HostWentOffline`. Afterwards B must
/// be able to join the host's project again.
#[gpui::test(iterations = 10)]
async fn test_host_disconnect(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
    cx_c: &mut TestAppContext,
) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 3 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    let client_c = server.create_client(cx_c, "user_c").await;
    server
        .make_contacts(vec![
            (&client_a, cx_a),
            (&client_b, cx_b),
            (&client_c, cx_c),
        ])
        .await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let project_id = project_a
        .read_with(cx_a, |project, _| project.next_remote_id())
        .await;
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

    // Join that project as client B
    let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
    project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();

    // Request to join that project as client C
    // (the request is spawned and left pending; nobody responds to it).
    let project_c = cx_c.spawn(|mut cx| {
        let client = client_c.client.clone();
        let user_store = client_c.user_store.clone();
        let lang_registry = lang_registry.clone();
        async move {
            Project::remote(
                project_id,
                client,
                user_store,
                lang_registry.clone(),
                FakeFs::new(cx.background()),
                &mut cx,
            )
            .await
        }
    });
    deterministic.run_until_parked();

    // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
    server.disconnect_client(client_a.current_user_id(cx_a));
    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
    project_a
        .condition(cx_a, |project, _| project.collaborators().is_empty())
        .await;
    project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
    project_b
        .condition(cx_b, |project, _| project.is_read_only())
        .await;
    assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
    cx_b.update(|_| {
        drop(project_b);
    });
    // Client C's pending join must fail because the host went offline.
    assert!(matches!(
        project_c.await.unwrap_err(),
        project::JoinProjectError::HostWentOffline
    ));

    // Ensure guests can still join.
    let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
    project_b2
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();
}
2107
/// Checks the two ways a pending join request can be refused.
///
/// First the host explicitly declines client B's request, which must fail
/// with `HostDeclined`. Then B asks again and the host drops the project
/// while the request is pending, which must fail with `HostClosedProject`.
#[gpui::test(iterations = 10)]
async fn test_decline_join_request(
    deterministic: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;
    server
        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
        .await;

    // Share a project as client A
    fs.insert_tree("/a", json!({})).await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let project_id = project_a
        .read_with(cx_a, |project, _| project.next_remote_id())
        .await;
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;

    // Request to join that project as client B
    let project_b = cx_b.spawn(|mut cx| {
        let client = client_b.client.clone();
        let user_store = client_b.user_store.clone();
        let lang_registry = lang_registry.clone();
        async move {
            Project::remote(
                project_id,
                client,
                user_store,
                lang_registry.clone(),
                FakeFs::new(cx.background()),
                &mut cx,
            )
            .await
        }
    });
    // Run pending work so the request reaches the host, then decline it.
    deterministic.run_until_parked();
    project_a.update(cx_a, |project, cx| {
        project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
    });
    assert!(matches!(
        project_b.await.unwrap_err(),
        project::JoinProjectError::HostDeclined
    ));

    // Request to join the project again as client B
    let project_b = cx_b.spawn(|mut cx| {
        let client = client_b.client.clone();
        let user_store = client_b.user_store.clone();
        let lang_registry = lang_registry.clone();
        async move {
            Project::remote(
                project_id,
                client,
                user_store,
                lang_registry.clone(),
                FakeFs::new(cx.background()),
                &mut cx,
            )
            .await
        }
    });

    // Close the project on the host
    deterministic.run_until_parked();
    cx_a.update(|_| drop(project_a));
    deterministic.run_until_parked();
    assert!(matches!(
        project_b.await.unwrap_err(),
        project::JoinProjectError::HostClosedProject
    ));
}
2203
2204 #[gpui::test(iterations = 10)]
2205 async fn test_cancel_join_request(
2206 deterministic: Arc<Deterministic>,
2207 cx_a: &mut TestAppContext,
2208 cx_b: &mut TestAppContext,
2209 ) {
2210 let lang_registry = Arc::new(LanguageRegistry::test());
2211 let fs = FakeFs::new(cx_a.background());
2212 cx_a.foreground().forbid_parking();
2213
2214 // Connect to a server as 2 clients.
2215 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2216 let client_a = server.create_client(cx_a, "user_a").await;
2217 let client_b = server.create_client(cx_b, "user_b").await;
2218 server
2219 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2220 .await;
2221
2222 // Share a project as client A
2223 fs.insert_tree("/a", json!({})).await;
2224 let project_a = cx_a.update(|cx| {
2225 Project::local(
2226 client_a.clone(),
2227 client_a.user_store.clone(),
2228 lang_registry.clone(),
2229 fs.clone(),
2230 cx,
2231 )
2232 });
2233 let project_id = project_a
2234 .read_with(cx_a, |project, _| project.next_remote_id())
2235 .await;
2236
2237 let project_a_events = Rc::new(RefCell::new(Vec::new()));
2238 let user_b = client_a
2239 .user_store
2240 .update(cx_a, |store, cx| {
2241 store.fetch_user(client_b.user_id().unwrap(), cx)
2242 })
2243 .await
2244 .unwrap();
2245 project_a.update(cx_a, {
2246 let project_a_events = project_a_events.clone();
2247 move |_, cx| {
2248 cx.subscribe(&cx.handle(), move |_, _, event, _| {
2249 project_a_events.borrow_mut().push(event.clone());
2250 })
2251 .detach();
2252 }
2253 });
2254
2255 let (worktree_a, _) = project_a
2256 .update(cx_a, |p, cx| {
2257 p.find_or_create_local_worktree("/a", true, cx)
2258 })
2259 .await
2260 .unwrap();
2261 worktree_a
2262 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2263 .await;
2264
2265 // Request to join that project as client B
2266 let project_b = cx_b.spawn(|mut cx| {
2267 let client = client_b.client.clone();
2268 let user_store = client_b.user_store.clone();
2269 let lang_registry = lang_registry.clone();
2270 async move {
2271 Project::remote(
2272 project_id,
2273 client,
2274 user_store,
2275 lang_registry.clone(),
2276 FakeFs::new(cx.background()),
2277 &mut cx,
2278 )
2279 .await
2280 }
2281 });
2282 deterministic.run_until_parked();
2283 assert_eq!(
2284 &*project_a_events.borrow(),
2285 &[project::Event::ContactRequestedJoin(user_b.clone())]
2286 );
2287 project_a_events.borrow_mut().clear();
2288
2289 // Cancel the join request by leaving the project
2290 client_b
2291 .client
2292 .send(proto::LeaveProject { project_id })
2293 .unwrap();
2294 drop(project_b);
2295
2296 deterministic.run_until_parked();
2297 assert_eq!(
2298 &*project_a_events.borrow(),
2299 &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2300 );
2301 }
2302
2303 #[gpui::test(iterations = 10)]
2304 async fn test_propagate_saves_and_fs_changes(
2305 cx_a: &mut TestAppContext,
2306 cx_b: &mut TestAppContext,
2307 cx_c: &mut TestAppContext,
2308 ) {
2309 let lang_registry = Arc::new(LanguageRegistry::test());
2310 let fs = FakeFs::new(cx_a.background());
2311 cx_a.foreground().forbid_parking();
2312
2313 // Connect to a server as 3 clients.
2314 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2315 let client_a = server.create_client(cx_a, "user_a").await;
2316 let mut client_b = server.create_client(cx_b, "user_b").await;
2317 let mut client_c = server.create_client(cx_c, "user_c").await;
2318 server
2319 .make_contacts(vec![
2320 (&client_a, cx_a),
2321 (&client_b, cx_b),
2322 (&client_c, cx_c),
2323 ])
2324 .await;
2325
2326 // Share a worktree as client A.
2327 fs.insert_tree(
2328 "/a",
2329 json!({
2330 "file1": "",
2331 "file2": ""
2332 }),
2333 )
2334 .await;
2335 let project_a = cx_a.update(|cx| {
2336 Project::local(
2337 client_a.clone(),
2338 client_a.user_store.clone(),
2339 lang_registry.clone(),
2340 fs.clone(),
2341 cx,
2342 )
2343 });
2344 let (worktree_a, _) = project_a
2345 .update(cx_a, |p, cx| {
2346 p.find_or_create_local_worktree("/a", true, cx)
2347 })
2348 .await
2349 .unwrap();
2350 worktree_a
2351 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2352 .await;
2353 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2354
2355 // Join that worktree as clients B and C.
2356 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2357 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2358 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2359 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2360
2361 // Open and edit a buffer as both guests B and C.
2362 let buffer_b = project_b
2363 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2364 .await
2365 .unwrap();
2366 let buffer_c = project_c
2367 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2368 .await
2369 .unwrap();
2370 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2371 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2372
2373 // Open and edit that buffer as the host.
2374 let buffer_a = project_a
2375 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2376 .await
2377 .unwrap();
2378
2379 buffer_a
2380 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2381 .await;
2382 buffer_a.update(cx_a, |buf, cx| {
2383 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2384 });
2385
2386 // Wait for edits to propagate
2387 buffer_a
2388 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2389 .await;
2390 buffer_b
2391 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2392 .await;
2393 buffer_c
2394 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2395 .await;
2396
2397 // Edit the buffer as the host and concurrently save as guest B.
2398 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2399 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2400 save_b.await.unwrap();
2401 assert_eq!(
2402 fs.load("/a/file1".as_ref()).await.unwrap(),
2403 "hi-a, i-am-c, i-am-b, i-am-a"
2404 );
2405 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2406 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2407 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2408
2409 worktree_a.flush_fs_events(cx_a).await;
2410
2411 // Make changes on host's file system, see those changes on guest worktrees.
2412 fs.rename(
2413 "/a/file1".as_ref(),
2414 "/a/file1-renamed".as_ref(),
2415 Default::default(),
2416 )
2417 .await
2418 .unwrap();
2419
2420 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2421 .await
2422 .unwrap();
2423 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2424
2425 worktree_a
2426 .condition(&cx_a, |tree, _| {
2427 tree.paths()
2428 .map(|p| p.to_string_lossy())
2429 .collect::<Vec<_>>()
2430 == ["file1-renamed", "file3", "file4"]
2431 })
2432 .await;
2433 worktree_b
2434 .condition(&cx_b, |tree, _| {
2435 tree.paths()
2436 .map(|p| p.to_string_lossy())
2437 .collect::<Vec<_>>()
2438 == ["file1-renamed", "file3", "file4"]
2439 })
2440 .await;
2441 worktree_c
2442 .condition(&cx_c, |tree, _| {
2443 tree.paths()
2444 .map(|p| p.to_string_lossy())
2445 .collect::<Vec<_>>()
2446 == ["file1-renamed", "file3", "file4"]
2447 })
2448 .await;
2449
2450 // Ensure buffer files are updated as well.
2451 buffer_a
2452 .condition(&cx_a, |buf, _| {
2453 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2454 })
2455 .await;
2456 buffer_b
2457 .condition(&cx_b, |buf, _| {
2458 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2459 })
2460 .await;
2461 buffer_c
2462 .condition(&cx_c, |buf, _| {
2463 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2464 })
2465 .await;
2466 }
2467
2468 #[gpui::test(iterations = 10)]
2469 async fn test_fs_operations(
2470 executor: Arc<Deterministic>,
2471 cx_a: &mut TestAppContext,
2472 cx_b: &mut TestAppContext,
2473 ) {
2474 executor.forbid_parking();
2475 let fs = FakeFs::new(cx_a.background());
2476
2477 // Connect to a server as 2 clients.
2478 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2479 let mut client_a = server.create_client(cx_a, "user_a").await;
2480 let mut client_b = server.create_client(cx_b, "user_b").await;
2481 server
2482 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2483 .await;
2484
2485 // Share a project as client A
2486 fs.insert_tree(
2487 "/dir",
2488 json!({
2489 "a.txt": "a-contents",
2490 "b.txt": "b-contents",
2491 }),
2492 )
2493 .await;
2494
2495 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2496 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2497
2498 let worktree_a =
2499 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2500 let worktree_b =
2501 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2502
2503 let entry = project_b
2504 .update(cx_b, |project, cx| {
2505 project
2506 .create_entry((worktree_id, "c.txt"), false, cx)
2507 .unwrap()
2508 })
2509 .await
2510 .unwrap();
2511 worktree_a.read_with(cx_a, |worktree, _| {
2512 assert_eq!(
2513 worktree
2514 .paths()
2515 .map(|p| p.to_string_lossy())
2516 .collect::<Vec<_>>(),
2517 ["a.txt", "b.txt", "c.txt"]
2518 );
2519 });
2520 worktree_b.read_with(cx_b, |worktree, _| {
2521 assert_eq!(
2522 worktree
2523 .paths()
2524 .map(|p| p.to_string_lossy())
2525 .collect::<Vec<_>>(),
2526 ["a.txt", "b.txt", "c.txt"]
2527 );
2528 });
2529
2530 project_b
2531 .update(cx_b, |project, cx| {
2532 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2533 })
2534 .unwrap()
2535 .await
2536 .unwrap();
2537 worktree_a.read_with(cx_a, |worktree, _| {
2538 assert_eq!(
2539 worktree
2540 .paths()
2541 .map(|p| p.to_string_lossy())
2542 .collect::<Vec<_>>(),
2543 ["a.txt", "b.txt", "d.txt"]
2544 );
2545 });
2546 worktree_b.read_with(cx_b, |worktree, _| {
2547 assert_eq!(
2548 worktree
2549 .paths()
2550 .map(|p| p.to_string_lossy())
2551 .collect::<Vec<_>>(),
2552 ["a.txt", "b.txt", "d.txt"]
2553 );
2554 });
2555
2556 let dir_entry = project_b
2557 .update(cx_b, |project, cx| {
2558 project
2559 .create_entry((worktree_id, "DIR"), true, cx)
2560 .unwrap()
2561 })
2562 .await
2563 .unwrap();
2564 worktree_a.read_with(cx_a, |worktree, _| {
2565 assert_eq!(
2566 worktree
2567 .paths()
2568 .map(|p| p.to_string_lossy())
2569 .collect::<Vec<_>>(),
2570 ["DIR", "a.txt", "b.txt", "d.txt"]
2571 );
2572 });
2573 worktree_b.read_with(cx_b, |worktree, _| {
2574 assert_eq!(
2575 worktree
2576 .paths()
2577 .map(|p| p.to_string_lossy())
2578 .collect::<Vec<_>>(),
2579 ["DIR", "a.txt", "b.txt", "d.txt"]
2580 );
2581 });
2582
2583 project_b
2584 .update(cx_b, |project, cx| {
2585 project.delete_entry(dir_entry.id, cx).unwrap()
2586 })
2587 .await
2588 .unwrap();
2589 worktree_a.read_with(cx_a, |worktree, _| {
2590 assert_eq!(
2591 worktree
2592 .paths()
2593 .map(|p| p.to_string_lossy())
2594 .collect::<Vec<_>>(),
2595 ["a.txt", "b.txt", "d.txt"]
2596 );
2597 });
2598 worktree_b.read_with(cx_b, |worktree, _| {
2599 assert_eq!(
2600 worktree
2601 .paths()
2602 .map(|p| p.to_string_lossy())
2603 .collect::<Vec<_>>(),
2604 ["a.txt", "b.txt", "d.txt"]
2605 );
2606 });
2607
2608 project_b
2609 .update(cx_b, |project, cx| {
2610 project.delete_entry(entry.id, cx).unwrap()
2611 })
2612 .await
2613 .unwrap();
2614 worktree_a.read_with(cx_a, |worktree, _| {
2615 assert_eq!(
2616 worktree
2617 .paths()
2618 .map(|p| p.to_string_lossy())
2619 .collect::<Vec<_>>(),
2620 ["a.txt", "b.txt"]
2621 );
2622 });
2623 worktree_b.read_with(cx_b, |worktree, _| {
2624 assert_eq!(
2625 worktree
2626 .paths()
2627 .map(|p| p.to_string_lossy())
2628 .collect::<Vec<_>>(),
2629 ["a.txt", "b.txt"]
2630 );
2631 });
2632 }
2633
2634 #[gpui::test(iterations = 10)]
2635 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2636 cx_a.foreground().forbid_parking();
2637 let lang_registry = Arc::new(LanguageRegistry::test());
2638 let fs = FakeFs::new(cx_a.background());
2639
2640 // Connect to a server as 2 clients.
2641 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2642 let client_a = server.create_client(cx_a, "user_a").await;
2643 let mut client_b = server.create_client(cx_b, "user_b").await;
2644 server
2645 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2646 .await;
2647
2648 // Share a project as client A
2649 fs.insert_tree(
2650 "/dir",
2651 json!({
2652 "a.txt": "a-contents",
2653 }),
2654 )
2655 .await;
2656
2657 let project_a = cx_a.update(|cx| {
2658 Project::local(
2659 client_a.clone(),
2660 client_a.user_store.clone(),
2661 lang_registry.clone(),
2662 fs.clone(),
2663 cx,
2664 )
2665 });
2666 let (worktree_a, _) = project_a
2667 .update(cx_a, |p, cx| {
2668 p.find_or_create_local_worktree("/dir", true, cx)
2669 })
2670 .await
2671 .unwrap();
2672 worktree_a
2673 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2674 .await;
2675 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2676
2677 // Join that project as client B
2678 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2679
2680 // Open a buffer as client B
2681 let buffer_b = project_b
2682 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2683 .await
2684 .unwrap();
2685
2686 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2687 buffer_b.read_with(cx_b, |buf, _| {
2688 assert!(buf.is_dirty());
2689 assert!(!buf.has_conflict());
2690 });
2691
2692 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2693 buffer_b
2694 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2695 .await;
2696 buffer_b.read_with(cx_b, |buf, _| {
2697 assert!(!buf.has_conflict());
2698 });
2699
2700 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2701 buffer_b.read_with(cx_b, |buf, _| {
2702 assert!(buf.is_dirty());
2703 assert!(!buf.has_conflict());
2704 });
2705 }
2706
2707 #[gpui::test(iterations = 10)]
2708 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2709 cx_a.foreground().forbid_parking();
2710 let lang_registry = Arc::new(LanguageRegistry::test());
2711 let fs = FakeFs::new(cx_a.background());
2712
2713 // Connect to a server as 2 clients.
2714 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2715 let client_a = server.create_client(cx_a, "user_a").await;
2716 let mut client_b = server.create_client(cx_b, "user_b").await;
2717 server
2718 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2719 .await;
2720
2721 // Share a project as client A
2722 fs.insert_tree(
2723 "/dir",
2724 json!({
2725 "a.txt": "a-contents",
2726 }),
2727 )
2728 .await;
2729
2730 let project_a = cx_a.update(|cx| {
2731 Project::local(
2732 client_a.clone(),
2733 client_a.user_store.clone(),
2734 lang_registry.clone(),
2735 fs.clone(),
2736 cx,
2737 )
2738 });
2739 let (worktree_a, _) = project_a
2740 .update(cx_a, |p, cx| {
2741 p.find_or_create_local_worktree("/dir", true, cx)
2742 })
2743 .await
2744 .unwrap();
2745 worktree_a
2746 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2747 .await;
2748 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2749
2750 // Join that project as client B
2751 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2752 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2753
2754 // Open a buffer as client B
2755 let buffer_b = project_b
2756 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2757 .await
2758 .unwrap();
2759 buffer_b.read_with(cx_b, |buf, _| {
2760 assert!(!buf.is_dirty());
2761 assert!(!buf.has_conflict());
2762 });
2763
2764 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2765 .await
2766 .unwrap();
2767 buffer_b
2768 .condition(&cx_b, |buf, _| {
2769 buf.text() == "new contents" && !buf.is_dirty()
2770 })
2771 .await;
2772 buffer_b.read_with(cx_b, |buf, _| {
2773 assert!(!buf.has_conflict());
2774 });
2775 }
2776
2777 #[gpui::test(iterations = 10)]
2778 async fn test_editing_while_guest_opens_buffer(
2779 cx_a: &mut TestAppContext,
2780 cx_b: &mut TestAppContext,
2781 ) {
2782 cx_a.foreground().forbid_parking();
2783 let lang_registry = Arc::new(LanguageRegistry::test());
2784 let fs = FakeFs::new(cx_a.background());
2785
2786 // Connect to a server as 2 clients.
2787 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2788 let client_a = server.create_client(cx_a, "user_a").await;
2789 let mut client_b = server.create_client(cx_b, "user_b").await;
2790 server
2791 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2792 .await;
2793
2794 // Share a project as client A
2795 fs.insert_tree(
2796 "/dir",
2797 json!({
2798 "a.txt": "a-contents",
2799 }),
2800 )
2801 .await;
2802 let project_a = cx_a.update(|cx| {
2803 Project::local(
2804 client_a.clone(),
2805 client_a.user_store.clone(),
2806 lang_registry.clone(),
2807 fs.clone(),
2808 cx,
2809 )
2810 });
2811 let (worktree_a, _) = project_a
2812 .update(cx_a, |p, cx| {
2813 p.find_or_create_local_worktree("/dir", true, cx)
2814 })
2815 .await
2816 .unwrap();
2817 worktree_a
2818 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2819 .await;
2820 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2821
2822 // Join that project as client B
2823 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2824
2825 // Open a buffer as client A
2826 let buffer_a = project_a
2827 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2828 .await
2829 .unwrap();
2830
2831 // Start opening the same buffer as client B
2832 let buffer_b = cx_b
2833 .background()
2834 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2835
2836 // Edit the buffer as client A while client B is still opening it.
2837 cx_b.background().simulate_random_delay().await;
2838 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2839 cx_b.background().simulate_random_delay().await;
2840 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2841
2842 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2843 let buffer_b = buffer_b.await.unwrap();
2844 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2845 }
2846
2847 #[gpui::test(iterations = 10)]
2848 async fn test_leaving_worktree_while_opening_buffer(
2849 cx_a: &mut TestAppContext,
2850 cx_b: &mut TestAppContext,
2851 ) {
2852 cx_a.foreground().forbid_parking();
2853 let lang_registry = Arc::new(LanguageRegistry::test());
2854 let fs = FakeFs::new(cx_a.background());
2855
2856 // Connect to a server as 2 clients.
2857 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2858 let client_a = server.create_client(cx_a, "user_a").await;
2859 let mut client_b = server.create_client(cx_b, "user_b").await;
2860 server
2861 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2862 .await;
2863
2864 // Share a project as client A
2865 fs.insert_tree(
2866 "/dir",
2867 json!({
2868 "a.txt": "a-contents",
2869 }),
2870 )
2871 .await;
2872 let project_a = cx_a.update(|cx| {
2873 Project::local(
2874 client_a.clone(),
2875 client_a.user_store.clone(),
2876 lang_registry.clone(),
2877 fs.clone(),
2878 cx,
2879 )
2880 });
2881 let (worktree_a, _) = project_a
2882 .update(cx_a, |p, cx| {
2883 p.find_or_create_local_worktree("/dir", true, cx)
2884 })
2885 .await
2886 .unwrap();
2887 worktree_a
2888 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2889 .await;
2890 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2891
2892 // Join that project as client B
2893 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2894
2895 // See that a guest has joined as client A.
2896 project_a
2897 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2898 .await;
2899
2900 // Begin opening a buffer as client B, but leave the project before the open completes.
2901 let buffer_b = cx_b
2902 .background()
2903 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2904 cx_b.update(|_| {
2905 drop(client_b.project.take());
2906 drop(project_b);
2907 });
2908 drop(buffer_b);
2909
2910 // See that the guest has left.
2911 project_a
2912 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2913 .await;
2914 }
2915
2916 #[gpui::test(iterations = 10)]
2917 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2918 cx_a.foreground().forbid_parking();
2919 let lang_registry = Arc::new(LanguageRegistry::test());
2920 let fs = FakeFs::new(cx_a.background());
2921
2922 // Connect to a server as 2 clients.
2923 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2924 let client_a = server.create_client(cx_a, "user_a").await;
2925 let mut client_b = server.create_client(cx_b, "user_b").await;
2926 server
2927 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2928 .await;
2929
2930 // Share a project as client A
2931 fs.insert_tree(
2932 "/a",
2933 json!({
2934 "a.txt": "a-contents",
2935 "b.txt": "b-contents",
2936 }),
2937 )
2938 .await;
2939 let project_a = cx_a.update(|cx| {
2940 Project::local(
2941 client_a.clone(),
2942 client_a.user_store.clone(),
2943 lang_registry.clone(),
2944 fs.clone(),
2945 cx,
2946 )
2947 });
2948 let (worktree_a, _) = project_a
2949 .update(cx_a, |p, cx| {
2950 p.find_or_create_local_worktree("/a", true, cx)
2951 })
2952 .await
2953 .unwrap();
2954 worktree_a
2955 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2956 .await;
2957
2958 // Join that project as client B
2959 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2960
2961 // Client A sees that a guest has joined.
2962 project_a
2963 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2964 .await;
2965
2966 // Drop client B's connection and ensure client A observes client B leaving the project.
2967 client_b.disconnect(&cx_b.to_async()).unwrap();
2968 project_a
2969 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2970 .await;
2971
2972 // Rejoin the project as client B
2973 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2974
2975 // Client A sees that a guest has re-joined.
2976 project_a
2977 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2978 .await;
2979
2980 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2981 client_b.wait_for_current_user(cx_b).await;
2982 server.disconnect_client(client_b.current_user_id(cx_b));
2983 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2984 project_a
2985 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2986 .await;
2987 }
2988
2989 #[gpui::test(iterations = 10)]
2990 async fn test_collaborating_with_diagnostics(
2991 deterministic: Arc<Deterministic>,
2992 cx_a: &mut TestAppContext,
2993 cx_b: &mut TestAppContext,
2994 cx_c: &mut TestAppContext,
2995 ) {
2996 deterministic.forbid_parking();
2997 let lang_registry = Arc::new(LanguageRegistry::test());
2998 let fs = FakeFs::new(cx_a.background());
2999
3000 // Set up a fake language server.
3001 let mut language = Language::new(
3002 LanguageConfig {
3003 name: "Rust".into(),
3004 path_suffixes: vec!["rs".to_string()],
3005 ..Default::default()
3006 },
3007 Some(tree_sitter_rust::language()),
3008 );
3009 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3010 lang_registry.add(Arc::new(language));
3011
3012 // Connect to a server as 2 clients.
3013 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3014 let client_a = server.create_client(cx_a, "user_a").await;
3015 let mut client_b = server.create_client(cx_b, "user_b").await;
3016 let mut client_c = server.create_client(cx_c, "user_c").await;
3017 server
3018 .make_contacts(vec![
3019 (&client_a, cx_a),
3020 (&client_b, cx_b),
3021 (&client_c, cx_c),
3022 ])
3023 .await;
3024
3025 // Share a project as client A
3026 fs.insert_tree(
3027 "/a",
3028 json!({
3029 "a.rs": "let one = two",
3030 "other.rs": "",
3031 }),
3032 )
3033 .await;
3034 let project_a = cx_a.update(|cx| {
3035 Project::local(
3036 client_a.clone(),
3037 client_a.user_store.clone(),
3038 lang_registry.clone(),
3039 fs.clone(),
3040 cx,
3041 )
3042 });
3043 let (worktree_a, _) = project_a
3044 .update(cx_a, |p, cx| {
3045 p.find_or_create_local_worktree("/a", true, cx)
3046 })
3047 .await
3048 .unwrap();
3049 worktree_a
3050 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3051 .await;
3052 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3053 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3054
3055 // Cause the language server to start.
3056 let _buffer = cx_a
3057 .background()
3058 .spawn(project_a.update(cx_a, |project, cx| {
3059 project.open_buffer(
3060 ProjectPath {
3061 worktree_id,
3062 path: Path::new("other.rs").into(),
3063 },
3064 cx,
3065 )
3066 }))
3067 .await
3068 .unwrap();
3069
3070 // Join the worktree as client B.
3071 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3072
3073 // Simulate a language server reporting errors for a file.
3074 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3075 fake_language_server
3076 .receive_notification::<lsp::notification::DidOpenTextDocument>()
3077 .await;
3078 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3079 lsp::PublishDiagnosticsParams {
3080 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3081 version: None,
3082 diagnostics: vec![lsp::Diagnostic {
3083 severity: Some(lsp::DiagnosticSeverity::ERROR),
3084 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3085 message: "message 1".to_string(),
3086 ..Default::default()
3087 }],
3088 },
3089 );
3090
3091 // Wait for server to see the diagnostics update.
3092 deterministic.run_until_parked();
3093 {
3094 let store = server.store.read().await;
3095 let project = store.project(project_id).unwrap();
3096 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3097 assert!(!worktree.diagnostic_summaries.is_empty());
3098 }
3099
3100 // Ensure client B observes the new diagnostics.
3101 project_b.read_with(cx_b, |project, cx| {
3102 assert_eq!(
3103 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3104 &[(
3105 ProjectPath {
3106 worktree_id,
3107 path: Arc::from(Path::new("a.rs")),
3108 },
3109 DiagnosticSummary {
3110 error_count: 1,
3111 warning_count: 0,
3112 ..Default::default()
3113 },
3114 )]
3115 )
3116 });
3117
3118 // Join project as client C and observe the diagnostics.
3119 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3120 project_c.read_with(cx_c, |project, cx| {
3121 assert_eq!(
3122 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3123 &[(
3124 ProjectPath {
3125 worktree_id,
3126 path: Arc::from(Path::new("a.rs")),
3127 },
3128 DiagnosticSummary {
3129 error_count: 1,
3130 warning_count: 0,
3131 ..Default::default()
3132 },
3133 )]
3134 )
3135 });
3136
3137 // Simulate a language server reporting more errors for a file.
3138 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3139 lsp::PublishDiagnosticsParams {
3140 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3141 version: None,
3142 diagnostics: vec![
3143 lsp::Diagnostic {
3144 severity: Some(lsp::DiagnosticSeverity::ERROR),
3145 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3146 message: "message 1".to_string(),
3147 ..Default::default()
3148 },
3149 lsp::Diagnostic {
3150 severity: Some(lsp::DiagnosticSeverity::WARNING),
3151 range: lsp::Range::new(
3152 lsp::Position::new(0, 10),
3153 lsp::Position::new(0, 13),
3154 ),
3155 message: "message 2".to_string(),
3156 ..Default::default()
3157 },
3158 ],
3159 },
3160 );
3161
3162 // Clients B and C get the updated summaries
3163 deterministic.run_until_parked();
3164 project_b.read_with(cx_b, |project, cx| {
3165 assert_eq!(
3166 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3167 [(
3168 ProjectPath {
3169 worktree_id,
3170 path: Arc::from(Path::new("a.rs")),
3171 },
3172 DiagnosticSummary {
3173 error_count: 1,
3174 warning_count: 1,
3175 ..Default::default()
3176 },
3177 )]
3178 );
3179 });
3180 project_c.read_with(cx_c, |project, cx| {
3181 assert_eq!(
3182 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3183 [(
3184 ProjectPath {
3185 worktree_id,
3186 path: Arc::from(Path::new("a.rs")),
3187 },
3188 DiagnosticSummary {
3189 error_count: 1,
3190 warning_count: 1,
3191 ..Default::default()
3192 },
3193 )]
3194 );
3195 });
3196
3197 // Open the file with the errors on client B. They should be present.
3198 let buffer_b = cx_b
3199 .background()
3200 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3201 .await
3202 .unwrap();
3203
3204 buffer_b.read_with(cx_b, |buffer, _| {
3205 assert_eq!(
3206 buffer
3207 .snapshot()
3208 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3209 .map(|entry| entry)
3210 .collect::<Vec<_>>(),
3211 &[
3212 DiagnosticEntry {
3213 range: Point::new(0, 4)..Point::new(0, 7),
3214 diagnostic: Diagnostic {
3215 group_id: 0,
3216 message: "message 1".to_string(),
3217 severity: lsp::DiagnosticSeverity::ERROR,
3218 is_primary: true,
3219 ..Default::default()
3220 }
3221 },
3222 DiagnosticEntry {
3223 range: Point::new(0, 10)..Point::new(0, 13),
3224 diagnostic: Diagnostic {
3225 group_id: 1,
3226 severity: lsp::DiagnosticSeverity::WARNING,
3227 message: "message 2".to_string(),
3228 is_primary: true,
3229 ..Default::default()
3230 }
3231 }
3232 ]
3233 );
3234 });
3235
3236 // Simulate a language server reporting no errors for a file.
3237 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3238 lsp::PublishDiagnosticsParams {
3239 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3240 version: None,
3241 diagnostics: vec![],
3242 },
3243 );
3244 deterministic.run_until_parked();
3245 project_a.read_with(cx_a, |project, cx| {
3246 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3247 });
3248 project_b.read_with(cx_b, |project, cx| {
3249 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3250 });
3251 project_c.read_with(cx_c, |project, cx| {
3252 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3253 });
3254 }
3255
3256 #[gpui::test(iterations = 10)]
3257 async fn test_collaborating_with_completion(
3258 cx_a: &mut TestAppContext,
3259 cx_b: &mut TestAppContext,
3260 ) {
3261 cx_a.foreground().forbid_parking();
3262 let lang_registry = Arc::new(LanguageRegistry::test());
3263 let fs = FakeFs::new(cx_a.background());
3264
3265 // Set up a fake language server.
3266 let mut language = Language::new(
3267 LanguageConfig {
3268 name: "Rust".into(),
3269 path_suffixes: vec!["rs".to_string()],
3270 ..Default::default()
3271 },
3272 Some(tree_sitter_rust::language()),
3273 );
3274 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3275 capabilities: lsp::ServerCapabilities {
3276 completion_provider: Some(lsp::CompletionOptions {
3277 trigger_characters: Some(vec![".".to_string()]),
3278 ..Default::default()
3279 }),
3280 ..Default::default()
3281 },
3282 ..Default::default()
3283 });
3284 lang_registry.add(Arc::new(language));
3285
3286 // Connect to a server as 2 clients.
3287 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3288 let client_a = server.create_client(cx_a, "user_a").await;
3289 let mut client_b = server.create_client(cx_b, "user_b").await;
3290 server
3291 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3292 .await;
3293
3294 // Share a project as client A
3295 fs.insert_tree(
3296 "/a",
3297 json!({
3298 "main.rs": "fn main() { a }",
3299 "other.rs": "",
3300 }),
3301 )
3302 .await;
3303 let project_a = cx_a.update(|cx| {
3304 Project::local(
3305 client_a.clone(),
3306 client_a.user_store.clone(),
3307 lang_registry.clone(),
3308 fs.clone(),
3309 cx,
3310 )
3311 });
3312 let (worktree_a, _) = project_a
3313 .update(cx_a, |p, cx| {
3314 p.find_or_create_local_worktree("/a", true, cx)
3315 })
3316 .await
3317 .unwrap();
3318 worktree_a
3319 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3320 .await;
3321 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3322
3323 // Join the worktree as client B.
3324 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3325
3326 // Open a file in an editor as the guest.
3327 let buffer_b = project_b
3328 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3329 .await
3330 .unwrap();
3331 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3332 let editor_b = cx_b.add_view(window_b, |cx| {
3333 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3334 });
3335
3336 let fake_language_server = fake_language_servers.next().await.unwrap();
3337 buffer_b
3338 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3339 .await;
3340
3341 // Type a completion trigger character as the guest.
3342 editor_b.update(cx_b, |editor, cx| {
3343 editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3344 editor.handle_input(&Input(".".into()), cx);
3345 cx.focus(&editor_b);
3346 });
3347
3348 // Receive a completion request as the host's language server.
3349 // Return some completions from the host's language server.
3350 cx_a.foreground().start_waiting();
3351 fake_language_server
3352 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3353 assert_eq!(
3354 params.text_document_position.text_document.uri,
3355 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3356 );
3357 assert_eq!(
3358 params.text_document_position.position,
3359 lsp::Position::new(0, 14),
3360 );
3361
3362 Ok(Some(lsp::CompletionResponse::Array(vec![
3363 lsp::CompletionItem {
3364 label: "first_method(…)".into(),
3365 detail: Some("fn(&mut self, B) -> C".into()),
3366 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3367 new_text: "first_method($1)".to_string(),
3368 range: lsp::Range::new(
3369 lsp::Position::new(0, 14),
3370 lsp::Position::new(0, 14),
3371 ),
3372 })),
3373 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3374 ..Default::default()
3375 },
3376 lsp::CompletionItem {
3377 label: "second_method(…)".into(),
3378 detail: Some("fn(&mut self, C) -> D<E>".into()),
3379 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3380 new_text: "second_method()".to_string(),
3381 range: lsp::Range::new(
3382 lsp::Position::new(0, 14),
3383 lsp::Position::new(0, 14),
3384 ),
3385 })),
3386 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3387 ..Default::default()
3388 },
3389 ])))
3390 })
3391 .next()
3392 .await
3393 .unwrap();
3394 cx_a.foreground().finish_waiting();
3395
3396 // Open the buffer on the host.
3397 let buffer_a = project_a
3398 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3399 .await
3400 .unwrap();
3401 buffer_a
3402 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3403 .await;
3404
3405 // Confirm a completion on the guest.
3406 editor_b
3407 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3408 .await;
3409 editor_b.update(cx_b, |editor, cx| {
3410 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3411 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3412 });
3413
3414 // Return a resolved completion from the host's language server.
3415 // The resolved completion has an additional text edit.
3416 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3417 |params, _| async move {
3418 assert_eq!(params.label, "first_method(…)");
3419 Ok(lsp::CompletionItem {
3420 label: "first_method(…)".into(),
3421 detail: Some("fn(&mut self, B) -> C".into()),
3422 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3423 new_text: "first_method($1)".to_string(),
3424 range: lsp::Range::new(
3425 lsp::Position::new(0, 14),
3426 lsp::Position::new(0, 14),
3427 ),
3428 })),
3429 additional_text_edits: Some(vec![lsp::TextEdit {
3430 new_text: "use d::SomeTrait;\n".to_string(),
3431 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3432 }]),
3433 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3434 ..Default::default()
3435 })
3436 },
3437 );
3438
3439 // The additional edit is applied.
3440 buffer_a
3441 .condition(&cx_a, |buffer, _| {
3442 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3443 })
3444 .await;
3445 buffer_b
3446 .condition(&cx_b, |buffer, _| {
3447 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3448 })
3449 .await;
3450 }
3451
3452 #[gpui::test(iterations = 10)]
3453 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3454 cx_a.foreground().forbid_parking();
3455 let lang_registry = Arc::new(LanguageRegistry::test());
3456 let fs = FakeFs::new(cx_a.background());
3457
3458 // Connect to a server as 2 clients.
3459 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3460 let client_a = server.create_client(cx_a, "user_a").await;
3461 let mut client_b = server.create_client(cx_b, "user_b").await;
3462 server
3463 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3464 .await;
3465
3466 // Share a project as client A
3467 fs.insert_tree(
3468 "/a",
3469 json!({
3470 "a.rs": "let one = 1;",
3471 }),
3472 )
3473 .await;
3474 let project_a = cx_a.update(|cx| {
3475 Project::local(
3476 client_a.clone(),
3477 client_a.user_store.clone(),
3478 lang_registry.clone(),
3479 fs.clone(),
3480 cx,
3481 )
3482 });
3483 let (worktree_a, _) = project_a
3484 .update(cx_a, |p, cx| {
3485 p.find_or_create_local_worktree("/a", true, cx)
3486 })
3487 .await
3488 .unwrap();
3489 worktree_a
3490 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3491 .await;
3492 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3493 let buffer_a = project_a
3494 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3495 .await
3496 .unwrap();
3497
3498 // Join the worktree as client B.
3499 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3500
3501 let buffer_b = cx_b
3502 .background()
3503 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3504 .await
3505 .unwrap();
3506 buffer_b.update(cx_b, |buffer, cx| {
3507 buffer.edit([(4..7, "six")], cx);
3508 buffer.edit([(10..11, "6")], cx);
3509 assert_eq!(buffer.text(), "let six = 6;");
3510 assert!(buffer.is_dirty());
3511 assert!(!buffer.has_conflict());
3512 });
3513 buffer_a
3514 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3515 .await;
3516
3517 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3518 .await
3519 .unwrap();
3520 buffer_a
3521 .condition(cx_a, |buffer, _| buffer.has_conflict())
3522 .await;
3523 buffer_b
3524 .condition(cx_b, |buffer, _| buffer.has_conflict())
3525 .await;
3526
3527 project_b
3528 .update(cx_b, |project, cx| {
3529 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3530 })
3531 .await
3532 .unwrap();
3533 buffer_a.read_with(cx_a, |buffer, _| {
3534 assert_eq!(buffer.text(), "let seven = 7;");
3535 assert!(!buffer.is_dirty());
3536 assert!(!buffer.has_conflict());
3537 });
3538 buffer_b.read_with(cx_b, |buffer, _| {
3539 assert_eq!(buffer.text(), "let seven = 7;");
3540 assert!(!buffer.is_dirty());
3541 assert!(!buffer.has_conflict());
3542 });
3543
3544 buffer_a.update(cx_a, |buffer, cx| {
3545 // Undoing on the host is a no-op when the reload was initiated by the guest.
3546 buffer.undo(cx);
3547 assert_eq!(buffer.text(), "let seven = 7;");
3548 assert!(!buffer.is_dirty());
3549 assert!(!buffer.has_conflict());
3550 });
3551 buffer_b.update(cx_b, |buffer, cx| {
3552 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3553 buffer.undo(cx);
3554 assert_eq!(buffer.text(), "let six = 6;");
3555 assert!(buffer.is_dirty());
3556 assert!(!buffer.has_conflict());
3557 });
3558 }
3559
3560 #[gpui::test(iterations = 10)]
3561 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3562 cx_a.foreground().forbid_parking();
3563 let lang_registry = Arc::new(LanguageRegistry::test());
3564 let fs = FakeFs::new(cx_a.background());
3565
3566 // Set up a fake language server.
3567 let mut language = Language::new(
3568 LanguageConfig {
3569 name: "Rust".into(),
3570 path_suffixes: vec!["rs".to_string()],
3571 ..Default::default()
3572 },
3573 Some(tree_sitter_rust::language()),
3574 );
3575 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3576 lang_registry.add(Arc::new(language));
3577
3578 // Connect to a server as 2 clients.
3579 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3580 let client_a = server.create_client(cx_a, "user_a").await;
3581 let mut client_b = server.create_client(cx_b, "user_b").await;
3582 server
3583 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3584 .await;
3585
3586 // Share a project as client A
3587 fs.insert_tree(
3588 "/a",
3589 json!({
3590 "a.rs": "let one = two",
3591 }),
3592 )
3593 .await;
3594 let project_a = cx_a.update(|cx| {
3595 Project::local(
3596 client_a.clone(),
3597 client_a.user_store.clone(),
3598 lang_registry.clone(),
3599 fs.clone(),
3600 cx,
3601 )
3602 });
3603 let (worktree_a, _) = project_a
3604 .update(cx_a, |p, cx| {
3605 p.find_or_create_local_worktree("/a", true, cx)
3606 })
3607 .await
3608 .unwrap();
3609 worktree_a
3610 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3611 .await;
3612 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3613
3614 // Join the project as client B.
3615 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3616
3617 let buffer_b = cx_b
3618 .background()
3619 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3620 .await
3621 .unwrap();
3622
3623 let fake_language_server = fake_language_servers.next().await.unwrap();
3624 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3625 Ok(Some(vec![
3626 lsp::TextEdit {
3627 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3628 new_text: "h".to_string(),
3629 },
3630 lsp::TextEdit {
3631 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3632 new_text: "y".to_string(),
3633 },
3634 ]))
3635 });
3636
3637 project_b
3638 .update(cx_b, |project, cx| {
3639 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3640 })
3641 .await
3642 .unwrap();
3643 assert_eq!(
3644 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3645 "let honey = two"
3646 );
3647 }
3648
3649 #[gpui::test(iterations = 10)]
3650 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3651 cx_a.foreground().forbid_parking();
3652 let lang_registry = Arc::new(LanguageRegistry::test());
3653 let fs = FakeFs::new(cx_a.background());
3654 fs.insert_tree(
3655 "/root-1",
3656 json!({
3657 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3658 }),
3659 )
3660 .await;
3661 fs.insert_tree(
3662 "/root-2",
3663 json!({
3664 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3665 }),
3666 )
3667 .await;
3668
3669 // Set up a fake language server.
3670 let mut language = Language::new(
3671 LanguageConfig {
3672 name: "Rust".into(),
3673 path_suffixes: vec!["rs".to_string()],
3674 ..Default::default()
3675 },
3676 Some(tree_sitter_rust::language()),
3677 );
3678 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3679 lang_registry.add(Arc::new(language));
3680
3681 // Connect to a server as 2 clients.
3682 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3683 let client_a = server.create_client(cx_a, "user_a").await;
3684 let mut client_b = server.create_client(cx_b, "user_b").await;
3685 server
3686 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3687 .await;
3688
3689 // Share a project as client A
3690 let project_a = cx_a.update(|cx| {
3691 Project::local(
3692 client_a.clone(),
3693 client_a.user_store.clone(),
3694 lang_registry.clone(),
3695 fs.clone(),
3696 cx,
3697 )
3698 });
3699 let (worktree_a, _) = project_a
3700 .update(cx_a, |p, cx| {
3701 p.find_or_create_local_worktree("/root-1", true, cx)
3702 })
3703 .await
3704 .unwrap();
3705 worktree_a
3706 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3707 .await;
3708 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3709
3710 // Join the project as client B.
3711 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3712
3713 // Open the file on client B.
3714 let buffer_b = cx_b
3715 .background()
3716 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3717 .await
3718 .unwrap();
3719
3720 // Request the definition of a symbol as the guest.
3721 let fake_language_server = fake_language_servers.next().await.unwrap();
3722 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3723 |_, _| async move {
3724 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3725 lsp::Location::new(
3726 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3727 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3728 ),
3729 )))
3730 },
3731 );
3732
3733 let definitions_1 = project_b
3734 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3735 .await
3736 .unwrap();
3737 cx_b.read(|cx| {
3738 assert_eq!(definitions_1.len(), 1);
3739 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3740 let target_buffer = definitions_1[0].buffer.read(cx);
3741 assert_eq!(
3742 target_buffer.text(),
3743 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3744 );
3745 assert_eq!(
3746 definitions_1[0].range.to_point(target_buffer),
3747 Point::new(0, 6)..Point::new(0, 9)
3748 );
3749 });
3750
3751 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3752 // the previous call to `definition`.
3753 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3754 |_, _| async move {
3755 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3756 lsp::Location::new(
3757 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3758 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3759 ),
3760 )))
3761 },
3762 );
3763
3764 let definitions_2 = project_b
3765 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3766 .await
3767 .unwrap();
3768 cx_b.read(|cx| {
3769 assert_eq!(definitions_2.len(), 1);
3770 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3771 let target_buffer = definitions_2[0].buffer.read(cx);
3772 assert_eq!(
3773 target_buffer.text(),
3774 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3775 );
3776 assert_eq!(
3777 definitions_2[0].range.to_point(target_buffer),
3778 Point::new(1, 6)..Point::new(1, 11)
3779 );
3780 });
3781 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3782 }
3783
3784 #[gpui::test(iterations = 10)]
3785 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3786 cx_a.foreground().forbid_parking();
3787 let lang_registry = Arc::new(LanguageRegistry::test());
3788 let fs = FakeFs::new(cx_a.background());
3789 fs.insert_tree(
3790 "/root-1",
3791 json!({
3792 "one.rs": "const ONE: usize = 1;",
3793 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3794 }),
3795 )
3796 .await;
3797 fs.insert_tree(
3798 "/root-2",
3799 json!({
3800 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3801 }),
3802 )
3803 .await;
3804
3805 // Set up a fake language server.
3806 let mut language = Language::new(
3807 LanguageConfig {
3808 name: "Rust".into(),
3809 path_suffixes: vec!["rs".to_string()],
3810 ..Default::default()
3811 },
3812 Some(tree_sitter_rust::language()),
3813 );
3814 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3815 lang_registry.add(Arc::new(language));
3816
3817 // Connect to a server as 2 clients.
3818 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3819 let client_a = server.create_client(cx_a, "user_a").await;
3820 let mut client_b = server.create_client(cx_b, "user_b").await;
3821 server
3822 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3823 .await;
3824
3825 // Share a project as client A
3826 let project_a = cx_a.update(|cx| {
3827 Project::local(
3828 client_a.clone(),
3829 client_a.user_store.clone(),
3830 lang_registry.clone(),
3831 fs.clone(),
3832 cx,
3833 )
3834 });
3835 let (worktree_a, _) = project_a
3836 .update(cx_a, |p, cx| {
3837 p.find_or_create_local_worktree("/root-1", true, cx)
3838 })
3839 .await
3840 .unwrap();
3841 worktree_a
3842 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3843 .await;
3844 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3845
3846 // Join the worktree as client B.
3847 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3848
3849 // Open the file on client B.
3850 let buffer_b = cx_b
3851 .background()
3852 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3853 .await
3854 .unwrap();
3855
3856 // Request references to a symbol as the guest.
3857 let fake_language_server = fake_language_servers.next().await.unwrap();
3858 fake_language_server.handle_request::<lsp::request::References, _, _>(
3859 |params, _| async move {
3860 assert_eq!(
3861 params.text_document_position.text_document.uri.as_str(),
3862 "file:///root-1/one.rs"
3863 );
3864 Ok(Some(vec![
3865 lsp::Location {
3866 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3867 range: lsp::Range::new(
3868 lsp::Position::new(0, 24),
3869 lsp::Position::new(0, 27),
3870 ),
3871 },
3872 lsp::Location {
3873 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3874 range: lsp::Range::new(
3875 lsp::Position::new(0, 35),
3876 lsp::Position::new(0, 38),
3877 ),
3878 },
3879 lsp::Location {
3880 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3881 range: lsp::Range::new(
3882 lsp::Position::new(0, 37),
3883 lsp::Position::new(0, 40),
3884 ),
3885 },
3886 ]))
3887 },
3888 );
3889
3890 let references = project_b
3891 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3892 .await
3893 .unwrap();
3894 cx_b.read(|cx| {
3895 assert_eq!(references.len(), 3);
3896 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3897
3898 let two_buffer = references[0].buffer.read(cx);
3899 let three_buffer = references[2].buffer.read(cx);
3900 assert_eq!(
3901 two_buffer.file().unwrap().path().as_ref(),
3902 Path::new("two.rs")
3903 );
3904 assert_eq!(references[1].buffer, references[0].buffer);
3905 assert_eq!(
3906 three_buffer.file().unwrap().full_path(cx),
3907 Path::new("three.rs")
3908 );
3909
3910 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3911 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3912 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3913 });
3914 }
3915
3916 #[gpui::test(iterations = 10)]
3917 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3918 cx_a.foreground().forbid_parking();
3919 let lang_registry = Arc::new(LanguageRegistry::test());
3920 let fs = FakeFs::new(cx_a.background());
3921 fs.insert_tree(
3922 "/root-1",
3923 json!({
3924 "a": "hello world",
3925 "b": "goodnight moon",
3926 "c": "a world of goo",
3927 "d": "world champion of clown world",
3928 }),
3929 )
3930 .await;
3931 fs.insert_tree(
3932 "/root-2",
3933 json!({
3934 "e": "disney world is fun",
3935 }),
3936 )
3937 .await;
3938
3939 // Connect to a server as 2 clients.
3940 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3941 let client_a = server.create_client(cx_a, "user_a").await;
3942 let mut client_b = server.create_client(cx_b, "user_b").await;
3943 server
3944 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3945 .await;
3946
3947 // Share a project as client A
3948 let project_a = cx_a.update(|cx| {
3949 Project::local(
3950 client_a.clone(),
3951 client_a.user_store.clone(),
3952 lang_registry.clone(),
3953 fs.clone(),
3954 cx,
3955 )
3956 });
3957
3958 let (worktree_1, _) = project_a
3959 .update(cx_a, |p, cx| {
3960 p.find_or_create_local_worktree("/root-1", true, cx)
3961 })
3962 .await
3963 .unwrap();
3964 worktree_1
3965 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3966 .await;
3967 let (worktree_2, _) = project_a
3968 .update(cx_a, |p, cx| {
3969 p.find_or_create_local_worktree("/root-2", true, cx)
3970 })
3971 .await
3972 .unwrap();
3973 worktree_2
3974 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3975 .await;
3976
3977 // Join the worktree as client B.
3978 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3979 let results = project_b
3980 .update(cx_b, |project, cx| {
3981 project.search(SearchQuery::text("world", false, false), cx)
3982 })
3983 .await
3984 .unwrap();
3985
3986 let mut ranges_by_path = results
3987 .into_iter()
3988 .map(|(buffer, ranges)| {
3989 buffer.read_with(cx_b, |buffer, cx| {
3990 let path = buffer.file().unwrap().full_path(cx);
3991 let offset_ranges = ranges
3992 .into_iter()
3993 .map(|range| range.to_offset(buffer))
3994 .collect::<Vec<_>>();
3995 (path, offset_ranges)
3996 })
3997 })
3998 .collect::<Vec<_>>();
3999 ranges_by_path.sort_by_key(|(path, _)| path.clone());
4000
4001 assert_eq!(
4002 ranges_by_path,
4003 &[
4004 (PathBuf::from("root-1/a"), vec![6..11]),
4005 (PathBuf::from("root-1/c"), vec![2..7]),
4006 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
4007 (PathBuf::from("root-2/e"), vec![7..12]),
4008 ]
4009 );
4010 }
4011
4012 #[gpui::test(iterations = 10)]
4013 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4014 cx_a.foreground().forbid_parking();
4015 let lang_registry = Arc::new(LanguageRegistry::test());
4016 let fs = FakeFs::new(cx_a.background());
4017 fs.insert_tree(
4018 "/root-1",
4019 json!({
4020 "main.rs": "fn double(number: i32) -> i32 { number + number }",
4021 }),
4022 )
4023 .await;
4024
4025 // Set up a fake language server.
4026 let mut language = Language::new(
4027 LanguageConfig {
4028 name: "Rust".into(),
4029 path_suffixes: vec!["rs".to_string()],
4030 ..Default::default()
4031 },
4032 Some(tree_sitter_rust::language()),
4033 );
4034 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4035 lang_registry.add(Arc::new(language));
4036
4037 // Connect to a server as 2 clients.
4038 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4039 let client_a = server.create_client(cx_a, "user_a").await;
4040 let mut client_b = server.create_client(cx_b, "user_b").await;
4041 server
4042 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4043 .await;
4044
4045 // Share a project as client A
4046 let project_a = cx_a.update(|cx| {
4047 Project::local(
4048 client_a.clone(),
4049 client_a.user_store.clone(),
4050 lang_registry.clone(),
4051 fs.clone(),
4052 cx,
4053 )
4054 });
4055 let (worktree_a, _) = project_a
4056 .update(cx_a, |p, cx| {
4057 p.find_or_create_local_worktree("/root-1", true, cx)
4058 })
4059 .await
4060 .unwrap();
4061 worktree_a
4062 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4063 .await;
4064 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4065
4066 // Join the worktree as client B.
4067 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4068
4069 // Open the file on client B.
4070 let buffer_b = cx_b
4071 .background()
4072 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
4073 .await
4074 .unwrap();
4075
4076 // Request document highlights as the guest.
4077 let fake_language_server = fake_language_servers.next().await.unwrap();
4078 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4079 |params, _| async move {
4080 assert_eq!(
4081 params
4082 .text_document_position_params
4083 .text_document
4084 .uri
4085 .as_str(),
4086 "file:///root-1/main.rs"
4087 );
4088 assert_eq!(
4089 params.text_document_position_params.position,
4090 lsp::Position::new(0, 34)
4091 );
4092 Ok(Some(vec![
4093 lsp::DocumentHighlight {
4094 kind: Some(lsp::DocumentHighlightKind::WRITE),
4095 range: lsp::Range::new(
4096 lsp::Position::new(0, 10),
4097 lsp::Position::new(0, 16),
4098 ),
4099 },
4100 lsp::DocumentHighlight {
4101 kind: Some(lsp::DocumentHighlightKind::READ),
4102 range: lsp::Range::new(
4103 lsp::Position::new(0, 32),
4104 lsp::Position::new(0, 38),
4105 ),
4106 },
4107 lsp::DocumentHighlight {
4108 kind: Some(lsp::DocumentHighlightKind::READ),
4109 range: lsp::Range::new(
4110 lsp::Position::new(0, 41),
4111 lsp::Position::new(0, 47),
4112 ),
4113 },
4114 ]))
4115 },
4116 );
4117
4118 let highlights = project_b
4119 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4120 .await
4121 .unwrap();
4122 buffer_b.read_with(cx_b, |buffer, _| {
4123 let snapshot = buffer.snapshot();
4124
4125 let highlights = highlights
4126 .into_iter()
4127 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4128 .collect::<Vec<_>>();
4129 assert_eq!(
4130 highlights,
4131 &[
4132 (lsp::DocumentHighlightKind::WRITE, 10..16),
4133 (lsp::DocumentHighlightKind::READ, 32..38),
4134 (lsp::DocumentHighlightKind::READ, 41..47)
4135 ]
4136 )
4137 });
4138 }
4139
4140 #[gpui::test(iterations = 10)]
4141 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4142 cx_a.foreground().forbid_parking();
4143 let lang_registry = Arc::new(LanguageRegistry::test());
4144 let fs = FakeFs::new(cx_a.background());
4145 fs.insert_tree(
4146 "/code",
4147 json!({
4148 "crate-1": {
4149 "one.rs": "const ONE: usize = 1;",
4150 },
4151 "crate-2": {
4152 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4153 },
4154 "private": {
4155 "passwords.txt": "the-password",
4156 }
4157 }),
4158 )
4159 .await;
4160
4161 // Set up a fake language server.
4162 let mut language = Language::new(
4163 LanguageConfig {
4164 name: "Rust".into(),
4165 path_suffixes: vec!["rs".to_string()],
4166 ..Default::default()
4167 },
4168 Some(tree_sitter_rust::language()),
4169 );
4170 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4171 lang_registry.add(Arc::new(language));
4172
4173 // Connect to a server as 2 clients.
4174 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4175 let client_a = server.create_client(cx_a, "user_a").await;
4176 let mut client_b = server.create_client(cx_b, "user_b").await;
4177 server
4178 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4179 .await;
4180
4181 // Share a project as client A
4182 let project_a = cx_a.update(|cx| {
4183 Project::local(
4184 client_a.clone(),
4185 client_a.user_store.clone(),
4186 lang_registry.clone(),
4187 fs.clone(),
4188 cx,
4189 )
4190 });
4191 let (worktree_a, _) = project_a
4192 .update(cx_a, |p, cx| {
4193 p.find_or_create_local_worktree("/code/crate-1", true, cx)
4194 })
4195 .await
4196 .unwrap();
4197 worktree_a
4198 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4199 .await;
4200 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4201
4202 // Join the worktree as client B.
4203 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4204
4205 // Cause the language server to start.
4206 let _buffer = cx_b
4207 .background()
4208 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4209 .await
4210 .unwrap();
4211
4212 let fake_language_server = fake_language_servers.next().await.unwrap();
4213 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4214 |_, _| async move {
4215 #[allow(deprecated)]
4216 Ok(Some(vec![lsp::SymbolInformation {
4217 name: "TWO".into(),
4218 location: lsp::Location {
4219 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4220 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4221 },
4222 kind: lsp::SymbolKind::CONSTANT,
4223 tags: None,
4224 container_name: None,
4225 deprecated: None,
4226 }]))
4227 },
4228 );
4229
4230 // Request the definition of a symbol as the guest.
4231 let symbols = project_b
4232 .update(cx_b, |p, cx| p.symbols("two", cx))
4233 .await
4234 .unwrap();
4235 assert_eq!(symbols.len(), 1);
4236 assert_eq!(symbols[0].name, "TWO");
4237
4238 // Open one of the returned symbols.
4239 let buffer_b_2 = project_b
4240 .update(cx_b, |project, cx| {
4241 project.open_buffer_for_symbol(&symbols[0], cx)
4242 })
4243 .await
4244 .unwrap();
4245 buffer_b_2.read_with(cx_b, |buffer, _| {
4246 assert_eq!(
4247 buffer.file().unwrap().path().as_ref(),
4248 Path::new("../crate-2/two.rs")
4249 );
4250 });
4251
4252 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4253 let mut fake_symbol = symbols[0].clone();
4254 fake_symbol.path = Path::new("/code/secrets").into();
4255 let error = project_b
4256 .update(cx_b, |project, cx| {
4257 project.open_buffer_for_symbol(&fake_symbol, cx)
4258 })
4259 .await
4260 .unwrap_err();
4261 assert!(error.to_string().contains("invalid symbol signature"));
4262 }
4263
4264 #[gpui::test(iterations = 10)]
4265 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4266 cx_a: &mut TestAppContext,
4267 cx_b: &mut TestAppContext,
4268 mut rng: StdRng,
4269 ) {
4270 cx_a.foreground().forbid_parking();
4271 let lang_registry = Arc::new(LanguageRegistry::test());
4272 let fs = FakeFs::new(cx_a.background());
4273 fs.insert_tree(
4274 "/root",
4275 json!({
4276 "a.rs": "const ONE: usize = b::TWO;",
4277 "b.rs": "const TWO: usize = 2",
4278 }),
4279 )
4280 .await;
4281
4282 // Set up a fake language server.
4283 let mut language = Language::new(
4284 LanguageConfig {
4285 name: "Rust".into(),
4286 path_suffixes: vec!["rs".to_string()],
4287 ..Default::default()
4288 },
4289 Some(tree_sitter_rust::language()),
4290 );
4291 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4292 lang_registry.add(Arc::new(language));
4293
4294 // Connect to a server as 2 clients.
4295 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4296 let client_a = server.create_client(cx_a, "user_a").await;
4297 let mut client_b = server.create_client(cx_b, "user_b").await;
4298 server
4299 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4300 .await;
4301
4302 // Share a project as client A
4303 let project_a = cx_a.update(|cx| {
4304 Project::local(
4305 client_a.clone(),
4306 client_a.user_store.clone(),
4307 lang_registry.clone(),
4308 fs.clone(),
4309 cx,
4310 )
4311 });
4312
4313 let (worktree_a, _) = project_a
4314 .update(cx_a, |p, cx| {
4315 p.find_or_create_local_worktree("/root", true, cx)
4316 })
4317 .await
4318 .unwrap();
4319 worktree_a
4320 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4321 .await;
4322 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4323
4324 // Join the project as client B.
4325 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4326
4327 let buffer_b1 = cx_b
4328 .background()
4329 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4330 .await
4331 .unwrap();
4332
4333 let fake_language_server = fake_language_servers.next().await.unwrap();
4334 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4335 |_, _| async move {
4336 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4337 lsp::Location::new(
4338 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4339 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4340 ),
4341 )))
4342 },
4343 );
4344
4345 let definitions;
4346 let buffer_b2;
4347 if rng.gen() {
4348 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4349 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4350 } else {
4351 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4352 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4353 }
4354
4355 let buffer_b2 = buffer_b2.await.unwrap();
4356 let definitions = definitions.await.unwrap();
4357 assert_eq!(definitions.len(), 1);
4358 assert_eq!(definitions[0].buffer, buffer_b2);
4359 }
4360
/// Exercises code actions requested by a guest (client B) in a project shared by client A.
///
/// The guest opens `main.rs` in a workspace, moves its cursor to a position for which the
/// fake language server offers an "Inline into all callers" action, confirms that action, and
/// then checks that the workspace's active editor contains the edited text and that the edit
/// can be undone and redone.
#[gpui::test(iterations = 10)]
async fn test_collaborating_with_code_actions(
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_b.update(|cx| editor::init(cx));

    // Set up a fake language server.
    let mut language = Language::new(
        LanguageConfig {
            name: "Rust".into(),
            path_suffixes: vec!["rs".to_string()],
            ..Default::default()
        },
        Some(tree_sitter_rust::language()),
    );
    let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
    lang_registry.add(Arc::new(language));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    server
        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
        .await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
            "other.rs": "pub fn foo() -> usize { 4 }",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

    // Join the project as client B.
    let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
    let editor_b = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "main.rs"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // The first code action request is expected for the empty range at (0, 0) of `main.rs`;
    // answer it with no actions.
    // NOTE(review): `.next().await` presumably waits until the handler has served one
    // request — confirm against the fake language server API.
    let mut fake_language_server = fake_language_servers.next().await.unwrap();
    fake_language_server
        .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
            assert_eq!(
                params.text_document.uri,
                lsp::Url::from_file_path("/a/main.rs").unwrap(),
            );
            assert_eq!(params.range.start, lsp::Position::new(0, 0));
            assert_eq!(params.range.end, lsp::Position::new(0, 0));
            Ok(None)
        })
        .next()
        .await;

    // Move cursor to a location that contains code actions.
    editor_b.update(cx_b, |editor, cx| {
        editor.change_selections(None, cx, |s| {
            s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
        });
        cx.focus(&editor_b);
    });

    // The request for the new cursor position (1, 31) is answered with a single action whose
    // workspace edit touches both `main.rs` and `other.rs`.
    fake_language_server
        .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
            assert_eq!(
                params.text_document.uri,
                lsp::Url::from_file_path("/a/main.rs").unwrap(),
            );
            assert_eq!(params.range.start, lsp::Position::new(1, 31));
            assert_eq!(params.range.end, lsp::Position::new(1, 31));

            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
                lsp::CodeAction {
                    title: "Inline into all callers".to_string(),
                    edit: Some(lsp::WorkspaceEdit {
                        changes: Some(
                            [
                                (
                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                    vec![lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(1, 22),
                                            lsp::Position::new(1, 34),
                                        ),
                                        "4".to_string(),
                                    )],
                                ),
                                (
                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                    vec![lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(0, 0),
                                            lsp::Position::new(0, 27),
                                        ),
                                        "".to_string(),
                                    )],
                                ),
                            ]
                            .into_iter()
                            .collect(),
                        ),
                        ..Default::default()
                    }),
                    data: Some(json!({
                        "codeActionParams": {
                            "range": {
                                "start": {"line": 1, "column": 31},
                                "end": {"line": 1, "column": 31},
                            }
                        }
                    })),
                    ..Default::default()
                },
            )]))
        })
        .next()
        .await;

    // Toggle code actions and wait for them to display.
    editor_b.update(cx_b, |editor, cx| {
        editor.toggle_code_actions(
            &ToggleCodeActions {
                deployed_from_indicator: false,
            },
            cx,
        );
    });
    editor_b
        .condition(&cx_b, |editor, _| editor.context_menu_visible())
        .await;

    // The code action handler is no longer needed once the menu is showing.
    fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();

    // Confirming the code action will trigger a resolve request.
    let confirm_action = workspace_b
        .update(cx_b, |workspace, cx| {
            Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
        })
        .unwrap();
    // The resolve handler returns the same action and workspace edit as above, without `data`.
    fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
        |_, _| async move {
            Ok(lsp::CodeAction {
                title: "Inline into all callers".to_string(),
                edit: Some(lsp::WorkspaceEdit {
                    changes: Some(
                        [
                            (
                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(1, 22),
                                        lsp::Position::new(1, 34),
                                    ),
                                    "4".to_string(),
                                )],
                            ),
                            (
                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 0),
                                        lsp::Position::new(0, 27),
                                    ),
                                    "".to_string(),
                                )],
                            ),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                    ..Default::default()
                }),
                ..Default::default()
            })
        },
    );

    // After the action is confirmed, an editor containing both modified files is opened.
    confirm_action.await.unwrap();
    let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });
    // The whole multi-file edit is undone and redone as a unit.
    code_action_editor.update(cx_b, |editor, cx| {
        assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
        editor.undo(&Undo, cx);
        assert_eq!(
            editor.text(cx),
            "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
        );
        editor.redo(&Redo, cx);
        assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
    });
}
4589
/// Exercises a symbol rename initiated by a guest (client B) in a project shared by client A.
///
/// The guest opens `one.rs`, prepares a rename of `ONE`, types `THREE` into the rename
/// editor, and confirms. The fake language server answers with a workspace edit spanning
/// `one.rs` and `two.rs`. The test then checks the resulting text, undo/redo of the rename in
/// the workspace's active editor, and undo/redo behavior of the original editor.
#[gpui::test(iterations = 10)]
async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_b.update(|cx| editor::init(cx));

    // Set up a fake language server.
    let mut language = Language::new(
        LanguageConfig {
            name: "Rust".into(),
            path_suffixes: vec!["rs".to_string()],
            ..Default::default()
        },
        Some(tree_sitter_rust::language()),
    );
    // Advertise rename support, including the prepare-rename step.
    let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
        capabilities: lsp::ServerCapabilities {
            rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
                prepare_provider: Some(true),
                work_done_progress_options: Default::default(),
            })),
            ..Default::default()
        },
        ..Default::default()
    });
    lang_registry.add(Arc::new(language));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    server
        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
        .await;

    // Share a project as client A
    fs.insert_tree(
        "/dir",
        json!({
            "one.rs": "const ONE: usize = 1;",
            "two.rs": "const TWO: usize = one::ONE + one::ONE;"
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/dir", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

    // Join the project as client B.
    let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
    let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
    let editor_b = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "one.rs"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();
    let fake_language_server = fake_language_servers.next().await.unwrap();

    // Move cursor to a location that can be renamed.
    let prepare_rename = editor_b.update(cx_b, |editor, cx| {
        editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
        editor.rename(&Rename, cx).unwrap()
    });

    // The prepare-rename request is expected at offset 7 of `one.rs`; the reply reports the
    // range (0, 6)..(0, 9), which covers `ONE` in that file.
    fake_language_server
        .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
            assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
            assert_eq!(params.position, lsp::Position::new(0, 7));
            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                lsp::Position::new(0, 6),
                lsp::Position::new(0, 9),
            ))))
        })
        .next()
        .await
        .unwrap();
    prepare_rename.await.unwrap();
    // The pending rename must cover the range the server reported; replace the first three
    // characters of the rename editor's text with the new name.
    editor_b.update(cx_b, |editor, cx| {
        let rename = editor.pending_rename().unwrap();
        let buffer = editor.buffer().read(cx).snapshot(cx);
        assert_eq!(
            rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
            6..9
        );
        rename.editor.update(cx, |rename_editor, cx| {
            rename_editor.buffer().update(cx, |rename_buffer, cx| {
                rename_buffer.edit([(0..3, "THREE")], cx);
            });
        });
    });

    // Confirm the rename; the server is expected to be asked at the start of the symbol
    // (0, 6) and replies with edits to `one.rs` (one occurrence) and `two.rs` (two).
    let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
        Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
    });
    fake_language_server
        .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
            assert_eq!(
                params.text_document_position.text_document.uri.as_str(),
                "file:///dir/one.rs"
            );
            assert_eq!(
                params.text_document_position.position,
                lsp::Position::new(0, 6)
            );
            assert_eq!(params.new_name, "THREE");
            Ok(Some(lsp::WorkspaceEdit {
                changes: Some(
                    [
                        (
                            lsp::Url::from_file_path("/dir/one.rs").unwrap(),
                            vec![lsp::TextEdit::new(
                                lsp::Range::new(
                                    lsp::Position::new(0, 6),
                                    lsp::Position::new(0, 9),
                                ),
                                "THREE".to_string(),
                            )],
                        ),
                        (
                            lsp::Url::from_file_path("/dir/two.rs").unwrap(),
                            vec![
                                lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 24),
                                        lsp::Position::new(0, 27),
                                    ),
                                    "THREE".to_string(),
                                ),
                                lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 35),
                                        lsp::Position::new(0, 38),
                                    ),
                                    "THREE".to_string(),
                                ),
                            ],
                        ),
                    ]
                    .into_iter()
                    .collect(),
                ),
                ..Default::default()
            }))
        })
        .next()
        .await
        .unwrap();
    confirm_rename.await.unwrap();

    // The workspace's active item is now an editor showing the renamed text of both files;
    // the rename can be undone and redone there as a whole.
    let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });
    rename_editor.update(cx_b, |editor, cx| {
        assert_eq!(
            editor.text(cx),
            "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
        );
        editor.undo(&Undo, cx);
        assert_eq!(
            editor.text(cx),
            "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
        );
        editor.redo(&Redo, cx);
        assert_eq!(
            editor.text(cx),
            "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
        );
    });

    // Ensure temporary rename edits cannot be undone/redone.
    // A second undo leaves the text unchanged, and redo restores the renamed text directly.
    editor_b.update(cx_b, |editor, cx| {
        editor.undo(&Undo, cx);
        assert_eq!(editor.text(cx), "const ONE: usize = 1;");
        editor.undo(&Undo, cx);
        assert_eq!(editor.text(cx), "const ONE: usize = 1;");
        editor.redo(&Redo, cx);
        assert_eq!(editor.text(cx), "const THREE: usize = 1;");
    })
}
4793
/// Exercises the basic chat flow between two clients that are members of the same channel.
///
/// Covers loading the list of available channels, loading a message created directly in the
/// database, sending messages from client A and observing them on client B, and the server
/// dropping its per-channel connection state as each client releases its channel handle.
#[gpui::test(iterations = 10)]
async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user B, written straight to the database.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        1,
    )
    .await
    .unwrap();

    // Client A: wait for the channel list, then open the channel. Its messages start out
    // empty and the seeded message shows up asynchronously.
    // NOTE(review): the third tuple element returned by `channel_messages` is presumably a
    // "pending" flag (true for locally sent, not yet acknowledged messages) — confirm
    // against the helper's definition.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(cx_a, |list, _| list.available_channels().is_some())
        .await;
    channels_a.read_with(cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    let channel_a = channels_a.update(cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B: same sequence as client A.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client A sends two messages back to back. Both appear locally right away with the
    // flag set to `true`; only the second send is awaited, the first is detached.
    channel_a
        .update(cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // Client B eventually sees both messages, in order, with the flag cleared.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                ]
        })
        .await;

    // The server tracks one connection per client for the channel, removes a connection when
    // that client's channel handle is dropped, and forgets the channel once none remain.
    assert_eq!(
        server
            .state()
            .await
            .channel(channel_id)
            .unwrap()
            .connection_ids
            .len(),
        2
    );
    cx_b.update(|_| drop(channel_b));
    server
        .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
        .await;

    cx_a.update(|_| drop(channel_a));
    server
        .condition(|state| state.channel(channel_id).is_none())
        .await;
}
4933
4934 #[gpui::test(iterations = 10)]
4935 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4936 cx_a.foreground().forbid_parking();
4937
4938 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4939 let client_a = server.create_client(cx_a, "user_a").await;
4940
4941 let db = &server.app_state.db;
4942 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4943 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4944 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4945 .await
4946 .unwrap();
4947 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4948 .await
4949 .unwrap();
4950
4951 let channels_a = cx_a
4952 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4953 channels_a
4954 .condition(cx_a, |list, _| list.available_channels().is_some())
4955 .await;
4956 let channel_a = channels_a.update(cx_a, |this, cx| {
4957 this.get_channel(channel_id.to_proto(), cx).unwrap()
4958 });
4959
4960 // Messages aren't allowed to be too long.
4961 channel_a
4962 .update(cx_a, |channel, cx| {
4963 let long_body = "this is long.\n".repeat(1024);
4964 channel.send_message(long_body, cx).unwrap()
4965 })
4966 .await
4967 .unwrap_err();
4968
4969 // Messages aren't allowed to be blank.
4970 channel_a.update(cx_a, |channel, cx| {
4971 channel.send_message(String::new(), cx).unwrap_err()
4972 });
4973
4974 // Leading and trailing whitespace are trimmed.
4975 channel_a
4976 .update(cx_a, |channel, cx| {
4977 channel
4978 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4979 .unwrap()
4980 })
4981 .await
4982 .unwrap();
4983 assert_eq!(
4984 db.get_channel_messages(channel_id, 10, None)
4985 .await
4986 .unwrap()
4987 .iter()
4988 .map(|m| &m.body)
4989 .collect::<Vec<_>>(),
4990 &["surrounded by whitespace"]
4991 );
4992 }
4993
/// Exercises chat behavior across a client disconnect and reconnect.
///
/// After both clients have loaded a channel, client B is disconnected while new connections
/// are forbidden. The test checks that B keeps its cached channel data, that a send from B
/// fails while offline, that messages sent by A in the meantime and B's offline message all
/// appear on B after connections are allowed again, and that both clients can exchange
/// further messages afterwards.
#[gpui::test(iterations = 10)]
async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;
    let mut status_b = client_b.status();

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user B, written straight to the database.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        2,
    )
    .await
    .unwrap();

    // Client A: load the channel list, open the channel, and wait for the seeded message.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(cx_a, |list, _| list.available_channels().is_some())
        .await;

    channels_a.read_with(cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    let channel_a = channels_a.update(cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B: same sequence as client A.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Disconnect client B, ensuring we can still access its cached channel data.
    // New connections are forbidden first so that B's reconnection attempt fails; the loop
    // drains B's status stream until it reports a reconnection error.
    server.forbid_connections();
    server.disconnect_client(client_b.current_user_id(&cx_b));
    cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
    while !matches!(
        status_b.next().await,
        Some(client::Status::ReconnectionError { .. })
    ) {}

    channels_b.read_with(cx_b, |channels, _| {
        assert_eq!(
            channels.available_channels().unwrap(),
            [ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    channel_b.read_with(cx_b, |channel, _| {
        assert_eq!(
            channel_messages(channel),
            [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        )
    });

    // Send a message from client B while it is disconnected.
    // The message is shown locally right away (flag `true`), but awaiting the send fails.
    channel_b
        .update(cx_b, |channel, cx| {
            let task = channel
                .send_message("can you see this?".to_string(), cx)
                .unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap_err();

    // Send a message from client A while B is disconnected.
    channel_a
        .update(cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // Give client B a chance to reconnect.
    server.allow_connections();
    cx_b.foreground().advance_clock(Duration::from_secs(10));

    // Verify that B sees the new messages upon reconnection, as well as the message client B
    // sent while offline.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                ]
        })
        .await;

    // Ensure client A and B can communicate normally after reconnection.
    channel_a
        .update(cx_a, |channel, cx| {
            channel.send_message("you online?".to_string(), cx).unwrap()
        })
        .await
        .unwrap();
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                    ("user_a".to_string(), "you online?".to_string(), false),
                ]
        })
        .await;

    channel_b
        .update(cx_b, |channel, cx| {
            channel.send_message("yep".to_string(), cx).unwrap()
        })
        .await
        .unwrap();
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                    ("user_b".to_string(), "can you see this?".to_string(), false),
                    ("user_a".to_string(), "you online?".to_string(), false),
                    ("user_b".to_string(), "yep".to_string(), false),
                ]
        })
        .await;
}
5206
5207 #[gpui::test(iterations = 10)]
5208 async fn test_contacts(
5209 deterministic: Arc<Deterministic>,
5210 cx_a: &mut TestAppContext,
5211 cx_b: &mut TestAppContext,
5212 cx_c: &mut TestAppContext,
5213 ) {
5214 cx_a.foreground().forbid_parking();
5215
5216 // Connect to a server as 3 clients.
5217 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5218 let mut client_a = server.create_client(cx_a, "user_a").await;
5219 let mut client_b = server.create_client(cx_b, "user_b").await;
5220 let client_c = server.create_client(cx_c, "user_c").await;
5221 server
5222 .make_contacts(vec![
5223 (&client_a, cx_a),
5224 (&client_b, cx_b),
5225 (&client_c, cx_c),
5226 ])
5227 .await;
5228
5229 deterministic.run_until_parked();
5230 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5231 client.user_store.read_with(*cx, |store, _| {
5232 assert_eq!(
5233 contacts(store),
5234 [
5235 ("user_a", true, vec![]),
5236 ("user_b", true, vec![]),
5237 ("user_c", true, vec![])
5238 ],
5239 "{} has the wrong contacts",
5240 client.username
5241 )
5242 });
5243 }
5244
5245 // Share a project as client A.
5246 let fs = FakeFs::new(cx_a.background());
5247 fs.create_dir(Path::new("/a")).await.unwrap();
5248 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5249
5250 deterministic.run_until_parked();
5251 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5252 client.user_store.read_with(*cx, |store, _| {
5253 assert_eq!(
5254 contacts(store),
5255 [
5256 ("user_a", true, vec![("a", vec![])]),
5257 ("user_b", true, vec![]),
5258 ("user_c", true, vec![])
5259 ],
5260 "{} has the wrong contacts",
5261 client.username
5262 )
5263 });
5264 }
5265
5266 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5267
5268 deterministic.run_until_parked();
5269 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5270 client.user_store.read_with(*cx, |store, _| {
5271 assert_eq!(
5272 contacts(store),
5273 [
5274 ("user_a", true, vec![("a", vec!["user_b"])]),
5275 ("user_b", true, vec![]),
5276 ("user_c", true, vec![])
5277 ],
5278 "{} has the wrong contacts",
5279 client.username
5280 )
5281 });
5282 }
5283
5284 // Add a local project as client B
5285 let fs = FakeFs::new(cx_b.background());
5286 fs.create_dir(Path::new("/b")).await.unwrap();
5287 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5288
5289 deterministic.run_until_parked();
5290 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5291 client.user_store.read_with(*cx, |store, _| {
5292 assert_eq!(
5293 contacts(store),
5294 [
5295 ("user_a", true, vec![("a", vec!["user_b"])]),
5296 ("user_b", true, vec![("b", vec![])]),
5297 ("user_c", true, vec![])
5298 ],
5299 "{} has the wrong contacts",
5300 client.username
5301 )
5302 });
5303 }
5304
5305 project_a
5306 .condition(&cx_a, |project, _| {
5307 project.collaborators().contains_key(&client_b.peer_id)
5308 })
5309 .await;
5310
5311 client_a.project.take();
5312 cx_a.update(move |_| drop(project_a));
5313 deterministic.run_until_parked();
5314 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5315 client.user_store.read_with(*cx, |store, _| {
5316 assert_eq!(
5317 contacts(store),
5318 [
5319 ("user_a", true, vec![]),
5320 ("user_b", true, vec![("b", vec![])]),
5321 ("user_c", true, vec![])
5322 ],
5323 "{} has the wrong contacts",
5324 client.username
5325 )
5326 });
5327 }
5328
5329 server.disconnect_client(client_c.current_user_id(cx_c));
5330 server.forbid_connections();
5331 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5332 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5333 client.user_store.read_with(*cx, |store, _| {
5334 assert_eq!(
5335 contacts(store),
5336 [
5337 ("user_a", true, vec![]),
5338 ("user_b", true, vec![("b", vec![])]),
5339 ("user_c", false, vec![])
5340 ],
5341 "{} has the wrong contacts",
5342 client.username
5343 )
5344 });
5345 }
5346 client_c
5347 .user_store
5348 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5349
5350 server.allow_connections();
5351 client_c
5352 .authenticate_and_connect(false, &cx_c.to_async())
5353 .await
5354 .unwrap();
5355
5356 deterministic.run_until_parked();
5357 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5358 client.user_store.read_with(*cx, |store, _| {
5359 assert_eq!(
5360 contacts(store),
5361 [
5362 ("user_a", true, vec![]),
5363 ("user_b", true, vec![("b", vec![])]),
5364 ("user_c", true, vec![])
5365 ],
5366 "{} has the wrong contacts",
5367 client.username
5368 )
5369 });
5370 }
5371
5372 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5373 user_store
5374 .contacts()
5375 .iter()
5376 .map(|contact| {
5377 let projects = contact
5378 .projects
5379 .iter()
5380 .map(|p| {
5381 (
5382 p.worktree_root_names[0].as_str(),
5383 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5384 )
5385 })
5386 .collect();
5387 (contact.user.github_login.as_str(), contact.online, projects)
5388 })
5389 .collect()
5390 }
5391 }
5392
    // Exercises the contact-request flow between three users (A, B and C), each
    // of which is connected through two clients. Users A and C each request user
    // B as a contact; B accepts A's request and rejects C's. After every step the
    // test checks the `current`, `incoming_requests` and `outgoing_requests`
    // fields returned by `summarize_contacts`, and re-checks them after clients
    // A, B and C have disconnected and reconnected.
    #[gpui::test(iterations = 10)]
    async fn test_contact_requests(
        executor: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_a2: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_b2: &mut TestAppContext,
        cx_c: &mut TestAppContext,
        cx_c2: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 users, each with 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_a2 = server.create_client(cx_a2, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_b2 = server.create_client(cx_b2, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;
        let client_c2 = server.create_client(cx_c2, "user_c").await;

        // Both clients created with the same username must resolve to the same user id.
        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());

        // User A and User C request that user B become their contact.
        client_a
            .user_store
            .update(cx_a, |store, cx| {
                store.request_contact(client_b.user_id().unwrap(), cx)
            })
            .await
            .unwrap();
        client_c
            .user_store
            .update(cx_c, |store, cx| {
                store.request_contact(client_b.user_id().unwrap(), cx)
            })
            .await
            .unwrap();
        executor.run_until_parked();

        // All users see the pending request appear in all their clients.
        assert_eq!(
            client_a.summarize_contacts(&cx_a).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_b2.summarize_contacts(&cx_b2).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
            &["user_b"]
        );

        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
        // Only the first client of each user is reconnected; the second clients stay connected.
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(
            client_a.summarize_contacts(&cx_a).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );

        // User B accepts the request from user A.
        client_b
            .user_store
            .update(cx_b, |store, cx| {
                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
            })
            .await
            .unwrap();

        executor.run_until_parked();

        // User B sees user A as their contact now in all clients, and the incoming request from them is removed.
        let contacts_b = client_b.summarize_contacts(&cx_b);
        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);

        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
        let contacts_a = client_a.summarize_contacts(&cx_a);
        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
        assert!(contacts_a.outgoing_requests.is_empty());
        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
        assert!(contacts_a2.outgoing_requests.is_empty());

        // Contacts are present upon connecting (tested here via disconnect/reconnect)
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(
            client_a.summarize_contacts(&cx_a).current,
            &["user_a", "user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).current,
            &["user_a", "user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_c"]
        );
        // User C's request is still pending, so C's `current` list contains only C.
        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );

        // User B rejects the request from user C.
        client_b
            .user_store
            .update(cx_b, |store, cx| {
                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
            })
            .await
            .unwrap();

        executor.run_until_parked();

        // User B doesn't see user C as their contact, and the incoming request from them is removed.
        let contacts_b = client_b.summarize_contacts(&cx_b);
        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
        assert!(contacts_b.incoming_requests.is_empty());
        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
        assert!(contacts_b2.incoming_requests.is_empty());

        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
        let contacts_c = client_c.summarize_contacts(&cx_c);
        assert_eq!(contacts_c.current, &["user_c"]);
        assert!(contacts_c.outgoing_requests.is_empty());
        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
        assert_eq!(contacts_c2.current, &["user_c"]);
        assert!(contacts_c2.outgoing_requests.is_empty());

        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(
            client_a.summarize_contacts(&cx_a).current,
            &["user_a", "user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).current,
            &["user_a", "user_b"]
        );
        assert!(client_b
            .summarize_contacts(&cx_b)
            .incoming_requests
            .is_empty());
        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
        assert!(client_c
            .summarize_contacts(&cx_c)
            .outgoing_requests
            .is_empty());

        // Disconnects `client`, calls `clear_contacts` on it, then authenticates
        // and connects it again. Panics if disconnecting or reconnecting fails.
        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
            client.disconnect(&cx.to_async()).unwrap();
            client.clear_contacts(cx).await;
            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();
        }
    }
5588
    // Exercises following between two collaborators on a shared project.
    // Client B follows client A and is expected to mirror A's active item,
    // navigation, selections and text. B then unfollows and must stop reacting
    // to A. Finally A follows B, and that following must end once B disconnects.
    #[gpui::test(iterations = 10)]
    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // 2 clients connect to a server.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;
        cx_a.update(editor::init);
        cx_b.update(editor::init);

        // Client A shares a project.
        fs.insert_tree(
            "/a",
            json!({
                "1.txt": "one",
                "2.txt": "two",
                "3.txt": "three",
            }),
        )
        .await;
        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;

        // Client B joins the project.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Client A opens some editors.
        let workspace_a = client_a.build_workspace(&project_a, cx_a);
        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
        let editor_a1 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();
        let editor_a2 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "2.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Client B opens an editor.
        let workspace_b = client_b.build_workspace(&project_b, cx_b);
        let editor_b1 = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Look up each peer's id from the other side's collaborator list
        // (the first listed collaborator is used).
        let client_a_id = project_b.read_with(cx_b, |project, _| {
            project.collaborators().values().next().unwrap().peer_id
        });
        let client_b_id = project_a.read_with(cx_a, |project, _| {
            project.collaborators().values().next().unwrap().peer_id
        });

        // When client B starts following client A, all visible view states are replicated to client B.
        editor_a1.update(cx_a, |editor, cx| {
            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
        });
        editor_a2.update(cx_a, |editor, cx| {
            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
        });
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(client_a_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();

        // After following, client B's active item is expected to be an editor for
        // "2.txt" carrying the selection set on `editor_a2`, while the editor B had
        // already opened for "1.txt" carries the selection set on `editor_a1`.
        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });
        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
        assert_eq!(
            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
            Some((worktree_id, "2.txt").into())
        );
        assert_eq!(
            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
            vec![2..3]
        );
        assert_eq!(
            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
            vec![0..1]
        );

        // When client A activates a different editor, client B does so as well.
        workspace_a.update(cx_a, |workspace, cx| {
            workspace.activate_item(&editor_a1, cx)
        });
        workspace_b
            .condition(cx_b, |workspace, cx| {
                workspace.active_item(cx).unwrap().id() == editor_b1.id()
            })
            .await;

        // When client A navigates back and forth, client B does so as well.
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace::Pane::go_back(workspace, None, cx)
            })
            .await;
        workspace_b
            .condition(cx_b, |workspace, cx| {
                workspace.active_item(cx).unwrap().id() == editor_b2.id()
            })
            .await;

        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace::Pane::go_forward(workspace, None, cx)
            })
            .await;
        workspace_b
            .condition(cx_b, |workspace, cx| {
                workspace.active_item(cx).unwrap().id() == editor_b1.id()
            })
            .await;

        // Changes to client A's editor are reflected on client B.
        editor_a1.update(cx_a, |editor, cx| {
            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
        });
        editor_b1
            .condition(cx_b, |editor, cx| {
                editor.selections.ranges(cx) == vec![1..1, 2..2]
            })
            .await;

        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
        editor_b1
            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
            .await;

        // NOTE(review): the scroll position is changed here, but only the selection
        // is awaited below; the replicated scroll position itself is not asserted.
        editor_a1.update(cx_a, |editor, cx| {
            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
            editor.set_scroll_position(vec2f(0., 100.), cx);
        });
        editor_b1
            .condition(cx_b, |editor, cx| {
                editor.selections.ranges(cx) == vec![3..3]
            })
            .await;

        // After unfollowing, client B stops receiving updates from client A.
        workspace_b.update(cx_b, |workspace, cx| {
            workspace.unfollow(&workspace.active_pane().clone(), cx)
        });
        workspace_a.update(cx_a, |workspace, cx| {
            workspace.activate_item(&editor_a2, cx)
        });
        cx_a.foreground().run_until_parked();
        // Client B's active item must still be `editor_b1` even though A switched editors.
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, cx| workspace
                .active_item(cx)
                .unwrap()
                .id()),
            editor_b1.id()
        );

        // Client A starts following client B.
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(client_b_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
            Some(client_b_id)
        );
        assert_eq!(
            workspace_a.read_with(cx_a, |workspace, cx| workspace
                .active_item(cx)
                .unwrap()
                .id()),
            editor_a1.id()
        );

        // Following interrupts when client B disconnects.
        client_b.disconnect(&cx_b.to_async()).unwrap();
        cx_a.foreground().run_until_parked();
        assert_eq!(
            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
            None
        );
    }
5797
    // Verifies that two peers can follow each other in split panes: each peer
    // keeps its own active pane, and each peer's following pane shows the file
    // the other peer opened. The test completing at all is what demonstrates
    // that mutual following does not loop forever.
    #[gpui::test(iterations = 10)]
    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // 2 clients connect to a server.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;
        cx_a.update(editor::init);
        cx_b.update(editor::init);

        // Client A shares a project.
        fs.insert_tree(
            "/a",
            json!({
                "1.txt": "one",
                "2.txt": "two",
                "3.txt": "three",
                "4.txt": "four",
            }),
        )
        .await;
        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;

        // Client B joins the project.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Client A opens some editors.
        let workspace_a = client_a.build_workspace(&project_a, cx_a);
        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
        let _editor_a1 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Client B opens an editor.
        let workspace_b = client_b.build_workspace(&project_b, cx_b);
        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
        let _editor_b1 = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "2.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Clients A and B follow each other in split panes
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                // The split must have made a pane other than the original one active.
                assert_ne!(*workspace.active_pane(), pane_a1);
                // Follow the first collaborator listed in the project.
                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
                workspace
                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                // The split must have made a pane other than the original one active.
                assert_ne!(*workspace.active_pane(), pane_b1);
                // Follow the first collaborator listed in the project.
                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
                workspace
                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();

        // Each peer switches back to its original pane and opens a new file there.
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.activate_next_pane(cx);
                assert_eq!(*workspace.active_pane(), pane_a1);
                workspace.open_path((worktree_id, "3.txt"), true, cx)
            })
            .await
            .unwrap();
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.activate_next_pane(cx);
                assert_eq!(*workspace.active_pane(), pane_b1);
                workspace.open_path((worktree_id, "4.txt"), true, cx)
            })
            .await
            .unwrap();
        cx_a.foreground().run_until_parked();

        // Ensure leader updates don't change the active pane of followers
        workspace_a.read_with(cx_a, |workspace, _| {
            assert_eq!(*workspace.active_pane(), pane_a1);
        });
        workspace_b.read_with(cx_b, |workspace, _| {
            assert_eq!(*workspace.active_pane(), pane_b1);
        });

        // Ensure peers following each other doesn't cause an infinite loop.
        assert_eq!(
            workspace_a.read_with(cx_a, |workspace, cx| workspace
                .active_item(cx)
                .unwrap()
                .project_path(cx)),
            Some((worktree_id, "3.txt").into())
        );
        // Each peer's own pane shows the file it opened itself; after switching to
        // the next (following) pane, the active item is the file the other peer opened.
        workspace_a.update(cx_a, |workspace, cx| {
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "3.txt").into())
            );
            workspace.activate_next_pane(cx);
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "4.txt").into())
            );
        });
        workspace_b.update(cx_b, |workspace, cx| {
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "4.txt").into())
            );
            workspace.activate_next_pane(cx);
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "3.txt").into())
            );
        });
    }
5934
    // Verifies which local actions make client B stop following client A.
    // Moving the cursor, editing, scrolling, and opening a different item are
    // each expected to clear `leader_for_pane` for the following pane. Splitting
    // the pane and activating another pane are expected to leave it set.
    // Following is re-established between scenarios.
    #[gpui::test(iterations = 10)]
    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // 2 clients connect to a server.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;
        cx_a.update(editor::init);
        cx_b.update(editor::init);

        // Client A shares a project.
        fs.insert_tree(
            "/a",
            json!({
                "1.txt": "one",
                "2.txt": "two",
                "3.txt": "three",
            }),
        )
        .await;
        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;

        // Client B joins the project.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Client A opens some editors.
        let workspace_a = client_a.build_workspace(&project_a, cx_a);
        let _editor_a1 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Client B starts following client A.
        let workspace_b = client_b.build_workspace(&project_b, cx_b);
        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
        // The leader is the first collaborator listed in client B's view of the project.
        let leader_id = project_b.read_with(cx_b, |project, _| {
            project.collaborators().values().next().unwrap().peer_id
        });
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );
        // The editor that became active in client B's workspace after following.
        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });

        // When client B moves, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Start following again before the next scenario.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B edits, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Start following again before the next scenario.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B scrolls, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| {
            editor.set_scroll_position(vec2f(0., 3.), cx)
        });
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Start following again before the next scenario.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B activates a different pane, it continues following client A in the original pane.
        workspace_b.update(cx_b, |workspace, cx| {
            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
        });
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B activates a different item in the original pane, it automatically stops following client A.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "2.txt"), true, cx)
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );
    }
6091
6092 #[gpui::test(iterations = 100)]
6093 async fn test_random_collaboration(
6094 cx: &mut TestAppContext,
6095 deterministic: Arc<Deterministic>,
6096 rng: StdRng,
6097 ) {
6098 cx.foreground().forbid_parking();
6099 let max_peers = env::var("MAX_PEERS")
6100 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6101 .unwrap_or(5);
6102 assert!(max_peers <= 5);
6103
6104 let max_operations = env::var("OPERATIONS")
6105 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6106 .unwrap_or(10);
6107
6108 let rng = Arc::new(Mutex::new(rng));
6109
6110 let guest_lang_registry = Arc::new(LanguageRegistry::test());
6111 let host_language_registry = Arc::new(LanguageRegistry::test());
6112
6113 let fs = FakeFs::new(cx.background());
6114 fs.insert_tree("/_collab", json!({"init": ""})).await;
6115
6116 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6117 let db = server.app_state.db.clone();
6118 let host_user_id = db.create_user("host", None, false).await.unwrap();
6119 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6120 let guest_user_id = db.create_user(username, None, false).await.unwrap();
6121 server
6122 .app_state
6123 .db
6124 .send_contact_request(guest_user_id, host_user_id)
6125 .await
6126 .unwrap();
6127 server
6128 .app_state
6129 .db
6130 .respond_to_contact_request(host_user_id, guest_user_id, true)
6131 .await
6132 .unwrap();
6133 }
6134
6135 let mut clients = Vec::new();
6136 let mut user_ids = Vec::new();
6137 let mut op_start_signals = Vec::new();
6138
6139 let mut next_entity_id = 100000;
6140 let mut host_cx = TestAppContext::new(
6141 cx.foreground_platform(),
6142 cx.platform(),
6143 deterministic.build_foreground(next_entity_id),
6144 deterministic.build_background(),
6145 cx.font_cache(),
6146 cx.leak_detector(),
6147 next_entity_id,
6148 );
6149 let host = server.create_client(&mut host_cx, "host").await;
6150 let host_project = host_cx.update(|cx| {
6151 Project::local(
6152 host.client.clone(),
6153 host.user_store.clone(),
6154 host_language_registry.clone(),
6155 fs.clone(),
6156 cx,
6157 )
6158 });
6159 let host_project_id = host_project
6160 .update(&mut host_cx, |p, _| p.next_remote_id())
6161 .await;
6162
6163 let (collab_worktree, _) = host_project
6164 .update(&mut host_cx, |project, cx| {
6165 project.find_or_create_local_worktree("/_collab", true, cx)
6166 })
6167 .await
6168 .unwrap();
6169 collab_worktree
6170 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6171 .await;
6172
6173 // Set up fake language servers.
6174 let mut language = Language::new(
6175 LanguageConfig {
6176 name: "Rust".into(),
6177 path_suffixes: vec!["rs".to_string()],
6178 ..Default::default()
6179 },
6180 None,
6181 );
6182 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6183 name: "the-fake-language-server",
6184 capabilities: lsp::LanguageServer::full_capabilities(),
6185 initializer: Some(Box::new({
6186 let rng = rng.clone();
6187 let fs = fs.clone();
6188 let project = host_project.downgrade();
6189 move |fake_server: &mut FakeLanguageServer| {
6190 fake_server.handle_request::<lsp::request::Completion, _, _>(
6191 |_, _| async move {
6192 Ok(Some(lsp::CompletionResponse::Array(vec![
6193 lsp::CompletionItem {
6194 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6195 range: lsp::Range::new(
6196 lsp::Position::new(0, 0),
6197 lsp::Position::new(0, 0),
6198 ),
6199 new_text: "the-new-text".to_string(),
6200 })),
6201 ..Default::default()
6202 },
6203 ])))
6204 },
6205 );
6206
6207 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6208 |_, _| async move {
6209 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6210 lsp::CodeAction {
6211 title: "the-code-action".to_string(),
6212 ..Default::default()
6213 },
6214 )]))
6215 },
6216 );
6217
6218 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6219 |params, _| async move {
6220 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6221 params.position,
6222 params.position,
6223 ))))
6224 },
6225 );
6226
6227 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6228 let fs = fs.clone();
6229 let rng = rng.clone();
6230 move |_, _| {
6231 let fs = fs.clone();
6232 let rng = rng.clone();
6233 async move {
6234 let files = fs.files().await;
6235 let mut rng = rng.lock();
6236 let count = rng.gen_range::<usize, _>(1..3);
6237 let files = (0..count)
6238 .map(|_| files.choose(&mut *rng).unwrap())
6239 .collect::<Vec<_>>();
6240 log::info!("LSP: Returning definitions in files {:?}", &files);
6241 Ok(Some(lsp::GotoDefinitionResponse::Array(
6242 files
6243 .into_iter()
6244 .map(|file| lsp::Location {
6245 uri: lsp::Url::from_file_path(file).unwrap(),
6246 range: Default::default(),
6247 })
6248 .collect(),
6249 )))
6250 }
6251 }
6252 });
6253
6254 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6255 let rng = rng.clone();
6256 let project = project.clone();
6257 move |params, mut cx| {
6258 let highlights = if let Some(project) = project.upgrade(&cx) {
6259 project.update(&mut cx, |project, cx| {
6260 let path = params
6261 .text_document_position_params
6262 .text_document
6263 .uri
6264 .to_file_path()
6265 .unwrap();
6266 let (worktree, relative_path) =
6267 project.find_local_worktree(&path, cx)?;
6268 let project_path =
6269 ProjectPath::from((worktree.read(cx).id(), relative_path));
6270 let buffer =
6271 project.get_open_buffer(&project_path, cx)?.read(cx);
6272
6273 let mut highlights = Vec::new();
6274 let highlight_count = rng.lock().gen_range(1..=5);
6275 let mut prev_end = 0;
6276 for _ in 0..highlight_count {
6277 let range =
6278 buffer.random_byte_range(prev_end, &mut *rng.lock());
6279
6280 highlights.push(lsp::DocumentHighlight {
6281 range: range_to_lsp(range.to_point_utf16(buffer)),
6282 kind: Some(lsp::DocumentHighlightKind::READ),
6283 });
6284 prev_end = range.end;
6285 }
6286 Some(highlights)
6287 })
6288 } else {
6289 None
6290 };
6291 async move { Ok(highlights) }
6292 }
6293 });
6294 }
6295 })),
6296 ..Default::default()
6297 });
6298 host_language_registry.add(Arc::new(language));
6299
6300 let op_start_signal = futures::channel::mpsc::unbounded();
6301 user_ids.push(host.current_user_id(&host_cx));
6302 op_start_signals.push(op_start_signal.0);
6303 clients.push(host_cx.foreground().spawn(host.simulate_host(
6304 host_project,
6305 op_start_signal.1,
6306 rng.clone(),
6307 host_cx,
6308 )));
6309
6310 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6311 rng.lock().gen_range(0..max_operations)
6312 } else {
6313 max_operations
6314 };
6315 let mut available_guests = vec![
6316 "guest-1".to_string(),
6317 "guest-2".to_string(),
6318 "guest-3".to_string(),
6319 "guest-4".to_string(),
6320 ];
6321 let mut operations = 0;
6322 while operations < max_operations {
6323 if operations == disconnect_host_at {
6324 server.disconnect_client(user_ids[0]);
6325 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6326 drop(op_start_signals);
6327 let mut clients = futures::future::join_all(clients).await;
6328 cx.foreground().run_until_parked();
6329
6330 let (host, mut host_cx, host_err) = clients.remove(0);
6331 if let Some(host_err) = host_err {
6332 log::error!("host error - {:?}", host_err);
6333 }
6334 host.project
6335 .as_ref()
6336 .unwrap()
6337 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6338 for (guest, mut guest_cx, guest_err) in clients {
6339 if let Some(guest_err) = guest_err {
6340 log::error!("{} error - {:?}", guest.username, guest_err);
6341 }
6342
6343 let contacts = server
6344 .app_state
6345 .db
6346 .get_contacts(guest.current_user_id(&guest_cx))
6347 .await
6348 .unwrap();
6349 let contacts = server
6350 .store
6351 .read()
6352 .await
6353 .build_initial_contacts_update(contacts)
6354 .contacts;
6355 assert!(!contacts
6356 .iter()
6357 .flat_map(|contact| &contact.projects)
6358 .any(|project| project.id == host_project_id));
6359 guest
6360 .project
6361 .as_ref()
6362 .unwrap()
6363 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6364 guest_cx.update(|_| drop(guest));
6365 }
6366 host_cx.update(|_| drop(host));
6367
6368 return;
6369 }
6370
6371 let distribution = rng.lock().gen_range(0..100);
6372 match distribution {
6373 0..=19 if !available_guests.is_empty() => {
6374 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6375 let guest_username = available_guests.remove(guest_ix);
6376 log::info!("Adding new connection for {}", guest_username);
6377 next_entity_id += 100000;
6378 let mut guest_cx = TestAppContext::new(
6379 cx.foreground_platform(),
6380 cx.platform(),
6381 deterministic.build_foreground(next_entity_id),
6382 deterministic.build_background(),
6383 cx.font_cache(),
6384 cx.leak_detector(),
6385 next_entity_id,
6386 );
6387 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6388 let guest_project = Project::remote(
6389 host_project_id,
6390 guest.client.clone(),
6391 guest.user_store.clone(),
6392 guest_lang_registry.clone(),
6393 FakeFs::new(cx.background()),
6394 &mut guest_cx.to_async(),
6395 )
6396 .await
6397 .unwrap();
6398 let op_start_signal = futures::channel::mpsc::unbounded();
6399 user_ids.push(guest.current_user_id(&guest_cx));
6400 op_start_signals.push(op_start_signal.0);
6401 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6402 guest_username.clone(),
6403 guest_project,
6404 op_start_signal.1,
6405 rng.clone(),
6406 guest_cx,
6407 )));
6408
6409 log::info!("Added connection for {}", guest_username);
6410 operations += 1;
6411 }
6412 20..=29 if clients.len() > 1 => {
6413 let guest_ix = rng.lock().gen_range(1..clients.len());
6414 log::info!("Removing guest {}", user_ids[guest_ix]);
6415 let removed_guest_id = user_ids.remove(guest_ix);
6416 let guest = clients.remove(guest_ix);
6417 op_start_signals.remove(guest_ix);
6418 server.forbid_connections();
6419 server.disconnect_client(removed_guest_id);
6420 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6421 let (guest, mut guest_cx, guest_err) = guest.await;
6422 server.allow_connections();
6423
6424 if let Some(guest_err) = guest_err {
6425 log::error!("{} error - {:?}", guest.username, guest_err);
6426 }
6427 guest
6428 .project
6429 .as_ref()
6430 .unwrap()
6431 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6432 for user_id in &user_ids {
6433 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6434 let contacts = server
6435 .store
6436 .read()
6437 .await
6438 .build_initial_contacts_update(contacts)
6439 .contacts;
6440 for contact in contacts {
6441 if contact.online {
6442 assert_ne!(
6443 contact.user_id, removed_guest_id.0 as u64,
6444 "removed guest is still a contact of another peer"
6445 );
6446 }
6447 for project in contact.projects {
6448 for project_guest_id in project.guests {
6449 assert_ne!(
6450 project_guest_id, removed_guest_id.0 as u64,
6451 "removed guest appears as still participating on a project"
6452 );
6453 }
6454 }
6455 }
6456 }
6457
6458 log::info!("{} removed", guest.username);
6459 available_guests.push(guest.username.clone());
6460 guest_cx.update(|_| drop(guest));
6461
6462 operations += 1;
6463 }
6464 _ => {
6465 while operations < max_operations && rng.lock().gen_bool(0.7) {
6466 op_start_signals
6467 .choose(&mut *rng.lock())
6468 .unwrap()
6469 .unbounded_send(())
6470 .unwrap();
6471 operations += 1;
6472 }
6473
6474 if rng.lock().gen_bool(0.8) {
6475 cx.foreground().run_until_parked();
6476 }
6477 }
6478 }
6479 }
6480
6481 drop(op_start_signals);
6482 let mut clients = futures::future::join_all(clients).await;
6483 cx.foreground().run_until_parked();
6484
6485 let (host_client, mut host_cx, host_err) = clients.remove(0);
6486 if let Some(host_err) = host_err {
6487 panic!("host error - {:?}", host_err);
6488 }
6489 let host_project = host_client.project.as_ref().unwrap();
6490 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6491 project
6492 .worktrees(cx)
6493 .map(|worktree| {
6494 let snapshot = worktree.read(cx).snapshot();
6495 (snapshot.id(), snapshot)
6496 })
6497 .collect::<BTreeMap<_, _>>()
6498 });
6499
6500 host_client
6501 .project
6502 .as_ref()
6503 .unwrap()
6504 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6505
6506 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6507 if let Some(guest_err) = guest_err {
6508 panic!("{} error - {:?}", guest_client.username, guest_err);
6509 }
6510 let worktree_snapshots =
6511 guest_client
6512 .project
6513 .as_ref()
6514 .unwrap()
6515 .read_with(&guest_cx, |project, cx| {
6516 project
6517 .worktrees(cx)
6518 .map(|worktree| {
6519 let worktree = worktree.read(cx);
6520 (worktree.id(), worktree.snapshot())
6521 })
6522 .collect::<BTreeMap<_, _>>()
6523 });
6524
6525 assert_eq!(
6526 worktree_snapshots.keys().collect::<Vec<_>>(),
6527 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6528 "{} has different worktrees than the host",
6529 guest_client.username
6530 );
6531 for (id, host_snapshot) in &host_worktree_snapshots {
6532 let guest_snapshot = &worktree_snapshots[id];
6533 assert_eq!(
6534 guest_snapshot.root_name(),
6535 host_snapshot.root_name(),
6536 "{} has different root name than the host for worktree {}",
6537 guest_client.username,
6538 id
6539 );
6540 assert_eq!(
6541 guest_snapshot.entries(false).collect::<Vec<_>>(),
6542 host_snapshot.entries(false).collect::<Vec<_>>(),
6543 "{} has different snapshot than the host for worktree {}",
6544 guest_client.username,
6545 id
6546 );
6547 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6548 }
6549
6550 guest_client
6551 .project
6552 .as_ref()
6553 .unwrap()
6554 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6555
6556 for guest_buffer in &guest_client.buffers {
6557 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6558 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6559 project.buffer_for_id(buffer_id, cx).expect(&format!(
6560 "host does not have buffer for guest:{}, peer:{}, id:{}",
6561 guest_client.username, guest_client.peer_id, buffer_id
6562 ))
6563 });
6564 let path = host_buffer
6565 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6566
6567 assert_eq!(
6568 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6569 0,
6570 "{}, buffer {}, path {:?} has deferred operations",
6571 guest_client.username,
6572 buffer_id,
6573 path,
6574 );
6575 assert_eq!(
6576 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6577 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6578 "{}, buffer {}, path {:?}, differs from the host's buffer",
6579 guest_client.username,
6580 buffer_id,
6581 path
6582 );
6583 }
6584
6585 guest_cx.update(|_| drop(guest_client));
6586 }
6587
6588 host_cx.update(|_| drop(host_client));
6589 }
6590
    /// In-process server fixture used by the tests in this module.
    ///
    /// Wraps the real `Server` together with the bookkeeping the tests need to
    /// create clients, refuse new connections, and cut existing ones.
    struct TestServer {
        // Reset when the `TestServer` is dropped.
        peer: Arc<Peer>,
        // Application state handed to `Server::new`; tests also reach the
        // database through it.
        app_state: Arc<AppState>,
        // The server under test; also exposed through `Deref`.
        server: Arc<Server>,
        // Foreground executor; `condition` uses it to bracket its waits.
        foreground: Rc<executor::Foreground>,
        // Receiving half of the channel whose sender is given to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        // Per-user flags returned by `Connection::in_memory`; `disconnect_client`
        // removes a user's flag and sets it to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        // While `true`, the overridden connection logic in `create_client`
        // refuses to establish new connections.
        forbid_connections: Arc<AtomicBool>,
        // Never read in the visible code; presumably held so the test database
        // outlives the server (confirm against `TestDb`).
        _test_db: TestDb,
    }
6601
6602 impl TestServer {
6603 async fn start(
6604 foreground: Rc<executor::Foreground>,
6605 background: Arc<executor::Background>,
6606 ) -> Self {
6607 let test_db = TestDb::fake(background);
6608 let app_state = Self::build_app_state(&test_db).await;
6609 let peer = Peer::new();
6610 let notifications = mpsc::unbounded();
6611 let server = Server::new(app_state.clone(), Some(notifications.0));
6612 Self {
6613 peer,
6614 app_state,
6615 server,
6616 foreground,
6617 notifications: notifications.1,
6618 connection_killers: Default::default(),
6619 forbid_connections: Default::default(),
6620 _test_db: test_db,
6621 }
6622 }
6623
        /// Creates a client named `name`, connects it to this server over an
        /// in-memory connection, and waits until its current user is available.
        ///
        /// The user is looked up by GitHub login; any other lookup outcome
        /// (no such user, or an error) falls through to creating the user.
        /// Authentication and connection establishment are overridden on the
        /// client, so the real network paths are not exercised.
        ///
        /// # Panics
        ///
        /// Panics if user creation, connecting, or receiving the connection id fails.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            // Install test settings as a global on the client's app context.
            cx.update(|cx| {
                let settings = Settings::test(cx);
                cx.set_global(settings);
            });

            let http = FakeHttpClient::with_404_response();
            let user_id =
                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
                    user.id
                } else {
                    self.app_state.db.create_user(name, None, false).await.unwrap()
                };
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            // Clones captured by the connection closure below.
            let server = self.server.clone();
            let db = self.app_state.db.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // The sender is passed to `handle_connection`; the first value received
            // is used further down to build the client's `PeerId`.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` only succeeds while this is the sole handle to the
            // client, so the overrides are installed before any clone is made.
            Arc::get_mut(&mut client)
                .unwrap()
                .override_authenticate(move |cx| {
                    // Always hand back fixed credentials for this user.
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                .override_establish_connection(move |credentials, cx| {
                    // The credentials must be the ones produced by the override above.
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    let server = server.clone();
                    let db = db.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, killed) =
                                Connection::in_memory(cx.background());
                            // Register the flag so `disconnect_client` can find it;
                            // this replaces any flag already stored for the user.
                            connection_killers.lock().insert(user_id, killed);
                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
                            // Serve the server side of the connection on the
                            // background executor without waiting for it.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            let app_state = Arc::new(workspace::AppState {
                client: client.clone(),
                user_store: user_store.clone(),
                languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
                themes: ThemeRegistry::new((), cx.font_cache()),
                fs: FakeFs::new(cx.background()),
                build_window_options: || Default::default(),
                initialize_workspace: |_, _, _| unimplemented!(),
            });

            Channel::init(&client);
            Project::init(&client);
            cx.update(|cx| workspace::init(app_state.clone(), cx));

            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);

            let client = TestClient {
                client,
                peer_id,
                username: name.to_string(),
                user_store,
                language_registry: Arc::new(LanguageRegistry::test()),
                project: Default::default(),
                buffers: Default::default(),
            };
            client.wait_for_current_user(cx).await;
            client
        }
6723
6724 fn disconnect_client(&self, user_id: UserId) {
6725 self.connection_killers
6726 .lock()
6727 .remove(&user_id)
6728 .unwrap()
6729 .store(true, SeqCst);
6730 }
6731
        /// Sets the `forbid_connections` flag, which the connection override
        /// installed by `create_client` checks before establishing a connection.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
6735
        /// Clears the `forbid_connections` flag so that new connections can be
        /// established again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
6739
6740 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6741 while let Some((client_a, cx_a)) = clients.pop() {
6742 for (client_b, cx_b) in &mut clients {
6743 client_a
6744 .user_store
6745 .update(cx_a, |store, cx| {
6746 store.request_contact(client_b.user_id().unwrap(), cx)
6747 })
6748 .await
6749 .unwrap();
6750 cx_a.foreground().run_until_parked();
6751 client_b
6752 .user_store
6753 .update(*cx_b, |store, cx| {
6754 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6755 })
6756 .await
6757 .unwrap();
6758 }
6759 }
6760 }
6761
6762 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6763 Arc::new(AppState {
6764 db: test_db.db().clone(),
6765 api_token: Default::default(),
6766 invite_link_prefix: Default::default(),
6767 })
6768 }
6769
6770 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6771 self.server.store.read().await
6772 }
6773
6774 async fn condition<F>(&mut self, mut predicate: F)
6775 where
6776 F: FnMut(&Store) -> bool,
6777 {
6778 assert!(
6779 self.foreground.parking_forbidden(),
6780 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6781 );
6782 while !(predicate)(&*self.server.store.read().await) {
6783 self.foreground.start_waiting();
6784 self.notifications.next().await;
6785 self.foreground.finish_waiting();
6786 }
6787 }
6788 }
6789
6790 impl Deref for TestServer {
6791 type Target = Server;
6792
6793 fn deref(&self) -> &Self::Target {
6794 &self.server
6795 }
6796 }
6797
    /// Resets the fixture's `Peer` when the test server goes out of scope.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
6803
    /// A client connected to a `TestServer`, plus the state the tests track for it.
    struct TestClient {
        // The underlying client; also exposed through `Deref`.
        client: Arc<Client>,
        // Name the client was created with; used in log and assertion messages.
        username: String,
        // Built in `create_client` from the connection id reported for this client.
        pub peer_id: PeerId,
        pub user_store: ModelHandle<UserStore>,
        // Language registry passed to `Project::local` in `build_local_project`.
        language_registry: Arc<LanguageRegistry>,
        // Set by `build_local_project`, `build_remote_project`, and the simulate methods.
        project: Option<ModelHandle<Project>>,
        // Buffers the simulation currently holds open for this client.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6813
    /// Exposes the wrapped `Arc<Client>` so client methods can be called
    /// directly on a `TestClient`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
6821
    /// GitHub logins of a client's contacts, grouped by relationship state.
    /// Produced by `TestClient::summarize_contacts`.
    struct ContactsSummary {
        // Logins of established contacts.
        pub current: Vec<String>,
        // Logins of users this client has sent a contact request to.
        pub outgoing_requests: Vec<String>,
        // Logins of users who have sent this client a contact request.
        pub incoming_requests: Vec<String>,
    }
6827
6828 impl TestClient {
6829 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6830 UserId::from_proto(
6831 self.user_store
6832 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6833 )
6834 }
6835
6836 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6837 let mut authed_user = self
6838 .user_store
6839 .read_with(cx, |user_store, _| user_store.watch_current_user());
6840 while authed_user.next().await.unwrap().is_none() {}
6841 }
6842
6843 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6844 self.user_store
6845 .update(cx, |store, _| store.clear_contacts())
6846 .await;
6847 }
6848
6849 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6850 self.user_store.read_with(cx, |store, _| ContactsSummary {
6851 current: store
6852 .contacts()
6853 .iter()
6854 .map(|contact| contact.user.github_login.clone())
6855 .collect(),
6856 outgoing_requests: store
6857 .outgoing_contact_requests()
6858 .iter()
6859 .map(|user| user.github_login.clone())
6860 .collect(),
6861 incoming_requests: store
6862 .incoming_contact_requests()
6863 .iter()
6864 .map(|user| user.github_login.clone())
6865 .collect(),
6866 })
6867 }
6868
6869 async fn build_local_project(
6870 &mut self,
6871 fs: Arc<FakeFs>,
6872 root_path: impl AsRef<Path>,
6873 cx: &mut TestAppContext,
6874 ) -> (ModelHandle<Project>, WorktreeId) {
6875 let project = cx.update(|cx| {
6876 Project::local(
6877 self.client.clone(),
6878 self.user_store.clone(),
6879 self.language_registry.clone(),
6880 fs,
6881 cx,
6882 )
6883 });
6884 self.project = Some(project.clone());
6885 let (worktree, _) = project
6886 .update(cx, |p, cx| {
6887 p.find_or_create_local_worktree(root_path, true, cx)
6888 })
6889 .await
6890 .unwrap();
6891 worktree
6892 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6893 .await;
6894 project
6895 .update(cx, |project, _| project.next_remote_id())
6896 .await;
6897 (project, worktree.read_with(cx, |tree, _| tree.id()))
6898 }
6899
6900 async fn build_remote_project(
6901 &mut self,
6902 host_project: &ModelHandle<Project>,
6903 host_cx: &mut TestAppContext,
6904 guest_cx: &mut TestAppContext,
6905 ) -> ModelHandle<Project> {
6906 let host_project_id = host_project
6907 .read_with(host_cx, |project, _| project.next_remote_id())
6908 .await;
6909 let guest_user_id = self.user_id().unwrap();
6910 let languages =
6911 host_project.read_with(host_cx, |project, _| project.languages().clone());
6912 let project_b = guest_cx.spawn(|mut cx| {
6913 let user_store = self.user_store.clone();
6914 let guest_client = self.client.clone();
6915 async move {
6916 Project::remote(
6917 host_project_id,
6918 guest_client,
6919 user_store.clone(),
6920 languages,
6921 FakeFs::new(cx.background()),
6922 &mut cx,
6923 )
6924 .await
6925 .unwrap()
6926 }
6927 });
6928 host_cx.foreground().run_until_parked();
6929 host_project.update(host_cx, |project, cx| {
6930 project.respond_to_join_request(guest_user_id, true, cx)
6931 });
6932 let project = project_b.await;
6933 self.project = Some(project.clone());
6934 project
6935 }
6936
6937 fn build_workspace(
6938 &self,
6939 project: &ModelHandle<Project>,
6940 cx: &mut TestAppContext,
6941 ) -> ViewHandle<Workspace> {
6942 let (window_id, _) = cx.add_window(|_| EmptyView);
6943 cx.add_view(window_id, |cx| Workspace::new(project.clone(), cx))
6944 }
6945
        /// Runs the host side of the randomized collaboration simulation.
        ///
        /// One random operation is performed for every message received on
        /// `op_start_signal`; the loop ends when that channel is closed. The
        /// method consumes the client and its context and returns both, along
        /// with the error (if any) that stopped the simulation early.
        ///
        /// Operations, chosen by a draw from `0..100`:
        /// - `0..=19` (only if the fake fs has files): find or create a local
        ///   worktree for a randomly chosen ancestor directory of a random file.
        /// - `20..=79` (only if the fake fs has files): open a random file's
        ///   buffer or reuse an open one, then either drop it (10%) or randomly
        ///   edit / undo-redo it.
        /// - anything else: create a new `.rs` file at a random path, retrying
        ///   until creation succeeds.
        async fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // Inner function so that `?` can be used; its result is turned into
            // an `Option<anyhow::Error>` by the outer function.
            async fn simulate_host_internal(
                client: &mut TestClient,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                let fs = project.read_with(cx, |project, _| project.fs().clone());

                // Accept every join request the project reports.
                cx.update(|cx| {
                    cx.subscribe(&project, move |project, event, cx| {
                        if let project::Event::ContactRequestedJoin(user) = event {
                            log::info!("Host: accepting join request from {}", user.github_login);
                            project.update(cx, |project, cx| {
                                project.respond_to_join_request(user.id, true, cx)
                            });
                        }
                    })
                    .detach();
                });

                while op_start_signal.next().await.is_some() {
                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    let files = fs.as_fake().files().await;
                    match distribution {
                        0..=19 if !files.is_empty() => {
                            let path = files.choose(&mut *rng.lock()).unwrap();
                            let mut path = path.as_path();
                            // Walk up at least one level, stopping at a random
                            // ancestor or when there is no parent left.
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            // Randomly either let the request finish in the
                            // background or wait for it (propagating failure).
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await?;
                            }
                        }
                        20..=79 if !files.is_empty() => {
                            // Open a new buffer when none is open (or on a coin
                            // flip); otherwise reuse a random open buffer.
                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                                let file = files.choose(&mut *rng.lock()).unwrap();
                                let (worktree, path) = project
                                    .update(cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await?;
                                let project_path =
                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
                                    .await
                                    .unwrap();
                                client.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                client
                                    .buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            if rng.lock().gen_bool(0.1) {
                                // Forget the buffer and drop our handle to it.
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    client.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );

                                    if rng.lock().gen_bool(0.7) {
                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                    } else {
                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                    }
                                });
                            }
                        }
                        // Reached for draws of 80..=99, and for any draw while
                        // the fake fs has no files yet.
                        _ => loop {
                            // Build "/x/y/.../z.rs" from 1 to 5 single-letter components.
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            // Retry with a fresh random path until both the
                            // directory and the file can be created.
                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                Ok(())
            }

            let result =
                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
                    .await;
            log::info!("Host done");
            // Record the project on the client so the caller can inspect it.
            self.project = Some(project);
            (self, cx, result.err())
        }
7094
        /// Runs the guest side of the randomized collaboration simulation.
        ///
        /// One random operation is performed for every message received on
        /// `op_start_signal`; the loop ends when that channel is closed. The
        /// method consumes the client and its context and returns both, along
        /// with the error (if any) that stopped the simulation early.
        ///
        /// Each iteration first picks a buffer (opening a random file from a
        /// visible worktree, or reusing an open buffer) and then draws from
        /// `0..100` to choose what to do:
        /// - `0..=9`: drop the buffer.
        /// - `10..=19`: request completions.
        /// - `20..=29`: request code actions.
        /// - `30..=39` (only if the buffer is dirty): save the buffer.
        /// - `40..=44`: prepare a rename.
        /// - `45..=49`: request definitions.
        /// - `50..=54`: request document highlights.
        /// - `55..=59`: run a project-wide search for a single letter.
        /// - `60..=69`: create a new `.rs` entry in a directory-rooted worktree.
        /// - anything else: randomly edit or undo/redo the buffer.
        ///
        /// Each request is, with probability 0.3, detached instead of awaited.
        pub async fn simulate_guest(
            mut self,
            guest_username: String,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // Inner function so that `?` can be used; its result is turned into
            // an `Option<anyhow::Error>` by the outer function.
            async fn simulate_guest_internal(
                client: &mut TestClient,
                guest_username: &str,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                while op_start_signal.next().await.is_some() {
                    // Open a new buffer when none is open (or on a coin flip);
                    // otherwise reuse a random open buffer.
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        // Only visible worktrees containing at least one file qualify.
                        let worktree = if let Some(worktree) =
                            project.read_with(cx, |project, cx| {
                                project
                                    .worktrees(&cx)
                                    .filter(|worktree| {
                                        let worktree = worktree.read(cx);
                                        worktree.is_visible()
                                            && worktree.entries(false).any(|e| e.is_file())
                                    })
                                    .choose(&mut *rng.lock())
                            }) {
                            worktree
                        } else {
                            // Nothing to open yet: skip this operation.
                            cx.background().simulate_random_delay().await;
                            continue;
                        };

                        let (worktree_root_name, project_path) =
                            worktree.read_with(cx, |worktree, _| {
                                let entry = worktree
                                    .entries(false)
                                    .filter(|e| e.is_file())
                                    .choose(&mut *rng.lock())
                                    .unwrap();
                                (
                                    worktree.root_name().to_string(),
                                    (worktree.id(), entry.path.clone()),
                                )
                            });
                        log::info!(
                            "{}: opening path {:?} in worktree {} ({})",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                        );
                        let buffer = project
                            .update(cx, |project, cx| {
                                project.open_buffer(project_path.clone(), cx)
                            })
                            .await?;
                        log::info!(
                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
                        );
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    let choice = rng.lock().gen_range(0..100);
                    match choice {
                        0..=9 => {
                            // Forget the buffer and drop our handle to it.
                            cx.update(|cx| {
                                log::info!(
                                    "{}: dropping buffer {:?}",
                                    guest_username,
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                client.buffers.remove(&buffer);
                                drop(buffer);
                            });
                        }
                        10..=19 => {
                            // Completions at a random offset (buffer end included).
                            let completions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting completions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.completions(&buffer, offset, cx)
                            });
                            let completions = cx.background().spawn(async move {
                                completions
                                    .await
                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching completions request", guest_username);
                                cx.update(|cx| completions.detach_and_log_err(cx));
                            } else {
                                completions.await?;
                            }
                        }
                        20..=29 => {
                            // Code actions for a random byte range of the buffer.
                            let code_actions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting code actions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                                project.code_actions(&buffer, range, cx)
                            });
                            let code_actions = cx.background().spawn(async move {
                                code_actions.await.map_err(|err| {
                                    anyhow!("code actions request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching code actions request", guest_username);
                                cx.update(|cx| code_actions.detach_and_log_err(cx));
                            } else {
                                code_actions.await?;
                            }
                        }
                        // When the buffer is not dirty, a draw in this range falls
                        // through to the catch-all edit branch at the bottom.
                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: saving buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                (buffer.version(), buffer.save(cx))
                            });
                            let save = cx.background().spawn(async move {
                                let (saved_version, _) = save
                                    .await
                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                                // The saved version must include everything that had
                                // been observed when the save was requested.
                                assert!(saved_version.observed_all(&requested_version));
                                Ok::<_, anyhow::Error>(())
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching save request", guest_username);
                                cx.update(|cx| save.detach_and_log_err(cx));
                            } else {
                                save.await?;
                            }
                        }
                        40..=44 => {
                            // Note: `buffer` is moved into `prepare_rename` here.
                            let prepare_rename = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: preparing rename for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.prepare_rename(buffer, offset, cx)
                            });
                            let prepare_rename = cx.background().spawn(async move {
                                prepare_rename.await.map_err(|err| {
                                    anyhow!("prepare rename request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching prepare rename request", guest_username);
                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                            } else {
                                prepare_rename.await?;
                            }
                        }
                        45..=49 => {
                            let definitions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting definitions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.definition(&buffer, offset, cx)
                            });
                            let definitions = cx.background().spawn(async move {
                                definitions
                                    .await
                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching definitions request", guest_username);
                                cx.update(|cx| definitions.detach_and_log_err(cx));
                            } else {
                                // Track the buffers of the returned locations as open.
                                client
                                    .buffers
                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                            }
                        }
                        50..=54 => {
                            let highlights = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting highlights for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.document_highlights(&buffer, offset, cx)
                            });
                            let highlights = cx.background().spawn(async move {
                                highlights
                                    .await
                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching highlights request", guest_username);
                                cx.update(|cx| highlights.detach_and_log_err(cx));
                            } else {
                                highlights.await?;
                            }
                        }
                        55..=59 => {
                            // Project-wide text search for one random lowercase letter.
                            let search = project.update(cx, |project, cx| {
                                let query = rng.lock().gen_range('a'..='z');
                                log::info!("{}: project-wide search {:?}", guest_username, query);
                                project.search(SearchQuery::text(query, false, false), cx)
                            });
                            let search = cx.background().spawn(async move {
                                search
                                    .await
                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching search request", guest_username);
                                cx.update(|cx| search.detach_and_log_err(cx));
                            } else {
                                // Track every buffer that appears in the results as open.
                                client.buffers.extend(search.await?.into_keys());
                            }
                        }
                        60..=69 => {
                            // Pick a visible worktree that contains a file and whose
                            // root entry is a directory.
                            // NOTE(review): unlike the buffer-opening step above, this
                            // panics when no worktree qualifies - confirm the test
                            // setup guarantees that one always exists.
                            let worktree = project
                                .read_with(cx, |project, cx| {
                                    project
                                        .worktrees(&cx)
                                        .filter(|worktree| {
                                            let worktree = worktree.read(cx);
                                            worktree.is_visible()
                                                && worktree.entries(false).any(|e| e.is_file())
                                                && worktree
                                                    .root_entry()
                                                    .map_or(false, |e| e.is_dir())
                                        })
                                        .choose(&mut *rng.lock())
                                })
                                .unwrap();
                            let (worktree_id, worktree_root_name) = worktree
                                .read_with(cx, |worktree, _| {
                                    (worktree.id(), worktree.root_name().to_string())
                                });

                            // Ten random lowercase letters plus an ".rs" extension.
                            let mut new_name = String::new();
                            for _ in 0..10 {
                                let letter = rng.lock().gen_range('a'..='z');
                                new_name.push(letter);
                            }
                            let mut new_path = PathBuf::new();
                            new_path.push(new_name);
                            new_path.set_extension("rs");
                            log::info!(
                                "{}: creating {:?} in worktree {} ({})",
                                guest_username,
                                new_path,
                                worktree_id,
                                worktree_root_name,
                            );
                            project
                                .update(cx, |project, cx| {
                                    project.create_entry((worktree_id, new_path), false, cx)
                                })
                                .unwrap()
                                .await?;
                        }
                        // Reached for draws of 70..=99, and for 30..=39 when the
                        // buffer is not dirty.
                        _ => {
                            buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: updating buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                if rng.lock().gen_bool(0.7) {
                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                } else {
                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                }
                            });
                        }
                    }
                    cx.background().simulate_random_delay().await;
                }
                Ok(())
            }

            let result = simulate_guest_internal(
                &mut self,
                &guest_username,
                project.clone(),
                op_start_signal,
                rng,
                &mut cx,
            )
            .await;
            log::info!("{}: done", guest_username);

            // Record the project on the client so the caller can inspect it.
            self.project = Some(project);
            (self, cx, result.err())
        }
7423 }
7424
    /// Tears down the wrapped client when the test client is dropped.
    impl Drop for TestClient {
        fn drop(&mut self) {
            self.client.tear_down();
        }
    }
7430
    /// Implements the `Executor` trait on top of gpui's background executor.
    // NOTE(review): presumably this is what lets `cx.background()` be passed to
    // `Server::handle_connection` in `create_client` - confirm against the trait's users.
    impl Executor for Arc<gpui::executor::Background> {
        type Sleep = gpui::executor::Timer;

        // Spawns the future on the background executor and detaches the task.
        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
            self.spawn(future).detach();
        }

        // Returns the background executor's timer for `duration`.
        fn sleep(&self, duration: Duration) -> Self::Sleep {
            self.as_ref().timer(duration)
        }
    }
7442
7443 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7444 channel
7445 .messages()
7446 .cursor::<()>()
7447 .map(|m| {
7448 (
7449 m.sender.github_login.clone(),
7450 m.body.clone(),
7451 m.is_pending(),
7452 )
7453 })
7454 .collect()
7455 }
7456
    /// Placeholder view with no state; `TestClient::build_workspace` uses it as
    /// the root view of the window it opens.
    struct EmptyView;
7458
    /// `EmptyView` has the unit type as its event type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
7462
    /// Renders `EmptyView` as a single empty element.
    impl gpui::View for EmptyView {
        fn ui_name() -> &'static str {
            "empty view"
        }

        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty::new())
        }
    }
7472}