1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, User, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{
27 channel::mpsc,
28 future::{self, BoxFuture},
29 FutureExt, SinkExt, StreamExt, TryStreamExt,
30};
31use lazy_static::lazy_static;
32use rpc::{
33 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
34 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
35};
36use std::{
37 any::TypeId,
38 future::Future,
39 marker::PhantomData,
40 net::SocketAddr,
41 ops::{Deref, DerefMut},
42 rc::Rc,
43 sync::{
44 atomic::{AtomicBool, Ordering::SeqCst},
45 Arc,
46 },
47 time::Duration,
48};
49use store::{Store, Worktree};
50use time::OffsetDateTime;
51use tokio::{
52 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
53 time::Sleep,
54};
55use tower::ServiceBuilder;
56use tracing::{info_span, instrument, Instrument};
57
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It receives the server and a boxed, dynamically typed envelope, and returns a boxed
/// future resolving to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
60
/// Handle given to request handlers so they can answer a request of type `R`.
struct Response<R> {
    /// Server whose peer is used to deliver the response.
    server: Arc<Server>,
    /// Receipt of the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` once a response was sent or the receipt was taken out of this handle.
    responded: Arc<AtomicBool>,
}
66
67impl<R: RequestMessage> Response<R> {
68 fn send(self, payload: R::Response) -> Result<()> {
69 self.responded.store(true, SeqCst);
70 self.server.peer.respond(self.receipt, payload)?;
71 Ok(())
72 }
73
74 fn into_receipt(self) -> Receipt<R> {
75 self.responded.store(true, SeqCst);
76 self.receipt
77 }
78}
79
/// RPC server state shared by all connections.
pub struct Server {
    /// Peer used to send, respond to and forward messages.
    peer: Arc<Peer>,
    /// Connection/project state, guarded by an async read-write lock.
    store: RwLock<Store>,
    /// Application state; gives access to the database and the invite link prefix.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they handle.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each handled message.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
87
/// Abstraction over the runtime used to spawn background work and create timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Spawns `future` without keeping a handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future for sleeping `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
93
/// Unit-struct executor handle.
// NOTE(review): presumably the production `Executor` implementation; the impl is not visible here.
#[derive(Clone)]
pub struct RealExecutor;
96
// NOTE(review): presumably the page size used when loading channel messages; usage is not visible here.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; usage is not visible here.
const MAX_MESSAGE_LEN: usize = 1024;
99
/// Read guard over the server's `Store`.
struct StoreReadGuard<'a> {
    /// The underlying read lock guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc<()>` is not `Send`, so this marker makes the whole guard `!Send`
    /// (presumably so it cannot be kept alive across an `.await` in a `Send` future).
    _not_send: PhantomData<Rc<()>>,
}
104
/// Write guard over the server's `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying write lock guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc<()>` is not `Send`, so this marker makes the whole guard `!Send`
    /// (presumably so it cannot be kept alive across an `.await` in a `Send` future).
    _not_send: PhantomData<Rc<()>>,
}
109
110impl Server {
111 pub fn new(
112 app_state: Arc<AppState>,
113 notifications: Option<mpsc::UnboundedSender<()>>,
114 ) -> Arc<Self> {
115 let mut server = Self {
116 peer: Peer::new(),
117 app_state,
118 store: Default::default(),
119 handlers: Default::default(),
120 notifications,
121 };
122
123 server
124 .add_request_handler(Server::ping)
125 .add_request_handler(Server::register_project)
126 .add_message_handler(Server::unregister_project)
127 .add_request_handler(Server::join_project)
128 .add_message_handler(Server::leave_project)
129 .add_message_handler(Server::respond_to_join_project_request)
130 .add_request_handler(Server::register_worktree)
131 .add_message_handler(Server::unregister_worktree)
132 .add_request_handler(Server::update_worktree)
133 .add_message_handler(Server::start_language_server)
134 .add_message_handler(Server::update_language_server)
135 .add_message_handler(Server::update_diagnostic_summary)
136 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
137 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
138 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
139 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
140 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
141 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
142 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
143 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
144 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
145 .add_request_handler(
146 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
147 )
148 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
149 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
150 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
151 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
152 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
153 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
154 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
155 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
156 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
157 .add_request_handler(Server::update_buffer)
158 .add_message_handler(Server::update_buffer_file)
159 .add_message_handler(Server::buffer_reloaded)
160 .add_message_handler(Server::buffer_saved)
161 .add_request_handler(Server::save_buffer)
162 .add_request_handler(Server::get_channels)
163 .add_request_handler(Server::get_users)
164 .add_request_handler(Server::fuzzy_search_users)
165 .add_request_handler(Server::request_contact)
166 .add_request_handler(Server::remove_contact)
167 .add_request_handler(Server::respond_to_contact_request)
168 .add_request_handler(Server::join_channel)
169 .add_message_handler(Server::leave_channel)
170 .add_request_handler(Server::send_channel_message)
171 .add_request_handler(Server::follow)
172 .add_message_handler(Server::unfollow)
173 .add_message_handler(Server::update_followers)
174 .add_request_handler(Server::get_channel_messages);
175
176 Arc::new(server)
177 }
178
179 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
180 where
181 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
182 Fut: 'static + Send + Future<Output = Result<()>>,
183 M: EnvelopedMessage,
184 {
185 let prev_handler = self.handlers.insert(
186 TypeId::of::<M>(),
187 Box::new(move |server, envelope| {
188 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
189 let span = info_span!(
190 "handle message",
191 payload_type = envelope.payload_type_name(),
192 payload = format!("{:?}", envelope.payload).as_str(),
193 );
194 let future = (handler)(server, *envelope);
195 async move {
196 if let Err(error) = future.await {
197 tracing::error!(%error, "error handling message");
198 }
199 }
200 .instrument(span)
201 .boxed()
202 }),
203 );
204 if prev_handler.is_some() {
205 panic!("registered a handler for the same message twice");
206 }
207 self
208 }
209
210 /// Handle a request while holding a lock to the store. This is useful when we're registering
211 /// a connection but we want to respond on the connection before anybody else can send on it.
212 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
213 where
214 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
215 Fut: Send + Future<Output = Result<()>>,
216 M: RequestMessage,
217 {
218 let handler = Arc::new(handler);
219 self.add_message_handler(move |server, envelope| {
220 let receipt = envelope.receipt();
221 let handler = handler.clone();
222 async move {
223 let responded = Arc::new(AtomicBool::default());
224 let response = Response {
225 server: server.clone(),
226 responded: responded.clone(),
227 receipt: envelope.receipt(),
228 };
229 match (handler)(server.clone(), envelope, response).await {
230 Ok(()) => {
231 if responded.load(std::sync::atomic::Ordering::SeqCst) {
232 Ok(())
233 } else {
234 Err(anyhow!("handler did not send a response"))?
235 }
236 }
237 Err(error) => {
238 server.peer.respond_with_error(
239 receipt,
240 proto::Error {
241 message: error.to_string(),
242 },
243 )?;
244 Err(error)
245 }
246 }
247 }
248 })
249 }
250
251 pub fn handle_connection<E: Executor>(
252 self: &Arc<Self>,
253 connection: Connection,
254 address: String,
255 user: User,
256 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
257 executor: E,
258 ) -> impl Future<Output = Result<()>> {
259 let mut this = self.clone();
260 let user_id = user.id;
261 let login = user.github_login;
262 let span = info_span!("handle connection", %user_id, %login, %address);
263 async move {
264 let (connection_id, handle_io, mut incoming_rx) = this
265 .peer
266 .add_connection(connection, {
267 let executor = executor.clone();
268 move |duration| {
269 let timer = executor.sleep(duration);
270 async move {
271 timer.await;
272 }
273 }
274 })
275 .await;
276
277 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
278
279 if let Some(send_connection_id) = send_connection_id.as_mut() {
280 let _ = send_connection_id.send(connection_id).await;
281 }
282
283 if !user.connected_once {
284 this.peer.send(connection_id, proto::ShowContacts {})?;
285 this.app_state.db.set_user_connected_once(user_id, true).await?;
286 }
287
288 let (contacts, invite_code) = future::try_join(
289 this.app_state.db.get_contacts(user_id),
290 this.app_state.db.get_invite_code_for_user(user_id)
291 ).await?;
292
293 {
294 let mut store = this.store_mut().await;
295 store.add_connection(connection_id, user_id);
296 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
297
298 if let Some((code, count)) = invite_code {
299 this.peer.send(connection_id, proto::UpdateInviteInfo {
300 url: format!("{}{}", this.app_state.invite_link_prefix, code),
301 count,
302 })?;
303 }
304 }
305 this.update_user_contacts(user_id).await?;
306
307 let handle_io = handle_io.fuse();
308 futures::pin_mut!(handle_io);
309 loop {
310 let next_message = incoming_rx.next().fuse();
311 futures::pin_mut!(next_message);
312 futures::select_biased! {
313 result = handle_io => {
314 if let Err(error) = result {
315 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
316 }
317 break;
318 }
319 message = next_message => {
320 if let Some(message) = message {
321 let type_name = message.payload_type_name();
322 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
323 async {
324 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
325 let notifications = this.notifications.clone();
326 let is_background = message.is_background();
327 let handle_message = (handler)(this.clone(), message);
328 let handle_message = async move {
329 handle_message.await;
330 if let Some(mut notifications) = notifications {
331 let _ = notifications.send(()).await;
332 }
333 };
334 if is_background {
335 executor.spawn_detached(handle_message);
336 } else {
337 handle_message.await;
338 }
339 } else {
340 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
341 }
342 }.instrument(span).await;
343 } else {
344 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
345 break;
346 }
347 }
348 }
349 }
350
351 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
352 if let Err(error) = this.sign_out(connection_id).await {
353 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
354 }
355
356 Ok(())
357 }.instrument(span)
358 }
359
360 #[instrument(skip(self), err)]
361 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
362 self.peer.disconnect(connection_id);
363
364 let removed_user_id = {
365 let mut store = self.store_mut().await;
366 let removed_connection = store.remove_connection(connection_id)?;
367
368 for (project_id, project) in removed_connection.hosted_projects {
369 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
370 self.peer
371 .send(conn_id, proto::UnregisterProject { project_id })
372 });
373
374 for (_, receipts) in project.join_requests {
375 for receipt in receipts {
376 self.peer.respond(
377 receipt,
378 proto::JoinProjectResponse {
379 variant: Some(proto::join_project_response::Variant::Decline(
380 proto::join_project_response::Decline {
381 reason: proto::join_project_response::decline::Reason::WentOffline as i32
382 },
383 )),
384 },
385 )?;
386 }
387 }
388 }
389
390 for project_id in removed_connection.guest_project_ids {
391 if let Some(project) = store.project(project_id).trace_err() {
392 broadcast(connection_id, project.connection_ids(), |conn_id| {
393 self.peer.send(
394 conn_id,
395 proto::RemoveProjectCollaborator {
396 project_id,
397 peer_id: connection_id.0,
398 },
399 )
400 });
401 if project.guests.is_empty() {
402 self.peer
403 .send(
404 project.host_connection_id,
405 proto::ProjectUnshared { project_id },
406 )
407 .trace_err();
408 }
409 }
410 }
411
412 removed_connection.user_id
413 };
414
415 self.update_user_contacts(removed_user_id).await?;
416
417 Ok(())
418 }
419
420 pub async fn invite_code_redeemed(self: &Arc<Self>, code: &str, invitee_id: UserId) -> Result<()> {
421 let user = self.app_state.db.get_user_for_invite_code(code).await?;
422 let store = self.store().await;
423 let invitee_contact = store.contact_for_user(invitee_id, true);
424 for connection_id in store.connection_ids_for_user(user.id) {
425 self.peer.send(connection_id, proto::UpdateContacts {
426 contacts: vec![invitee_contact.clone()],
427 ..Default::default()
428 })?;
429 self.peer.send(connection_id, proto::UpdateInviteInfo {
430 url: format!("{}{}", self.app_state.invite_link_prefix, code),
431 count: user.invite_count as u32,
432 })?;
433 }
434 Ok(())
435 }
436
437 async fn ping(
438 self: Arc<Server>,
439 _: TypedEnvelope<proto::Ping>,
440 response: Response<proto::Ping>,
441 ) -> Result<()> {
442 response.send(proto::Ack {})?;
443 Ok(())
444 }
445
446 async fn register_project(
447 self: Arc<Server>,
448 request: TypedEnvelope<proto::RegisterProject>,
449 response: Response<proto::RegisterProject>,
450 ) -> Result<()> {
451 let user_id;
452 let project_id;
453 {
454 let mut state = self.store_mut().await;
455 user_id = state.user_id_for_connection(request.sender_id)?;
456 project_id = state.register_project(request.sender_id, user_id);
457 };
458 self.update_user_contacts(user_id).await?;
459 response.send(proto::RegisterProjectResponse { project_id })?;
460 Ok(())
461 }
462
463 async fn unregister_project(
464 self: Arc<Server>,
465 request: TypedEnvelope<proto::UnregisterProject>,
466 ) -> Result<()> {
467 let (user_id, project) = {
468 let mut state = self.store_mut().await;
469 let project =
470 state.unregister_project(request.payload.project_id, request.sender_id)?;
471 (state.user_id_for_connection(request.sender_id)?, project)
472 };
473
474 broadcast(
475 request.sender_id,
476 project.guests.keys().copied(),
477 |conn_id| {
478 self.peer.send(
479 conn_id,
480 proto::UnregisterProject {
481 project_id: request.payload.project_id,
482 },
483 )
484 },
485 );
486 for (_, receipts) in project.join_requests {
487 for receipt in receipts {
488 self.peer.respond(
489 receipt,
490 proto::JoinProjectResponse {
491 variant: Some(proto::join_project_response::Variant::Decline(
492 proto::join_project_response::Decline {
493 reason: proto::join_project_response::decline::Reason::Closed
494 as i32,
495 },
496 )),
497 },
498 )?;
499 }
500 }
501
502 self.update_user_contacts(user_id).await?;
503 Ok(())
504 }
505
506 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
507 let contacts = self.app_state.db.get_contacts(user_id).await?;
508 let store = self.store().await;
509 let updated_contact = store.contact_for_user(user_id, false);
510 for contact in contacts {
511 if let db::Contact::Accepted {
512 user_id: contact_user_id,
513 ..
514 } = contact
515 {
516 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
517 self.peer
518 .send(
519 contact_conn_id,
520 proto::UpdateContacts {
521 contacts: vec![updated_contact.clone()],
522 remove_contacts: Default::default(),
523 incoming_requests: Default::default(),
524 remove_incoming_requests: Default::default(),
525 outgoing_requests: Default::default(),
526 remove_outgoing_requests: Default::default(),
527 },
528 )
529 .trace_err();
530 }
531 }
532 }
533 Ok(())
534 }
535
536 async fn join_project(
537 self: Arc<Server>,
538 request: TypedEnvelope<proto::JoinProject>,
539 response: Response<proto::JoinProject>,
540 ) -> Result<()> {
541 let project_id = request.payload.project_id;
542 let host_user_id;
543 let guest_user_id;
544 let host_connection_id;
545 {
546 let state = self.store().await;
547 let project = state.project(project_id)?;
548 host_user_id = project.host_user_id;
549 host_connection_id = project.host_connection_id;
550 guest_user_id = state.user_id_for_connection(request.sender_id)?;
551 };
552
553 let has_contact = self
554 .app_state
555 .db
556 .has_contact(guest_user_id, host_user_id)
557 .await?;
558 if !has_contact {
559 return Err(anyhow!("no such project"))?;
560 }
561
562 self.store_mut().await.request_join_project(
563 guest_user_id,
564 project_id,
565 response.into_receipt(),
566 )?;
567 self.peer.send(
568 host_connection_id,
569 proto::RequestJoinProject {
570 project_id,
571 requester_id: guest_user_id.to_proto(),
572 },
573 )?;
574 Ok(())
575 }
576
577 async fn respond_to_join_project_request(
578 self: Arc<Server>,
579 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
580 ) -> Result<()> {
581 let host_user_id;
582
583 {
584 let mut state = self.store_mut().await;
585 let project_id = request.payload.project_id;
586 let project = state.project(project_id)?;
587 if project.host_connection_id != request.sender_id {
588 Err(anyhow!("no such connection"))?;
589 }
590
591 host_user_id = project.host_user_id;
592 let guest_user_id = UserId::from_proto(request.payload.requester_id);
593
594 if !request.payload.allow {
595 let receipts = state
596 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
597 .ok_or_else(|| anyhow!("no such request"))?;
598 for receipt in receipts {
599 self.peer.respond(
600 receipt,
601 proto::JoinProjectResponse {
602 variant: Some(proto::join_project_response::Variant::Decline(
603 proto::join_project_response::Decline {
604 reason: proto::join_project_response::decline::Reason::Declined
605 as i32,
606 },
607 )),
608 },
609 )?;
610 }
611 return Ok(());
612 }
613
614 let (receipts_with_replica_ids, project) = state
615 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
616 .ok_or_else(|| anyhow!("no such request"))?;
617
618 let peer_count = project.guests.len();
619 let mut collaborators = Vec::with_capacity(peer_count);
620 collaborators.push(proto::Collaborator {
621 peer_id: project.host_connection_id.0,
622 replica_id: 0,
623 user_id: project.host_user_id.to_proto(),
624 });
625 let worktrees = project
626 .worktrees
627 .iter()
628 .filter_map(|(id, shared_worktree)| {
629 let worktree = project.worktrees.get(&id)?;
630 Some(proto::Worktree {
631 id: *id,
632 root_name: worktree.root_name.clone(),
633 entries: shared_worktree.entries.values().cloned().collect(),
634 diagnostic_summaries: shared_worktree
635 .diagnostic_summaries
636 .values()
637 .cloned()
638 .collect(),
639 visible: worktree.visible,
640 scan_id: shared_worktree.scan_id,
641 })
642 })
643 .collect::<Vec<_>>();
644
645 // Add all guests other than the requesting user's own connections as collaborators
646 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
647 if receipts_with_replica_ids
648 .iter()
649 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
650 {
651 collaborators.push(proto::Collaborator {
652 peer_id: peer_conn_id.0,
653 replica_id: *peer_replica_id as u32,
654 user_id: peer_user_id.to_proto(),
655 });
656 }
657 }
658
659 for conn_id in project.connection_ids() {
660 for (receipt, replica_id) in &receipts_with_replica_ids {
661 if conn_id != receipt.sender_id {
662 self.peer.send(
663 conn_id,
664 proto::AddProjectCollaborator {
665 project_id,
666 collaborator: Some(proto::Collaborator {
667 peer_id: receipt.sender_id.0,
668 replica_id: *replica_id as u32,
669 user_id: guest_user_id.to_proto(),
670 }),
671 },
672 )?;
673 }
674 }
675 }
676
677 for (receipt, replica_id) in receipts_with_replica_ids {
678 self.peer.respond(
679 receipt,
680 proto::JoinProjectResponse {
681 variant: Some(proto::join_project_response::Variant::Accept(
682 proto::join_project_response::Accept {
683 worktrees: worktrees.clone(),
684 replica_id: replica_id as u32,
685 collaborators: collaborators.clone(),
686 language_servers: project.language_servers.clone(),
687 },
688 )),
689 },
690 )?;
691 }
692 }
693
694 self.update_user_contacts(host_user_id).await?;
695 Ok(())
696 }
697
698 async fn leave_project(
699 self: Arc<Server>,
700 request: TypedEnvelope<proto::LeaveProject>,
701 ) -> Result<()> {
702 let sender_id = request.sender_id;
703 let project_id = request.payload.project_id;
704 let project;
705 {
706 let mut store = self.store_mut().await;
707 project = store.leave_project(sender_id, project_id)?;
708
709 if project.remove_collaborator {
710 broadcast(sender_id, project.connection_ids, |conn_id| {
711 self.peer.send(
712 conn_id,
713 proto::RemoveProjectCollaborator {
714 project_id,
715 peer_id: sender_id.0,
716 },
717 )
718 });
719 }
720
721 if let Some(requester_id) = project.cancel_request {
722 self.peer.send(
723 project.host_connection_id,
724 proto::JoinProjectRequestCancelled {
725 project_id,
726 requester_id: requester_id.to_proto(),
727 },
728 )?;
729 }
730
731 if project.unshare {
732 self.peer.send(
733 project.host_connection_id,
734 proto::ProjectUnshared { project_id },
735 )?;
736 }
737 }
738 self.update_user_contacts(project.host_user_id).await?;
739 Ok(())
740 }
741
742 async fn register_worktree(
743 self: Arc<Server>,
744 request: TypedEnvelope<proto::RegisterWorktree>,
745 response: Response<proto::RegisterWorktree>,
746 ) -> Result<()> {
747 let host_user_id;
748 {
749 let mut state = self.store_mut().await;
750 host_user_id = state.user_id_for_connection(request.sender_id)?;
751
752 let guest_connection_ids = state
753 .read_project(request.payload.project_id, request.sender_id)?
754 .guest_connection_ids();
755 state.register_worktree(
756 request.payload.project_id,
757 request.payload.worktree_id,
758 request.sender_id,
759 Worktree {
760 root_name: request.payload.root_name.clone(),
761 visible: request.payload.visible,
762 ..Default::default()
763 },
764 )?;
765
766 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
767 self.peer
768 .forward_send(request.sender_id, connection_id, request.payload.clone())
769 });
770 }
771 self.update_user_contacts(host_user_id).await?;
772 response.send(proto::Ack {})?;
773 Ok(())
774 }
775
776 async fn unregister_worktree(
777 self: Arc<Server>,
778 request: TypedEnvelope<proto::UnregisterWorktree>,
779 ) -> Result<()> {
780 let host_user_id;
781 let project_id = request.payload.project_id;
782 let worktree_id = request.payload.worktree_id;
783 {
784 let mut state = self.store_mut().await;
785 let (_, guest_connection_ids) =
786 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
787 host_user_id = state.user_id_for_connection(request.sender_id)?;
788 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
789 self.peer.send(
790 conn_id,
791 proto::UnregisterWorktree {
792 project_id,
793 worktree_id,
794 },
795 )
796 });
797 }
798 self.update_user_contacts(host_user_id).await?;
799 Ok(())
800 }
801
802 async fn update_worktree(
803 self: Arc<Server>,
804 request: TypedEnvelope<proto::UpdateWorktree>,
805 response: Response<proto::UpdateWorktree>,
806 ) -> Result<()> {
807 let connection_ids = self.store_mut().await.update_worktree(
808 request.sender_id,
809 request.payload.project_id,
810 request.payload.worktree_id,
811 &request.payload.removed_entries,
812 &request.payload.updated_entries,
813 request.payload.scan_id,
814 )?;
815
816 broadcast(request.sender_id, connection_ids, |connection_id| {
817 self.peer
818 .forward_send(request.sender_id, connection_id, request.payload.clone())
819 });
820 response.send(proto::Ack {})?;
821 Ok(())
822 }
823
824 async fn update_diagnostic_summary(
825 self: Arc<Server>,
826 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
827 ) -> Result<()> {
828 let summary = request
829 .payload
830 .summary
831 .clone()
832 .ok_or_else(|| anyhow!("invalid summary"))?;
833 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
834 request.payload.project_id,
835 request.payload.worktree_id,
836 request.sender_id,
837 summary,
838 )?;
839
840 broadcast(request.sender_id, receiver_ids, |connection_id| {
841 self.peer
842 .forward_send(request.sender_id, connection_id, request.payload.clone())
843 });
844 Ok(())
845 }
846
847 async fn start_language_server(
848 self: Arc<Server>,
849 request: TypedEnvelope<proto::StartLanguageServer>,
850 ) -> Result<()> {
851 let receiver_ids = self.store_mut().await.start_language_server(
852 request.payload.project_id,
853 request.sender_id,
854 request
855 .payload
856 .server
857 .clone()
858 .ok_or_else(|| anyhow!("invalid language server"))?,
859 )?;
860 broadcast(request.sender_id, receiver_ids, |connection_id| {
861 self.peer
862 .forward_send(request.sender_id, connection_id, request.payload.clone())
863 });
864 Ok(())
865 }
866
867 async fn update_language_server(
868 self: Arc<Server>,
869 request: TypedEnvelope<proto::UpdateLanguageServer>,
870 ) -> Result<()> {
871 let receiver_ids = self
872 .store()
873 .await
874 .project_connection_ids(request.payload.project_id, request.sender_id)?;
875 broadcast(request.sender_id, receiver_ids, |connection_id| {
876 self.peer
877 .forward_send(request.sender_id, connection_id, request.payload.clone())
878 });
879 Ok(())
880 }
881
882 async fn forward_project_request<T>(
883 self: Arc<Server>,
884 request: TypedEnvelope<T>,
885 response: Response<T>,
886 ) -> Result<()>
887 where
888 T: EntityMessage + RequestMessage,
889 {
890 let host_connection_id = self
891 .store()
892 .await
893 .read_project(request.payload.remote_entity_id(), request.sender_id)?
894 .host_connection_id;
895
896 response.send(
897 self.peer
898 .forward_request(request.sender_id, host_connection_id, request.payload)
899 .await?,
900 )?;
901 Ok(())
902 }
903
904 async fn save_buffer(
905 self: Arc<Server>,
906 request: TypedEnvelope<proto::SaveBuffer>,
907 response: Response<proto::SaveBuffer>,
908 ) -> Result<()> {
909 let host = self
910 .store()
911 .await
912 .read_project(request.payload.project_id, request.sender_id)?
913 .host_connection_id;
914 let response_payload = self
915 .peer
916 .forward_request(request.sender_id, host, request.payload.clone())
917 .await?;
918
919 let mut guests = self
920 .store()
921 .await
922 .read_project(request.payload.project_id, request.sender_id)?
923 .connection_ids();
924 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
925 broadcast(host, guests, |conn_id| {
926 self.peer
927 .forward_send(host, conn_id, response_payload.clone())
928 });
929 response.send(response_payload)?;
930 Ok(())
931 }
932
933 async fn update_buffer(
934 self: Arc<Server>,
935 request: TypedEnvelope<proto::UpdateBuffer>,
936 response: Response<proto::UpdateBuffer>,
937 ) -> Result<()> {
938 let receiver_ids = self
939 .store()
940 .await
941 .project_connection_ids(request.payload.project_id, request.sender_id)?;
942 broadcast(request.sender_id, receiver_ids, |connection_id| {
943 self.peer
944 .forward_send(request.sender_id, connection_id, request.payload.clone())
945 });
946 response.send(proto::Ack {})?;
947 Ok(())
948 }
949
950 async fn update_buffer_file(
951 self: Arc<Server>,
952 request: TypedEnvelope<proto::UpdateBufferFile>,
953 ) -> Result<()> {
954 let receiver_ids = self
955 .store()
956 .await
957 .project_connection_ids(request.payload.project_id, request.sender_id)?;
958 broadcast(request.sender_id, receiver_ids, |connection_id| {
959 self.peer
960 .forward_send(request.sender_id, connection_id, request.payload.clone())
961 });
962 Ok(())
963 }
964
965 async fn buffer_reloaded(
966 self: Arc<Server>,
967 request: TypedEnvelope<proto::BufferReloaded>,
968 ) -> Result<()> {
969 let receiver_ids = self
970 .store()
971 .await
972 .project_connection_ids(request.payload.project_id, request.sender_id)?;
973 broadcast(request.sender_id, receiver_ids, |connection_id| {
974 self.peer
975 .forward_send(request.sender_id, connection_id, request.payload.clone())
976 });
977 Ok(())
978 }
979
980 async fn buffer_saved(
981 self: Arc<Server>,
982 request: TypedEnvelope<proto::BufferSaved>,
983 ) -> Result<()> {
984 let receiver_ids = self
985 .store()
986 .await
987 .project_connection_ids(request.payload.project_id, request.sender_id)?;
988 broadcast(request.sender_id, receiver_ids, |connection_id| {
989 self.peer
990 .forward_send(request.sender_id, connection_id, request.payload.clone())
991 });
992 Ok(())
993 }
994
995 async fn follow(
996 self: Arc<Self>,
997 request: TypedEnvelope<proto::Follow>,
998 response: Response<proto::Follow>,
999 ) -> Result<()> {
1000 let leader_id = ConnectionId(request.payload.leader_id);
1001 let follower_id = request.sender_id;
1002 if !self
1003 .store()
1004 .await
1005 .project_connection_ids(request.payload.project_id, follower_id)?
1006 .contains(&leader_id)
1007 {
1008 Err(anyhow!("no such peer"))?;
1009 }
1010 let mut response_payload = self
1011 .peer
1012 .forward_request(request.sender_id, leader_id, request.payload)
1013 .await?;
1014 response_payload
1015 .views
1016 .retain(|view| view.leader_id != Some(follower_id.0));
1017 response.send(response_payload)?;
1018 Ok(())
1019 }
1020
1021 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
1022 let leader_id = ConnectionId(request.payload.leader_id);
1023 if !self
1024 .store()
1025 .await
1026 .project_connection_ids(request.payload.project_id, request.sender_id)?
1027 .contains(&leader_id)
1028 {
1029 Err(anyhow!("no such peer"))?;
1030 }
1031 self.peer
1032 .forward_send(request.sender_id, leader_id, request.payload)?;
1033 Ok(())
1034 }
1035
1036 async fn update_followers(
1037 self: Arc<Self>,
1038 request: TypedEnvelope<proto::UpdateFollowers>,
1039 ) -> Result<()> {
1040 let connection_ids = self
1041 .store()
1042 .await
1043 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1044 let leader_id = request
1045 .payload
1046 .variant
1047 .as_ref()
1048 .and_then(|variant| match variant {
1049 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1050 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1051 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1052 });
1053 for follower_id in &request.payload.follower_ids {
1054 let follower_id = ConnectionId(*follower_id);
1055 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1056 self.peer
1057 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1058 }
1059 }
1060 Ok(())
1061 }
1062
1063 async fn get_channels(
1064 self: Arc<Server>,
1065 request: TypedEnvelope<proto::GetChannels>,
1066 response: Response<proto::GetChannels>,
1067 ) -> Result<()> {
1068 let user_id = self
1069 .store()
1070 .await
1071 .user_id_for_connection(request.sender_id)?;
1072 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1073 response.send(proto::GetChannelsResponse {
1074 channels: channels
1075 .into_iter()
1076 .map(|chan| proto::Channel {
1077 id: chan.id.to_proto(),
1078 name: chan.name,
1079 })
1080 .collect(),
1081 })?;
1082 Ok(())
1083 }
1084
1085 async fn get_users(
1086 self: Arc<Server>,
1087 request: TypedEnvelope<proto::GetUsers>,
1088 response: Response<proto::GetUsers>,
1089 ) -> Result<()> {
1090 let user_ids = request
1091 .payload
1092 .user_ids
1093 .into_iter()
1094 .map(UserId::from_proto)
1095 .collect();
1096 let users = self
1097 .app_state
1098 .db
1099 .get_users_by_ids(user_ids)
1100 .await?
1101 .into_iter()
1102 .map(|user| proto::User {
1103 id: user.id.to_proto(),
1104 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1105 github_login: user.github_login,
1106 })
1107 .collect();
1108 response.send(proto::UsersResponse { users })?;
1109 Ok(())
1110 }
1111
1112 async fn fuzzy_search_users(
1113 self: Arc<Server>,
1114 request: TypedEnvelope<proto::FuzzySearchUsers>,
1115 response: Response<proto::FuzzySearchUsers>,
1116 ) -> Result<()> {
1117 let user_id = self
1118 .store()
1119 .await
1120 .user_id_for_connection(request.sender_id)?;
1121 let query = request.payload.query;
1122 let db = &self.app_state.db;
1123 let users = match query.len() {
1124 0 => vec![],
1125 1 | 2 => db
1126 .get_user_by_github_login(&query)
1127 .await?
1128 .into_iter()
1129 .collect(),
1130 _ => db.fuzzy_search_users(&query, 10).await?,
1131 };
1132 let users = users
1133 .into_iter()
1134 .filter(|user| user.id != user_id)
1135 .map(|user| proto::User {
1136 id: user.id.to_proto(),
1137 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1138 github_login: user.github_login,
1139 })
1140 .collect();
1141 response.send(proto::UsersResponse { users })?;
1142 Ok(())
1143 }
1144
1145 async fn request_contact(
1146 self: Arc<Server>,
1147 request: TypedEnvelope<proto::RequestContact>,
1148 response: Response<proto::RequestContact>,
1149 ) -> Result<()> {
1150 let requester_id = self
1151 .store()
1152 .await
1153 .user_id_for_connection(request.sender_id)?;
1154 let responder_id = UserId::from_proto(request.payload.responder_id);
1155 if requester_id == responder_id {
1156 return Err(anyhow!("cannot add yourself as a contact"))?;
1157 }
1158
1159 self.app_state
1160 .db
1161 .send_contact_request(requester_id, responder_id)
1162 .await?;
1163
1164 // Update outgoing contact requests of requester
1165 let mut update = proto::UpdateContacts::default();
1166 update.outgoing_requests.push(responder_id.to_proto());
1167 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1168 self.peer.send(connection_id, update.clone())?;
1169 }
1170
1171 // Update incoming contact requests of responder
1172 let mut update = proto::UpdateContacts::default();
1173 update
1174 .incoming_requests
1175 .push(proto::IncomingContactRequest {
1176 requester_id: requester_id.to_proto(),
1177 should_notify: true,
1178 });
1179 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1180 self.peer.send(connection_id, update.clone())?;
1181 }
1182
1183 response.send(proto::Ack {})?;
1184 Ok(())
1185 }
1186
1187 async fn respond_to_contact_request(
1188 self: Arc<Server>,
1189 request: TypedEnvelope<proto::RespondToContactRequest>,
1190 response: Response<proto::RespondToContactRequest>,
1191 ) -> Result<()> {
1192 let responder_id = self
1193 .store()
1194 .await
1195 .user_id_for_connection(request.sender_id)?;
1196 let requester_id = UserId::from_proto(request.payload.requester_id);
1197 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1198 self.app_state
1199 .db
1200 .dismiss_contact_notification(responder_id, requester_id)
1201 .await?;
1202 } else {
1203 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1204 self.app_state
1205 .db
1206 .respond_to_contact_request(responder_id, requester_id, accept)
1207 .await?;
1208
1209 let store = self.store().await;
1210 // Update responder with new contact
1211 let mut update = proto::UpdateContacts::default();
1212 if accept {
1213 update
1214 .contacts
1215 .push(store.contact_for_user(requester_id, false));
1216 }
1217 update
1218 .remove_incoming_requests
1219 .push(requester_id.to_proto());
1220 for connection_id in store.connection_ids_for_user(responder_id) {
1221 self.peer.send(connection_id, update.clone())?;
1222 }
1223
1224 // Update requester with new contact
1225 let mut update = proto::UpdateContacts::default();
1226 if accept {
1227 update
1228 .contacts
1229 .push(store.contact_for_user(responder_id, true));
1230 }
1231 update
1232 .remove_outgoing_requests
1233 .push(responder_id.to_proto());
1234 for connection_id in store.connection_ids_for_user(requester_id) {
1235 self.peer.send(connection_id, update.clone())?;
1236 }
1237 }
1238
1239 response.send(proto::Ack {})?;
1240 Ok(())
1241 }
1242
1243 async fn remove_contact(
1244 self: Arc<Server>,
1245 request: TypedEnvelope<proto::RemoveContact>,
1246 response: Response<proto::RemoveContact>,
1247 ) -> Result<()> {
1248 let requester_id = self
1249 .store()
1250 .await
1251 .user_id_for_connection(request.sender_id)?;
1252 let responder_id = UserId::from_proto(request.payload.user_id);
1253 self.app_state
1254 .db
1255 .remove_contact(requester_id, responder_id)
1256 .await?;
1257
1258 // Update outgoing contact requests of requester
1259 let mut update = proto::UpdateContacts::default();
1260 update
1261 .remove_outgoing_requests
1262 .push(responder_id.to_proto());
1263 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1264 self.peer.send(connection_id, update.clone())?;
1265 }
1266
1267 // Update incoming contact requests of responder
1268 let mut update = proto::UpdateContacts::default();
1269 update
1270 .remove_incoming_requests
1271 .push(requester_id.to_proto());
1272 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1273 self.peer.send(connection_id, update.clone())?;
1274 }
1275
1276 response.send(proto::Ack {})?;
1277 Ok(())
1278 }
1279
1280 async fn join_channel(
1281 self: Arc<Self>,
1282 request: TypedEnvelope<proto::JoinChannel>,
1283 response: Response<proto::JoinChannel>,
1284 ) -> Result<()> {
1285 let user_id = self
1286 .store()
1287 .await
1288 .user_id_for_connection(request.sender_id)?;
1289 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1290 if !self
1291 .app_state
1292 .db
1293 .can_user_access_channel(user_id, channel_id)
1294 .await?
1295 {
1296 Err(anyhow!("access denied"))?;
1297 }
1298
1299 self.store_mut()
1300 .await
1301 .join_channel(request.sender_id, channel_id);
1302 let messages = self
1303 .app_state
1304 .db
1305 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1306 .await?
1307 .into_iter()
1308 .map(|msg| proto::ChannelMessage {
1309 id: msg.id.to_proto(),
1310 body: msg.body,
1311 timestamp: msg.sent_at.unix_timestamp() as u64,
1312 sender_id: msg.sender_id.to_proto(),
1313 nonce: Some(msg.nonce.as_u128().into()),
1314 })
1315 .collect::<Vec<_>>();
1316 response.send(proto::JoinChannelResponse {
1317 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1318 messages,
1319 })?;
1320 Ok(())
1321 }
1322
1323 async fn leave_channel(
1324 self: Arc<Self>,
1325 request: TypedEnvelope<proto::LeaveChannel>,
1326 ) -> Result<()> {
1327 let user_id = self
1328 .store()
1329 .await
1330 .user_id_for_connection(request.sender_id)?;
1331 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1332 if !self
1333 .app_state
1334 .db
1335 .can_user_access_channel(user_id, channel_id)
1336 .await?
1337 {
1338 Err(anyhow!("access denied"))?;
1339 }
1340
1341 self.store_mut()
1342 .await
1343 .leave_channel(request.sender_id, channel_id);
1344
1345 Ok(())
1346 }
1347
1348 async fn send_channel_message(
1349 self: Arc<Self>,
1350 request: TypedEnvelope<proto::SendChannelMessage>,
1351 response: Response<proto::SendChannelMessage>,
1352 ) -> Result<()> {
1353 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1354 let user_id;
1355 let connection_ids;
1356 {
1357 let state = self.store().await;
1358 user_id = state.user_id_for_connection(request.sender_id)?;
1359 connection_ids = state.channel_connection_ids(channel_id)?;
1360 }
1361
1362 // Validate the message body.
1363 let body = request.payload.body.trim().to_string();
1364 if body.len() > MAX_MESSAGE_LEN {
1365 return Err(anyhow!("message is too long"))?;
1366 }
1367 if body.is_empty() {
1368 return Err(anyhow!("message can't be blank"))?;
1369 }
1370
1371 let timestamp = OffsetDateTime::now_utc();
1372 let nonce = request
1373 .payload
1374 .nonce
1375 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1376
1377 let message_id = self
1378 .app_state
1379 .db
1380 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1381 .await?
1382 .to_proto();
1383 let message = proto::ChannelMessage {
1384 sender_id: user_id.to_proto(),
1385 id: message_id,
1386 body,
1387 timestamp: timestamp.unix_timestamp() as u64,
1388 nonce: Some(nonce),
1389 };
1390 broadcast(request.sender_id, connection_ids, |conn_id| {
1391 self.peer.send(
1392 conn_id,
1393 proto::ChannelMessageSent {
1394 channel_id: channel_id.to_proto(),
1395 message: Some(message.clone()),
1396 },
1397 )
1398 });
1399 response.send(proto::SendChannelMessageResponse {
1400 message: Some(message),
1401 })?;
1402 Ok(())
1403 }
1404
1405 async fn get_channel_messages(
1406 self: Arc<Self>,
1407 request: TypedEnvelope<proto::GetChannelMessages>,
1408 response: Response<proto::GetChannelMessages>,
1409 ) -> Result<()> {
1410 let user_id = self
1411 .store()
1412 .await
1413 .user_id_for_connection(request.sender_id)?;
1414 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1415 if !self
1416 .app_state
1417 .db
1418 .can_user_access_channel(user_id, channel_id)
1419 .await?
1420 {
1421 Err(anyhow!("access denied"))?;
1422 }
1423
1424 let messages = self
1425 .app_state
1426 .db
1427 .get_channel_messages(
1428 channel_id,
1429 MESSAGE_COUNT_PER_PAGE,
1430 Some(MessageId::from_proto(request.payload.before_message_id)),
1431 )
1432 .await?
1433 .into_iter()
1434 .map(|msg| proto::ChannelMessage {
1435 id: msg.id.to_proto(),
1436 body: msg.body,
1437 timestamp: msg.sent_at.unix_timestamp() as u64,
1438 sender_id: msg.sender_id.to_proto(),
1439 nonce: Some(msg.nonce.as_u128().into()),
1440 })
1441 .collect::<Vec<_>>();
1442 response.send(proto::GetChannelMessagesResponse {
1443 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1444 messages,
1445 })?;
1446 Ok(())
1447 }
1448
1449 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1450 #[cfg(test)]
1451 tokio::task::yield_now().await;
1452 let guard = self.store.read().await;
1453 #[cfg(test)]
1454 tokio::task::yield_now().await;
1455 StoreReadGuard {
1456 guard,
1457 _not_send: PhantomData,
1458 }
1459 }
1460
1461 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1462 #[cfg(test)]
1463 tokio::task::yield_now().await;
1464 let guard = self.store.write().await;
1465 #[cfg(test)]
1466 tokio::task::yield_now().await;
1467 StoreWriteGuard {
1468 guard,
1469 _not_send: PhantomData,
1470 }
1471 }
1472}
1473
1474impl<'a> Deref for StoreReadGuard<'a> {
1475 type Target = Store;
1476
1477 fn deref(&self) -> &Self::Target {
1478 &*self.guard
1479 }
1480}
1481
1482impl<'a> Deref for StoreWriteGuard<'a> {
1483 type Target = Store;
1484
1485 fn deref(&self) -> &Self::Target {
1486 &*self.guard
1487 }
1488}
1489
1490impl<'a> DerefMut for StoreWriteGuard<'a> {
1491 fn deref_mut(&mut self) -> &mut Self::Target {
1492 &mut *self.guard
1493 }
1494}
1495
1496impl<'a> Drop for StoreWriteGuard<'a> {
1497 fn drop(&mut self) {
1498 #[cfg(test)]
1499 self.check_invariants();
1500
1501 let metrics = self.metrics();
1502 tracing::info!(
1503 connections = metrics.connections,
1504 registered_projects = metrics.registered_projects,
1505 shared_projects = metrics.shared_projects,
1506 "metrics"
1507 );
1508 }
1509}
1510
1511impl Executor for RealExecutor {
1512 type Sleep = Sleep;
1513
1514 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1515 tokio::task::spawn(future);
1516 }
1517
1518 fn sleep(&self, duration: Duration) -> Self::Sleep {
1519 tokio::time::sleep(duration)
1520 }
1521}
1522
1523fn broadcast<F>(
1524 sender_id: ConnectionId,
1525 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1526 mut f: F,
1527) where
1528 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1529{
1530 for receiver_id in receiver_ids {
1531 if receiver_id != sender_id {
1532 f(receiver_id).trace_err();
1533 }
1534 }
1535}
1536
1537lazy_static! {
1538 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1539}
1540
/// Typed value of the `x-zed-protocol-version` request header.
pub struct ProtocolVersion(u32);
1542
1543impl Header for ProtocolVersion {
1544 fn name() -> &'static HeaderName {
1545 &ZED_PROTOCOL_VERSION
1546 }
1547
1548 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1549 where
1550 Self: Sized,
1551 I: Iterator<Item = &'i axum::http::HeaderValue>,
1552 {
1553 let version = values
1554 .next()
1555 .ok_or_else(|| axum::headers::Error::invalid())?
1556 .to_str()
1557 .map_err(|_| axum::headers::Error::invalid())?
1558 .parse()
1559 .map_err(|_| axum::headers::Error::invalid())?;
1560 Ok(Self(version))
1561 }
1562
1563 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1564 values.extend([self.0.to_string().parse().unwrap()]);
1565 }
1566}
1567
1568pub fn routes(server: Arc<Server>) -> Router<Body> {
1569 Router::new()
1570 .route("/rpc", get(handle_websocket_request))
1571 .layer(
1572 ServiceBuilder::new()
1573 .layer(Extension(server.app_state.clone()))
1574 .layer(middleware::from_fn(auth::validate_header))
1575 .layer(Extension(server)),
1576 )
1577}
1578
1579pub async fn handle_websocket_request(
1580 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1581 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1582 Extension(server): Extension<Arc<Server>>,
1583 Extension(user): Extension<User>,
1584 ws: WebSocketUpgrade,
1585) -> axum::response::Response {
1586 if protocol_version != rpc::PROTOCOL_VERSION {
1587 return (
1588 StatusCode::UPGRADE_REQUIRED,
1589 "client must be upgraded".to_string(),
1590 )
1591 .into_response();
1592 }
1593 let socket_address = socket_address.to_string();
1594 ws.on_upgrade(move |socket| {
1595 use util::ResultExt;
1596 let socket = socket
1597 .map_ok(to_tungstenite_message)
1598 .err_into()
1599 .with(|message| async move { Ok(to_axum_message(message)) });
1600 let connection = Connection::new(Box::pin(socket));
1601 async move {
1602 server
1603 .handle_connection(connection, socket_address, user, None, RealExecutor)
1604 .await
1605 .log_err();
1606 }
1607 })
1608}
1609
1610fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1611 match message {
1612 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1613 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1614 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1615 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1616 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1617 code: frame.code.into(),
1618 reason: frame.reason,
1619 })),
1620 }
1621}
1622
1623fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1624 match message {
1625 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1626 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1627 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1628 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1629 AxumMessage::Close(frame) => {
1630 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1631 code: frame.code.into(),
1632 reason: frame.reason,
1633 }))
1634 }
1635 }
1636}
1637
/// Extension for result-like values whose error can be logged and discarded.
pub trait ResultExt {
    /// The success value carried by the implementing type.
    type Ok;

