1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, User, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{
27 channel::mpsc,
28 future::{self, BoxFuture},
29 FutureExt, SinkExt, StreamExt, TryStreamExt,
30};
31use lazy_static::lazy_static;
32use rpc::{
33 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
34 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
35};
36use std::{
37 any::TypeId,
38 future::Future,
39 marker::PhantomData,
40 net::SocketAddr,
41 ops::{Deref, DerefMut},
42 rc::Rc,
43 sync::{
44 atomic::{AtomicBool, Ordering::SeqCst},
45 Arc,
46 },
47 time::Duration,
48};
49use store::{Store, Worktree};
50use time::OffsetDateTime;
51use tokio::{
52 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
53 time::Sleep,
54};
55use tower::ServiceBuilder;
56use tracing::{info_span, instrument, Instrument};
57
58type MessageHandler =
59 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
60
61struct Response<R> {
62 server: Arc<Server>,
63 receipt: Receipt<R>,
64 responded: Arc<AtomicBool>,
65}
66
67impl<R: RequestMessage> Response<R> {
68 fn send(self, payload: R::Response) -> Result<()> {
69 self.responded.store(true, SeqCst);
70 self.server.peer.respond(self.receipt, payload)?;
71 Ok(())
72 }
73
74 fn into_receipt(self) -> Receipt<R> {
75 self.responded.store(true, SeqCst);
76 self.receipt
77 }
78}
79
80pub struct Server {
81 peer: Arc<Peer>,
82 store: RwLock<Store>,
83 app_state: Arc<AppState>,
84 handlers: HashMap<TypeId, MessageHandler>,
85 notifications: Option<mpsc::UnboundedSender<()>>,
86}
87
/// Runtime services `Server::handle_connection` needs: spawning detached work and creating
/// timers.
pub trait Executor: Clone + Send {
    /// Future type returned by [`Executor::sleep`].
    type Sleep: Future + Send;

    /// Starts `future` without returning anything that could be used to await it.
    fn spawn_detached<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static;

