1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
54type MessageHandler =
55 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
57struct Response<R> {
58 server: Arc<Server>,
59 receipt: Receipt<R>,
60 responded: Arc<AtomicBool>,
61}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69
70 fn into_receipt(self) -> Receipt<R> {
71 self.responded.store(true, SeqCst);
72 self.receipt
73 }
74}
75
76pub struct Server {
77 peer: Arc<Peer>,
78 store: RwLock<Store>,
79 app_state: Arc<AppState>,
80 handlers: HashMap<TypeId, MessageHandler>,
81 notifications: Option<mpsc::UnboundedSender<()>>,
82}
83
/// Abstraction over the executor used while handling a connection.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;

    /// Hands `future` to the executor; nothing is returned to the caller.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;

    /// Returns the executor's timer future for `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
89
/// Zero-sized, cloneable executor marker. Its `Executor` implementation is not part of this
/// view.
#[derive(Clone)]
pub struct RealExecutor;
92
/// Number of messages per page. NOTE(review): presumably channel-message paging; the use
/// site is outside this view.
const MESSAGE_COUNT_PER_PAGE: usize = 100;

/// Upper bound on a message's length. NOTE(review): the use site is outside this view, so
/// the unit (bytes vs. characters) should be confirmed there.
const MAX_MESSAGE_LEN: usize = 1024;
95
96struct StoreReadGuard<'a> {
97 guard: RwLockReadGuard<'a, Store>,
98 _not_send: PhantomData<Rc<()>>,
99}
100
101struct StoreWriteGuard<'a> {
102 guard: RwLockWriteGuard<'a, Store>,
103 _not_send: PhantomData<Rc<()>>,
104}
105
106impl Server {
107 pub fn new(
108 app_state: Arc<AppState>,
109 notifications: Option<mpsc::UnboundedSender<()>>,
110 ) -> Arc<Self> {
111 let mut server = Self {
112 peer: Peer::new(),
113 app_state,
114 store: Default::default(),
115 handlers: Default::default(),
116 notifications,
117 };
118
119 server
120 .add_request_handler(Server::ping)
121 .add_request_handler(Server::register_project)
122 .add_message_handler(Server::unregister_project)
123 .add_request_handler(Server::join_project)
124 .add_message_handler(Server::leave_project)
125 .add_message_handler(Server::respond_to_join_project_request)
126 .add_request_handler(Server::register_worktree)
127 .add_message_handler(Server::unregister_worktree)
128 .add_request_handler(Server::update_worktree)
129 .add_message_handler(Server::start_language_server)
130 .add_message_handler(Server::update_language_server)
131 .add_message_handler(Server::update_diagnostic_summary)
132 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
133 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
134 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
135 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
136 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
137 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
138 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
139 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
140 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
141 .add_request_handler(
142 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
143 )
144 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
145 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
146 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
147 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
148 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
149 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
150 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
151 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
152 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
153 .add_request_handler(Server::update_buffer)
154 .add_message_handler(Server::update_buffer_file)
155 .add_message_handler(Server::buffer_reloaded)
156 .add_message_handler(Server::buffer_saved)
157 .add_request_handler(Server::save_buffer)
158 .add_request_handler(Server::get_channels)
159 .add_request_handler(Server::get_users)
160 .add_request_handler(Server::fuzzy_search_users)
161 .add_request_handler(Server::request_contact)
162 .add_request_handler(Server::remove_contact)
163 .add_request_handler(Server::respond_to_contact_request)
164 .add_request_handler(Server::join_channel)
165 .add_message_handler(Server::leave_channel)
166 .add_request_handler(Server::send_channel_message)
167 .add_request_handler(Server::follow)
168 .add_message_handler(Server::unfollow)
169 .add_message_handler(Server::update_followers)
170 .add_request_handler(Server::get_channel_messages);
171
172 Arc::new(server)
173 }
174
175 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
176 where
177 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
178 Fut: 'static + Send + Future<Output = Result<()>>,
179 M: EnvelopedMessage,
180 {
181 let prev_handler = self.handlers.insert(
182 TypeId::of::<M>(),
183 Box::new(move |server, envelope| {
184 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
185 let span = info_span!(
186 "handle message",
187 payload_type = envelope.payload_type_name(),
188 payload = format!("{:?}", envelope.payload).as_str(),
189 );
190 let future = (handler)(server, *envelope);
191 async move {
192 if let Err(error) = future.await {
193 tracing::error!(%error, "error handling message");
194 }
195 }
196 .instrument(span)
197 .boxed()
198 }),
199 );
200 if prev_handler.is_some() {
201 panic!("registered a handler for the same message twice");
202 }
203 self
204 }
205
206 /// Handle a request while holding a lock to the store. This is useful when we're registering
207 /// a connection but we want to respond on the connection before anybody else can send on it.
208 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
209 where
210 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
211 Fut: Send + Future<Output = Result<()>>,
212 M: RequestMessage,
213 {
214 let handler = Arc::new(handler);
215 self.add_message_handler(move |server, envelope| {
216 let receipt = envelope.receipt();
217 let handler = handler.clone();
218 async move {
219 let responded = Arc::new(AtomicBool::default());
220 let response = Response {
221 server: server.clone(),
222 responded: responded.clone(),
223 receipt: envelope.receipt(),
224 };
225 match (handler)(server.clone(), envelope, response).await {
226 Ok(()) => {
227 if responded.load(std::sync::atomic::Ordering::SeqCst) {
228 Ok(())
229 } else {
230 Err(anyhow!("handler did not send a response"))?
231 }
232 }
233 Err(error) => {
234 server.peer.respond_with_error(
235 receipt,
236 proto::Error {
237 message: error.to_string(),
238 },
239 )?;
240 Err(error)
241 }
242 }
243 }
244 })
245 }
246
247 pub fn handle_connection<E: Executor>(
248 self: &Arc<Self>,
249 connection: Connection,
250 address: String,
251 user_id: UserId,
252 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
253 executor: E,
254 ) -> impl Future<Output = Result<()>> {
255 let mut this = self.clone();
256 let span = info_span!("handle connection", %user_id, %address);
257 async move {
258 let (connection_id, handle_io, mut incoming_rx) = this
259 .peer
260 .add_connection(connection, {
261 let executor = executor.clone();
262 move |duration| {
263 let timer = executor.sleep(duration);
264 async move {
265 timer.await;
266 }
267 }
268 })
269 .await;
270
271 tracing::info!(%user_id, %connection_id, %address, "connection opened");
272
273 if let Some(send_connection_id) = send_connection_id.as_mut() {
274 let _ = send_connection_id.send(connection_id).await;
275 }
276
277 let contacts = this.app_state.db.get_contacts(user_id).await?;
278
279 {
280 let mut store = this.store_mut().await;
281 store.add_connection(connection_id, user_id);
282 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
283 }
284 this.update_user_contacts(user_id).await?;
285
286 let handle_io = handle_io.fuse();
287 futures::pin_mut!(handle_io);
288 loop {
289 let next_message = incoming_rx.next().fuse();
290 futures::pin_mut!(next_message);
291 futures::select_biased! {
292 result = handle_io => {
293 if let Err(error) = result {
294 tracing::error!(%error, "error handling I/O");
295 }
296 break;
297 }
298 message = next_message => {
299 if let Some(message) = message {
300 let type_name = message.payload_type_name();
301 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
302 async {
303 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
304 let notifications = this.notifications.clone();
305 let is_background = message.is_background();
306 let handle_message = (handler)(this.clone(), message);
307 let handle_message = async move {
308 handle_message.await;
309 if let Some(mut notifications) = notifications {
310 let _ = notifications.send(()).await;
311 }
312 };
313 if is_background {
314 executor.spawn_detached(handle_message);
315 } else {
316 handle_message.await;
317 }
318 } else {
319 tracing::error!("no message handler");
320 }
321 }.instrument(span).await;
322 } else {
323 tracing::info!(%user_id, %connection_id, %address, "connection closed");
324 break;
325 }
326 }
327 }
328 }
329
330 if let Err(error) = this.sign_out(connection_id).await {
331 tracing::error!(%error, "error signing out");
332 }
333
334 Ok(())
335 }.instrument(span)
336 }
337
338 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
339 self.peer.disconnect(connection_id);
340
341 let removed_user_id = {
342 let mut store = self.store_mut().await;
343 let removed_connection = store.remove_connection(connection_id)?;
344
345 for (project_id, project) in removed_connection.hosted_projects {
346 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
347 self.peer
348 .send(conn_id, proto::UnregisterProject { project_id })
349 });
350
351 for (_, receipts) in project.join_requests {
352 for receipt in receipts {
353 self.peer.respond(
354 receipt,
355 proto::JoinProjectResponse {
356 variant: Some(proto::join_project_response::Variant::Decline(
357 proto::join_project_response::Decline {
358 reason: proto::join_project_response::decline::Reason::WentOffline as i32
359 },
360 )),
361 },
362 )?;
363 }
364 }
365 }
366
367 for project_id in removed_connection.guest_project_ids {
368 if let Some(project) = store.project(project_id).trace_err() {
369 broadcast(connection_id, project.connection_ids(), |conn_id| {
370 self.peer.send(
371 conn_id,
372 proto::RemoveProjectCollaborator {
373 project_id,
374 peer_id: connection_id.0,
375 },
376 )
377 });
378 if project.guests.is_empty() {
379 self.peer
380 .send(
381 project.host_connection_id,
382 proto::ProjectUnshared { project_id },
383 )
384 .trace_err();
385 }
386 }
387 }
388
389 removed_connection.user_id
390 };
391
392 self.update_user_contacts(removed_user_id).await?;
393
394 Ok(())
395 }
396
397 async fn ping(
398 self: Arc<Server>,
399 _: TypedEnvelope<proto::Ping>,
400 response: Response<proto::Ping>,
401 ) -> Result<()> {
402 response.send(proto::Ack {})?;
403 Ok(())
404 }
405
406 async fn register_project(
407 self: Arc<Server>,
408 request: TypedEnvelope<proto::RegisterProject>,
409 response: Response<proto::RegisterProject>,
410 ) -> Result<()> {
411 let user_id;
412 let project_id;
413 {
414 let mut state = self.store_mut().await;
415 user_id = state.user_id_for_connection(request.sender_id)?;
416 project_id = state.register_project(request.sender_id, user_id);
417 };
418 self.update_user_contacts(user_id).await?;
419 response.send(proto::RegisterProjectResponse { project_id })?;
420 Ok(())
421 }
422
423 async fn unregister_project(
424 self: Arc<Server>,
425 request: TypedEnvelope<proto::UnregisterProject>,
426 ) -> Result<()> {
427 let (user_id, project) = {
428 let mut state = self.store_mut().await;
429 let project =
430 state.unregister_project(request.payload.project_id, request.sender_id)?;
431 (state.user_id_for_connection(request.sender_id)?, project)
432 };
433 for (_, receipts) in project.join_requests {
434 for receipt in receipts {
435 self.peer.respond(
436 receipt,
437 proto::JoinProjectResponse {
438 variant: Some(proto::join_project_response::Variant::Decline(
439 proto::join_project_response::Decline {
440 reason: proto::join_project_response::decline::Reason::Closed
441 as i32,
442 },
443 )),
444 },
445 )?;
446 }
447 }
448
449 self.update_user_contacts(user_id).await?;
450 Ok(())
451 }
452
453 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
454 let contacts = self.app_state.db.get_contacts(user_id).await?;
455 let store = self.store().await;
456 let updated_contact = store.contact_for_user(user_id, false);
457 for contact in contacts {
458 if let db::Contact::Accepted {
459 user_id: contact_user_id,
460 ..
461 } = contact
462 {
463 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
464 self.peer
465 .send(
466 contact_conn_id,
467 proto::UpdateContacts {
468 contacts: vec![updated_contact.clone()],
469 remove_contacts: Default::default(),
470 incoming_requests: Default::default(),
471 remove_incoming_requests: Default::default(),
472 outgoing_requests: Default::default(),
473 remove_outgoing_requests: Default::default(),
474 },
475 )
476 .trace_err();
477 }
478 }
479 }
480 Ok(())
481 }
482
483 async fn join_project(
484 self: Arc<Server>,
485 request: TypedEnvelope<proto::JoinProject>,
486 response: Response<proto::JoinProject>,
487 ) -> Result<()> {
488 let project_id = request.payload.project_id;
489 let host_user_id;
490 let guest_user_id;
491 let host_connection_id;
492 {
493 let state = self.store().await;
494 let project = state.project(project_id)?;
495 host_user_id = project.host_user_id;
496 host_connection_id = project.host_connection_id;
497 guest_user_id = state.user_id_for_connection(request.sender_id)?;
498 };
499
500 let has_contact = self
501 .app_state
502 .db
503 .has_contact(guest_user_id, host_user_id)
504 .await?;
505 if !has_contact {
506 return Err(anyhow!("no such project"))?;
507 }
508
509 self.store_mut().await.request_join_project(
510 guest_user_id,
511 project_id,
512 response.into_receipt(),
513 )?;
514 self.peer.send(
515 host_connection_id,
516 proto::RequestJoinProject {
517 project_id,
518 requester_id: guest_user_id.to_proto(),
519 },
520 )?;
521 Ok(())
522 }
523
524 async fn respond_to_join_project_request(
525 self: Arc<Server>,
526 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
527 ) -> Result<()> {
528 let host_user_id;
529
530 {
531 let mut state = self.store_mut().await;
532 let project_id = request.payload.project_id;
533 let project = state.project(project_id)?;
534 if project.host_connection_id != request.sender_id {
535 Err(anyhow!("no such connection"))?;
536 }
537
538 host_user_id = project.host_user_id;
539 let guest_user_id = UserId::from_proto(request.payload.requester_id);
540
541 if !request.payload.allow {
542 let receipts = state
543 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
544 .ok_or_else(|| anyhow!("no such request"))?;
545 for receipt in receipts {
546 self.peer.respond(
547 receipt,
548 proto::JoinProjectResponse {
549 variant: Some(proto::join_project_response::Variant::Decline(
550 proto::join_project_response::Decline {
551 reason: proto::join_project_response::decline::Reason::Declined
552 as i32,
553 },
554 )),
555 },
556 )?;
557 }
558 return Ok(());
559 }
560
561 let (receipts_with_replica_ids, project) = state
562 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
563 .ok_or_else(|| anyhow!("no such request"))?;
564
565 let peer_count = project.guests.len();
566 let mut collaborators = Vec::with_capacity(peer_count);
567 collaborators.push(proto::Collaborator {
568 peer_id: project.host_connection_id.0,
569 replica_id: 0,
570 user_id: project.host_user_id.to_proto(),
571 });
572 let worktrees = project
573 .worktrees
574 .iter()
575 .filter_map(|(id, shared_worktree)| {
576 let worktree = project.worktrees.get(&id)?;
577 Some(proto::Worktree {
578 id: *id,
579 root_name: worktree.root_name.clone(),
580 entries: shared_worktree.entries.values().cloned().collect(),
581 diagnostic_summaries: shared_worktree
582 .diagnostic_summaries
583 .values()
584 .cloned()
585 .collect(),
586 visible: worktree.visible,
587 scan_id: shared_worktree.scan_id,
588 })
589 })
590 .collect::<Vec<_>>();
591
592 // Add all guests other than the requesting user's own connections as collaborators
593 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
594 if receipts_with_replica_ids
595 .iter()
596 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
597 {
598 collaborators.push(proto::Collaborator {
599 peer_id: peer_conn_id.0,
600 replica_id: *peer_replica_id as u32,
601 user_id: peer_user_id.to_proto(),
602 });
603 }
604 }
605
606 for conn_id in project.connection_ids() {
607 for (receipt, replica_id) in &receipts_with_replica_ids {
608 if conn_id != receipt.sender_id {
609 self.peer.send(
610 conn_id,
611 proto::AddProjectCollaborator {
612 project_id,
613 collaborator: Some(proto::Collaborator {
614 peer_id: receipt.sender_id.0,
615 replica_id: *replica_id as u32,
616 user_id: guest_user_id.to_proto(),
617 }),
618 },
619 )?;
620 }
621 }
622 }
623
624 for (receipt, replica_id) in receipts_with_replica_ids {
625 self.peer.respond(
626 receipt,
627 proto::JoinProjectResponse {
628 variant: Some(proto::join_project_response::Variant::Accept(
629 proto::join_project_response::Accept {
630 worktrees: worktrees.clone(),
631 replica_id: replica_id as u32,
632 collaborators: collaborators.clone(),
633 language_servers: project.language_servers.clone(),
634 },
635 )),
636 },
637 )?;
638 }
639 }
640
641 self.update_user_contacts(host_user_id).await?;
642 Ok(())
643 }
644
645 async fn leave_project(
646 self: Arc<Server>,
647 request: TypedEnvelope<proto::LeaveProject>,
648 ) -> Result<()> {
649 let sender_id = request.sender_id;
650 let project_id = request.payload.project_id;
651 let project;
652 {
653 let mut state = self.store_mut().await;
654 project = state.leave_project(sender_id, project_id)?;
655 let unshare = project.connection_ids.len() <= 1;
656 broadcast(sender_id, project.connection_ids, |conn_id| {
657 self.peer.send(
658 conn_id,
659 proto::RemoveProjectCollaborator {
660 project_id,
661 peer_id: sender_id.0,
662 },
663 )
664 });
665 if unshare {
666 self.peer.send(
667 project.host_connection_id,
668 proto::ProjectUnshared { project_id },
669 )?;
670 }
671 }
672 self.update_user_contacts(project.host_user_id).await?;
673 Ok(())
674 }
675
676 async fn register_worktree(
677 self: Arc<Server>,
678 request: TypedEnvelope<proto::RegisterWorktree>,
679 response: Response<proto::RegisterWorktree>,
680 ) -> Result<()> {
681 let host_user_id;
682 {
683 let mut state = self.store_mut().await;
684 host_user_id = state.user_id_for_connection(request.sender_id)?;
685
686 let guest_connection_ids = state
687 .read_project(request.payload.project_id, request.sender_id)?
688 .guest_connection_ids();
689 state.register_worktree(
690 request.payload.project_id,
691 request.payload.worktree_id,
692 request.sender_id,
693 Worktree {
694 root_name: request.payload.root_name.clone(),
695 visible: request.payload.visible,
696 ..Default::default()
697 },
698 )?;
699
700 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
701 self.peer
702 .forward_send(request.sender_id, connection_id, request.payload.clone())
703 });
704 }
705 self.update_user_contacts(host_user_id).await?;
706 response.send(proto::Ack {})?;
707 Ok(())
708 }
709
710 async fn unregister_worktree(
711 self: Arc<Server>,
712 request: TypedEnvelope<proto::UnregisterWorktree>,
713 ) -> Result<()> {
714 let host_user_id;
715 let project_id = request.payload.project_id;
716 let worktree_id = request.payload.worktree_id;
717 {
718 let mut state = self.store_mut().await;
719 let (_, guest_connection_ids) =
720 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
721 host_user_id = state.user_id_for_connection(request.sender_id)?;
722 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
723 self.peer.send(
724 conn_id,
725 proto::UnregisterWorktree {
726 project_id,
727 worktree_id,
728 },
729 )
730 });
731 }
732 self.update_user_contacts(host_user_id).await?;
733 Ok(())
734 }
735
736 async fn update_worktree(
737 self: Arc<Server>,
738 request: TypedEnvelope<proto::UpdateWorktree>,
739 response: Response<proto::UpdateWorktree>,
740 ) -> Result<()> {
741 let connection_ids = self.store_mut().await.update_worktree(
742 request.sender_id,
743 request.payload.project_id,
744 request.payload.worktree_id,
745 &request.payload.removed_entries,
746 &request.payload.updated_entries,
747 request.payload.scan_id,
748 )?;
749
750 broadcast(request.sender_id, connection_ids, |connection_id| {
751 self.peer
752 .forward_send(request.sender_id, connection_id, request.payload.clone())
753 });
754 response.send(proto::Ack {})?;
755 Ok(())
756 }
757
758 async fn update_diagnostic_summary(
759 self: Arc<Server>,
760 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
761 ) -> Result<()> {
762 let summary = request
763 .payload
764 .summary
765 .clone()
766 .ok_or_else(|| anyhow!("invalid summary"))?;
767 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
768 request.payload.project_id,
769 request.payload.worktree_id,
770 request.sender_id,
771 summary,
772 )?;
773
774 broadcast(request.sender_id, receiver_ids, |connection_id| {
775 self.peer
776 .forward_send(request.sender_id, connection_id, request.payload.clone())
777 });
778 Ok(())
779 }
780
781 async fn start_language_server(
782 self: Arc<Server>,
783 request: TypedEnvelope<proto::StartLanguageServer>,
784 ) -> Result<()> {
785 let receiver_ids = self.store_mut().await.start_language_server(
786 request.payload.project_id,
787 request.sender_id,
788 request
789 .payload
790 .server
791 .clone()
792 .ok_or_else(|| anyhow!("invalid language server"))?,
793 )?;
794 broadcast(request.sender_id, receiver_ids, |connection_id| {
795 self.peer
796 .forward_send(request.sender_id, connection_id, request.payload.clone())
797 });
798 Ok(())
799 }
800
801 async fn update_language_server(
802 self: Arc<Server>,
803 request: TypedEnvelope<proto::UpdateLanguageServer>,
804 ) -> Result<()> {
805 let receiver_ids = self
806 .store()
807 .await
808 .project_connection_ids(request.payload.project_id, request.sender_id)?;
809 broadcast(request.sender_id, receiver_ids, |connection_id| {
810 self.peer
811 .forward_send(request.sender_id, connection_id, request.payload.clone())
812 });
813 Ok(())
814 }
815
816 async fn forward_project_request<T>(
817 self: Arc<Server>,
818 request: TypedEnvelope<T>,
819 response: Response<T>,
820 ) -> Result<()>
821 where
822 T: EntityMessage + RequestMessage,
823 {
824 let host_connection_id = self
825 .store()
826 .await
827 .read_project(request.payload.remote_entity_id(), request.sender_id)?
828 .host_connection_id;
829
830 response.send(
831 self.peer
832 .forward_request(request.sender_id, host_connection_id, request.payload)
833 .await?,
834 )?;
835 Ok(())
836 }
837
838 async fn save_buffer(
839 self: Arc<Server>,
840 request: TypedEnvelope<proto::SaveBuffer>,
841 response: Response<proto::SaveBuffer>,
842 ) -> Result<()> {
843 let host = self
844 .store()
845 .await
846 .read_project(request.payload.project_id, request.sender_id)?
847 .host_connection_id;
848 let response_payload = self
849 .peer
850 .forward_request(request.sender_id, host, request.payload.clone())
851 .await?;
852
853 let mut guests = self
854 .store()
855 .await
856 .read_project(request.payload.project_id, request.sender_id)?
857 .connection_ids();
858 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
859 broadcast(host, guests, |conn_id| {
860 self.peer
861 .forward_send(host, conn_id, response_payload.clone())
862 });
863 response.send(response_payload)?;
864 Ok(())
865 }
866
867 async fn update_buffer(
868 self: Arc<Server>,
869 request: TypedEnvelope<proto::UpdateBuffer>,
870 response: Response<proto::UpdateBuffer>,
871 ) -> Result<()> {
872 let receiver_ids = self
873 .store()
874 .await
875 .project_connection_ids(request.payload.project_id, request.sender_id)?;
876 broadcast(request.sender_id, receiver_ids, |connection_id| {
877 self.peer
878 .forward_send(request.sender_id, connection_id, request.payload.clone())
879 });
880 response.send(proto::Ack {})?;
881 Ok(())
882 }
883
884 async fn update_buffer_file(
885 self: Arc<Server>,
886 request: TypedEnvelope<proto::UpdateBufferFile>,
887 ) -> Result<()> {
888 let receiver_ids = self
889 .store()
890 .await
891 .project_connection_ids(request.payload.project_id, request.sender_id)?;
892 broadcast(request.sender_id, receiver_ids, |connection_id| {
893 self.peer
894 .forward_send(request.sender_id, connection_id, request.payload.clone())
895 });
896 Ok(())
897 }
898
899 async fn buffer_reloaded(
900 self: Arc<Server>,
901 request: TypedEnvelope<proto::BufferReloaded>,
902 ) -> Result<()> {
903 let receiver_ids = self
904 .store()
905 .await
906 .project_connection_ids(request.payload.project_id, request.sender_id)?;
907 broadcast(request.sender_id, receiver_ids, |connection_id| {
908 self.peer
909 .forward_send(request.sender_id, connection_id, request.payload.clone())
910 });
911 Ok(())
912 }
913
914 async fn buffer_saved(
915 self: Arc<Server>,
916 request: TypedEnvelope<proto::BufferSaved>,
917 ) -> Result<()> {
918 let receiver_ids = self
919 .store()
920 .await
921 .project_connection_ids(request.payload.project_id, request.sender_id)?;
922 broadcast(request.sender_id, receiver_ids, |connection_id| {
923 self.peer
924 .forward_send(request.sender_id, connection_id, request.payload.clone())
925 });
926 Ok(())
927 }
928
929 async fn follow(
930 self: Arc<Self>,
931 request: TypedEnvelope<proto::Follow>,
932 response: Response<proto::Follow>,
933 ) -> Result<()> {
934 let leader_id = ConnectionId(request.payload.leader_id);
935 let follower_id = request.sender_id;
936 if !self
937 .store()
938 .await
939 .project_connection_ids(request.payload.project_id, follower_id)?
940 .contains(&leader_id)
941 {
942 Err(anyhow!("no such peer"))?;
943 }
944 let mut response_payload = self
945 .peer
946 .forward_request(request.sender_id, leader_id, request.payload)
947 .await?;
948 response_payload
949 .views
950 .retain(|view| view.leader_id != Some(follower_id.0));
951 response.send(response_payload)?;
952 Ok(())
953 }
954
955 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
956 let leader_id = ConnectionId(request.payload.leader_id);
957 if !self
958 .store()
959 .await
960 .project_connection_ids(request.payload.project_id, request.sender_id)?
961 .contains(&leader_id)
962 {
963 Err(anyhow!("no such peer"))?;
964 }
965 self.peer
966 .forward_send(request.sender_id, leader_id, request.payload)?;
967 Ok(())
968 }
969
970 async fn update_followers(
971 self: Arc<Self>,
972 request: TypedEnvelope<proto::UpdateFollowers>,
973 ) -> Result<()> {
974 let connection_ids = self
975 .store()
976 .await
977 .project_connection_ids(request.payload.project_id, request.sender_id)?;
978 let leader_id = request
979 .payload
980 .variant
981 .as_ref()
982 .and_then(|variant| match variant {
983 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
984 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
985 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
986 });
987 for follower_id in &request.payload.follower_ids {
988 let follower_id = ConnectionId(*follower_id);
989 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
990 self.peer
991 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
992 }
993 }
994 Ok(())
995 }
996
997 async fn get_channels(
998 self: Arc<Server>,
999 request: TypedEnvelope<proto::GetChannels>,
1000 response: Response<proto::GetChannels>,
1001 ) -> Result<()> {
1002 let user_id = self
1003 .store()
1004 .await
1005 .user_id_for_connection(request.sender_id)?;
1006 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1007 response.send(proto::GetChannelsResponse {
1008 channels: channels
1009 .into_iter()
1010 .map(|chan| proto::Channel {
1011 id: chan.id.to_proto(),
1012 name: chan.name,
1013 })
1014 .collect(),
1015 })?;
1016 Ok(())
1017 }
1018
1019 async fn get_users(
1020 self: Arc<Server>,
1021 request: TypedEnvelope<proto::GetUsers>,
1022 response: Response<proto::GetUsers>,
1023 ) -> Result<()> {
1024 let user_ids = request
1025 .payload
1026 .user_ids
1027 .into_iter()
1028 .map(UserId::from_proto)
1029 .collect();
1030 let users = self
1031 .app_state
1032 .db
1033 .get_users_by_ids(user_ids)
1034 .await?
1035 .into_iter()
1036 .map(|user| proto::User {
1037 id: user.id.to_proto(),
1038 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1039 github_login: user.github_login,
1040 })
1041 .collect();
1042 response.send(proto::UsersResponse { users })?;
1043 Ok(())
1044 }
1045
1046 async fn fuzzy_search_users(
1047 self: Arc<Server>,
1048 request: TypedEnvelope<proto::FuzzySearchUsers>,
1049 response: Response<proto::FuzzySearchUsers>,
1050 ) -> Result<()> {
1051 let user_id = self
1052 .store()
1053 .await
1054 .user_id_for_connection(request.sender_id)?;
1055 let query = request.payload.query;
1056 let db = &self.app_state.db;
1057 let users = match query.len() {
1058 0 => vec![],
1059 1 | 2 => db
1060 .get_user_by_github_login(&query)
1061 .await?
1062 .into_iter()
1063 .collect(),
1064 _ => db.fuzzy_search_users(&query, 10).await?,
1065 };
1066 let users = users
1067 .into_iter()
1068 .filter(|user| user.id != user_id)
1069 .map(|user| proto::User {
1070 id: user.id.to_proto(),
1071 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1072 github_login: user.github_login,
1073 })
1074 .collect();
1075 response.send(proto::UsersResponse { users })?;
1076 Ok(())
1077 }
1078
1079 async fn request_contact(
1080 self: Arc<Server>,
1081 request: TypedEnvelope<proto::RequestContact>,
1082 response: Response<proto::RequestContact>,
1083 ) -> Result<()> {
1084 let requester_id = self
1085 .store()
1086 .await
1087 .user_id_for_connection(request.sender_id)?;
1088 let responder_id = UserId::from_proto(request.payload.responder_id);
1089 if requester_id == responder_id {
1090 return Err(anyhow!("cannot add yourself as a contact"))?;
1091 }
1092
1093 self.app_state
1094 .db
1095 .send_contact_request(requester_id, responder_id)
1096 .await?;
1097
1098 // Update outgoing contact requests of requester
1099 let mut update = proto::UpdateContacts::default();
1100 update.outgoing_requests.push(responder_id.to_proto());
1101 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1102 self.peer.send(connection_id, update.clone())?;
1103 }
1104
1105 // Update incoming contact requests of responder
1106 let mut update = proto::UpdateContacts::default();
1107 update
1108 .incoming_requests
1109 .push(proto::IncomingContactRequest {
1110 requester_id: requester_id.to_proto(),
1111 should_notify: true,
1112 });
1113 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1114 self.peer.send(connection_id, update.clone())?;
1115 }
1116
1117 response.send(proto::Ack {})?;
1118 Ok(())
1119 }
1120
1121 async fn respond_to_contact_request(
1122 self: Arc<Server>,
1123 request: TypedEnvelope<proto::RespondToContactRequest>,
1124 response: Response<proto::RespondToContactRequest>,
1125 ) -> Result<()> {
1126 let responder_id = self
1127 .store()
1128 .await
1129 .user_id_for_connection(request.sender_id)?;
1130 let requester_id = UserId::from_proto(request.payload.requester_id);
1131 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1132 self.app_state
1133 .db
1134 .dismiss_contact_notification(responder_id, requester_id)
1135 .await?;
1136 } else {
1137 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1138 self.app_state
1139 .db
1140 .respond_to_contact_request(responder_id, requester_id, accept)
1141 .await?;
1142
1143 let store = self.store().await;
1144 // Update responder with new contact
1145 let mut update = proto::UpdateContacts::default();
1146 if accept {
1147 update
1148 .contacts
1149 .push(store.contact_for_user(requester_id, false));
1150 }
1151 update
1152 .remove_incoming_requests
1153 .push(requester_id.to_proto());
1154 for connection_id in store.connection_ids_for_user(responder_id) {
1155 self.peer.send(connection_id, update.clone())?;
1156 }
1157
1158 // Update requester with new contact
1159 let mut update = proto::UpdateContacts::default();
1160 if accept {
1161 update
1162 .contacts
1163 .push(store.contact_for_user(responder_id, true));
1164 }
1165 update
1166 .remove_outgoing_requests
1167 .push(responder_id.to_proto());
1168 for connection_id in store.connection_ids_for_user(requester_id) {
1169 self.peer.send(connection_id, update.clone())?;
1170 }
1171 }
1172
1173 response.send(proto::Ack {})?;
1174 Ok(())
1175 }
1176
1177 async fn remove_contact(
1178 self: Arc<Server>,
1179 request: TypedEnvelope<proto::RemoveContact>,
1180 response: Response<proto::RemoveContact>,
1181 ) -> Result<()> {
1182 let requester_id = self
1183 .store()
1184 .await
1185 .user_id_for_connection(request.sender_id)?;
1186 let responder_id = UserId::from_proto(request.payload.user_id);
1187 self.app_state
1188 .db
1189 .remove_contact(requester_id, responder_id)
1190 .await?;
1191
1192 // Update outgoing contact requests of requester
1193 let mut update = proto::UpdateContacts::default();
1194 update
1195 .remove_outgoing_requests
1196 .push(responder_id.to_proto());
1197 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1198 self.peer.send(connection_id, update.clone())?;
1199 }
1200
1201 // Update incoming contact requests of responder
1202 let mut update = proto::UpdateContacts::default();
1203 update
1204 .remove_incoming_requests
1205 .push(requester_id.to_proto());
1206 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1207 self.peer.send(connection_id, update.clone())?;
1208 }
1209
1210 response.send(proto::Ack {})?;
1211 Ok(())
1212 }
1213
1214 async fn join_channel(
1215 self: Arc<Self>,
1216 request: TypedEnvelope<proto::JoinChannel>,
1217 response: Response<proto::JoinChannel>,
1218 ) -> Result<()> {
1219 let user_id = self
1220 .store()
1221 .await
1222 .user_id_for_connection(request.sender_id)?;
1223 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1224 if !self
1225 .app_state
1226 .db
1227 .can_user_access_channel(user_id, channel_id)
1228 .await?
1229 {
1230 Err(anyhow!("access denied"))?;
1231 }
1232
1233 self.store_mut()
1234 .await
1235 .join_channel(request.sender_id, channel_id);
1236 let messages = self
1237 .app_state
1238 .db
1239 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1240 .await?
1241 .into_iter()
1242 .map(|msg| proto::ChannelMessage {
1243 id: msg.id.to_proto(),
1244 body: msg.body,
1245 timestamp: msg.sent_at.unix_timestamp() as u64,
1246 sender_id: msg.sender_id.to_proto(),
1247 nonce: Some(msg.nonce.as_u128().into()),
1248 })
1249 .collect::<Vec<_>>();
1250 response.send(proto::JoinChannelResponse {
1251 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1252 messages,
1253 })?;
1254 Ok(())
1255 }
1256
1257 async fn leave_channel(
1258 self: Arc<Self>,
1259 request: TypedEnvelope<proto::LeaveChannel>,
1260 ) -> Result<()> {
1261 let user_id = self
1262 .store()
1263 .await
1264 .user_id_for_connection(request.sender_id)?;
1265 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1266 if !self
1267 .app_state
1268 .db
1269 .can_user_access_channel(user_id, channel_id)
1270 .await?
1271 {
1272 Err(anyhow!("access denied"))?;
1273 }
1274
1275 self.store_mut()
1276 .await
1277 .leave_channel(request.sender_id, channel_id);
1278
1279 Ok(())
1280 }
1281
1282 async fn send_channel_message(
1283 self: Arc<Self>,
1284 request: TypedEnvelope<proto::SendChannelMessage>,
1285 response: Response<proto::SendChannelMessage>,
1286 ) -> Result<()> {
1287 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1288 let user_id;
1289 let connection_ids;
1290 {
1291 let state = self.store().await;
1292 user_id = state.user_id_for_connection(request.sender_id)?;
1293 connection_ids = state.channel_connection_ids(channel_id)?;
1294 }
1295
1296 // Validate the message body.
1297 let body = request.payload.body.trim().to_string();
1298 if body.len() > MAX_MESSAGE_LEN {
1299 return Err(anyhow!("message is too long"))?;
1300 }
1301 if body.is_empty() {
1302 return Err(anyhow!("message can't be blank"))?;
1303 }
1304
1305 let timestamp = OffsetDateTime::now_utc();
1306 let nonce = request
1307 .payload
1308 .nonce
1309 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1310
1311 let message_id = self
1312 .app_state
1313 .db
1314 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1315 .await?
1316 .to_proto();
1317 let message = proto::ChannelMessage {
1318 sender_id: user_id.to_proto(),
1319 id: message_id,
1320 body,
1321 timestamp: timestamp.unix_timestamp() as u64,
1322 nonce: Some(nonce),
1323 };
1324 broadcast(request.sender_id, connection_ids, |conn_id| {
1325 self.peer.send(
1326 conn_id,
1327 proto::ChannelMessageSent {
1328 channel_id: channel_id.to_proto(),
1329 message: Some(message.clone()),
1330 },
1331 )
1332 });
1333 response.send(proto::SendChannelMessageResponse {
1334 message: Some(message),
1335 })?;
1336 Ok(())
1337 }
1338
1339 async fn get_channel_messages(
1340 self: Arc<Self>,
1341 request: TypedEnvelope<proto::GetChannelMessages>,
1342 response: Response<proto::GetChannelMessages>,
1343 ) -> Result<()> {
1344 let user_id = self
1345 .store()
1346 .await
1347 .user_id_for_connection(request.sender_id)?;
1348 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1349 if !self
1350 .app_state
1351 .db
1352 .can_user_access_channel(user_id, channel_id)
1353 .await?
1354 {
1355 Err(anyhow!("access denied"))?;
1356 }
1357
1358 let messages = self
1359 .app_state
1360 .db
1361 .get_channel_messages(
1362 channel_id,
1363 MESSAGE_COUNT_PER_PAGE,
1364 Some(MessageId::from_proto(request.payload.before_message_id)),
1365 )
1366 .await?
1367 .into_iter()
1368 .map(|msg| proto::ChannelMessage {
1369 id: msg.id.to_proto(),
1370 body: msg.body,
1371 timestamp: msg.sent_at.unix_timestamp() as u64,
1372 sender_id: msg.sender_id.to_proto(),
1373 nonce: Some(msg.nonce.as_u128().into()),
1374 })
1375 .collect::<Vec<_>>();
1376 response.send(proto::GetChannelMessagesResponse {
1377 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1378 messages,
1379 })?;
1380 Ok(())
1381 }
1382
    /// Acquires a read lock on the server's store and returns it wrapped in a
    /// `StoreReadGuard`.
    ///
    /// In test builds the task yields to the scheduler once before and once
    /// after taking the lock (presumably to let other tasks interleave around
    /// lock acquisition; confirm against the test harness).
    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.read().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreReadGuard {
            guard,
            // NOTE(review): the field name suggests the guard is deliberately
            // made non-`Send`; confirm against the `StoreReadGuard` definition.
            _not_send: PhantomData,
        }
    }