    /// Returns the success value if there is one; see the implementation for
    /// how an error is reported.
    fn trace_err(self) -> Option<Self::Ok>;
}
1643
1644impl<T, E> ResultExt for Result<T, E>
1645where
1646 E: std::fmt::Debug,
1647{
1648 type Ok = T;
1649
1650 fn trace_err(self) -> Option<T> {
1651 match self {
1652 Ok(value) => Some(value),
1653 Err(error) => {
1654 tracing::error!("{:?}", error);
1655 None
1656 }
1657 }
1658 }
1659}
1660
1661#[cfg(test)]
1662mod tests {
1663 use super::*;
1664 use crate::{
1665 db::{tests::TestDb, UserId},
1666 AppState,
1667 };
1668 use ::rpc::Peer;
1669 use client::{
1670 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1671 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1672 };
1673 use collections::{BTreeMap, HashSet};
1674 use editor::{
1675 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1676 ToOffset, ToggleCodeActions, Undo,
1677 };
1678 use gpui::{
1679 executor::{self, Deterministic},
1680 geometry::vector::vec2f,
1681 ModelHandle, Task, TestAppContext, ViewHandle,
1682 };
1683 use language::{
1684 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1685 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1686 };
1687 use lsp::{self, FakeLanguageServer};
1688 use parking_lot::Mutex;
1689 use project::{
1690 fs::{FakeFs, Fs as _},
1691 search::SearchQuery,
1692 worktree::WorktreeHandle,
1693 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1694 };
1695 use rand::prelude::*;
1696 use rpc::PeerId;
1697 use serde_json::json;
1698 use settings::Settings;
1699 use sqlx::types::time::OffsetDateTime;
1700 use std::{
1701 cell::RefCell,
1702 env,
1703 ops::Deref,
1704 path::{Path, PathBuf},
1705 rc::Rc,
1706 sync::{
1707 atomic::{AtomicBool, Ordering::SeqCst},
1708 Arc,
1709 },
1710 time::Duration,
1711 };
1712 use theme::ThemeRegistry;
1713 use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
1714
1715 #[cfg(test)]
1716 #[ctor::ctor]
1717 fn init_logger() {
1718 if std::env::var("RUST_LOG").is_ok() {
1719 env_logger::init();
1720 }
1721 }
1722
// Shares a local project from client A with client B and checks that:
//   * each side lists the other as a collaborator,
//   * B sees A's worktree entries, including those under "ignored-dir",
//   * an edit typed by B into "b.txt" shows up in A's copy of the buffer,
//   * a second connection for "user_b" can join the same project, and
//   * dropping B's first project removes exactly one collaborator.
1723 #[gpui::test(iterations = 10)]
1724 async fn test_share_project(
1725 deterministic: Arc<Deterministic>,
1726 cx_a: &mut TestAppContext,
1727 cx_b: &mut TestAppContext,
1728 cx_b2: &mut TestAppContext,
1729 ) {
1730 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1731 let lang_registry = Arc::new(LanguageRegistry::test());
1732 let fs = FakeFs::new(cx_a.background());
1733 cx_a.foreground().forbid_parking();
1734
1735 // Connect to a server as 2 clients.
1736 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1737 let client_a = server.create_client(cx_a, "user_a").await;
1738 let mut client_b = server.create_client(cx_b, "user_b").await;
1739 server
1740 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1741 .await;
1742
1743 // Share a project as client A
1744 fs.insert_tree(
1745 "/a",
1746 json!({
1747 ".gitignore": "ignored-dir",
1748 "a.txt": "a-contents",
1749 "b.txt": "b-contents",
1750 "ignored-dir": {
1751 "c.txt": "",
1752 "d.txt": "",
1753 }
1754 }),
1755 )
1756 .await;
1757 let project_a = cx_a.update(|cx| {
1758 Project::local(
1759 client_a.clone(),
1760 client_a.user_store.clone(),
1761 lang_registry.clone(),
1762 fs.clone(),
1763 cx,
1764 )
1765 });
// The remote id is reused further down to join the project from a second
// connection via `Project::remote`.
1766 let project_id = project_a
1767 .read_with(cx_a, |project, _| project.next_remote_id())
1768 .await;
1769 let (worktree_a, _) = project_a
1770 .update(cx_a, |p, cx| {
1771 p.find_or_create_local_worktree("/a", true, cx)
1772 })
1773 .await
1774 .unwrap();
1775 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1776 worktree_a
1777 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1778 .await;
1779
1780 // Join that project as client B
1781 let client_b_peer_id = client_b.peer_id;
1782 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1783 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1784 assert_eq!(
1785 project
1786 .collaborators()
1787 .get(&client_a.peer_id)
1788 .unwrap()
1789 .user
1790 .github_login,
1791 "user_a"
1792 );
1793 project.replica_id()
1794 });
1795
// From the host's side, B must appear with the replica id B reported above.
1796 deterministic.run_until_parked();
1797 project_a.read_with(cx_a, |project, _| {
1798 let client_b_collaborator = project.collaborators().get(&client_b_peer_id).unwrap();
1799 assert_eq!(client_b_collaborator.replica_id, replica_id_b);
1800 assert_eq!(client_b_collaborator.user.github_login, "user_b");
1801 });
// The guest's worktree lists every path inserted above, including the
// entries under "ignored-dir".
1802 project_b.read_with(cx_b, |project, cx| {
1803 let worktree = project.worktrees(cx).next().unwrap().read(cx);
1804 assert_eq!(
1805 worktree.paths().map(AsRef::as_ref).collect::<Vec<_>>(),
1806 [
1807 Path::new(".gitignore"),
1808 Path::new("a.txt"),
1809 Path::new("b.txt"),
1810 Path::new("ignored-dir"),
1811 Path::new("ignored-dir/c.txt"),
1812 Path::new("ignored-dir/d.txt"),
1813 ]
1814 );
1815 });
1816
1817 // Open the same file as client B and client A.
1818 let buffer_b = project_b
1819 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1820 .await
1821 .unwrap();
1822 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1823 project_a.read_with(cx_a, |project, cx| {
1824 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1825 });
1826 let buffer_a = project_a
1827 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1828 .await
1829 .unwrap();
1830
1831 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1832
1833 // TODO
1834 // // Create a selection set as client B and see that selection set as client A.
1835 // buffer_a
1836 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1837 // .await;
1838
1839 // Edit the buffer as client B and see that edit as client A.
1840 editor_b.update(cx_b, |editor, cx| {
1841 editor.handle_input(&Input("ok, ".into()), cx)
1842 });
1843 buffer_a
1844 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1845 .await;
1846
1847 // TODO
1848 // // Remove the selection set as client B, see those selections disappear as client A.
1849 cx_b.update(move |_| drop(editor_b));
1850 // buffer_a
1851 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1852 // .await;
1853
1854 // Client B can join again on a different window because they are already a participant.
1855 let client_b2 = server.create_client(cx_b2, "user_b").await;
1856 let project_b2 = Project::remote(
1857 project_id,
1858 client_b2.client.clone(),
1859 client_b2.user_store.clone(),
1860 lang_registry.clone(),
1861 FakeFs::new(cx_b2.background()),
1862 &mut cx_b2.to_async(),
1863 )
1864 .await
1865 .unwrap();
// With three participants, each project sees the other two as collaborators.
1866 deterministic.run_until_parked();
1867 project_a.read_with(cx_a, |project, _| {
1868 assert_eq!(project.collaborators().len(), 2);
1869 });
1870 project_b.read_with(cx_b, |project, _| {
1871 assert_eq!(project.collaborators().len(), 2);
1872 });
1873 project_b2.read_with(cx_b2, |project, _| {
1874 assert_eq!(project.collaborators().len(), 2);
1875 });
1876
1877 // Dropping client B's first project removes only that from client A's collaborators.
1878 cx_b.update(move |_| {
1879 drop(client_b.project.take());
1880 drop(project_b);
1881 });
1882 deterministic.run_until_parked();
1883 project_a.read_with(cx_a, |project, _| {
1884 assert_eq!(project.collaborators().len(), 1);
1885 });
1886 project_b2.read_with(cx_b2, |project, _| {
1887 assert_eq!(project.collaborators().len(), 1);
1888 });
1889 }
1890
// Checks how the host's worktree "shared" flag follows guest activity: it is
// set while client B has the project joined, cleared after B drops it, and
// set again when B rejoins. When the host drops the project, the guest's
// project becomes read-only with no collaborators.
1891 #[gpui::test(iterations = 10)]
1892 async fn test_unshare_project(
1893 deterministic: Arc<Deterministic>,
1894 cx_a: &mut TestAppContext,
1895 cx_b: &mut TestAppContext,
1896 ) {
1897 let lang_registry = Arc::new(LanguageRegistry::test());
1898 let fs = FakeFs::new(cx_a.background());
1899 cx_a.foreground().forbid_parking();
1900
1901 // Connect to a server as 2 clients.
1902 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1903 let client_a = server.create_client(cx_a, "user_a").await;
1904 let mut client_b = server.create_client(cx_b, "user_b").await;
1905 server
1906 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1907 .await;
1908
1909 // Share a project as client A
1910 fs.insert_tree(
1911 "/a",
1912 json!({
1913 "a.txt": "a-contents",
1914 "b.txt": "b-contents",
1915 }),
1916 )
1917 .await;
1918 let project_a = cx_a.update(|cx| {
1919 Project::local(
1920 client_a.clone(),
1921 client_a.user_store.clone(),
1922 lang_registry.clone(),
1923 fs.clone(),
1924 cx,
1925 )
1926 });
1927 let (worktree_a, _) = project_a
1928 .update(cx_a, |p, cx| {
1929 p.find_or_create_local_worktree("/a", true, cx)
1930 })
1931 .await
1932 .unwrap();
1933 worktree_a
1934 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1935 .await;
1936 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1937
1938 // Join that project as client B
1939 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1940 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
// Opening a buffer as the guest must succeed while the project is joined.
1941 project_b
1942 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1943 .await
1944 .unwrap();
1945
1946 // When client B leaves the project, it gets automatically unshared.
1947 cx_b.update(|_| {
1948 drop(client_b.project.take());
1949 drop(project_b);
1950 });
1951 deterministic.run_until_parked();
1952 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1953
1954 // When client B joins again, the project gets re-shared.
1955 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1956 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1957 project_b2
1958 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1959 .await
1960 .unwrap();
1961
1962 // When client A (the host) leaves, the project gets unshared and guests are notified.
1963 cx_a.update(|_| drop(project_a));
1964 deterministic.run_until_parked();
1965 project_b2.read_with(cx_b, |project, _| {
1966 assert!(project.is_read_only());
1967 assert!(project.collaborators().is_empty());
1968 });
1969 }
1970
// Checks what happens when the host's connection is dropped while client B
// has joined the project and client C has a join request in flight: the host
// loses its collaborators and the project is no longer shared, B's project
// becomes read-only, C's join fails with `HostWentOffline`, and B can join
// the project again afterwards.
1971 #[gpui::test(iterations = 10)]
1972 async fn test_host_disconnect(
1973 deterministic: Arc<Deterministic>,
1974 cx_a: &mut TestAppContext,
1975 cx_b: &mut TestAppContext,
1976 cx_c: &mut TestAppContext,
1977 ) {
1978 let lang_registry = Arc::new(LanguageRegistry::test());
1979 let fs = FakeFs::new(cx_a.background());
1980 cx_a.foreground().forbid_parking();
1981
1982 // Connect to a server as 3 clients.
1983 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1984 let client_a = server.create_client(cx_a, "user_a").await;
1985 let mut client_b = server.create_client(cx_b, "user_b").await;
1986 let client_c = server.create_client(cx_c, "user_c").await;
1987 server
1988 .make_contacts(vec![
1989 (&client_a, cx_a),
1990 (&client_b, cx_b),
1991 (&client_c, cx_c),
1992 ])
1993 .await;
1994
1995 // Share a project as client A
1996 fs.insert_tree(
1997 "/a",
1998 json!({
1999 "a.txt": "a-contents",
2000 "b.txt": "b-contents",
2001 }),
2002 )
2003 .await;
2004 let project_a = cx_a.update(|cx| {
2005 Project::local(
2006 client_a.clone(),
2007 client_a.user_store.clone(),
2008 lang_registry.clone(),
2009 fs.clone(),
2010 cx,
2011 )
2012 });
2013 let project_id = project_a
2014 .read_with(cx_a, |project, _| project.next_remote_id())
2015 .await;
2016 let (worktree_a, _) = project_a
2017 .update(cx_a, |p, cx| {
2018 p.find_or_create_local_worktree("/a", true, cx)
2019 })
2020 .await
2021 .unwrap();
2022 worktree_a
2023 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2024 .await;
2025 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2026
2027 // Join that project as client B
2028 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2029 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2030 project_b
2031 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2032 .await
2033 .unwrap();
2034
2035 // Request to join that project as client C
// The join is spawned rather than awaited here; its result is only checked
// after the host has been disconnected.
2036 let project_c = cx_c.spawn(|mut cx| {
2037 let client = client_c.client.clone();
2038 let user_store = client_c.user_store.clone();
2039 let lang_registry = lang_registry.clone();
2040 async move {
2041 Project::remote(
2042 project_id,
2043 client,
2044 user_store,
2045 lang_registry.clone(),
2046 FakeFs::new(cx.background()),
2047 &mut cx,
2048 )
2049 .await
2050 }
2051 });
2052 deterministic.run_until_parked();
2053
2054 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
2055 server.disconnect_client(client_a.current_user_id(cx_a));
2056 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2057 project_a
2058 .condition(cx_a, |project, _| project.collaborators().is_empty())
2059 .await;
2060 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
2061 project_b
2062 .condition(cx_b, |project, _| project.is_read_only())
2063 .await;
2064 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
2065 cx_b.update(|_| {
2066 drop(project_b);
2067 });
// Client C's pending join must resolve to the host-offline error.
2068 assert!(matches!(
2069 project_c.await.unwrap_err(),
2070 project::JoinProjectError::HostWentOffline
2071 ));
2072
2073 // Ensure guests can still join.
2074 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2075 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2076 project_b2
2077 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2078 .await
2079 .unwrap();
2080 }
2081
// Checks the two ways a pending join request can be rejected: the host
// answers it with `false` (the guest gets `HostDeclined`), or the host drops
// the project while the request is pending (the guest gets
// `HostClosedProject`).
2082 #[gpui::test(iterations = 10)]
2083 async fn test_decline_join_request(
2084 deterministic: Arc<Deterministic>,
2085 cx_a: &mut TestAppContext,
2086 cx_b: &mut TestAppContext,
2087 ) {
2088 let lang_registry = Arc::new(LanguageRegistry::test());
2089 let fs = FakeFs::new(cx_a.background());
2090 cx_a.foreground().forbid_parking();
2091
2092 // Connect to a server as 2 clients.
2093 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2094 let client_a = server.create_client(cx_a, "user_a").await;
2095 let client_b = server.create_client(cx_b, "user_b").await;
2096 server
2097 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2098 .await;
2099
2100 // Share a project as client A
2101 fs.insert_tree("/a", json!({})).await;
2102 let project_a = cx_a.update(|cx| {
2103 Project::local(
2104 client_a.clone(),
2105 client_a.user_store.clone(),
2106 lang_registry.clone(),
2107 fs.clone(),
2108 cx,
2109 )
2110 });
2111 let project_id = project_a
2112 .read_with(cx_a, |project, _| project.next_remote_id())
2113 .await;
2114 let (worktree_a, _) = project_a
2115 .update(cx_a, |p, cx| {
2116 p.find_or_create_local_worktree("/a", true, cx)
2117 })
2118 .await
2119 .unwrap();
2120 worktree_a
2121 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2122 .await;
2123
2124 // Request to join that project as client B
2125 let project_b = cx_b.spawn(|mut cx| {
2126 let client = client_b.client.clone();
2127 let user_store = client_b.user_store.clone();
2128 let lang_registry = lang_registry.clone();
2129 async move {
2130 Project::remote(
2131 project_id,
2132 client,
2133 user_store,
2134 lang_registry.clone(),
2135 FakeFs::new(cx.background()),
2136 &mut cx,
2137 )
2138 .await
2139 }
2140 });
2141 deterministic.run_until_parked();
// The host answers client B's request with `false`, i.e. declines it.
2142 project_a.update(cx_a, |project, cx| {
2143 project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2144 });
2145 assert!(matches!(
2146 project_b.await.unwrap_err(),
2147 project::JoinProjectError::HostDeclined
2148 ));
2149
2150 // Request to join the project again as client B
2151 let project_b = cx_b.spawn(|mut cx| {
2152 let client = client_b.client.clone();
2153 let user_store = client_b.user_store.clone();
2154 let lang_registry = lang_registry.clone();
2155 async move {
2156 Project::remote(
2157 project_id,
2158 client,
2159 user_store,
2160 lang_registry.clone(),
2161 FakeFs::new(cx.background()),
2162 &mut cx,
2163 )
2164 .await
2165 }
2166 });
2167
2168 // Close the project on the host
2169 deterministic.run_until_parked();
2170 cx_a.update(|_| drop(project_a));
2171 deterministic.run_until_parked();
2172 assert!(matches!(
2173 project_b.await.unwrap_err(),
2174 project::JoinProjectError::HostClosedProject
2175 ));
2176 }
2177
2178 #[gpui::test(iterations = 10)]
2179 async fn test_cancel_join_request(
2180 deterministic: Arc<Deterministic>,
2181 cx_a: &mut TestAppContext,
2182 cx_b: &mut TestAppContext,
2183 ) {
2184 let lang_registry = Arc::new(LanguageRegistry::test());
2185 let fs = FakeFs::new(cx_a.background());
2186 cx_a.foreground().forbid_parking();
2187
2188 // Connect to a server as 2 clients.
2189 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2190 let client_a = server.create_client(cx_a, "user_a").await;
2191 let client_b = server.create_client(cx_b, "user_b").await;
2192 server
2193 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2194 .await;
2195
2196 // Share a project as client A
2197 fs.insert_tree("/a", json!({})).await;
2198 let project_a = cx_a.update(|cx| {
2199 Project::local(
2200 client_a.clone(),
2201 client_a.user_store.clone(),
2202 lang_registry.clone(),
2203 fs.clone(),
2204 cx,
2205 )
2206 });
2207 let project_id = project_a
2208 .read_with(cx_a, |project, _| project.next_remote_id())
2209 .await;
2210
2211 let project_a_events = Rc::new(RefCell::new(Vec::new()));
2212 let user_b = client_a
2213 .user_store
2214 .update(cx_a, |store, cx| {
2215 store.fetch_user(client_b.user_id().unwrap(), cx)
2216 })
2217 .await
2218 .unwrap();
2219 project_a.update(cx_a, {
2220 let project_a_events = project_a_events.clone();
2221 move |_, cx| {
2222 cx.subscribe(&cx.handle(), move |_, _, event, _| {
2223 project_a_events.borrow_mut().push(event.clone());
2224 })
2225 .detach();
2226 }
2227 });
2228
2229 let (worktree_a, _) = project_a
2230 .update(cx_a, |p, cx| {
2231 p.find_or_create_local_worktree("/a", true, cx)
2232 })
2233 .await
2234 .unwrap();
2235 worktree_a
2236 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2237 .await;
2238
2239 // Request to join that project as client B
2240 let project_b = cx_b.spawn(|mut cx| {
2241 let client = client_b.client.clone();
2242 let user_store = client_b.user_store.clone();
2243 let lang_registry = lang_registry.clone();
2244 async move {
2245 Project::remote(
2246 project_id,
2247 client,
2248 user_store,
2249 lang_registry.clone(),
2250 FakeFs::new(cx.background()),
2251 &mut cx,
2252 )
2253 .await
2254 }
2255 });
2256 deterministic.run_until_parked();
2257 assert_eq!(
2258 &*project_a_events.borrow(),
2259 &[project::Event::ContactRequestedJoin(user_b.clone())]
2260 );
2261 project_a_events.borrow_mut().clear();
2262
2263 // Cancel the join request by leaving the project
2264 client_b
2265 .client
2266 .send(proto::LeaveProject { project_id })
2267 .unwrap();
2268 drop(project_b);
2269
2270 deterministic.run_until_parked();
2271 assert_eq!(
2272 &*project_a_events.borrow(),
2273 &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2274 );
2275 }
2276
2277 #[gpui::test(iterations = 10)]
2278 async fn test_propagate_saves_and_fs_changes(
2279 cx_a: &mut TestAppContext,
2280 cx_b: &mut TestAppContext,
2281 cx_c: &mut TestAppContext,
2282 ) {
2283 let lang_registry = Arc::new(LanguageRegistry::test());
2284 let fs = FakeFs::new(cx_a.background());
2285 cx_a.foreground().forbid_parking();
2286
2287 // Connect to a server as 3 clients.
2288 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2289 let client_a = server.create_client(cx_a, "user_a").await;
2290 let mut client_b = server.create_client(cx_b, "user_b").await;
2291 let mut client_c = server.create_client(cx_c, "user_c").await;
2292 server
2293 .make_contacts(vec![
2294 (&client_a, cx_a),
2295 (&client_b, cx_b),
2296 (&client_c, cx_c),
2297 ])
2298 .await;
2299
2300 // Share a worktree as client A.
2301 fs.insert_tree(
2302 "/a",
2303 json!({
2304 "file1": "",
2305 "file2": ""
2306 }),
2307 )
2308 .await;
2309 let project_a = cx_a.update(|cx| {
2310 Project::local(
2311 client_a.clone(),
2312 client_a.user_store.clone(),
2313 lang_registry.clone(),
2314 fs.clone(),
2315 cx,
2316 )
2317 });
2318 let (worktree_a, _) = project_a
2319 .update(cx_a, |p, cx| {
2320 p.find_or_create_local_worktree("/a", true, cx)
2321 })
2322 .await
2323 .unwrap();
2324 worktree_a
2325 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2326 .await;
2327 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2328
2329 // Join that worktree as clients B and C.
2330 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2331 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2332 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2333 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2334
2335 // Open and edit a buffer as both guests B and C.
2336 let buffer_b = project_b
2337 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2338 .await
2339 .unwrap();
2340 let buffer_c = project_c
2341 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2342 .await
2343 .unwrap();
2344 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2345 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2346
2347 // Open and edit that buffer as the host.
2348 let buffer_a = project_a
2349 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2350 .await
2351 .unwrap();
2352
2353 buffer_a
2354 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2355 .await;
2356 buffer_a.update(cx_a, |buf, cx| {
2357 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2358 });
2359
2360 // Wait for edits to propagate
2361 buffer_a
2362 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2363 .await;
2364 buffer_b
2365 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2366 .await;
2367 buffer_c
2368 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2369 .await;
2370
2371 // Edit the buffer as the host and concurrently save as guest B.
2372 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2373 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2374 save_b.await.unwrap();
2375 assert_eq!(
2376 fs.load("/a/file1".as_ref()).await.unwrap(),
2377 "hi-a, i-am-c, i-am-b, i-am-a"
2378 );
2379 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2380 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2381 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2382
2383 worktree_a.flush_fs_events(cx_a).await;
2384
2385 // Make changes on host's file system, see those changes on guest worktrees.
2386 fs.rename(
2387 "/a/file1".as_ref(),
2388 "/a/file1-renamed".as_ref(),
2389 Default::default(),
2390 )
2391 .await
2392 .unwrap();
2393
2394 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2395 .await
2396 .unwrap();
2397 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2398
2399 worktree_a
2400 .condition(&cx_a, |tree, _| {
2401 tree.paths()
2402 .map(|p| p.to_string_lossy())
2403 .collect::<Vec<_>>()
2404 == ["file1-renamed", "file3", "file4"]
2405 })
2406 .await;
2407 worktree_b
2408 .condition(&cx_b, |tree, _| {
2409 tree.paths()
2410 .map(|p| p.to_string_lossy())
2411 .collect::<Vec<_>>()
2412 == ["file1-renamed", "file3", "file4"]
2413 })
2414 .await;
2415 worktree_c
2416 .condition(&cx_c, |tree, _| {
2417 tree.paths()
2418 .map(|p| p.to_string_lossy())
2419 .collect::<Vec<_>>()
2420 == ["file1-renamed", "file3", "file4"]
2421 })
2422 .await;
2423
2424 // Ensure buffer files are updated as well.
2425 buffer_a
2426 .condition(&cx_a, |buf, _| {
2427 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2428 })
2429 .await;
2430 buffer_b
2431 .condition(&cx_b, |buf, _| {
2432 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2433 })
2434 .await;
2435 buffer_c
2436 .condition(&cx_c, |buf, _| {
2437 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2438 })
2439 .await;
2440 }
2441
2442 #[gpui::test(iterations = 10)]
2443 async fn test_fs_operations(
2444 executor: Arc<Deterministic>,
2445 cx_a: &mut TestAppContext,
2446 cx_b: &mut TestAppContext,
2447 ) {
2448 executor.forbid_parking();
2449 let fs = FakeFs::new(cx_a.background());
2450
2451 // Connect to a server as 2 clients.
2452 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2453 let mut client_a = server.create_client(cx_a, "user_a").await;
2454 let mut client_b = server.create_client(cx_b, "user_b").await;
2455 server
2456 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2457 .await;
2458
2459 // Share a project as client A
2460 fs.insert_tree(
2461 "/dir",
2462 json!({
2463 "a.txt": "a-contents",
2464 "b.txt": "b-contents",
2465 }),
2466 )
2467 .await;
2468
2469 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2470 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2471
2472 let worktree_a =
2473 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2474 let worktree_b =
2475 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2476
2477 let entry = project_b
2478 .update(cx_b, |project, cx| {
2479 project
2480 .create_entry((worktree_id, "c.txt"), false, cx)
2481 .unwrap()
2482 })
2483 .await
2484 .unwrap();
2485 worktree_a.read_with(cx_a, |worktree, _| {
2486 assert_eq!(
2487 worktree
2488 .paths()
2489 .map(|p| p.to_string_lossy())
2490 .collect::<Vec<_>>(),
2491 ["a.txt", "b.txt", "c.txt"]
2492 );
2493 });
2494 worktree_b.read_with(cx_b, |worktree, _| {
2495 assert_eq!(
2496 worktree
2497 .paths()
2498 .map(|p| p.to_string_lossy())
2499 .collect::<Vec<_>>(),
2500 ["a.txt", "b.txt", "c.txt"]
2501 );
2502 });
2503
2504 project_b
2505 .update(cx_b, |project, cx| {
2506 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2507 })
2508 .unwrap()
2509 .await
2510 .unwrap();
2511 worktree_a.read_with(cx_a, |worktree, _| {
2512 assert_eq!(
2513 worktree
2514 .paths()
2515 .map(|p| p.to_string_lossy())
2516 .collect::<Vec<_>>(),
2517 ["a.txt", "b.txt", "d.txt"]
2518 );
2519 });
2520 worktree_b.read_with(cx_b, |worktree, _| {
2521 assert_eq!(
2522 worktree
2523 .paths()
2524 .map(|p| p.to_string_lossy())
2525 .collect::<Vec<_>>(),
2526 ["a.txt", "b.txt", "d.txt"]
2527 );
2528 });
2529
2530 let dir_entry = project_b
2531 .update(cx_b, |project, cx| {
2532 project
2533 .create_entry((worktree_id, "DIR"), true, cx)
2534 .unwrap()
2535 })
2536 .await
2537 .unwrap();
2538 worktree_a.read_with(cx_a, |worktree, _| {
2539 assert_eq!(
2540 worktree
2541 .paths()
2542 .map(|p| p.to_string_lossy())
2543 .collect::<Vec<_>>(),
2544 ["DIR", "a.txt", "b.txt", "d.txt"]
2545 );
2546 });
2547 worktree_b.read_with(cx_b, |worktree, _| {
2548 assert_eq!(
2549 worktree
2550 .paths()
2551 .map(|p| p.to_string_lossy())
2552 .collect::<Vec<_>>(),
2553 ["DIR", "a.txt", "b.txt", "d.txt"]
2554 );
2555 });
2556
2557 project_b
2558 .update(cx_b, |project, cx| {
2559 project.delete_entry(dir_entry.id, cx).unwrap()
2560 })
2561 .await
2562 .unwrap();
2563 worktree_a.read_with(cx_a, |worktree, _| {
2564 assert_eq!(
2565 worktree
2566 .paths()
2567 .map(|p| p.to_string_lossy())
2568 .collect::<Vec<_>>(),
2569 ["a.txt", "b.txt", "d.txt"]
2570 );
2571 });
2572 worktree_b.read_with(cx_b, |worktree, _| {
2573 assert_eq!(
2574 worktree
2575 .paths()
2576 .map(|p| p.to_string_lossy())
2577 .collect::<Vec<_>>(),
2578 ["a.txt", "b.txt", "d.txt"]
2579 );
2580 });
2581
2582 project_b
2583 .update(cx_b, |project, cx| {
2584 project.delete_entry(entry.id, cx).unwrap()
2585 })
2586 .await
2587 .unwrap();
2588 worktree_a.read_with(cx_a, |worktree, _| {
2589 assert_eq!(
2590 worktree
2591 .paths()
2592 .map(|p| p.to_string_lossy())
2593 .collect::<Vec<_>>(),
2594 ["a.txt", "b.txt"]
2595 );
2596 });
2597 worktree_b.read_with(cx_b, |worktree, _| {
2598 assert_eq!(
2599 worktree
2600 .paths()
2601 .map(|p| p.to_string_lossy())
2602 .collect::<Vec<_>>(),
2603 ["a.txt", "b.txt"]
2604 );
2605 });
2606 }
2607
2608 #[gpui::test(iterations = 10)]
2609 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2610 cx_a.foreground().forbid_parking();
2611 let lang_registry = Arc::new(LanguageRegistry::test());
2612 let fs = FakeFs::new(cx_a.background());
2613
2614 // Connect to a server as 2 clients.
2615 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2616 let client_a = server.create_client(cx_a, "user_a").await;
2617 let mut client_b = server.create_client(cx_b, "user_b").await;
2618 server
2619 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2620 .await;
2621
2622 // Share a project as client A
2623 fs.insert_tree(
2624 "/dir",
2625 json!({
2626 "a.txt": "a-contents",
2627 }),
2628 )
2629 .await;
2630
2631 let project_a = cx_a.update(|cx| {
2632 Project::local(
2633 client_a.clone(),
2634 client_a.user_store.clone(),
2635 lang_registry.clone(),
2636 fs.clone(),
2637 cx,
2638 )
2639 });
2640 let (worktree_a, _) = project_a
2641 .update(cx_a, |p, cx| {
2642 p.find_or_create_local_worktree("/dir", true, cx)
2643 })
2644 .await
2645 .unwrap();
2646 worktree_a
2647 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2648 .await;
2649 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2650
2651 // Join that project as client B
2652 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2653
2654 // Open a buffer as client B
2655 let buffer_b = project_b
2656 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2657 .await
2658 .unwrap();
2659
2660 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2661 buffer_b.read_with(cx_b, |buf, _| {
2662 assert!(buf.is_dirty());
2663 assert!(!buf.has_conflict());
2664 });
2665
2666 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2667 buffer_b
2668 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2669 .await;
2670 buffer_b.read_with(cx_b, |buf, _| {
2671 assert!(!buf.has_conflict());
2672 });
2673
2674 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2675 buffer_b.read_with(cx_b, |buf, _| {
2676 assert!(buf.is_dirty());
2677 assert!(!buf.has_conflict());
2678 });
2679 }
2680
2681 #[gpui::test(iterations = 10)]
2682 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2683 cx_a.foreground().forbid_parking();
2684 let lang_registry = Arc::new(LanguageRegistry::test());
2685 let fs = FakeFs::new(cx_a.background());
2686
2687 // Connect to a server as 2 clients.
2688 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2689 let client_a = server.create_client(cx_a, "user_a").await;
2690 let mut client_b = server.create_client(cx_b, "user_b").await;
2691 server
2692 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2693 .await;
2694
2695 // Share a project as client A
2696 fs.insert_tree(
2697 "/dir",
2698 json!({
2699 "a.txt": "a-contents",
2700 }),
2701 )
2702 .await;
2703
2704 let project_a = cx_a.update(|cx| {
2705 Project::local(
2706 client_a.clone(),
2707 client_a.user_store.clone(),
2708 lang_registry.clone(),
2709 fs.clone(),
2710 cx,
2711 )
2712 });
2713 let (worktree_a, _) = project_a
2714 .update(cx_a, |p, cx| {
2715 p.find_or_create_local_worktree("/dir", true, cx)
2716 })
2717 .await
2718 .unwrap();
2719 worktree_a
2720 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2721 .await;
2722 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2723
2724 // Join that project as client B
2725 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2726 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2727
2728 // Open a buffer as client B
2729 let buffer_b = project_b
2730 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2731 .await
2732 .unwrap();
2733 buffer_b.read_with(cx_b, |buf, _| {
2734 assert!(!buf.is_dirty());
2735 assert!(!buf.has_conflict());
2736 });
2737
2738 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2739 .await
2740 .unwrap();
2741 buffer_b
2742 .condition(&cx_b, |buf, _| {
2743 buf.text() == "new contents" && !buf.is_dirty()
2744 })
2745 .await;
2746 buffer_b.read_with(cx_b, |buf, _| {
2747 assert!(!buf.has_conflict());
2748 });
2749 }
2750
2751 #[gpui::test(iterations = 10)]
2752 async fn test_editing_while_guest_opens_buffer(
2753 cx_a: &mut TestAppContext,
2754 cx_b: &mut TestAppContext,
2755 ) {
2756 cx_a.foreground().forbid_parking();
2757 let lang_registry = Arc::new(LanguageRegistry::test());
2758 let fs = FakeFs::new(cx_a.background());
2759
2760 // Connect to a server as 2 clients.
2761 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2762 let client_a = server.create_client(cx_a, "user_a").await;
2763 let mut client_b = server.create_client(cx_b, "user_b").await;
2764 server
2765 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2766 .await;
2767
2768 // Share a project as client A
2769 fs.insert_tree(
2770 "/dir",
2771 json!({
2772 "a.txt": "a-contents",
2773 }),
2774 )
2775 .await;
2776 let project_a = cx_a.update(|cx| {
2777 Project::local(
2778 client_a.clone(),
2779 client_a.user_store.clone(),
2780 lang_registry.clone(),
2781 fs.clone(),
2782 cx,
2783 )
2784 });
2785 let (worktree_a, _) = project_a
2786 .update(cx_a, |p, cx| {
2787 p.find_or_create_local_worktree("/dir", true, cx)
2788 })
2789 .await
2790 .unwrap();
2791 worktree_a
2792 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2793 .await;
2794 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2795
2796 // Join that project as client B
2797 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2798
2799 // Open a buffer as client A
2800 let buffer_a = project_a
2801 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2802 .await
2803 .unwrap();
2804
2805 // Start opening the same buffer as client B
2806 let buffer_b = cx_b
2807 .background()
2808 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2809
2810 // Edit the buffer as client A while client B is still opening it.
2811 cx_b.background().simulate_random_delay().await;
2812 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2813 cx_b.background().simulate_random_delay().await;
2814 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2815
2816 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2817 let buffer_b = buffer_b.await.unwrap();
2818 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2819 }
2820
2821 #[gpui::test(iterations = 10)]
2822 async fn test_leaving_worktree_while_opening_buffer(
2823 cx_a: &mut TestAppContext,
2824 cx_b: &mut TestAppContext,
2825 ) {
2826 cx_a.foreground().forbid_parking();
2827 let lang_registry = Arc::new(LanguageRegistry::test());
2828 let fs = FakeFs::new(cx_a.background());
2829
2830 // Connect to a server as 2 clients.
2831 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2832 let client_a = server.create_client(cx_a, "user_a").await;
2833 let mut client_b = server.create_client(cx_b, "user_b").await;
2834 server
2835 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2836 .await;
2837
2838 // Share a project as client A
2839 fs.insert_tree(
2840 "/dir",
2841 json!({
2842 "a.txt": "a-contents",
2843 }),
2844 )
2845 .await;
2846 let project_a = cx_a.update(|cx| {
2847 Project::local(
2848 client_a.clone(),
2849 client_a.user_store.clone(),
2850 lang_registry.clone(),
2851 fs.clone(),
2852 cx,
2853 )
2854 });
2855 let (worktree_a, _) = project_a
2856 .update(cx_a, |p, cx| {
2857 p.find_or_create_local_worktree("/dir", true, cx)
2858 })
2859 .await
2860 .unwrap();
2861 worktree_a
2862 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2863 .await;
2864 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2865
2866 // Join that project as client B
2867 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2868
2869 // See that a guest has joined as client A.
2870 project_a
2871 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2872 .await;
2873
2874 // Begin opening a buffer as client B, but leave the project before the open completes.
2875 let buffer_b = cx_b
2876 .background()
2877 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2878 cx_b.update(|_| {
2879 drop(client_b.project.take());
2880 drop(project_b);
2881 });
2882 drop(buffer_b);
2883
2884 // See that the guest has left.
2885 project_a
2886 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2887 .await;
2888 }
2889
2890 #[gpui::test(iterations = 10)]
2891 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2892 cx_a.foreground().forbid_parking();
2893 let lang_registry = Arc::new(LanguageRegistry::test());
2894 let fs = FakeFs::new(cx_a.background());
2895
2896 // Connect to a server as 2 clients.
2897 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2898 let client_a = server.create_client(cx_a, "user_a").await;
2899 let mut client_b = server.create_client(cx_b, "user_b").await;
2900 server
2901 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2902 .await;
2903
2904 // Share a project as client A
2905 fs.insert_tree(
2906 "/a",
2907 json!({
2908 "a.txt": "a-contents",
2909 "b.txt": "b-contents",
2910 }),
2911 )
2912 .await;
2913 let project_a = cx_a.update(|cx| {
2914 Project::local(
2915 client_a.clone(),
2916 client_a.user_store.clone(),
2917 lang_registry.clone(),
2918 fs.clone(),
2919 cx,
2920 )
2921 });
2922 let (worktree_a, _) = project_a
2923 .update(cx_a, |p, cx| {
2924 p.find_or_create_local_worktree("/a", true, cx)
2925 })
2926 .await
2927 .unwrap();
2928 worktree_a
2929 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2930 .await;
2931
2932 // Join that project as client B
2933 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2934
2935 // Client A sees that a guest has joined.
2936 project_a
2937 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2938 .await;
2939
2940 // Drop client B's connection and ensure client A observes client B leaving the project.
2941 client_b.disconnect(&cx_b.to_async()).unwrap();
2942 project_a
2943 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2944 .await;
2945
2946 // Rejoin the project as client B
2947 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2948
2949 // Client A sees that a guest has re-joined.
2950 project_a
2951 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2952 .await;
2953
2954 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2955 client_b.wait_for_current_user(cx_b).await;
2956 server.disconnect_client(client_b.current_user_id(cx_b));
2957 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2958 project_a
2959 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2960 .await;
2961 }
2962
2963 #[gpui::test(iterations = 10)]
2964 async fn test_collaborating_with_diagnostics(
2965 deterministic: Arc<Deterministic>,
2966 cx_a: &mut TestAppContext,
2967 cx_b: &mut TestAppContext,
2968 cx_c: &mut TestAppContext,
2969 ) {
2970 deterministic.forbid_parking();
2971 let lang_registry = Arc::new(LanguageRegistry::test());
2972 let fs = FakeFs::new(cx_a.background());
2973
2974 // Set up a fake language server.
2975 let mut language = Language::new(
2976 LanguageConfig {
2977 name: "Rust".into(),
2978 path_suffixes: vec!["rs".to_string()],
2979 ..Default::default()
2980 },
2981 Some(tree_sitter_rust::language()),
2982 );
2983 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2984 lang_registry.add(Arc::new(language));
2985
2986 // Connect to a server as 2 clients.
2987 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2988 let client_a = server.create_client(cx_a, "user_a").await;
2989 let mut client_b = server.create_client(cx_b, "user_b").await;
2990 let mut client_c = server.create_client(cx_c, "user_c").await;
2991 server
2992 .make_contacts(vec![
2993 (&client_a, cx_a),
2994 (&client_b, cx_b),
2995 (&client_c, cx_c),
2996 ])
2997 .await;
2998
2999 // Share a project as client A
3000 fs.insert_tree(
3001 "/a",
3002 json!({
3003 "a.rs": "let one = two",
3004 "other.rs": "",
3005 }),
3006 )
3007 .await;
3008 let project_a = cx_a.update(|cx| {
3009 Project::local(
3010 client_a.clone(),
3011 client_a.user_store.clone(),
3012 lang_registry.clone(),
3013 fs.clone(),
3014 cx,
3015 )
3016 });
3017 let (worktree_a, _) = project_a
3018 .update(cx_a, |p, cx| {
3019 p.find_or_create_local_worktree("/a", true, cx)
3020 })
3021 .await
3022 .unwrap();
3023 worktree_a
3024 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3025 .await;
3026 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3027 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3028
3029 // Cause the language server to start.
3030 let _buffer = cx_a
3031 .background()
3032 .spawn(project_a.update(cx_a, |project, cx| {
3033 project.open_buffer(
3034 ProjectPath {
3035 worktree_id,
3036 path: Path::new("other.rs").into(),
3037 },
3038 cx,
3039 )
3040 }))
3041 .await
3042 .unwrap();
3043
3044 // Join the worktree as client B.
3045 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3046
3047 // Simulate a language server reporting errors for a file.
3048 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3049 fake_language_server
3050 .receive_notification::<lsp::notification::DidOpenTextDocument>()
3051 .await;
3052 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3053 lsp::PublishDiagnosticsParams {
3054 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3055 version: None,
3056 diagnostics: vec![lsp::Diagnostic {
3057 severity: Some(lsp::DiagnosticSeverity::ERROR),
3058 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3059 message: "message 1".to_string(),
3060 ..Default::default()
3061 }],
3062 },
3063 );
3064
3065 // Wait for server to see the diagnostics update.
3066 deterministic.run_until_parked();
3067 {
3068 let store = server.store.read().await;
3069 let project = store.project(project_id).unwrap();
3070 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3071 assert!(!worktree.diagnostic_summaries.is_empty());
3072 }
3073
3074 // Ensure client B observes the new diagnostics.
3075 project_b.read_with(cx_b, |project, cx| {
3076 assert_eq!(
3077 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3078 &[(
3079 ProjectPath {
3080 worktree_id,
3081 path: Arc::from(Path::new("a.rs")),
3082 },
3083 DiagnosticSummary {
3084 error_count: 1,
3085 warning_count: 0,
3086 ..Default::default()
3087 },
3088 )]
3089 )
3090 });
3091
3092 // Join project as client C and observe the diagnostics.
3093 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3094 project_c.read_with(cx_c, |project, cx| {
3095 assert_eq!(
3096 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3097 &[(
3098 ProjectPath {
3099 worktree_id,
3100 path: Arc::from(Path::new("a.rs")),
3101 },
3102 DiagnosticSummary {
3103 error_count: 1,
3104 warning_count: 0,
3105 ..Default::default()
3106 },
3107 )]
3108 )
3109 });
3110
3111 // Simulate a language server reporting more errors for a file.
3112 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3113 lsp::PublishDiagnosticsParams {
3114 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3115 version: None,
3116 diagnostics: vec![
3117 lsp::Diagnostic {
3118 severity: Some(lsp::DiagnosticSeverity::ERROR),
3119 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3120 message: "message 1".to_string(),
3121 ..Default::default()
3122 },
3123 lsp::Diagnostic {
3124 severity: Some(lsp::DiagnosticSeverity::WARNING),
3125 range: lsp::Range::new(
3126 lsp::Position::new(0, 10),
3127 lsp::Position::new(0, 13),
3128 ),
3129 message: "message 2".to_string(),
3130 ..Default::default()
3131 },
3132 ],
3133 },
3134 );
3135
3136 // Clients B and C get the updated summaries
3137 deterministic.run_until_parked();
3138 project_b.read_with(cx_b, |project, cx| {
3139 assert_eq!(
3140 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3141 [(
3142 ProjectPath {
3143 worktree_id,
3144 path: Arc::from(Path::new("a.rs")),
3145 },
3146 DiagnosticSummary {
3147 error_count: 1,
3148 warning_count: 1,
3149 ..Default::default()
3150 },
3151 )]
3152 );
3153 });
3154 project_c.read_with(cx_c, |project, cx| {
3155 assert_eq!(
3156 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3157 [(
3158 ProjectPath {
3159 worktree_id,
3160 path: Arc::from(Path::new("a.rs")),
3161 },
3162 DiagnosticSummary {
3163 error_count: 1,
3164 warning_count: 1,
3165 ..Default::default()
3166 },
3167 )]
3168 );
3169 });
3170
3171 // Open the file with the errors on client B. They should be present.
3172 let buffer_b = cx_b
3173 .background()
3174 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3175 .await
3176 .unwrap();
3177
3178 buffer_b.read_with(cx_b, |buffer, _| {
3179 assert_eq!(
3180 buffer
3181 .snapshot()
3182 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3183 .map(|entry| entry)
3184 .collect::<Vec<_>>(),
3185 &[
3186 DiagnosticEntry {
3187 range: Point::new(0, 4)..Point::new(0, 7),
3188 diagnostic: Diagnostic {
3189 group_id: 0,
3190 message: "message 1".to_string(),
3191 severity: lsp::DiagnosticSeverity::ERROR,
3192 is_primary: true,
3193 ..Default::default()
3194 }
3195 },
3196 DiagnosticEntry {
3197 range: Point::new(0, 10)..Point::new(0, 13),
3198 diagnostic: Diagnostic {
3199 group_id: 1,
3200 severity: lsp::DiagnosticSeverity::WARNING,
3201 message: "message 2".to_string(),
3202 is_primary: true,
3203 ..Default::default()
3204 }
3205 }
3206 ]
3207 );
3208 });
3209
3210 // Simulate a language server reporting no errors for a file.
3211 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3212 lsp::PublishDiagnosticsParams {
3213 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3214 version: None,
3215 diagnostics: vec![],
3216 },
3217 );
3218 deterministic.run_until_parked();
3219 project_a.read_with(cx_a, |project, cx| {
3220 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3221 });
3222 project_b.read_with(cx_b, |project, cx| {
3223 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3224 });
3225 project_c.read_with(cx_c, |project, cx| {
3226 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3227 });
3228 }
3229
3230 #[gpui::test(iterations = 10)]
3231 async fn test_collaborating_with_completion(
3232 cx_a: &mut TestAppContext,
3233 cx_b: &mut TestAppContext,
3234 ) {
3235 cx_a.foreground().forbid_parking();
3236 let lang_registry = Arc::new(LanguageRegistry::test());
3237 let fs = FakeFs::new(cx_a.background());
3238
3239 // Set up a fake language server.
3240 let mut language = Language::new(
3241 LanguageConfig {
3242 name: "Rust".into(),
3243 path_suffixes: vec!["rs".to_string()],
3244 ..Default::default()
3245 },
3246 Some(tree_sitter_rust::language()),
3247 );
3248 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3249 capabilities: lsp::ServerCapabilities {
3250 completion_provider: Some(lsp::CompletionOptions {
3251 trigger_characters: Some(vec![".".to_string()]),
3252 ..Default::default()
3253 }),
3254 ..Default::default()
3255 },
3256 ..Default::default()
3257 });
3258 lang_registry.add(Arc::new(language));
3259
3260 // Connect to a server as 2 clients.
3261 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3262 let client_a = server.create_client(cx_a, "user_a").await;
3263 let mut client_b = server.create_client(cx_b, "user_b").await;
3264 server
3265 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3266 .await;
3267
3268 // Share a project as client A
3269 fs.insert_tree(
3270 "/a",
3271 json!({
3272 "main.rs": "fn main() { a }",
3273 "other.rs": "",
3274 }),
3275 )
3276 .await;
3277 let project_a = cx_a.update(|cx| {
3278 Project::local(
3279 client_a.clone(),
3280 client_a.user_store.clone(),
3281 lang_registry.clone(),
3282 fs.clone(),
3283 cx,
3284 )
3285 });
3286 let (worktree_a, _) = project_a
3287 .update(cx_a, |p, cx| {
3288 p.find_or_create_local_worktree("/a", true, cx)
3289 })
3290 .await
3291 .unwrap();
3292 worktree_a
3293 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3294 .await;
3295 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3296
3297 // Join the worktree as client B.
3298 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3299
3300 // Open a file in an editor as the guest.
3301 let buffer_b = project_b
3302 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3303 .await
3304 .unwrap();
3305 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3306 let editor_b = cx_b.add_view(window_b, |cx| {
3307 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3308 });
3309
3310 let fake_language_server = fake_language_servers.next().await.unwrap();
3311 buffer_b
3312 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3313 .await;
3314
3315 // Type a completion trigger character as the guest.
3316 editor_b.update(cx_b, |editor, cx| {
3317 editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3318 editor.handle_input(&Input(".".into()), cx);
3319 cx.focus(&editor_b);
3320 });
3321
3322 // Receive a completion request as the host's language server.
3323 // Return some completions from the host's language server.
3324 cx_a.foreground().start_waiting();
3325 fake_language_server
3326 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3327 assert_eq!(
3328 params.text_document_position.text_document.uri,
3329 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3330 );
3331 assert_eq!(
3332 params.text_document_position.position,
3333 lsp::Position::new(0, 14),
3334 );
3335
3336 Ok(Some(lsp::CompletionResponse::Array(vec![
3337 lsp::CompletionItem {
3338 label: "first_method(…)".into(),
3339 detail: Some("fn(&mut self, B) -> C".into()),
3340 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3341 new_text: "first_method($1)".to_string(),
3342 range: lsp::Range::new(
3343 lsp::Position::new(0, 14),
3344 lsp::Position::new(0, 14),
3345 ),
3346 })),
3347 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3348 ..Default::default()
3349 },
3350 lsp::CompletionItem {
3351 label: "second_method(…)".into(),
3352 detail: Some("fn(&mut self, C) -> D<E>".into()),
3353 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3354 new_text: "second_method()".to_string(),
3355 range: lsp::Range::new(
3356 lsp::Position::new(0, 14),
3357 lsp::Position::new(0, 14),
3358 ),
3359 })),
3360 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3361 ..Default::default()
3362 },
3363 ])))
3364 })
3365 .next()
3366 .await
3367 .unwrap();
3368 cx_a.foreground().finish_waiting();
3369
3370 // Open the buffer on the host.
3371 let buffer_a = project_a
3372 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3373 .await
3374 .unwrap();
3375 buffer_a
3376 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3377 .await;
3378
3379 // Confirm a completion on the guest.
3380 editor_b
3381 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3382 .await;
3383 editor_b.update(cx_b, |editor, cx| {
3384 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3385 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3386 });
3387
3388 // Return a resolved completion from the host's language server.
3389 // The resolved completion has an additional text edit.
3390 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3391 |params, _| async move {
3392 assert_eq!(params.label, "first_method(…)");
3393 Ok(lsp::CompletionItem {
3394 label: "first_method(…)".into(),
3395 detail: Some("fn(&mut self, B) -> C".into()),
3396 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3397 new_text: "first_method($1)".to_string(),
3398 range: lsp::Range::new(
3399 lsp::Position::new(0, 14),
3400 lsp::Position::new(0, 14),
3401 ),
3402 })),
3403 additional_text_edits: Some(vec![lsp::TextEdit {
3404 new_text: "use d::SomeTrait;\n".to_string(),
3405 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3406 }]),
3407 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3408 ..Default::default()
3409 })
3410 },
3411 );
3412
3413 // The additional edit is applied.
3414 buffer_a
3415 .condition(&cx_a, |buffer, _| {
3416 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3417 })
3418 .await;
3419 buffer_b
3420 .condition(&cx_b, |buffer, _| {
3421 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3422 })
3423 .await;
3424 }
3425
3426 #[gpui::test(iterations = 10)]
3427 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3428 cx_a.foreground().forbid_parking();
3429 let lang_registry = Arc::new(LanguageRegistry::test());
3430 let fs = FakeFs::new(cx_a.background());
3431
3432 // Connect to a server as 2 clients.
3433 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3434 let client_a = server.create_client(cx_a, "user_a").await;
3435 let mut client_b = server.create_client(cx_b, "user_b").await;
3436 server
3437 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3438 .await;
3439
3440 // Share a project as client A
3441 fs.insert_tree(
3442 "/a",
3443 json!({
3444 "a.rs": "let one = 1;",
3445 }),
3446 )
3447 .await;
3448 let project_a = cx_a.update(|cx| {
3449 Project::local(
3450 client_a.clone(),
3451 client_a.user_store.clone(),
3452 lang_registry.clone(),
3453 fs.clone(),
3454 cx,
3455 )
3456 });
3457 let (worktree_a, _) = project_a
3458 .update(cx_a, |p, cx| {
3459 p.find_or_create_local_worktree("/a", true, cx)
3460 })
3461 .await
3462 .unwrap();
3463 worktree_a
3464 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3465 .await;
3466 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3467 let buffer_a = project_a
3468 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3469 .await
3470 .unwrap();
3471
3472 // Join the worktree as client B.
3473 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3474
3475 let buffer_b = cx_b
3476 .background()
3477 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3478 .await
3479 .unwrap();
3480 buffer_b.update(cx_b, |buffer, cx| {
3481 buffer.edit([(4..7, "six")], cx);
3482 buffer.edit([(10..11, "6")], cx);
3483 assert_eq!(buffer.text(), "let six = 6;");
3484 assert!(buffer.is_dirty());
3485 assert!(!buffer.has_conflict());
3486 });
3487 buffer_a
3488 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3489 .await;
3490
3491 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3492 .await
3493 .unwrap();
3494 buffer_a
3495 .condition(cx_a, |buffer, _| buffer.has_conflict())
3496 .await;
3497 buffer_b
3498 .condition(cx_b, |buffer, _| buffer.has_conflict())
3499 .await;
3500
3501 project_b
3502 .update(cx_b, |project, cx| {
3503 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3504 })
3505 .await
3506 .unwrap();
3507 buffer_a.read_with(cx_a, |buffer, _| {
3508 assert_eq!(buffer.text(), "let seven = 7;");
3509 assert!(!buffer.is_dirty());
3510 assert!(!buffer.has_conflict());
3511 });
3512 buffer_b.read_with(cx_b, |buffer, _| {
3513 assert_eq!(buffer.text(), "let seven = 7;");
3514 assert!(!buffer.is_dirty());
3515 assert!(!buffer.has_conflict());
3516 });
3517
3518 buffer_a.update(cx_a, |buffer, cx| {
3519 // Undoing on the host is a no-op when the reload was initiated by the guest.
3520 buffer.undo(cx);
3521 assert_eq!(buffer.text(), "let seven = 7;");
3522 assert!(!buffer.is_dirty());
3523 assert!(!buffer.has_conflict());
3524 });
3525 buffer_b.update(cx_b, |buffer, cx| {
3526 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3527 buffer.undo(cx);
3528 assert_eq!(buffer.text(), "let six = 6;");
3529 assert!(buffer.is_dirty());
3530 assert!(!buffer.has_conflict());
3531 });
3532 }
3533
3534 #[gpui::test(iterations = 10)]
3535 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3536 cx_a.foreground().forbid_parking();
3537 let lang_registry = Arc::new(LanguageRegistry::test());
3538 let fs = FakeFs::new(cx_a.background());
3539
3540 // Set up a fake language server.
3541 let mut language = Language::new(
3542 LanguageConfig {
3543 name: "Rust".into(),
3544 path_suffixes: vec!["rs".to_string()],
3545 ..Default::default()
3546 },
3547 Some(tree_sitter_rust::language()),
3548 );
3549 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3550 lang_registry.add(Arc::new(language));
3551
3552 // Connect to a server as 2 clients.
3553 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3554 let client_a = server.create_client(cx_a, "user_a").await;
3555 let mut client_b = server.create_client(cx_b, "user_b").await;
3556 server
3557 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3558 .await;
3559
3560 // Share a project as client A
3561 fs.insert_tree(
3562 "/a",
3563 json!({
3564 "a.rs": "let one = two",
3565 }),
3566 )
3567 .await;
3568 let project_a = cx_a.update(|cx| {
3569 Project::local(
3570 client_a.clone(),
3571 client_a.user_store.clone(),
3572 lang_registry.clone(),
3573 fs.clone(),
3574 cx,
3575 )
3576 });
3577 let (worktree_a, _) = project_a
3578 .update(cx_a, |p, cx| {
3579 p.find_or_create_local_worktree("/a", true, cx)
3580 })
3581 .await
3582 .unwrap();
3583 worktree_a
3584 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3585 .await;
3586 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3587
3588 // Join the project as client B.
3589 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3590
3591 let buffer_b = cx_b
3592 .background()
3593 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3594 .await
3595 .unwrap();
3596
3597 let fake_language_server = fake_language_servers.next().await.unwrap();
3598 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3599 Ok(Some(vec![
3600 lsp::TextEdit {
3601 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3602 new_text: "h".to_string(),
3603 },
3604 lsp::TextEdit {
3605 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3606 new_text: "y".to_string(),
3607 },
3608 ]))
3609 });
3610
3611 project_b
3612 .update(cx_b, |project, cx| {
3613 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3614 })
3615 .await
3616 .unwrap();
3617 assert_eq!(
3618 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3619 "let honey = two"
3620 );
3621 }
3622
3623 #[gpui::test(iterations = 10)]
3624 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3625 cx_a.foreground().forbid_parking();
3626 let lang_registry = Arc::new(LanguageRegistry::test());
3627 let fs = FakeFs::new(cx_a.background());
3628 fs.insert_tree(
3629 "/root-1",
3630 json!({
3631 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3632 }),
3633 )
3634 .await;
3635 fs.insert_tree(
3636 "/root-2",
3637 json!({
3638 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3639 }),
3640 )
3641 .await;
3642
3643 // Set up a fake language server.
3644 let mut language = Language::new(
3645 LanguageConfig {
3646 name: "Rust".into(),
3647 path_suffixes: vec!["rs".to_string()],
3648 ..Default::default()
3649 },
3650 Some(tree_sitter_rust::language()),
3651 );
3652 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3653 lang_registry.add(Arc::new(language));
3654
3655 // Connect to a server as 2 clients.
3656 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3657 let client_a = server.create_client(cx_a, "user_a").await;
3658 let mut client_b = server.create_client(cx_b, "user_b").await;
3659 server
3660 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3661 .await;
3662
3663 // Share a project as client A
3664 let project_a = cx_a.update(|cx| {
3665 Project::local(
3666 client_a.clone(),
3667 client_a.user_store.clone(),
3668 lang_registry.clone(),
3669 fs.clone(),
3670 cx,
3671 )
3672 });
3673 let (worktree_a, _) = project_a
3674 .update(cx_a, |p, cx| {
3675 p.find_or_create_local_worktree("/root-1", true, cx)
3676 })
3677 .await
3678 .unwrap();
3679 worktree_a
3680 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3681 .await;
3682 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3683
3684 // Join the project as client B.
3685 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3686
3687 // Open the file on client B.
3688 let buffer_b = cx_b
3689 .background()
3690 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3691 .await
3692 .unwrap();
3693
3694 // Request the definition of a symbol as the guest.
3695 let fake_language_server = fake_language_servers.next().await.unwrap();
3696 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3697 |_, _| async move {
3698 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3699 lsp::Location::new(
3700 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3701 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3702 ),
3703 )))
3704 },
3705 );
3706
3707 let definitions_1 = project_b
3708 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3709 .await
3710 .unwrap();
3711 cx_b.read(|cx| {
3712 assert_eq!(definitions_1.len(), 1);
3713 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3714 let target_buffer = definitions_1[0].buffer.read(cx);
3715 assert_eq!(
3716 target_buffer.text(),
3717 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3718 );
3719 assert_eq!(
3720 definitions_1[0].range.to_point(target_buffer),
3721 Point::new(0, 6)..Point::new(0, 9)
3722 );
3723 });
3724
3725 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3726 // the previous call to `definition`.
3727 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3728 |_, _| async move {
3729 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3730 lsp::Location::new(
3731 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3732 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3733 ),
3734 )))
3735 },
3736 );
3737
3738 let definitions_2 = project_b
3739 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3740 .await
3741 .unwrap();
3742 cx_b.read(|cx| {
3743 assert_eq!(definitions_2.len(), 1);
3744 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3745 let target_buffer = definitions_2[0].buffer.read(cx);
3746 assert_eq!(
3747 target_buffer.text(),
3748 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3749 );
3750 assert_eq!(
3751 definitions_2[0].range.to_point(target_buffer),
3752 Point::new(1, 6)..Point::new(1, 11)
3753 );
3754 });
3755 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3756 }
3757
3758 #[gpui::test(iterations = 10)]
3759 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3760 cx_a.foreground().forbid_parking();
3761 let lang_registry = Arc::new(LanguageRegistry::test());
3762 let fs = FakeFs::new(cx_a.background());
3763 fs.insert_tree(
3764 "/root-1",
3765 json!({
3766 "one.rs": "const ONE: usize = 1;",
3767 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3768 }),
3769 )
3770 .await;
3771 fs.insert_tree(
3772 "/root-2",
3773 json!({
3774 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3775 }),
3776 )
3777 .await;
3778
3779 // Set up a fake language server.
3780 let mut language = Language::new(
3781 LanguageConfig {
3782 name: "Rust".into(),
3783 path_suffixes: vec!["rs".to_string()],
3784 ..Default::default()
3785 },
3786 Some(tree_sitter_rust::language()),
3787 );
3788 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3789 lang_registry.add(Arc::new(language));
3790
3791 // Connect to a server as 2 clients.
3792 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3793 let client_a = server.create_client(cx_a, "user_a").await;
3794 let mut client_b = server.create_client(cx_b, "user_b").await;
3795 server
3796 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3797 .await;
3798
3799 // Share a project as client A
3800 let project_a = cx_a.update(|cx| {
3801 Project::local(
3802 client_a.clone(),
3803 client_a.user_store.clone(),
3804 lang_registry.clone(),
3805 fs.clone(),
3806 cx,
3807 )
3808 });
3809 let (worktree_a, _) = project_a
3810 .update(cx_a, |p, cx| {
3811 p.find_or_create_local_worktree("/root-1", true, cx)
3812 })
3813 .await
3814 .unwrap();
3815 worktree_a
3816 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3817 .await;
3818 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3819
3820 // Join the worktree as client B.
3821 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3822
3823 // Open the file on client B.
3824 let buffer_b = cx_b
3825 .background()
3826 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3827 .await
3828 .unwrap();
3829
3830 // Request references to a symbol as the guest.
3831 let fake_language_server = fake_language_servers.next().await.unwrap();
3832 fake_language_server.handle_request::<lsp::request::References, _, _>(
3833 |params, _| async move {
3834 assert_eq!(
3835 params.text_document_position.text_document.uri.as_str(),
3836 "file:///root-1/one.rs"
3837 );
3838 Ok(Some(vec![
3839 lsp::Location {
3840 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3841 range: lsp::Range::new(
3842 lsp::Position::new(0, 24),
3843 lsp::Position::new(0, 27),
3844 ),
3845 },
3846 lsp::Location {
3847 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3848 range: lsp::Range::new(
3849 lsp::Position::new(0, 35),
3850 lsp::Position::new(0, 38),
3851 ),
3852 },
3853 lsp::Location {
3854 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3855 range: lsp::Range::new(
3856 lsp::Position::new(0, 37),
3857 lsp::Position::new(0, 40),
3858 ),
3859 },
3860 ]))
3861 },
3862 );
3863
3864 let references = project_b
3865 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3866 .await
3867 .unwrap();
3868 cx_b.read(|cx| {
3869 assert_eq!(references.len(), 3);
3870 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3871
3872 let two_buffer = references[0].buffer.read(cx);
3873 let three_buffer = references[2].buffer.read(cx);
3874 assert_eq!(
3875 two_buffer.file().unwrap().path().as_ref(),
3876 Path::new("two.rs")
3877 );
3878 assert_eq!(references[1].buffer, references[0].buffer);
3879 assert_eq!(
3880 three_buffer.file().unwrap().full_path(cx),
3881 Path::new("three.rs")
3882 );
3883
3884 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3885 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3886 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3887 });
3888 }
3889
3890 #[gpui::test(iterations = 10)]
3891 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3892 cx_a.foreground().forbid_parking();
3893 let lang_registry = Arc::new(LanguageRegistry::test());
3894 let fs = FakeFs::new(cx_a.background());
3895 fs.insert_tree(
3896 "/root-1",
3897 json!({
3898 "a": "hello world",
3899 "b": "goodnight moon",
3900 "c": "a world of goo",
3901 "d": "world champion of clown world",
3902 }),
3903 )
3904 .await;
3905 fs.insert_tree(
3906 "/root-2",
3907 json!({
3908 "e": "disney world is fun",
3909 }),
3910 )
3911 .await;
3912
3913 // Connect to a server as 2 clients.
3914 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3915 let client_a = server.create_client(cx_a, "user_a").await;
3916 let mut client_b = server.create_client(cx_b, "user_b").await;
3917 server
3918 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3919 .await;
3920
3921 // Share a project as client A
3922 let project_a = cx_a.update(|cx| {
3923 Project::local(
3924 client_a.clone(),
3925 client_a.user_store.clone(),
3926 lang_registry.clone(),
3927 fs.clone(),
3928 cx,
3929 )
3930 });
3931
3932 let (worktree_1, _) = project_a
3933 .update(cx_a, |p, cx| {
3934 p.find_or_create_local_worktree("/root-1", true, cx)
3935 })
3936 .await
3937 .unwrap();
3938 worktree_1
3939 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3940 .await;
3941 let (worktree_2, _) = project_a
3942 .update(cx_a, |p, cx| {
3943 p.find_or_create_local_worktree("/root-2", true, cx)
3944 })
3945 .await
3946 .unwrap();
3947 worktree_2
3948 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3949 .await;
3950
3951 // Join the worktree as client B.
3952 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3953 let results = project_b
3954 .update(cx_b, |project, cx| {
3955 project.search(SearchQuery::text("world", false, false), cx)
3956 })
3957 .await
3958 .unwrap();
3959
3960 let mut ranges_by_path = results
3961 .into_iter()
3962 .map(|(buffer, ranges)| {
3963 buffer.read_with(cx_b, |buffer, cx| {
3964 let path = buffer.file().unwrap().full_path(cx);
3965 let offset_ranges = ranges
3966 .into_iter()
3967 .map(|range| range.to_offset(buffer))
3968 .collect::<Vec<_>>();
3969 (path, offset_ranges)
3970 })
3971 })
3972 .collect::<Vec<_>>();
3973 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3974
3975 assert_eq!(
3976 ranges_by_path,
3977 &[
3978 (PathBuf::from("root-1/a"), vec![6..11]),
3979 (PathBuf::from("root-1/c"), vec![2..7]),
3980 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3981 (PathBuf::from("root-2/e"), vec![7..12]),
3982 ]
3983 );
3984 }
3985
3986 #[gpui::test(iterations = 10)]
3987 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3988 cx_a.foreground().forbid_parking();
3989 let lang_registry = Arc::new(LanguageRegistry::test());
3990 let fs = FakeFs::new(cx_a.background());
3991 fs.insert_tree(
3992 "/root-1",
3993 json!({
3994 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3995 }),
3996 )
3997 .await;
3998
3999 // Set up a fake language server.
4000 let mut language = Language::new(
4001 LanguageConfig {
4002 name: "Rust".into(),
4003 path_suffixes: vec!["rs".to_string()],
4004 ..Default::default()
4005 },
4006 Some(tree_sitter_rust::language()),
4007 );
4008 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4009 lang_registry.add(Arc::new(language));
4010
4011 // Connect to a server as 2 clients.
4012 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4013 let client_a = server.create_client(cx_a, "user_a").await;
4014 let mut client_b = server.create_client(cx_b, "user_b").await;
4015 server
4016 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4017 .await;
4018
4019 // Share a project as client A
4020 let project_a = cx_a.update(|cx| {
4021 Project::local(
4022 client_a.clone(),
4023 client_a.user_store.clone(),
4024 lang_registry.clone(),
4025 fs.clone(),
4026 cx,
4027 )
4028 });
4029 let (worktree_a, _) = project_a
4030 .update(cx_a, |p, cx| {
4031 p.find_or_create_local_worktree("/root-1", true, cx)
4032 })
4033 .await
4034 .unwrap();
4035 worktree_a
4036 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4037 .await;
4038 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4039
4040 // Join the worktree as client B.
4041 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4042
4043 // Open the file on client B.
4044 let buffer_b = cx_b
4045 .background()
4046 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
4047 .await
4048 .unwrap();
4049
4050 // Request document highlights as the guest.
4051 let fake_language_server = fake_language_servers.next().await.unwrap();
4052 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4053 |params, _| async move {
4054 assert_eq!(
4055 params
4056 .text_document_position_params
4057 .text_document
4058 .uri
4059 .as_str(),
4060 "file:///root-1/main.rs"
4061 );
4062 assert_eq!(
4063 params.text_document_position_params.position,
4064 lsp::Position::new(0, 34)
4065 );
4066 Ok(Some(vec![
4067 lsp::DocumentHighlight {
4068 kind: Some(lsp::DocumentHighlightKind::WRITE),
4069 range: lsp::Range::new(
4070 lsp::Position::new(0, 10),
4071 lsp::Position::new(0, 16),
4072 ),
4073 },
4074 lsp::DocumentHighlight {
4075 kind: Some(lsp::DocumentHighlightKind::READ),
4076 range: lsp::Range::new(
4077 lsp::Position::new(0, 32),
4078 lsp::Position::new(0, 38),
4079 ),
4080 },
4081 lsp::DocumentHighlight {
4082 kind: Some(lsp::DocumentHighlightKind::READ),
4083 range: lsp::Range::new(
4084 lsp::Position::new(0, 41),
4085 lsp::Position::new(0, 47),
4086 ),
4087 },
4088 ]))
4089 },
4090 );
4091
4092 let highlights = project_b
4093 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4094 .await
4095 .unwrap();
4096 buffer_b.read_with(cx_b, |buffer, _| {
4097 let snapshot = buffer.snapshot();
4098
4099 let highlights = highlights
4100 .into_iter()
4101 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4102 .collect::<Vec<_>>();
4103 assert_eq!(
4104 highlights,
4105 &[
4106 (lsp::DocumentHighlightKind::WRITE, 10..16),
4107 (lsp::DocumentHighlightKind::READ, 32..38),
4108 (lsp::DocumentHighlightKind::READ, 41..47)
4109 ]
4110 )
4111 });
4112 }
4113
4114 #[gpui::test(iterations = 10)]
4115 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4116 cx_a.foreground().forbid_parking();
4117 let lang_registry = Arc::new(LanguageRegistry::test());
4118 let fs = FakeFs::new(cx_a.background());
4119 fs.insert_tree(
4120 "/code",
4121 json!({
4122 "crate-1": {
4123 "one.rs": "const ONE: usize = 1;",
4124 },
4125 "crate-2": {
4126 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4127 },
4128 "private": {
4129 "passwords.txt": "the-password",
4130 }
4131 }),
4132 )
4133 .await;
4134
4135 // Set up a fake language server.
4136 let mut language = Language::new(
4137 LanguageConfig {
4138 name: "Rust".into(),
4139 path_suffixes: vec!["rs".to_string()],
4140 ..Default::default()
4141 },
4142 Some(tree_sitter_rust::language()),
4143 );
4144 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4145 lang_registry.add(Arc::new(language));
4146
4147 // Connect to a server as 2 clients.
4148 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4149 let client_a = server.create_client(cx_a, "user_a").await;
4150 let mut client_b = server.create_client(cx_b, "user_b").await;
4151 server
4152 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4153 .await;
4154
4155 // Share a project as client A
4156 let project_a = cx_a.update(|cx| {
4157 Project::local(
4158 client_a.clone(),
4159 client_a.user_store.clone(),
4160 lang_registry.clone(),
4161 fs.clone(),
4162 cx,
4163 )
4164 });
4165 let (worktree_a, _) = project_a
4166 .update(cx_a, |p, cx| {
4167 p.find_or_create_local_worktree("/code/crate-1", true, cx)
4168 })
4169 .await
4170 .unwrap();
4171 worktree_a
4172 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4173 .await;
4174 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4175
4176 // Join the worktree as client B.
4177 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4178
4179 // Cause the language server to start.
4180 let _buffer = cx_b
4181 .background()
4182 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4183 .await
4184 .unwrap();
4185
4186 let fake_language_server = fake_language_servers.next().await.unwrap();
4187 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4188 |_, _| async move {
4189 #[allow(deprecated)]
4190 Ok(Some(vec![lsp::SymbolInformation {
4191 name: "TWO".into(),
4192 location: lsp::Location {
4193 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4194 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4195 },
4196 kind: lsp::SymbolKind::CONSTANT,
4197 tags: None,
4198 container_name: None,
4199 deprecated: None,
4200 }]))
4201 },
4202 );
4203
4204 // Request the definition of a symbol as the guest.
4205 let symbols = project_b
4206 .update(cx_b, |p, cx| p.symbols("two", cx))
4207 .await
4208 .unwrap();
4209 assert_eq!(symbols.len(), 1);
4210 assert_eq!(symbols[0].name, "TWO");
4211
4212 // Open one of the returned symbols.
4213 let buffer_b_2 = project_b
4214 .update(cx_b, |project, cx| {
4215 project.open_buffer_for_symbol(&symbols[0], cx)
4216 })
4217 .await
4218 .unwrap();
4219 buffer_b_2.read_with(cx_b, |buffer, _| {
4220 assert_eq!(
4221 buffer.file().unwrap().path().as_ref(),
4222 Path::new("../crate-2/two.rs")
4223 );
4224 });
4225
4226 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4227 let mut fake_symbol = symbols[0].clone();
4228 fake_symbol.path = Path::new("/code/secrets").into();
4229 let error = project_b
4230 .update(cx_b, |project, cx| {
4231 project.open_buffer_for_symbol(&fake_symbol, cx)
4232 })
4233 .await
4234 .unwrap_err();
4235 assert!(error.to_string().contains("invalid symbol signature"));
4236 }
4237
4238 #[gpui::test(iterations = 10)]
4239 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4240 cx_a: &mut TestAppContext,
4241 cx_b: &mut TestAppContext,
4242 mut rng: StdRng,
4243 ) {
4244 cx_a.foreground().forbid_parking();
4245 let lang_registry = Arc::new(LanguageRegistry::test());
4246 let fs = FakeFs::new(cx_a.background());
4247 fs.insert_tree(
4248 "/root",
4249 json!({
4250 "a.rs": "const ONE: usize = b::TWO;",
4251 "b.rs": "const TWO: usize = 2",
4252 }),
4253 )
4254 .await;
4255
4256 // Set up a fake language server.
4257 let mut language = Language::new(
4258 LanguageConfig {
4259 name: "Rust".into(),
4260 path_suffixes: vec!["rs".to_string()],
4261 ..Default::default()
4262 },
4263 Some(tree_sitter_rust::language()),
4264 );
4265 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4266 lang_registry.add(Arc::new(language));
4267
4268 // Connect to a server as 2 clients.
4269 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4270 let client_a = server.create_client(cx_a, "user_a").await;
4271 let mut client_b = server.create_client(cx_b, "user_b").await;
4272 server
4273 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4274 .await;
4275
4276 // Share a project as client A
4277 let project_a = cx_a.update(|cx| {
4278 Project::local(
4279 client_a.clone(),
4280 client_a.user_store.clone(),
4281 lang_registry.clone(),
4282 fs.clone(),
4283 cx,
4284 )
4285 });
4286
4287 let (worktree_a, _) = project_a
4288 .update(cx_a, |p, cx| {
4289 p.find_or_create_local_worktree("/root", true, cx)
4290 })
4291 .await
4292 .unwrap();
4293 worktree_a
4294 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4295 .await;
4296 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4297
4298 // Join the project as client B.
4299 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4300
4301 let buffer_b1 = cx_b
4302 .background()
4303 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4304 .await
4305 .unwrap();
4306
4307 let fake_language_server = fake_language_servers.next().await.unwrap();
4308 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4309 |_, _| async move {
4310 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4311 lsp::Location::new(
4312 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4313 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4314 ),
4315 )))
4316 },
4317 );
4318
4319 let definitions;
4320 let buffer_b2;
4321 if rng.gen() {
4322 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4323 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4324 } else {
4325 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4326 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4327 }
4328
4329 let buffer_b2 = buffer_b2.await.unwrap();
4330 let definitions = definitions.await.unwrap();
4331 assert_eq!(definitions.len(), 1);
4332 assert_eq!(definitions[0].buffer, buffer_b2);
4333 }
4334
4335 #[gpui::test(iterations = 10)]
4336 async fn test_collaborating_with_code_actions(
4337 cx_a: &mut TestAppContext,
4338 cx_b: &mut TestAppContext,
4339 ) {
4340 cx_a.foreground().forbid_parking();
4341 let lang_registry = Arc::new(LanguageRegistry::test());
4342 let fs = FakeFs::new(cx_a.background());
4343 cx_b.update(|cx| editor::init(cx));
4344
4345 // Set up a fake language server.
4346 let mut language = Language::new(
4347 LanguageConfig {
4348 name: "Rust".into(),
4349 path_suffixes: vec!["rs".to_string()],
4350 ..Default::default()
4351 },
4352 Some(tree_sitter_rust::language()),
4353 );
4354 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4355 lang_registry.add(Arc::new(language));
4356
4357 // Connect to a server as 2 clients.
4358 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4359 let client_a = server.create_client(cx_a, "user_a").await;
4360 let mut client_b = server.create_client(cx_b, "user_b").await;
4361 server
4362 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4363 .await;
4364
4365 // Share a project as client A
4366 fs.insert_tree(
4367 "/a",
4368 json!({
4369 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4370 "other.rs": "pub fn foo() -> usize { 4 }",
4371 }),
4372 )
4373 .await;
4374 let project_a = cx_a.update(|cx| {
4375 Project::local(
4376 client_a.clone(),
4377 client_a.user_store.clone(),
4378 lang_registry.clone(),
4379 fs.clone(),
4380 cx,
4381 )
4382 });
4383 let (worktree_a, _) = project_a
4384 .update(cx_a, |p, cx| {
4385 p.find_or_create_local_worktree("/a", true, cx)
4386 })
4387 .await
4388 .unwrap();
4389 worktree_a
4390 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4391 .await;
4392 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4393
4394 // Join the project as client B.
4395 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4396 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4397 let editor_b = workspace_b
4398 .update(cx_b, |workspace, cx| {
4399 workspace.open_path((worktree_id, "main.rs"), true, cx)
4400 })
4401 .await
4402 .unwrap()
4403 .downcast::<Editor>()
4404 .unwrap();
4405
4406 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4407 fake_language_server
4408 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4409 assert_eq!(
4410 params.text_document.uri,
4411 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4412 );
4413 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4414 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4415 Ok(None)
4416 })
4417 .next()
4418 .await;
4419
4420 // Move cursor to a location that contains code actions.
4421 editor_b.update(cx_b, |editor, cx| {
4422 editor.change_selections(None, cx, |s| {
4423 s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4424 });
4425 cx.focus(&editor_b);
4426 });
4427
4428 fake_language_server
4429 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4430 assert_eq!(
4431 params.text_document.uri,
4432 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4433 );
4434 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4435 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4436
4437 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4438 lsp::CodeAction {
4439 title: "Inline into all callers".to_string(),
4440 edit: Some(lsp::WorkspaceEdit {
4441 changes: Some(
4442 [
4443 (
4444 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4445 vec![lsp::TextEdit::new(
4446 lsp::Range::new(
4447 lsp::Position::new(1, 22),
4448 lsp::Position::new(1, 34),
4449 ),
4450 "4".to_string(),
4451 )],
4452 ),
4453 (
4454 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4455 vec![lsp::TextEdit::new(
4456 lsp::Range::new(
4457 lsp::Position::new(0, 0),
4458 lsp::Position::new(0, 27),
4459 ),
4460 "".to_string(),
4461 )],
4462 ),
4463 ]
4464 .into_iter()
4465 .collect(),
4466 ),
4467 ..Default::default()
4468 }),
4469 data: Some(json!({
4470 "codeActionParams": {
4471 "range": {
4472 "start": {"line": 1, "column": 31},
4473 "end": {"line": 1, "column": 31},
4474 }
4475 }
4476 })),
4477 ..Default::default()
4478 },
4479 )]))
4480 })
4481 .next()
4482 .await;
4483
4484 // Toggle code actions and wait for them to display.
4485 editor_b.update(cx_b, |editor, cx| {
4486 editor.toggle_code_actions(
4487 &ToggleCodeActions {
4488 deployed_from_indicator: false,
4489 },
4490 cx,
4491 );
4492 });
4493 editor_b
4494 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4495 .await;
4496
4497 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4498
4499 // Confirming the code action will trigger a resolve request.
4500 let confirm_action = workspace_b
4501 .update(cx_b, |workspace, cx| {
4502 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4503 })
4504 .unwrap();
4505 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4506 |_, _| async move {
4507 Ok(lsp::CodeAction {
4508 title: "Inline into all callers".to_string(),
4509 edit: Some(lsp::WorkspaceEdit {
4510 changes: Some(
4511 [
4512 (
4513 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4514 vec![lsp::TextEdit::new(
4515 lsp::Range::new(
4516 lsp::Position::new(1, 22),
4517 lsp::Position::new(1, 34),
4518 ),
4519 "4".to_string(),
4520 )],
4521 ),
4522 (
4523 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4524 vec![lsp::TextEdit::new(
4525 lsp::Range::new(
4526 lsp::Position::new(0, 0),
4527 lsp::Position::new(0, 27),
4528 ),
4529 "".to_string(),
4530 )],
4531 ),
4532 ]
4533 .into_iter()
4534 .collect(),
4535 ),
4536 ..Default::default()
4537 }),
4538 ..Default::default()
4539 })
4540 },
4541 );
4542
4543 // After the action is confirmed, an editor containing both modified files is opened.
4544 confirm_action.await.unwrap();
4545 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4546 workspace
4547 .active_item(cx)
4548 .unwrap()
4549 .downcast::<Editor>()
4550 .unwrap()
4551 });
4552 code_action_editor.update(cx_b, |editor, cx| {
4553 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4554 editor.undo(&Undo, cx);
4555 assert_eq!(
4556 editor.text(cx),
4557 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4558 );
4559 editor.redo(&Redo, cx);
4560 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4561 });
4562 }
4563
4564 #[gpui::test(iterations = 10)]
4565 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4566 cx_a.foreground().forbid_parking();
4567 let lang_registry = Arc::new(LanguageRegistry::test());
4568 let fs = FakeFs::new(cx_a.background());
4569 cx_b.update(|cx| editor::init(cx));
4570
4571 // Set up a fake language server.
4572 let mut language = Language::new(
4573 LanguageConfig {
4574 name: "Rust".into(),
4575 path_suffixes: vec!["rs".to_string()],
4576 ..Default::default()
4577 },
4578 Some(tree_sitter_rust::language()),
4579 );
4580 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4581 capabilities: lsp::ServerCapabilities {
4582 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4583 prepare_provider: Some(true),
4584 work_done_progress_options: Default::default(),
4585 })),
4586 ..Default::default()
4587 },
4588 ..Default::default()
4589 });
4590 lang_registry.add(Arc::new(language));
4591
4592 // Connect to a server as 2 clients.
4593 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4594 let client_a = server.create_client(cx_a, "user_a").await;
4595 let mut client_b = server.create_client(cx_b, "user_b").await;
4596 server
4597 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4598 .await;
4599
4600 // Share a project as client A
4601 fs.insert_tree(
4602 "/dir",
4603 json!({
4604 "one.rs": "const ONE: usize = 1;",
4605 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4606 }),
4607 )
4608 .await;
4609 let project_a = cx_a.update(|cx| {
4610 Project::local(
4611 client_a.clone(),
4612 client_a.user_store.clone(),
4613 lang_registry.clone(),
4614 fs.clone(),
4615 cx,
4616 )
4617 });
4618 let (worktree_a, _) = project_a
4619 .update(cx_a, |p, cx| {
4620 p.find_or_create_local_worktree("/dir", true, cx)
4621 })
4622 .await
4623 .unwrap();
4624 worktree_a
4625 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4626 .await;
4627 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4628
4629 // Join the worktree as client B.
4630 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4631 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4632 let editor_b = workspace_b
4633 .update(cx_b, |workspace, cx| {
4634 workspace.open_path((worktree_id, "one.rs"), true, cx)
4635 })
4636 .await
4637 .unwrap()
4638 .downcast::<Editor>()
4639 .unwrap();
4640 let fake_language_server = fake_language_servers.next().await.unwrap();
4641
4642 // Move cursor to a location that can be renamed.
4643 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4644 editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4645 editor.rename(&Rename, cx).unwrap()
4646 });
4647
4648 fake_language_server
4649 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4650 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4651 assert_eq!(params.position, lsp::Position::new(0, 7));
4652 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4653 lsp::Position::new(0, 6),
4654 lsp::Position::new(0, 9),
4655 ))))
4656 })
4657 .next()
4658 .await
4659 .unwrap();
4660 prepare_rename.await.unwrap();
4661 editor_b.update(cx_b, |editor, cx| {
4662 let rename = editor.pending_rename().unwrap();
4663 let buffer = editor.buffer().read(cx).snapshot(cx);
4664 assert_eq!(
4665 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4666 6..9
4667 );
4668 rename.editor.update(cx, |rename_editor, cx| {
4669 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4670 rename_buffer.edit([(0..3, "THREE")], cx);
4671 });
4672 });
4673 });
4674
4675 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4676 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4677 });
4678 fake_language_server
4679 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4680 assert_eq!(
4681 params.text_document_position.text_document.uri.as_str(),
4682 "file:///dir/one.rs"
4683 );
4684 assert_eq!(
4685 params.text_document_position.position,
4686 lsp::Position::new(0, 6)
4687 );
4688 assert_eq!(params.new_name, "THREE");
4689 Ok(Some(lsp::WorkspaceEdit {
4690 changes: Some(
4691 [
4692 (
4693 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4694 vec![lsp::TextEdit::new(
4695 lsp::Range::new(
4696 lsp::Position::new(0, 6),
4697 lsp::Position::new(0, 9),
4698 ),
4699 "THREE".to_string(),
4700 )],
4701 ),
4702 (
4703 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4704 vec![
4705 lsp::TextEdit::new(
4706 lsp::Range::new(
4707 lsp::Position::new(0, 24),
4708 lsp::Position::new(0, 27),
4709 ),
4710 "THREE".to_string(),
4711 ),
4712 lsp::TextEdit::new(
4713 lsp::Range::new(
4714 lsp::Position::new(0, 35),
4715 lsp::Position::new(0, 38),
4716 ),
4717 "THREE".to_string(),
4718 ),
4719 ],
4720 ),
4721 ]
4722 .into_iter()
4723 .collect(),
4724 ),
4725 ..Default::default()
4726 }))
4727 })
4728 .next()
4729 .await
4730 .unwrap();
4731 confirm_rename.await.unwrap();
4732
4733 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4734 workspace
4735 .active_item(cx)
4736 .unwrap()
4737 .downcast::<Editor>()
4738 .unwrap()
4739 });
4740 rename_editor.update(cx_b, |editor, cx| {
4741 assert_eq!(
4742 editor.text(cx),
4743 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4744 );
4745 editor.undo(&Undo, cx);
4746 assert_eq!(
4747 editor.text(cx),
4748 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4749 );
4750 editor.redo(&Redo, cx);
4751 assert_eq!(
4752 editor.text(cx),
4753 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4754 );
4755 });
4756
4757 // Ensure temporary rename edits cannot be undone/redone.
4758 editor_b.update(cx_b, |editor, cx| {
4759 editor.undo(&Undo, cx);
4760 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4761 editor.undo(&Undo, cx);
4762 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4763 editor.redo(&Redo, cx);
4764 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4765 })
4766 }
4767
4768 #[gpui::test(iterations = 10)]
4769 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4770 cx_a.foreground().forbid_parking();
4771
4772 // Connect to a server as 2 clients.
4773 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4774 let client_a = server.create_client(cx_a, "user_a").await;
4775 let client_b = server.create_client(cx_b, "user_b").await;
4776
4777 // Create an org that includes these 2 users.
4778 let db = &server.app_state.db;
4779 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4780 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4781 .await
4782 .unwrap();
4783 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4784 .await
4785 .unwrap();
4786
4787 // Create a channel that includes all the users.
4788 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4789 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4790 .await
4791 .unwrap();
4792 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4793 .await
4794 .unwrap();
4795 db.create_channel_message(
4796 channel_id,
4797 client_b.current_user_id(&cx_b),
4798 "hello A, it's B.",
4799 OffsetDateTime::now_utc(),
4800 1,
4801 )
4802 .await
4803 .unwrap();
4804
4805 let channels_a = cx_a
4806 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4807 channels_a
4808 .condition(cx_a, |list, _| list.available_channels().is_some())
4809 .await;
4810 channels_a.read_with(cx_a, |list, _| {
4811 assert_eq!(
4812 list.available_channels().unwrap(),
4813 &[ChannelDetails {
4814 id: channel_id.to_proto(),
4815 name: "test-channel".to_string()
4816 }]
4817 )
4818 });
4819 let channel_a = channels_a.update(cx_a, |this, cx| {
4820 this.get_channel(channel_id.to_proto(), cx).unwrap()
4821 });
4822 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4823 channel_a
4824 .condition(&cx_a, |channel, _| {
4825 channel_messages(channel)
4826 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4827 })
4828 .await;
4829
4830 let channels_b = cx_b
4831 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4832 channels_b
4833 .condition(cx_b, |list, _| list.available_channels().is_some())
4834 .await;
4835 channels_b.read_with(cx_b, |list, _| {
4836 assert_eq!(
4837 list.available_channels().unwrap(),
4838 &[ChannelDetails {
4839 id: channel_id.to_proto(),
4840 name: "test-channel".to_string()
4841 }]
4842 )
4843 });
4844
4845 let channel_b = channels_b.update(cx_b, |this, cx| {
4846 this.get_channel(channel_id.to_proto(), cx).unwrap()
4847 });
4848 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4849 channel_b
4850 .condition(&cx_b, |channel, _| {
4851 channel_messages(channel)
4852 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4853 })
4854 .await;
4855
4856 channel_a
4857 .update(cx_a, |channel, cx| {
4858 channel
4859 .send_message("oh, hi B.".to_string(), cx)
4860 .unwrap()
4861 .detach();
4862 let task = channel.send_message("sup".to_string(), cx).unwrap();
4863 assert_eq!(
4864 channel_messages(channel),
4865 &[
4866 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4867 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4868 ("user_a".to_string(), "sup".to_string(), true)
4869 ]
4870 );
4871 task
4872 })
4873 .await
4874 .unwrap();
4875
4876 channel_b
4877 .condition(&cx_b, |channel, _| {
4878 channel_messages(channel)
4879 == [
4880 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4881 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4882 ("user_a".to_string(), "sup".to_string(), false),
4883 ]
4884 })
4885 .await;
4886
4887 assert_eq!(
4888 server
4889 .state()
4890 .await
4891 .channel(channel_id)
4892 .unwrap()
4893 .connection_ids
4894 .len(),
4895 2
4896 );
4897 cx_b.update(|_| drop(channel_b));
4898 server
4899 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4900 .await;
4901
4902 cx_a.update(|_| drop(channel_a));
4903 server
4904 .condition(|state| state.channel(channel_id).is_none())
4905 .await;
4906 }
4907
4908 #[gpui::test(iterations = 10)]
4909 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4910 cx_a.foreground().forbid_parking();
4911
4912 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4913 let client_a = server.create_client(cx_a, "user_a").await;
4914
4915 let db = &server.app_state.db;
4916 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4917 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4918 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4919 .await
4920 .unwrap();
4921 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4922 .await
4923 .unwrap();
4924
4925 let channels_a = cx_a
4926 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4927 channels_a
4928 .condition(cx_a, |list, _| list.available_channels().is_some())
4929 .await;
4930 let channel_a = channels_a.update(cx_a, |this, cx| {
4931 this.get_channel(channel_id.to_proto(), cx).unwrap()
4932 });
4933
4934 // Messages aren't allowed to be too long.
4935 channel_a
4936 .update(cx_a, |channel, cx| {
4937 let long_body = "this is long.\n".repeat(1024);
4938 channel.send_message(long_body, cx).unwrap()
4939 })
4940 .await
4941 .unwrap_err();
4942
4943 // Messages aren't allowed to be blank.
4944 channel_a.update(cx_a, |channel, cx| {
4945 channel.send_message(String::new(), cx).unwrap_err()
4946 });
4947
4948 // Leading and trailing whitespace are trimmed.
4949 channel_a
4950 .update(cx_a, |channel, cx| {
4951 channel
4952 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4953 .unwrap()
4954 })
4955 .await
4956 .unwrap();
4957 assert_eq!(
4958 db.get_channel_messages(channel_id, 10, None)
4959 .await
4960 .unwrap()
4961 .iter()
4962 .map(|m| &m.body)
4963 .collect::<Vec<_>>(),
4964 &["surrounded by whitespace"]
4965 );
4966 }
4967
4968 #[gpui::test(iterations = 10)]
4969 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4970 cx_a.foreground().forbid_parking();
4971
4972 // Connect to a server as 2 clients.
4973 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4974 let client_a = server.create_client(cx_a, "user_a").await;
4975 let client_b = server.create_client(cx_b, "user_b").await;
4976 let mut status_b = client_b.status();
4977
4978 // Create an org that includes these 2 users.
4979 let db = &server.app_state.db;
4980 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4981 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4982 .await
4983 .unwrap();
4984 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4985 .await
4986 .unwrap();
4987
4988 // Create a channel that includes all the users.
4989 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4990 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4991 .await
4992 .unwrap();
4993 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4994 .await
4995 .unwrap();
4996 db.create_channel_message(
4997 channel_id,
4998 client_b.current_user_id(&cx_b),
4999 "hello A, it's B.",
5000 OffsetDateTime::now_utc(),
5001 2,
5002 )
5003 .await
5004 .unwrap();
5005
5006 let channels_a = cx_a
5007 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
5008 channels_a
5009 .condition(cx_a, |list, _| list.available_channels().is_some())
5010 .await;
5011
5012 channels_a.read_with(cx_a, |list, _| {
5013 assert_eq!(
5014 list.available_channels().unwrap(),
5015 &[ChannelDetails {
5016 id: channel_id.to_proto(),
5017 name: "test-channel".to_string()
5018 }]
5019 )
5020 });
5021 let channel_a = channels_a.update(cx_a, |this, cx| {
5022 this.get_channel(channel_id.to_proto(), cx).unwrap()
5023 });
5024 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
5025 channel_a
5026 .condition(&cx_a, |channel, _| {
5027 channel_messages(channel)
5028 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5029 })
5030 .await;
5031
5032 let channels_b = cx_b
5033 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
5034 channels_b
5035 .condition(cx_b, |list, _| list.available_channels().is_some())
5036 .await;
5037 channels_b.read_with(cx_b, |list, _| {
5038 assert_eq!(
5039 list.available_channels().unwrap(),
5040 &[ChannelDetails {
5041 id: channel_id.to_proto(),
5042 name: "test-channel".to_string()
5043 }]
5044 )
5045 });
5046
5047 let channel_b = channels_b.update(cx_b, |this, cx| {
5048 this.get_channel(channel_id.to_proto(), cx).unwrap()
5049 });
5050 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5051 channel_b
5052 .condition(&cx_b, |channel, _| {
5053 channel_messages(channel)
5054 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5055 })
5056 .await;
5057
5058 // Disconnect client B, ensuring we can still access its cached channel data.
5059 server.forbid_connections();
5060 server.disconnect_client(client_b.current_user_id(&cx_b));
5061 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5062 while !matches!(
5063 status_b.next().await,
5064 Some(client::Status::ReconnectionError { .. })
5065 ) {}
5066
5067 channels_b.read_with(cx_b, |channels, _| {
5068 assert_eq!(
5069 channels.available_channels().unwrap(),
5070 [ChannelDetails {
5071 id: channel_id.to_proto(),
5072 name: "test-channel".to_string()
5073 }]
5074 )
5075 });
5076 channel_b.read_with(cx_b, |channel, _| {
5077 assert_eq!(
5078 channel_messages(channel),
5079 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5080 )
5081 });
5082
5083 // Send a message from client B while it is disconnected.
5084 channel_b
5085 .update(cx_b, |channel, cx| {
5086 let task = channel
5087 .send_message("can you see this?".to_string(), cx)
5088 .unwrap();
5089 assert_eq!(
5090 channel_messages(channel),
5091 &[
5092 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5093 ("user_b".to_string(), "can you see this?".to_string(), true)
5094 ]
5095 );
5096 task
5097 })
5098 .await
5099 .unwrap_err();
5100
5101 // Send a message from client A while B is disconnected.
5102 channel_a
5103 .update(cx_a, |channel, cx| {
5104 channel
5105 .send_message("oh, hi B.".to_string(), cx)
5106 .unwrap()
5107 .detach();
5108 let task = channel.send_message("sup".to_string(), cx).unwrap();
5109 assert_eq!(
5110 channel_messages(channel),
5111 &[
5112 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5113 ("user_a".to_string(), "oh, hi B.".to_string(), true),
5114 ("user_a".to_string(), "sup".to_string(), true)
5115 ]
5116 );
5117 task
5118 })
5119 .await
5120 .unwrap();
5121
5122 // Give client B a chance to reconnect.
5123 server.allow_connections();
5124 cx_b.foreground().advance_clock(Duration::from_secs(10));
5125
5126 // Verify that B sees the new messages upon reconnection, as well as the message client B
5127 // sent while offline.
5128 channel_b
5129 .condition(&cx_b, |channel, _| {
5130 channel_messages(channel)
5131 == [
5132 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5133 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5134 ("user_a".to_string(), "sup".to_string(), false),
5135 ("user_b".to_string(), "can you see this?".to_string(), false),
5136 ]
5137 })
5138 .await;
5139
5140 // Ensure client A and B can communicate normally after reconnection.
5141 channel_a
5142 .update(cx_a, |channel, cx| {
5143 channel.send_message("you online?".to_string(), cx).unwrap()
5144 })
5145 .await
5146 .unwrap();
5147 channel_b
5148 .condition(&cx_b, |channel, _| {
5149 channel_messages(channel)
5150 == [
5151 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5152 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5153 ("user_a".to_string(), "sup".to_string(), false),
5154 ("user_b".to_string(), "can you see this?".to_string(), false),
5155 ("user_a".to_string(), "you online?".to_string(), false),
5156 ]
5157 })
5158 .await;
5159
5160 channel_b
5161 .update(cx_b, |channel, cx| {
5162 channel.send_message("yep".to_string(), cx).unwrap()
5163 })
5164 .await
5165 .unwrap();
5166 channel_a
5167 .condition(&cx_a, |channel, _| {
5168 channel_messages(channel)
5169 == [
5170 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5171 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5172 ("user_a".to_string(), "sup".to_string(), false),
5173 ("user_b".to_string(), "can you see this?".to_string(), false),
5174 ("user_a".to_string(), "you online?".to_string(), false),
5175 ("user_b".to_string(), "yep".to_string(), false),
5176 ]
5177 })
5178 .await;
5179 }
5180
5181 #[gpui::test(iterations = 10)]
5182 async fn test_contacts(
5183 deterministic: Arc<Deterministic>,
5184 cx_a: &mut TestAppContext,
5185 cx_b: &mut TestAppContext,
5186 cx_c: &mut TestAppContext,
5187 ) {
5188 cx_a.foreground().forbid_parking();
5189
5190 // Connect to a server as 3 clients.
5191 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5192 let mut client_a = server.create_client(cx_a, "user_a").await;
5193 let mut client_b = server.create_client(cx_b, "user_b").await;
5194 let client_c = server.create_client(cx_c, "user_c").await;
5195 server
5196 .make_contacts(vec![
5197 (&client_a, cx_a),
5198 (&client_b, cx_b),
5199 (&client_c, cx_c),
5200 ])
5201 .await;
5202
5203 deterministic.run_until_parked();
5204 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5205 client.user_store.read_with(*cx, |store, _| {
5206 assert_eq!(
5207 contacts(store),
5208 [
5209 ("user_a", true, vec![]),
5210 ("user_b", true, vec![]),
5211 ("user_c", true, vec![])
5212 ],
5213 "{} has the wrong contacts",
5214 client.username
5215 )
5216 });
5217 }
5218
5219 // Share a project as client A.
5220 let fs = FakeFs::new(cx_a.background());
5221 fs.create_dir(Path::new("/a")).await.unwrap();
5222 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5223
5224 deterministic.run_until_parked();
5225 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5226 client.user_store.read_with(*cx, |store, _| {
5227 assert_eq!(
5228 contacts(store),
5229 [
5230 ("user_a", true, vec![("a", vec![])]),
5231 ("user_b", true, vec![]),
5232 ("user_c", true, vec![])
5233 ],
5234 "{} has the wrong contacts",
5235 client.username
5236 )
5237 });
5238 }
5239
5240 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5241
5242 deterministic.run_until_parked();
5243 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5244 client.user_store.read_with(*cx, |store, _| {
5245 assert_eq!(
5246 contacts(store),
5247 [
5248 ("user_a", true, vec![("a", vec!["user_b"])]),
5249 ("user_b", true, vec![]),
5250 ("user_c", true, vec![])
5251 ],
5252 "{} has the wrong contacts",
5253 client.username
5254 )
5255 });
5256 }
5257
5258 // Add a local project as client B
5259 let fs = FakeFs::new(cx_b.background());
5260 fs.create_dir(Path::new("/b")).await.unwrap();
5261 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5262
5263 deterministic.run_until_parked();
5264 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5265 client.user_store.read_with(*cx, |store, _| {
5266 assert_eq!(
5267 contacts(store),
5268 [
5269 ("user_a", true, vec![("a", vec!["user_b"])]),
5270 ("user_b", true, vec![("b", vec![])]),
5271 ("user_c", true, vec![])
5272 ],
5273 "{} has the wrong contacts",
5274 client.username
5275 )
5276 });
5277 }
5278
5279 project_a
5280 .condition(&cx_a, |project, _| {
5281 project.collaborators().contains_key(&client_b.peer_id)
5282 })
5283 .await;
5284
5285 client_a.project.take();
5286 cx_a.update(move |_| drop(project_a));
5287 deterministic.run_until_parked();
5288 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5289 client.user_store.read_with(*cx, |store, _| {
5290 assert_eq!(
5291 contacts(store),
5292 [
5293 ("user_a", true, vec![]),
5294 ("user_b", true, vec![("b", vec![])]),
5295 ("user_c", true, vec![])
5296 ],
5297 "{} has the wrong contacts",
5298 client.username
5299 )
5300 });
5301 }
5302
5303 server.disconnect_client(client_c.current_user_id(cx_c));
5304 server.forbid_connections();
5305 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5306 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5307 client.user_store.read_with(*cx, |store, _| {
5308 assert_eq!(
5309 contacts(store),
5310 [
5311 ("user_a", true, vec![]),
5312 ("user_b", true, vec![("b", vec![])]),
5313 ("user_c", false, vec![])
5314 ],
5315 "{} has the wrong contacts",
5316 client.username
5317 )
5318 });
5319 }
5320 client_c
5321 .user_store
5322 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5323
5324 server.allow_connections();
5325 client_c
5326 .authenticate_and_connect(false, &cx_c.to_async())
5327 .await
5328 .unwrap();
5329
5330 deterministic.run_until_parked();
5331 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5332 client.user_store.read_with(*cx, |store, _| {
5333 assert_eq!(
5334 contacts(store),
5335 [
5336 ("user_a", true, vec![]),
5337 ("user_b", true, vec![("b", vec![])]),
5338 ("user_c", true, vec![])
5339 ],
5340 "{} has the wrong contacts",
5341 client.username
5342 )
5343 });
5344 }
5345
5346 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5347 user_store
5348 .contacts()
5349 .iter()
5350 .map(|contact| {
5351 let projects = contact
5352 .projects
5353 .iter()
5354 .map(|p| {
5355 (
5356 p.worktree_root_names[0].as_str(),
5357 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5358 )
5359 })
5360 .collect();
5361 (contact.user.github_login.as_str(), contact.online, projects)
5362 })
5363 .collect()
5364 }
5365 }
5366
5367 #[gpui::test(iterations = 10)]
5368 async fn test_contact_requests(
5369 executor: Arc<Deterministic>,
5370 cx_a: &mut TestAppContext,
5371 cx_a2: &mut TestAppContext,
5372 cx_b: &mut TestAppContext,
5373 cx_b2: &mut TestAppContext,
5374 cx_c: &mut TestAppContext,
5375 cx_c2: &mut TestAppContext,
5376 ) {
5377 cx_a.foreground().forbid_parking();
5378
5379 // Connect to a server as 3 clients.
5380 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5381 let client_a = server.create_client(cx_a, "user_a").await;
5382 let client_a2 = server.create_client(cx_a2, "user_a").await;
5383 let client_b = server.create_client(cx_b, "user_b").await;
5384 let client_b2 = server.create_client(cx_b2, "user_b").await;
5385 let client_c = server.create_client(cx_c, "user_c").await;
5386 let client_c2 = server.create_client(cx_c2, "user_c").await;
5387
5388 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5389 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5390 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5391
5392 // User A and User C request that user B become their contact.
5393 client_a
5394 .user_store
5395 .update(cx_a, |store, cx| {
5396 store.request_contact(client_b.user_id().unwrap(), cx)
5397 })
5398 .await
5399 .unwrap();
5400 client_c
5401 .user_store
5402 .update(cx_c, |store, cx| {
5403 store.request_contact(client_b.user_id().unwrap(), cx)
5404 })
5405 .await
5406 .unwrap();
5407 executor.run_until_parked();
5408
5409 // All users see the pending request appear in all their clients.
5410 assert_eq!(
5411 client_a.summarize_contacts(&cx_a).outgoing_requests,
5412 &["user_b"]
5413 );
5414 assert_eq!(
5415 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5416 &["user_b"]
5417 );
5418 assert_eq!(
5419 client_b.summarize_contacts(&cx_b).incoming_requests,
5420 &["user_a", "user_c"]
5421 );
5422 assert_eq!(
5423 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5424 &["user_a", "user_c"]
5425 );
5426 assert_eq!(
5427 client_c.summarize_contacts(&cx_c).outgoing_requests,
5428 &["user_b"]
5429 );
5430 assert_eq!(
5431 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5432 &["user_b"]
5433 );
5434
5435 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5436 disconnect_and_reconnect(&client_a, cx_a).await;
5437 disconnect_and_reconnect(&client_b, cx_b).await;
5438 disconnect_and_reconnect(&client_c, cx_c).await;
5439 executor.run_until_parked();
5440 assert_eq!(
5441 client_a.summarize_contacts(&cx_a).outgoing_requests,
5442 &["user_b"]
5443 );
5444 assert_eq!(
5445 client_b.summarize_contacts(&cx_b).incoming_requests,
5446 &["user_a", "user_c"]
5447 );
5448 assert_eq!(
5449 client_c.summarize_contacts(&cx_c).outgoing_requests,
5450 &["user_b"]
5451 );
5452
5453 // User B accepts the request from user A.
5454 client_b
5455 .user_store
5456 .update(cx_b, |store, cx| {
5457 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5458 })
5459 .await
5460 .unwrap();
5461
5462 executor.run_until_parked();
5463
5464 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5465 let contacts_b = client_b.summarize_contacts(&cx_b);
5466 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5467 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5468 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5469 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5470 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5471
5472 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5473 let contacts_a = client_a.summarize_contacts(&cx_a);
5474 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5475 assert!(contacts_a.outgoing_requests.is_empty());
5476 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5477 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5478 assert!(contacts_a2.outgoing_requests.is_empty());
5479
5480 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5481 disconnect_and_reconnect(&client_a, cx_a).await;
5482 disconnect_and_reconnect(&client_b, cx_b).await;
5483 disconnect_and_reconnect(&client_c, cx_c).await;
5484 executor.run_until_parked();
5485 assert_eq!(
5486 client_a.summarize_contacts(&cx_a).current,
5487 &["user_a", "user_b"]
5488 );
5489 assert_eq!(
5490 client_b.summarize_contacts(&cx_b).current,
5491 &["user_a", "user_b"]
5492 );
5493 assert_eq!(
5494 client_b.summarize_contacts(&cx_b).incoming_requests,
5495 &["user_c"]
5496 );
5497 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5498 assert_eq!(
5499 client_c.summarize_contacts(&cx_c).outgoing_requests,
5500 &["user_b"]
5501 );
5502
5503 // User B rejects the request from user C.
5504 client_b
5505 .user_store
5506 .update(cx_b, |store, cx| {
5507 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5508 })
5509 .await
5510 .unwrap();
5511
5512 executor.run_until_parked();
5513
5514 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5515 let contacts_b = client_b.summarize_contacts(&cx_b);
5516 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5517 assert!(contacts_b.incoming_requests.is_empty());
5518 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5519 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5520 assert!(contacts_b2.incoming_requests.is_empty());
5521
5522 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5523 let contacts_c = client_c.summarize_contacts(&cx_c);
5524 assert_eq!(contacts_c.current, &["user_c"]);
5525 assert!(contacts_c.outgoing_requests.is_empty());
5526 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5527 assert_eq!(contacts_c2.current, &["user_c"]);
5528 assert!(contacts_c2.outgoing_requests.is_empty());
5529
5530 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5531 disconnect_and_reconnect(&client_a, cx_a).await;
5532 disconnect_and_reconnect(&client_b, cx_b).await;
5533 disconnect_and_reconnect(&client_c, cx_c).await;
5534 executor.run_until_parked();
5535 assert_eq!(
5536 client_a.summarize_contacts(&cx_a).current,
5537 &["user_a", "user_b"]
5538 );
5539 assert_eq!(
5540 client_b.summarize_contacts(&cx_b).current,
5541 &["user_a", "user_b"]
5542 );
5543 assert!(client_b
5544 .summarize_contacts(&cx_b)
5545 .incoming_requests
5546 .is_empty());
5547 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5548 assert!(client_c
5549 .summarize_contacts(&cx_c)
5550 .outgoing_requests
5551 .is_empty());
5552
5553 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5554 client.disconnect(&cx.to_async()).unwrap();
5555 client.clear_contacts(cx).await;
5556 client
5557 .authenticate_and_connect(false, &cx.to_async())
5558 .await
5559 .unwrap();
5560 }
5561 }
5562
5563 #[gpui::test(iterations = 10)]
5564 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5565 cx_a.foreground().forbid_parking();
5566 let fs = FakeFs::new(cx_a.background());
5567
5568 // 2 clients connect to a server.
5569 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5570 let mut client_a = server.create_client(cx_a, "user_a").await;
5571 let mut client_b = server.create_client(cx_b, "user_b").await;
5572 server
5573 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5574 .await;
5575 cx_a.update(editor::init);
5576 cx_b.update(editor::init);
5577
5578 // Client A shares a project.
5579 fs.insert_tree(
5580 "/a",
5581 json!({
5582 "1.txt": "one",
5583 "2.txt": "two",
5584 "3.txt": "three",
5585 }),
5586 )
5587 .await;
5588 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5589
5590 // Client B joins the project.
5591 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5592
5593 // Client A opens some editors.
5594 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5595 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5596 let editor_a1 = workspace_a
5597 .update(cx_a, |workspace, cx| {
5598 workspace.open_path((worktree_id, "1.txt"), true, cx)
5599 })
5600 .await
5601 .unwrap()
5602 .downcast::<Editor>()
5603 .unwrap();
5604 let editor_a2 = workspace_a
5605 .update(cx_a, |workspace, cx| {
5606 workspace.open_path((worktree_id, "2.txt"), true, cx)
5607 })
5608 .await
5609 .unwrap()
5610 .downcast::<Editor>()
5611 .unwrap();
5612
5613 // Client B opens an editor.
5614 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5615 let editor_b1 = workspace_b
5616 .update(cx_b, |workspace, cx| {
5617 workspace.open_path((worktree_id, "1.txt"), true, cx)
5618 })
5619 .await
5620 .unwrap()
5621 .downcast::<Editor>()
5622 .unwrap();
5623
5624 let client_a_id = project_b.read_with(cx_b, |project, _| {
5625 project.collaborators().values().next().unwrap().peer_id
5626 });
5627 let client_b_id = project_a.read_with(cx_a, |project, _| {
5628 project.collaborators().values().next().unwrap().peer_id
5629 });
5630
5631 // When client B starts following client A, all visible view states are replicated to client B.
5632 editor_a1.update(cx_a, |editor, cx| {
5633 editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5634 });
5635 editor_a2.update(cx_a, |editor, cx| {
5636 editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5637 });
5638 workspace_b
5639 .update(cx_b, |workspace, cx| {
5640 workspace
5641 .toggle_follow(&ToggleFollow(client_a_id), cx)
5642 .unwrap()
5643 })
5644 .await
5645 .unwrap();
5646
5647 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5648 workspace
5649 .active_item(cx)
5650 .unwrap()
5651 .downcast::<Editor>()
5652 .unwrap()
5653 });
5654 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5655 assert_eq!(
5656 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5657 Some((worktree_id, "2.txt").into())
5658 );
5659 assert_eq!(
5660 editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5661 vec![2..3]
5662 );
5663 assert_eq!(
5664 editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5665 vec![0..1]
5666 );
5667
5668 // When client A activates a different editor, client B does so as well.
5669 workspace_a.update(cx_a, |workspace, cx| {
5670 workspace.activate_item(&editor_a1, cx)
5671 });
5672 workspace_b
5673 .condition(cx_b, |workspace, cx| {
5674 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5675 })
5676 .await;
5677
5678 // When client A navigates back and forth, client B does so as well.
5679 workspace_a
5680 .update(cx_a, |workspace, cx| {
5681 workspace::Pane::go_back(workspace, None, cx)
5682 })
5683 .await;
5684 workspace_b
5685 .condition(cx_b, |workspace, cx| {
5686 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5687 })
5688 .await;
5689
5690 workspace_a
5691 .update(cx_a, |workspace, cx| {
5692 workspace::Pane::go_forward(workspace, None, cx)
5693 })
5694 .await;
5695 workspace_b
5696 .condition(cx_b, |workspace, cx| {
5697 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5698 })
5699 .await;
5700
5701 // Changes to client A's editor are reflected on client B.
5702 editor_a1.update(cx_a, |editor, cx| {
5703 editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5704 });
5705 editor_b1
5706 .condition(cx_b, |editor, cx| {
5707 editor.selections.ranges(cx) == vec![1..1, 2..2]
5708 })
5709 .await;
5710
5711 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5712 editor_b1
5713 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5714 .await;
5715
5716 editor_a1.update(cx_a, |editor, cx| {
5717 editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5718 editor.set_scroll_position(vec2f(0., 100.), cx);
5719 });
5720 editor_b1
5721 .condition(cx_b, |editor, cx| {
5722 editor.selections.ranges(cx) == vec![3..3]
5723 })
5724 .await;
5725
5726 // After unfollowing, client B stops receiving updates from client A.
5727 workspace_b.update(cx_b, |workspace, cx| {
5728 workspace.unfollow(&workspace.active_pane().clone(), cx)
5729 });
5730 workspace_a.update(cx_a, |workspace, cx| {
5731 workspace.activate_item(&editor_a2, cx)
5732 });
5733 cx_a.foreground().run_until_parked();
5734 assert_eq!(
5735 workspace_b.read_with(cx_b, |workspace, cx| workspace
5736 .active_item(cx)
5737 .unwrap()
5738 .id()),
5739 editor_b1.id()
5740 );
5741
5742 // Client A starts following client B.
5743 workspace_a
5744 .update(cx_a, |workspace, cx| {
5745 workspace
5746 .toggle_follow(&ToggleFollow(client_b_id), cx)
5747 .unwrap()
5748 })
5749 .await
5750 .unwrap();
5751 assert_eq!(
5752 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5753 Some(client_b_id)
5754 );
5755 assert_eq!(
5756 workspace_a.read_with(cx_a, |workspace, cx| workspace
5757 .active_item(cx)
5758 .unwrap()
5759 .id()),
5760 editor_a1.id()
5761 );
5762
5763 // Following interrupts when client B disconnects.
5764 client_b.disconnect(&cx_b.to_async()).unwrap();
5765 cx_a.foreground().run_until_parked();
5766 assert_eq!(
5767 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5768 None
5769 );
5770 }
5771
5772 #[gpui::test(iterations = 10)]
5773 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5774 cx_a.foreground().forbid_parking();
5775 let fs = FakeFs::new(cx_a.background());
5776
5777 // 2 clients connect to a server.
5778 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5779 let mut client_a = server.create_client(cx_a, "user_a").await;
5780 let mut client_b = server.create_client(cx_b, "user_b").await;
5781 server
5782 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5783 .await;
5784 cx_a.update(editor::init);
5785 cx_b.update(editor::init);
5786
5787 // Client A shares a project.
5788 fs.insert_tree(
5789 "/a",
5790 json!({
5791 "1.txt": "one",
5792 "2.txt": "two",
5793 "3.txt": "three",
5794 "4.txt": "four",
5795 }),
5796 )
5797 .await;
5798 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5799
5800 // Client B joins the project.
5801 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5802
5803 // Client A opens some editors.
5804 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5805 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5806 let _editor_a1 = workspace_a
5807 .update(cx_a, |workspace, cx| {
5808 workspace.open_path((worktree_id, "1.txt"), true, cx)
5809 })
5810 .await
5811 .unwrap()
5812 .downcast::<Editor>()
5813 .unwrap();
5814
5815 // Client B opens an editor.
5816 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5817 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5818 let _editor_b1 = workspace_b
5819 .update(cx_b, |workspace, cx| {
5820 workspace.open_path((worktree_id, "2.txt"), true, cx)
5821 })
5822 .await
5823 .unwrap()
5824 .downcast::<Editor>()
5825 .unwrap();
5826
5827 // Clients A and B follow each other in split panes
5828 workspace_a
5829 .update(cx_a, |workspace, cx| {
5830 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5831 assert_ne!(*workspace.active_pane(), pane_a1);
5832 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5833 workspace
5834 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5835 .unwrap()
5836 })
5837 .await
5838 .unwrap();
5839 workspace_b
5840 .update(cx_b, |workspace, cx| {
5841 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5842 assert_ne!(*workspace.active_pane(), pane_b1);
5843 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5844 workspace
5845 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5846 .unwrap()
5847 })
5848 .await
5849 .unwrap();
5850
5851 workspace_a
5852 .update(cx_a, |workspace, cx| {
5853 workspace.activate_next_pane(cx);
5854 assert_eq!(*workspace.active_pane(), pane_a1);
5855 workspace.open_path((worktree_id, "3.txt"), true, cx)
5856 })
5857 .await
5858 .unwrap();
5859 workspace_b
5860 .update(cx_b, |workspace, cx| {
5861 workspace.activate_next_pane(cx);
5862 assert_eq!(*workspace.active_pane(), pane_b1);
5863 workspace.open_path((worktree_id, "4.txt"), true, cx)
5864 })
5865 .await
5866 .unwrap();
5867 cx_a.foreground().run_until_parked();
5868
5869 // Ensure leader updates don't change the active pane of followers
5870 workspace_a.read_with(cx_a, |workspace, _| {
5871 assert_eq!(*workspace.active_pane(), pane_a1);
5872 });
5873 workspace_b.read_with(cx_b, |workspace, _| {
5874 assert_eq!(*workspace.active_pane(), pane_b1);
5875 });
5876
5877 // Ensure peers following each other doesn't cause an infinite loop.
5878 assert_eq!(
5879 workspace_a.read_with(cx_a, |workspace, cx| workspace
5880 .active_item(cx)
5881 .unwrap()
5882 .project_path(cx)),
5883 Some((worktree_id, "3.txt").into())
5884 );
5885 workspace_a.update(cx_a, |workspace, cx| {
5886 assert_eq!(
5887 workspace.active_item(cx).unwrap().project_path(cx),
5888 Some((worktree_id, "3.txt").into())
5889 );
5890 workspace.activate_next_pane(cx);
5891 assert_eq!(
5892 workspace.active_item(cx).unwrap().project_path(cx),
5893 Some((worktree_id, "4.txt").into())
5894 );
5895 });
5896 workspace_b.update(cx_b, |workspace, cx| {
5897 assert_eq!(
5898 workspace.active_item(cx).unwrap().project_path(cx),
5899 Some((worktree_id, "4.txt").into())
5900 );
5901 workspace.activate_next_pane(cx);
5902 assert_eq!(
5903 workspace.active_item(cx).unwrap().project_path(cx),
5904 Some((worktree_id, "3.txt").into())
5905 );
5906 });
5907 }
5908
5909 #[gpui::test(iterations = 10)]
5910 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5911 cx_a.foreground().forbid_parking();
5912 let fs = FakeFs::new(cx_a.background());
5913
5914 // 2 clients connect to a server.
5915 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5916 let mut client_a = server.create_client(cx_a, "user_a").await;
5917 let mut client_b = server.create_client(cx_b, "user_b").await;
5918 server
5919 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5920 .await;
5921 cx_a.update(editor::init);
5922 cx_b.update(editor::init);
5923
5924 // Client A shares a project.
5925 fs.insert_tree(
5926 "/a",
5927 json!({
5928 "1.txt": "one",
5929 "2.txt": "two",
5930 "3.txt": "three",
5931 }),
5932 )
5933 .await;
5934 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5935
5936 // Client B joins the project.
5937 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5938
5939 // Client A opens some editors.
5940 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5941 let _editor_a1 = workspace_a
5942 .update(cx_a, |workspace, cx| {
5943 workspace.open_path((worktree_id, "1.txt"), true, cx)
5944 })
5945 .await
5946 .unwrap()
5947 .downcast::<Editor>()
5948 .unwrap();
5949
5950 // Client B starts following client A.
5951 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5952 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5953 let leader_id = project_b.read_with(cx_b, |project, _| {
5954 project.collaborators().values().next().unwrap().peer_id
5955 });
5956 workspace_b
5957 .update(cx_b, |workspace, cx| {
5958 workspace
5959 .toggle_follow(&ToggleFollow(leader_id), cx)
5960 .unwrap()
5961 })
5962 .await
5963 .unwrap();
5964 assert_eq!(
5965 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5966 Some(leader_id)
5967 );
5968 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5969 workspace
5970 .active_item(cx)
5971 .unwrap()
5972 .downcast::<Editor>()
5973 .unwrap()
5974 });
5975
5976 // When client B moves, it automatically stops following client A.
5977 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5978 assert_eq!(
5979 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5980 None
5981 );
5982
5983 workspace_b
5984 .update(cx_b, |workspace, cx| {
5985 workspace
5986 .toggle_follow(&ToggleFollow(leader_id), cx)
5987 .unwrap()
5988 })
5989 .await
5990 .unwrap();
5991 assert_eq!(
5992 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5993 Some(leader_id)
5994 );
5995
5996 // When client B edits, it automatically stops following client A.
5997 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5998 assert_eq!(
5999 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6000 None
6001 );
6002
6003 workspace_b
6004 .update(cx_b, |workspace, cx| {
6005 workspace
6006 .toggle_follow(&ToggleFollow(leader_id), cx)
6007 .unwrap()
6008 })
6009 .await
6010 .unwrap();
6011 assert_eq!(
6012 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6013 Some(leader_id)
6014 );
6015
6016 // When client B scrolls, it automatically stops following client A.
6017 editor_b2.update(cx_b, |editor, cx| {
6018 editor.set_scroll_position(vec2f(0., 3.), cx)
6019 });
6020 assert_eq!(
6021 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6022 None
6023 );
6024
6025 workspace_b
6026 .update(cx_b, |workspace, cx| {
6027 workspace
6028 .toggle_follow(&ToggleFollow(leader_id), cx)
6029 .unwrap()
6030 })
6031 .await
6032 .unwrap();
6033 assert_eq!(
6034 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6035 Some(leader_id)
6036 );
6037
6038 // When client B activates a different pane, it continues following client A in the original pane.
6039 workspace_b.update(cx_b, |workspace, cx| {
6040 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
6041 });
6042 assert_eq!(
6043 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6044 Some(leader_id)
6045 );
6046
6047 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6048 assert_eq!(
6049 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6050 Some(leader_id)
6051 );
6052
6053 // When client B activates a different item in the original pane, it automatically stops following client A.
6054 workspace_b
6055 .update(cx_b, |workspace, cx| {
6056 workspace.open_path((worktree_id, "2.txt"), true, cx)
6057 })
6058 .await
6059 .unwrap();
6060 assert_eq!(
6061 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6062 None
6063 );
6064 }
6065
6066 #[gpui::test(iterations = 100)]
6067 async fn test_random_collaboration(
6068 cx: &mut TestAppContext,
6069 deterministic: Arc<Deterministic>,
6070 rng: StdRng,
6071 ) {
6072 cx.foreground().forbid_parking();
6073 let max_peers = env::var("MAX_PEERS")
6074 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6075 .unwrap_or(5);
6076 assert!(max_peers <= 5);
6077
6078 let max_operations = env::var("OPERATIONS")
6079 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6080 .unwrap_or(10);
6081
6082 let rng = Arc::new(Mutex::new(rng));
6083
6084 let guest_lang_registry = Arc::new(LanguageRegistry::test());
6085 let host_language_registry = Arc::new(LanguageRegistry::test());
6086
6087 let fs = FakeFs::new(cx.background());
6088 fs.insert_tree("/_collab", json!({"init": ""})).await;
6089
6090 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6091 let db = server.app_state.db.clone();
6092 let host_user_id = db.create_user("host", None, false).await.unwrap();
6093 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6094 let guest_user_id = db.create_user(username, None, false).await.unwrap();
6095 server
6096 .app_state
6097 .db
6098 .send_contact_request(guest_user_id, host_user_id)
6099 .await
6100 .unwrap();
6101 server
6102 .app_state
6103 .db
6104 .respond_to_contact_request(host_user_id, guest_user_id, true)
6105 .await
6106 .unwrap();
6107 }
6108
6109 let mut clients = Vec::new();
6110 let mut user_ids = Vec::new();
6111 let mut op_start_signals = Vec::new();
6112
6113 let mut next_entity_id = 100000;
6114 let mut host_cx = TestAppContext::new(
6115 cx.foreground_platform(),
6116 cx.platform(),
6117 deterministic.build_foreground(next_entity_id),
6118 deterministic.build_background(),
6119 cx.font_cache(),
6120 cx.leak_detector(),
6121 next_entity_id,
6122 );
6123 let host = server.create_client(&mut host_cx, "host").await;
6124 let host_project = host_cx.update(|cx| {
6125 Project::local(
6126 host.client.clone(),
6127 host.user_store.clone(),
6128 host_language_registry.clone(),
6129 fs.clone(),
6130 cx,
6131 )
6132 });
6133 let host_project_id = host_project
6134 .update(&mut host_cx, |p, _| p.next_remote_id())
6135 .await;
6136
6137 let (collab_worktree, _) = host_project
6138 .update(&mut host_cx, |project, cx| {
6139 project.find_or_create_local_worktree("/_collab", true, cx)
6140 })
6141 .await
6142 .unwrap();
6143 collab_worktree
6144 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6145 .await;
6146
6147 // Set up fake language servers.
6148 let mut language = Language::new(
6149 LanguageConfig {
6150 name: "Rust".into(),
6151 path_suffixes: vec!["rs".to_string()],
6152 ..Default::default()
6153 },
6154 None,
6155 );
6156 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6157 name: "the-fake-language-server",
6158 capabilities: lsp::LanguageServer::full_capabilities(),
6159 initializer: Some(Box::new({
6160 let rng = rng.clone();
6161 let fs = fs.clone();
6162 let project = host_project.downgrade();
6163 move |fake_server: &mut FakeLanguageServer| {
6164 fake_server.handle_request::<lsp::request::Completion, _, _>(
6165 |_, _| async move {
6166 Ok(Some(lsp::CompletionResponse::Array(vec![
6167 lsp::CompletionItem {
6168 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6169 range: lsp::Range::new(
6170 lsp::Position::new(0, 0),
6171 lsp::Position::new(0, 0),
6172 ),
6173 new_text: "the-new-text".to_string(),
6174 })),
6175 ..Default::default()
6176 },
6177 ])))
6178 },
6179 );
6180
6181 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6182 |_, _| async move {
6183 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6184 lsp::CodeAction {
6185 title: "the-code-action".to_string(),
6186 ..Default::default()
6187 },
6188 )]))
6189 },
6190 );
6191
6192 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6193 |params, _| async move {
6194 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6195 params.position,
6196 params.position,
6197 ))))
6198 },
6199 );
6200
6201 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6202 let fs = fs.clone();
6203 let rng = rng.clone();
6204 move |_, _| {
6205 let fs = fs.clone();
6206 let rng = rng.clone();
6207 async move {
6208 let files = fs.files().await;
6209 let mut rng = rng.lock();
6210 let count = rng.gen_range::<usize, _>(1..3);
6211 let files = (0..count)
6212 .map(|_| files.choose(&mut *rng).unwrap())
6213 .collect::<Vec<_>>();
6214 log::info!("LSP: Returning definitions in files {:?}", &files);
6215 Ok(Some(lsp::GotoDefinitionResponse::Array(
6216 files
6217 .into_iter()
6218 .map(|file| lsp::Location {
6219 uri: lsp::Url::from_file_path(file).unwrap(),
6220 range: Default::default(),
6221 })
6222 .collect(),
6223 )))
6224 }
6225 }
6226 });
6227
6228 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6229 let rng = rng.clone();
6230 let project = project.clone();
6231 move |params, mut cx| {
6232 let highlights = if let Some(project) = project.upgrade(&cx) {
6233 project.update(&mut cx, |project, cx| {
6234 let path = params
6235 .text_document_position_params
6236 .text_document
6237 .uri
6238 .to_file_path()
6239 .unwrap();
6240 let (worktree, relative_path) =
6241 project.find_local_worktree(&path, cx)?;
6242 let project_path =
6243 ProjectPath::from((worktree.read(cx).id(), relative_path));
6244 let buffer =
6245 project.get_open_buffer(&project_path, cx)?.read(cx);
6246
6247 let mut highlights = Vec::new();
6248 let highlight_count = rng.lock().gen_range(1..=5);
6249 let mut prev_end = 0;
6250 for _ in 0..highlight_count {
6251 let range =
6252 buffer.random_byte_range(prev_end, &mut *rng.lock());
6253
6254 highlights.push(lsp::DocumentHighlight {
6255 range: range_to_lsp(range.to_point_utf16(buffer)),
6256 kind: Some(lsp::DocumentHighlightKind::READ),
6257 });
6258 prev_end = range.end;
6259 }
6260 Some(highlights)
6261 })
6262 } else {
6263 None
6264 };
6265 async move { Ok(highlights) }
6266 }
6267 });
6268 }
6269 })),
6270 ..Default::default()
6271 });
6272 host_language_registry.add(Arc::new(language));
6273
6274 let op_start_signal = futures::channel::mpsc::unbounded();
6275 user_ids.push(host.current_user_id(&host_cx));
6276 op_start_signals.push(op_start_signal.0);
6277 clients.push(host_cx.foreground().spawn(host.simulate_host(
6278 host_project,
6279 op_start_signal.1,
6280 rng.clone(),
6281 host_cx,
6282 )));
6283
6284 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6285 rng.lock().gen_range(0..max_operations)
6286 } else {
6287 max_operations
6288 };
6289 let mut available_guests = vec![
6290 "guest-1".to_string(),
6291 "guest-2".to_string(),
6292 "guest-3".to_string(),
6293 "guest-4".to_string(),
6294 ];
6295 let mut operations = 0;
6296 while operations < max_operations {
6297 if operations == disconnect_host_at {
6298 server.disconnect_client(user_ids[0]);
6299 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6300 drop(op_start_signals);
6301 let mut clients = futures::future::join_all(clients).await;
6302 cx.foreground().run_until_parked();
6303
6304 let (host, mut host_cx, host_err) = clients.remove(0);
6305 if let Some(host_err) = host_err {
6306 log::error!("host error - {:?}", host_err);
6307 }
6308 host.project
6309 .as_ref()
6310 .unwrap()
6311 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6312 for (guest, mut guest_cx, guest_err) in clients {
6313 if let Some(guest_err) = guest_err {
6314 log::error!("{} error - {:?}", guest.username, guest_err);
6315 }
6316
6317 let contacts = server
6318 .app_state
6319 .db
6320 .get_contacts(guest.current_user_id(&guest_cx))
6321 .await
6322 .unwrap();
6323 let contacts = server
6324 .store
6325 .read()
6326 .await
6327 .build_initial_contacts_update(contacts)
6328 .contacts;
6329 assert!(!contacts
6330 .iter()
6331 .flat_map(|contact| &contact.projects)
6332 .any(|project| project.id == host_project_id));
6333 guest
6334 .project
6335 .as_ref()
6336 .unwrap()
6337 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6338 guest_cx.update(|_| drop(guest));
6339 }
6340 host_cx.update(|_| drop(host));
6341
6342 return;
6343 }
6344
6345 let distribution = rng.lock().gen_range(0..100);
6346 match distribution {
6347 0..=19 if !available_guests.is_empty() => {
6348 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6349 let guest_username = available_guests.remove(guest_ix);
6350 log::info!("Adding new connection for {}", guest_username);
6351 next_entity_id += 100000;
6352 let mut guest_cx = TestAppContext::new(
6353 cx.foreground_platform(),
6354 cx.platform(),
6355 deterministic.build_foreground(next_entity_id),
6356 deterministic.build_background(),
6357 cx.font_cache(),
6358 cx.leak_detector(),
6359 next_entity_id,
6360 );
6361 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6362 let guest_project = Project::remote(
6363 host_project_id,
6364 guest.client.clone(),
6365 guest.user_store.clone(),
6366 guest_lang_registry.clone(),
6367 FakeFs::new(cx.background()),
6368 &mut guest_cx.to_async(),
6369 )
6370 .await
6371 .unwrap();
6372 let op_start_signal = futures::channel::mpsc::unbounded();
6373 user_ids.push(guest.current_user_id(&guest_cx));
6374 op_start_signals.push(op_start_signal.0);
6375 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6376 guest_username.clone(),
6377 guest_project,
6378 op_start_signal.1,
6379 rng.clone(),
6380 guest_cx,
6381 )));
6382
6383 log::info!("Added connection for {}", guest_username);
6384 operations += 1;
6385 }
6386 20..=29 if clients.len() > 1 => {
6387 let guest_ix = rng.lock().gen_range(1..clients.len());
6388 log::info!("Removing guest {}", user_ids[guest_ix]);
6389 let removed_guest_id = user_ids.remove(guest_ix);
6390 let guest = clients.remove(guest_ix);
6391 op_start_signals.remove(guest_ix);
6392 server.forbid_connections();
6393 server.disconnect_client(removed_guest_id);
6394 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6395 let (guest, mut guest_cx, guest_err) = guest.await;
6396 server.allow_connections();
6397
6398 if let Some(guest_err) = guest_err {
6399 log::error!("{} error - {:?}", guest.username, guest_err);
6400 }
6401 guest
6402 .project
6403 .as_ref()
6404 .unwrap()
6405 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6406 for user_id in &user_ids {
6407 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6408 let contacts = server
6409 .store
6410 .read()
6411 .await
6412 .build_initial_contacts_update(contacts)
6413 .contacts;
6414 for contact in contacts {
6415 if contact.online {
6416 assert_ne!(
6417 contact.user_id, removed_guest_id.0 as u64,
6418 "removed guest is still a contact of another peer"
6419 );
6420 }
6421 for project in contact.projects {
6422 for project_guest_id in project.guests {
6423 assert_ne!(
6424 project_guest_id, removed_guest_id.0 as u64,
6425 "removed guest appears as still participating on a project"
6426 );
6427 }
6428 }
6429 }
6430 }
6431
6432 log::info!("{} removed", guest.username);
6433 available_guests.push(guest.username.clone());
6434 guest_cx.update(|_| drop(guest));
6435
6436 operations += 1;
6437 }
6438 _ => {
6439 while operations < max_operations && rng.lock().gen_bool(0.7) {
6440 op_start_signals
6441 .choose(&mut *rng.lock())
6442 .unwrap()
6443 .unbounded_send(())
6444 .unwrap();
6445 operations += 1;
6446 }
6447
6448 if rng.lock().gen_bool(0.8) {
6449 cx.foreground().run_until_parked();
6450 }
6451 }
6452 }
6453 }
6454
6455 drop(op_start_signals);
6456 let mut clients = futures::future::join_all(clients).await;
6457 cx.foreground().run_until_parked();
6458
6459 let (host_client, mut host_cx, host_err) = clients.remove(0);
6460 if let Some(host_err) = host_err {
6461 panic!("host error - {:?}", host_err);
6462 }
6463 let host_project = host_client.project.as_ref().unwrap();
6464 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6465 project
6466 .worktrees(cx)
6467 .map(|worktree| {
6468 let snapshot = worktree.read(cx).snapshot();
6469 (snapshot.id(), snapshot)
6470 })
6471 .collect::<BTreeMap<_, _>>()
6472 });
6473
6474 host_client
6475 .project
6476 .as_ref()
6477 .unwrap()
6478 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6479
6480 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6481 if let Some(guest_err) = guest_err {
6482 panic!("{} error - {:?}", guest_client.username, guest_err);
6483 }
6484 let worktree_snapshots =
6485 guest_client
6486 .project
6487 .as_ref()
6488 .unwrap()
6489 .read_with(&guest_cx, |project, cx| {
6490 project
6491 .worktrees(cx)
6492 .map(|worktree| {
6493 let worktree = worktree.read(cx);
6494 (worktree.id(), worktree.snapshot())
6495 })
6496 .collect::<BTreeMap<_, _>>()
6497 });
6498
6499 assert_eq!(
6500 worktree_snapshots.keys().collect::<Vec<_>>(),
6501 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6502 "{} has different worktrees than the host",
6503 guest_client.username
6504 );
6505 for (id, host_snapshot) in &host_worktree_snapshots {
6506 let guest_snapshot = &worktree_snapshots[id];
6507 assert_eq!(
6508 guest_snapshot.root_name(),
6509 host_snapshot.root_name(),
6510 "{} has different root name than the host for worktree {}",
6511 guest_client.username,
6512 id
6513 );
6514 assert_eq!(
6515 guest_snapshot.entries(false).collect::<Vec<_>>(),
6516 host_snapshot.entries(false).collect::<Vec<_>>(),
6517 "{} has different snapshot than the host for worktree {}",
6518 guest_client.username,
6519 id
6520 );
6521 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6522 }
6523
6524 guest_client
6525 .project
6526 .as_ref()
6527 .unwrap()
6528 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6529
6530 for guest_buffer in &guest_client.buffers {
6531 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6532 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6533 project.buffer_for_id(buffer_id, cx).expect(&format!(
6534 "host does not have buffer for guest:{}, peer:{}, id:{}",
6535 guest_client.username, guest_client.peer_id, buffer_id
6536 ))
6537 });
6538 let path = host_buffer
6539 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6540
6541 assert_eq!(
6542 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6543 0,
6544 "{}, buffer {}, path {:?} has deferred operations",
6545 guest_client.username,
6546 buffer_id,
6547 path,
6548 );
6549 assert_eq!(
6550 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6551 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6552 "{}, buffer {}, path {:?}, differs from the host's buffer",
6553 guest_client.username,
6554 buffer_id,
6555 path
6556 );
6557 }
6558
6559 guest_cx.update(|_| drop(guest_client));
6560 }
6561
6562 host_cx.update(|_| drop(host_client));
6563 }
6564
    /// In-process collaboration server used by the integration tests, bundled
    /// with the handles the tests need to control client connections.
    struct TestServer {
        // Standalone peer created in `start`; it is reset when the `TestServer`
        // is dropped.
        peer: Arc<Peer>,
        // Server-side application state; carries the test database handle.
        app_state: Arc<AppState>,
        // The server under test. `TestServer` derefs to it.
        server: Arc<Server>,
        // Foreground executor, used by `condition` to bracket waiting periods.
        foreground: Rc<executor::Foreground>,
        // Receiving half of the channel whose sender is given to `Server::new`;
        // `condition` waits on it before re-evaluating its predicate.
        notifications: mpsc::UnboundedReceiver<()>,
        // One flag per connected user, returned by `Connection::in_memory`;
        // `disconnect_client` sets the flag to sever that user's connection.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        // While true, connection attempts made by test clients are rejected.
        forbid_connections: Arc<AtomicBool>,
        // Never read; presumably held so the test database outlives the server.
        _test_db: TestDb,
    }