    /// Returns the timer future for `duration`; implementations are presumably expected to
    /// complete it once that much time has passed.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
93
/// Zero-sized, cloneable marker type.
///
/// NOTE(review): presumably the production `Executor`; its impl is not in this section.
#[derive(Clone)]
pub struct RealExecutor;
96
// NOTE(review): neither constant is used in this part of the file; judging by the names they
// bound channel-message paging and message body length — confirm at the use sites.

/// Number of messages per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the length of a message.
const MAX_MESSAGE_LEN: usize = 1024;
99
100struct StoreReadGuard<'a> {
101 guard: RwLockReadGuard<'a, Store>,
102 _not_send: PhantomData<Rc<()>>,
103}
104
105struct StoreWriteGuard<'a> {
106 guard: RwLockWriteGuard<'a, Store>,
107 _not_send: PhantomData<Rc<()>>,
108}
109
110impl Server {
111 pub fn new(
112 app_state: Arc<AppState>,
113 notifications: Option<mpsc::UnboundedSender<()>>,
114 ) -> Arc<Self> {
115 let mut server = Self {
116 peer: Peer::new(),
117 app_state,
118 store: Default::default(),
119 handlers: Default::default(),
120 notifications,
121 };
122
123 server
124 .add_request_handler(Server::ping)
125 .add_request_handler(Server::register_project)
126 .add_message_handler(Server::unregister_project)
127 .add_request_handler(Server::join_project)
128 .add_message_handler(Server::leave_project)
129 .add_message_handler(Server::respond_to_join_project_request)
130 .add_request_handler(Server::register_worktree)
131 .add_message_handler(Server::unregister_worktree)
132 .add_request_handler(Server::update_worktree)
133 .add_message_handler(Server::start_language_server)
134 .add_message_handler(Server::update_language_server)
135 .add_message_handler(Server::update_diagnostic_summary)
136 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
137 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
138 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
139 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
140 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
141 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
142 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
143 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
144 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
145 .add_request_handler(
146 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
147 )
148 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
149 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
150 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
151 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
152 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
153 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
154 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
155 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
156 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
157 .add_request_handler(Server::update_buffer)
158 .add_message_handler(Server::update_buffer_file)
159 .add_message_handler(Server::buffer_reloaded)
160 .add_message_handler(Server::buffer_saved)
161 .add_request_handler(Server::save_buffer)
162 .add_request_handler(Server::get_channels)
163 .add_request_handler(Server::get_users)
164 .add_request_handler(Server::fuzzy_search_users)
165 .add_request_handler(Server::request_contact)
166 .add_request_handler(Server::remove_contact)
167 .add_request_handler(Server::respond_to_contact_request)
168 .add_request_handler(Server::join_channel)
169 .add_message_handler(Server::leave_channel)
170 .add_request_handler(Server::send_channel_message)
171 .add_request_handler(Server::follow)
172 .add_message_handler(Server::unfollow)
173 .add_message_handler(Server::update_followers)
174 .add_request_handler(Server::get_channel_messages);
175
176 Arc::new(server)
177 }
178
179 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
180 where
181 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
182 Fut: 'static + Send + Future<Output = Result<()>>,
183 M: EnvelopedMessage,
184 {
185 let prev_handler = self.handlers.insert(
186 TypeId::of::<M>(),
187 Box::new(move |server, envelope| {
188 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
189 let span = info_span!(
190 "handle message",
191 payload_type = envelope.payload_type_name(),
192 payload = format!("{:?}", envelope.payload).as_str(),
193 );
194 let future = (handler)(server, *envelope);
195 async move {
196 if let Err(error) = future.await {
197 tracing::error!(%error, "error handling message");
198 }
199 }
200 .instrument(span)
201 .boxed()
202 }),
203 );
204 if prev_handler.is_some() {
205 panic!("registered a handler for the same message twice");
206 }
207 self
208 }
209
210 /// Handle a request while holding a lock to the store. This is useful when we're registering
211 /// a connection but we want to respond on the connection before anybody else can send on it.
212 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
213 where
214 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
215 Fut: Send + Future<Output = Result<()>>,
216 M: RequestMessage,
217 {
218 let handler = Arc::new(handler);
219 self.add_message_handler(move |server, envelope| {
220 let receipt = envelope.receipt();
221 let handler = handler.clone();
222 async move {
223 let responded = Arc::new(AtomicBool::default());
224 let response = Response {
225 server: server.clone(),
226 responded: responded.clone(),
227 receipt: envelope.receipt(),
228 };
229 match (handler)(server.clone(), envelope, response).await {
230 Ok(()) => {
231 if responded.load(std::sync::atomic::Ordering::SeqCst) {
232 Ok(())
233 } else {
234 Err(anyhow!("handler did not send a response"))?
235 }
236 }
237 Err(error) => {
238 server.peer.respond_with_error(
239 receipt,
240 proto::Error {
241 message: error.to_string(),
242 },
243 )?;
244 Err(error)
245 }
246 }
247 }
248 })
249 }
250
251 pub fn handle_connection<E: Executor>(
252 self: &Arc<Self>,
253 connection: Connection,
254 address: String,
255 user: User,
256 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
257 executor: E,
258 ) -> impl Future<Output = Result<()>> {
259 let mut this = self.clone();
260 let user_id = user.id;
261 let login = user.github_login;
262 let span = info_span!("handle connection", %user_id, %login, %address);
263 async move {
264 let (connection_id, handle_io, mut incoming_rx) = this
265 .peer
266 .add_connection(connection, {
267 let executor = executor.clone();
268 move |duration| {
269 let timer = executor.sleep(duration);
270 async move {
271 timer.await;
272 }
273 }
274 })
275 .await;
276
277 tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
278
279 if let Some(send_connection_id) = send_connection_id.as_mut() {
280 let _ = send_connection_id.send(connection_id).await;
281 }
282
283 let (contacts, invite_code) = future::try_join(
284 this.app_state.db.get_contacts(user_id),
285 this.app_state.db.get_invite_code_for_user(user_id)
286 ).await?;
287
288 {
289 let mut store = this.store_mut().await;
290 store.add_connection(connection_id, user_id);
291 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
292
293 if let Some((code, count)) = invite_code {
294 this.peer.send(connection_id, proto::UpdateInviteInfo {
295 url: format!("{}{}", this.app_state.invite_link_prefix, code),
296 count,
297 })?;
298 }
299 }
300 this.update_user_contacts(user_id).await?;
301
302 let handle_io = handle_io.fuse();
303 futures::pin_mut!(handle_io);
304 loop {
305 let next_message = incoming_rx.next().fuse();
306 futures::pin_mut!(next_message);
307 futures::select_biased! {
308 result = handle_io => {
309 if let Err(error) = result {
310 tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
311 }
312 break;
313 }
314 message = next_message => {
315 if let Some(message) = message {
316 let type_name = message.payload_type_name();
317 let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
318 async {
319 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
320 let notifications = this.notifications.clone();
321 let is_background = message.is_background();
322 let handle_message = (handler)(this.clone(), message);
323 let handle_message = async move {
324 handle_message.await;
325 if let Some(mut notifications) = notifications {
326 let _ = notifications.send(()).await;
327 }
328 };
329 if is_background {
330 executor.spawn_detached(handle_message);
331 } else {
332 handle_message.await;
333 }
334 } else {
335 tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
336 }
337 }.instrument(span).await;
338 } else {
339 tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
340 break;
341 }
342 }
343 }
344 }
345
346 tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
347 if let Err(error) = this.sign_out(connection_id).await {
348 tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
349 }
350
351 Ok(())
352 }.instrument(span)
353 }
354
355 #[instrument(skip(self), err)]
356 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
357 self.peer.disconnect(connection_id);
358
359 let removed_user_id = {
360 let mut store = self.store_mut().await;
361 let removed_connection = store.remove_connection(connection_id)?;
362
363 for (project_id, project) in removed_connection.hosted_projects {
364 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
365 self.peer
366 .send(conn_id, proto::UnregisterProject { project_id })
367 });
368
369 for (_, receipts) in project.join_requests {
370 for receipt in receipts {
371 self.peer.respond(
372 receipt,
373 proto::JoinProjectResponse {
374 variant: Some(proto::join_project_response::Variant::Decline(
375 proto::join_project_response::Decline {
376 reason: proto::join_project_response::decline::Reason::WentOffline as i32
377 },
378 )),
379 },
380 )?;
381 }
382 }
383 }
384
385 for project_id in removed_connection.guest_project_ids {
386 if let Some(project) = store.project(project_id).trace_err() {
387 broadcast(connection_id, project.connection_ids(), |conn_id| {
388 self.peer.send(
389 conn_id,
390 proto::RemoveProjectCollaborator {
391 project_id,
392 peer_id: connection_id.0,
393 },
394 )
395 });
396 if project.guests.is_empty() {
397 self.peer
398 .send(
399 project.host_connection_id,
400 proto::ProjectUnshared { project_id },
401 )
402 .trace_err();
403 }
404 }
405 }
406
407 removed_connection.user_id
408 };
409
410 self.update_user_contacts(removed_user_id).await?;
411
412 Ok(())
413 }
414
415 async fn ping(
416 self: Arc<Server>,
417 _: TypedEnvelope<proto::Ping>,
418 response: Response<proto::Ping>,
419 ) -> Result<()> {
420 response.send(proto::Ack {})?;
421 Ok(())
422 }
423
424 async fn register_project(
425 self: Arc<Server>,
426 request: TypedEnvelope<proto::RegisterProject>,
427 response: Response<proto::RegisterProject>,
428 ) -> Result<()> {
429 let user_id;
430 let project_id;
431 {
432 let mut state = self.store_mut().await;
433 user_id = state.user_id_for_connection(request.sender_id)?;
434 project_id = state.register_project(request.sender_id, user_id);
435 };
436 self.update_user_contacts(user_id).await?;
437 response.send(proto::RegisterProjectResponse { project_id })?;
438 Ok(())
439 }
440
441 async fn unregister_project(
442 self: Arc<Server>,
443 request: TypedEnvelope<proto::UnregisterProject>,
444 ) -> Result<()> {
445 let (user_id, project) = {
446 let mut state = self.store_mut().await;
447 let project =
448 state.unregister_project(request.payload.project_id, request.sender_id)?;
449 (state.user_id_for_connection(request.sender_id)?, project)
450 };
451 for (_, receipts) in project.join_requests {
452 for receipt in receipts {
453 self.peer.respond(
454 receipt,
455 proto::JoinProjectResponse {
456 variant: Some(proto::join_project_response::Variant::Decline(
457 proto::join_project_response::Decline {
458 reason: proto::join_project_response::decline::Reason::Closed
459 as i32,
460 },
461 )),
462 },
463 )?;
464 }
465 }
466
467 self.update_user_contacts(user_id).await?;
468 Ok(())
469 }
470
471 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
472 let contacts = self.app_state.db.get_contacts(user_id).await?;
473 let store = self.store().await;
474 let updated_contact = store.contact_for_user(user_id, false);
475 for contact in contacts {
476 if let db::Contact::Accepted {
477 user_id: contact_user_id,
478 ..
479 } = contact
480 {
481 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
482 self.peer
483 .send(
484 contact_conn_id,
485 proto::UpdateContacts {
486 contacts: vec![updated_contact.clone()],
487 remove_contacts: Default::default(),
488 incoming_requests: Default::default(),
489 remove_incoming_requests: Default::default(),
490 outgoing_requests: Default::default(),
491 remove_outgoing_requests: Default::default(),
492 },
493 )
494 .trace_err();
495 }
496 }
497 }
498 Ok(())
499 }
500
501 async fn join_project(
502 self: Arc<Server>,
503 request: TypedEnvelope<proto::JoinProject>,
504 response: Response<proto::JoinProject>,
505 ) -> Result<()> {
506 let project_id = request.payload.project_id;
507 let host_user_id;
508 let guest_user_id;
509 let host_connection_id;
510 {
511 let state = self.store().await;
512 let project = state.project(project_id)?;
513 host_user_id = project.host_user_id;
514 host_connection_id = project.host_connection_id;
515 guest_user_id = state.user_id_for_connection(request.sender_id)?;
516 };
517
518 let has_contact = self
519 .app_state
520 .db
521 .has_contact(guest_user_id, host_user_id)
522 .await?;
523 if !has_contact {
524 return Err(anyhow!("no such project"))?;
525 }
526
527 self.store_mut().await.request_join_project(
528 guest_user_id,
529 project_id,
530 response.into_receipt(),
531 )?;
532 self.peer.send(
533 host_connection_id,
534 proto::RequestJoinProject {
535 project_id,
536 requester_id: guest_user_id.to_proto(),
537 },
538 )?;
539 Ok(())
540 }
541
542 async fn respond_to_join_project_request(
543 self: Arc<Server>,
544 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
545 ) -> Result<()> {
546 let host_user_id;
547
548 {
549 let mut state = self.store_mut().await;
550 let project_id = request.payload.project_id;
551 let project = state.project(project_id)?;
552 if project.host_connection_id != request.sender_id {
553 Err(anyhow!("no such connection"))?;
554 }
555
556 host_user_id = project.host_user_id;
557 let guest_user_id = UserId::from_proto(request.payload.requester_id);
558
559 if !request.payload.allow {
560 let receipts = state
561 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
562 .ok_or_else(|| anyhow!("no such request"))?;
563 for receipt in receipts {
564 self.peer.respond(
565 receipt,
566 proto::JoinProjectResponse {
567 variant: Some(proto::join_project_response::Variant::Decline(
568 proto::join_project_response::Decline {
569 reason: proto::join_project_response::decline::Reason::Declined
570 as i32,
571 },
572 )),
573 },
574 )?;
575 }
576 return Ok(());
577 }
578
579 let (receipts_with_replica_ids, project) = state
580 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
581 .ok_or_else(|| anyhow!("no such request"))?;
582
583 let peer_count = project.guests.len();
584 let mut collaborators = Vec::with_capacity(peer_count);
585 collaborators.push(proto::Collaborator {
586 peer_id: project.host_connection_id.0,
587 replica_id: 0,
588 user_id: project.host_user_id.to_proto(),
589 });
590 let worktrees = project
591 .worktrees
592 .iter()
593 .filter_map(|(id, shared_worktree)| {
594 let worktree = project.worktrees.get(&id)?;
595 Some(proto::Worktree {
596 id: *id,
597 root_name: worktree.root_name.clone(),
598 entries: shared_worktree.entries.values().cloned().collect(),
599 diagnostic_summaries: shared_worktree
600 .diagnostic_summaries
601 .values()
602 .cloned()
603 .collect(),
604 visible: worktree.visible,
605 scan_id: shared_worktree.scan_id,
606 })
607 })
608 .collect::<Vec<_>>();
609
610 // Add all guests other than the requesting user's own connections as collaborators
611 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
612 if receipts_with_replica_ids
613 .iter()
614 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
615 {
616 collaborators.push(proto::Collaborator {
617 peer_id: peer_conn_id.0,
618 replica_id: *peer_replica_id as u32,
619 user_id: peer_user_id.to_proto(),
620 });
621 }
622 }
623
624 for conn_id in project.connection_ids() {
625 for (receipt, replica_id) in &receipts_with_replica_ids {
626 if conn_id != receipt.sender_id {
627 self.peer.send(
628 conn_id,
629 proto::AddProjectCollaborator {
630 project_id,
631 collaborator: Some(proto::Collaborator {
632 peer_id: receipt.sender_id.0,
633 replica_id: *replica_id as u32,
634 user_id: guest_user_id.to_proto(),
635 }),
636 },
637 )?;
638 }
639 }
640 }
641
642 for (receipt, replica_id) in receipts_with_replica_ids {
643 self.peer.respond(
644 receipt,
645 proto::JoinProjectResponse {
646 variant: Some(proto::join_project_response::Variant::Accept(
647 proto::join_project_response::Accept {
648 worktrees: worktrees.clone(),
649 replica_id: replica_id as u32,
650 collaborators: collaborators.clone(),
651 language_servers: project.language_servers.clone(),
652 },
653 )),
654 },
655 )?;
656 }
657 }
658
659 self.update_user_contacts(host_user_id).await?;
660 Ok(())
661 }
662
663 async fn leave_project(
664 self: Arc<Server>,
665 request: TypedEnvelope<proto::LeaveProject>,
666 ) -> Result<()> {
667 let sender_id = request.sender_id;
668 let project_id = request.payload.project_id;
669 let project;
670 {
671 let mut store = self.store_mut().await;
672 project = store.leave_project(sender_id, project_id)?;
673
674 if project.remove_collaborator {
675 broadcast(sender_id, project.connection_ids, |conn_id| {
676 self.peer.send(
677 conn_id,
678 proto::RemoveProjectCollaborator {
679 project_id,
680 peer_id: sender_id.0,
681 },
682 )
683 });
684 }
685
686 if let Some(requester_id) = project.cancel_request {
687 self.peer.send(
688 project.host_connection_id,
689 proto::JoinProjectRequestCancelled {
690 project_id,
691 requester_id: requester_id.to_proto(),
692 },
693 )?;
694 }
695
696 if project.unshare {
697 self.peer.send(
698 project.host_connection_id,
699 proto::ProjectUnshared { project_id },
700 )?;
701 }
702 }
703 self.update_user_contacts(project.host_user_id).await?;
704 Ok(())
705 }
706
707 async fn register_worktree(
708 self: Arc<Server>,
709 request: TypedEnvelope<proto::RegisterWorktree>,
710 response: Response<proto::RegisterWorktree>,
711 ) -> Result<()> {
712 let host_user_id;
713 {
714 let mut state = self.store_mut().await;
715 host_user_id = state.user_id_for_connection(request.sender_id)?;
716
717 let guest_connection_ids = state
718 .read_project(request.payload.project_id, request.sender_id)?
719 .guest_connection_ids();
720 state.register_worktree(
721 request.payload.project_id,
722 request.payload.worktree_id,
723 request.sender_id,
724 Worktree {
725 root_name: request.payload.root_name.clone(),
726 visible: request.payload.visible,
727 ..Default::default()
728 },
729 )?;
730
731 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
732 self.peer
733 .forward_send(request.sender_id, connection_id, request.payload.clone())
734 });
735 }
736 self.update_user_contacts(host_user_id).await?;
737 response.send(proto::Ack {})?;
738 Ok(())
739 }
740
741 async fn unregister_worktree(
742 self: Arc<Server>,
743 request: TypedEnvelope<proto::UnregisterWorktree>,
744 ) -> Result<()> {
745 let host_user_id;
746 let project_id = request.payload.project_id;
747 let worktree_id = request.payload.worktree_id;
748 {
749 let mut state = self.store_mut().await;
750 let (_, guest_connection_ids) =
751 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
752 host_user_id = state.user_id_for_connection(request.sender_id)?;
753 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
754 self.peer.send(
755 conn_id,
756 proto::UnregisterWorktree {
757 project_id,
758 worktree_id,
759 },
760 )
761 });
762 }
763 self.update_user_contacts(host_user_id).await?;
764 Ok(())
765 }
766
767 async fn update_worktree(
768 self: Arc<Server>,
769 request: TypedEnvelope<proto::UpdateWorktree>,
770 response: Response<proto::UpdateWorktree>,
771 ) -> Result<()> {
772 let connection_ids = self.store_mut().await.update_worktree(
773 request.sender_id,
774 request.payload.project_id,
775 request.payload.worktree_id,
776 &request.payload.removed_entries,
777 &request.payload.updated_entries,
778 request.payload.scan_id,
779 )?;
780
781 broadcast(request.sender_id, connection_ids, |connection_id| {
782 self.peer
783 .forward_send(request.sender_id, connection_id, request.payload.clone())
784 });
785 response.send(proto::Ack {})?;
786 Ok(())
787 }
788
789 async fn update_diagnostic_summary(
790 self: Arc<Server>,
791 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
792 ) -> Result<()> {
793 let summary = request
794 .payload
795 .summary
796 .clone()
797 .ok_or_else(|| anyhow!("invalid summary"))?;
798 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
799 request.payload.project_id,
800 request.payload.worktree_id,
801 request.sender_id,
802 summary,
803 )?;
804
805 broadcast(request.sender_id, receiver_ids, |connection_id| {
806 self.peer
807 .forward_send(request.sender_id, connection_id, request.payload.clone())
808 });
809 Ok(())
810 }
811
812 async fn start_language_server(
813 self: Arc<Server>,
814 request: TypedEnvelope<proto::StartLanguageServer>,
815 ) -> Result<()> {
816 let receiver_ids = self.store_mut().await.start_language_server(
817 request.payload.project_id,
818 request.sender_id,
819 request
820 .payload
821 .server
822 .clone()
823 .ok_or_else(|| anyhow!("invalid language server"))?,
824 )?;
825 broadcast(request.sender_id, receiver_ids, |connection_id| {
826 self.peer
827 .forward_send(request.sender_id, connection_id, request.payload.clone())
828 });
829 Ok(())
830 }
831
832 async fn update_language_server(
833 self: Arc<Server>,
834 request: TypedEnvelope<proto::UpdateLanguageServer>,
835 ) -> Result<()> {
836 let receiver_ids = self
837 .store()
838 .await
839 .project_connection_ids(request.payload.project_id, request.sender_id)?;
840 broadcast(request.sender_id, receiver_ids, |connection_id| {
841 self.peer
842 .forward_send(request.sender_id, connection_id, request.payload.clone())
843 });
844 Ok(())
845 }
846
847 async fn forward_project_request<T>(
848 self: Arc<Server>,
849 request: TypedEnvelope<T>,
850 response: Response<T>,
851 ) -> Result<()>
852 where
853 T: EntityMessage + RequestMessage,
854 {
855 let host_connection_id = self
856 .store()
857 .await
858 .read_project(request.payload.remote_entity_id(), request.sender_id)?
859 .host_connection_id;
860
861 response.send(
862 self.peer
863 .forward_request(request.sender_id, host_connection_id, request.payload)
864 .await?,
865 )?;
866 Ok(())
867 }
868
869 async fn save_buffer(
870 self: Arc<Server>,
871 request: TypedEnvelope<proto::SaveBuffer>,
872 response: Response<proto::SaveBuffer>,
873 ) -> Result<()> {
874 let host = self
875 .store()
876 .await
877 .read_project(request.payload.project_id, request.sender_id)?
878 .host_connection_id;
879 let response_payload = self
880 .peer
881 .forward_request(request.sender_id, host, request.payload.clone())
882 .await?;
883
884 let mut guests = self
885 .store()
886 .await
887 .read_project(request.payload.project_id, request.sender_id)?
888 .connection_ids();
889 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
890 broadcast(host, guests, |conn_id| {
891 self.peer
892 .forward_send(host, conn_id, response_payload.clone())
893 });
894 response.send(response_payload)?;
895 Ok(())
896 }
897
898 async fn update_buffer(
899 self: Arc<Server>,
900 request: TypedEnvelope<proto::UpdateBuffer>,
901 response: Response<proto::UpdateBuffer>,
902 ) -> Result<()> {
903 let receiver_ids = self
904 .store()
905 .await
906 .project_connection_ids(request.payload.project_id, request.sender_id)?;
907 broadcast(request.sender_id, receiver_ids, |connection_id| {
908 self.peer
909 .forward_send(request.sender_id, connection_id, request.payload.clone())
910 });
911 response.send(proto::Ack {})?;
912 Ok(())
913 }
914
915 async fn update_buffer_file(
916 self: Arc<Server>,
917 request: TypedEnvelope<proto::UpdateBufferFile>,
918 ) -> Result<()> {
919 let receiver_ids = self
920 .store()
921 .await
922 .project_connection_ids(request.payload.project_id, request.sender_id)?;
923 broadcast(request.sender_id, receiver_ids, |connection_id| {
924 self.peer
925 .forward_send(request.sender_id, connection_id, request.payload.clone())
926 });
927 Ok(())
928 }
929
930 async fn buffer_reloaded(
931 self: Arc<Server>,
932 request: TypedEnvelope<proto::BufferReloaded>,
933 ) -> Result<()> {
934 let receiver_ids = self
935 .store()
936 .await
937 .project_connection_ids(request.payload.project_id, request.sender_id)?;
938 broadcast(request.sender_id, receiver_ids, |connection_id| {
939 self.peer
940 .forward_send(request.sender_id, connection_id, request.payload.clone())
941 });
942 Ok(())
943 }
944
945 async fn buffer_saved(
946 self: Arc<Server>,
947 request: TypedEnvelope<proto::BufferSaved>,
948 ) -> Result<()> {
949 let receiver_ids = self
950 .store()
951 .await
952 .project_connection_ids(request.payload.project_id, request.sender_id)?;
953 broadcast(request.sender_id, receiver_ids, |connection_id| {
954 self.peer
955 .forward_send(request.sender_id, connection_id, request.payload.clone())
956 });
957 Ok(())
958 }
959
960 async fn follow(
961 self: Arc<Self>,
962 request: TypedEnvelope<proto::Follow>,
963 response: Response<proto::Follow>,
964 ) -> Result<()> {
965 let leader_id = ConnectionId(request.payload.leader_id);
966 let follower_id = request.sender_id;
967 if !self
968 .store()
969 .await
970 .project_connection_ids(request.payload.project_id, follower_id)?
971 .contains(&leader_id)
972 {
973 Err(anyhow!("no such peer"))?;
974 }
975 let mut response_payload = self
976 .peer
977 .forward_request(request.sender_id, leader_id, request.payload)
978 .await?;
979 response_payload
980 .views
981 .retain(|view| view.leader_id != Some(follower_id.0));
982 response.send(response_payload)?;
983 Ok(())
984 }
985
986 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
987 let leader_id = ConnectionId(request.payload.leader_id);
988 if !self
989 .store()
990 .await
991 .project_connection_ids(request.payload.project_id, request.sender_id)?
992 .contains(&leader_id)
993 {
994 Err(anyhow!("no such peer"))?;
995 }
996 self.peer
997 .forward_send(request.sender_id, leader_id, request.payload)?;
998 Ok(())
999 }
1000
1001 async fn update_followers(
1002 self: Arc<Self>,
1003 request: TypedEnvelope<proto::UpdateFollowers>,
1004 ) -> Result<()> {
1005 let connection_ids = self
1006 .store()
1007 .await
1008 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1009 let leader_id = request
1010 .payload
1011 .variant
1012 .as_ref()
1013 .and_then(|variant| match variant {
1014 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1015 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1016 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1017 });
1018 for follower_id in &request.payload.follower_ids {
1019 let follower_id = ConnectionId(*follower_id);
1020 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1021 self.peer
1022 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1023 }
1024 }
1025 Ok(())
1026 }
1027
1028 async fn get_channels(
1029 self: Arc<Server>,
1030 request: TypedEnvelope<proto::GetChannels>,
1031 response: Response<proto::GetChannels>,
1032 ) -> Result<()> {
1033 let user_id = self
1034 .store()
1035 .await
1036 .user_id_for_connection(request.sender_id)?;
1037 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1038 response.send(proto::GetChannelsResponse {
1039 channels: channels
1040 .into_iter()
1041 .map(|chan| proto::Channel {
1042 id: chan.id.to_proto(),
1043 name: chan.name,
1044 })
1045 .collect(),
1046 })?;
1047 Ok(())
1048 }
1049
1050 async fn get_users(
1051 self: Arc<Server>,
1052 request: TypedEnvelope<proto::GetUsers>,
1053 response: Response<proto::GetUsers>,
1054 ) -> Result<()> {
1055 let user_ids = request
1056 .payload
1057 .user_ids
1058 .into_iter()
1059 .map(UserId::from_proto)
1060 .collect();
1061 let users = self
1062 .app_state
1063 .db
1064 .get_users_by_ids(user_ids)
1065 .await?
1066 .into_iter()
1067 .map(|user| proto::User {
1068 id: user.id.to_proto(),
1069 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1070 github_login: user.github_login,
1071 })
1072 .collect();
1073 response.send(proto::UsersResponse { users })?;
1074 Ok(())
1075 }
1076
1077 async fn fuzzy_search_users(
1078 self: Arc<Server>,
1079 request: TypedEnvelope<proto::FuzzySearchUsers>,
1080 response: Response<proto::FuzzySearchUsers>,
1081 ) -> Result<()> {
1082 let user_id = self
1083 .store()
1084 .await
1085 .user_id_for_connection(request.sender_id)?;
1086 let query = request.payload.query;
1087 let db = &self.app_state.db;
1088 let users = match query.len() {
1089 0 => vec![],
1090 1 | 2 => db
1091 .get_user_by_github_login(&query)
1092 .await?
1093 .into_iter()
1094 .collect(),
1095 _ => db.fuzzy_search_users(&query, 10).await?,
1096 };
1097 let users = users
1098 .into_iter()
1099 .filter(|user| user.id != user_id)
1100 .map(|user| proto::User {
1101 id: user.id.to_proto(),
1102 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1103 github_login: user.github_login,
1104 })
1105 .collect();
1106 response.send(proto::UsersResponse { users })?;
1107 Ok(())
1108 }
1109
1110 async fn request_contact(
1111 self: Arc<Server>,
1112 request: TypedEnvelope<proto::RequestContact>,
1113 response: Response<proto::RequestContact>,
1114 ) -> Result<()> {
1115 let requester_id = self
1116 .store()
1117 .await
1118 .user_id_for_connection(request.sender_id)?;
1119 let responder_id = UserId::from_proto(request.payload.responder_id);
1120 if requester_id == responder_id {
1121 return Err(anyhow!("cannot add yourself as a contact"))?;
1122 }
1123
1124 self.app_state
1125 .db
1126 .send_contact_request(requester_id, responder_id)
1127 .await?;
1128
1129 // Update outgoing contact requests of requester
1130 let mut update = proto::UpdateContacts::default();
1131 update.outgoing_requests.push(responder_id.to_proto());
1132 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1133 self.peer.send(connection_id, update.clone())?;
1134 }
1135
1136 // Update incoming contact requests of responder
1137 let mut update = proto::UpdateContacts::default();
1138 update
1139 .incoming_requests
1140 .push(proto::IncomingContactRequest {
1141 requester_id: requester_id.to_proto(),
1142 should_notify: true,
1143 });
1144 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1145 self.peer.send(connection_id, update.clone())?;
1146 }
1147
1148 response.send(proto::Ack {})?;
1149 Ok(())
1150 }
1151
1152 async fn respond_to_contact_request(
1153 self: Arc<Server>,
1154 request: TypedEnvelope<proto::RespondToContactRequest>,
1155 response: Response<proto::RespondToContactRequest>,
1156 ) -> Result<()> {
1157 let responder_id = self
1158 .store()
1159 .await
1160 .user_id_for_connection(request.sender_id)?;
1161 let requester_id = UserId::from_proto(request.payload.requester_id);
1162 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1163 self.app_state
1164 .db
1165 .dismiss_contact_notification(responder_id, requester_id)
1166 .await?;
1167 } else {
1168 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1169 self.app_state
1170 .db
1171 .respond_to_contact_request(responder_id, requester_id, accept)
1172 .await?;
1173
1174 let store = self.store().await;
1175 // Update responder with new contact
1176 let mut update = proto::UpdateContacts::default();
1177 if accept {
1178 update
1179 .contacts
1180 .push(store.contact_for_user(requester_id, false));
1181 }
1182 update
1183 .remove_incoming_requests
1184 .push(requester_id.to_proto());
1185 for connection_id in store.connection_ids_for_user(responder_id) {
1186 self.peer.send(connection_id, update.clone())?;
1187 }
1188
1189 // Update requester with new contact
1190 let mut update = proto::UpdateContacts::default();
1191 if accept {
1192 update
1193 .contacts
1194 .push(store.contact_for_user(responder_id, true));
1195 }
1196 update
1197 .remove_outgoing_requests
1198 .push(responder_id.to_proto());
1199 for connection_id in store.connection_ids_for_user(requester_id) {
1200 self.peer.send(connection_id, update.clone())?;
1201 }
1202 }
1203
1204 response.send(proto::Ack {})?;
1205 Ok(())
1206 }
1207
1208 async fn remove_contact(
1209 self: Arc<Server>,
1210 request: TypedEnvelope<proto::RemoveContact>,
1211 response: Response<proto::RemoveContact>,
1212 ) -> Result<()> {
1213 let requester_id = self
1214 .store()
1215 .await
1216 .user_id_for_connection(request.sender_id)?;
1217 let responder_id = UserId::from_proto(request.payload.user_id);
1218 self.app_state
1219 .db
1220 .remove_contact(requester_id, responder_id)
1221 .await?;
1222
1223 // Update outgoing contact requests of requester
1224 let mut update = proto::UpdateContacts::default();
1225 update
1226 .remove_outgoing_requests
1227 .push(responder_id.to_proto());
1228 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1229 self.peer.send(connection_id, update.clone())?;
1230 }
1231
1232 // Update incoming contact requests of responder
1233 let mut update = proto::UpdateContacts::default();
1234 update
1235 .remove_incoming_requests
1236 .push(requester_id.to_proto());
1237 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1238 self.peer.send(connection_id, update.clone())?;
1239 }
1240
1241 response.send(proto::Ack {})?;
1242 Ok(())
1243 }
1244
1245 async fn join_channel(
1246 self: Arc<Self>,
1247 request: TypedEnvelope<proto::JoinChannel>,
1248 response: Response<proto::JoinChannel>,
1249 ) -> Result<()> {
1250 let user_id = self
1251 .store()
1252 .await
1253 .user_id_for_connection(request.sender_id)?;
1254 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1255 if !self
1256 .app_state
1257 .db
1258 .can_user_access_channel(user_id, channel_id)
1259 .await?
1260 {
1261 Err(anyhow!("access denied"))?;
1262 }
1263
1264 self.store_mut()
1265 .await
1266 .join_channel(request.sender_id, channel_id);
1267 let messages = self
1268 .app_state
1269 .db
1270 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1271 .await?
1272 .into_iter()
1273 .map(|msg| proto::ChannelMessage {
1274 id: msg.id.to_proto(),
1275 body: msg.body,
1276 timestamp: msg.sent_at.unix_timestamp() as u64,
1277 sender_id: msg.sender_id.to_proto(),
1278 nonce: Some(msg.nonce.as_u128().into()),
1279 })
1280 .collect::<Vec<_>>();
1281 response.send(proto::JoinChannelResponse {
1282 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1283 messages,
1284 })?;
1285 Ok(())
1286 }
1287
1288 async fn leave_channel(
1289 self: Arc<Self>,
1290 request: TypedEnvelope<proto::LeaveChannel>,
1291 ) -> Result<()> {
1292 let user_id = self
1293 .store()
1294 .await
1295 .user_id_for_connection(request.sender_id)?;
1296 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1297 if !self
1298 .app_state
1299 .db
1300 .can_user_access_channel(user_id, channel_id)
1301 .await?
1302 {
1303 Err(anyhow!("access denied"))?;
1304 }
1305
1306 self.store_mut()
1307 .await
1308 .leave_channel(request.sender_id, channel_id);
1309
1310 Ok(())
1311 }
1312
1313 async fn send_channel_message(
1314 self: Arc<Self>,
1315 request: TypedEnvelope<proto::SendChannelMessage>,
1316 response: Response<proto::SendChannelMessage>,
1317 ) -> Result<()> {
1318 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1319 let user_id;
1320 let connection_ids;
1321 {
1322 let state = self.store().await;
1323 user_id = state.user_id_for_connection(request.sender_id)?;
1324 connection_ids = state.channel_connection_ids(channel_id)?;
1325 }
1326
1327 // Validate the message body.
1328 let body = request.payload.body.trim().to_string();
1329 if body.len() > MAX_MESSAGE_LEN {
1330 return Err(anyhow!("message is too long"))?;
1331 }
1332 if body.is_empty() {
1333 return Err(anyhow!("message can't be blank"))?;
1334 }
1335
1336 let timestamp = OffsetDateTime::now_utc();
1337 let nonce = request
1338 .payload
1339 .nonce
1340 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1341
1342 let message_id = self
1343 .app_state
1344 .db
1345 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1346 .await?
1347 .to_proto();
1348 let message = proto::ChannelMessage {
1349 sender_id: user_id.to_proto(),
1350 id: message_id,
1351 body,
1352 timestamp: timestamp.unix_timestamp() as u64,
1353 nonce: Some(nonce),
1354 };
1355 broadcast(request.sender_id, connection_ids, |conn_id| {
1356 self.peer.send(
1357 conn_id,
1358 proto::ChannelMessageSent {
1359 channel_id: channel_id.to_proto(),
1360 message: Some(message.clone()),
1361 },
1362 )
1363 });
1364 response.send(proto::SendChannelMessageResponse {
1365 message: Some(message),
1366 })?;
1367 Ok(())
1368 }
1369
1370 async fn get_channel_messages(
1371 self: Arc<Self>,
1372 request: TypedEnvelope<proto::GetChannelMessages>,
1373 response: Response<proto::GetChannelMessages>,
1374 ) -> Result<()> {
1375 let user_id = self
1376 .store()
1377 .await
1378 .user_id_for_connection(request.sender_id)?;
1379 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1380 if !self
1381 .app_state
1382 .db
1383 .can_user_access_channel(user_id, channel_id)
1384 .await?
1385 {
1386 Err(anyhow!("access denied"))?;
1387 }
1388
1389 let messages = self
1390 .app_state
1391 .db
1392 .get_channel_messages(
1393 channel_id,
1394 MESSAGE_COUNT_PER_PAGE,
1395 Some(MessageId::from_proto(request.payload.before_message_id)),
1396 )
1397 .await?
1398 .into_iter()
1399 .map(|msg| proto::ChannelMessage {
1400 id: msg.id.to_proto(),
1401 body: msg.body,
1402 timestamp: msg.sent_at.unix_timestamp() as u64,
1403 sender_id: msg.sender_id.to_proto(),
1404 nonce: Some(msg.nonce.as_u128().into()),
1405 })
1406 .collect::<Vec<_>>();
1407 response.send(proto::GetChannelMessagesResponse {
1408 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1409 messages,
1410 })?;
1411 Ok(())
1412 }
1413
1414 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1415 #[cfg(test)]
1416 tokio::task::yield_now().await;
1417 let guard = self.store.read().await;
1418 #[cfg(test)]
1419 tokio::task::yield_now().await;
1420 StoreReadGuard {
1421 guard,
1422 _not_send: PhantomData,
1423 }
1424 }
1425
1426 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1427 #[cfg(test)]
1428 tokio::task::yield_now().await;
1429 let guard = self.store.write().await;
1430 #[cfg(test)]
1431 tokio::task::yield_now().await;
1432 StoreWriteGuard {
1433 guard,
1434 _not_send: PhantomData,
1435 }
1436 }
1437}
1438
1439impl<'a> Deref for StoreReadGuard<'a> {
1440 type Target = Store;
1441
1442 fn deref(&self) -> &Self::Target {
1443 &*self.guard
1444 }
1445}
1446
1447impl<'a> Deref for StoreWriteGuard<'a> {
1448 type Target = Store;
1449
1450 fn deref(&self) -> &Self::Target {
1451 &*self.guard
1452 }
1453}
1454
1455impl<'a> DerefMut for StoreWriteGuard<'a> {
1456 fn deref_mut(&mut self) -> &mut Self::Target {
1457 &mut *self.guard
1458 }
1459}
1460
1461impl<'a> Drop for StoreWriteGuard<'a> {
1462 fn drop(&mut self) {
1463 #[cfg(test)]
1464 self.check_invariants();
1465
1466 let metrics = self.metrics();
1467 tracing::info!(
1468 connections = metrics.connections,
1469 registered_projects = metrics.registered_projects,
1470 shared_projects = metrics.shared_projects,
1471 "metrics"
1472 );
1473 }
1474}
1475
1476impl Executor for RealExecutor {
1477 type Sleep = Sleep;
1478
1479 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1480 tokio::task::spawn(future);
1481 }
1482
1483 fn sleep(&self, duration: Duration) -> Self::Sleep {
1484 tokio::time::sleep(duration)
1485 }
1486}
1487
1488fn broadcast<F>(
1489 sender_id: ConnectionId,
1490 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1491 mut f: F,
1492) where
1493 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1494{
1495 for receiver_id in receiver_ids {
1496 if receiver_id != sender_id {
1497 f(receiver_id).trace_err();
1498 }
1499 }
1500}
1501
1502lazy_static! {
1503 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1504}
1505
1506pub struct ProtocolVersion(u32);
1507
1508impl Header for ProtocolVersion {
1509 fn name() -> &'static HeaderName {
1510 &ZED_PROTOCOL_VERSION
1511 }
1512
1513 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1514 where
1515 Self: Sized,
1516 I: Iterator<Item = &'i axum::http::HeaderValue>,
1517 {
1518 let version = values
1519 .next()
1520 .ok_or_else(|| axum::headers::Error::invalid())?
1521 .to_str()
1522 .map_err(|_| axum::headers::Error::invalid())?
1523 .parse()
1524 .map_err(|_| axum::headers::Error::invalid())?;
1525 Ok(Self(version))
1526 }
1527
1528 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1529 values.extend([self.0.to_string().parse().unwrap()]);
1530 }
1531}
1532
1533pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1534 let server = Server::new(app_state.clone(), None);
1535 Router::new()
1536 .route("/rpc", get(handle_websocket_request))
1537 .layer(
1538 ServiceBuilder::new()
1539 .layer(Extension(app_state))
1540 .layer(middleware::from_fn(auth::validate_header))
1541 .layer(Extension(server)),
1542 )
1543}
1544
1545pub async fn handle_websocket_request(
1546 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1547 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1548 Extension(server): Extension<Arc<Server>>,
1549 Extension(user): Extension<User>,
1550 ws: WebSocketUpgrade,
1551) -> axum::response::Response {
1552 if protocol_version != rpc::PROTOCOL_VERSION {
1553 return (
1554 StatusCode::UPGRADE_REQUIRED,
1555 "client must be upgraded".to_string(),
1556 )
1557 .into_response();
1558 }
1559 let socket_address = socket_address.to_string();
1560 ws.on_upgrade(move |socket| {
1561 use util::ResultExt;
1562 let socket = socket
1563 .map_ok(to_tungstenite_message)
1564 .err_into()
1565 .with(|message| async move { Ok(to_axum_message(message)) });
1566 let connection = Connection::new(Box::pin(socket));
1567 async move {
1568 server
1569 .handle_connection(connection, socket_address, user, None, RealExecutor)
1570 .await
1571 .log_err();
1572 }
1573 })
1574}
1575
1576fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1577 match message {
1578 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1579 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1580 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1581 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1582 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1583 code: frame.code.into(),
1584 reason: frame.reason,
1585 })),
1586 }
1587}
1588
1589fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1590 match message {
1591 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1592 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1593 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1594 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1595 AxumMessage::Close(frame) => {
1596 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1597 code: frame.code.into(),
1598 reason: frame.reason,
1599 }))
1600 }
1601 }
1602}
1603
1604pub trait ResultExt {
1605 type Ok;
1606
1607 fn trace_err(self) -> Option<Self::Ok>;
1608}
1609
1610impl<T, E> ResultExt for Result<T, E>
1611where
1612 E: std::fmt::Debug,
1613{
1614 type Ok = T;
1615
1616 fn trace_err(self) -> Option<T> {
1617 match self {
1618 Ok(value) => Some(value),
1619 Err(error) => {
1620 tracing::error!("{:?}", error);
1621 None
1622 }
1623 }
1624 }
1625}
1626
1627#[cfg(test)]
1628mod tests {
1629 use super::*;
1630 use crate::{
1631 db::{tests::TestDb, UserId},
1632 AppState,
1633 };
1634 use ::rpc::Peer;
1635 use client::{
1636 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1637 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1638 };
1639 use collections::{BTreeMap, HashSet};
1640 use editor::{
1641 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1642 ToOffset, ToggleCodeActions, Undo,
1643 };
1644 use gpui::{
1645 executor::{self, Deterministic},
1646 geometry::vector::vec2f,
1647 ModelHandle, TestAppContext, ViewHandle,
1648 };
1649 use language::{
1650 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1651 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1652 };
1653 use lsp::{self, FakeLanguageServer};
1654 use parking_lot::Mutex;
1655 use project::{
1656 fs::{FakeFs, Fs as _},
1657 search::SearchQuery,
1658 worktree::WorktreeHandle,
1659 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1660 };
1661 use rand::prelude::*;
1662 use rpc::PeerId;
1663 use serde_json::json;
1664 use settings::Settings;
1665 use sqlx::types::time::OffsetDateTime;
1666 use std::{
1667 cell::RefCell,
1668 env,
1669 ops::Deref,
1670 path::{Path, PathBuf},
1671 rc::Rc,
1672 sync::{
1673 atomic::{AtomicBool, Ordering::SeqCst},
1674 Arc,
1675 },
1676 time::Duration,
1677 };
1678 use theme::ThemeRegistry;
1679 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1680
1681 #[cfg(test)]
1682 #[ctor::ctor]
1683 fn init_logger() {
1684 if std::env::var("RUST_LOG").is_ok() {
1685 env_logger::init();
1686 }
1687 }
1688
1689 #[gpui::test(iterations = 10)]
1690 async fn test_share_project(
1691 deterministic: Arc<Deterministic>,
1692 cx_a: &mut TestAppContext,
1693 cx_b: &mut TestAppContext,
1694 cx_b2: &mut TestAppContext,
1695 ) {
1696 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1697 let lang_registry = Arc::new(LanguageRegistry::test());
1698 let fs = FakeFs::new(cx_a.background());
1699 cx_a.foreground().forbid_parking();
1700
1701 // Connect to a server as 2 clients.
1702 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1703 let client_a = server.create_client(cx_a, "user_a").await;
1704 let mut client_b = server.create_client(cx_b, "user_b").await;
1705 server
1706 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1707 .await;
1708
1709 // Share a project as client A
1710 fs.insert_tree(
1711 "/a",
1712 json!({
1713 "a.txt": "a-contents",
1714 "b.txt": "b-contents",
1715 }),
1716 )
1717 .await;
1718 let project_a = cx_a.update(|cx| {
1719 Project::local(
1720 client_a.clone(),
1721 client_a.user_store.clone(),
1722 lang_registry.clone(),
1723 fs.clone(),
1724 cx,
1725 )
1726 });
1727 let project_id = project_a
1728 .read_with(cx_a, |project, _| project.next_remote_id())
1729 .await;
1730 let (worktree_a, _) = project_a
1731 .update(cx_a, |p, cx| {
1732 p.find_or_create_local_worktree("/a", true, cx)
1733 })
1734 .await
1735 .unwrap();
1736 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1737 worktree_a
1738 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1739 .await;
1740
1741 // Join that project as client B
1742 let client_b_peer_id = client_b.peer_id;
1743 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1744
1745 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1746 assert_eq!(
1747 project
1748 .collaborators()
1749 .get(&client_a.peer_id)
1750 .unwrap()
1751 .user
1752 .github_login,
1753 "user_a"
1754 );
1755 project.replica_id()
1756 });
1757 project_a
1758 .condition(&cx_a, |tree, _| {
1759 tree.collaborators()
1760 .get(&client_b_peer_id)
1761 .map_or(false, |collaborator| {
1762 collaborator.replica_id == replica_id_b
1763 && collaborator.user.github_login == "user_b"
1764 })
1765 })
1766 .await;
1767
1768 // Open the same file as client B and client A.
1769 let buffer_b = project_b
1770 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1771 .await
1772 .unwrap();
1773 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1774 project_a.read_with(cx_a, |project, cx| {
1775 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1776 });
1777 let buffer_a = project_a
1778 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1779 .await
1780 .unwrap();
1781
1782 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1783
1784 // TODO
1785 // // Create a selection set as client B and see that selection set as client A.
1786 // buffer_a
1787 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1788 // .await;
1789
1790 // Edit the buffer as client B and see that edit as client A.
1791 editor_b.update(cx_b, |editor, cx| {
1792 editor.handle_input(&Input("ok, ".into()), cx)
1793 });
1794 buffer_a
1795 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1796 .await;
1797
1798 // TODO
1799 // // Remove the selection set as client B, see those selections disappear as client A.
1800 cx_b.update(move |_| drop(editor_b));
1801 // buffer_a
1802 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1803 // .await;
1804
1805 // Client B can join again on a different window because they are already a participant.
1806 let client_b2 = server.create_client(cx_b2, "user_b").await;
1807 let project_b2 = Project::remote(
1808 project_id,
1809 client_b2.client.clone(),
1810 client_b2.user_store.clone(),
1811 lang_registry.clone(),
1812 FakeFs::new(cx_b2.background()),
1813 &mut cx_b2.to_async(),
1814 )
1815 .await
1816 .unwrap();
1817 deterministic.run_until_parked();
1818 project_a.read_with(cx_a, |project, _| {
1819 assert_eq!(project.collaborators().len(), 2);
1820 });
1821 project_b.read_with(cx_b, |project, _| {
1822 assert_eq!(project.collaborators().len(), 2);
1823 });
1824 project_b2.read_with(cx_b2, |project, _| {
1825 assert_eq!(project.collaborators().len(), 2);
1826 });
1827
1828 // Dropping client B's first project removes only that from client A's collaborators.
1829 cx_b.update(move |_| {
1830 drop(client_b.project.take());
1831 drop(project_b);
1832 });
1833 deterministic.run_until_parked();
1834 project_a.read_with(cx_a, |project, _| {
1835 assert_eq!(project.collaborators().len(), 1);
1836 });
1837 project_b2.read_with(cx_b2, |project, _| {
1838 assert_eq!(project.collaborators().len(), 1);
1839 });
1840 }
1841
1842 #[gpui::test(iterations = 10)]
1843 async fn test_unshare_project(
1844 deterministic: Arc<Deterministic>,
1845 cx_a: &mut TestAppContext,
1846 cx_b: &mut TestAppContext,
1847 ) {
1848 let lang_registry = Arc::new(LanguageRegistry::test());
1849 let fs = FakeFs::new(cx_a.background());
1850 cx_a.foreground().forbid_parking();
1851
1852 // Connect to a server as 2 clients.
1853 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1854 let client_a = server.create_client(cx_a, "user_a").await;
1855 let mut client_b = server.create_client(cx_b, "user_b").await;
1856 server
1857 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1858 .await;
1859
1860 // Share a project as client A
1861 fs.insert_tree(
1862 "/a",
1863 json!({
1864 "a.txt": "a-contents",
1865 "b.txt": "b-contents",
1866 }),
1867 )
1868 .await;
1869 let project_a = cx_a.update(|cx| {
1870 Project::local(
1871 client_a.clone(),
1872 client_a.user_store.clone(),
1873 lang_registry.clone(),
1874 fs.clone(),
1875 cx,
1876 )
1877 });
1878 let (worktree_a, _) = project_a
1879 .update(cx_a, |p, cx| {
1880 p.find_or_create_local_worktree("/a", true, cx)
1881 })
1882 .await
1883 .unwrap();
1884 worktree_a
1885 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1886 .await;
1887 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1888
1889 // Join that project as client B
1890 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1891 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1892 project_b
1893 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1894 .await
1895 .unwrap();
1896
1897 // When client B leaves the project, it gets automatically unshared.
1898 cx_b.update(|_| {
1899 drop(client_b.project.take());
1900 drop(project_b);
1901 });
1902 deterministic.run_until_parked();
1903 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1904
1905 // When client B joins again, the project gets re-shared.
1906 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1907 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1908 project_b2
1909 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910 .await
1911 .unwrap();
1912 }
1913
1914 #[gpui::test(iterations = 10)]
1915 async fn test_host_disconnect(
1916 deterministic: Arc<Deterministic>,
1917 cx_a: &mut TestAppContext,
1918 cx_b: &mut TestAppContext,
1919 cx_c: &mut TestAppContext,
1920 ) {
1921 let lang_registry = Arc::new(LanguageRegistry::test());
1922 let fs = FakeFs::new(cx_a.background());
1923 cx_a.foreground().forbid_parking();
1924
1925 // Connect to a server as 3 clients.
1926 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1927 let client_a = server.create_client(cx_a, "user_a").await;
1928 let mut client_b = server.create_client(cx_b, "user_b").await;
1929 let client_c = server.create_client(cx_c, "user_c").await;
1930 server
1931 .make_contacts(vec![
1932 (&client_a, cx_a),
1933 (&client_b, cx_b),
1934 (&client_c, cx_c),
1935 ])
1936 .await;
1937
1938 // Share a project as client A
1939 fs.insert_tree(
1940 "/a",
1941 json!({
1942 "a.txt": "a-contents",
1943 "b.txt": "b-contents",
1944 }),
1945 )
1946 .await;
1947 let project_a = cx_a.update(|cx| {
1948 Project::local(
1949 client_a.clone(),
1950 client_a.user_store.clone(),
1951 lang_registry.clone(),
1952 fs.clone(),
1953 cx,
1954 )
1955 });
1956 let project_id = project_a
1957 .read_with(cx_a, |project, _| project.next_remote_id())
1958 .await;
1959 let (worktree_a, _) = project_a
1960 .update(cx_a, |p, cx| {
1961 p.find_or_create_local_worktree("/a", true, cx)
1962 })
1963 .await
1964 .unwrap();
1965 worktree_a
1966 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967 .await;
1968 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1969
1970 // Join that project as client B
1971 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1972 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1973 project_b
1974 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1975 .await
1976 .unwrap();
1977
1978 // Request to join that project as client C
1979 let project_c = cx_c.spawn(|mut cx| {
1980 let client = client_c.client.clone();
1981 let user_store = client_c.user_store.clone();
1982 let lang_registry = lang_registry.clone();
1983 async move {
1984 Project::remote(
1985 project_id,
1986 client,
1987 user_store,
1988 lang_registry.clone(),
1989 FakeFs::new(cx.background()),
1990 &mut cx,
1991 )
1992 .await
1993 }
1994 });
1995 deterministic.run_until_parked();
1996
1997 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1998 server.disconnect_client(client_a.current_user_id(cx_a));
1999 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2000 project_a
2001 .condition(cx_a, |project, _| project.collaborators().is_empty())
2002 .await;
2003 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
2004 project_b
2005 .condition(cx_b, |project, _| project.is_read_only())
2006 .await;
2007 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
2008 cx_b.update(|_| {
2009 drop(project_b);
2010 });
2011 assert!(matches!(
2012 project_c.await.unwrap_err(),
2013 project::JoinProjectError::HostWentOffline
2014 ));
2015
2016 // Ensure guests can still join.
2017 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2018 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2019 project_b2
2020 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2021 .await
2022 .unwrap();
2023 }
2024
2025 #[gpui::test(iterations = 10)]
2026 async fn test_decline_join_request(
2027 deterministic: Arc<Deterministic>,
2028 cx_a: &mut TestAppContext,
2029 cx_b: &mut TestAppContext,
2030 ) {
2031 let lang_registry = Arc::new(LanguageRegistry::test());
2032 let fs = FakeFs::new(cx_a.background());
2033 cx_a.foreground().forbid_parking();
2034
2035 // Connect to a server as 2 clients.
2036 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2037 let client_a = server.create_client(cx_a, "user_a").await;
2038 let client_b = server.create_client(cx_b, "user_b").await;
2039 server
2040 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2041 .await;
2042
2043 // Share a project as client A
2044 fs.insert_tree("/a", json!({})).await;
2045 let project_a = cx_a.update(|cx| {
2046 Project::local(
2047 client_a.clone(),
2048 client_a.user_store.clone(),
2049 lang_registry.clone(),
2050 fs.clone(),
2051 cx,
2052 )
2053 });
2054 let project_id = project_a
2055 .read_with(cx_a, |project, _| project.next_remote_id())
2056 .await;
2057 let (worktree_a, _) = project_a
2058 .update(cx_a, |p, cx| {
2059 p.find_or_create_local_worktree("/a", true, cx)
2060 })
2061 .await
2062 .unwrap();
2063 worktree_a
2064 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2065 .await;
2066
2067 // Request to join that project as client B
2068 let project_b = cx_b.spawn(|mut cx| {
2069 let client = client_b.client.clone();
2070 let user_store = client_b.user_store.clone();
2071 let lang_registry = lang_registry.clone();
2072 async move {
2073 Project::remote(
2074 project_id,
2075 client,
2076 user_store,
2077 lang_registry.clone(),
2078 FakeFs::new(cx.background()),
2079 &mut cx,
2080 )
2081 .await
2082 }
2083 });
2084 deterministic.run_until_parked();
2085 project_a.update(cx_a, |project, cx| {
2086 project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2087 });
2088 assert!(matches!(
2089 project_b.await.unwrap_err(),
2090 project::JoinProjectError::HostDeclined
2091 ));
2092
2093 // Request to join the project again as client B
2094 let project_b = cx_b.spawn(|mut cx| {
2095 let client = client_b.client.clone();
2096 let user_store = client_b.user_store.clone();
2097 let lang_registry = lang_registry.clone();
2098 async move {
2099 Project::remote(
2100 project_id,
2101 client,
2102 user_store,
2103 lang_registry.clone(),
2104 FakeFs::new(cx.background()),
2105 &mut cx,
2106 )
2107 .await
2108 }
2109 });
2110
2111 // Close the project on the host
2112 deterministic.run_until_parked();
2113 cx_a.update(|_| drop(project_a));
2114 deterministic.run_until_parked();
2115 assert!(matches!(
2116 project_b.await.unwrap_err(),
2117 project::JoinProjectError::HostClosedProject
2118 ));
2119 }
2120
2121 #[gpui::test(iterations = 10)]
2122 async fn test_cancel_join_request(
2123 deterministic: Arc<Deterministic>,
2124 cx_a: &mut TestAppContext,
2125 cx_b: &mut TestAppContext,
2126 ) {
2127 let lang_registry = Arc::new(LanguageRegistry::test());
2128 let fs = FakeFs::new(cx_a.background());
2129 cx_a.foreground().forbid_parking();
2130
2131 // Connect to a server as 2 clients.
2132 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2133 let client_a = server.create_client(cx_a, "user_a").await;
2134 let client_b = server.create_client(cx_b, "user_b").await;
2135 server
2136 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2137 .await;
2138
2139 // Share a project as client A
2140 fs.insert_tree("/a", json!({})).await;
2141 let project_a = cx_a.update(|cx| {
2142 Project::local(
2143 client_a.clone(),
2144 client_a.user_store.clone(),
2145 lang_registry.clone(),
2146 fs.clone(),
2147 cx,
2148 )
2149 });
2150 let project_id = project_a
2151 .read_with(cx_a, |project, _| project.next_remote_id())
2152 .await;
2153
2154 let project_a_events = Rc::new(RefCell::new(Vec::new()));
2155 let user_b = client_a
2156 .user_store
2157 .update(cx_a, |store, cx| {
2158 store.fetch_user(client_b.user_id().unwrap(), cx)
2159 })
2160 .await
2161 .unwrap();
2162 project_a.update(cx_a, {
2163 let project_a_events = project_a_events.clone();
2164 move |_, cx| {
2165 cx.subscribe(&cx.handle(), move |_, _, event, _| {
2166 project_a_events.borrow_mut().push(event.clone());
2167 })
2168 .detach();
2169 }
2170 });
2171
2172 let (worktree_a, _) = project_a
2173 .update(cx_a, |p, cx| {
2174 p.find_or_create_local_worktree("/a", true, cx)
2175 })
2176 .await
2177 .unwrap();
2178 worktree_a
2179 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2180 .await;
2181
2182 // Request to join that project as client B
2183 let project_b = cx_b.spawn(|mut cx| {
2184 let client = client_b.client.clone();
2185 let user_store = client_b.user_store.clone();
2186 let lang_registry = lang_registry.clone();
2187 async move {
2188 Project::remote(
2189 project_id,
2190 client,
2191 user_store,
2192 lang_registry.clone(),
2193 FakeFs::new(cx.background()),
2194 &mut cx,
2195 )
2196 .await
2197 }
2198 });
2199 deterministic.run_until_parked();
2200 assert_eq!(
2201 &*project_a_events.borrow(),
2202 &[project::Event::ContactRequestedJoin(user_b.clone())]
2203 );
2204 project_a_events.borrow_mut().clear();
2205
2206 // Cancel the join request by leaving the project
2207 client_b
2208 .client
2209 .send(proto::LeaveProject { project_id })
2210 .unwrap();
2211 drop(project_b);
2212
2213 deterministic.run_until_parked();
2214 assert_eq!(
2215 &*project_a_events.borrow(),
2216 &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2217 );
2218 }
2219
2220 #[gpui::test(iterations = 10)]
2221 async fn test_propagate_saves_and_fs_changes(
2222 cx_a: &mut TestAppContext,
2223 cx_b: &mut TestAppContext,
2224 cx_c: &mut TestAppContext,
2225 ) {
2226 let lang_registry = Arc::new(LanguageRegistry::test());
2227 let fs = FakeFs::new(cx_a.background());
2228 cx_a.foreground().forbid_parking();
2229
2230 // Connect to a server as 3 clients.
2231 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2232 let client_a = server.create_client(cx_a, "user_a").await;
2233 let mut client_b = server.create_client(cx_b, "user_b").await;
2234 let mut client_c = server.create_client(cx_c, "user_c").await;
2235 server
2236 .make_contacts(vec![
2237 (&client_a, cx_a),
2238 (&client_b, cx_b),
2239 (&client_c, cx_c),
2240 ])
2241 .await;
2242
2243 // Share a worktree as client A.
2244 fs.insert_tree(
2245 "/a",
2246 json!({
2247 "file1": "",
2248 "file2": ""
2249 }),
2250 )
2251 .await;
2252 let project_a = cx_a.update(|cx| {
2253 Project::local(
2254 client_a.clone(),
2255 client_a.user_store.clone(),
2256 lang_registry.clone(),
2257 fs.clone(),
2258 cx,
2259 )
2260 });
2261 let (worktree_a, _) = project_a
2262 .update(cx_a, |p, cx| {
2263 p.find_or_create_local_worktree("/a", true, cx)
2264 })
2265 .await
2266 .unwrap();
2267 worktree_a
2268 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2269 .await;
2270 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2271
2272 // Join that worktree as clients B and C.
2273 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2274 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2275 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2276 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2277
2278 // Open and edit a buffer as both guests B and C.
2279 let buffer_b = project_b
2280 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2281 .await
2282 .unwrap();
2283 let buffer_c = project_c
2284 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2285 .await
2286 .unwrap();
2287 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2288 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2289
2290 // Open and edit that buffer as the host.
2291 let buffer_a = project_a
2292 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2293 .await
2294 .unwrap();
2295
2296 buffer_a
2297 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2298 .await;
2299 buffer_a.update(cx_a, |buf, cx| {
2300 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2301 });
2302
2303 // Wait for edits to propagate
2304 buffer_a
2305 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2306 .await;
2307 buffer_b
2308 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2309 .await;
2310 buffer_c
2311 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2312 .await;
2313
2314 // Edit the buffer as the host and concurrently save as guest B.
2315 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2316 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2317 save_b.await.unwrap();
2318 assert_eq!(
2319 fs.load("/a/file1".as_ref()).await.unwrap(),
2320 "hi-a, i-am-c, i-am-b, i-am-a"
2321 );
2322 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2323 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2324 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2325
2326 worktree_a.flush_fs_events(cx_a).await;
2327
2328 // Make changes on host's file system, see those changes on guest worktrees.
2329 fs.rename(
2330 "/a/file1".as_ref(),
2331 "/a/file1-renamed".as_ref(),
2332 Default::default(),
2333 )
2334 .await
2335 .unwrap();
2336
2337 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2338 .await
2339 .unwrap();
2340 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2341
2342 worktree_a
2343 .condition(&cx_a, |tree, _| {
2344 tree.paths()
2345 .map(|p| p.to_string_lossy())
2346 .collect::<Vec<_>>()
2347 == ["file1-renamed", "file3", "file4"]
2348 })
2349 .await;
2350 worktree_b
2351 .condition(&cx_b, |tree, _| {
2352 tree.paths()
2353 .map(|p| p.to_string_lossy())
2354 .collect::<Vec<_>>()
2355 == ["file1-renamed", "file3", "file4"]
2356 })
2357 .await;
2358 worktree_c
2359 .condition(&cx_c, |tree, _| {
2360 tree.paths()
2361 .map(|p| p.to_string_lossy())
2362 .collect::<Vec<_>>()
2363 == ["file1-renamed", "file3", "file4"]
2364 })
2365 .await;
2366
2367 // Ensure buffer files are updated as well.
2368 buffer_a
2369 .condition(&cx_a, |buf, _| {
2370 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2371 })
2372 .await;
2373 buffer_b
2374 .condition(&cx_b, |buf, _| {
2375 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2376 })
2377 .await;
2378 buffer_c
2379 .condition(&cx_c, |buf, _| {
2380 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2381 })
2382 .await;
2383 }
2384
2385 #[gpui::test(iterations = 10)]
2386 async fn test_fs_operations(
2387 executor: Arc<Deterministic>,
2388 cx_a: &mut TestAppContext,
2389 cx_b: &mut TestAppContext,
2390 ) {
2391 executor.forbid_parking();
2392 let fs = FakeFs::new(cx_a.background());
2393
2394 // Connect to a server as 2 clients.
2395 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2396 let mut client_a = server.create_client(cx_a, "user_a").await;
2397 let mut client_b = server.create_client(cx_b, "user_b").await;
2398 server
2399 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2400 .await;
2401
2402 // Share a project as client A
2403 fs.insert_tree(
2404 "/dir",
2405 json!({
2406 "a.txt": "a-contents",
2407 "b.txt": "b-contents",
2408 }),
2409 )
2410 .await;
2411
2412 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2413 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2414
2415 let worktree_a =
2416 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2417 let worktree_b =
2418 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2419
2420 let entry = project_b
2421 .update(cx_b, |project, cx| {
2422 project
2423 .create_entry((worktree_id, "c.txt"), false, cx)
2424 .unwrap()
2425 })
2426 .await
2427 .unwrap();
2428 worktree_a.read_with(cx_a, |worktree, _| {
2429 assert_eq!(
2430 worktree
2431 .paths()
2432 .map(|p| p.to_string_lossy())
2433 .collect::<Vec<_>>(),
2434 ["a.txt", "b.txt", "c.txt"]
2435 );
2436 });
2437 worktree_b.read_with(cx_b, |worktree, _| {
2438 assert_eq!(
2439 worktree
2440 .paths()
2441 .map(|p| p.to_string_lossy())
2442 .collect::<Vec<_>>(),
2443 ["a.txt", "b.txt", "c.txt"]
2444 );
2445 });
2446
2447 project_b
2448 .update(cx_b, |project, cx| {
2449 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2450 })
2451 .unwrap()
2452 .await
2453 .unwrap();
2454 worktree_a.read_with(cx_a, |worktree, _| {
2455 assert_eq!(
2456 worktree
2457 .paths()
2458 .map(|p| p.to_string_lossy())
2459 .collect::<Vec<_>>(),
2460 ["a.txt", "b.txt", "d.txt"]
2461 );
2462 });
2463 worktree_b.read_with(cx_b, |worktree, _| {
2464 assert_eq!(
2465 worktree
2466 .paths()
2467 .map(|p| p.to_string_lossy())
2468 .collect::<Vec<_>>(),
2469 ["a.txt", "b.txt", "d.txt"]
2470 );
2471 });
2472
2473 let dir_entry = project_b
2474 .update(cx_b, |project, cx| {
2475 project
2476 .create_entry((worktree_id, "DIR"), true, cx)
2477 .unwrap()
2478 })
2479 .await
2480 .unwrap();
2481 worktree_a.read_with(cx_a, |worktree, _| {
2482 assert_eq!(
2483 worktree
2484 .paths()
2485 .map(|p| p.to_string_lossy())
2486 .collect::<Vec<_>>(),
2487 ["DIR", "a.txt", "b.txt", "d.txt"]
2488 );
2489 });
2490 worktree_b.read_with(cx_b, |worktree, _| {
2491 assert_eq!(
2492 worktree
2493 .paths()
2494 .map(|p| p.to_string_lossy())
2495 .collect::<Vec<_>>(),
2496 ["DIR", "a.txt", "b.txt", "d.txt"]
2497 );
2498 });
2499
2500 project_b
2501 .update(cx_b, |project, cx| {
2502 project.delete_entry(dir_entry.id, cx).unwrap()
2503 })
2504 .await
2505 .unwrap();
2506 worktree_a.read_with(cx_a, |worktree, _| {
2507 assert_eq!(
2508 worktree
2509 .paths()
2510 .map(|p| p.to_string_lossy())
2511 .collect::<Vec<_>>(),
2512 ["a.txt", "b.txt", "d.txt"]
2513 );
2514 });
2515 worktree_b.read_with(cx_b, |worktree, _| {
2516 assert_eq!(
2517 worktree
2518 .paths()
2519 .map(|p| p.to_string_lossy())
2520 .collect::<Vec<_>>(),
2521 ["a.txt", "b.txt", "d.txt"]
2522 );
2523 });
2524
2525 project_b
2526 .update(cx_b, |project, cx| {
2527 project.delete_entry(entry.id, cx).unwrap()
2528 })
2529 .await
2530 .unwrap();
2531 worktree_a.read_with(cx_a, |worktree, _| {
2532 assert_eq!(
2533 worktree
2534 .paths()
2535 .map(|p| p.to_string_lossy())
2536 .collect::<Vec<_>>(),
2537 ["a.txt", "b.txt"]
2538 );
2539 });
2540 worktree_b.read_with(cx_b, |worktree, _| {
2541 assert_eq!(
2542 worktree
2543 .paths()
2544 .map(|p| p.to_string_lossy())
2545 .collect::<Vec<_>>(),
2546 ["a.txt", "b.txt"]
2547 );
2548 });
2549 }
2550
2551 #[gpui::test(iterations = 10)]
2552 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2553 cx_a.foreground().forbid_parking();
2554 let lang_registry = Arc::new(LanguageRegistry::test());
2555 let fs = FakeFs::new(cx_a.background());
2556
2557 // Connect to a server as 2 clients.
2558 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2559 let client_a = server.create_client(cx_a, "user_a").await;
2560 let mut client_b = server.create_client(cx_b, "user_b").await;
2561 server
2562 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2563 .await;
2564
2565 // Share a project as client A
2566 fs.insert_tree(
2567 "/dir",
2568 json!({
2569 "a.txt": "a-contents",
2570 }),
2571 )
2572 .await;
2573
2574 let project_a = cx_a.update(|cx| {
2575 Project::local(
2576 client_a.clone(),
2577 client_a.user_store.clone(),
2578 lang_registry.clone(),
2579 fs.clone(),
2580 cx,
2581 )
2582 });
2583 let (worktree_a, _) = project_a
2584 .update(cx_a, |p, cx| {
2585 p.find_or_create_local_worktree("/dir", true, cx)
2586 })
2587 .await
2588 .unwrap();
2589 worktree_a
2590 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2591 .await;
2592 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2593
2594 // Join that project as client B
2595 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2596
2597 // Open a buffer as client B
2598 let buffer_b = project_b
2599 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2600 .await
2601 .unwrap();
2602
2603 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2604 buffer_b.read_with(cx_b, |buf, _| {
2605 assert!(buf.is_dirty());
2606 assert!(!buf.has_conflict());
2607 });
2608
2609 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2610 buffer_b
2611 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2612 .await;
2613 buffer_b.read_with(cx_b, |buf, _| {
2614 assert!(!buf.has_conflict());
2615 });
2616
2617 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2618 buffer_b.read_with(cx_b, |buf, _| {
2619 assert!(buf.is_dirty());
2620 assert!(!buf.has_conflict());
2621 });
2622 }
2623
2624 #[gpui::test(iterations = 10)]
2625 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2626 cx_a.foreground().forbid_parking();
2627 let lang_registry = Arc::new(LanguageRegistry::test());
2628 let fs = FakeFs::new(cx_a.background());
2629
2630 // Connect to a server as 2 clients.
2631 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2632 let client_a = server.create_client(cx_a, "user_a").await;
2633 let mut client_b = server.create_client(cx_b, "user_b").await;
2634 server
2635 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2636 .await;
2637
2638 // Share a project as client A
2639 fs.insert_tree(
2640 "/dir",
2641 json!({
2642 "a.txt": "a-contents",
2643 }),
2644 )
2645 .await;
2646
2647 let project_a = cx_a.update(|cx| {
2648 Project::local(
2649 client_a.clone(),
2650 client_a.user_store.clone(),
2651 lang_registry.clone(),
2652 fs.clone(),
2653 cx,
2654 )
2655 });
2656 let (worktree_a, _) = project_a
2657 .update(cx_a, |p, cx| {
2658 p.find_or_create_local_worktree("/dir", true, cx)
2659 })
2660 .await
2661 .unwrap();
2662 worktree_a
2663 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2664 .await;
2665 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2666
2667 // Join that project as client B
2668 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2669 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2670
2671 // Open a buffer as client B
2672 let buffer_b = project_b
2673 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2674 .await
2675 .unwrap();
2676 buffer_b.read_with(cx_b, |buf, _| {
2677 assert!(!buf.is_dirty());
2678 assert!(!buf.has_conflict());
2679 });
2680
2681 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2682 .await
2683 .unwrap();
2684 buffer_b
2685 .condition(&cx_b, |buf, _| {
2686 buf.text() == "new contents" && !buf.is_dirty()
2687 })
2688 .await;
2689 buffer_b.read_with(cx_b, |buf, _| {
2690 assert!(!buf.has_conflict());
2691 });
2692 }
2693
2694 #[gpui::test(iterations = 10)]
2695 async fn test_editing_while_guest_opens_buffer(
2696 cx_a: &mut TestAppContext,
2697 cx_b: &mut TestAppContext,
2698 ) {
2699 cx_a.foreground().forbid_parking();
2700 let lang_registry = Arc::new(LanguageRegistry::test());
2701 let fs = FakeFs::new(cx_a.background());
2702
2703 // Connect to a server as 2 clients.
2704 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2705 let client_a = server.create_client(cx_a, "user_a").await;
2706 let mut client_b = server.create_client(cx_b, "user_b").await;
2707 server
2708 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2709 .await;
2710
2711 // Share a project as client A
2712 fs.insert_tree(
2713 "/dir",
2714 json!({
2715 "a.txt": "a-contents",
2716 }),
2717 )
2718 .await;
2719 let project_a = cx_a.update(|cx| {
2720 Project::local(
2721 client_a.clone(),
2722 client_a.user_store.clone(),
2723 lang_registry.clone(),
2724 fs.clone(),
2725 cx,
2726 )
2727 });
2728 let (worktree_a, _) = project_a
2729 .update(cx_a, |p, cx| {
2730 p.find_or_create_local_worktree("/dir", true, cx)
2731 })
2732 .await
2733 .unwrap();
2734 worktree_a
2735 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2736 .await;
2737 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2738
2739 // Join that project as client B
2740 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2741
2742 // Open a buffer as client A
2743 let buffer_a = project_a
2744 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2745 .await
2746 .unwrap();
2747
2748 // Start opening the same buffer as client B
2749 let buffer_b = cx_b
2750 .background()
2751 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2752
2753 // Edit the buffer as client A while client B is still opening it.
2754 cx_b.background().simulate_random_delay().await;
2755 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2756 cx_b.background().simulate_random_delay().await;
2757 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2758
2759 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2760 let buffer_b = buffer_b.await.unwrap();
2761 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2762 }
2763
2764 #[gpui::test(iterations = 10)]
2765 async fn test_leaving_worktree_while_opening_buffer(
2766 cx_a: &mut TestAppContext,
2767 cx_b: &mut TestAppContext,
2768 ) {
2769 cx_a.foreground().forbid_parking();
2770 let lang_registry = Arc::new(LanguageRegistry::test());
2771 let fs = FakeFs::new(cx_a.background());
2772
2773 // Connect to a server as 2 clients.
2774 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2775 let client_a = server.create_client(cx_a, "user_a").await;
2776 let mut client_b = server.create_client(cx_b, "user_b").await;
2777 server
2778 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2779 .await;
2780
2781 // Share a project as client A
2782 fs.insert_tree(
2783 "/dir",
2784 json!({
2785 "a.txt": "a-contents",
2786 }),
2787 )
2788 .await;
2789 let project_a = cx_a.update(|cx| {
2790 Project::local(
2791 client_a.clone(),
2792 client_a.user_store.clone(),
2793 lang_registry.clone(),
2794 fs.clone(),
2795 cx,
2796 )
2797 });
2798 let (worktree_a, _) = project_a
2799 .update(cx_a, |p, cx| {
2800 p.find_or_create_local_worktree("/dir", true, cx)
2801 })
2802 .await
2803 .unwrap();
2804 worktree_a
2805 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2806 .await;
2807 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2808
2809 // Join that project as client B
2810 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2811
2812 // See that a guest has joined as client A.
2813 project_a
2814 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2815 .await;
2816
2817 // Begin opening a buffer as client B, but leave the project before the open completes.
2818 let buffer_b = cx_b
2819 .background()
2820 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2821 cx_b.update(|_| {
2822 drop(client_b.project.take());
2823 drop(project_b);
2824 });
2825 drop(buffer_b);
2826
2827 // See that the guest has left.
2828 project_a
2829 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2830 .await;
2831 }
2832
2833 #[gpui::test(iterations = 10)]
2834 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2835 cx_a.foreground().forbid_parking();
2836 let lang_registry = Arc::new(LanguageRegistry::test());
2837 let fs = FakeFs::new(cx_a.background());
2838
2839 // Connect to a server as 2 clients.
2840 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2841 let client_a = server.create_client(cx_a, "user_a").await;
2842 let mut client_b = server.create_client(cx_b, "user_b").await;
2843 server
2844 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2845 .await;
2846
2847 // Share a project as client A
2848 fs.insert_tree(
2849 "/a",
2850 json!({
2851 "a.txt": "a-contents",
2852 "b.txt": "b-contents",
2853 }),
2854 )
2855 .await;
2856 let project_a = cx_a.update(|cx| {
2857 Project::local(
2858 client_a.clone(),
2859 client_a.user_store.clone(),
2860 lang_registry.clone(),
2861 fs.clone(),
2862 cx,
2863 )
2864 });
2865 let (worktree_a, _) = project_a
2866 .update(cx_a, |p, cx| {
2867 p.find_or_create_local_worktree("/a", true, cx)
2868 })
2869 .await
2870 .unwrap();
2871 worktree_a
2872 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2873 .await;
2874
2875 // Join that project as client B
2876 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2877
2878 // Client A sees that a guest has joined.
2879 project_a
2880 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2881 .await;
2882
2883 // Drop client B's connection and ensure client A observes client B leaving the project.
2884 client_b.disconnect(&cx_b.to_async()).unwrap();
2885 project_a
2886 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2887 .await;
2888
2889 // Rejoin the project as client B
2890 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2891
2892 // Client A sees that a guest has re-joined.
2893 project_a
2894 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2895 .await;
2896
2897 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2898 client_b.wait_for_current_user(cx_b).await;
2899 server.disconnect_client(client_b.current_user_id(cx_b));
2900 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2901 project_a
2902 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2903 .await;
2904 }
2905
2906 #[gpui::test(iterations = 10)]
2907 async fn test_collaborating_with_diagnostics(
2908 deterministic: Arc<Deterministic>,
2909 cx_a: &mut TestAppContext,
2910 cx_b: &mut TestAppContext,
2911 cx_c: &mut TestAppContext,
2912 ) {
2913 deterministic.forbid_parking();
2914 let lang_registry = Arc::new(LanguageRegistry::test());
2915 let fs = FakeFs::new(cx_a.background());
2916
2917 // Set up a fake language server.
2918 let mut language = Language::new(
2919 LanguageConfig {
2920 name: "Rust".into(),
2921 path_suffixes: vec!["rs".to_string()],
2922 ..Default::default()
2923 },
2924 Some(tree_sitter_rust::language()),
2925 );
2926 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2927 lang_registry.add(Arc::new(language));
2928
2929 // Connect to a server as 2 clients.
2930 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2931 let client_a = server.create_client(cx_a, "user_a").await;
2932 let mut client_b = server.create_client(cx_b, "user_b").await;
2933 let mut client_c = server.create_client(cx_c, "user_c").await;
2934 server
2935 .make_contacts(vec![
2936 (&client_a, cx_a),
2937 (&client_b, cx_b),
2938 (&client_c, cx_c),
2939 ])
2940 .await;
2941
2942 // Share a project as client A
2943 fs.insert_tree(
2944 "/a",
2945 json!({
2946 "a.rs": "let one = two",
2947 "other.rs": "",
2948 }),
2949 )
2950 .await;
2951 let project_a = cx_a.update(|cx| {
2952 Project::local(
2953 client_a.clone(),
2954 client_a.user_store.clone(),
2955 lang_registry.clone(),
2956 fs.clone(),
2957 cx,
2958 )
2959 });
2960 let (worktree_a, _) = project_a
2961 .update(cx_a, |p, cx| {
2962 p.find_or_create_local_worktree("/a", true, cx)
2963 })
2964 .await
2965 .unwrap();
2966 worktree_a
2967 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2968 .await;
2969 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2970 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2971
2972 // Cause the language server to start.
2973 let _buffer = cx_a
2974 .background()
2975 .spawn(project_a.update(cx_a, |project, cx| {
2976 project.open_buffer(
2977 ProjectPath {
2978 worktree_id,
2979 path: Path::new("other.rs").into(),
2980 },
2981 cx,
2982 )
2983 }))
2984 .await
2985 .unwrap();
2986
2987 // Join the worktree as client B.
2988 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2989
2990 // Simulate a language server reporting errors for a file.
2991 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2992 fake_language_server
2993 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2994 .await;
2995 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2996 lsp::PublishDiagnosticsParams {
2997 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2998 version: None,
2999 diagnostics: vec![lsp::Diagnostic {
3000 severity: Some(lsp::DiagnosticSeverity::ERROR),
3001 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3002 message: "message 1".to_string(),
3003 ..Default::default()
3004 }],
3005 },
3006 );
3007
3008 // Wait for server to see the diagnostics update.
3009 deterministic.run_until_parked();
3010 {
3011 let store = server.store.read().await;
3012 let project = store.project(project_id).unwrap();
3013 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3014 assert!(!worktree.diagnostic_summaries.is_empty());
3015 }
3016
3017 // Ensure client B observes the new diagnostics.
3018 project_b.read_with(cx_b, |project, cx| {
3019 assert_eq!(
3020 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3021 &[(
3022 ProjectPath {
3023 worktree_id,
3024 path: Arc::from(Path::new("a.rs")),
3025 },
3026 DiagnosticSummary {
3027 error_count: 1,
3028 warning_count: 0,
3029 ..Default::default()
3030 },
3031 )]
3032 )
3033 });
3034
3035 // Join project as client C and observe the diagnostics.
3036 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3037 project_c.read_with(cx_c, |project, cx| {
3038 assert_eq!(
3039 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3040 &[(
3041 ProjectPath {
3042 worktree_id,
3043 path: Arc::from(Path::new("a.rs")),
3044 },
3045 DiagnosticSummary {
3046 error_count: 1,
3047 warning_count: 0,
3048 ..Default::default()
3049 },
3050 )]
3051 )
3052 });
3053
3054 // Simulate a language server reporting more errors for a file.
3055 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3056 lsp::PublishDiagnosticsParams {
3057 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3058 version: None,
3059 diagnostics: vec![
3060 lsp::Diagnostic {
3061 severity: Some(lsp::DiagnosticSeverity::ERROR),
3062 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3063 message: "message 1".to_string(),
3064 ..Default::default()
3065 },
3066 lsp::Diagnostic {
3067 severity: Some(lsp::DiagnosticSeverity::WARNING),
3068 range: lsp::Range::new(
3069 lsp::Position::new(0, 10),
3070 lsp::Position::new(0, 13),
3071 ),
3072 message: "message 2".to_string(),
3073 ..Default::default()
3074 },
3075 ],
3076 },
3077 );
3078
3079 // Clients B and C get the updated summaries
3080 deterministic.run_until_parked();
3081 project_b.read_with(cx_b, |project, cx| {
3082 assert_eq!(
3083 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3084 [(
3085 ProjectPath {
3086 worktree_id,
3087 path: Arc::from(Path::new("a.rs")),
3088 },
3089 DiagnosticSummary {
3090 error_count: 1,
3091 warning_count: 1,
3092 ..Default::default()
3093 },
3094 )]
3095 );
3096 });
3097 project_c.read_with(cx_c, |project, cx| {
3098 assert_eq!(
3099 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3100 [(
3101 ProjectPath {
3102 worktree_id,
3103 path: Arc::from(Path::new("a.rs")),
3104 },
3105 DiagnosticSummary {
3106 error_count: 1,
3107 warning_count: 1,
3108 ..Default::default()
3109 },
3110 )]
3111 );
3112 });
3113
3114 // Open the file with the errors on client B. They should be present.
3115 let buffer_b = cx_b
3116 .background()
3117 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3118 .await
3119 .unwrap();
3120
3121 buffer_b.read_with(cx_b, |buffer, _| {
3122 assert_eq!(
3123 buffer
3124 .snapshot()
3125 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3126 .map(|entry| entry)
3127 .collect::<Vec<_>>(),
3128 &[
3129 DiagnosticEntry {
3130 range: Point::new(0, 4)..Point::new(0, 7),
3131 diagnostic: Diagnostic {
3132 group_id: 0,
3133 message: "message 1".to_string(),
3134 severity: lsp::DiagnosticSeverity::ERROR,
3135 is_primary: true,
3136 ..Default::default()
3137 }
3138 },
3139 DiagnosticEntry {
3140 range: Point::new(0, 10)..Point::new(0, 13),
3141 diagnostic: Diagnostic {
3142 group_id: 1,
3143 severity: lsp::DiagnosticSeverity::WARNING,
3144 message: "message 2".to_string(),
3145 is_primary: true,
3146 ..Default::default()
3147 }
3148 }
3149 ]
3150 );
3151 });
3152
3153 // Simulate a language server reporting no errors for a file.
3154 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3155 lsp::PublishDiagnosticsParams {
3156 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3157 version: None,
3158 diagnostics: vec![],
3159 },
3160 );
3161 deterministic.run_until_parked();
3162 project_a.read_with(cx_a, |project, cx| {
3163 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3164 });
3165 project_b.read_with(cx_b, |project, cx| {
3166 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3167 });
3168 project_c.read_with(cx_c, |project, cx| {
3169 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3170 });
3171 }
3172
3173 #[gpui::test(iterations = 10)]
3174 async fn test_collaborating_with_completion(
3175 cx_a: &mut TestAppContext,
3176 cx_b: &mut TestAppContext,
3177 ) {
3178 cx_a.foreground().forbid_parking();
3179 let lang_registry = Arc::new(LanguageRegistry::test());
3180 let fs = FakeFs::new(cx_a.background());
3181
3182 // Set up a fake language server.
3183 let mut language = Language::new(
3184 LanguageConfig {
3185 name: "Rust".into(),
3186 path_suffixes: vec!["rs".to_string()],
3187 ..Default::default()
3188 },
3189 Some(tree_sitter_rust::language()),
3190 );
3191 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3192 capabilities: lsp::ServerCapabilities {
3193 completion_provider: Some(lsp::CompletionOptions {
3194 trigger_characters: Some(vec![".".to_string()]),
3195 ..Default::default()
3196 }),
3197 ..Default::default()
3198 },
3199 ..Default::default()
3200 });
3201 lang_registry.add(Arc::new(language));
3202
3203 // Connect to a server as 2 clients.
3204 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3205 let client_a = server.create_client(cx_a, "user_a").await;
3206 let mut client_b = server.create_client(cx_b, "user_b").await;
3207 server
3208 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3209 .await;
3210
3211 // Share a project as client A
3212 fs.insert_tree(
3213 "/a",
3214 json!({
3215 "main.rs": "fn main() { a }",
3216 "other.rs": "",
3217 }),
3218 )
3219 .await;
3220 let project_a = cx_a.update(|cx| {
3221 Project::local(
3222 client_a.clone(),
3223 client_a.user_store.clone(),
3224 lang_registry.clone(),
3225 fs.clone(),
3226 cx,
3227 )
3228 });
3229 let (worktree_a, _) = project_a
3230 .update(cx_a, |p, cx| {
3231 p.find_or_create_local_worktree("/a", true, cx)
3232 })
3233 .await
3234 .unwrap();
3235 worktree_a
3236 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3237 .await;
3238 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3239
3240 // Join the worktree as client B.
3241 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3242
3243 // Open a file in an editor as the guest.
3244 let buffer_b = project_b
3245 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3246 .await
3247 .unwrap();
3248 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3249 let editor_b = cx_b.add_view(window_b, |cx| {
3250 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3251 });
3252
3253 let fake_language_server = fake_language_servers.next().await.unwrap();
3254 buffer_b
3255 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3256 .await;
3257
3258 // Type a completion trigger character as the guest.
3259 editor_b.update(cx_b, |editor, cx| {
3260 editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3261 editor.handle_input(&Input(".".into()), cx);
3262 cx.focus(&editor_b);
3263 });
3264
3265 // Receive a completion request as the host's language server.
3266 // Return some completions from the host's language server.
3267 cx_a.foreground().start_waiting();
3268 fake_language_server
3269 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3270 assert_eq!(
3271 params.text_document_position.text_document.uri,
3272 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3273 );
3274 assert_eq!(
3275 params.text_document_position.position,
3276 lsp::Position::new(0, 14),
3277 );
3278
3279 Ok(Some(lsp::CompletionResponse::Array(vec![
3280 lsp::CompletionItem {
3281 label: "first_method(…)".into(),
3282 detail: Some("fn(&mut self, B) -> C".into()),
3283 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3284 new_text: "first_method($1)".to_string(),
3285 range: lsp::Range::new(
3286 lsp::Position::new(0, 14),
3287 lsp::Position::new(0, 14),
3288 ),
3289 })),
3290 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3291 ..Default::default()
3292 },
3293 lsp::CompletionItem {
3294 label: "second_method(…)".into(),
3295 detail: Some("fn(&mut self, C) -> D<E>".into()),
3296 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3297 new_text: "second_method()".to_string(),
3298 range: lsp::Range::new(
3299 lsp::Position::new(0, 14),
3300 lsp::Position::new(0, 14),
3301 ),
3302 })),
3303 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3304 ..Default::default()
3305 },
3306 ])))
3307 })
3308 .next()
3309 .await
3310 .unwrap();
3311 cx_a.foreground().finish_waiting();
3312
3313 // Open the buffer on the host.
3314 let buffer_a = project_a
3315 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3316 .await
3317 .unwrap();
3318 buffer_a
3319 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3320 .await;
3321
3322 // Confirm a completion on the guest.
3323 editor_b
3324 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3325 .await;
3326 editor_b.update(cx_b, |editor, cx| {
3327 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3328 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3329 });
3330
3331 // Return a resolved completion from the host's language server.
3332 // The resolved completion has an additional text edit.
3333 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3334 |params, _| async move {
3335 assert_eq!(params.label, "first_method(…)");
3336 Ok(lsp::CompletionItem {
3337 label: "first_method(…)".into(),
3338 detail: Some("fn(&mut self, B) -> C".into()),
3339 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3340 new_text: "first_method($1)".to_string(),
3341 range: lsp::Range::new(
3342 lsp::Position::new(0, 14),
3343 lsp::Position::new(0, 14),
3344 ),
3345 })),
3346 additional_text_edits: Some(vec![lsp::TextEdit {
3347 new_text: "use d::SomeTrait;\n".to_string(),
3348 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3349 }]),
3350 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3351 ..Default::default()
3352 })
3353 },
3354 );
3355
3356 // The additional edit is applied.
3357 buffer_a
3358 .condition(&cx_a, |buffer, _| {
3359 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3360 })
3361 .await;
3362 buffer_b
3363 .condition(&cx_b, |buffer, _| {
3364 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3365 })
3366 .await;
3367 }
3368
3369 #[gpui::test(iterations = 10)]
3370 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3371 cx_a.foreground().forbid_parking();
3372 let lang_registry = Arc::new(LanguageRegistry::test());
3373 let fs = FakeFs::new(cx_a.background());
3374
3375 // Connect to a server as 2 clients.
3376 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3377 let client_a = server.create_client(cx_a, "user_a").await;
3378 let mut client_b = server.create_client(cx_b, "user_b").await;
3379 server
3380 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3381 .await;
3382
3383 // Share a project as client A
3384 fs.insert_tree(
3385 "/a",
3386 json!({
3387 "a.rs": "let one = 1;",
3388 }),
3389 )
3390 .await;
3391 let project_a = cx_a.update(|cx| {
3392 Project::local(
3393 client_a.clone(),
3394 client_a.user_store.clone(),
3395 lang_registry.clone(),
3396 fs.clone(),
3397 cx,
3398 )
3399 });
3400 let (worktree_a, _) = project_a
3401 .update(cx_a, |p, cx| {
3402 p.find_or_create_local_worktree("/a", true, cx)
3403 })
3404 .await
3405 .unwrap();
3406 worktree_a
3407 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3408 .await;
3409 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3410 let buffer_a = project_a
3411 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3412 .await
3413 .unwrap();
3414
3415 // Join the worktree as client B.
3416 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3417
3418 let buffer_b = cx_b
3419 .background()
3420 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3421 .await
3422 .unwrap();
3423 buffer_b.update(cx_b, |buffer, cx| {
3424 buffer.edit([(4..7, "six")], cx);
3425 buffer.edit([(10..11, "6")], cx);
3426 assert_eq!(buffer.text(), "let six = 6;");
3427 assert!(buffer.is_dirty());
3428 assert!(!buffer.has_conflict());
3429 });
3430 buffer_a
3431 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3432 .await;
3433
3434 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3435 .await
3436 .unwrap();
3437 buffer_a
3438 .condition(cx_a, |buffer, _| buffer.has_conflict())
3439 .await;
3440 buffer_b
3441 .condition(cx_b, |buffer, _| buffer.has_conflict())
3442 .await;
3443
3444 project_b
3445 .update(cx_b, |project, cx| {
3446 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3447 })
3448 .await
3449 .unwrap();
3450 buffer_a.read_with(cx_a, |buffer, _| {
3451 assert_eq!(buffer.text(), "let seven = 7;");
3452 assert!(!buffer.is_dirty());
3453 assert!(!buffer.has_conflict());
3454 });
3455 buffer_b.read_with(cx_b, |buffer, _| {
3456 assert_eq!(buffer.text(), "let seven = 7;");
3457 assert!(!buffer.is_dirty());
3458 assert!(!buffer.has_conflict());
3459 });
3460
3461 buffer_a.update(cx_a, |buffer, cx| {
3462 // Undoing on the host is a no-op when the reload was initiated by the guest.
3463 buffer.undo(cx);
3464 assert_eq!(buffer.text(), "let seven = 7;");
3465 assert!(!buffer.is_dirty());
3466 assert!(!buffer.has_conflict());
3467 });
3468 buffer_b.update(cx_b, |buffer, cx| {
3469 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3470 buffer.undo(cx);
3471 assert_eq!(buffer.text(), "let six = 6;");
3472 assert!(buffer.is_dirty());
3473 assert!(!buffer.has_conflict());
3474 });
3475 }
3476
3477 #[gpui::test(iterations = 10)]
3478 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3479 cx_a.foreground().forbid_parking();
3480 let lang_registry = Arc::new(LanguageRegistry::test());
3481 let fs = FakeFs::new(cx_a.background());
3482
3483 // Set up a fake language server.
3484 let mut language = Language::new(
3485 LanguageConfig {
3486 name: "Rust".into(),
3487 path_suffixes: vec!["rs".to_string()],
3488 ..Default::default()
3489 },
3490 Some(tree_sitter_rust::language()),
3491 );
3492 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3493 lang_registry.add(Arc::new(language));
3494
3495 // Connect to a server as 2 clients.
3496 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3497 let client_a = server.create_client(cx_a, "user_a").await;
3498 let mut client_b = server.create_client(cx_b, "user_b").await;
3499 server
3500 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3501 .await;
3502
3503 // Share a project as client A
3504 fs.insert_tree(
3505 "/a",
3506 json!({
3507 "a.rs": "let one = two",
3508 }),
3509 )
3510 .await;
3511 let project_a = cx_a.update(|cx| {
3512 Project::local(
3513 client_a.clone(),
3514 client_a.user_store.clone(),
3515 lang_registry.clone(),
3516 fs.clone(),
3517 cx,
3518 )
3519 });
3520 let (worktree_a, _) = project_a
3521 .update(cx_a, |p, cx| {
3522 p.find_or_create_local_worktree("/a", true, cx)
3523 })
3524 .await
3525 .unwrap();
3526 worktree_a
3527 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3528 .await;
3529 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3530
3531 // Join the project as client B.
3532 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3533
3534 let buffer_b = cx_b
3535 .background()
3536 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3537 .await
3538 .unwrap();
3539
3540 let fake_language_server = fake_language_servers.next().await.unwrap();
3541 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3542 Ok(Some(vec![
3543 lsp::TextEdit {
3544 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3545 new_text: "h".to_string(),
3546 },
3547 lsp::TextEdit {
3548 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3549 new_text: "y".to_string(),
3550 },
3551 ]))
3552 });
3553
3554 project_b
3555 .update(cx_b, |project, cx| {
3556 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3557 })
3558 .await
3559 .unwrap();
3560 assert_eq!(
3561 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3562 "let honey = two"
3563 );
3564 }
3565
3566 #[gpui::test(iterations = 10)]
3567 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3568 cx_a.foreground().forbid_parking();
3569 let lang_registry = Arc::new(LanguageRegistry::test());
3570 let fs = FakeFs::new(cx_a.background());
3571 fs.insert_tree(
3572 "/root-1",
3573 json!({
3574 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3575 }),
3576 )
3577 .await;
3578 fs.insert_tree(
3579 "/root-2",
3580 json!({
3581 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3582 }),
3583 )
3584 .await;
3585
3586 // Set up a fake language server.
3587 let mut language = Language::new(
3588 LanguageConfig {
3589 name: "Rust".into(),
3590 path_suffixes: vec!["rs".to_string()],
3591 ..Default::default()
3592 },
3593 Some(tree_sitter_rust::language()),
3594 );
3595 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3596 lang_registry.add(Arc::new(language));
3597
3598 // Connect to a server as 2 clients.
3599 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3600 let client_a = server.create_client(cx_a, "user_a").await;
3601 let mut client_b = server.create_client(cx_b, "user_b").await;
3602 server
3603 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3604 .await;
3605
3606 // Share a project as client A
3607 let project_a = cx_a.update(|cx| {
3608 Project::local(
3609 client_a.clone(),
3610 client_a.user_store.clone(),
3611 lang_registry.clone(),
3612 fs.clone(),
3613 cx,
3614 )
3615 });
3616 let (worktree_a, _) = project_a
3617 .update(cx_a, |p, cx| {
3618 p.find_or_create_local_worktree("/root-1", true, cx)
3619 })
3620 .await
3621 .unwrap();
3622 worktree_a
3623 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3624 .await;
3625 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3626
3627 // Join the project as client B.
3628 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3629
3630 // Open the file on client B.
3631 let buffer_b = cx_b
3632 .background()
3633 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3634 .await
3635 .unwrap();
3636
3637 // Request the definition of a symbol as the guest.
3638 let fake_language_server = fake_language_servers.next().await.unwrap();
3639 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3640 |_, _| async move {
3641 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3642 lsp::Location::new(
3643 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3644 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3645 ),
3646 )))
3647 },
3648 );
3649
3650 let definitions_1 = project_b
3651 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3652 .await
3653 .unwrap();
3654 cx_b.read(|cx| {
3655 assert_eq!(definitions_1.len(), 1);
3656 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3657 let target_buffer = definitions_1[0].buffer.read(cx);
3658 assert_eq!(
3659 target_buffer.text(),
3660 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3661 );
3662 assert_eq!(
3663 definitions_1[0].range.to_point(target_buffer),
3664 Point::new(0, 6)..Point::new(0, 9)
3665 );
3666 });
3667
3668 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3669 // the previous call to `definition`.
3670 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3671 |_, _| async move {
3672 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3673 lsp::Location::new(
3674 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3675 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3676 ),
3677 )))
3678 },
3679 );
3680
3681 let definitions_2 = project_b
3682 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3683 .await
3684 .unwrap();
3685 cx_b.read(|cx| {
3686 assert_eq!(definitions_2.len(), 1);
3687 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3688 let target_buffer = definitions_2[0].buffer.read(cx);
3689 assert_eq!(
3690 target_buffer.text(),
3691 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3692 );
3693 assert_eq!(
3694 definitions_2[0].range.to_point(target_buffer),
3695 Point::new(1, 6)..Point::new(1, 11)
3696 );
3697 });
3698 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3699 }
3700
3701 #[gpui::test(iterations = 10)]
3702 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3703 cx_a.foreground().forbid_parking();
3704 let lang_registry = Arc::new(LanguageRegistry::test());
3705 let fs = FakeFs::new(cx_a.background());
3706 fs.insert_tree(
3707 "/root-1",
3708 json!({
3709 "one.rs": "const ONE: usize = 1;",
3710 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3711 }),
3712 )
3713 .await;
3714 fs.insert_tree(
3715 "/root-2",
3716 json!({
3717 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3718 }),
3719 )
3720 .await;
3721
3722 // Set up a fake language server.
3723 let mut language = Language::new(
3724 LanguageConfig {
3725 name: "Rust".into(),
3726 path_suffixes: vec!["rs".to_string()],
3727 ..Default::default()
3728 },
3729 Some(tree_sitter_rust::language()),
3730 );
3731 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3732 lang_registry.add(Arc::new(language));
3733
3734 // Connect to a server as 2 clients.
3735 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3736 let client_a = server.create_client(cx_a, "user_a").await;
3737 let mut client_b = server.create_client(cx_b, "user_b").await;
3738 server
3739 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3740 .await;
3741
3742 // Share a project as client A
3743 let project_a = cx_a.update(|cx| {
3744 Project::local(
3745 client_a.clone(),
3746 client_a.user_store.clone(),
3747 lang_registry.clone(),
3748 fs.clone(),
3749 cx,
3750 )
3751 });
3752 let (worktree_a, _) = project_a
3753 .update(cx_a, |p, cx| {
3754 p.find_or_create_local_worktree("/root-1", true, cx)
3755 })
3756 .await
3757 .unwrap();
3758 worktree_a
3759 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3760 .await;
3761 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3762
3763 // Join the worktree as client B.
3764 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3765
3766 // Open the file on client B.
3767 let buffer_b = cx_b
3768 .background()
3769 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3770 .await
3771 .unwrap();
3772
3773 // Request references to a symbol as the guest.
3774 let fake_language_server = fake_language_servers.next().await.unwrap();
3775 fake_language_server.handle_request::<lsp::request::References, _, _>(
3776 |params, _| async move {
3777 assert_eq!(
3778 params.text_document_position.text_document.uri.as_str(),
3779 "file:///root-1/one.rs"
3780 );
3781 Ok(Some(vec![
3782 lsp::Location {
3783 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3784 range: lsp::Range::new(
3785 lsp::Position::new(0, 24),
3786 lsp::Position::new(0, 27),
3787 ),
3788 },
3789 lsp::Location {
3790 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3791 range: lsp::Range::new(
3792 lsp::Position::new(0, 35),
3793 lsp::Position::new(0, 38),
3794 ),
3795 },
3796 lsp::Location {
3797 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3798 range: lsp::Range::new(
3799 lsp::Position::new(0, 37),
3800 lsp::Position::new(0, 40),
3801 ),
3802 },
3803 ]))
3804 },
3805 );
3806
3807 let references = project_b
3808 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3809 .await
3810 .unwrap();
3811 cx_b.read(|cx| {
3812 assert_eq!(references.len(), 3);
3813 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3814
3815 let two_buffer = references[0].buffer.read(cx);
3816 let three_buffer = references[2].buffer.read(cx);
3817 assert_eq!(
3818 two_buffer.file().unwrap().path().as_ref(),
3819 Path::new("two.rs")
3820 );
3821 assert_eq!(references[1].buffer, references[0].buffer);
3822 assert_eq!(
3823 three_buffer.file().unwrap().full_path(cx),
3824 Path::new("three.rs")
3825 );
3826
3827 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3828 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3829 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3830 });
3831 }
3832
3833 #[gpui::test(iterations = 10)]
3834 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3835 cx_a.foreground().forbid_parking();
3836 let lang_registry = Arc::new(LanguageRegistry::test());
3837 let fs = FakeFs::new(cx_a.background());
3838 fs.insert_tree(
3839 "/root-1",
3840 json!({
3841 "a": "hello world",
3842 "b": "goodnight moon",
3843 "c": "a world of goo",
3844 "d": "world champion of clown world",
3845 }),
3846 )
3847 .await;
3848 fs.insert_tree(
3849 "/root-2",
3850 json!({
3851 "e": "disney world is fun",
3852 }),
3853 )
3854 .await;
3855
3856 // Connect to a server as 2 clients.
3857 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3858 let client_a = server.create_client(cx_a, "user_a").await;
3859 let mut client_b = server.create_client(cx_b, "user_b").await;
3860 server
3861 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3862 .await;
3863
3864 // Share a project as client A
3865 let project_a = cx_a.update(|cx| {
3866 Project::local(
3867 client_a.clone(),
3868 client_a.user_store.clone(),
3869 lang_registry.clone(),
3870 fs.clone(),
3871 cx,
3872 )
3873 });
3874
3875 let (worktree_1, _) = project_a
3876 .update(cx_a, |p, cx| {
3877 p.find_or_create_local_worktree("/root-1", true, cx)
3878 })
3879 .await
3880 .unwrap();
3881 worktree_1
3882 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3883 .await;
3884 let (worktree_2, _) = project_a
3885 .update(cx_a, |p, cx| {
3886 p.find_or_create_local_worktree("/root-2", true, cx)
3887 })
3888 .await
3889 .unwrap();
3890 worktree_2
3891 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3892 .await;
3893
3894 // Join the worktree as client B.
3895 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3896 let results = project_b
3897 .update(cx_b, |project, cx| {
3898 project.search(SearchQuery::text("world", false, false), cx)
3899 })
3900 .await
3901 .unwrap();
3902
3903 let mut ranges_by_path = results
3904 .into_iter()
3905 .map(|(buffer, ranges)| {
3906 buffer.read_with(cx_b, |buffer, cx| {
3907 let path = buffer.file().unwrap().full_path(cx);
3908 let offset_ranges = ranges
3909 .into_iter()
3910 .map(|range| range.to_offset(buffer))
3911 .collect::<Vec<_>>();
3912 (path, offset_ranges)
3913 })
3914 })
3915 .collect::<Vec<_>>();
3916 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3917
3918 assert_eq!(
3919 ranges_by_path,
3920 &[
3921 (PathBuf::from("root-1/a"), vec![6..11]),
3922 (PathBuf::from("root-1/c"), vec![2..7]),
3923 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3924 (PathBuf::from("root-2/e"), vec![7..12]),
3925 ]
3926 );
3927 }
3928
3929 #[gpui::test(iterations = 10)]
3930 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3931 cx_a.foreground().forbid_parking();
3932 let lang_registry = Arc::new(LanguageRegistry::test());
3933 let fs = FakeFs::new(cx_a.background());
3934 fs.insert_tree(
3935 "/root-1",
3936 json!({
3937 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3938 }),
3939 )
3940 .await;
3941
3942 // Set up a fake language server.
3943 let mut language = Language::new(
3944 LanguageConfig {
3945 name: "Rust".into(),
3946 path_suffixes: vec!["rs".to_string()],
3947 ..Default::default()
3948 },
3949 Some(tree_sitter_rust::language()),
3950 );
3951 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3952 lang_registry.add(Arc::new(language));
3953
3954 // Connect to a server as 2 clients.
3955 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3956 let client_a = server.create_client(cx_a, "user_a").await;
3957 let mut client_b = server.create_client(cx_b, "user_b").await;
3958 server
3959 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3960 .await;
3961
3962 // Share a project as client A
3963 let project_a = cx_a.update(|cx| {
3964 Project::local(
3965 client_a.clone(),
3966 client_a.user_store.clone(),
3967 lang_registry.clone(),
3968 fs.clone(),
3969 cx,
3970 )
3971 });
3972 let (worktree_a, _) = project_a
3973 .update(cx_a, |p, cx| {
3974 p.find_or_create_local_worktree("/root-1", true, cx)
3975 })
3976 .await
3977 .unwrap();
3978 worktree_a
3979 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3980 .await;
3981 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3982
3983 // Join the worktree as client B.
3984 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3985
3986 // Open the file on client B.
3987 let buffer_b = cx_b
3988 .background()
3989 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3990 .await
3991 .unwrap();
3992
3993 // Request document highlights as the guest.
3994 let fake_language_server = fake_language_servers.next().await.unwrap();
3995 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3996 |params, _| async move {
3997 assert_eq!(
3998 params
3999 .text_document_position_params
4000 .text_document
4001 .uri
4002 .as_str(),
4003 "file:///root-1/main.rs"
4004 );
4005 assert_eq!(
4006 params.text_document_position_params.position,
4007 lsp::Position::new(0, 34)
4008 );
4009 Ok(Some(vec![
4010 lsp::DocumentHighlight {
4011 kind: Some(lsp::DocumentHighlightKind::WRITE),
4012 range: lsp::Range::new(
4013 lsp::Position::new(0, 10),
4014 lsp::Position::new(0, 16),
4015 ),
4016 },
4017 lsp::DocumentHighlight {
4018 kind: Some(lsp::DocumentHighlightKind::READ),
4019 range: lsp::Range::new(
4020 lsp::Position::new(0, 32),
4021 lsp::Position::new(0, 38),
4022 ),
4023 },
4024 lsp::DocumentHighlight {
4025 kind: Some(lsp::DocumentHighlightKind::READ),
4026 range: lsp::Range::new(
4027 lsp::Position::new(0, 41),
4028 lsp::Position::new(0, 47),
4029 ),
4030 },
4031 ]))
4032 },
4033 );
4034
4035 let highlights = project_b
4036 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4037 .await
4038 .unwrap();
4039 buffer_b.read_with(cx_b, |buffer, _| {
4040 let snapshot = buffer.snapshot();
4041
4042 let highlights = highlights
4043 .into_iter()
4044 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4045 .collect::<Vec<_>>();
4046 assert_eq!(
4047 highlights,
4048 &[
4049 (lsp::DocumentHighlightKind::WRITE, 10..16),
4050 (lsp::DocumentHighlightKind::READ, 32..38),
4051 (lsp::DocumentHighlightKind::READ, 41..47)
4052 ]
4053 )
4054 });
4055 }
4056
4057 #[gpui::test(iterations = 10)]
4058 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4059 cx_a.foreground().forbid_parking();
4060 let lang_registry = Arc::new(LanguageRegistry::test());
4061 let fs = FakeFs::new(cx_a.background());
4062 fs.insert_tree(
4063 "/code",
4064 json!({
4065 "crate-1": {
4066 "one.rs": "const ONE: usize = 1;",
4067 },
4068 "crate-2": {
4069 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4070 },
4071 "private": {
4072 "passwords.txt": "the-password",
4073 }
4074 }),
4075 )
4076 .await;
4077
4078 // Set up a fake language server.
4079 let mut language = Language::new(
4080 LanguageConfig {
4081 name: "Rust".into(),
4082 path_suffixes: vec!["rs".to_string()],
4083 ..Default::default()
4084 },
4085 Some(tree_sitter_rust::language()),
4086 );
4087 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4088 lang_registry.add(Arc::new(language));
4089
4090 // Connect to a server as 2 clients.
4091 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4092 let client_a = server.create_client(cx_a, "user_a").await;
4093 let mut client_b = server.create_client(cx_b, "user_b").await;
4094 server
4095 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4096 .await;
4097
4098 // Share a project as client A
4099 let project_a = cx_a.update(|cx| {
4100 Project::local(
4101 client_a.clone(),
4102 client_a.user_store.clone(),
4103 lang_registry.clone(),
4104 fs.clone(),
4105 cx,
4106 )
4107 });
4108 let (worktree_a, _) = project_a
4109 .update(cx_a, |p, cx| {
4110 p.find_or_create_local_worktree("/code/crate-1", true, cx)
4111 })
4112 .await
4113 .unwrap();
4114 worktree_a
4115 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4116 .await;
4117 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4118
4119 // Join the worktree as client B.
4120 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4121
4122 // Cause the language server to start.
4123 let _buffer = cx_b
4124 .background()
4125 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4126 .await
4127 .unwrap();
4128
4129 let fake_language_server = fake_language_servers.next().await.unwrap();
4130 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4131 |_, _| async move {
4132 #[allow(deprecated)]
4133 Ok(Some(vec![lsp::SymbolInformation {
4134 name: "TWO".into(),
4135 location: lsp::Location {
4136 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4137 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4138 },
4139 kind: lsp::SymbolKind::CONSTANT,
4140 tags: None,
4141 container_name: None,
4142 deprecated: None,
4143 }]))
4144 },
4145 );
4146
4147 // Request the definition of a symbol as the guest.
4148 let symbols = project_b
4149 .update(cx_b, |p, cx| p.symbols("two", cx))
4150 .await
4151 .unwrap();
4152 assert_eq!(symbols.len(), 1);
4153 assert_eq!(symbols[0].name, "TWO");
4154
4155 // Open one of the returned symbols.
4156 let buffer_b_2 = project_b
4157 .update(cx_b, |project, cx| {
4158 project.open_buffer_for_symbol(&symbols[0], cx)
4159 })
4160 .await
4161 .unwrap();
4162 buffer_b_2.read_with(cx_b, |buffer, _| {
4163 assert_eq!(
4164 buffer.file().unwrap().path().as_ref(),
4165 Path::new("../crate-2/two.rs")
4166 );
4167 });
4168
4169 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4170 let mut fake_symbol = symbols[0].clone();
4171 fake_symbol.path = Path::new("/code/secrets").into();
4172 let error = project_b
4173 .update(cx_b, |project, cx| {
4174 project.open_buffer_for_symbol(&fake_symbol, cx)
4175 })
4176 .await
4177 .unwrap_err();
4178 assert!(error.to_string().contains("invalid symbol signature"));
4179 }
4180
4181 #[gpui::test(iterations = 10)]
4182 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4183 cx_a: &mut TestAppContext,
4184 cx_b: &mut TestAppContext,
4185 mut rng: StdRng,
4186 ) {
4187 cx_a.foreground().forbid_parking();
4188 let lang_registry = Arc::new(LanguageRegistry::test());
4189 let fs = FakeFs::new(cx_a.background());
4190 fs.insert_tree(
4191 "/root",
4192 json!({
4193 "a.rs": "const ONE: usize = b::TWO;",
4194 "b.rs": "const TWO: usize = 2",
4195 }),
4196 )
4197 .await;
4198
4199 // Set up a fake language server.
4200 let mut language = Language::new(
4201 LanguageConfig {
4202 name: "Rust".into(),
4203 path_suffixes: vec!["rs".to_string()],
4204 ..Default::default()
4205 },
4206 Some(tree_sitter_rust::language()),
4207 );
4208 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4209 lang_registry.add(Arc::new(language));
4210
4211 // Connect to a server as 2 clients.
4212 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4213 let client_a = server.create_client(cx_a, "user_a").await;
4214 let mut client_b = server.create_client(cx_b, "user_b").await;
4215 server
4216 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4217 .await;
4218
4219 // Share a project as client A
4220 let project_a = cx_a.update(|cx| {
4221 Project::local(
4222 client_a.clone(),
4223 client_a.user_store.clone(),
4224 lang_registry.clone(),
4225 fs.clone(),
4226 cx,
4227 )
4228 });
4229
4230 let (worktree_a, _) = project_a
4231 .update(cx_a, |p, cx| {
4232 p.find_or_create_local_worktree("/root", true, cx)
4233 })
4234 .await
4235 .unwrap();
4236 worktree_a
4237 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4238 .await;
4239 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4240
4241 // Join the project as client B.
4242 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4243
4244 let buffer_b1 = cx_b
4245 .background()
4246 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4247 .await
4248 .unwrap();
4249
4250 let fake_language_server = fake_language_servers.next().await.unwrap();
4251 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4252 |_, _| async move {
4253 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4254 lsp::Location::new(
4255 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4256 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4257 ),
4258 )))
4259 },
4260 );
4261
4262 let definitions;
4263 let buffer_b2;
4264 if rng.gen() {
4265 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4266 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4267 } else {
4268 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4269 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4270 }
4271
4272 let buffer_b2 = buffer_b2.await.unwrap();
4273 let definitions = definitions.await.unwrap();
4274 assert_eq!(definitions.len(), 1);
4275 assert_eq!(definitions[0].buffer, buffer_b2);
4276 }
4277
4278 #[gpui::test(iterations = 10)]
4279 async fn test_collaborating_with_code_actions(
4280 cx_a: &mut TestAppContext,
4281 cx_b: &mut TestAppContext,
4282 ) {
4283 cx_a.foreground().forbid_parking();
4284 let lang_registry = Arc::new(LanguageRegistry::test());
4285 let fs = FakeFs::new(cx_a.background());
4286 cx_b.update(|cx| editor::init(cx));
4287
4288 // Set up a fake language server.
4289 let mut language = Language::new(
4290 LanguageConfig {
4291 name: "Rust".into(),
4292 path_suffixes: vec!["rs".to_string()],
4293 ..Default::default()
4294 },
4295 Some(tree_sitter_rust::language()),
4296 );
4297 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4298 lang_registry.add(Arc::new(language));
4299
4300 // Connect to a server as 2 clients.
4301 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4302 let client_a = server.create_client(cx_a, "user_a").await;
4303 let mut client_b = server.create_client(cx_b, "user_b").await;
4304 server
4305 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4306 .await;
4307
4308 // Share a project as client A
4309 fs.insert_tree(
4310 "/a",
4311 json!({
4312 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4313 "other.rs": "pub fn foo() -> usize { 4 }",
4314 }),
4315 )
4316 .await;
4317 let project_a = cx_a.update(|cx| {
4318 Project::local(
4319 client_a.clone(),
4320 client_a.user_store.clone(),
4321 lang_registry.clone(),
4322 fs.clone(),
4323 cx,
4324 )
4325 });
4326 let (worktree_a, _) = project_a
4327 .update(cx_a, |p, cx| {
4328 p.find_or_create_local_worktree("/a", true, cx)
4329 })
4330 .await
4331 .unwrap();
4332 worktree_a
4333 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4334 .await;
4335 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4336
4337 // Join the project as client B.
4338 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4339 let mut params = cx_b.update(WorkspaceParams::test);
4340 params.languages = lang_registry.clone();
4341 params.project = project_b.clone();
4342 params.client = client_b.client.clone();
4343 params.user_store = client_b.user_store.clone();
4344
4345 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4346 let editor_b = workspace_b
4347 .update(cx_b, |workspace, cx| {
4348 workspace.open_path((worktree_id, "main.rs"), true, cx)
4349 })
4350 .await
4351 .unwrap()
4352 .downcast::<Editor>()
4353 .unwrap();
4354
4355 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4356 fake_language_server
4357 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4358 assert_eq!(
4359 params.text_document.uri,
4360 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4361 );
4362 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4363 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4364 Ok(None)
4365 })
4366 .next()
4367 .await;
4368
4369 // Move cursor to a location that contains code actions.
4370 editor_b.update(cx_b, |editor, cx| {
4371 editor.change_selections(None, cx, |s| {
4372 s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4373 });
4374 cx.focus(&editor_b);
4375 });
4376
4377 fake_language_server
4378 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4379 assert_eq!(
4380 params.text_document.uri,
4381 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4382 );
4383 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4384 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4385
4386 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4387 lsp::CodeAction {
4388 title: "Inline into all callers".to_string(),
4389 edit: Some(lsp::WorkspaceEdit {
4390 changes: Some(
4391 [
4392 (
4393 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4394 vec![lsp::TextEdit::new(
4395 lsp::Range::new(
4396 lsp::Position::new(1, 22),
4397 lsp::Position::new(1, 34),
4398 ),
4399 "4".to_string(),
4400 )],
4401 ),
4402 (
4403 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4404 vec![lsp::TextEdit::new(
4405 lsp::Range::new(
4406 lsp::Position::new(0, 0),
4407 lsp::Position::new(0, 27),
4408 ),
4409 "".to_string(),
4410 )],
4411 ),
4412 ]
4413 .into_iter()
4414 .collect(),
4415 ),
4416 ..Default::default()
4417 }),
4418 data: Some(json!({
4419 "codeActionParams": {
4420 "range": {
4421 "start": {"line": 1, "column": 31},
4422 "end": {"line": 1, "column": 31},
4423 }
4424 }
4425 })),
4426 ..Default::default()
4427 },
4428 )]))
4429 })
4430 .next()
4431 .await;
4432
4433 // Toggle code actions and wait for them to display.
4434 editor_b.update(cx_b, |editor, cx| {
4435 editor.toggle_code_actions(
4436 &ToggleCodeActions {
4437 deployed_from_indicator: false,
4438 },
4439 cx,
4440 );
4441 });
4442 editor_b
4443 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4444 .await;
4445
4446 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4447
4448 // Confirming the code action will trigger a resolve request.
4449 let confirm_action = workspace_b
4450 .update(cx_b, |workspace, cx| {
4451 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4452 })
4453 .unwrap();
4454 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4455 |_, _| async move {
4456 Ok(lsp::CodeAction {
4457 title: "Inline into all callers".to_string(),
4458 edit: Some(lsp::WorkspaceEdit {
4459 changes: Some(
4460 [
4461 (
4462 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4463 vec![lsp::TextEdit::new(
4464 lsp::Range::new(
4465 lsp::Position::new(1, 22),
4466 lsp::Position::new(1, 34),
4467 ),
4468 "4".to_string(),
4469 )],
4470 ),
4471 (
4472 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4473 vec![lsp::TextEdit::new(
4474 lsp::Range::new(
4475 lsp::Position::new(0, 0),
4476 lsp::Position::new(0, 27),
4477 ),
4478 "".to_string(),
4479 )],
4480 ),
4481 ]
4482 .into_iter()
4483 .collect(),
4484 ),
4485 ..Default::default()
4486 }),
4487 ..Default::default()
4488 })
4489 },
4490 );
4491
4492 // After the action is confirmed, an editor containing both modified files is opened.
4493 confirm_action.await.unwrap();
4494 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4495 workspace
4496 .active_item(cx)
4497 .unwrap()
4498 .downcast::<Editor>()
4499 .unwrap()
4500 });
4501 code_action_editor.update(cx_b, |editor, cx| {
4502 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4503 editor.undo(&Undo, cx);
4504 assert_eq!(
4505 editor.text(cx),
4506 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4507 );
4508 editor.redo(&Redo, cx);
4509 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4510 });
4511 }
4512
4513 #[gpui::test(iterations = 10)]
4514 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4515 cx_a.foreground().forbid_parking();
4516 let lang_registry = Arc::new(LanguageRegistry::test());
4517 let fs = FakeFs::new(cx_a.background());
4518 cx_b.update(|cx| editor::init(cx));
4519
4520 // Set up a fake language server.
4521 let mut language = Language::new(
4522 LanguageConfig {
4523 name: "Rust".into(),
4524 path_suffixes: vec!["rs".to_string()],
4525 ..Default::default()
4526 },
4527 Some(tree_sitter_rust::language()),
4528 );
4529 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4530 capabilities: lsp::ServerCapabilities {
4531 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4532 prepare_provider: Some(true),
4533 work_done_progress_options: Default::default(),
4534 })),
4535 ..Default::default()
4536 },
4537 ..Default::default()
4538 });
4539 lang_registry.add(Arc::new(language));
4540
4541 // Connect to a server as 2 clients.
4542 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4543 let client_a = server.create_client(cx_a, "user_a").await;
4544 let mut client_b = server.create_client(cx_b, "user_b").await;
4545 server
4546 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4547 .await;
4548
4549 // Share a project as client A
4550 fs.insert_tree(
4551 "/dir",
4552 json!({
4553 "one.rs": "const ONE: usize = 1;",
4554 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4555 }),
4556 )
4557 .await;
4558 let project_a = cx_a.update(|cx| {
4559 Project::local(
4560 client_a.clone(),
4561 client_a.user_store.clone(),
4562 lang_registry.clone(),
4563 fs.clone(),
4564 cx,
4565 )
4566 });
4567 let (worktree_a, _) = project_a
4568 .update(cx_a, |p, cx| {
4569 p.find_or_create_local_worktree("/dir", true, cx)
4570 })
4571 .await
4572 .unwrap();
4573 worktree_a
4574 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4575 .await;
4576 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4577
4578 // Join the worktree as client B.
4579 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4580 let mut params = cx_b.update(WorkspaceParams::test);
4581 params.languages = lang_registry.clone();
4582 params.project = project_b.clone();
4583 params.client = client_b.client.clone();
4584 params.user_store = client_b.user_store.clone();
4585
4586 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4587 let editor_b = workspace_b
4588 .update(cx_b, |workspace, cx| {
4589 workspace.open_path((worktree_id, "one.rs"), true, cx)
4590 })
4591 .await
4592 .unwrap()
4593 .downcast::<Editor>()
4594 .unwrap();
4595 let fake_language_server = fake_language_servers.next().await.unwrap();
4596
4597 // Move cursor to a location that can be renamed.
4598 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4599 editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4600 editor.rename(&Rename, cx).unwrap()
4601 });
4602
4603 fake_language_server
4604 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4605 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4606 assert_eq!(params.position, lsp::Position::new(0, 7));
4607 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4608 lsp::Position::new(0, 6),
4609 lsp::Position::new(0, 9),
4610 ))))
4611 })
4612 .next()
4613 .await
4614 .unwrap();
4615 prepare_rename.await.unwrap();
4616 editor_b.update(cx_b, |editor, cx| {
4617 let rename = editor.pending_rename().unwrap();
4618 let buffer = editor.buffer().read(cx).snapshot(cx);
4619 assert_eq!(
4620 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4621 6..9
4622 );
4623 rename.editor.update(cx, |rename_editor, cx| {
4624 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4625 rename_buffer.edit([(0..3, "THREE")], cx);
4626 });
4627 });
4628 });
4629
4630 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4631 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4632 });
4633 fake_language_server
4634 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4635 assert_eq!(
4636 params.text_document_position.text_document.uri.as_str(),
4637 "file:///dir/one.rs"
4638 );
4639 assert_eq!(
4640 params.text_document_position.position,
4641 lsp::Position::new(0, 6)
4642 );
4643 assert_eq!(params.new_name, "THREE");
4644 Ok(Some(lsp::WorkspaceEdit {
4645 changes: Some(
4646 [
4647 (
4648 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4649 vec![lsp::TextEdit::new(
4650 lsp::Range::new(
4651 lsp::Position::new(0, 6),
4652 lsp::Position::new(0, 9),
4653 ),
4654 "THREE".to_string(),
4655 )],
4656 ),
4657 (
4658 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4659 vec![
4660 lsp::TextEdit::new(
4661 lsp::Range::new(
4662 lsp::Position::new(0, 24),
4663 lsp::Position::new(0, 27),
4664 ),
4665 "THREE".to_string(),
4666 ),
4667 lsp::TextEdit::new(
4668 lsp::Range::new(
4669 lsp::Position::new(0, 35),
4670 lsp::Position::new(0, 38),
4671 ),
4672 "THREE".to_string(),
4673 ),
4674 ],
4675 ),
4676 ]
4677 .into_iter()
4678 .collect(),
4679 ),
4680 ..Default::default()
4681 }))
4682 })
4683 .next()
4684 .await
4685 .unwrap();
4686 confirm_rename.await.unwrap();
4687
4688 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4689 workspace
4690 .active_item(cx)
4691 .unwrap()
4692 .downcast::<Editor>()
4693 .unwrap()
4694 });
4695 rename_editor.update(cx_b, |editor, cx| {
4696 assert_eq!(
4697 editor.text(cx),
4698 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4699 );
4700 editor.undo(&Undo, cx);
4701 assert_eq!(
4702 editor.text(cx),
4703 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4704 );
4705 editor.redo(&Redo, cx);
4706 assert_eq!(
4707 editor.text(cx),
4708 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4709 );
4710 });
4711
4712 // Ensure temporary rename edits cannot be undone/redone.
4713 editor_b.update(cx_b, |editor, cx| {
4714 editor.undo(&Undo, cx);
4715 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4716 editor.undo(&Undo, cx);
4717 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4718 editor.redo(&Redo, cx);
4719 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4720 })
4721 }
4722
4723 #[gpui::test(iterations = 10)]
4724 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4725 cx_a.foreground().forbid_parking();
4726
4727 // Connect to a server as 2 clients.
4728 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4729 let client_a = server.create_client(cx_a, "user_a").await;
4730 let client_b = server.create_client(cx_b, "user_b").await;
4731
4732 // Create an org that includes these 2 users.
4733 let db = &server.app_state.db;
4734 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4735 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4736 .await
4737 .unwrap();
4738 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4739 .await
4740 .unwrap();
4741
4742 // Create a channel that includes all the users.
4743 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4744 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4745 .await
4746 .unwrap();
4747 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4748 .await
4749 .unwrap();
4750 db.create_channel_message(
4751 channel_id,
4752 client_b.current_user_id(&cx_b),
4753 "hello A, it's B.",
4754 OffsetDateTime::now_utc(),
4755 1,
4756 )
4757 .await
4758 .unwrap();
4759
4760 let channels_a = cx_a
4761 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4762 channels_a
4763 .condition(cx_a, |list, _| list.available_channels().is_some())
4764 .await;
4765 channels_a.read_with(cx_a, |list, _| {
4766 assert_eq!(
4767 list.available_channels().unwrap(),
4768 &[ChannelDetails {
4769 id: channel_id.to_proto(),
4770 name: "test-channel".to_string()
4771 }]
4772 )
4773 });
4774 let channel_a = channels_a.update(cx_a, |this, cx| {
4775 this.get_channel(channel_id.to_proto(), cx).unwrap()
4776 });
4777 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4778 channel_a
4779 .condition(&cx_a, |channel, _| {
4780 channel_messages(channel)
4781 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4782 })
4783 .await;
4784
4785 let channels_b = cx_b
4786 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4787 channels_b
4788 .condition(cx_b, |list, _| list.available_channels().is_some())
4789 .await;
4790 channels_b.read_with(cx_b, |list, _| {
4791 assert_eq!(
4792 list.available_channels().unwrap(),
4793 &[ChannelDetails {
4794 id: channel_id.to_proto(),
4795 name: "test-channel".to_string()
4796 }]
4797 )
4798 });
4799
4800 let channel_b = channels_b.update(cx_b, |this, cx| {
4801 this.get_channel(channel_id.to_proto(), cx).unwrap()
4802 });
4803 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4804 channel_b
4805 .condition(&cx_b, |channel, _| {
4806 channel_messages(channel)
4807 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4808 })
4809 .await;
4810
4811 channel_a
4812 .update(cx_a, |channel, cx| {
4813 channel
4814 .send_message("oh, hi B.".to_string(), cx)
4815 .unwrap()
4816 .detach();
4817 let task = channel.send_message("sup".to_string(), cx).unwrap();
4818 assert_eq!(
4819 channel_messages(channel),
4820 &[
4821 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4822 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4823 ("user_a".to_string(), "sup".to_string(), true)
4824 ]
4825 );
4826 task
4827 })
4828 .await
4829 .unwrap();
4830
4831 channel_b
4832 .condition(&cx_b, |channel, _| {
4833 channel_messages(channel)
4834 == [
4835 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4836 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4837 ("user_a".to_string(), "sup".to_string(), false),
4838 ]
4839 })
4840 .await;
4841
4842 assert_eq!(
4843 server
4844 .state()
4845 .await
4846 .channel(channel_id)
4847 .unwrap()
4848 .connection_ids
4849 .len(),
4850 2
4851 );
4852 cx_b.update(|_| drop(channel_b));
4853 server
4854 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4855 .await;
4856
4857 cx_a.update(|_| drop(channel_a));
4858 server
4859 .condition(|state| state.channel(channel_id).is_none())
4860 .await;
4861 }
4862
4863 #[gpui::test(iterations = 10)]
4864 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4865 cx_a.foreground().forbid_parking();
4866
4867 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4868 let client_a = server.create_client(cx_a, "user_a").await;
4869
4870 let db = &server.app_state.db;
4871 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4872 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4873 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4874 .await
4875 .unwrap();
4876 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4877 .await
4878 .unwrap();
4879
4880 let channels_a = cx_a
4881 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4882 channels_a
4883 .condition(cx_a, |list, _| list.available_channels().is_some())
4884 .await;
4885 let channel_a = channels_a.update(cx_a, |this, cx| {
4886 this.get_channel(channel_id.to_proto(), cx).unwrap()
4887 });
4888
4889 // Messages aren't allowed to be too long.
4890 channel_a
4891 .update(cx_a, |channel, cx| {
4892 let long_body = "this is long.\n".repeat(1024);
4893 channel.send_message(long_body, cx).unwrap()
4894 })
4895 .await
4896 .unwrap_err();
4897
4898 // Messages aren't allowed to be blank.
4899 channel_a.update(cx_a, |channel, cx| {
4900 channel.send_message(String::new(), cx).unwrap_err()
4901 });
4902
4903 // Leading and trailing whitespace are trimmed.
4904 channel_a
4905 .update(cx_a, |channel, cx| {
4906 channel
4907 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4908 .unwrap()
4909 })
4910 .await
4911 .unwrap();
4912 assert_eq!(
4913 db.get_channel_messages(channel_id, 10, None)
4914 .await
4915 .unwrap()
4916 .iter()
4917 .map(|m| &m.body)
4918 .collect::<Vec<_>>(),
4919 &["surrounded by whitespace"]
4920 );
4921 }
4922
4923 #[gpui::test(iterations = 10)]
4924 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4925 cx_a.foreground().forbid_parking();
4926
4927 // Connect to a server as 2 clients.
4928 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4929 let client_a = server.create_client(cx_a, "user_a").await;
4930 let client_b = server.create_client(cx_b, "user_b").await;
4931 let mut status_b = client_b.status();
4932
4933 // Create an org that includes these 2 users.
4934 let db = &server.app_state.db;
4935 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4936 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4937 .await
4938 .unwrap();
4939 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4940 .await
4941 .unwrap();
4942
4943 // Create a channel that includes all the users.
4944 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4945 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4946 .await
4947 .unwrap();
4948 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4949 .await
4950 .unwrap();
4951 db.create_channel_message(
4952 channel_id,
4953 client_b.current_user_id(&cx_b),
4954 "hello A, it's B.",
4955 OffsetDateTime::now_utc(),
4956 2,
4957 )
4958 .await
4959 .unwrap();
4960
4961 let channels_a = cx_a
4962 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4963 channels_a
4964 .condition(cx_a, |list, _| list.available_channels().is_some())
4965 .await;
4966
4967 channels_a.read_with(cx_a, |list, _| {
4968 assert_eq!(
4969 list.available_channels().unwrap(),
4970 &[ChannelDetails {
4971 id: channel_id.to_proto(),
4972 name: "test-channel".to_string()
4973 }]
4974 )
4975 });
4976 let channel_a = channels_a.update(cx_a, |this, cx| {
4977 this.get_channel(channel_id.to_proto(), cx).unwrap()
4978 });
4979 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4980 channel_a
4981 .condition(&cx_a, |channel, _| {
4982 channel_messages(channel)
4983 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4984 })
4985 .await;
4986
4987 let channels_b = cx_b
4988 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4989 channels_b
4990 .condition(cx_b, |list, _| list.available_channels().is_some())
4991 .await;
4992 channels_b.read_with(cx_b, |list, _| {
4993 assert_eq!(
4994 list.available_channels().unwrap(),
4995 &[ChannelDetails {
4996 id: channel_id.to_proto(),
4997 name: "test-channel".to_string()
4998 }]
4999 )
5000 });
5001
5002 let channel_b = channels_b.update(cx_b, |this, cx| {
5003 this.get_channel(channel_id.to_proto(), cx).unwrap()
5004 });
5005 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5006 channel_b
5007 .condition(&cx_b, |channel, _| {
5008 channel_messages(channel)
5009 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5010 })
5011 .await;
5012
5013 // Disconnect client B, ensuring we can still access its cached channel data.
5014 server.forbid_connections();
5015 server.disconnect_client(client_b.current_user_id(&cx_b));
5016 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5017 while !matches!(
5018 status_b.next().await,
5019 Some(client::Status::ReconnectionError { .. })
5020 ) {}
5021
5022 channels_b.read_with(cx_b, |channels, _| {
5023 assert_eq!(
5024 channels.available_channels().unwrap(),
5025 [ChannelDetails {
5026 id: channel_id.to_proto(),
5027 name: "test-channel".to_string()
5028 }]
5029 )
5030 });
5031 channel_b.read_with(cx_b, |channel, _| {
5032 assert_eq!(
5033 channel_messages(channel),
5034 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5035 )
5036 });
5037
5038 // Send a message from client B while it is disconnected.
5039 channel_b
5040 .update(cx_b, |channel, cx| {
5041 let task = channel
5042 .send_message("can you see this?".to_string(), cx)
5043 .unwrap();
5044 assert_eq!(
5045 channel_messages(channel),
5046 &[
5047 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5048 ("user_b".to_string(), "can you see this?".to_string(), true)
5049 ]
5050 );
5051 task
5052 })
5053 .await
5054 .unwrap_err();
5055
5056 // Send a message from client A while B is disconnected.
5057 channel_a
5058 .update(cx_a, |channel, cx| {
5059 channel
5060 .send_message("oh, hi B.".to_string(), cx)
5061 .unwrap()
5062 .detach();
5063 let task = channel.send_message("sup".to_string(), cx).unwrap();
5064 assert_eq!(
5065 channel_messages(channel),
5066 &[
5067 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5068 ("user_a".to_string(), "oh, hi B.".to_string(), true),
5069 ("user_a".to_string(), "sup".to_string(), true)
5070 ]
5071 );
5072 task
5073 })
5074 .await
5075 .unwrap();
5076
5077 // Give client B a chance to reconnect.
5078 server.allow_connections();
5079 cx_b.foreground().advance_clock(Duration::from_secs(10));
5080
5081 // Verify that B sees the new messages upon reconnection, as well as the message client B
5082 // sent while offline.
5083 channel_b
5084 .condition(&cx_b, |channel, _| {
5085 channel_messages(channel)
5086 == [
5087 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5088 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5089 ("user_a".to_string(), "sup".to_string(), false),
5090 ("user_b".to_string(), "can you see this?".to_string(), false),
5091 ]
5092 })
5093 .await;
5094
5095 // Ensure client A and B can communicate normally after reconnection.
5096 channel_a
5097 .update(cx_a, |channel, cx| {
5098 channel.send_message("you online?".to_string(), cx).unwrap()
5099 })
5100 .await
5101 .unwrap();
5102 channel_b
5103 .condition(&cx_b, |channel, _| {
5104 channel_messages(channel)
5105 == [
5106 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5107 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5108 ("user_a".to_string(), "sup".to_string(), false),
5109 ("user_b".to_string(), "can you see this?".to_string(), false),
5110 ("user_a".to_string(), "you online?".to_string(), false),
5111 ]
5112 })
5113 .await;
5114
5115 channel_b
5116 .update(cx_b, |channel, cx| {
5117 channel.send_message("yep".to_string(), cx).unwrap()
5118 })
5119 .await
5120 .unwrap();
5121 channel_a
5122 .condition(&cx_a, |channel, _| {
5123 channel_messages(channel)
5124 == [
5125 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5126 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5127 ("user_a".to_string(), "sup".to_string(), false),
5128 ("user_b".to_string(), "can you see this?".to_string(), false),
5129 ("user_a".to_string(), "you online?".to_string(), false),
5130 ("user_b".to_string(), "yep".to_string(), false),
5131 ]
5132 })
5133 .await;
5134 }
5135
5136 #[gpui::test(iterations = 10)]
5137 async fn test_contacts(
5138 deterministic: Arc<Deterministic>,
5139 cx_a: &mut TestAppContext,
5140 cx_b: &mut TestAppContext,
5141 cx_c: &mut TestAppContext,
5142 ) {
5143 cx_a.foreground().forbid_parking();
5144
5145 // Connect to a server as 3 clients.
5146 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5147 let mut client_a = server.create_client(cx_a, "user_a").await;
5148 let mut client_b = server.create_client(cx_b, "user_b").await;
5149 let client_c = server.create_client(cx_c, "user_c").await;
5150 server
5151 .make_contacts(vec![
5152 (&client_a, cx_a),
5153 (&client_b, cx_b),
5154 (&client_c, cx_c),
5155 ])
5156 .await;
5157
5158 deterministic.run_until_parked();
5159 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5160 client.user_store.read_with(*cx, |store, _| {
5161 assert_eq!(
5162 contacts(store),
5163 [
5164 ("user_a", true, vec![]),
5165 ("user_b", true, vec![]),
5166 ("user_c", true, vec![])
5167 ],
5168 "{} has the wrong contacts",
5169 client.username
5170 )
5171 });
5172 }
5173
5174 // Share a project as client A.
5175 let fs = FakeFs::new(cx_a.background());
5176 fs.create_dir(Path::new("/a")).await.unwrap();
5177 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5178
5179 deterministic.run_until_parked();
5180 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5181 client.user_store.read_with(*cx, |store, _| {
5182 assert_eq!(
5183 contacts(store),
5184 [
5185 ("user_a", true, vec![("a", vec![])]),
5186 ("user_b", true, vec![]),
5187 ("user_c", true, vec![])
5188 ],
5189 "{} has the wrong contacts",
5190 client.username
5191 )
5192 });
5193 }
5194
5195 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5196
5197 deterministic.run_until_parked();
5198 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5199 client.user_store.read_with(*cx, |store, _| {
5200 assert_eq!(
5201 contacts(store),
5202 [
5203 ("user_a", true, vec![("a", vec!["user_b"])]),
5204 ("user_b", true, vec![]),
5205 ("user_c", true, vec![])
5206 ],
5207 "{} has the wrong contacts",
5208 client.username
5209 )
5210 });
5211 }
5212
5213 // Add a local project as client B
5214 let fs = FakeFs::new(cx_b.background());
5215 fs.create_dir(Path::new("/b")).await.unwrap();
5216 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5217
5218 deterministic.run_until_parked();
5219 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5220 client.user_store.read_with(*cx, |store, _| {
5221 assert_eq!(
5222 contacts(store),
5223 [
5224 ("user_a", true, vec![("a", vec!["user_b"])]),
5225 ("user_b", true, vec![("b", vec![])]),
5226 ("user_c", true, vec![])
5227 ],
5228 "{} has the wrong contacts",
5229 client.username
5230 )
5231 });
5232 }
5233
5234 project_a
5235 .condition(&cx_a, |project, _| {
5236 project.collaborators().contains_key(&client_b.peer_id)
5237 })
5238 .await;
5239
5240 client_a.project.take();
5241 cx_a.update(move |_| drop(project_a));
5242 deterministic.run_until_parked();
5243 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5244 client.user_store.read_with(*cx, |store, _| {
5245 assert_eq!(
5246 contacts(store),
5247 [
5248 ("user_a", true, vec![]),
5249 ("user_b", true, vec![("b", vec![])]),
5250 ("user_c", true, vec![])
5251 ],
5252 "{} has the wrong contacts",
5253 client.username
5254 )
5255 });
5256 }
5257
5258 server.disconnect_client(client_c.current_user_id(cx_c));
5259 server.forbid_connections();
5260 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5261 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5262 client.user_store.read_with(*cx, |store, _| {
5263 assert_eq!(
5264 contacts(store),
5265 [
5266 ("user_a", true, vec![]),
5267 ("user_b", true, vec![("b", vec![])]),
5268 ("user_c", false, vec![])
5269 ],
5270 "{} has the wrong contacts",
5271 client.username
5272 )
5273 });
5274 }
5275 client_c
5276 .user_store
5277 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5278
5279 server.allow_connections();
5280 client_c
5281 .authenticate_and_connect(false, &cx_c.to_async())
5282 .await
5283 .unwrap();
5284
5285 deterministic.run_until_parked();
5286 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5287 client.user_store.read_with(*cx, |store, _| {
5288 assert_eq!(
5289 contacts(store),
5290 [
5291 ("user_a", true, vec![]),
5292 ("user_b", true, vec![("b", vec![])]),
5293 ("user_c", true, vec![])
5294 ],
5295 "{} has the wrong contacts",
5296 client.username
5297 )
5298 });
5299 }
5300
5301 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5302 user_store
5303 .contacts()
5304 .iter()
5305 .map(|contact| {
5306 let projects = contact
5307 .projects
5308 .iter()
5309 .map(|p| {
5310 (
5311 p.worktree_root_names[0].as_str(),
5312 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5313 )
5314 })
5315 .collect();
5316 (contact.user.github_login.as_str(), contact.online, projects)
5317 })
5318 .collect()
5319 }
5320 }
5321
5322 #[gpui::test(iterations = 10)]
5323 async fn test_contact_requests(
5324 executor: Arc<Deterministic>,
5325 cx_a: &mut TestAppContext,
5326 cx_a2: &mut TestAppContext,
5327 cx_b: &mut TestAppContext,
5328 cx_b2: &mut TestAppContext,
5329 cx_c: &mut TestAppContext,
5330 cx_c2: &mut TestAppContext,
5331 ) {
5332 cx_a.foreground().forbid_parking();
5333
5334 // Connect to a server as 3 clients.
5335 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5336 let client_a = server.create_client(cx_a, "user_a").await;
5337 let client_a2 = server.create_client(cx_a2, "user_a").await;
5338 let client_b = server.create_client(cx_b, "user_b").await;
5339 let client_b2 = server.create_client(cx_b2, "user_b").await;
5340 let client_c = server.create_client(cx_c, "user_c").await;
5341 let client_c2 = server.create_client(cx_c2, "user_c").await;
5342
5343 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5344 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5345 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5346
5347 // User A and User C request that user B become their contact.
5348 client_a
5349 .user_store
5350 .update(cx_a, |store, cx| {
5351 store.request_contact(client_b.user_id().unwrap(), cx)
5352 })
5353 .await
5354 .unwrap();
5355 client_c
5356 .user_store
5357 .update(cx_c, |store, cx| {
5358 store.request_contact(client_b.user_id().unwrap(), cx)
5359 })
5360 .await
5361 .unwrap();
5362 executor.run_until_parked();
5363
5364 // All users see the pending request appear in all their clients.
5365 assert_eq!(
5366 client_a.summarize_contacts(&cx_a).outgoing_requests,
5367 &["user_b"]
5368 );
5369 assert_eq!(
5370 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5371 &["user_b"]
5372 );
5373 assert_eq!(
5374 client_b.summarize_contacts(&cx_b).incoming_requests,
5375 &["user_a", "user_c"]
5376 );
5377 assert_eq!(
5378 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5379 &["user_a", "user_c"]
5380 );
5381 assert_eq!(
5382 client_c.summarize_contacts(&cx_c).outgoing_requests,
5383 &["user_b"]
5384 );
5385 assert_eq!(
5386 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5387 &["user_b"]
5388 );
5389
5390 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5391 disconnect_and_reconnect(&client_a, cx_a).await;
5392 disconnect_and_reconnect(&client_b, cx_b).await;
5393 disconnect_and_reconnect(&client_c, cx_c).await;
5394 executor.run_until_parked();
5395 assert_eq!(
5396 client_a.summarize_contacts(&cx_a).outgoing_requests,
5397 &["user_b"]
5398 );
5399 assert_eq!(
5400 client_b.summarize_contacts(&cx_b).incoming_requests,
5401 &["user_a", "user_c"]
5402 );
5403 assert_eq!(
5404 client_c.summarize_contacts(&cx_c).outgoing_requests,
5405 &["user_b"]
5406 );
5407
5408 // User B accepts the request from user A.
5409 client_b
5410 .user_store
5411 .update(cx_b, |store, cx| {
5412 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5413 })
5414 .await
5415 .unwrap();
5416
5417 executor.run_until_parked();
5418
5419 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5420 let contacts_b = client_b.summarize_contacts(&cx_b);
5421 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5422 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5423 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5424 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5425 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5426
5427 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5428 let contacts_a = client_a.summarize_contacts(&cx_a);
5429 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5430 assert!(contacts_a.outgoing_requests.is_empty());
5431 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5432 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5433 assert!(contacts_a2.outgoing_requests.is_empty());
5434
5435 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5436 disconnect_and_reconnect(&client_a, cx_a).await;
5437 disconnect_and_reconnect(&client_b, cx_b).await;
5438 disconnect_and_reconnect(&client_c, cx_c).await;
5439 executor.run_until_parked();
5440 assert_eq!(
5441 client_a.summarize_contacts(&cx_a).current,
5442 &["user_a", "user_b"]
5443 );
5444 assert_eq!(
5445 client_b.summarize_contacts(&cx_b).current,
5446 &["user_a", "user_b"]
5447 );
5448 assert_eq!(
5449 client_b.summarize_contacts(&cx_b).incoming_requests,
5450 &["user_c"]
5451 );
5452 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5453 assert_eq!(
5454 client_c.summarize_contacts(&cx_c).outgoing_requests,
5455 &["user_b"]
5456 );
5457
5458 // User B rejects the request from user C.
5459 client_b
5460 .user_store
5461 .update(cx_b, |store, cx| {
5462 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5463 })
5464 .await
5465 .unwrap();
5466
5467 executor.run_until_parked();
5468
5469 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5470 let contacts_b = client_b.summarize_contacts(&cx_b);
5471 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5472 assert!(contacts_b.incoming_requests.is_empty());
5473 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5474 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5475 assert!(contacts_b2.incoming_requests.is_empty());
5476
5477 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5478 let contacts_c = client_c.summarize_contacts(&cx_c);
5479 assert_eq!(contacts_c.current, &["user_c"]);
5480 assert!(contacts_c.outgoing_requests.is_empty());
5481 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5482 assert_eq!(contacts_c2.current, &["user_c"]);
5483 assert!(contacts_c2.outgoing_requests.is_empty());
5484
5485 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5486 disconnect_and_reconnect(&client_a, cx_a).await;
5487 disconnect_and_reconnect(&client_b, cx_b).await;
5488 disconnect_and_reconnect(&client_c, cx_c).await;
5489 executor.run_until_parked();
5490 assert_eq!(
5491 client_a.summarize_contacts(&cx_a).current,
5492 &["user_a", "user_b"]
5493 );
5494 assert_eq!(
5495 client_b.summarize_contacts(&cx_b).current,
5496 &["user_a", "user_b"]
5497 );
5498 assert!(client_b
5499 .summarize_contacts(&cx_b)
5500 .incoming_requests
5501 .is_empty());
5502 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5503 assert!(client_c
5504 .summarize_contacts(&cx_c)
5505 .outgoing_requests
5506 .is_empty());
5507
5508 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5509 client.disconnect(&cx.to_async()).unwrap();
5510 client.clear_contacts(cx).await;
5511 client
5512 .authenticate_and_connect(false, &cx.to_async())
5513 .await
5514 .unwrap();
5515 }
5516 }
5517
5518 #[gpui::test(iterations = 10)]
5519 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5520 cx_a.foreground().forbid_parking();
5521 let fs = FakeFs::new(cx_a.background());
5522
5523 // 2 clients connect to a server.
5524 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5525 let mut client_a = server.create_client(cx_a, "user_a").await;
5526 let mut client_b = server.create_client(cx_b, "user_b").await;
5527 server
5528 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5529 .await;
5530 cx_a.update(editor::init);
5531 cx_b.update(editor::init);
5532
5533 // Client A shares a project.
5534 fs.insert_tree(
5535 "/a",
5536 json!({
5537 "1.txt": "one",
5538 "2.txt": "two",
5539 "3.txt": "three",
5540 }),
5541 )
5542 .await;
5543 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5544
5545 // Client B joins the project.
5546 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5547
5548 // Client A opens some editors.
5549 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5550 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5551 let editor_a1 = workspace_a
5552 .update(cx_a, |workspace, cx| {
5553 workspace.open_path((worktree_id, "1.txt"), true, cx)
5554 })
5555 .await
5556 .unwrap()
5557 .downcast::<Editor>()
5558 .unwrap();
5559 let editor_a2 = workspace_a
5560 .update(cx_a, |workspace, cx| {
5561 workspace.open_path((worktree_id, "2.txt"), true, cx)
5562 })
5563 .await
5564 .unwrap()
5565 .downcast::<Editor>()
5566 .unwrap();
5567
5568 // Client B opens an editor.
5569 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5570 let editor_b1 = workspace_b
5571 .update(cx_b, |workspace, cx| {
5572 workspace.open_path((worktree_id, "1.txt"), true, cx)
5573 })
5574 .await
5575 .unwrap()
5576 .downcast::<Editor>()
5577 .unwrap();
5578
5579 let client_a_id = project_b.read_with(cx_b, |project, _| {
5580 project.collaborators().values().next().unwrap().peer_id
5581 });
5582 let client_b_id = project_a.read_with(cx_a, |project, _| {
5583 project.collaborators().values().next().unwrap().peer_id
5584 });
5585
5586 // When client B starts following client A, all visible view states are replicated to client B.
5587 editor_a1.update(cx_a, |editor, cx| {
5588 editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5589 });
5590 editor_a2.update(cx_a, |editor, cx| {
5591 editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5592 });
5593 workspace_b
5594 .update(cx_b, |workspace, cx| {
5595 workspace
5596 .toggle_follow(&ToggleFollow(client_a_id), cx)
5597 .unwrap()
5598 })
5599 .await
5600 .unwrap();
5601
5602 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5603 workspace
5604 .active_item(cx)
5605 .unwrap()
5606 .downcast::<Editor>()
5607 .unwrap()
5608 });
5609 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5610 assert_eq!(
5611 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5612 Some((worktree_id, "2.txt").into())
5613 );
5614 assert_eq!(
5615 editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5616 vec![2..3]
5617 );
5618 assert_eq!(
5619 editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5620 vec![0..1]
5621 );
5622
5623 // When client A activates a different editor, client B does so as well.
5624 workspace_a.update(cx_a, |workspace, cx| {
5625 workspace.activate_item(&editor_a1, cx)
5626 });
5627 workspace_b
5628 .condition(cx_b, |workspace, cx| {
5629 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5630 })
5631 .await;
5632
5633 // When client A navigates back and forth, client B does so as well.
5634 workspace_a
5635 .update(cx_a, |workspace, cx| {
5636 workspace::Pane::go_back(workspace, None, cx)
5637 })
5638 .await;
5639 workspace_b
5640 .condition(cx_b, |workspace, cx| {
5641 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5642 })
5643 .await;
5644
5645 workspace_a
5646 .update(cx_a, |workspace, cx| {
5647 workspace::Pane::go_forward(workspace, None, cx)
5648 })
5649 .await;
5650 workspace_b
5651 .condition(cx_b, |workspace, cx| {
5652 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5653 })
5654 .await;
5655
5656 // Changes to client A's editor are reflected on client B.
5657 editor_a1.update(cx_a, |editor, cx| {
5658 editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5659 });
5660 editor_b1
5661 .condition(cx_b, |editor, cx| {
5662 editor.selections.ranges(cx) == vec![1..1, 2..2]
5663 })
5664 .await;
5665
5666 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5667 editor_b1
5668 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5669 .await;
5670
5671 editor_a1.update(cx_a, |editor, cx| {
5672 editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5673 editor.set_scroll_position(vec2f(0., 100.), cx);
5674 });
5675 editor_b1
5676 .condition(cx_b, |editor, cx| {
5677 editor.selections.ranges(cx) == vec![3..3]
5678 })
5679 .await;
5680
5681 // After unfollowing, client B stops receiving updates from client A.
5682 workspace_b.update(cx_b, |workspace, cx| {
5683 workspace.unfollow(&workspace.active_pane().clone(), cx)
5684 });
5685 workspace_a.update(cx_a, |workspace, cx| {
5686 workspace.activate_item(&editor_a2, cx)
5687 });
5688 cx_a.foreground().run_until_parked();
5689 assert_eq!(
5690 workspace_b.read_with(cx_b, |workspace, cx| workspace
5691 .active_item(cx)
5692 .unwrap()
5693 .id()),
5694 editor_b1.id()
5695 );
5696
5697 // Client A starts following client B.
5698 workspace_a
5699 .update(cx_a, |workspace, cx| {
5700 workspace
5701 .toggle_follow(&ToggleFollow(client_b_id), cx)
5702 .unwrap()
5703 })
5704 .await
5705 .unwrap();
5706 assert_eq!(
5707 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5708 Some(client_b_id)
5709 );
5710 assert_eq!(
5711 workspace_a.read_with(cx_a, |workspace, cx| workspace
5712 .active_item(cx)
5713 .unwrap()
5714 .id()),
5715 editor_a1.id()
5716 );
5717
5718 // Following interrupts when client B disconnects.
5719 client_b.disconnect(&cx_b.to_async()).unwrap();
5720 cx_a.foreground().run_until_parked();
5721 assert_eq!(
5722 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5723 None
5724 );
5725 }
5726
5727 #[gpui::test(iterations = 10)]
5728 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5729 cx_a.foreground().forbid_parking();
5730 let fs = FakeFs::new(cx_a.background());
5731
5732 // 2 clients connect to a server.
5733 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5734 let mut client_a = server.create_client(cx_a, "user_a").await;
5735 let mut client_b = server.create_client(cx_b, "user_b").await;
5736 server
5737 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5738 .await;
5739 cx_a.update(editor::init);
5740 cx_b.update(editor::init);
5741
5742 // Client A shares a project.
5743 fs.insert_tree(
5744 "/a",
5745 json!({
5746 "1.txt": "one",
5747 "2.txt": "two",
5748 "3.txt": "three",
5749 "4.txt": "four",
5750 }),
5751 )
5752 .await;
5753 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5754
5755 // Client B joins the project.
5756 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5757
5758 // Client A opens some editors.
5759 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5760 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5761 let _editor_a1 = workspace_a
5762 .update(cx_a, |workspace, cx| {
5763 workspace.open_path((worktree_id, "1.txt"), true, cx)
5764 })
5765 .await
5766 .unwrap()
5767 .downcast::<Editor>()
5768 .unwrap();
5769
5770 // Client B opens an editor.
5771 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5772 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5773 let _editor_b1 = workspace_b
5774 .update(cx_b, |workspace, cx| {
5775 workspace.open_path((worktree_id, "2.txt"), true, cx)
5776 })
5777 .await
5778 .unwrap()
5779 .downcast::<Editor>()
5780 .unwrap();
5781
5782 // Clients A and B follow each other in split panes
5783 workspace_a
5784 .update(cx_a, |workspace, cx| {
5785 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5786 assert_ne!(*workspace.active_pane(), pane_a1);
5787 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5788 workspace
5789 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5790 .unwrap()
5791 })
5792 .await
5793 .unwrap();
5794 workspace_b
5795 .update(cx_b, |workspace, cx| {
5796 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5797 assert_ne!(*workspace.active_pane(), pane_b1);
5798 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5799 workspace
5800 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5801 .unwrap()
5802 })
5803 .await
5804 .unwrap();
5805
5806 workspace_a
5807 .update(cx_a, |workspace, cx| {
5808 workspace.activate_next_pane(cx);
5809 assert_eq!(*workspace.active_pane(), pane_a1);
5810 workspace.open_path((worktree_id, "3.txt"), true, cx)
5811 })
5812 .await
5813 .unwrap();
5814 workspace_b
5815 .update(cx_b, |workspace, cx| {
5816 workspace.activate_next_pane(cx);
5817 assert_eq!(*workspace.active_pane(), pane_b1);
5818 workspace.open_path((worktree_id, "4.txt"), true, cx)
5819 })
5820 .await
5821 .unwrap();
5822 cx_a.foreground().run_until_parked();
5823
5824 // Ensure leader updates don't change the active pane of followers
5825 workspace_a.read_with(cx_a, |workspace, _| {
5826 assert_eq!(*workspace.active_pane(), pane_a1);
5827 });
5828 workspace_b.read_with(cx_b, |workspace, _| {
5829 assert_eq!(*workspace.active_pane(), pane_b1);
5830 });
5831
5832 // Ensure peers following each other doesn't cause an infinite loop.
5833 assert_eq!(
5834 workspace_a.read_with(cx_a, |workspace, cx| workspace
5835 .active_item(cx)
5836 .unwrap()
5837 .project_path(cx)),
5838 Some((worktree_id, "3.txt").into())
5839 );
5840 workspace_a.update(cx_a, |workspace, cx| {
5841 assert_eq!(
5842 workspace.active_item(cx).unwrap().project_path(cx),
5843 Some((worktree_id, "3.txt").into())
5844 );
5845 workspace.activate_next_pane(cx);
5846 assert_eq!(
5847 workspace.active_item(cx).unwrap().project_path(cx),
5848 Some((worktree_id, "4.txt").into())
5849 );
5850 });
5851 workspace_b.update(cx_b, |workspace, cx| {
5852 assert_eq!(
5853 workspace.active_item(cx).unwrap().project_path(cx),
5854 Some((worktree_id, "4.txt").into())
5855 );
5856 workspace.activate_next_pane(cx);
5857 assert_eq!(
5858 workspace.active_item(cx).unwrap().project_path(cx),
5859 Some((worktree_id, "3.txt").into())
5860 );
5861 });
5862 }
5863
5864 #[gpui::test(iterations = 10)]
5865 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5866 cx_a.foreground().forbid_parking();
5867 let fs = FakeFs::new(cx_a.background());
5868
5869 // 2 clients connect to a server.
5870 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5871 let mut client_a = server.create_client(cx_a, "user_a").await;
5872 let mut client_b = server.create_client(cx_b, "user_b").await;
5873 server
5874 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5875 .await;
5876 cx_a.update(editor::init);
5877 cx_b.update(editor::init);
5878
5879 // Client A shares a project.
5880 fs.insert_tree(
5881 "/a",
5882 json!({
5883 "1.txt": "one",
5884 "2.txt": "two",
5885 "3.txt": "three",
5886 }),
5887 )
5888 .await;
5889 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5890
5891 // Client B joins the project.
5892 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5893
5894 // Client A opens some editors.
5895 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5896 let _editor_a1 = workspace_a
5897 .update(cx_a, |workspace, cx| {
5898 workspace.open_path((worktree_id, "1.txt"), true, cx)
5899 })
5900 .await
5901 .unwrap()
5902 .downcast::<Editor>()
5903 .unwrap();
5904
5905 // Client B starts following client A.
5906 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5907 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5908 let leader_id = project_b.read_with(cx_b, |project, _| {
5909 project.collaborators().values().next().unwrap().peer_id
5910 });
5911 workspace_b
5912 .update(cx_b, |workspace, cx| {
5913 workspace
5914 .toggle_follow(&ToggleFollow(leader_id), cx)
5915 .unwrap()
5916 })
5917 .await
5918 .unwrap();
5919 assert_eq!(
5920 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5921 Some(leader_id)
5922 );
5923 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5924 workspace
5925 .active_item(cx)
5926 .unwrap()
5927 .downcast::<Editor>()
5928 .unwrap()
5929 });
5930
5931 // When client B moves, it automatically stops following client A.
5932 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5933 assert_eq!(
5934 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5935 None
5936 );
5937
5938 workspace_b
5939 .update(cx_b, |workspace, cx| {
5940 workspace
5941 .toggle_follow(&ToggleFollow(leader_id), cx)
5942 .unwrap()
5943 })
5944 .await
5945 .unwrap();
5946 assert_eq!(
5947 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5948 Some(leader_id)
5949 );
5950
5951 // When client B edits, it automatically stops following client A.
5952 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5953 assert_eq!(
5954 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5955 None
5956 );
5957
5958 workspace_b
5959 .update(cx_b, |workspace, cx| {
5960 workspace
5961 .toggle_follow(&ToggleFollow(leader_id), cx)
5962 .unwrap()
5963 })
5964 .await
5965 .unwrap();
5966 assert_eq!(
5967 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5968 Some(leader_id)
5969 );
5970
5971 // When client B scrolls, it automatically stops following client A.
5972 editor_b2.update(cx_b, |editor, cx| {
5973 editor.set_scroll_position(vec2f(0., 3.), cx)
5974 });
5975 assert_eq!(
5976 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5977 None
5978 );
5979
5980 workspace_b
5981 .update(cx_b, |workspace, cx| {
5982 workspace
5983 .toggle_follow(&ToggleFollow(leader_id), cx)
5984 .unwrap()
5985 })
5986 .await
5987 .unwrap();
5988 assert_eq!(
5989 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5990 Some(leader_id)
5991 );
5992
5993 // When client B activates a different pane, it continues following client A in the original pane.
5994 workspace_b.update(cx_b, |workspace, cx| {
5995 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5996 });
5997 assert_eq!(
5998 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5999 Some(leader_id)
6000 );
6001
6002 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6003 assert_eq!(
6004 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6005 Some(leader_id)
6006 );
6007
6008 // When client B activates a different item in the original pane, it automatically stops following client A.
6009 workspace_b
6010 .update(cx_b, |workspace, cx| {
6011 workspace.open_path((worktree_id, "2.txt"), true, cx)
6012 })
6013 .await
6014 .unwrap();
6015 assert_eq!(
6016 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6017 None
6018 );
6019 }
6020
6021 #[gpui::test(iterations = 100)]
6022 async fn test_random_collaboration(
6023 cx: &mut TestAppContext,
6024 deterministic: Arc<Deterministic>,
6025 rng: StdRng,
6026 ) {
6027 cx.foreground().forbid_parking();
6028 let max_peers = env::var("MAX_PEERS")
6029 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6030 .unwrap_or(5);
6031 assert!(max_peers <= 5);
6032
6033 let max_operations = env::var("OPERATIONS")
6034 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6035 .unwrap_or(10);
6036
6037 let rng = Arc::new(Mutex::new(rng));
6038
6039 let guest_lang_registry = Arc::new(LanguageRegistry::test());
6040 let host_language_registry = Arc::new(LanguageRegistry::test());
6041
6042 let fs = FakeFs::new(cx.background());
6043 fs.insert_tree("/_collab", json!({"init": ""})).await;
6044
6045 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6046 let db = server.app_state.db.clone();
6047 let host_user_id = db.create_user("host", false).await.unwrap();
6048 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6049 let guest_user_id = db.create_user(username, false).await.unwrap();
6050 server
6051 .app_state
6052 .db
6053 .send_contact_request(guest_user_id, host_user_id)
6054 .await
6055 .unwrap();
6056 server
6057 .app_state
6058 .db
6059 .respond_to_contact_request(host_user_id, guest_user_id, true)
6060 .await
6061 .unwrap();
6062 }
6063
6064 let mut clients = Vec::new();
6065 let mut user_ids = Vec::new();
6066 let mut op_start_signals = Vec::new();
6067
6068 let mut next_entity_id = 100000;
6069 let mut host_cx = TestAppContext::new(
6070 cx.foreground_platform(),
6071 cx.platform(),
6072 deterministic.build_foreground(next_entity_id),
6073 deterministic.build_background(),
6074 cx.font_cache(),
6075 cx.leak_detector(),
6076 next_entity_id,
6077 );
6078 let host = server.create_client(&mut host_cx, "host").await;
6079 let host_project = host_cx.update(|cx| {
6080 Project::local(
6081 host.client.clone(),
6082 host.user_store.clone(),
6083 host_language_registry.clone(),
6084 fs.clone(),
6085 cx,
6086 )
6087 });
6088 let host_project_id = host_project
6089 .update(&mut host_cx, |p, _| p.next_remote_id())
6090 .await;
6091
6092 let (collab_worktree, _) = host_project
6093 .update(&mut host_cx, |project, cx| {
6094 project.find_or_create_local_worktree("/_collab", true, cx)
6095 })
6096 .await
6097 .unwrap();
6098 collab_worktree
6099 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6100 .await;
6101
6102 // Set up fake language servers.
6103 let mut language = Language::new(
6104 LanguageConfig {
6105 name: "Rust".into(),
6106 path_suffixes: vec!["rs".to_string()],
6107 ..Default::default()
6108 },
6109 None,
6110 );
6111 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6112 name: "the-fake-language-server",
6113 capabilities: lsp::LanguageServer::full_capabilities(),
6114 initializer: Some(Box::new({
6115 let rng = rng.clone();
6116 let fs = fs.clone();
6117 let project = host_project.downgrade();
6118 move |fake_server: &mut FakeLanguageServer| {
6119 fake_server.handle_request::<lsp::request::Completion, _, _>(
6120 |_, _| async move {
6121 Ok(Some(lsp::CompletionResponse::Array(vec![
6122 lsp::CompletionItem {
6123 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6124 range: lsp::Range::new(
6125 lsp::Position::new(0, 0),
6126 lsp::Position::new(0, 0),
6127 ),
6128 new_text: "the-new-text".to_string(),
6129 })),
6130 ..Default::default()
6131 },
6132 ])))
6133 },
6134 );
6135
6136 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6137 |_, _| async move {
6138 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6139 lsp::CodeAction {
6140 title: "the-code-action".to_string(),
6141 ..Default::default()
6142 },
6143 )]))
6144 },
6145 );
6146
6147 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6148 |params, _| async move {
6149 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6150 params.position,
6151 params.position,
6152 ))))
6153 },
6154 );
6155
6156 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6157 let fs = fs.clone();
6158 let rng = rng.clone();
6159 move |_, _| {
6160 let fs = fs.clone();
6161 let rng = rng.clone();
6162 async move {
6163 let files = fs.files().await;
6164 let mut rng = rng.lock();
6165 let count = rng.gen_range::<usize, _>(1..3);
6166 let files = (0..count)
6167 .map(|_| files.choose(&mut *rng).unwrap())
6168 .collect::<Vec<_>>();
6169 log::info!("LSP: Returning definitions in files {:?}", &files);
6170 Ok(Some(lsp::GotoDefinitionResponse::Array(
6171 files
6172 .into_iter()
6173 .map(|file| lsp::Location {
6174 uri: lsp::Url::from_file_path(file).unwrap(),
6175 range: Default::default(),
6176 })
6177 .collect(),
6178 )))
6179 }
6180 }
6181 });
6182
6183 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6184 let rng = rng.clone();
6185 let project = project.clone();
6186 move |params, mut cx| {
6187 let highlights = if let Some(project) = project.upgrade(&cx) {
6188 project.update(&mut cx, |project, cx| {
6189 let path = params
6190 .text_document_position_params
6191 .text_document
6192 .uri
6193 .to_file_path()
6194 .unwrap();
6195 let (worktree, relative_path) =
6196 project.find_local_worktree(&path, cx)?;
6197 let project_path =
6198 ProjectPath::from((worktree.read(cx).id(), relative_path));
6199 let buffer =
6200 project.get_open_buffer(&project_path, cx)?.read(cx);
6201
6202 let mut highlights = Vec::new();
6203 let highlight_count = rng.lock().gen_range(1..=5);
6204 let mut prev_end = 0;
6205 for _ in 0..highlight_count {
6206 let range =
6207 buffer.random_byte_range(prev_end, &mut *rng.lock());
6208
6209 highlights.push(lsp::DocumentHighlight {
6210 range: range_to_lsp(range.to_point_utf16(buffer)),
6211 kind: Some(lsp::DocumentHighlightKind::READ),
6212 });
6213 prev_end = range.end;
6214 }
6215 Some(highlights)
6216 })
6217 } else {
6218 None
6219 };
6220 async move { Ok(highlights) }
6221 }
6222 });
6223 }
6224 })),
6225 ..Default::default()
6226 });
6227 host_language_registry.add(Arc::new(language));
6228
6229 let op_start_signal = futures::channel::mpsc::unbounded();
6230 user_ids.push(host.current_user_id(&host_cx));
6231 op_start_signals.push(op_start_signal.0);
6232 clients.push(host_cx.foreground().spawn(host.simulate_host(
6233 host_project,
6234 op_start_signal.1,
6235 rng.clone(),
6236 host_cx,
6237 )));
6238
6239 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6240 rng.lock().gen_range(0..max_operations)
6241 } else {
6242 max_operations
6243 };
6244 let mut available_guests = vec![
6245 "guest-1".to_string(),
6246 "guest-2".to_string(),
6247 "guest-3".to_string(),
6248 "guest-4".to_string(),
6249 ];
6250 let mut operations = 0;
6251 while operations < max_operations {
6252 if operations == disconnect_host_at {
6253 server.disconnect_client(user_ids[0]);
6254 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6255 drop(op_start_signals);
6256 let mut clients = futures::future::join_all(clients).await;
6257 cx.foreground().run_until_parked();
6258
6259 let (host, mut host_cx, host_err) = clients.remove(0);
6260 if let Some(host_err) = host_err {
6261 log::error!("host error - {:?}", host_err);
6262 }
6263 host.project
6264 .as_ref()
6265 .unwrap()
6266 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6267 for (guest, mut guest_cx, guest_err) in clients {
6268 if let Some(guest_err) = guest_err {
6269 log::error!("{} error - {:?}", guest.username, guest_err);
6270 }
6271
6272 let contacts = server
6273 .app_state
6274 .db
6275 .get_contacts(guest.current_user_id(&guest_cx))
6276 .await
6277 .unwrap();
6278 let contacts = server
6279 .store
6280 .read()
6281 .await
6282 .build_initial_contacts_update(contacts)
6283 .contacts;
6284 assert!(!contacts
6285 .iter()
6286 .flat_map(|contact| &contact.projects)
6287 .any(|project| project.id == host_project_id));
6288 guest
6289 .project
6290 .as_ref()
6291 .unwrap()
6292 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6293 guest_cx.update(|_| drop(guest));
6294 }
6295 host_cx.update(|_| drop(host));
6296
6297 return;
6298 }
6299
6300 let distribution = rng.lock().gen_range(0..100);
6301 match distribution {
6302 0..=19 if !available_guests.is_empty() => {
6303 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6304 let guest_username = available_guests.remove(guest_ix);
6305 log::info!("Adding new connection for {}", guest_username);
6306 next_entity_id += 100000;
6307 let mut guest_cx = TestAppContext::new(
6308 cx.foreground_platform(),
6309 cx.platform(),
6310 deterministic.build_foreground(next_entity_id),
6311 deterministic.build_background(),
6312 cx.font_cache(),
6313 cx.leak_detector(),
6314 next_entity_id,
6315 );
6316 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6317 let guest_project = Project::remote(
6318 host_project_id,
6319 guest.client.clone(),
6320 guest.user_store.clone(),
6321 guest_lang_registry.clone(),
6322 FakeFs::new(cx.background()),
6323 &mut guest_cx.to_async(),
6324 )
6325 .await
6326 .unwrap();
6327 let op_start_signal = futures::channel::mpsc::unbounded();
6328 user_ids.push(guest.current_user_id(&guest_cx));
6329 op_start_signals.push(op_start_signal.0);
6330 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6331 guest_username.clone(),
6332 guest_project,
6333 op_start_signal.1,
6334 rng.clone(),
6335 guest_cx,
6336 )));
6337
6338 log::info!("Added connection for {}", guest_username);
6339 operations += 1;
6340 }
6341 20..=29 if clients.len() > 1 => {
6342 let guest_ix = rng.lock().gen_range(1..clients.len());
6343 log::info!("Removing guest {}", user_ids[guest_ix]);
6344 let removed_guest_id = user_ids.remove(guest_ix);
6345 let guest = clients.remove(guest_ix);
6346 op_start_signals.remove(guest_ix);
6347 server.forbid_connections();
6348 server.disconnect_client(removed_guest_id);
6349 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6350 let (guest, mut guest_cx, guest_err) = guest.await;
6351 server.allow_connections();
6352
6353 if let Some(guest_err) = guest_err {
6354 log::error!("{} error - {:?}", guest.username, guest_err);
6355 }
6356 guest
6357 .project
6358 .as_ref()
6359 .unwrap()
6360 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6361 for user_id in &user_ids {
6362 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6363 let contacts = server
6364 .store
6365 .read()
6366 .await
6367 .build_initial_contacts_update(contacts)
6368 .contacts;
6369 for contact in contacts {
6370 if contact.online {
6371 assert_ne!(
6372 contact.user_id, removed_guest_id.0 as u64,
6373 "removed guest is still a contact of another peer"
6374 );
6375 }
6376 for project in contact.projects {
6377 for project_guest_id in project.guests {
6378 assert_ne!(
6379 project_guest_id, removed_guest_id.0 as u64,
6380 "removed guest appears as still participating on a project"
6381 );
6382 }
6383 }
6384 }
6385 }
6386
6387 log::info!("{} removed", guest.username);
6388 available_guests.push(guest.username.clone());
6389 guest_cx.update(|_| drop(guest));
6390
6391 operations += 1;
6392 }
6393 _ => {
6394 while operations < max_operations && rng.lock().gen_bool(0.7) {
6395 op_start_signals
6396 .choose(&mut *rng.lock())
6397 .unwrap()
6398 .unbounded_send(())
6399 .unwrap();
6400 operations += 1;
6401 }
6402
6403 if rng.lock().gen_bool(0.8) {
6404 cx.foreground().run_until_parked();
6405 }
6406 }
6407 }
6408 }
6409
6410 drop(op_start_signals);
6411 let mut clients = futures::future::join_all(clients).await;
6412 cx.foreground().run_until_parked();
6413
6414 let (host_client, mut host_cx, host_err) = clients.remove(0);
6415 if let Some(host_err) = host_err {
6416 panic!("host error - {:?}", host_err);
6417 }
6418 let host_project = host_client.project.as_ref().unwrap();
6419 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6420 project
6421 .worktrees(cx)
6422 .map(|worktree| {
6423 let snapshot = worktree.read(cx).snapshot();
6424 (snapshot.id(), snapshot)
6425 })
6426 .collect::<BTreeMap<_, _>>()
6427 });
6428
6429 host_client
6430 .project
6431 .as_ref()
6432 .unwrap()
6433 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6434
6435 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6436 if let Some(guest_err) = guest_err {
6437 panic!("{} error - {:?}", guest_client.username, guest_err);
6438 }
6439 let worktree_snapshots =
6440 guest_client
6441 .project
6442 .as_ref()
6443 .unwrap()
6444 .read_with(&guest_cx, |project, cx| {
6445 project
6446 .worktrees(cx)
6447 .map(|worktree| {
6448 let worktree = worktree.read(cx);
6449 (worktree.id(), worktree.snapshot())
6450 })
6451 .collect::<BTreeMap<_, _>>()
6452 });
6453
6454 assert_eq!(
6455 worktree_snapshots.keys().collect::<Vec<_>>(),
6456 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6457 "{} has different worktrees than the host",
6458 guest_client.username
6459 );
6460 for (id, host_snapshot) in &host_worktree_snapshots {
6461 let guest_snapshot = &worktree_snapshots[id];
6462 assert_eq!(
6463 guest_snapshot.root_name(),
6464 host_snapshot.root_name(),
6465 "{} has different root name than the host for worktree {}",
6466 guest_client.username,
6467 id
6468 );
6469 assert_eq!(
6470 guest_snapshot.entries(false).collect::<Vec<_>>(),
6471 host_snapshot.entries(false).collect::<Vec<_>>(),
6472 "{} has different snapshot than the host for worktree {}",
6473 guest_client.username,
6474 id
6475 );
6476 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6477 }
6478
6479 guest_client
6480 .project
6481 .as_ref()
6482 .unwrap()
6483 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6484
6485 for guest_buffer in &guest_client.buffers {
6486 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6487 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6488 project.buffer_for_id(buffer_id, cx).expect(&format!(
6489 "host does not have buffer for guest:{}, peer:{}, id:{}",
6490 guest_client.username, guest_client.peer_id, buffer_id
6491 ))
6492 });
6493 let path = host_buffer
6494 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6495
6496 assert_eq!(
6497 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6498 0,
6499 "{}, buffer {}, path {:?} has deferred operations",
6500 guest_client.username,
6501 buffer_id,
6502 path,
6503 );
6504 assert_eq!(
6505 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6506 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6507 "{}, buffer {}, path {:?}, differs from the host's buffer",
6508 guest_client.username,
6509 buffer_id,
6510 path
6511 );
6512 }
6513
6514 guest_cx.update(|_| drop(guest_client));
6515 }
6516
6517 host_cx.update(|_| drop(host_client));
6518 }
6519
/// In-process collaboration server used by the integration tests in this module.
struct TestServer {
    // Peer created in `start`; it is reset when the test server is dropped.
    peer: Arc<Peer>,
    // Application state (database handle, tokens) shared with the server.
    app_state: Arc<AppState>,
    // The server under test; `TestServer` derefs to it.
    server: Arc<Server>,
    // Foreground executor, used by `condition` to bracket its waits.
    foreground: Rc<executor::Foreground>,
    // Receiving half of the notification channel handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    // Per-user flags returned by `Connection::in_memory`; `disconnect_client`
    // sets a flag to `true` (presumably this severs that user's connection).
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    // While `true`, connection attempts made by test clients are rejected.
    forbid_connections: Arc<AtomicBool>,
    // Held only so the test database lives as long as the server.
    _test_db: TestDb,
}
6530
6531 impl TestServer {
6532 async fn start(
6533 foreground: Rc<executor::Foreground>,
6534 background: Arc<executor::Background>,
6535 ) -> Self {
6536 let test_db = TestDb::fake(background);
6537 let app_state = Self::build_app_state(&test_db).await;
6538 let peer = Peer::new();
6539 let notifications = mpsc::unbounded();
6540 let server = Server::new(app_state.clone(), Some(notifications.0));
6541 Self {
6542 peer,
6543 app_state,
6544 server,
6545 foreground,
6546 notifications: notifications.1,
6547 connection_killers: Default::default(),
6548 forbid_connections: Default::default(),
6549 _test_db: test_db,
6550 }
6551 }
6552
/// Creates a client for the user called `name`, connects it to this server over
/// an in-memory connection, and waits until its current user is available.
///
/// Panics if user creation, authentication, or connection fails.
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
    // Install test settings as a global on the client's app context.
    cx.update(|cx| {
        let settings = Settings::test(cx);
        cx.set_global(settings);
    });

    let http = FakeHttpClient::with_404_response();
    // Reuse the database user with this GitHub login when the lookup succeeds
    // and finds one; in every other case create a new user.
    let user_id =
        if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
            user.id
        } else {
            self.app_state.db.create_user(name, false).await.unwrap()
        };
    let client_name = name.to_string();
    let mut client = Client::new(http.clone());
    let server = self.server.clone();
    let db = self.app_state.db.clone();
    let connection_killers = self.connection_killers.clone();
    let forbid_connections = self.forbid_connections.clone();
    // Channel over which the connection id is read back below (the sender is
    // handed to `handle_connection`).
    let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

    // `Arc::get_mut` only succeeds while this is the sole handle to the client,
    // so the overrides must be installed before the client is shared.
    Arc::get_mut(&mut client)
        .unwrap()
        // Authentication always yields the same fixed credentials for this user.
        .override_authenticate(move |cx| {
            cx.spawn(|_| async move {
                let access_token = "the-token".to_string();
                Ok(Credentials {
                    user_id: user_id.0 as u64,
                    access_token,
                })
            })
        })
        // Connections are in-memory pipes wired straight into the test server.
        .override_establish_connection(move |credentials, cx| {
            assert_eq!(credentials.user_id, user_id.0 as u64);
            assert_eq!(credentials.access_token, "the-token");

            // Clone the captured handles so each connection attempt owns its own.
            let server = server.clone();
            let db = db.clone();
            let connection_killers = connection_killers.clone();
            let forbid_connections = forbid_connections.clone();
            let client_name = client_name.clone();
            let connection_id_tx = connection_id_tx.clone();
            cx.spawn(move |cx| async move {
                if forbid_connections.load(SeqCst) {
                    Err(EstablishConnectionError::other(anyhow!(
                        "server is forbidding connections"
                    )))
                } else {
                    let (client_conn, server_conn, killed) =
                        Connection::in_memory(cx.background());
                    // Remember the kill flag so `disconnect_client` can find it.
                    connection_killers.lock().insert(user_id, killed);
                    let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
                    // Serve the server half of the connection on the background
                    // executor.
                    cx.background()
                        .spawn(server.handle_connection(
                            server_conn,
                            client_name,
                            user,
                            Some(connection_id_tx),
                            cx.background(),
                        ))
                        .detach();
                    Ok(client_conn)
                }
            })
        });

    Channel::init(&client);
    Project::init(&client);
    cx.update(|cx| {
        workspace::init(&client, cx);
    });

    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
    client
        .authenticate_and_connect(false, &cx.to_async())
        .await
        .unwrap();
    // The first id received on the channel becomes this client's peer id.
    let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);

    let client = TestClient {
        client,
        peer_id,
        username: name.to_string(),
        user_store,
        language_registry: Arc::new(LanguageRegistry::test()),
        project: Default::default(),
        buffers: Default::default(),
    };
    client.wait_for_current_user(cx).await;
    client
}
6644
6645 fn disconnect_client(&self, user_id: UserId) {
6646 self.connection_killers
6647 .lock()
6648 .remove(&user_id)
6649 .unwrap()
6650 .store(true, SeqCst);
6651 }
6652
/// Sets the flag that makes subsequent connection attempts by test clients fail.
fn forbid_connections(&self) {
    self.forbid_connections.store(true, SeqCst);
}
6656
/// Clears the flag set by `forbid_connections`, so connection attempts are accepted again.
fn allow_connections(&self) {
    self.forbid_connections.store(false, SeqCst);
}
6660
6661 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6662 while let Some((client_a, cx_a)) = clients.pop() {
6663 for (client_b, cx_b) in &mut clients {
6664 client_a
6665 .user_store
6666 .update(cx_a, |store, cx| {
6667 store.request_contact(client_b.user_id().unwrap(), cx)
6668 })
6669 .await
6670 .unwrap();
6671 cx_a.foreground().run_until_parked();
6672 client_b
6673 .user_store
6674 .update(*cx_b, |store, cx| {
6675 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6676 })
6677 .await
6678 .unwrap();
6679 }
6680 }
6681 }
6682
6683 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6684 Arc::new(AppState {
6685 db: test_db.db().clone(),
6686 api_token: Default::default(),
6687 invite_link_prefix: Default::default(),
6688 })
6689 }
6690
/// Acquires a read lock on the server's store and returns the guard.
async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
    self.server.store.read().await
}
6694
6695 async fn condition<F>(&mut self, mut predicate: F)
6696 where
6697 F: FnMut(&Store) -> bool,
6698 {
6699 assert!(
6700 self.foreground.parking_forbidden(),
6701 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6702 );
6703 while !(predicate)(&*self.server.store.read().await) {
6704 self.foreground.start_waiting();
6705 self.notifications.next().await;
6706 self.foreground.finish_waiting();
6707 }
6708 }
6709 }
6710
/// Lets tests reach the wrapped `Server` directly through a `TestServer`.
impl Deref for TestServer {
    type Target = Server;