1394
    /// Acquires a write lock on the server's store and returns it wrapped in a
    /// `StoreWriteGuard`.
    ///
    /// In test builds the task yields to the scheduler once before and once
    /// after taking the lock (presumably to let other tasks interleave around
    /// lock acquisition; confirm against the test harness).
    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.write().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreWriteGuard {
            guard,
            // NOTE(review): the field name suggests the guard is deliberately
            // made non-`Send`; confirm against the `StoreWriteGuard` definition.
            _not_send: PhantomData,
        }
    }
1406}
1407
1408impl<'a> Deref for StoreReadGuard<'a> {
1409 type Target = Store;
1410
1411 fn deref(&self) -> &Self::Target {
1412 &*self.guard
1413 }
1414}
1415
1416impl<'a> Deref for StoreWriteGuard<'a> {
1417 type Target = Store;
1418
1419 fn deref(&self) -> &Self::Target {
1420 &*self.guard
1421 }
1422}
1423
1424impl<'a> DerefMut for StoreWriteGuard<'a> {
1425 fn deref_mut(&mut self) -> &mut Self::Target {
1426 &mut *self.guard
1427 }
1428}
1429
1430impl<'a> Drop for StoreWriteGuard<'a> {
1431 fn drop(&mut self) {
1432 #[cfg(test)]
1433 self.check_invariants();
1434
1435 let metrics = self.metrics();
1436 tracing::info!(
1437 connections = metrics.connections,
1438 registered_projects = metrics.registered_projects,
1439 shared_projects = metrics.shared_projects,
1440 "metrics"
1441 );
1442 }
1443}
1444
1445impl Executor for RealExecutor {
1446 type Sleep = Sleep;
1447
1448 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1449 tokio::task::spawn(future);
1450 }
1451
1452 fn sleep(&self, duration: Duration) -> Self::Sleep {
1453 tokio::time::sleep(duration)
1454 }
1455}
1456
1457fn broadcast<F>(
1458 sender_id: ConnectionId,
1459 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1460 mut f: F,
1461) where
1462 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1463{
1464 for receiver_id in receiver_ids {
1465 if receiver_id != sender_id {
1466 f(receiver_id).trace_err();
1467 }
1468 }
1469}
1470
// Lazily initialized name of the HTTP header, `x-zed-protocol-version`, that
// `ProtocolVersion` is read from and written to.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1474
/// Typed wrapper around the numeric value of the `x-zed-protocol-version`
/// request header.
pub struct ProtocolVersion(u32);
1476
1477impl Header for ProtocolVersion {
1478 fn name() -> &'static HeaderName {
1479 &ZED_PROTOCOL_VERSION
1480 }
1481
1482 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1483 where
1484 Self: Sized,
1485 I: Iterator<Item = &'i axum::http::HeaderValue>,
1486 {
1487 let version = values
1488 .next()
1489 .ok_or_else(|| axum::headers::Error::invalid())?
1490 .to_str()
1491 .map_err(|_| axum::headers::Error::invalid())?
1492 .parse()
1493 .map_err(|_| axum::headers::Error::invalid())?;
1494 Ok(Self(version))
1495 }
1496
1497 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1498 values.extend([self.0.to_string().parse().unwrap()]);
1499 }
1500}
1501
1502pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1503 let server = Server::new(app_state.clone(), None);
1504 Router::new()
1505 .route("/rpc", get(handle_websocket_request))
1506 .layer(
1507 ServiceBuilder::new()
1508 .layer(Extension(app_state))
1509 .layer(middleware::from_fn(auth::validate_header))
1510 .layer(Extension(server)),
1511 )
1512}
1513
1514pub async fn handle_websocket_request(
1515 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1516 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1517 Extension(server): Extension<Arc<Server>>,
1518 Extension(user_id): Extension<UserId>,
1519 ws: WebSocketUpgrade,
1520) -> axum::response::Response {
1521 if protocol_version != rpc::PROTOCOL_VERSION {
1522 return (
1523 StatusCode::UPGRADE_REQUIRED,
1524 "client must be upgraded".to_string(),
1525 )
1526 .into_response();
1527 }
1528 let socket_address = socket_address.to_string();
1529 ws.on_upgrade(move |socket| {
1530 use util::ResultExt;
1531 let socket = socket
1532 .map_ok(to_tungstenite_message)
1533 .err_into()
1534 .with(|message| async move { Ok(to_axum_message(message)) });
1535 let connection = Connection::new(Box::pin(socket));
1536 async move {
1537 server
1538 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1539 .await
1540 .log_err();
1541 }
1542 })
1543}
1544
1545fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1546 match message {
1547 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1548 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1549 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1550 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1551 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1552 code: frame.code.into(),
1553 reason: frame.reason,
1554 })),
1555 }
1556}
1557
1558fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1559 match message {
1560 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1561 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1562 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1563 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1564 AxumMessage::Close(frame) => {
1565 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1566 code: frame.code.into(),
1567 reason: frame.reason,
1568 }))
1569 }
1570 }
1571}
1572
/// Extension trait for turning a fallible value into an `Option` while
/// reporting the failure through `tracing`.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Returns the success value, if any. The implementation in this file
    /// logs the error at `error` level and returns `None` on failure.
    fn trace_err(self) -> Option<Self::Ok>;
}
1578
1579impl<T, E> ResultExt for Result<T, E>
1580where
1581 E: std::fmt::Debug,
1582{
1583 type Ok = T;
1584
1585 fn trace_err(self) -> Option<T> {
1586 match self {
1587 Ok(value) => Some(value),
1588 Err(error) => {
1589 tracing::error!("{:?}", error);
1590 None
1591 }
1592 }
1593 }
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598 use super::*;
1599 use crate::{
1600 db::{tests::TestDb, UserId},
1601 AppState,
1602 };
1603 use ::rpc::Peer;
1604 use client::{
1605 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1606 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1607 };
1608 use collections::{BTreeMap, HashSet};
1609 use editor::{
1610 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1611 ToOffset, ToggleCodeActions, Undo,
1612 };
1613 use gpui::{
1614 executor::{self, Deterministic},
1615 geometry::vector::vec2f,
1616 ModelHandle, TestAppContext, ViewHandle,
1617 };
1618 use language::{
1619 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1620 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1621 };
1622 use lsp::{self, FakeLanguageServer};
1623 use parking_lot::Mutex;
1624 use project::{
1625 fs::{FakeFs, Fs as _},
1626 search::SearchQuery,
1627 worktree::WorktreeHandle,
1628 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1629 };
1630 use rand::prelude::*;
1631 use rpc::PeerId;
1632 use serde_json::json;
1633 use settings::Settings;
1634 use sqlx::types::time::OffsetDateTime;
1635 use std::{
1636 env,
1637 ops::Deref,
1638 path::{Path, PathBuf},
1639 rc::Rc,
1640 sync::{
1641 atomic::{AtomicBool, Ordering::SeqCst},
1642 Arc,
1643 },
1644 time::Duration,
1645 };
1646 use theme::ThemeRegistry;
1647 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1648
    // Initializes `env_logger` for the test binary, but only when the
    // `RUST_LOG` environment variable is set to a valid unicode value.
    // Registered through `ctor`, so it is not called explicitly by any test.
    #[cfg(test)]
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }
1656
    // Client A shares a local project and client B joins it. Checks that each
    // side sees the other as a collaborator, that an edit made by B reaches
    // A's copy of the buffer, and that B disappears from A's collaborators
    // once B drops the project.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Join that project as client B
        let client_b_peer_id = client_b.peer_id;
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // B must already list A as a collaborator; remember B's replica id so
        // it can be compared with what A observes.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A lists B as a collaborator with that replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b_peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // The host reports the buffer as open before it opens it itself.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO: the selection-set assertions below are commented out.
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO: the selection-set assertions below are commented out.
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1774
    // Checks that the host's worktree is reported as shared while a guest is
    // in the project, stops being shared once the guest drops the project,
    // and becomes shared again when the guest rejoins.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // When client B leaves the project, it gets automatically unshared.
        cx_b.update(|_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        // Run all pending work so the effect of the drop is visible to the
        // assertion below.
        deterministic.run_until_parked();
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));

        // When client B joins again, the project gets re-shared.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1846
    // Checks what happens when the host's connection is dropped: the host
    // loses its collaborators and is no longer shared, an existing guest's
    // project becomes read-only, a pending join request fails with
    // `HostWentOffline`, and a guest can join again afterwards.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        // The remote id is needed below to join the project by hand as client C.
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Request to join that project as client C
        // (the spawned task is only awaited after the host has disconnected).
        let project_c = cx_c.spawn(|mut cx| {
            let client = client_c.client.clone();
            let user_store = client_c.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the RPC receive timeout.
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });
        // Client C's pending join must fail because the host went away.
        assert!(matches!(
            project_c.await.unwrap_err(),
            project::JoinProjectError::HostWentOffline
        ));

        // Ensure guests can still join.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1955
    // Checks the two ways a join request can be turned down: the host
    // explicitly declines it (`HostDeclined`), or the host drops the project
    // while the request is still pending (`HostClosedProject`).
    #[gpui::test(iterations = 10)]
    async fn test_decline_join_request(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree("/a", json!({})).await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        // The remote id is needed below to join the project by hand as client B.
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Request to join that project as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        // Run pending work before the host answers the request.
        deterministic.run_until_parked();
        // The host answers client B's request with `false`, i.e. declines it.
        project_a.update(cx_a, |project, cx| {
            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
        });
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostDeclined
        ));

        // Request to join the project again as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });

        // Close the project on the host
        deterministic.run_until_parked();
        cx_a.update(|_| drop(project_a));
        deterministic.run_until_parked();
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostClosedProject
        ));
    }