6575
6576 impl TestServer {
6577 async fn start(
6578 foreground: Rc<executor::Foreground>,
6579 background: Arc<executor::Background>,
6580 ) -> Self {
6581 let test_db = TestDb::fake(background);
6582 let app_state = Self::build_app_state(&test_db).await;
6583 let peer = Peer::new();
6584 let notifications = mpsc::unbounded();
6585 let server = Server::new(app_state.clone(), Some(notifications.0));
6586 Self {
6587 peer,
6588 app_state,
6589 server,
6590 foreground,
6591 notifications: notifications.1,
6592 connection_killers: Default::default(),
6593 forbid_connections: Default::default(),
6594 _test_db: test_db,
6595 }
6596 }
6597
        /// Creates a client for the user named `name`, connects it to this
        /// server through an in-memory connection, and waits until the client's
        /// user store reports a current user.
        ///
        /// The user is looked up by GitHub login and is created in the database
        /// when the lookup does not yield one.
        ///
        /// # Panics
        /// Panics if creating the user, authenticating/connecting, or receiving
        /// the connection id fails (each of these results is unwrapped).
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            // Install test settings as a global on this client's app context.
            cx.update(|cx| {
                let settings = Settings::test(cx);
                cx.set_global(settings);
            });

            let http = FakeHttpClient::with_404_response();
            // Reuse an existing database user (e.g. a guest rejoining under the
            // same name). A failed lookup also falls through to creation.
            let user_id =
                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
                    user.id
                } else {
                    self.app_state.db.create_user(name, None, false).await.unwrap()
                };
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            let server = self.server.clone();
            let db = self.app_state.db.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // Passed to `handle_connection` below; presumably the server reports
            // the id of the connection it accepted through this channel.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` succeeds only while this is the sole handle to the
            // client, so the overrides are installed before any clone is made.
            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always yields fixed credentials for this user.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                // Connections are established in memory against `self.server`.
                .override_establish_connection(move |credentials, cx| {
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    // Clone per invocation: this closure may run again whenever
                    // the client (re)connects.
                    let server = server.clone();
                    let db = db.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, killed) =
                                Connection::in_memory(cx.background());
                            // Register the kill switch so `disconnect_client`
                            // can sever this connection later.
                            connection_killers.lock().insert(user_id, killed);
                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
                            // Serve the server half of the connection on the
                            // background executor.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            let app_state = Arc::new(workspace::AppState {
                client: client.clone(),
                user_store: user_store.clone(),
                languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
                themes: ThemeRegistry::new((), cx.font_cache()),
                fs: FakeFs::new(cx.background()),
                build_window_options: || Default::default(),
                initialize_workspace: |_, _, _| unimplemented!(),
            });

            Channel::init(&client);
            Project::init(&client);
            cx.update(|cx| workspace::init(app_state.clone(), cx));

            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();
            // The peer id is built from the first connection id received on
            // the channel handed to the server.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);

            let client = TestClient {
                client,
                peer_id,
                username: name.to_string(),
                user_store,
                language_registry: Arc::new(LanguageRegistry::test()),
                project: Default::default(),
                buffers: Default::default(),
            };
            client.wait_for_current_user(cx).await;
            client
        }