    fn deref(&self) -> &Self::Target {
        &self.server
    }
}
6718
/// Resets the test server's peer when the server goes out of scope.
impl Drop for TestServer {
    fn drop(&mut self) {
        self.peer.reset();
    }
}
6724
/// A client connected to a `TestServer`, plus the state tests track for it.
struct TestClient {
    // The underlying client; `TestClient` derefs to it.
    client: Arc<Client>,
    // Name the client was created with.
    username: String,
    // Peer id built from the connection id received in `create_client`.
    pub peer_id: PeerId,
    pub user_store: ModelHandle<UserStore>,
    language_registry: Arc<LanguageRegistry>,
    // The project most recently built or simulated by this client, if any.
    project: Option<ModelHandle<Project>>,
    // Buffers the client currently holds open during the simulations.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6734
/// Lets tests call `Client` methods directly on a `TestClient`.
impl Deref for TestClient {
    type Target = Arc<Client>;

    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
6742
/// GitHub logins of a client's contacts, grouped by state; produced by
/// `TestClient::summarize_contacts`.
struct ContactsSummary {
    // Logins of the users in the store's contact list.
    pub current: Vec<String>,
    // Logins from the store's outgoing contact requests.
    pub outgoing_requests: Vec<String>,
    // Logins from the store's incoming contact requests.
    pub incoming_requests: Vec<String>,
}
6748
6749 impl TestClient {
6750 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6751 UserId::from_proto(
6752 self.user_store
6753 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6754 )
6755 }
6756
6757 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6758 let mut authed_user = self
6759 .user_store
6760 .read_with(cx, |user_store, _| user_store.watch_current_user());
6761 while authed_user.next().await.unwrap().is_none() {}
6762 }
6763
6764 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6765 self.user_store
6766 .update(cx, |store, _| store.clear_contacts())
6767 .await;
6768 }
6769
6770 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6771 self.user_store.read_with(cx, |store, _| ContactsSummary {
6772 current: store
6773 .contacts()
6774 .iter()
6775 .map(|contact| contact.user.github_login.clone())
6776 .collect(),
6777 outgoing_requests: store
6778 .outgoing_contact_requests()
6779 .iter()
6780 .map(|user| user.github_login.clone())
6781 .collect(),
6782 incoming_requests: store
6783 .incoming_contact_requests()
6784 .iter()
6785 .map(|user| user.github_login.clone())
6786 .collect(),
6787 })
6788 }
6789
6790 async fn build_local_project(
6791 &mut self,
6792 fs: Arc<FakeFs>,
6793 root_path: impl AsRef<Path>,
6794 cx: &mut TestAppContext,
6795 ) -> (ModelHandle<Project>, WorktreeId) {
6796 let project = cx.update(|cx| {
6797 Project::local(
6798 self.client.clone(),
6799 self.user_store.clone(),
6800 self.language_registry.clone(),
6801 fs,
6802 cx,
6803 )
6804 });
6805 self.project = Some(project.clone());
6806 let (worktree, _) = project
6807 .update(cx, |p, cx| {
6808 p.find_or_create_local_worktree(root_path, true, cx)
6809 })
6810 .await
6811 .unwrap();
6812 worktree
6813 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6814 .await;
6815 project
6816 .update(cx, |project, _| project.next_remote_id())
6817 .await;
6818 (project, worktree.read_with(cx, |tree, _| tree.id()))
6819 }
6820
6821 async fn build_remote_project(
6822 &mut self,
6823 host_project: &ModelHandle<Project>,
6824 host_cx: &mut TestAppContext,
6825 guest_cx: &mut TestAppContext,
6826 ) -> ModelHandle<Project> {
6827 let host_project_id = host_project
6828 .read_with(host_cx, |project, _| project.next_remote_id())
6829 .await;
6830 let guest_user_id = self.user_id().unwrap();
6831 let languages =
6832 host_project.read_with(host_cx, |project, _| project.languages().clone());
6833 let project_b = guest_cx.spawn(|mut cx| {
6834 let user_store = self.user_store.clone();
6835 let guest_client = self.client.clone();
6836 async move {
6837 Project::remote(
6838 host_project_id,
6839 guest_client,
6840 user_store.clone(),
6841 languages,
6842 FakeFs::new(cx.background()),
6843 &mut cx,
6844 )
6845 .await
6846 .unwrap()
6847 }
6848 });
6849 host_cx.foreground().run_until_parked();
6850 host_project.update(host_cx, |project, cx| {
6851 project.respond_to_join_request(guest_user_id, true, cx)
6852 });
6853 let project = project_b.await;
6854 self.project = Some(project.clone());
6855 project
6856 }
6857
6858 fn build_workspace(
6859 &self,
6860 project: &ModelHandle<Project>,
6861 cx: &mut TestAppContext,
6862 ) -> ViewHandle<Workspace> {
6863 let (window_id, _) = cx.add_window(|_| EmptyView);
6864 cx.add_view(window_id, |cx| {
6865 let fs = project.read(cx).fs().clone();
6866 Workspace::new(
6867 &WorkspaceParams {
6868 fs,
6869 project: project.clone(),
6870 user_store: self.user_store.clone(),
6871 languages: self.language_registry.clone(),
6872 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6873 channel_list: cx.add_model(|cx| {
6874 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6875 }),
6876 client: self.client.clone(),
6877 },
6878 cx,
6879 )
6880 })
6881 }
6882
/// Runs the host side of the randomized collaboration test.
///
/// Every message received on `op_start_signal` triggers one randomly chosen
/// host operation; the simulation ends when that stream ends or an operation
/// fails. Returns the client, its app context, and the error that stopped the
/// simulation, if any.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The fallible part lives in an inner function so that `?` can be used
    // while the outer function still returns the client and context.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        // Accept every join request the project reports.
        cx.update(|cx| {
            cx.subscribe(&project, move |project, event, cx| {
                if let project::Event::ContactRequestedJoin(user) = event {
                    log::info!("Host: accepting join request from {}", user.github_login);
                    project.update(cx, |project, cx| {
                        project.respond_to_join_request(user.id, true, cx)
                    });
                }
            })
            .detach();
        });

        while op_start_signal.next().await.is_some() {
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            let files = fs.as_fake().files().await;
            match distribution {
                // Find or create a worktree at a random ancestor of a random file.
                0..=19 if !files.is_empty() => {
                    let path = files.choose(&mut *rng.lock()).unwrap();
                    let mut path = path.as_path();
                    // Walk up at least one level, stopping at a random ancestor.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either run the operation detached or wait for it.
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open a new buffer or pick an open one, then drop or mutate it.
                20..=79 if !files.is_empty() => {
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.choose(&mut *rng.lock()).unwrap();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    if rng.lock().gen_bool(0.1) {
                        // The handle is dropped inside `cx.update`.
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Otherwise (including when there are no files yet): create a
                // randomly named `.rs` file, retrying until creation succeeds.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result =
        simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
            .await;
    log::info!("Host done");
    // Store the project on the client so the caller can inspect it afterwards.
    self.project = Some(project);
    (self, cx, result.err())
}
7031
/// Runs the guest side of the randomized collaboration test.
///
/// Every message received on `op_start_signal` triggers one randomly chosen
/// guest operation against `project`; the simulation ends when that stream
/// ends or an awaited operation fails. Returns the client, its app context,
/// and the error that stopped the simulation, if any.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The fallible part lives in an inner function so that `?` can be used
    // while the outer function still returns the client and context.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Pick the buffer to operate on: open a random file from a visible
            // worktree, or reuse a buffer that is already open.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    // No visible worktree contains a file: skip this operation.
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            // Most request arms below randomly either detach the request
            // (logging any error) or await it and propagate its error.
            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Drop the chosen buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Request completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Request code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save the buffer, but only if it is dirty; a clean buffer falls
                // through to the catch-all edit arm at the end of the match.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must cover everything present when
                        // the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Request definitions at a random offset; when awaited, the
                // buffers of the returned locations are kept open.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Request document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide search for a random letter; when awaited, the
                // buffers in the results are kept open.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Create a randomly named `.rs` entry in a visible worktree whose
                // root is a directory. Panics if no such worktree exists.
                60..=69 => {
                    let worktree = project
                        .read_with(cx, |project, cx| {
                            project
                                .worktrees(&cx)
                                .filter(|worktree| {
                                    let worktree = worktree.read(cx);
                                    worktree.is_visible()
                                        && worktree.entries(false).any(|e| e.is_file())
                                        && worktree
                                            .root_entry()
                                            .map_or(false, |e| e.is_dir())
                                })
                                .choose(&mut *rng.lock())
                        })
                        .unwrap();
                    let (worktree_id, worktree_root_name) = worktree
                        .read_with(cx, |worktree, _| {
                            (worktree.id(), worktree.root_name().to_string())
                        });

                    // Ten random lowercase letters form the new file's stem.
                    let mut new_name = String::new();
                    for _ in 0..10 {
                        let letter = rng.lock().gen_range('a'..='z');
                        new_name.push(letter);
                    }
                    let mut new_path = PathBuf::new();
                    new_path.push(new_name);
                    new_path.set_extension("rs");
                    log::info!(
                        "{}: creating {:?} in worktree {} ({})",
                        guest_username,
                        new_path,
                        worktree_id,
                        worktree_root_name,
                    );
                    project
                        .update(cx, |project, cx| {
                            project.create_entry((worktree_id, new_path), false, cx)
                        })
                        .unwrap()
                        .await?;
                }
                // Everything else: randomly edit the buffer or undo/redo in it.
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    // Store the project on the client so the caller can inspect it afterwards.
    self.project = Some(project);
    (self, cx, result.err())
}
7360 }
7361
/// Tears down the underlying client when the test client is dropped.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
7367
/// Implements the `Executor` trait for gpui's background executor by
/// forwarding to the executor's own `spawn` and `timer`.
impl Executor for Arc<gpui::executor::Background> {
    type Sleep = gpui::executor::Timer;

    // Spawns the future on the background executor and detaches the task.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        self.spawn(future).detach();
    }

    // Sleeping is delegated to the executor's timer.
    fn sleep(&self, duration: Duration) -> Self::Sleep {
        self.as_ref().timer(duration)
    }
}
7379
7380 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7381 channel
7382 .messages()
7383 .cursor::<()>()
7384 .map(|m| {
7385 (
7386 m.sender.github_login.clone(),
7387 m.body.clone(),
7388 m.is_pending(),
7389 )
7390 })
7391 .collect()
7392 }
7393
/// Placeholder view that renders nothing; `build_workspace` uses it as the
/// root view of the window it opens.
struct EmptyView;
7395
/// `EmptyView` declares the unit type as its event type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
7399
/// Renders `EmptyView` as a single empty element.
impl gpui::View for EmptyView {
    fn ui_name() -> &'static str {
        "empty view"
    }

    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty::new())
    }
}
7409}