2051
    // With one host (A) and two guests (B, C) on the same project, checks that
    // concurrent buffer edits converge to the same text on all three, that a
    // save issued by a guest writes the host's latest text to the file system
    // and clears the dirty flag everywhere, and that renames and new files on
    // the host's file system show up in the guests' worktrees and buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let mut client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that worktree as clients B and C.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for both guest edits to arrive before appending the host's text.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
        save_b.await.unwrap();
        // The saved file is expected to contain the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
2216
2217 #[gpui::test(iterations = 10)]
2218 async fn test_fs_operations(
2219 executor: Arc<Deterministic>,
2220 cx_a: &mut TestAppContext,
2221 cx_b: &mut TestAppContext,
2222 ) {
2223 executor.forbid_parking();
2224 let fs = FakeFs::new(cx_a.background());
2225
2226 // Connect to a server as 2 clients.
2227 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2228 let mut client_a = server.create_client(cx_a, "user_a").await;
2229 let mut client_b = server.create_client(cx_b, "user_b").await;
2230 server
2231 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2232 .await;
2233
2234 // Share a project as client A
2235 fs.insert_tree(
2236 "/dir",
2237 json!({
2238 "a.txt": "a-contents",
2239 "b.txt": "b-contents",
2240 }),
2241 )
2242 .await;
2243
2244 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2245 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2246
2247 let worktree_a =
2248 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2249 let worktree_b =
2250 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2251
2252 let entry = project_b
2253 .update(cx_b, |project, cx| {
2254 project
2255 .create_entry((worktree_id, "c.txt"), false, cx)
2256 .unwrap()
2257 })
2258 .await
2259 .unwrap();
2260 worktree_a.read_with(cx_a, |worktree, _| {
2261 assert_eq!(
2262 worktree
2263 .paths()
2264 .map(|p| p.to_string_lossy())
2265 .collect::<Vec<_>>(),
2266 ["a.txt", "b.txt", "c.txt"]
2267 );
2268 });
2269 worktree_b.read_with(cx_b, |worktree, _| {
2270 assert_eq!(
2271 worktree
2272 .paths()
2273 .map(|p| p.to_string_lossy())
2274 .collect::<Vec<_>>(),
2275 ["a.txt", "b.txt", "c.txt"]
2276 );
2277 });
2278
2279 project_b
2280 .update(cx_b, |project, cx| {
2281 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2282 })
2283 .unwrap()
2284 .await
2285 .unwrap();
2286 worktree_a.read_with(cx_a, |worktree, _| {
2287 assert_eq!(
2288 worktree
2289 .paths()
2290 .map(|p| p.to_string_lossy())
2291 .collect::<Vec<_>>(),
2292 ["a.txt", "b.txt", "d.txt"]
2293 );
2294 });
2295 worktree_b.read_with(cx_b, |worktree, _| {
2296 assert_eq!(
2297 worktree
2298 .paths()
2299 .map(|p| p.to_string_lossy())
2300 .collect::<Vec<_>>(),
2301 ["a.txt", "b.txt", "d.txt"]
2302 );
2303 });
2304
2305 let dir_entry = project_b
2306 .update(cx_b, |project, cx| {
2307 project
2308 .create_entry((worktree_id, "DIR"), true, cx)
2309 .unwrap()
2310 })
2311 .await
2312 .unwrap();
2313 worktree_a.read_with(cx_a, |worktree, _| {
2314 assert_eq!(
2315 worktree
2316 .paths()
2317 .map(|p| p.to_string_lossy())
2318 .collect::<Vec<_>>(),
2319 ["DIR", "a.txt", "b.txt", "d.txt"]
2320 );
2321 });
2322 worktree_b.read_with(cx_b, |worktree, _| {
2323 assert_eq!(
2324 worktree
2325 .paths()
2326 .map(|p| p.to_string_lossy())
2327 .collect::<Vec<_>>(),
2328 ["DIR", "a.txt", "b.txt", "d.txt"]
2329 );
2330 });
2331
2332 project_b
2333 .update(cx_b, |project, cx| {
2334 project.delete_entry(dir_entry.id, cx).unwrap()
2335 })
2336 .await
2337 .unwrap();
2338 worktree_a.read_with(cx_a, |worktree, _| {
2339 assert_eq!(
2340 worktree
2341 .paths()
2342 .map(|p| p.to_string_lossy())
2343 .collect::<Vec<_>>(),
2344 ["a.txt", "b.txt", "d.txt"]
2345 );
2346 });
2347 worktree_b.read_with(cx_b, |worktree, _| {
2348 assert_eq!(
2349 worktree
2350 .paths()
2351 .map(|p| p.to_string_lossy())
2352 .collect::<Vec<_>>(),
2353 ["a.txt", "b.txt", "d.txt"]
2354 );
2355 });
2356
2357 project_b
2358 .update(cx_b, |project, cx| {
2359 project.delete_entry(entry.id, cx).unwrap()
2360 })
2361 .await
2362 .unwrap();
2363 worktree_a.read_with(cx_a, |worktree, _| {
2364 assert_eq!(
2365 worktree
2366 .paths()
2367 .map(|p| p.to_string_lossy())
2368 .collect::<Vec<_>>(),
2369 ["a.txt", "b.txt"]
2370 );
2371 });
2372 worktree_b.read_with(cx_b, |worktree, _| {
2373 assert_eq!(
2374 worktree
2375 .paths()
2376 .map(|p| p.to_string_lossy())
2377 .collect::<Vec<_>>(),
2378 ["a.txt", "b.txt"]
2379 );
2380 });
2381 }
2382
2383 #[gpui::test(iterations = 10)]
2384 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2385 cx_a.foreground().forbid_parking();
2386 let lang_registry = Arc::new(LanguageRegistry::test());
2387 let fs = FakeFs::new(cx_a.background());
2388
2389 // Connect to a server as 2 clients.
2390 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2391 let client_a = server.create_client(cx_a, "user_a").await;
2392 let mut client_b = server.create_client(cx_b, "user_b").await;
2393 server
2394 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2395 .await;
2396
2397 // Share a project as client A
2398 fs.insert_tree(
2399 "/dir",
2400 json!({
2401 "a.txt": "a-contents",
2402 }),
2403 )
2404 .await;
2405
2406 let project_a = cx_a.update(|cx| {
2407 Project::local(
2408 client_a.clone(),
2409 client_a.user_store.clone(),
2410 lang_registry.clone(),
2411 fs.clone(),
2412 cx,
2413 )
2414 });
2415 let (worktree_a, _) = project_a
2416 .update(cx_a, |p, cx| {
2417 p.find_or_create_local_worktree("/dir", true, cx)
2418 })
2419 .await
2420 .unwrap();
2421 worktree_a
2422 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2423 .await;
2424 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2425
2426 // Join that project as client B
2427 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2428
2429 // Open a buffer as client B
2430 let buffer_b = project_b
2431 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2432 .await
2433 .unwrap();
2434
2435 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2436 buffer_b.read_with(cx_b, |buf, _| {
2437 assert!(buf.is_dirty());
2438 assert!(!buf.has_conflict());
2439 });
2440
2441 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2442 buffer_b
2443 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2444 .await;
2445 buffer_b.read_with(cx_b, |buf, _| {
2446 assert!(!buf.has_conflict());
2447 });
2448
2449 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2450 buffer_b.read_with(cx_b, |buf, _| {
2451 assert!(buf.is_dirty());
2452 assert!(!buf.has_conflict());
2453 });
2454 }
2455
2456 #[gpui::test(iterations = 10)]
2457 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2458 cx_a.foreground().forbid_parking();
2459 let lang_registry = Arc::new(LanguageRegistry::test());
2460 let fs = FakeFs::new(cx_a.background());
2461
2462 // Connect to a server as 2 clients.
2463 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2464 let client_a = server.create_client(cx_a, "user_a").await;
2465 let mut client_b = server.create_client(cx_b, "user_b").await;
2466 server
2467 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2468 .await;
2469
2470 // Share a project as client A
2471 fs.insert_tree(
2472 "/dir",
2473 json!({
2474 "a.txt": "a-contents",
2475 }),
2476 )
2477 .await;
2478
2479 let project_a = cx_a.update(|cx| {
2480 Project::local(
2481 client_a.clone(),
2482 client_a.user_store.clone(),
2483 lang_registry.clone(),
2484 fs.clone(),
2485 cx,
2486 )
2487 });
2488 let (worktree_a, _) = project_a
2489 .update(cx_a, |p, cx| {
2490 p.find_or_create_local_worktree("/dir", true, cx)
2491 })
2492 .await
2493 .unwrap();
2494 worktree_a
2495 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2496 .await;
2497 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2498
2499 // Join that project as client B
2500 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2501 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2502
2503 // Open a buffer as client B
2504 let buffer_b = project_b
2505 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2506 .await
2507 .unwrap();
2508 buffer_b.read_with(cx_b, |buf, _| {
2509 assert!(!buf.is_dirty());
2510 assert!(!buf.has_conflict());
2511 });
2512
2513 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2514 .await
2515 .unwrap();
2516 buffer_b
2517 .condition(&cx_b, |buf, _| {
2518 buf.text() == "new contents" && !buf.is_dirty()
2519 })
2520 .await;
2521 buffer_b.read_with(cx_b, |buf, _| {
2522 assert!(!buf.has_conflict());
2523 });
2524 }
2525
2526 #[gpui::test(iterations = 10)]
2527 async fn test_editing_while_guest_opens_buffer(
2528 cx_a: &mut TestAppContext,
2529 cx_b: &mut TestAppContext,
2530 ) {
2531 cx_a.foreground().forbid_parking();
2532 let lang_registry = Arc::new(LanguageRegistry::test());
2533 let fs = FakeFs::new(cx_a.background());
2534
2535 // Connect to a server as 2 clients.
2536 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2537 let client_a = server.create_client(cx_a, "user_a").await;
2538 let mut client_b = server.create_client(cx_b, "user_b").await;
2539 server
2540 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2541 .await;
2542
2543 // Share a project as client A
2544 fs.insert_tree(
2545 "/dir",
2546 json!({
2547 "a.txt": "a-contents",
2548 }),
2549 )
2550 .await;
2551 let project_a = cx_a.update(|cx| {
2552 Project::local(
2553 client_a.clone(),
2554 client_a.user_store.clone(),
2555 lang_registry.clone(),
2556 fs.clone(),
2557 cx,
2558 )
2559 });
2560 let (worktree_a, _) = project_a
2561 .update(cx_a, |p, cx| {
2562 p.find_or_create_local_worktree("/dir", true, cx)
2563 })
2564 .await
2565 .unwrap();
2566 worktree_a
2567 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2568 .await;
2569 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2570
2571 // Join that project as client B
2572 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2573
2574 // Open a buffer as client A
2575 let buffer_a = project_a
2576 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2577 .await
2578 .unwrap();
2579
2580 // Start opening the same buffer as client B
2581 let buffer_b = cx_b
2582 .background()
2583 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2584
2585 // Edit the buffer as client A while client B is still opening it.
2586 cx_b.background().simulate_random_delay().await;
2587 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2588 cx_b.background().simulate_random_delay().await;
2589 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2590
2591 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2592 let buffer_b = buffer_b.await.unwrap();
2593 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2594 }
2595
2596 #[gpui::test(iterations = 10)]
2597 async fn test_leaving_worktree_while_opening_buffer(
2598 cx_a: &mut TestAppContext,
2599 cx_b: &mut TestAppContext,
2600 ) {
2601 cx_a.foreground().forbid_parking();
2602 let lang_registry = Arc::new(LanguageRegistry::test());
2603 let fs = FakeFs::new(cx_a.background());
2604
2605 // Connect to a server as 2 clients.
2606 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2607 let client_a = server.create_client(cx_a, "user_a").await;
2608 let mut client_b = server.create_client(cx_b, "user_b").await;
2609 server
2610 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2611 .await;
2612
2613 // Share a project as client A
2614 fs.insert_tree(
2615 "/dir",
2616 json!({
2617 "a.txt": "a-contents",
2618 }),
2619 )
2620 .await;
2621 let project_a = cx_a.update(|cx| {
2622 Project::local(
2623 client_a.clone(),
2624 client_a.user_store.clone(),
2625 lang_registry.clone(),
2626 fs.clone(),
2627 cx,
2628 )
2629 });
2630 let (worktree_a, _) = project_a
2631 .update(cx_a, |p, cx| {
2632 p.find_or_create_local_worktree("/dir", true, cx)
2633 })
2634 .await
2635 .unwrap();
2636 worktree_a
2637 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2638 .await;
2639 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2640
2641 // Join that project as client B
2642 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2643
2644 // See that a guest has joined as client A.
2645 project_a
2646 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2647 .await;
2648
2649 // Begin opening a buffer as client B, but leave the project before the open completes.
2650 let buffer_b = cx_b
2651 .background()
2652 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2653 cx_b.update(|_| {
2654 drop(client_b.project.take());
2655 drop(project_b);
2656 });
2657 drop(buffer_b);
2658
2659 // See that the guest has left.
2660 project_a
2661 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2662 .await;
2663 }
2664
2665 #[gpui::test(iterations = 10)]
2666 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2667 cx_a.foreground().forbid_parking();
2668 let lang_registry = Arc::new(LanguageRegistry::test());
2669 let fs = FakeFs::new(cx_a.background());
2670
2671 // Connect to a server as 2 clients.
2672 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2673 let client_a = server.create_client(cx_a, "user_a").await;
2674 let mut client_b = server.create_client(cx_b, "user_b").await;
2675 server
2676 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2677 .await;
2678
2679 // Share a project as client A
2680 fs.insert_tree(
2681 "/a",
2682 json!({
2683 "a.txt": "a-contents",
2684 "b.txt": "b-contents",
2685 }),
2686 )
2687 .await;
2688 let project_a = cx_a.update(|cx| {
2689 Project::local(
2690 client_a.clone(),
2691 client_a.user_store.clone(),
2692 lang_registry.clone(),
2693 fs.clone(),
2694 cx,
2695 )
2696 });
2697 let (worktree_a, _) = project_a
2698 .update(cx_a, |p, cx| {
2699 p.find_or_create_local_worktree("/a", true, cx)
2700 })
2701 .await
2702 .unwrap();
2703 worktree_a
2704 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2705 .await;
2706
2707 // Join that project as client B
2708 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2709
2710 // Client A sees that a guest has joined.
2711 project_a
2712 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2713 .await;
2714
2715 // Drop client B's connection and ensure client A observes client B leaving the project.
2716 client_b.disconnect(&cx_b.to_async()).unwrap();
2717 project_a
2718 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2719 .await;
2720
2721 // Rejoin the project as client B
2722 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2723
2724 // Client A sees that a guest has re-joined.
2725 project_a
2726 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2727 .await;
2728
2729 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2730 client_b.wait_for_current_user(cx_b).await;
2731 server.disconnect_client(client_b.current_user_id(cx_b));
2732 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2733 project_a
2734 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2735 .await;
2736 }
2737
2738 #[gpui::test(iterations = 10)]
2739 async fn test_collaborating_with_diagnostics(
2740 deterministic: Arc<Deterministic>,
2741 cx_a: &mut TestAppContext,
2742 cx_b: &mut TestAppContext,
2743 cx_c: &mut TestAppContext,
2744 ) {
2745 deterministic.forbid_parking();
2746 let lang_registry = Arc::new(LanguageRegistry::test());
2747 let fs = FakeFs::new(cx_a.background());
2748
2749 // Set up a fake language server.
2750 let mut language = Language::new(
2751 LanguageConfig {
2752 name: "Rust".into(),
2753 path_suffixes: vec!["rs".to_string()],
2754 ..Default::default()
2755 },
2756 Some(tree_sitter_rust::language()),
2757 );
2758 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2759 lang_registry.add(Arc::new(language));
2760
2761 // Connect to a server as 2 clients.
2762 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763 let client_a = server.create_client(cx_a, "user_a").await;
2764 let mut client_b = server.create_client(cx_b, "user_b").await;
2765 let mut client_c = server.create_client(cx_c, "user_c").await;
2766 server
2767 .make_contacts(vec![
2768 (&client_a, cx_a),
2769 (&client_b, cx_b),
2770 (&client_c, cx_c),
2771 ])
2772 .await;
2773
2774 // Share a project as client A
2775 fs.insert_tree(
2776 "/a",
2777 json!({
2778 "a.rs": "let one = two",
2779 "other.rs": "",
2780 }),
2781 )
2782 .await;
2783 let project_a = cx_a.update(|cx| {
2784 Project::local(
2785 client_a.clone(),
2786 client_a.user_store.clone(),
2787 lang_registry.clone(),
2788 fs.clone(),
2789 cx,
2790 )
2791 });
2792 let (worktree_a, _) = project_a
2793 .update(cx_a, |p, cx| {
2794 p.find_or_create_local_worktree("/a", true, cx)
2795 })
2796 .await
2797 .unwrap();
2798 worktree_a
2799 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2800 .await;
2801 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2802 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2803
2804 // Cause the language server to start.
2805 let _buffer = cx_a
2806 .background()
2807 .spawn(project_a.update(cx_a, |project, cx| {
2808 project.open_buffer(
2809 ProjectPath {
2810 worktree_id,
2811 path: Path::new("other.rs").into(),
2812 },
2813 cx,
2814 )
2815 }))
2816 .await
2817 .unwrap();
2818
2819 // Join the worktree as client B.
2820 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2821
2822 // Simulate a language server reporting errors for a file.
2823 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2824 fake_language_server
2825 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2826 .await;
2827 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2828 lsp::PublishDiagnosticsParams {
2829 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2830 version: None,
2831 diagnostics: vec![lsp::Diagnostic {
2832 severity: Some(lsp::DiagnosticSeverity::ERROR),
2833 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2834 message: "message 1".to_string(),
2835 ..Default::default()
2836 }],
2837 },
2838 );
2839
2840 // Wait for server to see the diagnostics update.
2841 deterministic.run_until_parked();
2842 {
2843 let store = server.store.read().await;
2844 let project = store.project(project_id).unwrap();
2845 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2846 assert!(!worktree.diagnostic_summaries.is_empty());
2847 }
2848
2849 // Ensure client B observes the new diagnostics.
2850 project_b.read_with(cx_b, |project, cx| {
2851 assert_eq!(
2852 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2853 &[(
2854 ProjectPath {
2855 worktree_id,
2856 path: Arc::from(Path::new("a.rs")),
2857 },
2858 DiagnosticSummary {
2859 error_count: 1,
2860 warning_count: 0,
2861 ..Default::default()
2862 },
2863 )]
2864 )
2865 });
2866
2867 // Join project as client C and observe the diagnostics.
2868 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2869 project_c.read_with(cx_c, |project, cx| {
2870 assert_eq!(
2871 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2872 &[(
2873 ProjectPath {
2874 worktree_id,
2875 path: Arc::from(Path::new("a.rs")),
2876 },
2877 DiagnosticSummary {
2878 error_count: 1,
2879 warning_count: 0,
2880 ..Default::default()
2881 },
2882 )]
2883 )
2884 });
2885
2886 // Simulate a language server reporting more errors for a file.
2887 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2888 lsp::PublishDiagnosticsParams {
2889 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2890 version: None,
2891 diagnostics: vec![
2892 lsp::Diagnostic {
2893 severity: Some(lsp::DiagnosticSeverity::ERROR),
2894 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2895 message: "message 1".to_string(),
2896 ..Default::default()
2897 },
2898 lsp::Diagnostic {
2899 severity: Some(lsp::DiagnosticSeverity::WARNING),
2900 range: lsp::Range::new(
2901 lsp::Position::new(0, 10),
2902 lsp::Position::new(0, 13),
2903 ),
2904 message: "message 2".to_string(),
2905 ..Default::default()
2906 },
2907 ],
2908 },
2909 );
2910
2911 // Clients B and C get the updated summaries
2912 deterministic.run_until_parked();
2913 project_b.read_with(cx_b, |project, cx| {
2914 assert_eq!(
2915 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2916 [(
2917 ProjectPath {
2918 worktree_id,
2919 path: Arc::from(Path::new("a.rs")),
2920 },
2921 DiagnosticSummary {
2922 error_count: 1,
2923 warning_count: 1,
2924 ..Default::default()
2925 },
2926 )]
2927 );
2928 });
2929 project_c.read_with(cx_c, |project, cx| {
2930 assert_eq!(
2931 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2932 [(
2933 ProjectPath {
2934 worktree_id,
2935 path: Arc::from(Path::new("a.rs")),
2936 },
2937 DiagnosticSummary {
2938 error_count: 1,
2939 warning_count: 1,
2940 ..Default::default()
2941 },
2942 )]
2943 );
2944 });
2945
2946 // Open the file with the errors on client B. They should be present.
2947 let buffer_b = cx_b
2948 .background()
2949 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2950 .await
2951 .unwrap();
2952
2953 buffer_b.read_with(cx_b, |buffer, _| {
2954 assert_eq!(
2955 buffer
2956 .snapshot()
2957 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2958 .map(|entry| entry)
2959 .collect::<Vec<_>>(),
2960 &[
2961 DiagnosticEntry {
2962 range: Point::new(0, 4)..Point::new(0, 7),
2963 diagnostic: Diagnostic {
2964 group_id: 0,
2965 message: "message 1".to_string(),
2966 severity: lsp::DiagnosticSeverity::ERROR,
2967 is_primary: true,
2968 ..Default::default()
2969 }
2970 },
2971 DiagnosticEntry {
2972 range: Point::new(0, 10)..Point::new(0, 13),
2973 diagnostic: Diagnostic {
2974 group_id: 1,
2975 severity: lsp::DiagnosticSeverity::WARNING,
2976 message: "message 2".to_string(),
2977 is_primary: true,
2978 ..Default::default()
2979 }
2980 }
2981 ]
2982 );
2983 });
2984
2985 // Simulate a language server reporting no errors for a file.
2986 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2987 lsp::PublishDiagnosticsParams {
2988 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2989 version: None,
2990 diagnostics: vec![],
2991 },
2992 );
2993 deterministic.run_until_parked();
2994 project_a.read_with(cx_a, |project, cx| {
2995 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2996 });
2997 project_b.read_with(cx_b, |project, cx| {
2998 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2999 });
3000 project_c.read_with(cx_c, |project, cx| {
3001 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3002 });
3003 }
3004
3005 #[gpui::test(iterations = 10)]
3006 async fn test_collaborating_with_completion(
3007 cx_a: &mut TestAppContext,
3008 cx_b: &mut TestAppContext,
3009 ) {
3010 cx_a.foreground().forbid_parking();
3011 let lang_registry = Arc::new(LanguageRegistry::test());
3012 let fs = FakeFs::new(cx_a.background());
3013
3014 // Set up a fake language server.
3015 let mut language = Language::new(
3016 LanguageConfig {
3017 name: "Rust".into(),
3018 path_suffixes: vec!["rs".to_string()],
3019 ..Default::default()
3020 },
3021 Some(tree_sitter_rust::language()),
3022 );
3023 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3024 capabilities: lsp::ServerCapabilities {
3025 completion_provider: Some(lsp::CompletionOptions {
3026 trigger_characters: Some(vec![".".to_string()]),
3027 ..Default::default()
3028 }),
3029 ..Default::default()
3030 },
3031 ..Default::default()
3032 });
3033 lang_registry.add(Arc::new(language));
3034
3035 // Connect to a server as 2 clients.
3036 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3037 let client_a = server.create_client(cx_a, "user_a").await;
3038 let mut client_b = server.create_client(cx_b, "user_b").await;
3039 server
3040 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3041 .await;
3042
3043 // Share a project as client A
3044 fs.insert_tree(
3045 "/a",
3046 json!({
3047 "main.rs": "fn main() { a }",
3048 "other.rs": "",
3049 }),
3050 )
3051 .await;
3052 let project_a = cx_a.update(|cx| {
3053 Project::local(
3054 client_a.clone(),
3055 client_a.user_store.clone(),
3056 lang_registry.clone(),
3057 fs.clone(),
3058 cx,
3059 )
3060 });
3061 let (worktree_a, _) = project_a
3062 .update(cx_a, |p, cx| {
3063 p.find_or_create_local_worktree("/a", true, cx)
3064 })
3065 .await
3066 .unwrap();
3067 worktree_a
3068 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3069 .await;
3070 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3071
3072 // Join the worktree as client B.
3073 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3074
3075 // Open a file in an editor as the guest.
3076 let buffer_b = project_b
3077 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3078 .await
3079 .unwrap();
3080 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3081 let editor_b = cx_b.add_view(window_b, |cx| {
3082 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3083 });
3084
3085 let fake_language_server = fake_language_servers.next().await.unwrap();
3086 buffer_b
3087 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3088 .await;
3089
3090 // Type a completion trigger character as the guest.
3091 editor_b.update(cx_b, |editor, cx| {
3092 editor.select_ranges([13..13], None, cx);
3093 editor.handle_input(&Input(".".into()), cx);
3094 cx.focus(&editor_b);
3095 });
3096
3097 // Receive a completion request as the host's language server.
3098 // Return some completions from the host's language server.
3099 cx_a.foreground().start_waiting();
3100 fake_language_server
3101 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3102 assert_eq!(
3103 params.text_document_position.text_document.uri,
3104 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3105 );
3106 assert_eq!(
3107 params.text_document_position.position,
3108 lsp::Position::new(0, 14),
3109 );
3110
3111 Ok(Some(lsp::CompletionResponse::Array(vec![
3112 lsp::CompletionItem {
3113 label: "first_method(…)".into(),
3114 detail: Some("fn(&mut self, B) -> C".into()),
3115 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3116 new_text: "first_method($1)".to_string(),
3117 range: lsp::Range::new(
3118 lsp::Position::new(0, 14),
3119 lsp::Position::new(0, 14),
3120 ),
3121 })),
3122 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3123 ..Default::default()
3124 },
3125 lsp::CompletionItem {
3126 label: "second_method(…)".into(),
3127 detail: Some("fn(&mut self, C) -> D<E>".into()),
3128 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3129 new_text: "second_method()".to_string(),
3130 range: lsp::Range::new(
3131 lsp::Position::new(0, 14),
3132 lsp::Position::new(0, 14),
3133 ),
3134 })),
3135 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3136 ..Default::default()
3137 },
3138 ])))
3139 })
3140 .next()
3141 .await
3142 .unwrap();
3143 cx_a.foreground().finish_waiting();
3144
3145 // Open the buffer on the host.
3146 let buffer_a = project_a
3147 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3148 .await
3149 .unwrap();
3150 buffer_a
3151 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3152 .await;
3153
3154 // Confirm a completion on the guest.
3155 editor_b
3156 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3157 .await;
3158 editor_b.update(cx_b, |editor, cx| {
3159 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3160 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3161 });
3162
3163 // Return a resolved completion from the host's language server.
3164 // The resolved completion has an additional text edit.
3165 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3166 |params, _| async move {
3167 assert_eq!(params.label, "first_method(…)");
3168 Ok(lsp::CompletionItem {
3169 label: "first_method(…)".into(),
3170 detail: Some("fn(&mut self, B) -> C".into()),
3171 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3172 new_text: "first_method($1)".to_string(),
3173 range: lsp::Range::new(
3174 lsp::Position::new(0, 14),
3175 lsp::Position::new(0, 14),
3176 ),
3177 })),
3178 additional_text_edits: Some(vec![lsp::TextEdit {
3179 new_text: "use d::SomeTrait;\n".to_string(),
3180 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3181 }]),
3182 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3183 ..Default::default()
3184 })
3185 },
3186 );
3187
3188 // The additional edit is applied.
3189 buffer_a
3190 .condition(&cx_a, |buffer, _| {
3191 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3192 })
3193 .await;
3194 buffer_b
3195 .condition(&cx_b, |buffer, _| {
3196 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3197 })
3198 .await;
3199 }
3200
3201 #[gpui::test(iterations = 10)]
3202 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3203 cx_a.foreground().forbid_parking();
3204 let lang_registry = Arc::new(LanguageRegistry::test());
3205 let fs = FakeFs::new(cx_a.background());
3206
3207 // Connect to a server as 2 clients.
3208 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3209 let client_a = server.create_client(cx_a, "user_a").await;
3210 let mut client_b = server.create_client(cx_b, "user_b").await;
3211 server
3212 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3213 .await;
3214
3215 // Share a project as client A
3216 fs.insert_tree(
3217 "/a",
3218 json!({
3219 "a.rs": "let one = 1;",
3220 }),
3221 )
3222 .await;
3223 let project_a = cx_a.update(|cx| {
3224 Project::local(
3225 client_a.clone(),
3226 client_a.user_store.clone(),
3227 lang_registry.clone(),
3228 fs.clone(),
3229 cx,
3230 )
3231 });
3232 let (worktree_a, _) = project_a
3233 .update(cx_a, |p, cx| {
3234 p.find_or_create_local_worktree("/a", true, cx)
3235 })
3236 .await
3237 .unwrap();
3238 worktree_a
3239 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3240 .await;
3241 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3242 let buffer_a = project_a
3243 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3244 .await
3245 .unwrap();
3246
3247 // Join the worktree as client B.
3248 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3249
3250 let buffer_b = cx_b
3251 .background()
3252 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3253 .await
3254 .unwrap();
3255 buffer_b.update(cx_b, |buffer, cx| {
3256 buffer.edit([(4..7, "six")], cx);
3257 buffer.edit([(10..11, "6")], cx);
3258 assert_eq!(buffer.text(), "let six = 6;");
3259 assert!(buffer.is_dirty());
3260 assert!(!buffer.has_conflict());
3261 });
3262 buffer_a
3263 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3264 .await;
3265
3266 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3267 .await
3268 .unwrap();
3269 buffer_a
3270 .condition(cx_a, |buffer, _| buffer.has_conflict())
3271 .await;
3272 buffer_b
3273 .condition(cx_b, |buffer, _| buffer.has_conflict())
3274 .await;
3275
3276 project_b
3277 .update(cx_b, |project, cx| {
3278 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3279 })
3280 .await
3281 .unwrap();
3282 buffer_a.read_with(cx_a, |buffer, _| {
3283 assert_eq!(buffer.text(), "let seven = 7;");
3284 assert!(!buffer.is_dirty());
3285 assert!(!buffer.has_conflict());
3286 });
3287 buffer_b.read_with(cx_b, |buffer, _| {
3288 assert_eq!(buffer.text(), "let seven = 7;");
3289 assert!(!buffer.is_dirty());
3290 assert!(!buffer.has_conflict());
3291 });
3292
3293 buffer_a.update(cx_a, |buffer, cx| {
3294 // Undoing on the host is a no-op when the reload was initiated by the guest.
3295 buffer.undo(cx);
3296 assert_eq!(buffer.text(), "let seven = 7;");
3297 assert!(!buffer.is_dirty());
3298 assert!(!buffer.has_conflict());
3299 });
3300 buffer_b.update(cx_b, |buffer, cx| {
3301 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3302 buffer.undo(cx);
3303 assert_eq!(buffer.text(), "let six = 6;");
3304 assert!(buffer.is_dirty());
3305 assert!(!buffer.has_conflict());
3306 });
3307 }
3308
3309 #[gpui::test(iterations = 10)]
3310 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3311 cx_a.foreground().forbid_parking();
3312 let lang_registry = Arc::new(LanguageRegistry::test());
3313 let fs = FakeFs::new(cx_a.background());
3314
3315 // Set up a fake language server.
3316 let mut language = Language::new(
3317 LanguageConfig {
3318 name: "Rust".into(),
3319 path_suffixes: vec!["rs".to_string()],
3320 ..Default::default()
3321 },
3322 Some(tree_sitter_rust::language()),
3323 );
3324 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3325 lang_registry.add(Arc::new(language));
3326
3327 // Connect to a server as 2 clients.
3328 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3329 let client_a = server.create_client(cx_a, "user_a").await;
3330 let mut client_b = server.create_client(cx_b, "user_b").await;
3331 server
3332 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3333 .await;
3334
3335 // Share a project as client A
3336 fs.insert_tree(
3337 "/a",
3338 json!({
3339 "a.rs": "let one = two",
3340 }),
3341 )
3342 .await;
3343 let project_a = cx_a.update(|cx| {
3344 Project::local(
3345 client_a.clone(),
3346 client_a.user_store.clone(),
3347 lang_registry.clone(),
3348 fs.clone(),
3349 cx,
3350 )
3351 });
3352 let (worktree_a, _) = project_a
3353 .update(cx_a, |p, cx| {
3354 p.find_or_create_local_worktree("/a", true, cx)
3355 })
3356 .await
3357 .unwrap();
3358 worktree_a
3359 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3360 .await;
3361 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3362
3363 // Join the project as client B.
3364 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3365
3366 let buffer_b = cx_b
3367 .background()
3368 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3369 .await
3370 .unwrap();
3371
3372 let fake_language_server = fake_language_servers.next().await.unwrap();
3373 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3374 Ok(Some(vec![
3375 lsp::TextEdit {
3376 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3377 new_text: "h".to_string(),
3378 },
3379 lsp::TextEdit {
3380 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3381 new_text: "y".to_string(),
3382 },
3383 ]))
3384 });
3385
3386 project_b
3387 .update(cx_b, |project, cx| {
3388 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3389 })
3390 .await
3391 .unwrap();
3392 assert_eq!(
3393 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3394 "let honey = two"
3395 );
3396 }
3397
3398 #[gpui::test(iterations = 10)]
3399 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3400 cx_a.foreground().forbid_parking();
3401 let lang_registry = Arc::new(LanguageRegistry::test());
3402 let fs = FakeFs::new(cx_a.background());
3403 fs.insert_tree(
3404 "/root-1",
3405 json!({
3406 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3407 }),
3408 )
3409 .await;
3410 fs.insert_tree(
3411 "/root-2",
3412 json!({
3413 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3414 }),
3415 )
3416 .await;
3417
3418 // Set up a fake language server.
3419 let mut language = Language::new(
3420 LanguageConfig {
3421 name: "Rust".into(),
3422 path_suffixes: vec!["rs".to_string()],
3423 ..Default::default()
3424 },
3425 Some(tree_sitter_rust::language()),
3426 );
3427 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3428 lang_registry.add(Arc::new(language));
3429
3430 // Connect to a server as 2 clients.
3431 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3432 let client_a = server.create_client(cx_a, "user_a").await;
3433 let mut client_b = server.create_client(cx_b, "user_b").await;
3434 server
3435 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3436 .await;
3437
3438 // Share a project as client A
3439 let project_a = cx_a.update(|cx| {
3440 Project::local(
3441 client_a.clone(),
3442 client_a.user_store.clone(),
3443 lang_registry.clone(),
3444 fs.clone(),
3445 cx,
3446 )
3447 });
3448 let (worktree_a, _) = project_a
3449 .update(cx_a, |p, cx| {
3450 p.find_or_create_local_worktree("/root-1", true, cx)
3451 })
3452 .await
3453 .unwrap();
3454 worktree_a
3455 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3456 .await;
3457 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3458
3459 // Join the project as client B.
3460 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3461
3462 // Open the file on client B.
3463 let buffer_b = cx_b
3464 .background()
3465 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3466 .await
3467 .unwrap();
3468
3469 // Request the definition of a symbol as the guest.
3470 let fake_language_server = fake_language_servers.next().await.unwrap();
3471 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3472 |_, _| async move {
3473 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3474 lsp::Location::new(
3475 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3476 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3477 ),
3478 )))
3479 },
3480 );
3481
3482 let definitions_1 = project_b
3483 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3484 .await
3485 .unwrap();
3486 cx_b.read(|cx| {
3487 assert_eq!(definitions_1.len(), 1);
3488 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3489 let target_buffer = definitions_1[0].buffer.read(cx);
3490 assert_eq!(
3491 target_buffer.text(),
3492 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3493 );
3494 assert_eq!(
3495 definitions_1[0].range.to_point(target_buffer),
3496 Point::new(0, 6)..Point::new(0, 9)
3497 );
3498 });
3499
3500 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3501 // the previous call to `definition`.
3502 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3503 |_, _| async move {
3504 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3505 lsp::Location::new(
3506 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3507 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3508 ),
3509 )))
3510 },
3511 );
3512
3513 let definitions_2 = project_b
3514 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3515 .await
3516 .unwrap();
3517 cx_b.read(|cx| {
3518 assert_eq!(definitions_2.len(), 1);
3519 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3520 let target_buffer = definitions_2[0].buffer.read(cx);
3521 assert_eq!(
3522 target_buffer.text(),
3523 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3524 );
3525 assert_eq!(
3526 definitions_2[0].range.to_point(target_buffer),
3527 Point::new(1, 6)..Point::new(1, 11)
3528 );
3529 });
3530 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3531 }
3532
3533 #[gpui::test(iterations = 10)]
3534 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3535 cx_a.foreground().forbid_parking();
3536 let lang_registry = Arc::new(LanguageRegistry::test());
3537 let fs = FakeFs::new(cx_a.background());
3538 fs.insert_tree(
3539 "/root-1",
3540 json!({
3541 "one.rs": "const ONE: usize = 1;",
3542 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3543 }),
3544 )
3545 .await;
3546 fs.insert_tree(
3547 "/root-2",
3548 json!({
3549 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3550 }),
3551 )
3552 .await;
3553
3554 // Set up a fake language server.
3555 let mut language = Language::new(
3556 LanguageConfig {
3557 name: "Rust".into(),
3558 path_suffixes: vec!["rs".to_string()],
3559 ..Default::default()
3560 },
3561 Some(tree_sitter_rust::language()),
3562 );
3563 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3564 lang_registry.add(Arc::new(language));
3565
3566 // Connect to a server as 2 clients.
3567 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3568 let client_a = server.create_client(cx_a, "user_a").await;
3569 let mut client_b = server.create_client(cx_b, "user_b").await;
3570 server
3571 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3572 .await;
3573
3574 // Share a project as client A
3575 let project_a = cx_a.update(|cx| {
3576 Project::local(
3577 client_a.clone(),
3578 client_a.user_store.clone(),
3579 lang_registry.clone(),
3580 fs.clone(),
3581 cx,
3582 )
3583 });
3584 let (worktree_a, _) = project_a
3585 .update(cx_a, |p, cx| {
3586 p.find_or_create_local_worktree("/root-1", true, cx)
3587 })
3588 .await
3589 .unwrap();
3590 worktree_a
3591 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3592 .await;
3593 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3594
3595 // Join the worktree as client B.
3596 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3597
3598 // Open the file on client B.
3599 let buffer_b = cx_b
3600 .background()
3601 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3602 .await
3603 .unwrap();
3604
3605 // Request references to a symbol as the guest.
3606 let fake_language_server = fake_language_servers.next().await.unwrap();
3607 fake_language_server.handle_request::<lsp::request::References, _, _>(
3608 |params, _| async move {
3609 assert_eq!(
3610 params.text_document_position.text_document.uri.as_str(),
3611 "file:///root-1/one.rs"
3612 );
3613 Ok(Some(vec![
3614 lsp::Location {
3615 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3616 range: lsp::Range::new(
3617 lsp::Position::new(0, 24),
3618 lsp::Position::new(0, 27),
3619 ),
3620 },
3621 lsp::Location {
3622 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3623 range: lsp::Range::new(
3624 lsp::Position::new(0, 35),
3625 lsp::Position::new(0, 38),
3626 ),
3627 },
3628 lsp::Location {
3629 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3630 range: lsp::Range::new(
3631 lsp::Position::new(0, 37),
3632 lsp::Position::new(0, 40),
3633 ),
3634 },
3635 ]))
3636 },
3637 );
3638
3639 let references = project_b
3640 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3641 .await
3642 .unwrap();
3643 cx_b.read(|cx| {
3644 assert_eq!(references.len(), 3);
3645 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3646
3647 let two_buffer = references[0].buffer.read(cx);
3648 let three_buffer = references[2].buffer.read(cx);
3649 assert_eq!(
3650 two_buffer.file().unwrap().path().as_ref(),
3651 Path::new("two.rs")
3652 );
3653 assert_eq!(references[1].buffer, references[0].buffer);
3654 assert_eq!(
3655 three_buffer.file().unwrap().full_path(cx),
3656 Path::new("three.rs")
3657 );
3658
3659 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3660 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3661 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3662 });
3663 }
3664
3665 #[gpui::test(iterations = 10)]
3666 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3667 cx_a.foreground().forbid_parking();
3668 let lang_registry = Arc::new(LanguageRegistry::test());
3669 let fs = FakeFs::new(cx_a.background());
3670 fs.insert_tree(
3671 "/root-1",
3672 json!({
3673 "a": "hello world",
3674 "b": "goodnight moon",
3675 "c": "a world of goo",
3676 "d": "world champion of clown world",
3677 }),
3678 )
3679 .await;
3680 fs.insert_tree(
3681 "/root-2",
3682 json!({
3683 "e": "disney world is fun",
3684 }),
3685 )
3686 .await;
3687
3688 // Connect to a server as 2 clients.
3689 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3690 let client_a = server.create_client(cx_a, "user_a").await;
3691 let mut client_b = server.create_client(cx_b, "user_b").await;
3692 server
3693 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3694 .await;
3695
3696 // Share a project as client A
3697 let project_a = cx_a.update(|cx| {
3698 Project::local(
3699 client_a.clone(),
3700 client_a.user_store.clone(),
3701 lang_registry.clone(),
3702 fs.clone(),
3703 cx,
3704 )
3705 });
3706
3707 let (worktree_1, _) = project_a
3708 .update(cx_a, |p, cx| {
3709 p.find_or_create_local_worktree("/root-1", true, cx)
3710 })
3711 .await
3712 .unwrap();
3713 worktree_1
3714 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3715 .await;
3716 let (worktree_2, _) = project_a
3717 .update(cx_a, |p, cx| {
3718 p.find_or_create_local_worktree("/root-2", true, cx)
3719 })
3720 .await
3721 .unwrap();
3722 worktree_2
3723 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3724 .await;
3725
3726 // Join the worktree as client B.
3727 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3728 let results = project_b
3729 .update(cx_b, |project, cx| {
3730 project.search(SearchQuery::text("world", false, false), cx)
3731 })
3732 .await
3733 .unwrap();
3734
3735 let mut ranges_by_path = results
3736 .into_iter()
3737 .map(|(buffer, ranges)| {
3738 buffer.read_with(cx_b, |buffer, cx| {
3739 let path = buffer.file().unwrap().full_path(cx);
3740 let offset_ranges = ranges
3741 .into_iter()
3742 .map(|range| range.to_offset(buffer))
3743 .collect::<Vec<_>>();
3744 (path, offset_ranges)
3745 })
3746 })
3747 .collect::<Vec<_>>();
3748 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3749
3750 assert_eq!(
3751 ranges_by_path,
3752 &[
3753 (PathBuf::from("root-1/a"), vec![6..11]),
3754 (PathBuf::from("root-1/c"), vec![2..7]),
3755 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3756 (PathBuf::from("root-2/e"), vec![7..12]),
3757 ]
3758 );
3759 }
3760
3761 #[gpui::test(iterations = 10)]
3762 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3763 cx_a.foreground().forbid_parking();
3764 let lang_registry = Arc::new(LanguageRegistry::test());
3765 let fs = FakeFs::new(cx_a.background());
3766 fs.insert_tree(
3767 "/root-1",
3768 json!({
3769 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3770 }),
3771 )
3772 .await;
3773
3774 // Set up a fake language server.
3775 let mut language = Language::new(
3776 LanguageConfig {
3777 name: "Rust".into(),
3778 path_suffixes: vec!["rs".to_string()],
3779 ..Default::default()
3780 },
3781 Some(tree_sitter_rust::language()),
3782 );
3783 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3784 lang_registry.add(Arc::new(language));
3785
3786 // Connect to a server as 2 clients.
3787 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3788 let client_a = server.create_client(cx_a, "user_a").await;
3789 let mut client_b = server.create_client(cx_b, "user_b").await;
3790 server
3791 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3792 .await;
3793
3794 // Share a project as client A
3795 let project_a = cx_a.update(|cx| {
3796 Project::local(
3797 client_a.clone(),
3798 client_a.user_store.clone(),
3799 lang_registry.clone(),
3800 fs.clone(),
3801 cx,
3802 )
3803 });
3804 let (worktree_a, _) = project_a
3805 .update(cx_a, |p, cx| {
3806 p.find_or_create_local_worktree("/root-1", true, cx)
3807 })
3808 .await
3809 .unwrap();
3810 worktree_a
3811 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3812 .await;
3813 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3814
3815 // Join the worktree as client B.
3816 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3817
3818 // Open the file on client B.
3819 let buffer_b = cx_b
3820 .background()
3821 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3822 .await
3823 .unwrap();
3824
3825 // Request document highlights as the guest.
3826 let fake_language_server = fake_language_servers.next().await.unwrap();
3827 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3828 |params, _| async move {
3829 assert_eq!(
3830 params
3831 .text_document_position_params
3832 .text_document
3833 .uri
3834 .as_str(),
3835 "file:///root-1/main.rs"
3836 );
3837 assert_eq!(
3838 params.text_document_position_params.position,
3839 lsp::Position::new(0, 34)
3840 );
3841 Ok(Some(vec![
3842 lsp::DocumentHighlight {
3843 kind: Some(lsp::DocumentHighlightKind::WRITE),
3844 range: lsp::Range::new(
3845 lsp::Position::new(0, 10),
3846 lsp::Position::new(0, 16),
3847 ),
3848 },
3849 lsp::DocumentHighlight {
3850 kind: Some(lsp::DocumentHighlightKind::READ),
3851 range: lsp::Range::new(
3852 lsp::Position::new(0, 32),
3853 lsp::Position::new(0, 38),
3854 ),
3855 },
3856 lsp::DocumentHighlight {
3857 kind: Some(lsp::DocumentHighlightKind::READ),
3858 range: lsp::Range::new(
3859 lsp::Position::new(0, 41),
3860 lsp::Position::new(0, 47),
3861 ),
3862 },
3863 ]))
3864 },
3865 );
3866
3867 let highlights = project_b
3868 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3869 .await
3870 .unwrap();
3871 buffer_b.read_with(cx_b, |buffer, _| {
3872 let snapshot = buffer.snapshot();
3873
3874 let highlights = highlights
3875 .into_iter()
3876 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3877 .collect::<Vec<_>>();
3878 assert_eq!(
3879 highlights,
3880 &[
3881 (lsp::DocumentHighlightKind::WRITE, 10..16),
3882 (lsp::DocumentHighlightKind::READ, 32..38),
3883 (lsp::DocumentHighlightKind::READ, 41..47)
3884 ]
3885 )
3886 });
3887 }
3888
3889 #[gpui::test(iterations = 10)]
3890 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3891 cx_a.foreground().forbid_parking();
3892 let lang_registry = Arc::new(LanguageRegistry::test());
3893 let fs = FakeFs::new(cx_a.background());
3894 fs.insert_tree(
3895 "/code",
3896 json!({
3897 "crate-1": {
3898 "one.rs": "const ONE: usize = 1;",
3899 },
3900 "crate-2": {
3901 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3902 },
3903 "private": {
3904 "passwords.txt": "the-password",
3905 }
3906 }),
3907 )
3908 .await;
3909
3910 // Set up a fake language server.
3911 let mut language = Language::new(
3912 LanguageConfig {
3913 name: "Rust".into(),
3914 path_suffixes: vec!["rs".to_string()],
3915 ..Default::default()
3916 },
3917 Some(tree_sitter_rust::language()),
3918 );
3919 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3920 lang_registry.add(Arc::new(language));
3921
3922 // Connect to a server as 2 clients.
3923 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3924 let client_a = server.create_client(cx_a, "user_a").await;
3925 let mut client_b = server.create_client(cx_b, "user_b").await;
3926 server
3927 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3928 .await;
3929
3930 // Share a project as client A
3931 let project_a = cx_a.update(|cx| {
3932 Project::local(
3933 client_a.clone(),
3934 client_a.user_store.clone(),
3935 lang_registry.clone(),
3936 fs.clone(),
3937 cx,
3938 )
3939 });
3940 let (worktree_a, _) = project_a
3941 .update(cx_a, |p, cx| {
3942 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3943 })
3944 .await
3945 .unwrap();
3946 worktree_a
3947 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3948 .await;
3949 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3950
3951 // Join the worktree as client B.
3952 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3953
3954 // Cause the language server to start.
3955 let _buffer = cx_b
3956 .background()
3957 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3958 .await
3959 .unwrap();
3960
3961 let fake_language_server = fake_language_servers.next().await.unwrap();
3962 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3963 |_, _| async move {
3964 #[allow(deprecated)]
3965 Ok(Some(vec![lsp::SymbolInformation {
3966 name: "TWO".into(),
3967 location: lsp::Location {
3968 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3969 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3970 },
3971 kind: lsp::SymbolKind::CONSTANT,
3972 tags: None,
3973 container_name: None,
3974 deprecated: None,
3975 }]))
3976 },
3977 );
3978
3979 // Request the definition of a symbol as the guest.
3980 let symbols = project_b
3981 .update(cx_b, |p, cx| p.symbols("two", cx))
3982 .await
3983 .unwrap();
3984 assert_eq!(symbols.len(), 1);
3985 assert_eq!(symbols[0].name, "TWO");
3986
3987 // Open one of the returned symbols.
3988 let buffer_b_2 = project_b
3989 .update(cx_b, |project, cx| {
3990 project.open_buffer_for_symbol(&symbols[0], cx)
3991 })
3992 .await
3993 .unwrap();
3994 buffer_b_2.read_with(cx_b, |buffer, _| {
3995 assert_eq!(
3996 buffer.file().unwrap().path().as_ref(),
3997 Path::new("../crate-2/two.rs")
3998 );
3999 });
4000
4001 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4002 let mut fake_symbol = symbols[0].clone();
4003 fake_symbol.path = Path::new("/code/secrets").into();
4004 let error = project_b
4005 .update(cx_b, |project, cx| {
4006 project.open_buffer_for_symbol(&fake_symbol, cx)
4007 })
4008 .await
4009 .unwrap_err();
4010 assert!(error.to_string().contains("invalid symbol signature"));
4011 }
4012
4013 #[gpui::test(iterations = 10)]
4014 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4015 cx_a: &mut TestAppContext,
4016 cx_b: &mut TestAppContext,
4017 mut rng: StdRng,
4018 ) {
4019 cx_a.foreground().forbid_parking();
4020 let lang_registry = Arc::new(LanguageRegistry::test());
4021 let fs = FakeFs::new(cx_a.background());
4022 fs.insert_tree(
4023 "/root",
4024 json!({
4025 "a.rs": "const ONE: usize = b::TWO;",
4026 "b.rs": "const TWO: usize = 2",
4027 }),
4028 )
4029 .await;
4030
4031 // Set up a fake language server.
4032 let mut language = Language::new(
4033 LanguageConfig {
4034 name: "Rust".into(),
4035 path_suffixes: vec!["rs".to_string()],
4036 ..Default::default()
4037 },
4038 Some(tree_sitter_rust::language()),
4039 );
4040 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4041 lang_registry.add(Arc::new(language));
4042
4043 // Connect to a server as 2 clients.
4044 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4045 let client_a = server.create_client(cx_a, "user_a").await;
4046 let mut client_b = server.create_client(cx_b, "user_b").await;
4047 server
4048 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4049 .await;
4050
4051 // Share a project as client A
4052 let project_a = cx_a.update(|cx| {
4053 Project::local(
4054 client_a.clone(),
4055 client_a.user_store.clone(),
4056 lang_registry.clone(),
4057 fs.clone(),
4058 cx,
4059 )
4060 });
4061
4062 let (worktree_a, _) = project_a
4063 .update(cx_a, |p, cx| {
4064 p.find_or_create_local_worktree("/root", true, cx)
4065 })
4066 .await
4067 .unwrap();
4068 worktree_a
4069 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4070 .await;
4071 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4072
4073 // Join the project as client B.
4074 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4075
4076 let buffer_b1 = cx_b
4077 .background()
4078 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4079 .await
4080 .unwrap();
4081
4082 let fake_language_server = fake_language_servers.next().await.unwrap();
4083 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4084 |_, _| async move {
4085 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4086 lsp::Location::new(
4087 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4088 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4089 ),
4090 )))
4091 },
4092 );
4093
4094 let definitions;
4095 let buffer_b2;
4096 if rng.gen() {
4097 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4098 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4099 } else {
4100 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4101 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4102 }
4103
4104 let buffer_b2 = buffer_b2.await.unwrap();
4105 let definitions = definitions.await.unwrap();
4106 assert_eq!(definitions.len(), 1);
4107 assert_eq!(definitions[0].buffer, buffer_b2);
4108 }
4109
4110 #[gpui::test(iterations = 10)]
4111 async fn test_collaborating_with_code_actions(
4112 cx_a: &mut TestAppContext,
4113 cx_b: &mut TestAppContext,
4114 ) {
4115 cx_a.foreground().forbid_parking();
4116 let lang_registry = Arc::new(LanguageRegistry::test());
4117 let fs = FakeFs::new(cx_a.background());
4118 cx_b.update(|cx| editor::init(cx));
4119
4120 // Set up a fake language server.
4121 let mut language = Language::new(
4122 LanguageConfig {
4123 name: "Rust".into(),
4124 path_suffixes: vec!["rs".to_string()],
4125 ..Default::default()
4126 },
4127 Some(tree_sitter_rust::language()),
4128 );
4129 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4130 lang_registry.add(Arc::new(language));
4131
4132 // Connect to a server as 2 clients.
4133 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4134 let client_a = server.create_client(cx_a, "user_a").await;
4135 let mut client_b = server.create_client(cx_b, "user_b").await;
4136 server
4137 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4138 .await;
4139
4140 // Share a project as client A
4141 fs.insert_tree(
4142 "/a",
4143 json!({
4144 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4145 "other.rs": "pub fn foo() -> usize { 4 }",
4146 }),
4147 )
4148 .await;
4149 let project_a = cx_a.update(|cx| {
4150 Project::local(
4151 client_a.clone(),
4152 client_a.user_store.clone(),
4153 lang_registry.clone(),
4154 fs.clone(),
4155 cx,
4156 )
4157 });
4158 let (worktree_a, _) = project_a
4159 .update(cx_a, |p, cx| {
4160 p.find_or_create_local_worktree("/a", true, cx)
4161 })
4162 .await
4163 .unwrap();
4164 worktree_a
4165 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4166 .await;
4167 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4168
4169 // Join the project as client B.
4170 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4171 let mut params = cx_b.update(WorkspaceParams::test);
4172 params.languages = lang_registry.clone();
4173 params.project = project_b.clone();
4174 params.client = client_b.client.clone();
4175 params.user_store = client_b.user_store.clone();
4176
4177 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4178 let editor_b = workspace_b
4179 .update(cx_b, |workspace, cx| {
4180 workspace.open_path((worktree_id, "main.rs"), true, cx)
4181 })
4182 .await
4183 .unwrap()
4184 .downcast::<Editor>()
4185 .unwrap();
4186
4187 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4188 fake_language_server
4189 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4190 assert_eq!(
4191 params.text_document.uri,
4192 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4193 );
4194 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4195 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4196 Ok(None)
4197 })
4198 .next()
4199 .await;
4200
4201 // Move cursor to a location that contains code actions.
4202 editor_b.update(cx_b, |editor, cx| {
4203 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4204 cx.focus(&editor_b);
4205 });
4206
4207 fake_language_server
4208 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4209 assert_eq!(
4210 params.text_document.uri,
4211 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4212 );
4213 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4214 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4215
4216 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4217 lsp::CodeAction {
4218 title: "Inline into all callers".to_string(),
4219 edit: Some(lsp::WorkspaceEdit {
4220 changes: Some(
4221 [
4222 (
4223 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4224 vec![lsp::TextEdit::new(
4225 lsp::Range::new(
4226 lsp::Position::new(1, 22),
4227 lsp::Position::new(1, 34),
4228 ),
4229 "4".to_string(),
4230 )],
4231 ),
4232 (
4233 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4234 vec![lsp::TextEdit::new(
4235 lsp::Range::new(
4236 lsp::Position::new(0, 0),
4237 lsp::Position::new(0, 27),
4238 ),
4239 "".to_string(),
4240 )],
4241 ),
4242 ]
4243 .into_iter()
4244 .collect(),
4245 ),
4246 ..Default::default()
4247 }),
4248 data: Some(json!({
4249 "codeActionParams": {
4250 "range": {
4251 "start": {"line": 1, "column": 31},
4252 "end": {"line": 1, "column": 31},
4253 }
4254 }
4255 })),
4256 ..Default::default()
4257 },
4258 )]))
4259 })
4260 .next()
4261 .await;
4262
4263 // Toggle code actions and wait for them to display.
4264 editor_b.update(cx_b, |editor, cx| {
4265 editor.toggle_code_actions(
4266 &ToggleCodeActions {
4267 deployed_from_indicator: false,
4268 },
4269 cx,
4270 );
4271 });
4272 editor_b
4273 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4274 .await;
4275
4276 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4277
4278 // Confirming the code action will trigger a resolve request.
4279 let confirm_action = workspace_b
4280 .update(cx_b, |workspace, cx| {
4281 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4282 })
4283 .unwrap();
4284 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4285 |_, _| async move {
4286 Ok(lsp::CodeAction {
4287 title: "Inline into all callers".to_string(),
4288 edit: Some(lsp::WorkspaceEdit {
4289 changes: Some(
4290 [
4291 (
4292 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4293 vec![lsp::TextEdit::new(
4294 lsp::Range::new(
4295 lsp::Position::new(1, 22),
4296 lsp::Position::new(1, 34),
4297 ),
4298 "4".to_string(),
4299 )],
4300 ),
4301 (
4302 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4303 vec![lsp::TextEdit::new(
4304 lsp::Range::new(
4305 lsp::Position::new(0, 0),
4306 lsp::Position::new(0, 27),
4307 ),
4308 "".to_string(),
4309 )],
4310 ),
4311 ]
4312 .into_iter()
4313 .collect(),
4314 ),
4315 ..Default::default()
4316 }),
4317 ..Default::default()
4318 })
4319 },
4320 );
4321
4322 // After the action is confirmed, an editor containing both modified files is opened.
4323 confirm_action.await.unwrap();
4324 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4325 workspace
4326 .active_item(cx)
4327 .unwrap()
4328 .downcast::<Editor>()
4329 .unwrap()
4330 });
4331 code_action_editor.update(cx_b, |editor, cx| {
4332 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4333 editor.undo(&Undo, cx);
4334 assert_eq!(
4335 editor.text(cx),
4336 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4337 );
4338 editor.redo(&Redo, cx);
4339 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4340 });
4341 }
4342
4343 #[gpui::test(iterations = 10)]
4344 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4345 cx_a.foreground().forbid_parking();
4346 let lang_registry = Arc::new(LanguageRegistry::test());
4347 let fs = FakeFs::new(cx_a.background());
4348 cx_b.update(|cx| editor::init(cx));
4349
4350 // Set up a fake language server.
4351 let mut language = Language::new(
4352 LanguageConfig {
4353 name: "Rust".into(),
4354 path_suffixes: vec!["rs".to_string()],
4355 ..Default::default()
4356 },
4357 Some(tree_sitter_rust::language()),
4358 );
4359 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4360 capabilities: lsp::ServerCapabilities {
4361 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4362 prepare_provider: Some(true),
4363 work_done_progress_options: Default::default(),
4364 })),
4365 ..Default::default()
4366 },
4367 ..Default::default()
4368 });
4369 lang_registry.add(Arc::new(language));
4370
4371 // Connect to a server as 2 clients.
4372 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4373 let client_a = server.create_client(cx_a, "user_a").await;
4374 let mut client_b = server.create_client(cx_b, "user_b").await;
4375 server
4376 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4377 .await;
4378
4379 // Share a project as client A
4380 fs.insert_tree(
4381 "/dir",
4382 json!({
4383 "one.rs": "const ONE: usize = 1;",
4384 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4385 }),
4386 )
4387 .await;
4388 let project_a = cx_a.update(|cx| {
4389 Project::local(
4390 client_a.clone(),
4391 client_a.user_store.clone(),
4392 lang_registry.clone(),
4393 fs.clone(),
4394 cx,
4395 )
4396 });
4397 let (worktree_a, _) = project_a
4398 .update(cx_a, |p, cx| {
4399 p.find_or_create_local_worktree("/dir", true, cx)
4400 })
4401 .await
4402 .unwrap();
4403 worktree_a
4404 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4405 .await;
4406 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4407
4408 // Join the worktree as client B.
4409 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4410 let mut params = cx_b.update(WorkspaceParams::test);
4411 params.languages = lang_registry.clone();
4412 params.project = project_b.clone();
4413 params.client = client_b.client.clone();
4414 params.user_store = client_b.user_store.clone();
4415
4416 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4417 let editor_b = workspace_b
4418 .update(cx_b, |workspace, cx| {
4419 workspace.open_path((worktree_id, "one.rs"), true, cx)
4420 })
4421 .await
4422 .unwrap()
4423 .downcast::<Editor>()
4424 .unwrap();
4425 let fake_language_server = fake_language_servers.next().await.unwrap();
4426
4427 // Move cursor to a location that can be renamed.
4428 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4429 editor.select_ranges([7..7], None, cx);
4430 editor.rename(&Rename, cx).unwrap()
4431 });
4432
4433 fake_language_server
4434 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4435 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4436 assert_eq!(params.position, lsp::Position::new(0, 7));
4437 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4438 lsp::Position::new(0, 6),
4439 lsp::Position::new(0, 9),
4440 ))))
4441 })
4442 .next()
4443 .await
4444 .unwrap();
4445 prepare_rename.await.unwrap();
4446 editor_b.update(cx_b, |editor, cx| {
4447 let rename = editor.pending_rename().unwrap();
4448 let buffer = editor.buffer().read(cx).snapshot(cx);
4449 assert_eq!(
4450 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4451 6..9
4452 );
4453 rename.editor.update(cx, |rename_editor, cx| {
4454 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4455 rename_buffer.edit([(0..3, "THREE")], cx);
4456 });
4457 });
4458 });
4459
4460 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4461 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4462 });
4463 fake_language_server
4464 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4465 assert_eq!(
4466 params.text_document_position.text_document.uri.as_str(),
4467 "file:///dir/one.rs"
4468 );
4469 assert_eq!(
4470 params.text_document_position.position,
4471 lsp::Position::new(0, 6)
4472 );
4473 assert_eq!(params.new_name, "THREE");
4474 Ok(Some(lsp::WorkspaceEdit {
4475 changes: Some(
4476 [
4477 (
4478 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4479 vec![lsp::TextEdit::new(
4480 lsp::Range::new(
4481 lsp::Position::new(0, 6),
4482 lsp::Position::new(0, 9),
4483 ),
4484 "THREE".to_string(),
4485 )],
4486 ),
4487 (
4488 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4489 vec![
4490 lsp::TextEdit::new(
4491 lsp::Range::new(
4492 lsp::Position::new(0, 24),
4493 lsp::Position::new(0, 27),
4494 ),
4495 "THREE".to_string(),
4496 ),
4497 lsp::TextEdit::new(
4498 lsp::Range::new(
4499 lsp::Position::new(0, 35),
4500 lsp::Position::new(0, 38),
4501 ),
4502 "THREE".to_string(),
4503 ),
4504 ],
4505 ),
4506 ]
4507 .into_iter()
4508 .collect(),
4509 ),
4510 ..Default::default()
4511 }))
4512 })
4513 .next()
4514 .await
4515 .unwrap();
4516 confirm_rename.await.unwrap();
4517
4518 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4519 workspace
4520 .active_item(cx)
4521 .unwrap()
4522 .downcast::<Editor>()
4523 .unwrap()
4524 });
4525 rename_editor.update(cx_b, |editor, cx| {
4526 assert_eq!(
4527 editor.text(cx),
4528 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4529 );
4530 editor.undo(&Undo, cx);
4531 assert_eq!(
4532 editor.text(cx),
4533 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4534 );
4535 editor.redo(&Redo, cx);
4536 assert_eq!(
4537 editor.text(cx),
4538 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4539 );
4540 });
4541
4542 // Ensure temporary rename edits cannot be undone/redone.
4543 editor_b.update(cx_b, |editor, cx| {
4544 editor.undo(&Undo, cx);
4545 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4546 editor.undo(&Undo, cx);
4547 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4548 editor.redo(&Redo, cx);
4549 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4550 })
4551 }
4552
4553 #[gpui::test(iterations = 10)]
4554 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4555 cx_a.foreground().forbid_parking();
4556
4557 // Connect to a server as 2 clients.
4558 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4559 let client_a = server.create_client(cx_a, "user_a").await;
4560 let client_b = server.create_client(cx_b, "user_b").await;
4561
4562 // Create an org that includes these 2 users.
4563 let db = &server.app_state.db;
4564 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4565 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4566 .await
4567 .unwrap();
4568 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4569 .await
4570 .unwrap();
4571
4572 // Create a channel that includes all the users.
4573 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4574 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4575 .await
4576 .unwrap();
4577 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4578 .await
4579 .unwrap();
4580 db.create_channel_message(
4581 channel_id,
4582 client_b.current_user_id(&cx_b),
4583 "hello A, it's B.",
4584 OffsetDateTime::now_utc(),
4585 1,
4586 )
4587 .await
4588 .unwrap();
4589
4590 let channels_a = cx_a
4591 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4592 channels_a
4593 .condition(cx_a, |list, _| list.available_channels().is_some())
4594 .await;
4595 channels_a.read_with(cx_a, |list, _| {
4596 assert_eq!(
4597 list.available_channels().unwrap(),
4598 &[ChannelDetails {
4599 id: channel_id.to_proto(),
4600 name: "test-channel".to_string()
4601 }]
4602 )
4603 });
4604 let channel_a = channels_a.update(cx_a, |this, cx| {
4605 this.get_channel(channel_id.to_proto(), cx).unwrap()
4606 });
4607 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4608 channel_a
4609 .condition(&cx_a, |channel, _| {
4610 channel_messages(channel)
4611 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4612 })
4613 .await;
4614
4615 let channels_b = cx_b
4616 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4617 channels_b
4618 .condition(cx_b, |list, _| list.available_channels().is_some())
4619 .await;
4620 channels_b.read_with(cx_b, |list, _| {
4621 assert_eq!(
4622 list.available_channels().unwrap(),
4623 &[ChannelDetails {
4624 id: channel_id.to_proto(),
4625 name: "test-channel".to_string()
4626 }]
4627 )
4628 });
4629
4630 let channel_b = channels_b.update(cx_b, |this, cx| {
4631 this.get_channel(channel_id.to_proto(), cx).unwrap()
4632 });
4633 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4634 channel_b
4635 .condition(&cx_b, |channel, _| {
4636 channel_messages(channel)
4637 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4638 })
4639 .await;
4640
4641 channel_a
4642 .update(cx_a, |channel, cx| {
4643 channel
4644 .send_message("oh, hi B.".to_string(), cx)
4645 .unwrap()
4646 .detach();
4647 let task = channel.send_message("sup".to_string(), cx).unwrap();
4648 assert_eq!(
4649 channel_messages(channel),
4650 &[
4651 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4652 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4653 ("user_a".to_string(), "sup".to_string(), true)
4654 ]
4655 );
4656 task
4657 })
4658 .await
4659 .unwrap();
4660
4661 channel_b
4662 .condition(&cx_b, |channel, _| {
4663 channel_messages(channel)
4664 == [
4665 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4666 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4667 ("user_a".to_string(), "sup".to_string(), false),
4668 ]
4669 })
4670 .await;
4671
4672 assert_eq!(
4673 server
4674 .state()
4675 .await
4676 .channel(channel_id)
4677 .unwrap()
4678 .connection_ids
4679 .len(),
4680 2
4681 );
4682 cx_b.update(|_| drop(channel_b));
4683 server
4684 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4685 .await;
4686
4687 cx_a.update(|_| drop(channel_a));
4688 server
4689 .condition(|state| state.channel(channel_id).is_none())
4690 .await;
4691 }
4692
4693 #[gpui::test(iterations = 10)]
4694 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4695 cx_a.foreground().forbid_parking();
4696
4697 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4698 let client_a = server.create_client(cx_a, "user_a").await;
4699
4700 let db = &server.app_state.db;
4701 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4702 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4703 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4704 .await
4705 .unwrap();
4706 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4707 .await
4708 .unwrap();
4709
4710 let channels_a = cx_a
4711 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4712 channels_a
4713 .condition(cx_a, |list, _| list.available_channels().is_some())
4714 .await;
4715 let channel_a = channels_a.update(cx_a, |this, cx| {
4716 this.get_channel(channel_id.to_proto(), cx).unwrap()
4717 });
4718
4719 // Messages aren't allowed to be too long.
4720 channel_a
4721 .update(cx_a, |channel, cx| {
4722 let long_body = "this is long.\n".repeat(1024);
4723 channel.send_message(long_body, cx).unwrap()
4724 })
4725 .await
4726 .unwrap_err();
4727
4728 // Messages aren't allowed to be blank.
4729 channel_a.update(cx_a, |channel, cx| {
4730 channel.send_message(String::new(), cx).unwrap_err()
4731 });
4732
4733 // Leading and trailing whitespace are trimmed.
4734 channel_a
4735 .update(cx_a, |channel, cx| {
4736 channel
4737 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4738 .unwrap()
4739 })
4740 .await
4741 .unwrap();
4742 assert_eq!(
4743 db.get_channel_messages(channel_id, 10, None)
4744 .await
4745 .unwrap()
4746 .iter()
4747 .map(|m| &m.body)
4748 .collect::<Vec<_>>(),
4749 &["surrounded by whitespace"]
4750 );
4751 }
4752
4753 #[gpui::test(iterations = 10)]
4754 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4755 cx_a.foreground().forbid_parking();
4756
4757 // Connect to a server as 2 clients.
4758 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4759 let client_a = server.create_client(cx_a, "user_a").await;
4760 let client_b = server.create_client(cx_b, "user_b").await;
4761 let mut status_b = client_b.status();
4762
4763 // Create an org that includes these 2 users.
4764 let db = &server.app_state.db;
4765 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4766 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4767 .await
4768 .unwrap();
4769 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4770 .await
4771 .unwrap();
4772
4773 // Create a channel that includes all the users.
4774 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4775 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4776 .await
4777 .unwrap();
4778 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4779 .await
4780 .unwrap();
4781 db.create_channel_message(
4782 channel_id,
4783 client_b.current_user_id(&cx_b),
4784 "hello A, it's B.",
4785 OffsetDateTime::now_utc(),
4786 2,
4787 )
4788 .await
4789 .unwrap();
4790
4791 let channels_a = cx_a
4792 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4793 channels_a
4794 .condition(cx_a, |list, _| list.available_channels().is_some())
4795 .await;
4796
4797 channels_a.read_with(cx_a, |list, _| {
4798 assert_eq!(
4799 list.available_channels().unwrap(),
4800 &[ChannelDetails {
4801 id: channel_id.to_proto(),
4802 name: "test-channel".to_string()
4803 }]
4804 )
4805 });
4806 let channel_a = channels_a.update(cx_a, |this, cx| {
4807 this.get_channel(channel_id.to_proto(), cx).unwrap()
4808 });
4809 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4810 channel_a
4811 .condition(&cx_a, |channel, _| {
4812 channel_messages(channel)
4813 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4814 })
4815 .await;
4816
4817 let channels_b = cx_b
4818 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4819 channels_b
4820 .condition(cx_b, |list, _| list.available_channels().is_some())
4821 .await;
4822 channels_b.read_with(cx_b, |list, _| {
4823 assert_eq!(
4824 list.available_channels().unwrap(),
4825 &[ChannelDetails {
4826 id: channel_id.to_proto(),
4827 name: "test-channel".to_string()
4828 }]
4829 )
4830 });
4831
4832 let channel_b = channels_b.update(cx_b, |this, cx| {
4833 this.get_channel(channel_id.to_proto(), cx).unwrap()
4834 });
4835 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4836 channel_b
4837 .condition(&cx_b, |channel, _| {
4838 channel_messages(channel)
4839 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4840 })
4841 .await;
4842
4843 // Disconnect client B, ensuring we can still access its cached channel data.
4844 server.forbid_connections();
4845 server.disconnect_client(client_b.current_user_id(&cx_b));
4846 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4847 while !matches!(
4848 status_b.next().await,
4849 Some(client::Status::ReconnectionError { .. })
4850 ) {}
4851
4852 channels_b.read_with(cx_b, |channels, _| {
4853 assert_eq!(
4854 channels.available_channels().unwrap(),
4855 [ChannelDetails {
4856 id: channel_id.to_proto(),
4857 name: "test-channel".to_string()
4858 }]
4859 )
4860 });
4861 channel_b.read_with(cx_b, |channel, _| {
4862 assert_eq!(
4863 channel_messages(channel),
4864 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4865 )
4866 });
4867
4868 // Send a message from client B while it is disconnected.
4869 channel_b
4870 .update(cx_b, |channel, cx| {
4871 let task = channel
4872 .send_message("can you see this?".to_string(), cx)
4873 .unwrap();
4874 assert_eq!(
4875 channel_messages(channel),
4876 &[
4877 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4878 ("user_b".to_string(), "can you see this?".to_string(), true)
4879 ]
4880 );
4881 task
4882 })
4883 .await
4884 .unwrap_err();
4885
4886 // Send a message from client A while B is disconnected.
4887 channel_a
4888 .update(cx_a, |channel, cx| {
4889 channel
4890 .send_message("oh, hi B.".to_string(), cx)
4891 .unwrap()
4892 .detach();
4893 let task = channel.send_message("sup".to_string(), cx).unwrap();
4894 assert_eq!(
4895 channel_messages(channel),
4896 &[
4897 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4898 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4899 ("user_a".to_string(), "sup".to_string(), true)
4900 ]
4901 );
4902 task
4903 })
4904 .await
4905 .unwrap();
4906
4907 // Give client B a chance to reconnect.
4908 server.allow_connections();
4909 cx_b.foreground().advance_clock(Duration::from_secs(10));
4910
4911 // Verify that B sees the new messages upon reconnection, as well as the message client B
4912 // sent while offline.
4913 channel_b
4914 .condition(&cx_b, |channel, _| {
4915 channel_messages(channel)
4916 == [
4917 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4918 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4919 ("user_a".to_string(), "sup".to_string(), false),
4920 ("user_b".to_string(), "can you see this?".to_string(), false),
4921 ]
4922 })
4923 .await;
4924
4925 // Ensure client A and B can communicate normally after reconnection.
4926 channel_a
4927 .update(cx_a, |channel, cx| {
4928 channel.send_message("you online?".to_string(), cx).unwrap()
4929 })
4930 .await
4931 .unwrap();
4932 channel_b
4933 .condition(&cx_b, |channel, _| {
4934 channel_messages(channel)
4935 == [
4936 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4937 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4938 ("user_a".to_string(), "sup".to_string(), false),
4939 ("user_b".to_string(), "can you see this?".to_string(), false),
4940 ("user_a".to_string(), "you online?".to_string(), false),
4941 ]
4942 })
4943 .await;
4944
4945 channel_b
4946 .update(cx_b, |channel, cx| {
4947 channel.send_message("yep".to_string(), cx).unwrap()
4948 })
4949 .await
4950 .unwrap();
4951 channel_a
4952 .condition(&cx_a, |channel, _| {
4953 channel_messages(channel)
4954 == [
4955 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4956 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4957 ("user_a".to_string(), "sup".to_string(), false),
4958 ("user_b".to_string(), "can you see this?".to_string(), false),
4959 ("user_a".to_string(), "you online?".to_string(), false),
4960 ("user_b".to_string(), "yep".to_string(), false),
4961 ]
4962 })
4963 .await;
4964 }
4965
4966 #[gpui::test(iterations = 10)]
4967 async fn test_contacts(
4968 deterministic: Arc<Deterministic>,
4969 cx_a: &mut TestAppContext,
4970 cx_b: &mut TestAppContext,
4971 cx_c: &mut TestAppContext,
4972 ) {
4973 cx_a.foreground().forbid_parking();
4974
4975 // Connect to a server as 3 clients.
4976 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4977 let mut client_a = server.create_client(cx_a, "user_a").await;
4978 let mut client_b = server.create_client(cx_b, "user_b").await;
4979 let client_c = server.create_client(cx_c, "user_c").await;
4980 server
4981 .make_contacts(vec![
4982 (&client_a, cx_a),
4983 (&client_b, cx_b),
4984 (&client_c, cx_c),
4985 ])
4986 .await;
4987
4988 deterministic.run_until_parked();
4989 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4990 client.user_store.read_with(*cx, |store, _| {
4991 assert_eq!(
4992 contacts(store),
4993 [
4994 ("user_a", true, vec![]),
4995 ("user_b", true, vec![]),
4996 ("user_c", true, vec![])
4997 ],
4998 "{} has the wrong contacts",
4999 client.username
5000 )
5001 });
5002 }
5003
5004 // Share a project as client A.
5005 let fs = FakeFs::new(cx_a.background());
5006 fs.create_dir(Path::new("/a")).await.unwrap();
5007 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5008
5009 deterministic.run_until_parked();
5010 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5011 client.user_store.read_with(*cx, |store, _| {
5012 assert_eq!(
5013 contacts(store),
5014 [
5015 ("user_a", true, vec![("a", vec![])]),
5016 ("user_b", true, vec![]),
5017 ("user_c", true, vec![])
5018 ],
5019 "{} has the wrong contacts",
5020 client.username
5021 )
5022 });
5023 }
5024
5025 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5026
5027 deterministic.run_until_parked();
5028 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5029 client.user_store.read_with(*cx, |store, _| {
5030 assert_eq!(
5031 contacts(store),
5032 [
5033 ("user_a", true, vec![("a", vec!["user_b"])]),
5034 ("user_b", true, vec![]),
5035 ("user_c", true, vec![])
5036 ],
5037 "{} has the wrong contacts",
5038 client.username
5039 )
5040 });
5041 }
5042
5043 // Add a local project as client B
5044 let fs = FakeFs::new(cx_b.background());
5045 fs.create_dir(Path::new("/b")).await.unwrap();
5046 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5047
5048 deterministic.run_until_parked();
5049 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5050 client.user_store.read_with(*cx, |store, _| {
5051 assert_eq!(
5052 contacts(store),
5053 [
5054 ("user_a", true, vec![("a", vec!["user_b"])]),
5055 ("user_b", true, vec![("b", vec![])]),
5056 ("user_c", true, vec![])
5057 ],
5058 "{} has the wrong contacts",
5059 client.username
5060 )
5061 });
5062 }
5063
5064 project_a
5065 .condition(&cx_a, |project, _| {
5066 project.collaborators().contains_key(&client_b.peer_id)
5067 })
5068 .await;
5069
5070 client_a.project.take();
5071 cx_a.update(move |_| drop(project_a));
5072 deterministic.run_until_parked();
5073 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5074 client.user_store.read_with(*cx, |store, _| {
5075 assert_eq!(
5076 contacts(store),
5077 [
5078 ("user_a", true, vec![]),
5079 ("user_b", true, vec![("b", vec![])]),
5080 ("user_c", true, vec![])
5081 ],
5082 "{} has the wrong contacts",
5083 client.username
5084 )
5085 });
5086 }
5087
5088 server.disconnect_client(client_c.current_user_id(cx_c));
5089 server.forbid_connections();
5090 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5091 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5092 client.user_store.read_with(*cx, |store, _| {
5093 assert_eq!(
5094 contacts(store),
5095 [
5096 ("user_a", true, vec![]),
5097 ("user_b", true, vec![("b", vec![])]),
5098 ("user_c", false, vec![])
5099 ],
5100 "{} has the wrong contacts",
5101 client.username
5102 )
5103 });
5104 }
5105 client_c
5106 .user_store
5107 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5108
5109 server.allow_connections();
5110 client_c
5111 .authenticate_and_connect(false, &cx_c.to_async())
5112 .await
5113 .unwrap();
5114
5115 deterministic.run_until_parked();
5116 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5117 client.user_store.read_with(*cx, |store, _| {
5118 assert_eq!(
5119 contacts(store),
5120 [
5121 ("user_a", true, vec![]),
5122 ("user_b", true, vec![("b", vec![])]),
5123 ("user_c", true, vec![])
5124 ],
5125 "{} has the wrong contacts",
5126 client.username
5127 )
5128 });
5129 }
5130
5131 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5132 user_store
5133 .contacts()
5134 .iter()
5135 .map(|contact| {
5136 let projects = contact
5137 .projects
5138 .iter()
5139 .map(|p| {
5140 (
5141 p.worktree_root_names[0].as_str(),
5142 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5143 )
5144 })
5145 .collect();
5146 (contact.user.github_login.as_str(), contact.online, projects)
5147 })
5148 .collect()
5149 }
5150 }
5151
5152 #[gpui::test(iterations = 10)]
5153 async fn test_contact_requests(
5154 executor: Arc<Deterministic>,
5155 cx_a: &mut TestAppContext,
5156 cx_a2: &mut TestAppContext,
5157 cx_b: &mut TestAppContext,
5158 cx_b2: &mut TestAppContext,
5159 cx_c: &mut TestAppContext,
5160 cx_c2: &mut TestAppContext,
5161 ) {
5162 cx_a.foreground().forbid_parking();
5163
5164 // Connect to a server as 3 clients.
5165 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5166 let client_a = server.create_client(cx_a, "user_a").await;
5167 let client_a2 = server.create_client(cx_a2, "user_a").await;
5168 let client_b = server.create_client(cx_b, "user_b").await;
5169 let client_b2 = server.create_client(cx_b2, "user_b").await;
5170 let client_c = server.create_client(cx_c, "user_c").await;
5171 let client_c2 = server.create_client(cx_c2, "user_c").await;
5172
5173 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5174 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5175 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5176
5177 // User A and User C request that user B become their contact.
5178 client_a
5179 .user_store
5180 .update(cx_a, |store, cx| {
5181 store.request_contact(client_b.user_id().unwrap(), cx)
5182 })
5183 .await
5184 .unwrap();
5185 client_c
5186 .user_store
5187 .update(cx_c, |store, cx| {
5188 store.request_contact(client_b.user_id().unwrap(), cx)
5189 })
5190 .await
5191 .unwrap();
5192 executor.run_until_parked();
5193
5194 // All users see the pending request appear in all their clients.
5195 assert_eq!(
5196 client_a.summarize_contacts(&cx_a).outgoing_requests,
5197 &["user_b"]
5198 );
5199 assert_eq!(
5200 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5201 &["user_b"]
5202 );
5203 assert_eq!(
5204 client_b.summarize_contacts(&cx_b).incoming_requests,
5205 &["user_a", "user_c"]
5206 );
5207 assert_eq!(
5208 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5209 &["user_a", "user_c"]
5210 );
5211 assert_eq!(
5212 client_c.summarize_contacts(&cx_c).outgoing_requests,
5213 &["user_b"]
5214 );
5215 assert_eq!(
5216 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5217 &["user_b"]
5218 );
5219
5220 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5221 disconnect_and_reconnect(&client_a, cx_a).await;
5222 disconnect_and_reconnect(&client_b, cx_b).await;
5223 disconnect_and_reconnect(&client_c, cx_c).await;
5224 executor.run_until_parked();
5225 assert_eq!(
5226 client_a.summarize_contacts(&cx_a).outgoing_requests,
5227 &["user_b"]
5228 );
5229 assert_eq!(
5230 client_b.summarize_contacts(&cx_b).incoming_requests,
5231 &["user_a", "user_c"]
5232 );
5233 assert_eq!(
5234 client_c.summarize_contacts(&cx_c).outgoing_requests,
5235 &["user_b"]
5236 );
5237
5238 // User B accepts the request from user A.
5239 client_b
5240 .user_store
5241 .update(cx_b, |store, cx| {
5242 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5243 })
5244 .await
5245 .unwrap();
5246
5247 executor.run_until_parked();
5248
5249 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5250 let contacts_b = client_b.summarize_contacts(&cx_b);
5251 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5252 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5253 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5254 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5255 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5256
5257 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5258 let contacts_a = client_a.summarize_contacts(&cx_a);
5259 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5260 assert!(contacts_a.outgoing_requests.is_empty());
5261 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5262 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5263 assert!(contacts_a2.outgoing_requests.is_empty());
5264
5265 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5266 disconnect_and_reconnect(&client_a, cx_a).await;
5267 disconnect_and_reconnect(&client_b, cx_b).await;
5268 disconnect_and_reconnect(&client_c, cx_c).await;
5269 executor.run_until_parked();
5270 assert_eq!(
5271 client_a.summarize_contacts(&cx_a).current,
5272 &["user_a", "user_b"]
5273 );
5274 assert_eq!(
5275 client_b.summarize_contacts(&cx_b).current,
5276 &["user_a", "user_b"]
5277 );
5278 assert_eq!(
5279 client_b.summarize_contacts(&cx_b).incoming_requests,
5280 &["user_c"]
5281 );
5282 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5283 assert_eq!(
5284 client_c.summarize_contacts(&cx_c).outgoing_requests,
5285 &["user_b"]
5286 );
5287
5288 // User B rejects the request from user C.
5289 client_b
5290 .user_store
5291 .update(cx_b, |store, cx| {
5292 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5293 })
5294 .await
5295 .unwrap();
5296
5297 executor.run_until_parked();
5298
5299 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5300 let contacts_b = client_b.summarize_contacts(&cx_b);
5301 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5302 assert!(contacts_b.incoming_requests.is_empty());
5303 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5304 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5305 assert!(contacts_b2.incoming_requests.is_empty());
5306
5307 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5308 let contacts_c = client_c.summarize_contacts(&cx_c);
5309 assert_eq!(contacts_c.current, &["user_c"]);
5310 assert!(contacts_c.outgoing_requests.is_empty());
5311 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5312 assert_eq!(contacts_c2.current, &["user_c"]);
5313 assert!(contacts_c2.outgoing_requests.is_empty());
5314
5315 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5316 disconnect_and_reconnect(&client_a, cx_a).await;
5317 disconnect_and_reconnect(&client_b, cx_b).await;
5318 disconnect_and_reconnect(&client_c, cx_c).await;
5319 executor.run_until_parked();
5320 assert_eq!(
5321 client_a.summarize_contacts(&cx_a).current,
5322 &["user_a", "user_b"]
5323 );
5324 assert_eq!(
5325 client_b.summarize_contacts(&cx_b).current,
5326 &["user_a", "user_b"]
5327 );
5328 assert!(client_b
5329 .summarize_contacts(&cx_b)
5330 .incoming_requests
5331 .is_empty());
5332 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5333 assert!(client_c
5334 .summarize_contacts(&cx_c)
5335 .outgoing_requests
5336 .is_empty());
5337
5338 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5339 client.disconnect(&cx.to_async()).unwrap();
5340 client.clear_contacts(cx).await;
5341 client
5342 .authenticate_and_connect(false, &cx.to_async())
5343 .await
5344 .unwrap();
5345 }
5346 }
5347
5348 #[gpui::test(iterations = 10)]
5349 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5350 cx_a.foreground().forbid_parking();
5351 let fs = FakeFs::new(cx_a.background());
5352
5353 // 2 clients connect to a server.
5354 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5355 let mut client_a = server.create_client(cx_a, "user_a").await;
5356 let mut client_b = server.create_client(cx_b, "user_b").await;
5357 server
5358 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5359 .await;
5360 cx_a.update(editor::init);
5361 cx_b.update(editor::init);
5362
5363 // Client A shares a project.
5364 fs.insert_tree(
5365 "/a",
5366 json!({
5367 "1.txt": "one",
5368 "2.txt": "two",
5369 "3.txt": "three",
5370 }),
5371 )
5372 .await;
5373 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5374
5375 // Client B joins the project.
5376 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5377
5378 // Client A opens some editors.
5379 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5380 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5381 let editor_a1 = workspace_a
5382 .update(cx_a, |workspace, cx| {
5383 workspace.open_path((worktree_id, "1.txt"), true, cx)
5384 })
5385 .await
5386 .unwrap()
5387 .downcast::<Editor>()
5388 .unwrap();
5389 let editor_a2 = workspace_a
5390 .update(cx_a, |workspace, cx| {
5391 workspace.open_path((worktree_id, "2.txt"), true, cx)
5392 })
5393 .await
5394 .unwrap()
5395 .downcast::<Editor>()
5396 .unwrap();
5397
5398 // Client B opens an editor.
5399 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5400 let editor_b1 = workspace_b
5401 .update(cx_b, |workspace, cx| {
5402 workspace.open_path((worktree_id, "1.txt"), true, cx)
5403 })
5404 .await
5405 .unwrap()
5406 .downcast::<Editor>()
5407 .unwrap();
5408
5409 let client_a_id = project_b.read_with(cx_b, |project, _| {
5410 project.collaborators().values().next().unwrap().peer_id
5411 });
5412 let client_b_id = project_a.read_with(cx_a, |project, _| {
5413 project.collaborators().values().next().unwrap().peer_id
5414 });
5415
5416 // When client B starts following client A, all visible view states are replicated to client B.
5417 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5418 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5419 workspace_b
5420 .update(cx_b, |workspace, cx| {
5421 workspace
5422 .toggle_follow(&ToggleFollow(client_a_id), cx)
5423 .unwrap()
5424 })
5425 .await
5426 .unwrap();
5427
5428 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5429 workspace
5430 .active_item(cx)
5431 .unwrap()
5432 .downcast::<Editor>()
5433 .unwrap()
5434 });
5435 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5436 assert_eq!(
5437 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5438 Some((worktree_id, "2.txt").into())
5439 );
5440 assert_eq!(
5441 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5442 vec![2..3]
5443 );
5444 assert_eq!(
5445 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5446 vec![0..1]
5447 );
5448
5449 // When client A activates a different editor, client B does so as well.
5450 workspace_a.update(cx_a, |workspace, cx| {
5451 workspace.activate_item(&editor_a1, cx)
5452 });
5453 workspace_b
5454 .condition(cx_b, |workspace, cx| {
5455 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5456 })
5457 .await;
5458
5459 // When client A navigates back and forth, client B does so as well.
5460 workspace_a
5461 .update(cx_a, |workspace, cx| {
5462 workspace::Pane::go_back(workspace, None, cx)
5463 })
5464 .await;
5465 workspace_b
5466 .condition(cx_b, |workspace, cx| {
5467 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5468 })
5469 .await;
5470
5471 workspace_a
5472 .update(cx_a, |workspace, cx| {
5473 workspace::Pane::go_forward(workspace, None, cx)
5474 })
5475 .await;
5476 workspace_b
5477 .condition(cx_b, |workspace, cx| {
5478 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5479 })
5480 .await;
5481
5482 // Changes to client A's editor are reflected on client B.
5483 editor_a1.update(cx_a, |editor, cx| {
5484 editor.select_ranges([1..1, 2..2], None, cx);
5485 });
5486 editor_b1
5487 .condition(cx_b, |editor, cx| {
5488 editor.selected_ranges(cx) == vec![1..1, 2..2]
5489 })
5490 .await;
5491
5492 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5493 editor_b1
5494 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5495 .await;
5496
5497 editor_a1.update(cx_a, |editor, cx| {
5498 editor.select_ranges([3..3], None, cx);
5499 editor.set_scroll_position(vec2f(0., 100.), cx);
5500 });
5501 editor_b1
5502 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5503 .await;
5504
5505 // After unfollowing, client B stops receiving updates from client A.
5506 workspace_b.update(cx_b, |workspace, cx| {
5507 workspace.unfollow(&workspace.active_pane().clone(), cx)
5508 });
5509 workspace_a.update(cx_a, |workspace, cx| {
5510 workspace.activate_item(&editor_a2, cx)
5511 });
5512 cx_a.foreground().run_until_parked();
5513 assert_eq!(
5514 workspace_b.read_with(cx_b, |workspace, cx| workspace
5515 .active_item(cx)
5516 .unwrap()
5517 .id()),
5518 editor_b1.id()
5519 );
5520
5521 // Client A starts following client B.
5522 workspace_a
5523 .update(cx_a, |workspace, cx| {
5524 workspace
5525 .toggle_follow(&ToggleFollow(client_b_id), cx)
5526 .unwrap()
5527 })
5528 .await
5529 .unwrap();
5530 assert_eq!(
5531 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5532 Some(client_b_id)
5533 );
5534 assert_eq!(
5535 workspace_a.read_with(cx_a, |workspace, cx| workspace
5536 .active_item(cx)
5537 .unwrap()
5538 .id()),
5539 editor_a1.id()
5540 );
5541
5542 // Following interrupts when client B disconnects.
5543 client_b.disconnect(&cx_b.to_async()).unwrap();
5544 cx_a.foreground().run_until_parked();
5545 assert_eq!(
5546 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5547 None
5548 );
5549 }
5550
5551 #[gpui::test(iterations = 10)]
5552 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5553 cx_a.foreground().forbid_parking();
5554 let fs = FakeFs::new(cx_a.background());
5555
5556 // 2 clients connect to a server.
5557 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5558 let mut client_a = server.create_client(cx_a, "user_a").await;
5559 let mut client_b = server.create_client(cx_b, "user_b").await;
5560 server
5561 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5562 .await;
5563 cx_a.update(editor::init);
5564 cx_b.update(editor::init);
5565
5566 // Client A shares a project.
5567 fs.insert_tree(
5568 "/a",
5569 json!({
5570 "1.txt": "one",
5571 "2.txt": "two",
5572 "3.txt": "three",
5573 "4.txt": "four",
5574 }),
5575 )
5576 .await;
5577 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5578
5579 // Client B joins the project.
5580 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5581
5582 // Client A opens some editors.
5583 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5584 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5585 let _editor_a1 = workspace_a
5586 .update(cx_a, |workspace, cx| {
5587 workspace.open_path((worktree_id, "1.txt"), true, cx)
5588 })
5589 .await
5590 .unwrap()
5591 .downcast::<Editor>()
5592 .unwrap();
5593
5594 // Client B opens an editor.
5595 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5596 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5597 let _editor_b1 = workspace_b
5598 .update(cx_b, |workspace, cx| {
5599 workspace.open_path((worktree_id, "2.txt"), true, cx)
5600 })
5601 .await
5602 .unwrap()
5603 .downcast::<Editor>()
5604 .unwrap();
5605
5606 // Clients A and B follow each other in split panes
5607 workspace_a
5608 .update(cx_a, |workspace, cx| {
5609 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5610 assert_ne!(*workspace.active_pane(), pane_a1);
5611 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5612 workspace
5613 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5614 .unwrap()
5615 })
5616 .await
5617 .unwrap();
5618 workspace_b
5619 .update(cx_b, |workspace, cx| {
5620 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5621 assert_ne!(*workspace.active_pane(), pane_b1);
5622 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5623 workspace
5624 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5625 .unwrap()
5626 })
5627 .await
5628 .unwrap();
5629
5630 workspace_a
5631 .update(cx_a, |workspace, cx| {
5632 workspace.activate_next_pane(cx);
5633 assert_eq!(*workspace.active_pane(), pane_a1);
5634 workspace.open_path((worktree_id, "3.txt"), true, cx)
5635 })
5636 .await
5637 .unwrap();
5638 workspace_b
5639 .update(cx_b, |workspace, cx| {
5640 workspace.activate_next_pane(cx);
5641 assert_eq!(*workspace.active_pane(), pane_b1);
5642 workspace.open_path((worktree_id, "4.txt"), true, cx)
5643 })
5644 .await
5645 .unwrap();
5646 cx_a.foreground().run_until_parked();
5647
5648 // Ensure leader updates don't change the active pane of followers
5649 workspace_a.read_with(cx_a, |workspace, _| {
5650 assert_eq!(*workspace.active_pane(), pane_a1);
5651 });
5652 workspace_b.read_with(cx_b, |workspace, _| {
5653 assert_eq!(*workspace.active_pane(), pane_b1);
5654 });
5655
5656 // Ensure peers following each other doesn't cause an infinite loop.
5657 assert_eq!(
5658 workspace_a.read_with(cx_a, |workspace, cx| workspace
5659 .active_item(cx)
5660 .unwrap()
5661 .project_path(cx)),
5662 Some((worktree_id, "3.txt").into())
5663 );
5664 workspace_a.update(cx_a, |workspace, cx| {
5665 assert_eq!(
5666 workspace.active_item(cx).unwrap().project_path(cx),
5667 Some((worktree_id, "3.txt").into())
5668 );
5669 workspace.activate_next_pane(cx);
5670 assert_eq!(
5671 workspace.active_item(cx).unwrap().project_path(cx),
5672 Some((worktree_id, "4.txt").into())
5673 );
5674 });
5675 workspace_b.update(cx_b, |workspace, cx| {
5676 assert_eq!(
5677 workspace.active_item(cx).unwrap().project_path(cx),
5678 Some((worktree_id, "4.txt").into())
5679 );
5680 workspace.activate_next_pane(cx);
5681 assert_eq!(
5682 workspace.active_item(cx).unwrap().project_path(cx),
5683 Some((worktree_id, "3.txt").into())
5684 );
5685 });
5686 }
5687
5688 #[gpui::test(iterations = 10)]
5689 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5690 cx_a.foreground().forbid_parking();
5691 let fs = FakeFs::new(cx_a.background());
5692
5693 // 2 clients connect to a server.
5694 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5695 let mut client_a = server.create_client(cx_a, "user_a").await;
5696 let mut client_b = server.create_client(cx_b, "user_b").await;
5697 server
5698 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5699 .await;
5700 cx_a.update(editor::init);
5701 cx_b.update(editor::init);
5702
5703 // Client A shares a project.
5704 fs.insert_tree(
5705 "/a",
5706 json!({
5707 "1.txt": "one",
5708 "2.txt": "two",
5709 "3.txt": "three",
5710 }),
5711 )
5712 .await;
5713 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5714
5715 // Client B joins the project.
5716 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5717
5718 // Client A opens some editors.
5719 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5720 let _editor_a1 = workspace_a
5721 .update(cx_a, |workspace, cx| {
5722 workspace.open_path((worktree_id, "1.txt"), true, cx)
5723 })
5724 .await
5725 .unwrap()
5726 .downcast::<Editor>()
5727 .unwrap();
5728
5729 // Client B starts following client A.
5730 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5731 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5732 let leader_id = project_b.read_with(cx_b, |project, _| {
5733 project.collaborators().values().next().unwrap().peer_id
5734 });
5735 workspace_b
5736 .update(cx_b, |workspace, cx| {
5737 workspace
5738 .toggle_follow(&ToggleFollow(leader_id), cx)
5739 .unwrap()
5740 })
5741 .await
5742 .unwrap();
5743 assert_eq!(
5744 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5745 Some(leader_id)
5746 );
5747 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5748 workspace
5749 .active_item(cx)
5750 .unwrap()
5751 .downcast::<Editor>()
5752 .unwrap()
5753 });
5754
5755 // When client B moves, it automatically stops following client A.
5756 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5757 assert_eq!(
5758 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5759 None
5760 );
5761
5762 workspace_b
5763 .update(cx_b, |workspace, cx| {
5764 workspace
5765 .toggle_follow(&ToggleFollow(leader_id), cx)
5766 .unwrap()
5767 })
5768 .await
5769 .unwrap();
5770 assert_eq!(
5771 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5772 Some(leader_id)
5773 );
5774
5775 // When client B edits, it automatically stops following client A.
5776 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5777 assert_eq!(
5778 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5779 None
5780 );
5781
5782 workspace_b
5783 .update(cx_b, |workspace, cx| {
5784 workspace
5785 .toggle_follow(&ToggleFollow(leader_id), cx)
5786 .unwrap()
5787 })
5788 .await
5789 .unwrap();
5790 assert_eq!(
5791 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5792 Some(leader_id)
5793 );
5794
5795 // When client B scrolls, it automatically stops following client A.
5796 editor_b2.update(cx_b, |editor, cx| {
5797 editor.set_scroll_position(vec2f(0., 3.), cx)
5798 });
5799 assert_eq!(
5800 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5801 None
5802 );
5803
5804 workspace_b
5805 .update(cx_b, |workspace, cx| {
5806 workspace
5807 .toggle_follow(&ToggleFollow(leader_id), cx)
5808 .unwrap()
5809 })
5810 .await
5811 .unwrap();
5812 assert_eq!(
5813 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5814 Some(leader_id)
5815 );
5816
5817 // When client B activates a different pane, it continues following client A in the original pane.
5818 workspace_b.update(cx_b, |workspace, cx| {
5819 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5820 });
5821 assert_eq!(
5822 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5823 Some(leader_id)
5824 );
5825
5826 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5827 assert_eq!(
5828 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5829 Some(leader_id)
5830 );
5831
5832 // When client B activates a different item in the original pane, it automatically stops following client A.
5833 workspace_b
5834 .update(cx_b, |workspace, cx| {
5835 workspace.open_path((worktree_id, "2.txt"), true, cx)
5836 })
5837 .await
5838 .unwrap();
5839 assert_eq!(
5840 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5841 None
5842 );
5843 }
5844
5845 #[gpui::test(iterations = 100)]
5846 async fn test_random_collaboration(
5847 cx: &mut TestAppContext,
5848 deterministic: Arc<Deterministic>,
5849 rng: StdRng,
5850 ) {
5851 cx.foreground().forbid_parking();
5852 let max_peers = env::var("MAX_PEERS")
5853 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5854 .unwrap_or(5);
5855 assert!(max_peers <= 5);
5856
5857 let max_operations = env::var("OPERATIONS")
5858 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5859 .unwrap_or(10);
5860
5861 let rng = Arc::new(Mutex::new(rng));
5862
5863 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5864 let host_language_registry = Arc::new(LanguageRegistry::test());
5865
5866 let fs = FakeFs::new(cx.background());
5867 fs.insert_tree("/_collab", json!({"init": ""})).await;
5868
5869 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5870 let db = server.app_state.db.clone();
5871 let host_user_id = db.create_user("host", false).await.unwrap();
5872 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5873 let guest_user_id = db.create_user(username, false).await.unwrap();
5874 server
5875 .app_state
5876 .db
5877 .send_contact_request(guest_user_id, host_user_id)
5878 .await
5879 .unwrap();
5880 server
5881 .app_state
5882 .db
5883 .respond_to_contact_request(host_user_id, guest_user_id, true)
5884 .await
5885 .unwrap();
5886 }
5887
5888 let mut clients = Vec::new();
5889 let mut user_ids = Vec::new();
5890 let mut op_start_signals = Vec::new();
5891
5892 let mut next_entity_id = 100000;
5893 let mut host_cx = TestAppContext::new(
5894 cx.foreground_platform(),
5895 cx.platform(),
5896 deterministic.build_foreground(next_entity_id),
5897 deterministic.build_background(),
5898 cx.font_cache(),
5899 cx.leak_detector(),
5900 next_entity_id,
5901 );
5902 let host = server.create_client(&mut host_cx, "host").await;
5903 let host_project = host_cx.update(|cx| {
5904 Project::local(
5905 host.client.clone(),
5906 host.user_store.clone(),
5907 host_language_registry.clone(),
5908 fs.clone(),
5909 cx,
5910 )
5911 });
5912 let host_project_id = host_project
5913 .update(&mut host_cx, |p, _| p.next_remote_id())
5914 .await;
5915
5916 let (collab_worktree, _) = host_project
5917 .update(&mut host_cx, |project, cx| {
5918 project.find_or_create_local_worktree("/_collab", true, cx)
5919 })
5920 .await
5921 .unwrap();
5922 collab_worktree
5923 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5924 .await;
5925
5926 // Set up fake language servers.
5927 let mut language = Language::new(
5928 LanguageConfig {
5929 name: "Rust".into(),
5930 path_suffixes: vec!["rs".to_string()],
5931 ..Default::default()
5932 },
5933 None,
5934 );
5935 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5936 name: "the-fake-language-server",
5937 capabilities: lsp::LanguageServer::full_capabilities(),
5938 initializer: Some(Box::new({
5939 let rng = rng.clone();
5940 let fs = fs.clone();
5941 let project = host_project.downgrade();
5942 move |fake_server: &mut FakeLanguageServer| {
5943 fake_server.handle_request::<lsp::request::Completion, _, _>(
5944 |_, _| async move {
5945 Ok(Some(lsp::CompletionResponse::Array(vec![
5946 lsp::CompletionItem {
5947 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5948 range: lsp::Range::new(
5949 lsp::Position::new(0, 0),
5950 lsp::Position::new(0, 0),
5951 ),
5952 new_text: "the-new-text".to_string(),
5953 })),
5954 ..Default::default()
5955 },
5956 ])))
5957 },
5958 );
5959
5960 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5961 |_, _| async move {
5962 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5963 lsp::CodeAction {
5964 title: "the-code-action".to_string(),
5965 ..Default::default()
5966 },
5967 )]))
5968 },
5969 );
5970
5971 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5972 |params, _| async move {
5973 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5974 params.position,
5975 params.position,
5976 ))))
5977 },
5978 );
5979
5980 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5981 let fs = fs.clone();
5982 let rng = rng.clone();
5983 move |_, _| {
5984 let fs = fs.clone();
5985 let rng = rng.clone();
5986 async move {
5987 let files = fs.files().await;
5988 let mut rng = rng.lock();
5989 let count = rng.gen_range::<usize, _>(1..3);
5990 let files = (0..count)
5991 .map(|_| files.choose(&mut *rng).unwrap())
5992 .collect::<Vec<_>>();
5993 log::info!("LSP: Returning definitions in files {:?}", &files);
5994 Ok(Some(lsp::GotoDefinitionResponse::Array(
5995 files
5996 .into_iter()
5997 .map(|file| lsp::Location {
5998 uri: lsp::Url::from_file_path(file).unwrap(),
5999 range: Default::default(),
6000 })
6001 .collect(),
6002 )))
6003 }
6004 }
6005 });
6006
6007 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6008 let rng = rng.clone();
6009 let project = project.clone();
6010 move |params, mut cx| {
6011 let highlights = if let Some(project) = project.upgrade(&cx) {
6012 project.update(&mut cx, |project, cx| {
6013 let path = params
6014 .text_document_position_params
6015 .text_document
6016 .uri
6017 .to_file_path()
6018 .unwrap();
6019 let (worktree, relative_path) =
6020 project.find_local_worktree(&path, cx)?;
6021 let project_path =
6022 ProjectPath::from((worktree.read(cx).id(), relative_path));
6023 let buffer =
6024 project.get_open_buffer(&project_path, cx)?.read(cx);
6025
6026 let mut highlights = Vec::new();
6027 let highlight_count = rng.lock().gen_range(1..=5);
6028 let mut prev_end = 0;
6029 for _ in 0..highlight_count {
6030 let range =
6031 buffer.random_byte_range(prev_end, &mut *rng.lock());
6032
6033 highlights.push(lsp::DocumentHighlight {
6034 range: range_to_lsp(range.to_point_utf16(buffer)),
6035 kind: Some(lsp::DocumentHighlightKind::READ),
6036 });
6037 prev_end = range.end;
6038 }
6039 Some(highlights)
6040 })
6041 } else {
6042 None
6043 };
6044 async move { Ok(highlights) }
6045 }
6046 });
6047 }
6048 })),
6049 ..Default::default()
6050 });
6051 host_language_registry.add(Arc::new(language));
6052
6053 let op_start_signal = futures::channel::mpsc::unbounded();
6054 user_ids.push(host.current_user_id(&host_cx));
6055 op_start_signals.push(op_start_signal.0);
6056 clients.push(host_cx.foreground().spawn(host.simulate_host(
6057 host_project,
6058 op_start_signal.1,
6059 rng.clone(),
6060 host_cx,
6061 )));
6062
6063 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6064 rng.lock().gen_range(0..max_operations)
6065 } else {
6066 max_operations
6067 };
6068 let mut available_guests = vec![
6069 "guest-1".to_string(),
6070 "guest-2".to_string(),
6071 "guest-3".to_string(),
6072 "guest-4".to_string(),
6073 ];
6074 let mut operations = 0;
6075 while operations < max_operations {
6076 if operations == disconnect_host_at {
6077 server.disconnect_client(user_ids[0]);
6078 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6079 drop(op_start_signals);
6080 let mut clients = futures::future::join_all(clients).await;
6081 cx.foreground().run_until_parked();
6082
6083 let (host, mut host_cx, host_err) = clients.remove(0);
6084 if let Some(host_err) = host_err {
6085 log::error!("host error - {:?}", host_err);
6086 }
6087 host.project
6088 .as_ref()
6089 .unwrap()
6090 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6091 for (guest, mut guest_cx, guest_err) in clients {
6092 if let Some(guest_err) = guest_err {
6093 log::error!("{} error - {:?}", guest.username, guest_err);
6094 }
6095
6096 let contacts = server
6097 .app_state
6098 .db
6099 .get_contacts(guest.current_user_id(&guest_cx))
6100 .await
6101 .unwrap();
6102 let contacts = server
6103 .store
6104 .read()
6105 .await
6106 .build_initial_contacts_update(contacts)
6107 .contacts;
6108 assert!(!contacts
6109 .iter()
6110 .flat_map(|contact| &contact.projects)
6111 .any(|project| project.id == host_project_id));
6112 guest
6113 .project
6114 .as_ref()
6115 .unwrap()
6116 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6117 guest_cx.update(|_| drop(guest));
6118 }
6119 host_cx.update(|_| drop(host));
6120
6121 return;
6122 }
6123
6124 let distribution = rng.lock().gen_range(0..100);
6125 match distribution {
6126 0..=19 if !available_guests.is_empty() => {
6127 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6128 let guest_username = available_guests.remove(guest_ix);
6129 log::info!("Adding new connection for {}", guest_username);
6130 next_entity_id += 100000;
6131 let mut guest_cx = TestAppContext::new(
6132 cx.foreground_platform(),
6133 cx.platform(),
6134 deterministic.build_foreground(next_entity_id),
6135 deterministic.build_background(),
6136 cx.font_cache(),
6137 cx.leak_detector(),
6138 next_entity_id,
6139 );
6140 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6141 let guest_project = Project::remote(
6142 host_project_id,
6143 guest.client.clone(),
6144 guest.user_store.clone(),
6145 guest_lang_registry.clone(),
6146 FakeFs::new(cx.background()),
6147 &mut guest_cx.to_async(),
6148 )
6149 .await
6150 .unwrap();
6151 let op_start_signal = futures::channel::mpsc::unbounded();
6152 user_ids.push(guest.current_user_id(&guest_cx));
6153 op_start_signals.push(op_start_signal.0);
6154 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6155 guest_username.clone(),
6156 guest_project,
6157 op_start_signal.1,
6158 rng.clone(),
6159 guest_cx,
6160 )));
6161
6162 log::info!("Added connection for {}", guest_username);
6163 operations += 1;
6164 }
6165 20..=29 if clients.len() > 1 => {
6166 let guest_ix = rng.lock().gen_range(1..clients.len());
6167 log::info!("Removing guest {}", user_ids[guest_ix]);
6168 let removed_guest_id = user_ids.remove(guest_ix);
6169 let guest = clients.remove(guest_ix);
6170 op_start_signals.remove(guest_ix);
6171 server.forbid_connections();
6172 server.disconnect_client(removed_guest_id);
6173 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6174 let (guest, mut guest_cx, guest_err) = guest.await;
6175 server.allow_connections();
6176
6177 if let Some(guest_err) = guest_err {
6178 log::error!("{} error - {:?}", guest.username, guest_err);
6179 }
6180 guest
6181 .project
6182 .as_ref()
6183 .unwrap()
6184 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6185 for user_id in &user_ids {
6186 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6187 let contacts = server
6188 .store
6189 .read()
6190 .await
6191 .build_initial_contacts_update(contacts)
6192 .contacts;
6193 for contact in contacts {
6194 if contact.online {
6195 assert_ne!(
6196 contact.user_id, removed_guest_id.0 as u64,
6197 "removed guest is still a contact of another peer"
6198 );
6199 }
6200 for project in contact.projects {
6201 for project_guest_id in project.guests {
6202 assert_ne!(
6203 project_guest_id, removed_guest_id.0 as u64,
6204 "removed guest appears as still participating on a project"
6205 );
6206 }
6207 }
6208 }
6209 }
6210
6211 log::info!("{} removed", guest.username);
6212 available_guests.push(guest.username.clone());
6213 guest_cx.update(|_| drop(guest));
6214
6215 operations += 1;
6216 }
6217 _ => {
6218 while operations < max_operations && rng.lock().gen_bool(0.7) {
6219 op_start_signals
6220 .choose(&mut *rng.lock())
6221 .unwrap()
6222 .unbounded_send(())
6223 .unwrap();
6224 operations += 1;
6225 }
6226
6227 if rng.lock().gen_bool(0.8) {
6228 cx.foreground().run_until_parked();
6229 }
6230 }
6231 }
6232 }
6233
6234 drop(op_start_signals);
6235 let mut clients = futures::future::join_all(clients).await;
6236 cx.foreground().run_until_parked();
6237
6238 let (host_client, mut host_cx, host_err) = clients.remove(0);
6239 if let Some(host_err) = host_err {
6240 panic!("host error - {:?}", host_err);
6241 }
6242 let host_project = host_client.project.as_ref().unwrap();
6243 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6244 project
6245 .worktrees(cx)
6246 .map(|worktree| {
6247 let snapshot = worktree.read(cx).snapshot();
6248 (snapshot.id(), snapshot)
6249 })
6250 .collect::<BTreeMap<_, _>>()
6251 });
6252
6253 host_client
6254 .project
6255 .as_ref()
6256 .unwrap()
6257 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6258
6259 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6260 if let Some(guest_err) = guest_err {
6261 panic!("{} error - {:?}", guest_client.username, guest_err);
6262 }
6263 let worktree_snapshots =
6264 guest_client
6265 .project
6266 .as_ref()
6267 .unwrap()
6268 .read_with(&guest_cx, |project, cx| {
6269 project
6270 .worktrees(cx)
6271 .map(|worktree| {
6272 let worktree = worktree.read(cx);
6273 (worktree.id(), worktree.snapshot())
6274 })
6275 .collect::<BTreeMap<_, _>>()
6276 });
6277
6278 assert_eq!(
6279 worktree_snapshots.keys().collect::<Vec<_>>(),
6280 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6281 "{} has different worktrees than the host",
6282 guest_client.username
6283 );
6284 for (id, host_snapshot) in &host_worktree_snapshots {
6285 let guest_snapshot = &worktree_snapshots[id];
6286 assert_eq!(
6287 guest_snapshot.root_name(),
6288 host_snapshot.root_name(),
6289 "{} has different root name than the host for worktree {}",
6290 guest_client.username,
6291 id
6292 );
6293 assert_eq!(
6294 guest_snapshot.entries(false).collect::<Vec<_>>(),
6295 host_snapshot.entries(false).collect::<Vec<_>>(),
6296 "{} has different snapshot than the host for worktree {}",
6297 guest_client.username,
6298 id
6299 );
6300 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6301 }
6302
6303 guest_client
6304 .project
6305 .as_ref()
6306 .unwrap()
6307 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6308
6309 for guest_buffer in &guest_client.buffers {
6310 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6311 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6312 project.buffer_for_id(buffer_id, cx).expect(&format!(
6313 "host does not have buffer for guest:{}, peer:{}, id:{}",
6314 guest_client.username, guest_client.peer_id, buffer_id
6315 ))
6316 });
6317 let path = host_buffer
6318 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6319
6320 assert_eq!(
6321 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6322 0,
6323 "{}, buffer {}, path {:?} has deferred operations",
6324 guest_client.username,
6325 buffer_id,
6326 path,
6327 );
6328 assert_eq!(
6329 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6330 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6331 "{}, buffer {}, path {:?}, differs from the host's buffer",
6332 guest_client.username,
6333 buffer_id,
6334 path
6335 );
6336 }
6337
6338 guest_cx.update(|_| drop(guest_client));
6339 }
6340
6341 host_cx.update(|_| drop(host_client));
6342 }
6343
/// Test harness that bundles a `Server` with the extra state the tests in
/// this module use to drive it (see `start` and `create_client`).
struct TestServer {
    /// Peer created in `start`; it is reset when the `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state handed to the server; `create_client` uses its `db`
    /// to look up or create users.
    app_state: Arc<AppState>,
    /// The server under test; also reachable through `Deref`.
    server: Arc<Server>,
    /// Foreground executor used by `condition` to bracket its waits.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is passed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Per-user `killed` flags returned by `Connection::in_memory`;
    /// `disconnect_client` removes a flag and sets it to `true`.
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    /// While `true`, connection attempts made by test clients return an error.
    forbid_connections: Arc<AtomicBool>,
    /// Never read; presumably held only so the test database outlives the
    /// server — TODO confirm.
    _test_db: TestDb,
}
6354
6355 impl TestServer {
6356 async fn start(
6357 foreground: Rc<executor::Foreground>,
6358 background: Arc<executor::Background>,
6359 ) -> Self {
6360 let test_db = TestDb::fake(background);
6361 let app_state = Self::build_app_state(&test_db).await;
6362 let peer = Peer::new();
6363 let notifications = mpsc::unbounded();
6364 let server = Server::new(app_state.clone(), Some(notifications.0));
6365 Self {
6366 peer,
6367 app_state,
6368 server,
6369 foreground,
6370 notifications: notifications.1,
6371 connection_killers: Default::default(),
6372 forbid_connections: Default::default(),
6373 _test_db: test_db,
6374 }
6375 }
6376
6377 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6378 cx.update(|cx| {
6379 let settings = Settings::test(cx);
6380 cx.set_global(settings);
6381 });
6382
6383 let http = FakeHttpClient::with_404_response();
6384 let user_id =
6385 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6386 user.id
6387 } else {
6388 self.app_state.db.create_user(name, false).await.unwrap()
6389 };
6390 let client_name = name.to_string();
6391 let mut client = Client::new(http.clone());
6392 let server = self.server.clone();
6393 let connection_killers = self.connection_killers.clone();
6394 let forbid_connections = self.forbid_connections.clone();
6395 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6396
6397 Arc::get_mut(&mut client)
6398 .unwrap()
6399 .override_authenticate(move |cx| {
6400 cx.spawn(|_| async move {
6401 let access_token = "the-token".to_string();
6402 Ok(Credentials {
6403 user_id: user_id.0 as u64,
6404 access_token,
6405 })
6406 })
6407 })
6408 .override_establish_connection(move |credentials, cx| {
6409 assert_eq!(credentials.user_id, user_id.0 as u64);
6410 assert_eq!(credentials.access_token, "the-token");
6411
6412 let server = server.clone();
6413 let connection_killers = connection_killers.clone();
6414 let forbid_connections = forbid_connections.clone();
6415 let client_name = client_name.clone();
6416 let connection_id_tx = connection_id_tx.clone();
6417 cx.spawn(move |cx| async move {
6418 if forbid_connections.load(SeqCst) {
6419 Err(EstablishConnectionError::other(anyhow!(
6420 "server is forbidding connections"
6421 )))
6422 } else {
6423 let (client_conn, server_conn, killed) =
6424 Connection::in_memory(cx.background());
6425 connection_killers.lock().insert(user_id, killed);
6426 cx.background()
6427 .spawn(server.handle_connection(
6428 server_conn,
6429 client_name,
6430 user_id,
6431 Some(connection_id_tx),
6432 cx.background(),
6433 ))
6434 .detach();
6435 Ok(client_conn)
6436 }
6437 })
6438 });
6439
6440 Channel::init(&client);
6441 Project::init(&client);
6442 cx.update(|cx| {
6443 workspace::init(&client, cx);
6444 });
6445
6446 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6447 client
6448 .authenticate_and_connect(false, &cx.to_async())
6449 .await
6450 .unwrap();
6451 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6452
6453 let client = TestClient {
6454 client,
6455 peer_id,
6456 username: name.to_string(),
6457 user_store,
6458 language_registry: Arc::new(LanguageRegistry::test()),
6459 project: Default::default(),
6460 buffers: Default::default(),
6461 };
6462 client.wait_for_current_user(cx).await;
6463 client
6464 }
6465
6466 fn disconnect_client(&self, user_id: UserId) {
6467 self.connection_killers
6468 .lock()
6469 .remove(&user_id)
6470 .unwrap()
6471 .store(true, SeqCst);
6472 }
6473
/// Sets the shared `forbid_connections` flag. While it is set, the
/// connection override installed by `create_client` returns the error
/// "server is forbidding connections" instead of connecting.
fn forbid_connections(&self) {
    self.forbid_connections.store(true, SeqCst);
}
6477
/// Clears the shared `forbid_connections` flag so that the connection
/// override installed by `create_client` establishes connections again.
fn allow_connections(&self) {
    self.forbid_connections.store(false, SeqCst);
}
6481
6482 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6483 while let Some((client_a, cx_a)) = clients.pop() {
6484 for (client_b, cx_b) in &mut clients {
6485 client_a
6486 .user_store
6487 .update(cx_a, |store, cx| {
6488 store.request_contact(client_b.user_id().unwrap(), cx)
6489 })
6490 .await
6491 .unwrap();
6492 cx_a.foreground().run_until_parked();
6493 client_b
6494 .user_store
6495 .update(*cx_b, |store, cx| {
6496 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6497 })
6498 .await
6499 .unwrap();
6500 }
6501 }
6502 }
6503
6504 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6505 Arc::new(AppState {
6506 db: test_db.db().clone(),
6507 api_token: Default::default(),
6508 })
6509 }
6510
6511 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6512 self.server.store.read().await
6513 }
6514
6515 async fn condition<F>(&mut self, mut predicate: F)
6516 where
6517 F: FnMut(&Store) -> bool,
6518 {
6519 assert!(
6520 self.foreground.parking_forbidden(),
6521 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6522 );
6523 while !(predicate)(&*self.server.store.read().await) {
6524 self.foreground.start_waiting();
6525 self.notifications.next().await;
6526 self.foreground.finish_waiting();
6527 }
6528 }
6529 }
6530
/// Lets a `TestServer` be used as a `&Server` by dereferencing to the
/// wrapped `server` field.
impl Deref for TestServer {
    type Target = Server;