6697
6698 fn disconnect_client(&self, user_id: UserId) {
6699 self.connection_killers
6700 .lock()
6701 .remove(&user_id)
6702 .unwrap()
6703 .store(true, SeqCst);
6704 }
6705
6706 fn forbid_connections(&self) {
6707 self.forbid_connections.store(true, SeqCst);
6708 }
6709
6710 fn allow_connections(&self) {
6711 self.forbid_connections.store(false, SeqCst);
6712 }
6713
6714 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6715 while let Some((client_a, cx_a)) = clients.pop() {
6716 for (client_b, cx_b) in &mut clients {
6717 client_a
6718 .user_store
6719 .update(cx_a, |store, cx| {
6720 store.request_contact(client_b.user_id().unwrap(), cx)
6721 })
6722 .await
6723 .unwrap();
6724 cx_a.foreground().run_until_parked();
6725 client_b
6726 .user_store
6727 .update(*cx_b, |store, cx| {
6728 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6729 })
6730 .await
6731 .unwrap();
6732 }
6733 }
6734 }
6735
6736 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6737 Arc::new(AppState {
6738 db: test_db.db().clone(),
6739 api_token: Default::default(),
6740 invite_link_prefix: Default::default(),
6741 })
6742 }
6743
6744 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6745 self.server.store.read().await
6746 }
6747
6748 async fn condition<F>(&mut self, mut predicate: F)
6749 where
6750 F: FnMut(&Store) -> bool,
6751 {
6752 assert!(
6753 self.foreground.parking_forbidden(),
6754 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6755 );
6756 while !(predicate)(&*self.server.store.read().await) {
6757 self.foreground.start_waiting();
6758 self.notifications.next().await;
6759 self.foreground.finish_waiting();
6760 }
6761 }
6762 }
6763
6764 impl Deref for TestServer {
6765 type Target = Server;
6766
6767 fn deref(&self) -> &Self::Target {
6768 &self.server
6769 }
6770 }
6771
6772 impl Drop for TestServer {
6773 fn drop(&mut self) {
6774 self.peer.reset();
6775 }
6776 }
6777
    /// A client connected to the `TestServer`, together with the state that the
    /// randomized simulations accumulate for it.
    struct TestClient {
        // The underlying client; `TestClient` derefs to it and tears it down
        // on drop.
        client: Arc<Client>,
        // Name the client was created with; used in log and assertion messages.
        username: String,
        // Built from the connection id received while the client connected.
        pub peer_id: PeerId,
        // User store backed by `client`.
        pub user_store: ModelHandle<UserStore>,
        // Language registry passed to `Project::local` by `build_local_project`.
        language_registry: Arc<LanguageRegistry>,
        // The most recently built or simulated project, if any.
        project: Option<ModelHandle<Project>>,
        // Buffers the simulations have opened and not yet dropped.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6787
6788 impl Deref for TestClient {
6789 type Target = Arc<Client>;
6790
6791 fn deref(&self) -> &Self::Target {
6792 &self.client
6793 }
6794 }
6795
    /// GitHub logins describing a client's contact state, as collected by
    /// `TestClient::summarize_contacts`.
    struct ContactsSummary {
        // Logins of the users that are currently contacts.
        pub current: Vec<String>,
        // Logins taken from the store's outgoing contact requests.
        pub outgoing_requests: Vec<String>,
        // Logins taken from the store's incoming contact requests.
        pub incoming_requests: Vec<String>,
    }
6801
6802 impl TestClient {
6803 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6804 UserId::from_proto(
6805 self.user_store
6806 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6807 )
6808 }
6809
6810 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6811 let mut authed_user = self
6812 .user_store
6813 .read_with(cx, |user_store, _| user_store.watch_current_user());
6814 while authed_user.next().await.unwrap().is_none() {}
6815 }
6816
6817 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6818 self.user_store
6819 .update(cx, |store, _| store.clear_contacts())
6820 .await;
6821 }
6822
6823 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6824 self.user_store.read_with(cx, |store, _| ContactsSummary {
6825 current: store
6826 .contacts()
6827 .iter()
6828 .map(|contact| contact.user.github_login.clone())
6829 .collect(),
6830 outgoing_requests: store
6831 .outgoing_contact_requests()
6832 .iter()
6833 .map(|user| user.github_login.clone())
6834 .collect(),
6835 incoming_requests: store
6836 .incoming_contact_requests()
6837 .iter()
6838 .map(|user| user.github_login.clone())
6839 .collect(),
6840 })
6841 }
6842
6843 async fn build_local_project(
6844 &mut self,
6845 fs: Arc<FakeFs>,
6846 root_path: impl AsRef<Path>,
6847 cx: &mut TestAppContext,
6848 ) -> (ModelHandle<Project>, WorktreeId) {
6849 let project = cx.update(|cx| {
6850 Project::local(
6851 self.client.clone(),
6852 self.user_store.clone(),
6853 self.language_registry.clone(),
6854 fs,
6855 cx,
6856 )
6857 });
6858 self.project = Some(project.clone());
6859 let (worktree, _) = project
6860 .update(cx, |p, cx| {
6861 p.find_or_create_local_worktree(root_path, true, cx)
6862 })
6863 .await
6864 .unwrap();
6865 worktree
6866 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6867 .await;
6868 project
6869 .update(cx, |project, _| project.next_remote_id())
6870 .await;
6871 (project, worktree.read_with(cx, |tree, _| tree.id()))
6872 }
6873
6874 async fn build_remote_project(
6875 &mut self,
6876 host_project: &ModelHandle<Project>,
6877 host_cx: &mut TestAppContext,
6878 guest_cx: &mut TestAppContext,
6879 ) -> ModelHandle<Project> {
6880 let host_project_id = host_project
6881 .read_with(host_cx, |project, _| project.next_remote_id())
6882 .await;
6883 let guest_user_id = self.user_id().unwrap();
6884 let languages =
6885 host_project.read_with(host_cx, |project, _| project.languages().clone());
6886 let project_b = guest_cx.spawn(|mut cx| {
6887 let user_store = self.user_store.clone();
6888 let guest_client = self.client.clone();
6889 async move {
6890 Project::remote(
6891 host_project_id,
6892 guest_client,
6893 user_store.clone(),
6894 languages,
6895 FakeFs::new(cx.background()),
6896 &mut cx,
6897 )
6898 .await
6899 .unwrap()
6900 }
6901 });
6902 host_cx.foreground().run_until_parked();
6903 host_project.update(host_cx, |project, cx| {
6904 project.respond_to_join_request(guest_user_id, true, cx)
6905 });
6906 let project = project_b.await;
6907 self.project = Some(project.clone());
6908 project
6909 }
6910
6911 fn build_workspace(
6912 &self,
6913 project: &ModelHandle<Project>,
6914 cx: &mut TestAppContext,
6915 ) -> ViewHandle<Workspace> {
6916 let (window_id, _) = cx.add_window(|_| EmptyView);
6917 cx.add_view(window_id, |cx| Workspace::new(project.clone(), cx))
6918 }
6919
        /// Runs the host side of the randomized collaboration simulation.
        ///
        /// One random operation is performed for every message received on
        /// `op_start_signal`; the simulation ends when that channel is closed or
        /// an operation fails. Returns the client (with `project` stored on
        /// it), the app context, and the error that stopped the simulation, if
        /// any.
        ///
        /// All randomness comes from the shared `rng`, so the order of the draws
        /// below determines how a given seed plays out.
        async fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // Inner function so that `?` can be used; the outer function turns
            // its `Result` into the `Option<anyhow::Error>` that is returned.
            async fn simulate_host_internal(
                client: &mut TestClient,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                let fs = project.read_with(cx, |project, _| project.fs().clone());

                // Accept every join request for the lifetime of the project.
                cx.update(|cx| {
                    cx.subscribe(&project, move |project, event, cx| {
                        if let project::Event::ContactRequestedJoin(user) = event {
                            log::info!("Host: accepting join request from {}", user.github_login);
                            project.update(cx, |project, cx| {
                                project.respond_to_join_request(user.id, true, cx)
                            });
                        }
                    })
                    .detach();
                });

                while op_start_signal.next().await.is_some() {
                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    let files = fs.as_fake().files().await;
                    match distribution {
                        // Find or create a worktree at a random ancestor of an
                        // existing file.
                        0..=19 if !files.is_empty() => {
                            let path = files.choose(&mut *rng.lock()).unwrap();
                            let mut path = path.as_path();
                            // Walk up the ancestors, stopping at a random one
                            // (or at the top when the draws never say stop).
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            // Randomly either leave the operation running in the
                            // background or wait for it to finish.
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await?;
                            }
                        }
                        // Open a buffer (or reuse an open one), then either
                        // drop it or mutate it.
                        20..=79 if !files.is_empty() => {
                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                                let file = files.choose(&mut *rng.lock()).unwrap();
                                let (worktree, path) = project
                                    .update(cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await?;
                                let project_path =
                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
                                    .await
                                    .unwrap();
                                client.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                client
                                    .buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            // 10% of the time forget the buffer; otherwise
                            // apply a random edit (70%) or undo/redo (30%).
                            if rng.lock().gen_bool(0.1) {
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    client.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );

                                    if rng.lock().gen_bool(0.7) {
                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                    } else {
                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                    }
                                });
                            }
                        }
                        // Otherwise (including whenever no files exist yet)
                        // create a new `.rs` file at a random path of one to
                        // five single-letter components, retrying with a fresh
                        // path until creation succeeds.
                        _ => loop {
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                Ok(())
            }

            let result =
                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
                    .await;
            log::info!("Host done");
            // Store the project on the client so the caller can inspect it.
            self.project = Some(project);
            (self, cx, result.err())
        }
7068
        /// Runs the guest side of the randomized collaboration simulation.
        ///
        /// One random operation is performed for every message received on
        /// `op_start_signal`; the simulation ends when that channel is closed or
        /// an awaited operation fails. Returns the client (with `project`
        /// stored on it), the app context, and the error that stopped the
        /// simulation, if any.
        ///
        /// All randomness comes from the shared `rng`, so the order of the draws
        /// below determines how a given seed plays out.
        pub async fn simulate_guest(
            mut self,
            guest_username: String,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // Inner function so that `?` can be used; the outer function turns
            // its `Result` into the `Option<anyhow::Error>` that is returned.
            async fn simulate_guest_internal(
                client: &mut TestClient,
                guest_username: &str,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                while op_start_signal.next().await.is_some() {
                    // Pick the buffer to operate on: open a new one when none is
                    // open (or on a coin flip), otherwise reuse an open one.
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        // Only visible worktrees containing at least one file
                        // are candidates.
                        let worktree = if let Some(worktree) =
                            project.read_with(cx, |project, cx| {
                                project
                                    .worktrees(&cx)
                                    .filter(|worktree| {
                                        let worktree = worktree.read(cx);
                                        worktree.is_visible()
                                            && worktree.entries(false).any(|e| e.is_file())
                                    })
                                    .choose(&mut *rng.lock())
                            }) {
                            worktree
                        } else {
                            // Nothing to open yet: this signal is consumed
                            // without performing an operation.
                            cx.background().simulate_random_delay().await;
                            continue;
                        };

                        let (worktree_root_name, project_path) =
                            worktree.read_with(cx, |worktree, _| {
                                let entry = worktree
                                    .entries(false)
                                    .filter(|e| e.is_file())
                                    .choose(&mut *rng.lock())
                                    .unwrap();
                                (
                                    worktree.root_name().to_string(),
                                    (worktree.id(), entry.path.clone()),
                                )
                            });
                        log::info!(
                            "{}: opening path {:?} in worktree {} ({})",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                        );
                        let buffer = project
                            .update(cx, |project, cx| {
                                project.open_buffer(project_path.clone(), cx)
                            })
                            .await?;
                        log::info!(
                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
                        );
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // Choose the operation. Each request below is detached 30%
                    // of the time (errors are only logged) and awaited
                    // otherwise (errors end the simulation).
                    let choice = rng.lock().gen_range(0..100);
                    match choice {
                        // Forget the buffer.
                        0..=9 => {
                            cx.update(|cx| {
                                log::info!(
                                    "{}: dropping buffer {:?}",
                                    guest_username,
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                client.buffers.remove(&buffer);
                                drop(buffer);
                            });
                        }
                        // Request completions at a random offset.
                        10..=19 => {
                            let completions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting completions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.completions(&buffer, offset, cx)
                            });
                            let completions = cx.background().spawn(async move {
                                completions
                                    .await
                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching completions request", guest_username);
                                cx.update(|cx| completions.detach_and_log_err(cx));
                            } else {
                                completions.await?;
                            }
                        }
                        // Request code actions for a random byte range.
                        20..=29 => {
                            let code_actions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting code actions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                                project.code_actions(&buffer, range, cx)
                            });
                            let code_actions = cx.background().spawn(async move {
                                code_actions.await.map_err(|err| {
                                    anyhow!("code actions request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching code actions request", guest_username);
                                cx.update(|cx| code_actions.detach_and_log_err(cx));
                            } else {
                                code_actions.await?;
                            }
                        }
                        // Save the buffer, but only when it is dirty; a clean
                        // buffer falls through to the catch-all edit arm.
                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: saving buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                (buffer.version(), buffer.save(cx))
                            });
                            let save = cx.background().spawn(async move {
                                let (saved_version, _) = save
                                    .await
                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                                // The saved version must include everything the
                                // buffer contained when the save was requested.
                                assert!(saved_version.observed_all(&requested_version));
                                Ok::<_, anyhow::Error>(())
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching save request", guest_username);
                                cx.update(|cx| save.detach_and_log_err(cx));
                            } else {
                                save.await?;
                            }
                        }
                        // Prepare a rename at a random offset.
                        40..=44 => {
                            let prepare_rename = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: preparing rename for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.prepare_rename(buffer, offset, cx)
                            });
                            let prepare_rename = cx.background().spawn(async move {
                                prepare_rename.await.map_err(|err| {
                                    anyhow!("prepare rename request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching prepare rename request", guest_username);
                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                            } else {
                                prepare_rename.await?;
                            }
                        }
                        // Request definitions at a random offset; when awaited,
                        // the buffers of the returned locations are tracked.
                        45..=49 => {
                            let definitions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting definitions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.definition(&buffer, offset, cx)
                            });
                            let definitions = cx.background().spawn(async move {
                                definitions
                                    .await
                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching definitions request", guest_username);
                                cx.update(|cx| definitions.detach_and_log_err(cx));
                            } else {
                                client
                                    .buffers
                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                            }
                        }
                        // Request document highlights at a random offset.
                        50..=54 => {
                            let highlights = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting highlights for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.document_highlights(&buffer, offset, cx)
                            });
                            let highlights = cx.background().spawn(async move {
                                highlights
                                    .await
                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching highlights request", guest_username);
                                cx.update(|cx| highlights.detach_and_log_err(cx));
                            } else {
                                highlights.await?;
                            }
                        }
                        // Search the project for a random lowercase letter; when
                        // awaited, the buffers keyed in the results are tracked.
                        55..=59 => {
                            let search = project.update(cx, |project, cx| {
                                let query = rng.lock().gen_range('a'..='z');
                                log::info!("{}: project-wide search {:?}", guest_username, query);
                                project.search(SearchQuery::text(query, false, false), cx)
                            });
                            let search = cx.background().spawn(async move {
                                search
                                    .await
                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching search request", guest_username);
                                cx.update(|cx| search.detach_and_log_err(cx));
                            } else {
                                client.buffers.extend(search.await?.into_keys());
                            }
                        }
                        // Create a new `.rs` entry with a random ten-letter name
                        // in a visible worktree whose root is a directory.
                        60..=69 => {
                            // NOTE(review): the `unwrap` below panics when no
                            // worktree passes the filter; confirm that a visible,
                            // directory-rooted worktree always exists here.
                            let worktree = project
                                .read_with(cx, |project, cx| {
                                    project
                                        .worktrees(&cx)
                                        .filter(|worktree| {
                                            let worktree = worktree.read(cx);
                                            worktree.is_visible()
                                                && worktree.entries(false).any(|e| e.is_file())
                                                && worktree
                                                    .root_entry()
                                                    .map_or(false, |e| e.is_dir())
                                        })
                                        .choose(&mut *rng.lock())
                                })
                                .unwrap();
                            let (worktree_id, worktree_root_name) = worktree
                                .read_with(cx, |worktree, _| {
                                    (worktree.id(), worktree.root_name().to_string())
                                });

                            let mut new_name = String::new();
                            for _ in 0..10 {
                                let letter = rng.lock().gen_range('a'..='z');
                                new_name.push(letter);
                            }
                            let mut new_path = PathBuf::new();
                            new_path.push(new_name);
                            new_path.set_extension("rs");
                            log::info!(
                                "{}: creating {:?} in worktree {} ({})",
                                guest_username,
                                new_path,
                                worktree_id,
                                worktree_root_name,
                            );
                            project
                                .update(cx, |project, cx| {
                                    project.create_entry((worktree_id, new_path), false, cx)
                                })
                                .unwrap()
                                .await?;
                        }
                        // Otherwise apply a random edit (70%) or undo/redo (30%).
                        _ => {
                            buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: updating buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                if rng.lock().gen_bool(0.7) {
                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                } else {
                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                }
                            });
                        }
                    }
                    cx.background().simulate_random_delay().await;
                }
                Ok(())
            }

            let result = simulate_guest_internal(
                &mut self,
                &guest_username,
                project.clone(),
                op_start_signal,
                rng,
                &mut cx,
            )
            .await;
            log::info!("{}: done", guest_username);

            // Store the project on the client so the caller can inspect it.
            self.project = Some(project);
            (self, cx, result.err())
        }