    fn deref(&self) -> &Self::Target {
        &self.server
    }
}
6538
/// Resets the harness's own `peer` when the `TestServer` is dropped.
impl Drop for TestServer {
    fn drop(&mut self) {
        // NOTE(review): this resets the `Peer` created in `start`, not a peer
        // owned by `server` — confirm that is the intended one.
        self.peer.reset();
    }
}
6544
/// A client connected to a `TestServer`, together with the per-client state
/// the tests and the randomized simulations keep track of.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// The name passed to `TestServer::create_client`.
    username: String,
    /// Built from the connection id the server sent back while connecting.
    pub peer_id: PeerId,
    /// User store model created for this client in `create_client`.
    pub user_store: ModelHandle<UserStore>,
    /// Initialised with `LanguageRegistry::test()` in `create_client`.
    language_registry: Arc<LanguageRegistry>,
    /// The project most recently built, joined or simulated by this client.
    project: Option<ModelHandle<Project>>,
    /// Buffer handles the client is currently holding on to; the simulations
    /// insert into and remove from this set.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6554
/// Lets a `TestClient` be used as an `&Arc<Client>` by dereferencing to the
/// wrapped `client` field.
impl Deref for TestClient {
    type Target = Arc<Client>;

    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
6562
/// Snapshot of a client's contact state, expressed as GitHub logins
/// (produced by `TestClient::summarize_contacts`).
struct ContactsSummary {
    /// Logins of the client's current contacts.
    pub current: Vec<String>,
    /// Logins of users with an outgoing contact request from the client.
    pub outgoing_requests: Vec<String>,
    /// Logins of users with an incoming contact request to the client.
    pub incoming_requests: Vec<String>,
}
6568
6569 impl TestClient {
6570 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6571 UserId::from_proto(
6572 self.user_store
6573 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6574 )
6575 }
6576
6577 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6578 let mut authed_user = self
6579 .user_store
6580 .read_with(cx, |user_store, _| user_store.watch_current_user());
6581 while authed_user.next().await.unwrap().is_none() {}
6582 }
6583
/// Calls `clear_contacts` on this client's user store and awaits the
/// returned future.
async fn clear_contacts(&self, cx: &mut TestAppContext) {
    self.user_store
        .update(cx, |store, _| store.clear_contacts())
        .await;
}
6589
6590 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6591 self.user_store.read_with(cx, |store, _| ContactsSummary {
6592 current: store
6593 .contacts()
6594 .iter()
6595 .map(|contact| contact.user.github_login.clone())
6596 .collect(),
6597 outgoing_requests: store
6598 .outgoing_contact_requests()
6599 .iter()
6600 .map(|user| user.github_login.clone())
6601 .collect(),
6602 incoming_requests: store
6603 .incoming_contact_requests()
6604 .iter()
6605 .map(|user| user.github_login.clone())
6606 .collect(),
6607 })
6608 }
6609
6610 async fn build_local_project(
6611 &mut self,
6612 fs: Arc<FakeFs>,
6613 root_path: impl AsRef<Path>,
6614 cx: &mut TestAppContext,
6615 ) -> (ModelHandle<Project>, WorktreeId) {
6616 let project = cx.update(|cx| {
6617 Project::local(
6618 self.client.clone(),
6619 self.user_store.clone(),
6620 self.language_registry.clone(),
6621 fs,
6622 cx,
6623 )
6624 });
6625 self.project = Some(project.clone());
6626 let (worktree, _) = project
6627 .update(cx, |p, cx| {
6628 p.find_or_create_local_worktree(root_path, true, cx)
6629 })
6630 .await
6631 .unwrap();
6632 worktree
6633 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6634 .await;
6635 project
6636 .update(cx, |project, _| project.next_remote_id())
6637 .await;
6638 (project, worktree.read_with(cx, |tree, _| tree.id()))
6639 }
6640
6641 async fn build_remote_project(
6642 &mut self,
6643 host_project: &ModelHandle<Project>,
6644 host_cx: &mut TestAppContext,
6645 guest_cx: &mut TestAppContext,
6646 ) -> ModelHandle<Project> {
6647 let host_project_id = host_project
6648 .read_with(host_cx, |project, _| project.next_remote_id())
6649 .await;
6650 let guest_user_id = self.user_id().unwrap();
6651 let languages =
6652 host_project.read_with(host_cx, |project, _| project.languages().clone());
6653 let project_b = guest_cx.spawn(|mut cx| {
6654 let user_store = self.user_store.clone();
6655 let guest_client = self.client.clone();
6656 async move {
6657 Project::remote(
6658 host_project_id,
6659 guest_client,
6660 user_store.clone(),
6661 languages,
6662 FakeFs::new(cx.background()),
6663 &mut cx,
6664 )
6665 .await
6666 .unwrap()
6667 }
6668 });
6669 host_cx.foreground().run_until_parked();
6670 host_project.update(host_cx, |project, cx| {
6671 project.respond_to_join_request(guest_user_id, true, cx)
6672 });
6673 let project = project_b.await;
6674 self.project = Some(project.clone());
6675 project
6676 }
6677
6678 fn build_workspace(
6679 &self,
6680 project: &ModelHandle<Project>,
6681 cx: &mut TestAppContext,
6682 ) -> ViewHandle<Workspace> {
6683 let (window_id, _) = cx.add_window(|_| EmptyView);
6684 cx.add_view(window_id, |cx| {
6685 let fs = project.read(cx).fs().clone();
6686 Workspace::new(
6687 &WorkspaceParams {
6688 fs,
6689 project: project.clone(),
6690 user_store: self.user_store.clone(),
6691 languages: self.language_registry.clone(),
6692 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6693 channel_list: cx.add_model(|cx| {
6694 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6695 }),
6696 client: self.client.clone(),
6697 },
6698 cx,
6699 )
6700 })
6701 }
6702
/// Drives the host side of a randomized collaboration run.
///
/// Consumes the client and its context, performs one random host operation
/// per item received on `op_start_signal` until that stream ends or an
/// operation fails, then stores `project` on the client and hands back the
/// client, the context and the error (if any).
///
/// All randomness comes from the shared `rng`, so the order in which it is
/// drawn from inside this function is significant.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // Inner function so `?` can bail out early while the outer function
    // still returns the client and context to the caller.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        // Automatically accept every join request for the project.
        cx.update(|cx| {
            cx.subscribe(&project, move |project, event, cx| {
                if let project::Event::ContactRequestedJoin(user) = event {
                    log::info!("Host: accepting join request from {}", user.github_login);
                    project.update(cx, |project, cx| {
                        project.respond_to_join_request(user.id, true, cx)
                    });
                }
            })
            .detach();
        });

        while op_start_signal.next().await.is_some() {
            // Pick the operation from a draw in 0..100. The first two arms
            // need at least one file; otherwise the draw falls through to
            // the catch-all arm, which creates a file.
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            let files = fs.as_fake().files().await;
            match distribution {
                // Find or create a worktree at a random ancestor of a random file.
                0..=19 if !files.is_empty() => {
                    let path = files.choose(&mut *rng.lock()).unwrap();
                    let mut path = path.as_path();
                    // Move up at least one level, then keep climbing until a
                    // random coin flip says stop or the root is reached.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either run it in the background or wait for it.
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open a new buffer or reuse a held one, then drop or mutate it.
                20..=79 if !files.is_empty() => {
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.choose(&mut *rng.lock()).unwrap();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        // NOTE(review): unlike the other awaits here, a failure
                        // to open the buffer panics instead of returning an error.
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // With probability 0.1 release the buffer; otherwise edit
                    // it (probability 0.7) or undo/redo.
                    if rng.lock().gen_bool(0.1) {
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Create a new `.rs` file at a random path made of 1 to 5
                // single-letter components under "/", retrying with a new
                // path until both the directory and the file can be created.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result =
        simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
            .await;
    log::info!("Host done");
    self.project = Some(project);
    (self, cx, result.err())
}
6851
/// Drives the guest side of a randomized collaboration run.
///
/// Consumes the client and its context, performs one random guest operation
/// per item received on `op_start_signal` until that stream ends or an
/// operation fails, then stores `project` on the client and hands back the
/// client, the context and the error (if any). `guest_username` is only
/// used to label log messages.
///
/// All randomness comes from the shared `rng`, so the order in which it is
/// drawn from inside this function is significant.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // Inner function so `?` can bail out early while the outer function
    // still returns the client and context to the caller.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Step 1: pick the buffer to operate on — either open a random
            // file from a random visible worktree, or reuse a held buffer.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    // No visible worktree with a file yet: skip this operation.
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            // Step 2: pick the operation from a draw in 0..100. Each request
            // arm builds the request, wraps it in a background task that maps
            // its error into an `anyhow` error, and then with probability 0.3
            // detaches the task (errors are only logged) or otherwise awaits
            // it and propagates a failure.
            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Release the buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Request completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Request code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save the buffer. Only taken when the buffer is dirty;
                // otherwise the draw falls through to the catch-all edit arm.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must include everything that was
                        // in the buffer when the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Request definitions at a random offset; when awaited, the
                // buffers of the returned locations are kept open.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Request document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide search for a random lowercase letter; when
                // awaited, the buffers in the results are kept open.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Create a new `.rs` file with a random 10-letter name at the
                // root of a random visible, directory-rooted worktree.
                60..=69 => {
                    // NOTE(review): the `unwrap` below panics if no visible
                    // worktree that contains a file and has a directory root
                    // exists — presumably guaranteed by the test setup; confirm.
                    let worktree = project
                        .read_with(cx, |project, cx| {
                            project
                                .worktrees(&cx)
                                .filter(|worktree| {
                                    let worktree = worktree.read(cx);
                                    worktree.is_visible()
                                        && worktree.entries(false).any(|e| e.is_file())
                                        && worktree
                                            .root_entry()
                                            .map_or(false, |e| e.is_dir())
                                })
                                .choose(&mut *rng.lock())
                        })
                        .unwrap();
                    let (worktree_id, worktree_root_name) = worktree
                        .read_with(cx, |worktree, _| {
                            (worktree.id(), worktree.root_name().to_string())
                        });

                    let mut new_name = String::new();
                    for _ in 0..10 {
                        let letter = rng.lock().gen_range('a'..='z');
                        new_name.push(letter);
                    }
                    let mut new_path = PathBuf::new();
                    new_path.push(new_name);
                    new_path.set_extension("rs");
                    log::info!(
                        "{}: creating {:?} in worktree {} ({})",
                        guest_username,
                        new_path,
                        worktree_id,
                        worktree_root_name,
                    );
                    project
                        .update(cx, |project, cx| {
                            project.create_entry((worktree_id, new_path), false, cx)
                        })
                        .unwrap()
                        .await?;
                }
                // Everything else: edit the buffer (probability 0.7) or
                // undo/redo.
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    self.project = Some(project);
    (self, cx, result.err())
}
7180 }
7181
/// Calls `tear_down` on the wrapped client when the `TestClient` is dropped.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
7187
/// Implements the server's `Executor` trait on top of gpui's background
/// executor, so it can be passed where the server expects an `Executor`.
impl Executor for Arc<gpui::executor::Background> {
    type Sleep = gpui::executor::Timer;

    /// Spawns `future` on the background executor and detaches the task.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        self.spawn(future).detach();
    }

    /// Returns the background executor's timer for `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep {
        self.as_ref().timer(duration)
    }
}
7199
7200 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7201 channel
7202 .messages()
7203 .cursor::<()>()
7204 .map(|m| {
7205 (
7206 m.sender.github_login.clone(),
7207 m.body.clone(),
7208 m.is_pending(),
7209 )
7210 })
7211 .collect()
7212 }
7213
/// Stateless view used as the root view of the window that
/// `TestClient::build_workspace` opens.
struct EmptyView;
7215
/// `EmptyView` uses the unit type as its event type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
7219
/// Renders `EmptyView` as a single empty element.
impl gpui::View for EmptyView {
    /// Name reported for this view: "empty view".
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Returns a boxed `gpui::elements::Empty` element.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty::new())
    }
}
7229}