7397 }
7398
7399 impl Drop for TestClient {
7400 fn drop(&mut self) {
7401 self.client.tear_down();
7402 }
7403 }
7404
7405 impl Executor for Arc<gpui::executor::Background> {
7406 type Sleep = gpui::executor::Timer;
7407
7408 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7409 self.spawn(future).detach();
7410 }
7411
7412 fn sleep(&self, duration: Duration) -> Self::Sleep {
7413 self.as_ref().timer(duration)
7414 }
7415 }
7416
7417 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7418 channel
7419 .messages()
7420 .cursor::<()>()
7421 .map(|m| {
7422 (
7423 m.sender.github_login.clone(),
7424 m.body.clone(),
7425 m.is_pending(),
7426 )
7427 })
7428 .collect()
7429 }
7430
    /// Placeholder view that renders nothing; `build_workspace` uses it as the
    /// view passed to `add_window`.
    struct EmptyView;
7432
    impl gpui::Entity for EmptyView {
        // The event type is the unit type: this view has no event payload.
        type Event = ();
    }
7436
7437 impl gpui::View for EmptyView {
7438 fn ui_name() -> &'static str {
7439 "empty view"
7440 }
7441
7442 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7443 gpui::Element::boxed(gpui::elements::Empty::new())
7444 }
7445 }
7446}