1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It is called with the server and a boxed envelope of any message type, and returns a
/// boxed `'static` future that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
/// Handle given to a request handler for answering one request of type `R`.
struct Response<R> {
    /// Server whose peer delivers the response.
    server: Arc<Server>,
    /// Receipt identifying the request being answered.
    receipt: Receipt<R>,
    /// Flag set to `true` by `send` and `into_receipt`; shared with the code that
    /// created this `Response` so it can check whether the handler used it.
    responded: Arc<AtomicBool>,
}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69
70 fn into_receipt(self) -> Receipt<R> {
71 self.responded.store(true, SeqCst);
72 self.receipt
73 }
74}
75
/// State of the RPC server; handlers receive it as `Arc<Server>`.
pub struct Server {
    /// Peer used to send, respond to, and forward messages on connections.
    peer: Arc<Peer>,
    /// Connection and project bookkeeping, behind an async read-write lock.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
83
/// Abstraction over the runtime facilities `handle_connection` needs: spawning detached
/// work and creating timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` without the caller awaiting it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future for a timer of the given `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
89
/// Unit type naming the non-test executor.
// NOTE(review): presumably implements `Executor` further down in this file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
92
// NOTE(review): presumably the page size for channel-message queries; the use site is not
// visible in this part of the file — confirm.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; confirm
// the unit (bytes vs. characters) at the use site.
const MAX_MESSAGE_LEN: usize = 1024;
95
/// Read guard over the server's `Store`.
struct StoreReadGuard<'a> {
    /// The underlying read guard of the store's `RwLock`.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`; a future that
    /// must be `Send` therefore cannot keep the guard alive across an `.await`.
    _not_send: PhantomData<Rc<()>>,
}
100
/// Write guard over the server's `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying write guard of the store's `RwLock`.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`; a future that
    /// must be `Send` therefore cannot keep the guard alive across an `.await`.
    _not_send: PhantomData<Rc<()>>,
}
105
106impl Server {
    /// Builds a server with a fresh peer, a default (empty) store, and every message and
    /// request handler registered, then wraps it in an `Arc`.
    ///
    /// `notifications`, when provided, is sent a `()` after each message has been handled
    /// (see `handle_connection`).
    ///
    /// # Panics
    ///
    /// Panics if two handlers are registered for the same message type (see
    /// `add_message_handler`).
    pub fn new(
        app_state: Arc<AppState>,
        notifications: Option<mpsc::UnboundedSender<()>>,
    ) -> Arc<Self> {
        let mut server = Self {
            peer: Peer::new(),
            app_state,
            store: Default::default(),
            handlers: Default::default(),
            notifications,
        };

        // `add_request_handler` is used for messages that expect a response;
        // `add_message_handler` for one-way messages.
        server
            .add_request_handler(Server::ping)
            .add_request_handler(Server::register_project)
            .add_message_handler(Server::unregister_project)
            .add_request_handler(Server::join_project)
            .add_message_handler(Server::leave_project)
            .add_message_handler(Server::respond_to_join_project_request)
            .add_request_handler(Server::register_worktree)
            .add_message_handler(Server::unregister_worktree)
            .add_request_handler(Server::update_worktree)
            .add_message_handler(Server::start_language_server)
            .add_message_handler(Server::update_language_server)
            .add_message_handler(Server::update_diagnostic_summary)
            // Requests that are relayed unchanged to the project's host.
            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
            .add_request_handler(
                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
            )
            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(Server::update_buffer)
            .add_message_handler(Server::update_buffer_file)
            .add_message_handler(Server::buffer_reloaded)
            .add_message_handler(Server::buffer_saved)
            .add_request_handler(Server::save_buffer)
            .add_request_handler(Server::get_channels)
            .add_request_handler(Server::get_users)
            .add_request_handler(Server::fuzzy_search_users)
            .add_request_handler(Server::request_contact)
            .add_request_handler(Server::remove_contact)
            .add_request_handler(Server::respond_to_contact_request)
            .add_request_handler(Server::join_channel)
            .add_message_handler(Server::leave_channel)
            .add_request_handler(Server::send_channel_message)
            .add_request_handler(Server::follow)
            .add_message_handler(Server::unfollow)
            .add_message_handler(Server::update_followers)
            .add_request_handler(Server::get_channel_messages);

        Arc::new(server)
    }
174
175 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
176 where
177 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
178 Fut: 'static + Send + Future<Output = Result<()>>,
179 M: EnvelopedMessage,
180 {
181 let prev_handler = self.handlers.insert(
182 TypeId::of::<M>(),
183 Box::new(move |server, envelope| {
184 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
185 let span = info_span!(
186 "handle message",
187 payload_type = envelope.payload_type_name(),
188 payload = format!("{:?}", envelope.payload).as_str(),
189 );
190 let future = (handler)(server, *envelope);
191 async move {
192 if let Err(error) = future.await {
193 tracing::error!(%error, "error handling message");
194 }
195 }
196 .instrument(span)
197 .boxed()
198 }),
199 );
200 if prev_handler.is_some() {
201 panic!("registered a handler for the same message twice");
202 }
203 self
204 }
205
206 /// Handle a request while holding a lock to the store. This is useful when we're registering
207 /// a connection but we want to respond on the connection before anybody else can send on it.
208 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
209 where
210 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
211 Fut: Send + Future<Output = Result<()>>,
212 M: RequestMessage,
213 {
214 let handler = Arc::new(handler);
215 self.add_message_handler(move |server, envelope| {
216 let receipt = envelope.receipt();
217 let handler = handler.clone();
218 async move {
219 let responded = Arc::new(AtomicBool::default());
220 let response = Response {
221 server: server.clone(),
222 responded: responded.clone(),
223 receipt: envelope.receipt(),
224 };
225 match (handler)(server.clone(), envelope, response).await {
226 Ok(()) => {
227 if responded.load(std::sync::atomic::Ordering::SeqCst) {
228 Ok(())
229 } else {
230 Err(anyhow!("handler did not send a response"))?
231 }
232 }
233 Err(error) => {
234 server.peer.respond_with_error(
235 receipt,
236 proto::Error {
237 message: error.to_string(),
238 },
239 )?;
240 Err(error)
241 }
242 }
243 }
244 })
245 }
246
    /// Returns a future that serves one client connection until it closes.
    ///
    /// The future registers `connection` with the peer, optionally reports the assigned
    /// connection id through `send_connection_id`, records the connection in the store,
    /// sends the user their initial contacts update, and then dispatches incoming
    /// messages to the registered handlers until either the I/O future finishes or the
    /// incoming stream ends. Finally it signs the connection out.
    ///
    /// * `address` is only used for tracing output.
    /// * `executor` provides the timers handed to the peer and spawns handlers for
    ///   messages flagged as background.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = Result<()>> {
        let mut this = self.clone();
        let span = info_span!("handle connection", %user_id, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    // Timer factory given to the peer; built on the executor's `sleep`.
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %connection_id, %address, "connection opened");

            // Failure to deliver the id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            // NOTE(review): the `?` on this line and on the two fallible calls below returns
            // before `sign_out` runs, although the connection has already been added to
            // the peer (and, further down, to the store). Confirm whether cleanup is
            // needed on these paths.
            let contacts = this.app_state.db.get_contacts(user_id).await?;

            {
                // The write guard is confined to this block so it is released before the
                // next `.await`.
                let mut store = this.store_mut().await;
                store.add_connection(connection_id, user_id);
                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
            }
            this.update_user_contacts(user_id).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so completion of the I/O
                // future is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(%error, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Notify after the handler completes; a failed
                                    // notification send is ignored.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages are spawned and not awaited;
                                    // all others are awaited here, so the next message is
                                    // not read until this one has been handled.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!("no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            // The incoming stream ended.
                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // A sign-out failure is logged but does not fail the connection future.
            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
337
338 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
339 self.peer.disconnect(connection_id);
340
341 let removed_user_id = {
342 let mut store = self.store_mut().await;
343 let removed_connection = store.remove_connection(connection_id)?;
344
345 for (project_id, project) in removed_connection.hosted_projects {
346 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
347 self.peer
348 .send(conn_id, proto::UnregisterProject { project_id })
349 });
350 }
351
352 for project_id in removed_connection.guest_project_ids {
353 if let Some(project) = store.project(project_id).trace_err() {
354 broadcast(connection_id, project.connection_ids(), |conn_id| {
355 self.peer.send(
356 conn_id,
357 proto::RemoveProjectCollaborator {
358 project_id,
359 peer_id: connection_id.0,
360 },
361 )
362 });
363 if project.guests.is_empty() {
364 self.peer
365 .send(
366 project.host_connection_id,
367 proto::ProjectUnshared { project_id },
368 )
369 .trace_err();
370 }
371 }
372 }
373
374 removed_connection.user_id
375 };
376
377 self.update_user_contacts(removed_user_id).await?;
378
379 Ok(())
380 }
381
382 async fn ping(
383 self: Arc<Server>,
384 _: TypedEnvelope<proto::Ping>,
385 response: Response<proto::Ping>,
386 ) -> Result<()> {
387 response.send(proto::Ack {})?;
388 Ok(())
389 }
390
391 async fn register_project(
392 self: Arc<Server>,
393 request: TypedEnvelope<proto::RegisterProject>,
394 response: Response<proto::RegisterProject>,
395 ) -> Result<()> {
396 let user_id;
397 let project_id;
398 {
399 let mut state = self.store_mut().await;
400 user_id = state.user_id_for_connection(request.sender_id)?;
401 project_id = state.register_project(request.sender_id, user_id);
402 };
403 self.update_user_contacts(user_id).await?;
404 response.send(proto::RegisterProjectResponse { project_id })?;
405 Ok(())
406 }
407
408 async fn unregister_project(
409 self: Arc<Server>,
410 request: TypedEnvelope<proto::UnregisterProject>,
411 ) -> Result<()> {
412 let user_id = {
413 let mut state = self.store_mut().await;
414 state.unregister_project(request.payload.project_id, request.sender_id)?;
415 state.user_id_for_connection(request.sender_id)?
416 };
417
418 self.update_user_contacts(user_id).await?;
419 Ok(())
420 }
421
422 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
423 let contacts = self.app_state.db.get_contacts(user_id).await?;
424 let store = self.store().await;
425 let updated_contact = store.contact_for_user(user_id, false);
426 for contact in contacts {
427 if let db::Contact::Accepted {
428 user_id: contact_user_id,
429 ..
430 } = contact
431 {
432 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
433 self.peer
434 .send(
435 contact_conn_id,
436 proto::UpdateContacts {
437 contacts: vec![updated_contact.clone()],
438 remove_contacts: Default::default(),
439 incoming_requests: Default::default(),
440 remove_incoming_requests: Default::default(),
441 outgoing_requests: Default::default(),
442 remove_outgoing_requests: Default::default(),
443 },
444 )
445 .trace_err();
446 }
447 }
448 }
449 Ok(())
450 }
451
452 async fn join_project(
453 self: Arc<Server>,
454 request: TypedEnvelope<proto::JoinProject>,
455 response: Response<proto::JoinProject>,
456 ) -> Result<()> {
457 let project_id = request.payload.project_id;
458 let host_user_id;
459 let guest_user_id;
460 let host_connection_id;
461 {
462 let state = self.store().await;
463 let project = state.project(project_id)?;
464 host_user_id = project.host_user_id;
465 host_connection_id = project.host_connection_id;
466 guest_user_id = state.user_id_for_connection(request.sender_id)?;
467 };
468
469 let has_contact = self
470 .app_state
471 .db
472 .has_contact(guest_user_id, host_user_id)
473 .await?;
474 if !has_contact {
475 return Err(anyhow!("no such project"))?;
476 }
477
478 self.store_mut().await.request_join_project(
479 guest_user_id,
480 project_id,
481 response.into_receipt(),
482 )?;
483 self.peer.send(
484 host_connection_id,
485 proto::RequestJoinProject {
486 project_id,
487 requester_id: guest_user_id.to_proto(),
488 },
489 )?;
490 Ok(())
491 }
492
493 async fn respond_to_join_project_request(
494 self: Arc<Server>,
495 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
496 ) -> Result<()> {
497 let host_user_id;
498
499 {
500 let mut state = self.store_mut().await;
501 let project_id = request.payload.project_id;
502 let project = state.project(project_id)?;
503 if project.host_connection_id != request.sender_id {
504 Err(anyhow!("no such connection"))?;
505 }
506
507 host_user_id = project.host_user_id;
508 let guest_user_id = UserId::from_proto(request.payload.requester_id);
509
510 if !request.payload.allow {
511 let receipts = state
512 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
513 .ok_or_else(|| anyhow!("no such request"))?;
514 for receipt in receipts {
515 self.peer.respond(
516 receipt,
517 proto::JoinProjectResponse {
518 variant: Some(proto::join_project_response::Variant::Decline(
519 proto::join_project_response::Decline {},
520 )),
521 },
522 )?;
523 }
524 return Ok(());
525 }
526
527 let (receipts_with_replica_ids, project) = state
528 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
529 .ok_or_else(|| anyhow!("no such request"))?;
530
531 let peer_count = project.guests.len();
532 let mut collaborators = Vec::with_capacity(peer_count);
533 collaborators.push(proto::Collaborator {
534 peer_id: project.host_connection_id.0,
535 replica_id: 0,
536 user_id: project.host_user_id.to_proto(),
537 });
538 let worktrees = project
539 .worktrees
540 .iter()
541 .filter_map(|(id, shared_worktree)| {
542 let worktree = project.worktrees.get(&id)?;
543 Some(proto::Worktree {
544 id: *id,
545 root_name: worktree.root_name.clone(),
546 entries: shared_worktree.entries.values().cloned().collect(),
547 diagnostic_summaries: shared_worktree
548 .diagnostic_summaries
549 .values()
550 .cloned()
551 .collect(),
552 visible: worktree.visible,
553 scan_id: shared_worktree.scan_id,
554 })
555 })
556 .collect::<Vec<_>>();
557
558 // Add all guests other than the requesting user's own connections as collaborators
559 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
560 if receipts_with_replica_ids
561 .iter()
562 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
563 {
564 collaborators.push(proto::Collaborator {
565 peer_id: peer_conn_id.0,
566 replica_id: *peer_replica_id as u32,
567 user_id: peer_user_id.to_proto(),
568 });
569 }
570 }
571
572 for conn_id in project.connection_ids() {
573 for (receipt, replica_id) in &receipts_with_replica_ids {
574 if conn_id != receipt.sender_id {
575 self.peer.send(
576 conn_id,
577 proto::AddProjectCollaborator {
578 project_id,
579 collaborator: Some(proto::Collaborator {
580 peer_id: receipt.sender_id.0,
581 replica_id: *replica_id as u32,
582 user_id: guest_user_id.to_proto(),
583 }),
584 },
585 )?;
586 }
587 }
588 }
589
590 for (receipt, replica_id) in receipts_with_replica_ids {
591 self.peer.respond(
592 receipt,
593 proto::JoinProjectResponse {
594 variant: Some(proto::join_project_response::Variant::Accept(
595 proto::join_project_response::Accept {
596 worktrees: worktrees.clone(),
597 replica_id: replica_id as u32,
598 collaborators: collaborators.clone(),
599 language_servers: project.language_servers.clone(),
600 },
601 )),
602 },
603 )?;
604 }
605 }
606
607 self.update_user_contacts(host_user_id).await?;
608 Ok(())
609 }
610
611 async fn leave_project(
612 self: Arc<Server>,
613 request: TypedEnvelope<proto::LeaveProject>,
614 ) -> Result<()> {
615 let sender_id = request.sender_id;
616 let project_id = request.payload.project_id;
617 let project;
618 {
619 let mut state = self.store_mut().await;
620 project = state.leave_project(sender_id, project_id)?;
621 let unshare = project.connection_ids.len() <= 1;
622 broadcast(sender_id, project.connection_ids, |conn_id| {
623 self.peer.send(
624 conn_id,
625 proto::RemoveProjectCollaborator {
626 project_id,
627 peer_id: sender_id.0,
628 },
629 )
630 });
631 if unshare {
632 self.peer.send(
633 project.host_connection_id,
634 proto::ProjectUnshared { project_id },
635 )?;
636 }
637 }
638 self.update_user_contacts(project.host_user_id).await?;
639 Ok(())
640 }
641
642 async fn register_worktree(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::RegisterWorktree>,
645 response: Response<proto::RegisterWorktree>,
646 ) -> Result<()> {
647 let host_user_id;
648 {
649 let mut state = self.store_mut().await;
650 host_user_id = state.user_id_for_connection(request.sender_id)?;
651
652 let guest_connection_ids = state
653 .read_project(request.payload.project_id, request.sender_id)?
654 .guest_connection_ids();
655 state.register_worktree(
656 request.payload.project_id,
657 request.payload.worktree_id,
658 request.sender_id,
659 Worktree {
660 root_name: request.payload.root_name.clone(),
661 visible: request.payload.visible,
662 ..Default::default()
663 },
664 )?;
665
666 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
667 self.peer
668 .forward_send(request.sender_id, connection_id, request.payload.clone())
669 });
670 }
671 self.update_user_contacts(host_user_id).await?;
672 response.send(proto::Ack {})?;
673 Ok(())
674 }
675
676 async fn unregister_worktree(
677 self: Arc<Server>,
678 request: TypedEnvelope<proto::UnregisterWorktree>,
679 ) -> Result<()> {
680 let host_user_id;
681 let project_id = request.payload.project_id;
682 let worktree_id = request.payload.worktree_id;
683 {
684 let mut state = self.store_mut().await;
685 let (_, guest_connection_ids) =
686 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
687 host_user_id = state.user_id_for_connection(request.sender_id)?;
688 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
689 self.peer.send(
690 conn_id,
691 proto::UnregisterWorktree {
692 project_id,
693 worktree_id,
694 },
695 )
696 });
697 }
698 self.update_user_contacts(host_user_id).await?;
699 Ok(())
700 }
701
702 async fn update_worktree(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::UpdateWorktree>,
705 response: Response<proto::UpdateWorktree>,
706 ) -> Result<()> {
707 let connection_ids = self.store_mut().await.update_worktree(
708 request.sender_id,
709 request.payload.project_id,
710 request.payload.worktree_id,
711 &request.payload.removed_entries,
712 &request.payload.updated_entries,
713 request.payload.scan_id,
714 )?;
715
716 broadcast(request.sender_id, connection_ids, |connection_id| {
717 self.peer
718 .forward_send(request.sender_id, connection_id, request.payload.clone())
719 });
720 response.send(proto::Ack {})?;
721 Ok(())
722 }
723
724 async fn update_diagnostic_summary(
725 self: Arc<Server>,
726 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
727 ) -> Result<()> {
728 let summary = request
729 .payload
730 .summary
731 .clone()
732 .ok_or_else(|| anyhow!("invalid summary"))?;
733 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
734 request.payload.project_id,
735 request.payload.worktree_id,
736 request.sender_id,
737 summary,
738 )?;
739
740 broadcast(request.sender_id, receiver_ids, |connection_id| {
741 self.peer
742 .forward_send(request.sender_id, connection_id, request.payload.clone())
743 });
744 Ok(())
745 }
746
747 async fn start_language_server(
748 self: Arc<Server>,
749 request: TypedEnvelope<proto::StartLanguageServer>,
750 ) -> Result<()> {
751 let receiver_ids = self.store_mut().await.start_language_server(
752 request.payload.project_id,
753 request.sender_id,
754 request
755 .payload
756 .server
757 .clone()
758 .ok_or_else(|| anyhow!("invalid language server"))?,
759 )?;
760 broadcast(request.sender_id, receiver_ids, |connection_id| {
761 self.peer
762 .forward_send(request.sender_id, connection_id, request.payload.clone())
763 });
764 Ok(())
765 }
766
767 async fn update_language_server(
768 self: Arc<Server>,
769 request: TypedEnvelope<proto::UpdateLanguageServer>,
770 ) -> Result<()> {
771 let receiver_ids = self
772 .store()
773 .await
774 .project_connection_ids(request.payload.project_id, request.sender_id)?;
775 broadcast(request.sender_id, receiver_ids, |connection_id| {
776 self.peer
777 .forward_send(request.sender_id, connection_id, request.payload.clone())
778 });
779 Ok(())
780 }
781
782 async fn forward_project_request<T>(
783 self: Arc<Server>,
784 request: TypedEnvelope<T>,
785 response: Response<T>,
786 ) -> Result<()>
787 where
788 T: EntityMessage + RequestMessage,
789 {
790 let host_connection_id = self
791 .store()
792 .await
793 .read_project(request.payload.remote_entity_id(), request.sender_id)?
794 .host_connection_id;
795
796 response.send(
797 self.peer
798 .forward_request(request.sender_id, host_connection_id, request.payload)
799 .await?,
800 )?;
801 Ok(())
802 }
803
804 async fn save_buffer(
805 self: Arc<Server>,
806 request: TypedEnvelope<proto::SaveBuffer>,
807 response: Response<proto::SaveBuffer>,
808 ) -> Result<()> {
809 let host = self
810 .store()
811 .await
812 .read_project(request.payload.project_id, request.sender_id)?
813 .host_connection_id;
814 let response_payload = self
815 .peer
816 .forward_request(request.sender_id, host, request.payload.clone())
817 .await?;
818
819 let mut guests = self
820 .store()
821 .await
822 .read_project(request.payload.project_id, request.sender_id)?
823 .connection_ids();
824 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
825 broadcast(host, guests, |conn_id| {
826 self.peer
827 .forward_send(host, conn_id, response_payload.clone())
828 });
829 response.send(response_payload)?;
830 Ok(())
831 }
832
833 async fn update_buffer(
834 self: Arc<Server>,
835 request: TypedEnvelope<proto::UpdateBuffer>,
836 response: Response<proto::UpdateBuffer>,
837 ) -> Result<()> {
838 let receiver_ids = self
839 .store()
840 .await
841 .project_connection_ids(request.payload.project_id, request.sender_id)?;
842 broadcast(request.sender_id, receiver_ids, |connection_id| {
843 self.peer
844 .forward_send(request.sender_id, connection_id, request.payload.clone())
845 });
846 response.send(proto::Ack {})?;
847 Ok(())
848 }
849
850 async fn update_buffer_file(
851 self: Arc<Server>,
852 request: TypedEnvelope<proto::UpdateBufferFile>,
853 ) -> Result<()> {
854 let receiver_ids = self
855 .store()
856 .await
857 .project_connection_ids(request.payload.project_id, request.sender_id)?;
858 broadcast(request.sender_id, receiver_ids, |connection_id| {
859 self.peer
860 .forward_send(request.sender_id, connection_id, request.payload.clone())
861 });
862 Ok(())
863 }
864
865 async fn buffer_reloaded(
866 self: Arc<Server>,
867 request: TypedEnvelope<proto::BufferReloaded>,
868 ) -> Result<()> {
869 let receiver_ids = self
870 .store()
871 .await
872 .project_connection_ids(request.payload.project_id, request.sender_id)?;
873 broadcast(request.sender_id, receiver_ids, |connection_id| {
874 self.peer
875 .forward_send(request.sender_id, connection_id, request.payload.clone())
876 });
877 Ok(())
878 }
879
880 async fn buffer_saved(
881 self: Arc<Server>,
882 request: TypedEnvelope<proto::BufferSaved>,
883 ) -> Result<()> {
884 let receiver_ids = self
885 .store()
886 .await
887 .project_connection_ids(request.payload.project_id, request.sender_id)?;
888 broadcast(request.sender_id, receiver_ids, |connection_id| {
889 self.peer
890 .forward_send(request.sender_id, connection_id, request.payload.clone())
891 });
892 Ok(())
893 }
894
895 async fn follow(
896 self: Arc<Self>,
897 request: TypedEnvelope<proto::Follow>,
898 response: Response<proto::Follow>,
899 ) -> Result<()> {
900 let leader_id = ConnectionId(request.payload.leader_id);
901 let follower_id = request.sender_id;
902 if !self
903 .store()
904 .await
905 .project_connection_ids(request.payload.project_id, follower_id)?
906 .contains(&leader_id)
907 {
908 Err(anyhow!("no such peer"))?;
909 }
910 let mut response_payload = self
911 .peer
912 .forward_request(request.sender_id, leader_id, request.payload)
913 .await?;
914 response_payload
915 .views
916 .retain(|view| view.leader_id != Some(follower_id.0));
917 response.send(response_payload)?;
918 Ok(())
919 }
920
921 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
922 let leader_id = ConnectionId(request.payload.leader_id);
923 if !self
924 .store()
925 .await
926 .project_connection_ids(request.payload.project_id, request.sender_id)?
927 .contains(&leader_id)
928 {
929 Err(anyhow!("no such peer"))?;
930 }
931 self.peer
932 .forward_send(request.sender_id, leader_id, request.payload)?;
933 Ok(())
934 }
935
936 async fn update_followers(
937 self: Arc<Self>,
938 request: TypedEnvelope<proto::UpdateFollowers>,
939 ) -> Result<()> {
940 let connection_ids = self
941 .store()
942 .await
943 .project_connection_ids(request.payload.project_id, request.sender_id)?;
944 let leader_id = request
945 .payload
946 .variant
947 .as_ref()
948 .and_then(|variant| match variant {
949 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
950 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
951 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
952 });
953 for follower_id in &request.payload.follower_ids {
954 let follower_id = ConnectionId(*follower_id);
955 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
956 self.peer
957 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
958 }
959 }
960 Ok(())
961 }
962
963 async fn get_channels(
964 self: Arc<Server>,
965 request: TypedEnvelope<proto::GetChannels>,
966 response: Response<proto::GetChannels>,
967 ) -> Result<()> {
968 let user_id = self
969 .store()
970 .await
971 .user_id_for_connection(request.sender_id)?;
972 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
973 response.send(proto::GetChannelsResponse {
974 channels: channels
975 .into_iter()
976 .map(|chan| proto::Channel {
977 id: chan.id.to_proto(),
978 name: chan.name,
979 })
980 .collect(),
981 })?;
982 Ok(())
983 }
984
985 async fn get_users(
986 self: Arc<Server>,
987 request: TypedEnvelope<proto::GetUsers>,
988 response: Response<proto::GetUsers>,
989 ) -> Result<()> {
990 let user_ids = request
991 .payload
992 .user_ids
993 .into_iter()
994 .map(UserId::from_proto)
995 .collect();
996 let users = self
997 .app_state
998 .db
999 .get_users_by_ids(user_ids)
1000 .await?
1001 .into_iter()
1002 .map(|user| proto::User {
1003 id: user.id.to_proto(),
1004 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1005 github_login: user.github_login,
1006 })
1007 .collect();
1008 response.send(proto::UsersResponse { users })?;
1009 Ok(())
1010 }
1011
1012 async fn fuzzy_search_users(
1013 self: Arc<Server>,
1014 request: TypedEnvelope<proto::FuzzySearchUsers>,
1015 response: Response<proto::FuzzySearchUsers>,
1016 ) -> Result<()> {
1017 let user_id = self
1018 .store()
1019 .await
1020 .user_id_for_connection(request.sender_id)?;
1021 let query = request.payload.query;
1022 let db = &self.app_state.db;
1023 let users = match query.len() {
1024 0 => vec![],
1025 1 | 2 => db
1026 .get_user_by_github_login(&query)
1027 .await?
1028 .into_iter()
1029 .collect(),
1030 _ => db.fuzzy_search_users(&query, 10).await?,
1031 };
1032 let users = users
1033 .into_iter()
1034 .filter(|user| user.id != user_id)
1035 .map(|user| proto::User {
1036 id: user.id.to_proto(),
1037 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1038 github_login: user.github_login,
1039 })
1040 .collect();
1041 response.send(proto::UsersResponse { users })?;
1042 Ok(())
1043 }
1044
1045 async fn request_contact(
1046 self: Arc<Server>,
1047 request: TypedEnvelope<proto::RequestContact>,
1048 response: Response<proto::RequestContact>,
1049 ) -> Result<()> {
1050 let requester_id = self
1051 .store()
1052 .await
1053 .user_id_for_connection(request.sender_id)?;
1054 let responder_id = UserId::from_proto(request.payload.responder_id);
1055 if requester_id == responder_id {
1056 return Err(anyhow!("cannot add yourself as a contact"))?;
1057 }
1058
1059 self.app_state
1060 .db
1061 .send_contact_request(requester_id, responder_id)
1062 .await?;
1063
1064 // Update outgoing contact requests of requester
1065 let mut update = proto::UpdateContacts::default();
1066 update.outgoing_requests.push(responder_id.to_proto());
1067 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1068 self.peer.send(connection_id, update.clone())?;
1069 }
1070
1071 // Update incoming contact requests of responder
1072 let mut update = proto::UpdateContacts::default();
1073 update
1074 .incoming_requests
1075 .push(proto::IncomingContactRequest {
1076 requester_id: requester_id.to_proto(),
1077 should_notify: true,
1078 });
1079 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1080 self.peer.send(connection_id, update.clone())?;
1081 }
1082
1083 response.send(proto::Ack {})?;
1084 Ok(())
1085 }
1086
1087 async fn respond_to_contact_request(
1088 self: Arc<Server>,
1089 request: TypedEnvelope<proto::RespondToContactRequest>,
1090 response: Response<proto::RespondToContactRequest>,
1091 ) -> Result<()> {
1092 let responder_id = self
1093 .store()
1094 .await
1095 .user_id_for_connection(request.sender_id)?;
1096 let requester_id = UserId::from_proto(request.payload.requester_id);
1097 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1098 self.app_state
1099 .db
1100 .dismiss_contact_notification(responder_id, requester_id)
1101 .await?;
1102 } else {
1103 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1104 self.app_state
1105 .db
1106 .respond_to_contact_request(responder_id, requester_id, accept)
1107 .await?;
1108
1109 let store = self.store().await;
1110 // Update responder with new contact
1111 let mut update = proto::UpdateContacts::default();
1112 if accept {
1113 update
1114 .contacts
1115 .push(store.contact_for_user(requester_id, false));
1116 }
1117 update
1118 .remove_incoming_requests
1119 .push(requester_id.to_proto());
1120 for connection_id in store.connection_ids_for_user(responder_id) {
1121 self.peer.send(connection_id, update.clone())?;
1122 }
1123
1124 // Update requester with new contact
1125 let mut update = proto::UpdateContacts::default();
1126 if accept {
1127 update
1128 .contacts
1129 .push(store.contact_for_user(responder_id, true));
1130 }
1131 update
1132 .remove_outgoing_requests
1133 .push(responder_id.to_proto());
1134 for connection_id in store.connection_ids_for_user(requester_id) {
1135 self.peer.send(connection_id, update.clone())?;
1136 }
1137 }
1138
1139 response.send(proto::Ack {})?;
1140 Ok(())
1141 }
1142
1143 async fn remove_contact(
1144 self: Arc<Server>,
1145 request: TypedEnvelope<proto::RemoveContact>,
1146 response: Response<proto::RemoveContact>,
1147 ) -> Result<()> {
1148 let requester_id = self
1149 .store()
1150 .await
1151 .user_id_for_connection(request.sender_id)?;
1152 let responder_id = UserId::from_proto(request.payload.user_id);
1153 self.app_state
1154 .db
1155 .remove_contact(requester_id, responder_id)
1156 .await?;
1157
1158 // Update outgoing contact requests of requester
1159 let mut update = proto::UpdateContacts::default();
1160 update
1161 .remove_outgoing_requests
1162 .push(responder_id.to_proto());
1163 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1164 self.peer.send(connection_id, update.clone())?;
1165 }
1166
1167 // Update incoming contact requests of responder
1168 let mut update = proto::UpdateContacts::default();
1169 update
1170 .remove_incoming_requests
1171 .push(requester_id.to_proto());
1172 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1173 self.peer.send(connection_id, update.clone())?;
1174 }
1175
1176 response.send(proto::Ack {})?;
1177 Ok(())
1178 }
1179
1180 async fn join_channel(
1181 self: Arc<Self>,
1182 request: TypedEnvelope<proto::JoinChannel>,
1183 response: Response<proto::JoinChannel>,
1184 ) -> Result<()> {
1185 let user_id = self
1186 .store()
1187 .await
1188 .user_id_for_connection(request.sender_id)?;
1189 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1190 if !self
1191 .app_state
1192 .db
1193 .can_user_access_channel(user_id, channel_id)
1194 .await?
1195 {
1196 Err(anyhow!("access denied"))?;
1197 }
1198
1199 self.store_mut()
1200 .await
1201 .join_channel(request.sender_id, channel_id);
1202 let messages = self
1203 .app_state
1204 .db
1205 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1206 .await?
1207 .into_iter()
1208 .map(|msg| proto::ChannelMessage {
1209 id: msg.id.to_proto(),
1210 body: msg.body,
1211 timestamp: msg.sent_at.unix_timestamp() as u64,
1212 sender_id: msg.sender_id.to_proto(),
1213 nonce: Some(msg.nonce.as_u128().into()),
1214 })
1215 .collect::<Vec<_>>();
1216 response.send(proto::JoinChannelResponse {
1217 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1218 messages,
1219 })?;
1220 Ok(())
1221 }
1222
1223 async fn leave_channel(
1224 self: Arc<Self>,
1225 request: TypedEnvelope<proto::LeaveChannel>,
1226 ) -> Result<()> {
1227 let user_id = self
1228 .store()
1229 .await
1230 .user_id_for_connection(request.sender_id)?;
1231 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1232 if !self
1233 .app_state
1234 .db
1235 .can_user_access_channel(user_id, channel_id)
1236 .await?
1237 {
1238 Err(anyhow!("access denied"))?;
1239 }
1240
1241 self.store_mut()
1242 .await
1243 .leave_channel(request.sender_id, channel_id);
1244
1245 Ok(())
1246 }
1247
1248 async fn send_channel_message(
1249 self: Arc<Self>,
1250 request: TypedEnvelope<proto::SendChannelMessage>,
1251 response: Response<proto::SendChannelMessage>,
1252 ) -> Result<()> {
1253 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1254 let user_id;
1255 let connection_ids;
1256 {
1257 let state = self.store().await;
1258 user_id = state.user_id_for_connection(request.sender_id)?;
1259 connection_ids = state.channel_connection_ids(channel_id)?;
1260 }
1261
1262 // Validate the message body.
1263 let body = request.payload.body.trim().to_string();
1264 if body.len() > MAX_MESSAGE_LEN {
1265 return Err(anyhow!("message is too long"))?;
1266 }
1267 if body.is_empty() {
1268 return Err(anyhow!("message can't be blank"))?;
1269 }
1270
1271 let timestamp = OffsetDateTime::now_utc();
1272 let nonce = request
1273 .payload
1274 .nonce
1275 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1276
1277 let message_id = self
1278 .app_state
1279 .db
1280 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1281 .await?
1282 .to_proto();
1283 let message = proto::ChannelMessage {
1284 sender_id: user_id.to_proto(),
1285 id: message_id,
1286 body,
1287 timestamp: timestamp.unix_timestamp() as u64,
1288 nonce: Some(nonce),
1289 };
1290 broadcast(request.sender_id, connection_ids, |conn_id| {
1291 self.peer.send(
1292 conn_id,
1293 proto::ChannelMessageSent {
1294 channel_id: channel_id.to_proto(),
1295 message: Some(message.clone()),
1296 },
1297 )
1298 });
1299 response.send(proto::SendChannelMessageResponse {
1300 message: Some(message),
1301 })?;
1302 Ok(())
1303 }
1304
1305 async fn get_channel_messages(
1306 self: Arc<Self>,
1307 request: TypedEnvelope<proto::GetChannelMessages>,
1308 response: Response<proto::GetChannelMessages>,
1309 ) -> Result<()> {
1310 let user_id = self
1311 .store()
1312 .await
1313 .user_id_for_connection(request.sender_id)?;
1314 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1315 if !self
1316 .app_state
1317 .db
1318 .can_user_access_channel(user_id, channel_id)
1319 .await?
1320 {
1321 Err(anyhow!("access denied"))?;
1322 }
1323
1324 let messages = self
1325 .app_state
1326 .db
1327 .get_channel_messages(
1328 channel_id,
1329 MESSAGE_COUNT_PER_PAGE,
1330 Some(MessageId::from_proto(request.payload.before_message_id)),
1331 )
1332 .await?
1333 .into_iter()
1334 .map(|msg| proto::ChannelMessage {
1335 id: msg.id.to_proto(),
1336 body: msg.body,
1337 timestamp: msg.sent_at.unix_timestamp() as u64,
1338 sender_id: msg.sender_id.to_proto(),
1339 nonce: Some(msg.nonce.as_u128().into()),
1340 })
1341 .collect::<Vec<_>>();
1342 response.send(proto::GetChannelMessagesResponse {
1343 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1344 messages,
1345 })?;
1346 Ok(())
1347 }
1348
1349 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1350 #[cfg(test)]
1351 tokio::task::yield_now().await;
1352 let guard = self.store.read().await;
1353 #[cfg(test)]
1354 tokio::task::yield_now().await;
1355 StoreReadGuard {
1356 guard,
1357 _not_send: PhantomData,
1358 }
1359 }
1360
1361 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1362 #[cfg(test)]
1363 tokio::task::yield_now().await;
1364 let guard = self.store.write().await;
1365 #[cfg(test)]
1366 tokio::task::yield_now().await;
1367 StoreWriteGuard {
1368 guard,
1369 _not_send: PhantomData,
1370 }
1371 }
1372}
1373
1374impl<'a> Deref for StoreReadGuard<'a> {
1375 type Target = Store;
1376
1377 fn deref(&self) -> &Self::Target {
1378 &*self.guard
1379 }
1380}
1381
1382impl<'a> Deref for StoreWriteGuard<'a> {
1383 type Target = Store;
1384
1385 fn deref(&self) -> &Self::Target {
1386 &*self.guard
1387 }
1388}
1389
1390impl<'a> DerefMut for StoreWriteGuard<'a> {
1391 fn deref_mut(&mut self) -> &mut Self::Target {
1392 &mut *self.guard
1393 }
1394}
1395
1396impl<'a> Drop for StoreWriteGuard<'a> {
1397 fn drop(&mut self) {
1398 #[cfg(test)]
1399 self.check_invariants();
1400
1401 let metrics = self.metrics();
1402 tracing::info!(
1403 connections = metrics.connections,
1404 registered_projects = metrics.registered_projects,
1405 shared_projects = metrics.shared_projects,
1406 "metrics"
1407 );
1408 }
1409}
1410
1411impl Executor for RealExecutor {
1412 type Sleep = Sleep;
1413
1414 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1415 tokio::task::spawn(future);
1416 }
1417
1418 fn sleep(&self, duration: Duration) -> Self::Sleep {
1419 tokio::time::sleep(duration)
1420 }
1421}
1422
1423fn broadcast<F>(
1424 sender_id: ConnectionId,
1425 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1426 mut f: F,
1427) where
1428 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1429{
1430 for receiver_id in receiver_ids {
1431 if receiver_id != sender_id {
1432 f(receiver_id).trace_err();
1433 }
1434 }
1435}
1436
1437lazy_static! {
1438 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1439}
1440
1441pub struct ProtocolVersion(u32);
1442
1443impl Header for ProtocolVersion {
1444 fn name() -> &'static HeaderName {
1445 &ZED_PROTOCOL_VERSION
1446 }
1447
1448 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1449 where
1450 Self: Sized,
1451 I: Iterator<Item = &'i axum::http::HeaderValue>,
1452 {
1453 let version = values
1454 .next()
1455 .ok_or_else(|| axum::headers::Error::invalid())?
1456 .to_str()
1457 .map_err(|_| axum::headers::Error::invalid())?
1458 .parse()
1459 .map_err(|_| axum::headers::Error::invalid())?;
1460 Ok(Self(version))
1461 }
1462
1463 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1464 values.extend([self.0.to_string().parse().unwrap()]);
1465 }
1466}
1467
1468pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1469 let server = Server::new(app_state.clone(), None);
1470 Router::new()
1471 .route("/rpc", get(handle_websocket_request))
1472 .layer(
1473 ServiceBuilder::new()
1474 .layer(Extension(app_state))
1475 .layer(middleware::from_fn(auth::validate_header))
1476 .layer(Extension(server)),
1477 )
1478}
1479
1480pub async fn handle_websocket_request(
1481 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1482 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1483 Extension(server): Extension<Arc<Server>>,
1484 Extension(user_id): Extension<UserId>,
1485 ws: WebSocketUpgrade,
1486) -> axum::response::Response {
1487 if protocol_version != rpc::PROTOCOL_VERSION {
1488 return (
1489 StatusCode::UPGRADE_REQUIRED,
1490 "client must be upgraded".to_string(),
1491 )
1492 .into_response();
1493 }
1494 let socket_address = socket_address.to_string();
1495 ws.on_upgrade(move |socket| {
1496 use util::ResultExt;
1497 let socket = socket
1498 .map_ok(to_tungstenite_message)
1499 .err_into()
1500 .with(|message| async move { Ok(to_axum_message(message)) });
1501 let connection = Connection::new(Box::pin(socket));
1502 async move {
1503 server
1504 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1505 .await
1506 .log_err();
1507 }
1508 })
1509}
1510
1511fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1512 match message {
1513 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1514 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1515 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1516 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1517 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1518 code: frame.code.into(),
1519 reason: frame.reason,
1520 })),
1521 }
1522}
1523
1524fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1525 match message {
1526 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1527 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1528 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1529 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1530 AxumMessage::Close(frame) => {
1531 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1532 code: frame.code.into(),
1533 reason: frame.reason,
1534 }))
1535 }
1536 }
1537}
1538
1539pub trait ResultExt {
1540 type Ok;
1541
1542 fn trace_err(self) -> Option<Self::Ok>;
1543}
1544
1545impl<T, E> ResultExt for Result<T, E>
1546where
1547 E: std::fmt::Debug,
1548{
1549 type Ok = T;
1550
1551 fn trace_err(self) -> Option<T> {
1552 match self {
1553 Ok(value) => Some(value),
1554 Err(error) => {
1555 tracing::error!("{:?}", error);
1556 None
1557 }
1558 }
1559 }
1560}
1561
1562#[cfg(test)]
1563mod tests {
1564 use super::*;
1565 use crate::{
1566 db::{tests::TestDb, UserId},
1567 AppState,
1568 };
1569 use ::rpc::Peer;
1570 use client::{
1571 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1572 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1573 };
1574 use collections::{BTreeMap, HashSet};
1575 use editor::{
1576 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1577 ToOffset, ToggleCodeActions, Undo,
1578 };
1579 use gpui::{
1580 executor::{self, Deterministic},
1581 geometry::vector::vec2f,
1582 ModelHandle, TestAppContext, ViewHandle,
1583 };
1584 use language::{
1585 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1586 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1587 };
1588 use lsp::{self, FakeLanguageServer};
1589 use parking_lot::Mutex;
1590 use project::{
1591 fs::{FakeFs, Fs as _},
1592 search::SearchQuery,
1593 worktree::WorktreeHandle,
1594 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1595 };
1596 use rand::prelude::*;
1597 use rpc::PeerId;
1598 use serde_json::json;
1599 use settings::Settings;
1600 use sqlx::types::time::OffsetDateTime;
1601 use std::{
1602 env,
1603 ops::Deref,
1604 path::{Path, PathBuf},
1605 rc::Rc,
1606 sync::{
1607 atomic::{AtomicBool, Ordering::SeqCst},
1608 Arc,
1609 },
1610 time::Duration,
1611 };
1612 use theme::ThemeRegistry;
1613 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1614
// Process-start hook: initializes `env_logger`, but only when `RUST_LOG` is set.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
1622
1623 #[gpui::test(iterations = 10)]
1624 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1625 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1626 let lang_registry = Arc::new(LanguageRegistry::test());
1627 let fs = FakeFs::new(cx_a.background());
1628 cx_a.foreground().forbid_parking();
1629
1630 // Connect to a server as 2 clients.
1631 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632 let client_a = server.create_client(cx_a, "user_a").await;
1633 let mut client_b = server.create_client(cx_b, "user_b").await;
1634 server
1635 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1636 .await;
1637
1638 // Share a project as client A
1639 fs.insert_tree(
1640 "/a",
1641 json!({
1642 "a.txt": "a-contents",
1643 "b.txt": "b-contents",
1644 }),
1645 )
1646 .await;
1647 let project_a = cx_a.update(|cx| {
1648 Project::local(
1649 client_a.clone(),
1650 client_a.user_store.clone(),
1651 lang_registry.clone(),
1652 fs.clone(),
1653 cx,
1654 )
1655 });
1656 let (worktree_a, _) = project_a
1657 .update(cx_a, |p, cx| {
1658 p.find_or_create_local_worktree("/a", true, cx)
1659 })
1660 .await
1661 .unwrap();
1662 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1663 worktree_a
1664 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1665 .await;
1666
1667 // Join that project as client B
1668 let client_b_peer_id = client_b.peer_id;
1669 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1670
1671 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1672 assert_eq!(
1673 project
1674 .collaborators()
1675 .get(&client_a.peer_id)
1676 .unwrap()
1677 .user
1678 .github_login,
1679 "user_a"
1680 );
1681 project.replica_id()
1682 });
1683 project_a
1684 .condition(&cx_a, |tree, _| {
1685 tree.collaborators()
1686 .get(&client_b_peer_id)
1687 .map_or(false, |collaborator| {
1688 collaborator.replica_id == replica_id_b
1689 && collaborator.user.github_login == "user_b"
1690 })
1691 })
1692 .await;
1693
1694 // Open the same file as client B and client A.
1695 let buffer_b = project_b
1696 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1697 .await
1698 .unwrap();
1699 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1700 project_a.read_with(cx_a, |project, cx| {
1701 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1702 });
1703 let buffer_a = project_a
1704 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1705 .await
1706 .unwrap();
1707
1708 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1709
1710 // TODO
1711 // // Create a selection set as client B and see that selection set as client A.
1712 // buffer_a
1713 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1714 // .await;
1715
1716 // Edit the buffer as client B and see that edit as client A.
1717 editor_b.update(cx_b, |editor, cx| {
1718 editor.handle_input(&Input("ok, ".into()), cx)
1719 });
1720 buffer_a
1721 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1722 .await;
1723
1724 // TODO
1725 // // Remove the selection set as client B, see those selections disappear as client A.
1726 cx_b.update(move |_| drop(editor_b));
1727 // buffer_a
1728 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1729 // .await;
1730
1731 // Dropping the client B's project removes client B from client A's collaborators.
1732 cx_b.update(move |_| {
1733 drop(client_b.project.take());
1734 drop(project_b);
1735 });
1736 project_a
1737 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1738 .await;
1739 }
1740
1741 #[gpui::test(iterations = 10)]
1742 async fn test_unshare_project(
1743 deterministic: Arc<Deterministic>,
1744 cx_a: &mut TestAppContext,
1745 cx_b: &mut TestAppContext,
1746 ) {
1747 let lang_registry = Arc::new(LanguageRegistry::test());
1748 let fs = FakeFs::new(cx_a.background());
1749 cx_a.foreground().forbid_parking();
1750
1751 // Connect to a server as 2 clients.
1752 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1753 let client_a = server.create_client(cx_a, "user_a").await;
1754 let mut client_b = server.create_client(cx_b, "user_b").await;
1755 server
1756 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1757 .await;
1758
1759 // Share a project as client A
1760 fs.insert_tree(
1761 "/a",
1762 json!({
1763 "a.txt": "a-contents",
1764 "b.txt": "b-contents",
1765 }),
1766 )
1767 .await;
1768 let project_a = cx_a.update(|cx| {
1769 Project::local(
1770 client_a.clone(),
1771 client_a.user_store.clone(),
1772 lang_registry.clone(),
1773 fs.clone(),
1774 cx,
1775 )
1776 });
1777 let (worktree_a, _) = project_a
1778 .update(cx_a, |p, cx| {
1779 p.find_or_create_local_worktree("/a", true, cx)
1780 })
1781 .await
1782 .unwrap();
1783 worktree_a
1784 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1785 .await;
1786 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1787
1788 // Join that project as client B
1789 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1790 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1791 project_b
1792 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1793 .await
1794 .unwrap();
1795
1796 // When client B leaves the project, it gets automatically unshared.
1797 cx_b.update(|_| {
1798 drop(client_b.project.take());
1799 drop(project_b);
1800 });
1801 deterministic.run_until_parked();
1802 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1803
1804 // When client B joins again, the project gets re-shared.
1805 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1806 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1807 project_b2
1808 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1809 .await
1810 .unwrap();
1811 }
1812
1813 #[gpui::test(iterations = 10)]
1814 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1815 let lang_registry = Arc::new(LanguageRegistry::test());
1816 let fs = FakeFs::new(cx_a.background());
1817 cx_a.foreground().forbid_parking();
1818
1819 // Connect to a server as 2 clients.
1820 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1821 let client_a = server.create_client(cx_a, "user_a").await;
1822 let mut client_b = server.create_client(cx_b, "user_b").await;
1823 server
1824 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1825 .await;
1826
1827 // Share a project as client A
1828 fs.insert_tree(
1829 "/a",
1830 json!({
1831 "a.txt": "a-contents",
1832 "b.txt": "b-contents",
1833 }),
1834 )
1835 .await;
1836 let project_a = cx_a.update(|cx| {
1837 Project::local(
1838 client_a.clone(),
1839 client_a.user_store.clone(),
1840 lang_registry.clone(),
1841 fs.clone(),
1842 cx,
1843 )
1844 });
1845 let (worktree_a, _) = project_a
1846 .update(cx_a, |p, cx| {
1847 p.find_or_create_local_worktree("/a", true, cx)
1848 })
1849 .await
1850 .unwrap();
1851 worktree_a
1852 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1853 .await;
1854 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1855
1856 // Join that project as client B
1857 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1858 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1859 project_b
1860 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1861 .await
1862 .unwrap();
1863
1864 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1865 server.disconnect_client(client_a.current_user_id(cx_a));
1866 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1867 project_a
1868 .condition(cx_a, |project, _| project.collaborators().is_empty())
1869 .await;
1870 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1871 project_b
1872 .condition(cx_b, |project, _| project.is_read_only())
1873 .await;
1874 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1875 cx_b.update(|_| {
1876 drop(project_b);
1877 });
1878
1879 // Ensure guests can still join.
1880 let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1881 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1882 project_b2
1883 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1884 .await
1885 .unwrap();
1886 }
1887
1888 #[gpui::test(iterations = 10)]
1889 async fn test_propagate_saves_and_fs_changes(
1890 cx_a: &mut TestAppContext,
1891 cx_b: &mut TestAppContext,
1892 cx_c: &mut TestAppContext,
1893 ) {
1894 let lang_registry = Arc::new(LanguageRegistry::test());
1895 let fs = FakeFs::new(cx_a.background());
1896 cx_a.foreground().forbid_parking();
1897
1898 // Connect to a server as 3 clients.
1899 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900 let client_a = server.create_client(cx_a, "user_a").await;
1901 let mut client_b = server.create_client(cx_b, "user_b").await;
1902 let mut client_c = server.create_client(cx_c, "user_c").await;
1903 server
1904 .make_contacts(vec![
1905 (&client_a, cx_a),
1906 (&client_b, cx_b),
1907 (&client_c, cx_c),
1908 ])
1909 .await;
1910
1911 // Share a worktree as client A.
1912 fs.insert_tree(
1913 "/a",
1914 json!({
1915 "file1": "",
1916 "file2": ""
1917 }),
1918 )
1919 .await;
1920 let project_a = cx_a.update(|cx| {
1921 Project::local(
1922 client_a.clone(),
1923 client_a.user_store.clone(),
1924 lang_registry.clone(),
1925 fs.clone(),
1926 cx,
1927 )
1928 });
1929 let (worktree_a, _) = project_a
1930 .update(cx_a, |p, cx| {
1931 p.find_or_create_local_worktree("/a", true, cx)
1932 })
1933 .await
1934 .unwrap();
1935 worktree_a
1936 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1937 .await;
1938 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939
1940 // Join that worktree as clients B and C.
1941 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1942 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
1943 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1944 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1945
1946 // Open and edit a buffer as both guests B and C.
1947 let buffer_b = project_b
1948 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1949 .await
1950 .unwrap();
1951 let buffer_c = project_c
1952 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1953 .await
1954 .unwrap();
1955 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1956 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1957
1958 // Open and edit that buffer as the host.
1959 let buffer_a = project_a
1960 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1961 .await
1962 .unwrap();
1963
1964 buffer_a
1965 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1966 .await;
1967 buffer_a.update(cx_a, |buf, cx| {
1968 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1969 });
1970
1971 // Wait for edits to propagate
1972 buffer_a
1973 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1974 .await;
1975 buffer_b
1976 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1977 .await;
1978 buffer_c
1979 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1980 .await;
1981
1982 // Edit the buffer as the host and concurrently save as guest B.
1983 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1984 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1985 save_b.await.unwrap();
1986 assert_eq!(
1987 fs.load("/a/file1".as_ref()).await.unwrap(),
1988 "hi-a, i-am-c, i-am-b, i-am-a"
1989 );
1990 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1991 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1992 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1993
1994 worktree_a.flush_fs_events(cx_a).await;
1995
1996 // Make changes on host's file system, see those changes on guest worktrees.
1997 fs.rename(
1998 "/a/file1".as_ref(),
1999 "/a/file1-renamed".as_ref(),
2000 Default::default(),
2001 )
2002 .await
2003 .unwrap();
2004
2005 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2006 .await
2007 .unwrap();
2008 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2009
2010 worktree_a
2011 .condition(&cx_a, |tree, _| {
2012 tree.paths()
2013 .map(|p| p.to_string_lossy())
2014 .collect::<Vec<_>>()
2015 == ["file1-renamed", "file3", "file4"]
2016 })
2017 .await;
2018 worktree_b
2019 .condition(&cx_b, |tree, _| {
2020 tree.paths()
2021 .map(|p| p.to_string_lossy())
2022 .collect::<Vec<_>>()
2023 == ["file1-renamed", "file3", "file4"]
2024 })
2025 .await;
2026 worktree_c
2027 .condition(&cx_c, |tree, _| {
2028 tree.paths()
2029 .map(|p| p.to_string_lossy())
2030 .collect::<Vec<_>>()
2031 == ["file1-renamed", "file3", "file4"]
2032 })
2033 .await;
2034
2035 // Ensure buffer files are updated as well.
2036 buffer_a
2037 .condition(&cx_a, |buf, _| {
2038 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2039 })
2040 .await;
2041 buffer_b
2042 .condition(&cx_b, |buf, _| {
2043 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2044 })
2045 .await;
2046 buffer_c
2047 .condition(&cx_c, |buf, _| {
2048 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2049 })
2050 .await;
2051 }
2052
2053 #[gpui::test(iterations = 10)]
2054 async fn test_fs_operations(
2055 executor: Arc<Deterministic>,
2056 cx_a: &mut TestAppContext,
2057 cx_b: &mut TestAppContext,
2058 ) {
2059 executor.forbid_parking();
2060 let fs = FakeFs::new(cx_a.background());
2061
2062 // Connect to a server as 2 clients.
2063 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2064 let mut client_a = server.create_client(cx_a, "user_a").await;
2065 let mut client_b = server.create_client(cx_b, "user_b").await;
2066 server
2067 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2068 .await;
2069
2070 // Share a project as client A
2071 fs.insert_tree(
2072 "/dir",
2073 json!({
2074 "a.txt": "a-contents",
2075 "b.txt": "b-contents",
2076 }),
2077 )
2078 .await;
2079
2080 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2081 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2082
2083 let worktree_a =
2084 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2085 let worktree_b =
2086 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2087
2088 let entry = project_b
2089 .update(cx_b, |project, cx| {
2090 project
2091 .create_entry((worktree_id, "c.txt"), false, cx)
2092 .unwrap()
2093 })
2094 .await
2095 .unwrap();
2096 worktree_a.read_with(cx_a, |worktree, _| {
2097 assert_eq!(
2098 worktree
2099 .paths()
2100 .map(|p| p.to_string_lossy())
2101 .collect::<Vec<_>>(),
2102 ["a.txt", "b.txt", "c.txt"]
2103 );
2104 });
2105 worktree_b.read_with(cx_b, |worktree, _| {
2106 assert_eq!(
2107 worktree
2108 .paths()
2109 .map(|p| p.to_string_lossy())
2110 .collect::<Vec<_>>(),
2111 ["a.txt", "b.txt", "c.txt"]
2112 );
2113 });
2114
2115 project_b
2116 .update(cx_b, |project, cx| {
2117 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2118 })
2119 .unwrap()
2120 .await
2121 .unwrap();
2122 worktree_a.read_with(cx_a, |worktree, _| {
2123 assert_eq!(
2124 worktree
2125 .paths()
2126 .map(|p| p.to_string_lossy())
2127 .collect::<Vec<_>>(),
2128 ["a.txt", "b.txt", "d.txt"]
2129 );
2130 });
2131 worktree_b.read_with(cx_b, |worktree, _| {
2132 assert_eq!(
2133 worktree
2134 .paths()
2135 .map(|p| p.to_string_lossy())
2136 .collect::<Vec<_>>(),
2137 ["a.txt", "b.txt", "d.txt"]
2138 );
2139 });
2140
2141 let dir_entry = project_b
2142 .update(cx_b, |project, cx| {
2143 project
2144 .create_entry((worktree_id, "DIR"), true, cx)
2145 .unwrap()
2146 })
2147 .await
2148 .unwrap();
2149 worktree_a.read_with(cx_a, |worktree, _| {
2150 assert_eq!(
2151 worktree
2152 .paths()
2153 .map(|p| p.to_string_lossy())
2154 .collect::<Vec<_>>(),
2155 ["DIR", "a.txt", "b.txt", "d.txt"]
2156 );
2157 });
2158 worktree_b.read_with(cx_b, |worktree, _| {
2159 assert_eq!(
2160 worktree
2161 .paths()
2162 .map(|p| p.to_string_lossy())
2163 .collect::<Vec<_>>(),
2164 ["DIR", "a.txt", "b.txt", "d.txt"]
2165 );
2166 });
2167
2168 project_b
2169 .update(cx_b, |project, cx| {
2170 project.delete_entry(dir_entry.id, cx).unwrap()
2171 })
2172 .await
2173 .unwrap();
2174 worktree_a.read_with(cx_a, |worktree, _| {
2175 assert_eq!(
2176 worktree
2177 .paths()
2178 .map(|p| p.to_string_lossy())
2179 .collect::<Vec<_>>(),
2180 ["a.txt", "b.txt", "d.txt"]
2181 );
2182 });
2183 worktree_b.read_with(cx_b, |worktree, _| {
2184 assert_eq!(
2185 worktree
2186 .paths()
2187 .map(|p| p.to_string_lossy())
2188 .collect::<Vec<_>>(),
2189 ["a.txt", "b.txt", "d.txt"]
2190 );
2191 });
2192
2193 project_b
2194 .update(cx_b, |project, cx| {
2195 project.delete_entry(entry.id, cx).unwrap()
2196 })
2197 .await
2198 .unwrap();
2199 worktree_a.read_with(cx_a, |worktree, _| {
2200 assert_eq!(
2201 worktree
2202 .paths()
2203 .map(|p| p.to_string_lossy())
2204 .collect::<Vec<_>>(),
2205 ["a.txt", "b.txt"]
2206 );
2207 });
2208 worktree_b.read_with(cx_b, |worktree, _| {
2209 assert_eq!(
2210 worktree
2211 .paths()
2212 .map(|p| p.to_string_lossy())
2213 .collect::<Vec<_>>(),
2214 ["a.txt", "b.txt"]
2215 );
2216 });
2217 }
2218
2219 #[gpui::test(iterations = 10)]
2220 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2221 cx_a.foreground().forbid_parking();
2222 let lang_registry = Arc::new(LanguageRegistry::test());
2223 let fs = FakeFs::new(cx_a.background());
2224
2225 // Connect to a server as 2 clients.
2226 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2227 let client_a = server.create_client(cx_a, "user_a").await;
2228 let mut client_b = server.create_client(cx_b, "user_b").await;
2229 server
2230 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2231 .await;
2232
2233 // Share a project as client A
2234 fs.insert_tree(
2235 "/dir",
2236 json!({
2237 "a.txt": "a-contents",
2238 }),
2239 )
2240 .await;
2241
2242 let project_a = cx_a.update(|cx| {
2243 Project::local(
2244 client_a.clone(),
2245 client_a.user_store.clone(),
2246 lang_registry.clone(),
2247 fs.clone(),
2248 cx,
2249 )
2250 });
2251 let (worktree_a, _) = project_a
2252 .update(cx_a, |p, cx| {
2253 p.find_or_create_local_worktree("/dir", true, cx)
2254 })
2255 .await
2256 .unwrap();
2257 worktree_a
2258 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2259 .await;
2260 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2261
2262 // Join that project as client B
2263 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2264
2265 // Open a buffer as client B
2266 let buffer_b = project_b
2267 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2268 .await
2269 .unwrap();
2270
2271 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2272 buffer_b.read_with(cx_b, |buf, _| {
2273 assert!(buf.is_dirty());
2274 assert!(!buf.has_conflict());
2275 });
2276
2277 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2278 buffer_b
2279 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2280 .await;
2281 buffer_b.read_with(cx_b, |buf, _| {
2282 assert!(!buf.has_conflict());
2283 });
2284
2285 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2286 buffer_b.read_with(cx_b, |buf, _| {
2287 assert!(buf.is_dirty());
2288 assert!(!buf.has_conflict());
2289 });
2290 }
2291
2292 #[gpui::test(iterations = 10)]
2293 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2294 cx_a.foreground().forbid_parking();
2295 let lang_registry = Arc::new(LanguageRegistry::test());
2296 let fs = FakeFs::new(cx_a.background());
2297
2298 // Connect to a server as 2 clients.
2299 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2300 let client_a = server.create_client(cx_a, "user_a").await;
2301 let mut client_b = server.create_client(cx_b, "user_b").await;
2302 server
2303 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2304 .await;
2305
2306 // Share a project as client A
2307 fs.insert_tree(
2308 "/dir",
2309 json!({
2310 "a.txt": "a-contents",
2311 }),
2312 )
2313 .await;
2314
2315 let project_a = cx_a.update(|cx| {
2316 Project::local(
2317 client_a.clone(),
2318 client_a.user_store.clone(),
2319 lang_registry.clone(),
2320 fs.clone(),
2321 cx,
2322 )
2323 });
2324 let (worktree_a, _) = project_a
2325 .update(cx_a, |p, cx| {
2326 p.find_or_create_local_worktree("/dir", true, cx)
2327 })
2328 .await
2329 .unwrap();
2330 worktree_a
2331 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2332 .await;
2333 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2334
2335 // Join that project as client B
2336 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2337 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2338
2339 // Open a buffer as client B
2340 let buffer_b = project_b
2341 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2342 .await
2343 .unwrap();
2344 buffer_b.read_with(cx_b, |buf, _| {
2345 assert!(!buf.is_dirty());
2346 assert!(!buf.has_conflict());
2347 });
2348
2349 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2350 .await
2351 .unwrap();
2352 buffer_b
2353 .condition(&cx_b, |buf, _| {
2354 buf.text() == "new contents" && !buf.is_dirty()
2355 })
2356 .await;
2357 buffer_b.read_with(cx_b, |buf, _| {
2358 assert!(!buf.has_conflict());
2359 });
2360 }
2361
2362 #[gpui::test(iterations = 10)]
2363 async fn test_editing_while_guest_opens_buffer(
2364 cx_a: &mut TestAppContext,
2365 cx_b: &mut TestAppContext,
2366 ) {
2367 cx_a.foreground().forbid_parking();
2368 let lang_registry = Arc::new(LanguageRegistry::test());
2369 let fs = FakeFs::new(cx_a.background());
2370
2371 // Connect to a server as 2 clients.
2372 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2373 let client_a = server.create_client(cx_a, "user_a").await;
2374 let mut client_b = server.create_client(cx_b, "user_b").await;
2375 server
2376 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2377 .await;
2378
2379 // Share a project as client A
2380 fs.insert_tree(
2381 "/dir",
2382 json!({
2383 "a.txt": "a-contents",
2384 }),
2385 )
2386 .await;
2387 let project_a = cx_a.update(|cx| {
2388 Project::local(
2389 client_a.clone(),
2390 client_a.user_store.clone(),
2391 lang_registry.clone(),
2392 fs.clone(),
2393 cx,
2394 )
2395 });
2396 let (worktree_a, _) = project_a
2397 .update(cx_a, |p, cx| {
2398 p.find_or_create_local_worktree("/dir", true, cx)
2399 })
2400 .await
2401 .unwrap();
2402 worktree_a
2403 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2404 .await;
2405 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2406
2407 // Join that project as client B
2408 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2409
2410 // Open a buffer as client A
2411 let buffer_a = project_a
2412 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2413 .await
2414 .unwrap();
2415
2416 // Start opening the same buffer as client B
2417 let buffer_b = cx_b
2418 .background()
2419 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2420
2421 // Edit the buffer as client A while client B is still opening it.
2422 cx_b.background().simulate_random_delay().await;
2423 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2424 cx_b.background().simulate_random_delay().await;
2425 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2426
2427 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2428 let buffer_b = buffer_b.await.unwrap();
2429 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2430 }
2431
2432 #[gpui::test(iterations = 10)]
2433 async fn test_leaving_worktree_while_opening_buffer(
2434 cx_a: &mut TestAppContext,
2435 cx_b: &mut TestAppContext,
2436 ) {
2437 cx_a.foreground().forbid_parking();
2438 let lang_registry = Arc::new(LanguageRegistry::test());
2439 let fs = FakeFs::new(cx_a.background());
2440
2441 // Connect to a server as 2 clients.
2442 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2443 let client_a = server.create_client(cx_a, "user_a").await;
2444 let mut client_b = server.create_client(cx_b, "user_b").await;
2445 server
2446 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2447 .await;
2448
2449 // Share a project as client A
2450 fs.insert_tree(
2451 "/dir",
2452 json!({
2453 "a.txt": "a-contents",
2454 }),
2455 )
2456 .await;
2457 let project_a = cx_a.update(|cx| {
2458 Project::local(
2459 client_a.clone(),
2460 client_a.user_store.clone(),
2461 lang_registry.clone(),
2462 fs.clone(),
2463 cx,
2464 )
2465 });
2466 let (worktree_a, _) = project_a
2467 .update(cx_a, |p, cx| {
2468 p.find_or_create_local_worktree("/dir", true, cx)
2469 })
2470 .await
2471 .unwrap();
2472 worktree_a
2473 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2474 .await;
2475 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2476
2477 // Join that project as client B
2478 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2479
2480 // See that a guest has joined as client A.
2481 project_a
2482 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2483 .await;
2484
2485 // Begin opening a buffer as client B, but leave the project before the open completes.
2486 let buffer_b = cx_b
2487 .background()
2488 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2489 cx_b.update(|_| {
2490 drop(client_b.project.take());
2491 drop(project_b);
2492 });
2493 drop(buffer_b);
2494
2495 // See that the guest has left.
2496 project_a
2497 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2498 .await;
2499 }
2500
2501 #[gpui::test(iterations = 10)]
2502 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2503 cx_a.foreground().forbid_parking();
2504 let lang_registry = Arc::new(LanguageRegistry::test());
2505 let fs = FakeFs::new(cx_a.background());
2506
2507 // Connect to a server as 2 clients.
2508 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2509 let client_a = server.create_client(cx_a, "user_a").await;
2510 let mut client_b = server.create_client(cx_b, "user_b").await;
2511 server
2512 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2513 .await;
2514
2515 // Share a project as client A
2516 fs.insert_tree(
2517 "/a",
2518 json!({
2519 "a.txt": "a-contents",
2520 "b.txt": "b-contents",
2521 }),
2522 )
2523 .await;
2524 let project_a = cx_a.update(|cx| {
2525 Project::local(
2526 client_a.clone(),
2527 client_a.user_store.clone(),
2528 lang_registry.clone(),
2529 fs.clone(),
2530 cx,
2531 )
2532 });
2533 let (worktree_a, _) = project_a
2534 .update(cx_a, |p, cx| {
2535 p.find_or_create_local_worktree("/a", true, cx)
2536 })
2537 .await
2538 .unwrap();
2539 worktree_a
2540 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2541 .await;
2542
2543 // Join that project as client B
2544 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2545
2546 // Client A sees that a guest has joined.
2547 project_a
2548 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2549 .await;
2550
2551 // Drop client B's connection and ensure client A observes client B leaving the project.
2552 client_b.disconnect(&cx_b.to_async()).unwrap();
2553 project_a
2554 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2555 .await;
2556
2557 // Rejoin the project as client B
2558 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2559
2560 // Client A sees that a guest has re-joined.
2561 project_a
2562 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2563 .await;
2564
2565 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2566 client_b.wait_for_current_user(cx_b).await;
2567 server.disconnect_client(client_b.current_user_id(cx_b));
2568 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2569 project_a
2570 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2571 .await;
2572 }
2573
2574 #[gpui::test(iterations = 10)]
2575 async fn test_collaborating_with_diagnostics(
2576 deterministic: Arc<Deterministic>,
2577 cx_a: &mut TestAppContext,
2578 cx_b: &mut TestAppContext,
2579 cx_c: &mut TestAppContext,
2580 ) {
2581 deterministic.forbid_parking();
2582 let lang_registry = Arc::new(LanguageRegistry::test());
2583 let fs = FakeFs::new(cx_a.background());
2584
2585 // Set up a fake language server.
2586 let mut language = Language::new(
2587 LanguageConfig {
2588 name: "Rust".into(),
2589 path_suffixes: vec!["rs".to_string()],
2590 ..Default::default()
2591 },
2592 Some(tree_sitter_rust::language()),
2593 );
2594 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2595 lang_registry.add(Arc::new(language));
2596
2597 // Connect to a server as 2 clients.
2598 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2599 let client_a = server.create_client(cx_a, "user_a").await;
2600 let mut client_b = server.create_client(cx_b, "user_b").await;
2601 let mut client_c = server.create_client(cx_c, "user_c").await;
2602 server
2603 .make_contacts(vec![
2604 (&client_a, cx_a),
2605 (&client_b, cx_b),
2606 (&client_c, cx_c),
2607 ])
2608 .await;
2609
2610 // Share a project as client A
2611 fs.insert_tree(
2612 "/a",
2613 json!({
2614 "a.rs": "let one = two",
2615 "other.rs": "",
2616 }),
2617 )
2618 .await;
2619 let project_a = cx_a.update(|cx| {
2620 Project::local(
2621 client_a.clone(),
2622 client_a.user_store.clone(),
2623 lang_registry.clone(),
2624 fs.clone(),
2625 cx,
2626 )
2627 });
2628 let (worktree_a, _) = project_a
2629 .update(cx_a, |p, cx| {
2630 p.find_or_create_local_worktree("/a", true, cx)
2631 })
2632 .await
2633 .unwrap();
2634 worktree_a
2635 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2636 .await;
2637 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2638 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2639
2640 // Cause the language server to start.
2641 let _buffer = cx_a
2642 .background()
2643 .spawn(project_a.update(cx_a, |project, cx| {
2644 project.open_buffer(
2645 ProjectPath {
2646 worktree_id,
2647 path: Path::new("other.rs").into(),
2648 },
2649 cx,
2650 )
2651 }))
2652 .await
2653 .unwrap();
2654
2655 // Join the worktree as client B.
2656 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2657
2658 // Simulate a language server reporting errors for a file.
2659 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2660 fake_language_server
2661 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2662 .await;
2663 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2664 lsp::PublishDiagnosticsParams {
2665 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2666 version: None,
2667 diagnostics: vec![lsp::Diagnostic {
2668 severity: Some(lsp::DiagnosticSeverity::ERROR),
2669 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2670 message: "message 1".to_string(),
2671 ..Default::default()
2672 }],
2673 },
2674 );
2675
2676 // Wait for server to see the diagnostics update.
2677 deterministic.run_until_parked();
2678 {
2679 let store = server.store.read().await;
2680 let project = store.project(project_id).unwrap();
2681 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2682 assert!(!worktree.diagnostic_summaries.is_empty());
2683 }
2684
2685 // Ensure client B observes the new diagnostics.
2686 project_b.read_with(cx_b, |project, cx| {
2687 assert_eq!(
2688 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2689 &[(
2690 ProjectPath {
2691 worktree_id,
2692 path: Arc::from(Path::new("a.rs")),
2693 },
2694 DiagnosticSummary {
2695 error_count: 1,
2696 warning_count: 0,
2697 ..Default::default()
2698 },
2699 )]
2700 )
2701 });
2702
2703 // Join project as client C and observe the diagnostics.
2704 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2705 project_c.read_with(cx_c, |project, cx| {
2706 assert_eq!(
2707 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2708 &[(
2709 ProjectPath {
2710 worktree_id,
2711 path: Arc::from(Path::new("a.rs")),
2712 },
2713 DiagnosticSummary {
2714 error_count: 1,
2715 warning_count: 0,
2716 ..Default::default()
2717 },
2718 )]
2719 )
2720 });
2721
2722 // Simulate a language server reporting more errors for a file.
2723 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2724 lsp::PublishDiagnosticsParams {
2725 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2726 version: None,
2727 diagnostics: vec![
2728 lsp::Diagnostic {
2729 severity: Some(lsp::DiagnosticSeverity::ERROR),
2730 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2731 message: "message 1".to_string(),
2732 ..Default::default()
2733 },
2734 lsp::Diagnostic {
2735 severity: Some(lsp::DiagnosticSeverity::WARNING),
2736 range: lsp::Range::new(
2737 lsp::Position::new(0, 10),
2738 lsp::Position::new(0, 13),
2739 ),
2740 message: "message 2".to_string(),
2741 ..Default::default()
2742 },
2743 ],
2744 },
2745 );
2746
2747 // Clients B and C get the updated summaries
2748 deterministic.run_until_parked();
2749 project_b.read_with(cx_b, |project, cx| {
2750 assert_eq!(
2751 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2752 [(
2753 ProjectPath {
2754 worktree_id,
2755 path: Arc::from(Path::new("a.rs")),
2756 },
2757 DiagnosticSummary {
2758 error_count: 1,
2759 warning_count: 1,
2760 ..Default::default()
2761 },
2762 )]
2763 );
2764 });
2765 project_c.read_with(cx_c, |project, cx| {
2766 assert_eq!(
2767 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2768 [(
2769 ProjectPath {
2770 worktree_id,
2771 path: Arc::from(Path::new("a.rs")),
2772 },
2773 DiagnosticSummary {
2774 error_count: 1,
2775 warning_count: 1,
2776 ..Default::default()
2777 },
2778 )]
2779 );
2780 });
2781
2782 // Open the file with the errors on client B. They should be present.
2783 let buffer_b = cx_b
2784 .background()
2785 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2786 .await
2787 .unwrap();
2788
2789 buffer_b.read_with(cx_b, |buffer, _| {
2790 assert_eq!(
2791 buffer
2792 .snapshot()
2793 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2794 .map(|entry| entry)
2795 .collect::<Vec<_>>(),
2796 &[
2797 DiagnosticEntry {
2798 range: Point::new(0, 4)..Point::new(0, 7),
2799 diagnostic: Diagnostic {
2800 group_id: 0,
2801 message: "message 1".to_string(),
2802 severity: lsp::DiagnosticSeverity::ERROR,
2803 is_primary: true,
2804 ..Default::default()
2805 }
2806 },
2807 DiagnosticEntry {
2808 range: Point::new(0, 10)..Point::new(0, 13),
2809 diagnostic: Diagnostic {
2810 group_id: 1,
2811 severity: lsp::DiagnosticSeverity::WARNING,
2812 message: "message 2".to_string(),
2813 is_primary: true,
2814 ..Default::default()
2815 }
2816 }
2817 ]
2818 );
2819 });
2820
2821 // Simulate a language server reporting no errors for a file.
2822 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2823 lsp::PublishDiagnosticsParams {
2824 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2825 version: None,
2826 diagnostics: vec![],
2827 },
2828 );
2829 deterministic.run_until_parked();
2830 project_a.read_with(cx_a, |project, cx| {
2831 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2832 });
2833 project_b.read_with(cx_b, |project, cx| {
2834 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2835 });
2836 project_c.read_with(cx_c, |project, cx| {
2837 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2838 });
2839 }
2840
2841 #[gpui::test(iterations = 10)]
2842 async fn test_collaborating_with_completion(
2843 cx_a: &mut TestAppContext,
2844 cx_b: &mut TestAppContext,
2845 ) {
2846 cx_a.foreground().forbid_parking();
2847 let lang_registry = Arc::new(LanguageRegistry::test());
2848 let fs = FakeFs::new(cx_a.background());
2849
2850 // Set up a fake language server.
2851 let mut language = Language::new(
2852 LanguageConfig {
2853 name: "Rust".into(),
2854 path_suffixes: vec!["rs".to_string()],
2855 ..Default::default()
2856 },
2857 Some(tree_sitter_rust::language()),
2858 );
2859 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2860 capabilities: lsp::ServerCapabilities {
2861 completion_provider: Some(lsp::CompletionOptions {
2862 trigger_characters: Some(vec![".".to_string()]),
2863 ..Default::default()
2864 }),
2865 ..Default::default()
2866 },
2867 ..Default::default()
2868 });
2869 lang_registry.add(Arc::new(language));
2870
2871 // Connect to a server as 2 clients.
2872 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2873 let client_a = server.create_client(cx_a, "user_a").await;
2874 let mut client_b = server.create_client(cx_b, "user_b").await;
2875 server
2876 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2877 .await;
2878
2879 // Share a project as client A
2880 fs.insert_tree(
2881 "/a",
2882 json!({
2883 "main.rs": "fn main() { a }",
2884 "other.rs": "",
2885 }),
2886 )
2887 .await;
2888 let project_a = cx_a.update(|cx| {
2889 Project::local(
2890 client_a.clone(),
2891 client_a.user_store.clone(),
2892 lang_registry.clone(),
2893 fs.clone(),
2894 cx,
2895 )
2896 });
2897 let (worktree_a, _) = project_a
2898 .update(cx_a, |p, cx| {
2899 p.find_or_create_local_worktree("/a", true, cx)
2900 })
2901 .await
2902 .unwrap();
2903 worktree_a
2904 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2905 .await;
2906 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2907
2908 // Join the worktree as client B.
2909 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2910
2911 // Open a file in an editor as the guest.
2912 let buffer_b = project_b
2913 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2914 .await
2915 .unwrap();
2916 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2917 let editor_b = cx_b.add_view(window_b, |cx| {
2918 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2919 });
2920
2921 let fake_language_server = fake_language_servers.next().await.unwrap();
2922 buffer_b
2923 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2924 .await;
2925
2926 // Type a completion trigger character as the guest.
2927 editor_b.update(cx_b, |editor, cx| {
2928 editor.select_ranges([13..13], None, cx);
2929 editor.handle_input(&Input(".".into()), cx);
2930 cx.focus(&editor_b);
2931 });
2932
2933 // Receive a completion request as the host's language server.
2934 // Return some completions from the host's language server.
2935 cx_a.foreground().start_waiting();
2936 fake_language_server
2937 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2938 assert_eq!(
2939 params.text_document_position.text_document.uri,
2940 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2941 );
2942 assert_eq!(
2943 params.text_document_position.position,
2944 lsp::Position::new(0, 14),
2945 );
2946
2947 Ok(Some(lsp::CompletionResponse::Array(vec![
2948 lsp::CompletionItem {
2949 label: "first_method(…)".into(),
2950 detail: Some("fn(&mut self, B) -> C".into()),
2951 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2952 new_text: "first_method($1)".to_string(),
2953 range: lsp::Range::new(
2954 lsp::Position::new(0, 14),
2955 lsp::Position::new(0, 14),
2956 ),
2957 })),
2958 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2959 ..Default::default()
2960 },
2961 lsp::CompletionItem {
2962 label: "second_method(…)".into(),
2963 detail: Some("fn(&mut self, C) -> D<E>".into()),
2964 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2965 new_text: "second_method()".to_string(),
2966 range: lsp::Range::new(
2967 lsp::Position::new(0, 14),
2968 lsp::Position::new(0, 14),
2969 ),
2970 })),
2971 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2972 ..Default::default()
2973 },
2974 ])))
2975 })
2976 .next()
2977 .await
2978 .unwrap();
2979 cx_a.foreground().finish_waiting();
2980
2981 // Open the buffer on the host.
2982 let buffer_a = project_a
2983 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2984 .await
2985 .unwrap();
2986 buffer_a
2987 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2988 .await;
2989
2990 // Confirm a completion on the guest.
2991 editor_b
2992 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2993 .await;
2994 editor_b.update(cx_b, |editor, cx| {
2995 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2996 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2997 });
2998
2999 // Return a resolved completion from the host's language server.
3000 // The resolved completion has an additional text edit.
3001 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3002 |params, _| async move {
3003 assert_eq!(params.label, "first_method(…)");
3004 Ok(lsp::CompletionItem {
3005 label: "first_method(…)".into(),
3006 detail: Some("fn(&mut self, B) -> C".into()),
3007 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3008 new_text: "first_method($1)".to_string(),
3009 range: lsp::Range::new(
3010 lsp::Position::new(0, 14),
3011 lsp::Position::new(0, 14),
3012 ),
3013 })),
3014 additional_text_edits: Some(vec![lsp::TextEdit {
3015 new_text: "use d::SomeTrait;\n".to_string(),
3016 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3017 }]),
3018 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3019 ..Default::default()
3020 })
3021 },
3022 );
3023
3024 // The additional edit is applied.
3025 buffer_a
3026 .condition(&cx_a, |buffer, _| {
3027 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3028 })
3029 .await;
3030 buffer_b
3031 .condition(&cx_b, |buffer, _| {
3032 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3033 })
3034 .await;
3035 }
3036
3037 #[gpui::test(iterations = 10)]
3038 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3039 cx_a.foreground().forbid_parking();
3040 let lang_registry = Arc::new(LanguageRegistry::test());
3041 let fs = FakeFs::new(cx_a.background());
3042
3043 // Connect to a server as 2 clients.
3044 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3045 let client_a = server.create_client(cx_a, "user_a").await;
3046 let mut client_b = server.create_client(cx_b, "user_b").await;
3047 server
3048 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3049 .await;
3050
3051 // Share a project as client A
3052 fs.insert_tree(
3053 "/a",
3054 json!({
3055 "a.rs": "let one = 1;",
3056 }),
3057 )
3058 .await;
3059 let project_a = cx_a.update(|cx| {
3060 Project::local(
3061 client_a.clone(),
3062 client_a.user_store.clone(),
3063 lang_registry.clone(),
3064 fs.clone(),
3065 cx,
3066 )
3067 });
3068 let (worktree_a, _) = project_a
3069 .update(cx_a, |p, cx| {
3070 p.find_or_create_local_worktree("/a", true, cx)
3071 })
3072 .await
3073 .unwrap();
3074 worktree_a
3075 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3076 .await;
3077 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3078 let buffer_a = project_a
3079 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3080 .await
3081 .unwrap();
3082
3083 // Join the worktree as client B.
3084 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3085
3086 let buffer_b = cx_b
3087 .background()
3088 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3089 .await
3090 .unwrap();
3091 buffer_b.update(cx_b, |buffer, cx| {
3092 buffer.edit([(4..7, "six")], cx);
3093 buffer.edit([(10..11, "6")], cx);
3094 assert_eq!(buffer.text(), "let six = 6;");
3095 assert!(buffer.is_dirty());
3096 assert!(!buffer.has_conflict());
3097 });
3098 buffer_a
3099 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3100 .await;
3101
3102 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3103 .await
3104 .unwrap();
3105 buffer_a
3106 .condition(cx_a, |buffer, _| buffer.has_conflict())
3107 .await;
3108 buffer_b
3109 .condition(cx_b, |buffer, _| buffer.has_conflict())
3110 .await;
3111
3112 project_b
3113 .update(cx_b, |project, cx| {
3114 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3115 })
3116 .await
3117 .unwrap();
3118 buffer_a.read_with(cx_a, |buffer, _| {
3119 assert_eq!(buffer.text(), "let seven = 7;");
3120 assert!(!buffer.is_dirty());
3121 assert!(!buffer.has_conflict());
3122 });
3123 buffer_b.read_with(cx_b, |buffer, _| {
3124 assert_eq!(buffer.text(), "let seven = 7;");
3125 assert!(!buffer.is_dirty());
3126 assert!(!buffer.has_conflict());
3127 });
3128
3129 buffer_a.update(cx_a, |buffer, cx| {
3130 // Undoing on the host is a no-op when the reload was initiated by the guest.
3131 buffer.undo(cx);
3132 assert_eq!(buffer.text(), "let seven = 7;");
3133 assert!(!buffer.is_dirty());
3134 assert!(!buffer.has_conflict());
3135 });
3136 buffer_b.update(cx_b, |buffer, cx| {
3137 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3138 buffer.undo(cx);
3139 assert_eq!(buffer.text(), "let six = 6;");
3140 assert!(buffer.is_dirty());
3141 assert!(!buffer.has_conflict());
3142 });
3143 }
3144
3145 #[gpui::test(iterations = 10)]
3146 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3147 cx_a.foreground().forbid_parking();
3148 let lang_registry = Arc::new(LanguageRegistry::test());
3149 let fs = FakeFs::new(cx_a.background());
3150
3151 // Set up a fake language server.
3152 let mut language = Language::new(
3153 LanguageConfig {
3154 name: "Rust".into(),
3155 path_suffixes: vec!["rs".to_string()],
3156 ..Default::default()
3157 },
3158 Some(tree_sitter_rust::language()),
3159 );
3160 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3161 lang_registry.add(Arc::new(language));
3162
3163 // Connect to a server as 2 clients.
3164 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3165 let client_a = server.create_client(cx_a, "user_a").await;
3166 let mut client_b = server.create_client(cx_b, "user_b").await;
3167 server
3168 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3169 .await;
3170
3171 // Share a project as client A
3172 fs.insert_tree(
3173 "/a",
3174 json!({
3175 "a.rs": "let one = two",
3176 }),
3177 )
3178 .await;
3179 let project_a = cx_a.update(|cx| {
3180 Project::local(
3181 client_a.clone(),
3182 client_a.user_store.clone(),
3183 lang_registry.clone(),
3184 fs.clone(),
3185 cx,
3186 )
3187 });
3188 let (worktree_a, _) = project_a
3189 .update(cx_a, |p, cx| {
3190 p.find_or_create_local_worktree("/a", true, cx)
3191 })
3192 .await
3193 .unwrap();
3194 worktree_a
3195 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3196 .await;
3197 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3198
3199 // Join the project as client B.
3200 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3201
3202 let buffer_b = cx_b
3203 .background()
3204 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3205 .await
3206 .unwrap();
3207
3208 let fake_language_server = fake_language_servers.next().await.unwrap();
3209 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3210 Ok(Some(vec![
3211 lsp::TextEdit {
3212 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3213 new_text: "h".to_string(),
3214 },
3215 lsp::TextEdit {
3216 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3217 new_text: "y".to_string(),
3218 },
3219 ]))
3220 });
3221
3222 project_b
3223 .update(cx_b, |project, cx| {
3224 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3225 })
3226 .await
3227 .unwrap();
3228 assert_eq!(
3229 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3230 "let honey = two"
3231 );
3232 }
3233
    // Checks that a guest can follow go-to-definition results produced by the fake language
    // server, including results that point outside the worktree that was originally added.
    //
    // Client A creates a project containing `/root-1`; client B joins it, opens `a.rs` and
    // requests definitions twice. Both canned responses point into `/root-2/b.rs`. The test
    // asserts that client B then sees two worktrees and that both lookups resolve to the same
    // buffer.
    #[gpui::test(iterations = 10)]
    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        // `/root-1` is added to the project below; `/root-2` only holds the definition targets.
        fs.insert_tree(
            "/root-1",
            json!({
                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
            }),
        )
        .await;
        fs.insert_tree(
            "/root-2",
            json!({
                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
            }),
        )
        .await;

        // Set up a fake language server.
        let mut language = Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        );
        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
        lang_registry.add(Arc::new(language));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/root-1", true, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before reading its id.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join the project as client B.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Open the file on client B.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        // Request the definition of a symbol as the guest.
        let fake_language_server = fake_language_servers.next().await.unwrap();
        // First canned response: columns 6..9 on line 0 of `b.rs`, i.e. the text `TWO`.
        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
            |_, _| async move {
                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
                    lsp::Location::new(
                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
                    ),
                )))
            },
        );

        // Offset 23 falls inside `TWO` in the text of `a.rs`.
        let definitions_1 = project_b
            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
            .await
            .unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_1.len(), 1);
            // The guest is expected to see a second worktree once the definition is resolved.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_1[0].buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_1[0].range.to_point(target_buffer),
                Point::new(0, 6)..Point::new(0, 9)
            );
        });

        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
        // the previous call to `definition`.
        // Second canned response: columns 6..11 on line 1 of `b.rs`, i.e. the text `THREE`.
        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
            |_, _| async move {
                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
                    lsp::Location::new(
                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
                    ),
                )))
            },
        );

        // Offset 33 falls inside `THREE` in the text of `a.rs`.
        let definitions_2 = project_b
            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
            .await
            .unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_2.len(), 1);
            // No additional worktree may appear for the second lookup.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_2[0].buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_2[0].range.to_point(target_buffer),
                Point::new(1, 6)..Point::new(1, 11)
            );
        });
        // Both lookups must have resolved to one and the same buffer handle.
        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
    }
3368
3369 #[gpui::test(iterations = 10)]
3370 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3371 cx_a.foreground().forbid_parking();
3372 let lang_registry = Arc::new(LanguageRegistry::test());
3373 let fs = FakeFs::new(cx_a.background());
3374 fs.insert_tree(
3375 "/root-1",
3376 json!({
3377 "one.rs": "const ONE: usize = 1;",
3378 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3379 }),
3380 )
3381 .await;
3382 fs.insert_tree(
3383 "/root-2",
3384 json!({
3385 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3386 }),
3387 )
3388 .await;
3389
3390 // Set up a fake language server.
3391 let mut language = Language::new(
3392 LanguageConfig {
3393 name: "Rust".into(),
3394 path_suffixes: vec!["rs".to_string()],
3395 ..Default::default()
3396 },
3397 Some(tree_sitter_rust::language()),
3398 );
3399 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3400 lang_registry.add(Arc::new(language));
3401
3402 // Connect to a server as 2 clients.
3403 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3404 let client_a = server.create_client(cx_a, "user_a").await;
3405 let mut client_b = server.create_client(cx_b, "user_b").await;
3406 server
3407 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3408 .await;
3409
3410 // Share a project as client A
3411 let project_a = cx_a.update(|cx| {
3412 Project::local(
3413 client_a.clone(),
3414 client_a.user_store.clone(),
3415 lang_registry.clone(),
3416 fs.clone(),
3417 cx,
3418 )
3419 });
3420 let (worktree_a, _) = project_a
3421 .update(cx_a, |p, cx| {
3422 p.find_or_create_local_worktree("/root-1", true, cx)
3423 })
3424 .await
3425 .unwrap();
3426 worktree_a
3427 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3428 .await;
3429 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3430
3431 // Join the worktree as client B.
3432 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3433
3434 // Open the file on client B.
3435 let buffer_b = cx_b
3436 .background()
3437 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3438 .await
3439 .unwrap();
3440
3441 // Request references to a symbol as the guest.
3442 let fake_language_server = fake_language_servers.next().await.unwrap();
3443 fake_language_server.handle_request::<lsp::request::References, _, _>(
3444 |params, _| async move {
3445 assert_eq!(
3446 params.text_document_position.text_document.uri.as_str(),
3447 "file:///root-1/one.rs"
3448 );
3449 Ok(Some(vec![
3450 lsp::Location {
3451 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3452 range: lsp::Range::new(
3453 lsp::Position::new(0, 24),
3454 lsp::Position::new(0, 27),
3455 ),
3456 },
3457 lsp::Location {
3458 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3459 range: lsp::Range::new(
3460 lsp::Position::new(0, 35),
3461 lsp::Position::new(0, 38),
3462 ),
3463 },
3464 lsp::Location {
3465 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3466 range: lsp::Range::new(
3467 lsp::Position::new(0, 37),
3468 lsp::Position::new(0, 40),
3469 ),
3470 },
3471 ]))
3472 },
3473 );
3474
3475 let references = project_b
3476 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3477 .await
3478 .unwrap();
3479 cx_b.read(|cx| {
3480 assert_eq!(references.len(), 3);
3481 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3482
3483 let two_buffer = references[0].buffer.read(cx);
3484 let three_buffer = references[2].buffer.read(cx);
3485 assert_eq!(
3486 two_buffer.file().unwrap().path().as_ref(),
3487 Path::new("two.rs")
3488 );
3489 assert_eq!(references[1].buffer, references[0].buffer);
3490 assert_eq!(
3491 three_buffer.file().unwrap().full_path(cx),
3492 Path::new("three.rs")
3493 );
3494
3495 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3496 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3497 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3498 });
3499 }
3500
3501 #[gpui::test(iterations = 10)]
3502 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3503 cx_a.foreground().forbid_parking();
3504 let lang_registry = Arc::new(LanguageRegistry::test());
3505 let fs = FakeFs::new(cx_a.background());
3506 fs.insert_tree(
3507 "/root-1",
3508 json!({
3509 "a": "hello world",
3510 "b": "goodnight moon",
3511 "c": "a world of goo",
3512 "d": "world champion of clown world",
3513 }),
3514 )
3515 .await;
3516 fs.insert_tree(
3517 "/root-2",
3518 json!({
3519 "e": "disney world is fun",
3520 }),
3521 )
3522 .await;
3523
3524 // Connect to a server as 2 clients.
3525 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3526 let client_a = server.create_client(cx_a, "user_a").await;
3527 let mut client_b = server.create_client(cx_b, "user_b").await;
3528 server
3529 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3530 .await;
3531
3532 // Share a project as client A
3533 let project_a = cx_a.update(|cx| {
3534 Project::local(
3535 client_a.clone(),
3536 client_a.user_store.clone(),
3537 lang_registry.clone(),
3538 fs.clone(),
3539 cx,
3540 )
3541 });
3542
3543 let (worktree_1, _) = project_a
3544 .update(cx_a, |p, cx| {
3545 p.find_or_create_local_worktree("/root-1", true, cx)
3546 })
3547 .await
3548 .unwrap();
3549 worktree_1
3550 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3551 .await;
3552 let (worktree_2, _) = project_a
3553 .update(cx_a, |p, cx| {
3554 p.find_or_create_local_worktree("/root-2", true, cx)
3555 })
3556 .await
3557 .unwrap();
3558 worktree_2
3559 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3560 .await;
3561
3562 // Join the worktree as client B.
3563 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3564 let results = project_b
3565 .update(cx_b, |project, cx| {
3566 project.search(SearchQuery::text("world", false, false), cx)
3567 })
3568 .await
3569 .unwrap();
3570
3571 let mut ranges_by_path = results
3572 .into_iter()
3573 .map(|(buffer, ranges)| {
3574 buffer.read_with(cx_b, |buffer, cx| {
3575 let path = buffer.file().unwrap().full_path(cx);
3576 let offset_ranges = ranges
3577 .into_iter()
3578 .map(|range| range.to_offset(buffer))
3579 .collect::<Vec<_>>();
3580 (path, offset_ranges)
3581 })
3582 })
3583 .collect::<Vec<_>>();
3584 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3585
3586 assert_eq!(
3587 ranges_by_path,
3588 &[
3589 (PathBuf::from("root-1/a"), vec![6..11]),
3590 (PathBuf::from("root-1/c"), vec![2..7]),
3591 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3592 (PathBuf::from("root-2/e"), vec![7..12]),
3593 ]
3594 );
3595 }
3596
3597 #[gpui::test(iterations = 10)]
3598 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3599 cx_a.foreground().forbid_parking();
3600 let lang_registry = Arc::new(LanguageRegistry::test());
3601 let fs = FakeFs::new(cx_a.background());
3602 fs.insert_tree(
3603 "/root-1",
3604 json!({
3605 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3606 }),
3607 )
3608 .await;
3609
3610 // Set up a fake language server.
3611 let mut language = Language::new(
3612 LanguageConfig {
3613 name: "Rust".into(),
3614 path_suffixes: vec!["rs".to_string()],
3615 ..Default::default()
3616 },
3617 Some(tree_sitter_rust::language()),
3618 );
3619 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3620 lang_registry.add(Arc::new(language));
3621
3622 // Connect to a server as 2 clients.
3623 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3624 let client_a = server.create_client(cx_a, "user_a").await;
3625 let mut client_b = server.create_client(cx_b, "user_b").await;
3626 server
3627 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3628 .await;
3629
3630 // Share a project as client A
3631 let project_a = cx_a.update(|cx| {
3632 Project::local(
3633 client_a.clone(),
3634 client_a.user_store.clone(),
3635 lang_registry.clone(),
3636 fs.clone(),
3637 cx,
3638 )
3639 });
3640 let (worktree_a, _) = project_a
3641 .update(cx_a, |p, cx| {
3642 p.find_or_create_local_worktree("/root-1", true, cx)
3643 })
3644 .await
3645 .unwrap();
3646 worktree_a
3647 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3648 .await;
3649 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3650
3651 // Join the worktree as client B.
3652 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3653
3654 // Open the file on client B.
3655 let buffer_b = cx_b
3656 .background()
3657 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3658 .await
3659 .unwrap();
3660
3661 // Request document highlights as the guest.
3662 let fake_language_server = fake_language_servers.next().await.unwrap();
3663 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3664 |params, _| async move {
3665 assert_eq!(
3666 params
3667 .text_document_position_params
3668 .text_document
3669 .uri
3670 .as_str(),
3671 "file:///root-1/main.rs"
3672 );
3673 assert_eq!(
3674 params.text_document_position_params.position,
3675 lsp::Position::new(0, 34)
3676 );
3677 Ok(Some(vec![
3678 lsp::DocumentHighlight {
3679 kind: Some(lsp::DocumentHighlightKind::WRITE),
3680 range: lsp::Range::new(
3681 lsp::Position::new(0, 10),
3682 lsp::Position::new(0, 16),
3683 ),
3684 },
3685 lsp::DocumentHighlight {
3686 kind: Some(lsp::DocumentHighlightKind::READ),
3687 range: lsp::Range::new(
3688 lsp::Position::new(0, 32),
3689 lsp::Position::new(0, 38),
3690 ),
3691 },
3692 lsp::DocumentHighlight {
3693 kind: Some(lsp::DocumentHighlightKind::READ),
3694 range: lsp::Range::new(
3695 lsp::Position::new(0, 41),
3696 lsp::Position::new(0, 47),
3697 ),
3698 },
3699 ]))
3700 },
3701 );
3702
3703 let highlights = project_b
3704 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3705 .await
3706 .unwrap();
3707 buffer_b.read_with(cx_b, |buffer, _| {
3708 let snapshot = buffer.snapshot();
3709
3710 let highlights = highlights
3711 .into_iter()
3712 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3713 .collect::<Vec<_>>();
3714 assert_eq!(
3715 highlights,
3716 &[
3717 (lsp::DocumentHighlightKind::WRITE, 10..16),
3718 (lsp::DocumentHighlightKind::READ, 32..38),
3719 (lsp::DocumentHighlightKind::READ, 41..47)
3720 ]
3721 )
3722 });
3723 }
3724
    // Checks that a guest can query project symbols and open the buffer a returned symbol points
    // to, and that a symbol whose path was altered by the guest is rejected.
    //
    // Client A creates a project containing only `/code/crate-1`. The fake language server
    // answers the symbol query with a symbol located in `/code/crate-2/two.rs`. Client B opens
    // that symbol successfully, then tampers with the symbol's path and expects the open request
    // to fail with an "invalid symbol signature" error.
    #[gpui::test(iterations = 10)]
    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        // Only `crate-1` is added to the project below; `crate-2` and `private` are siblings
        // of it in the fake file system.
        fs.insert_tree(
            "/code",
            json!({
                "crate-1": {
                    "one.rs": "const ONE: usize = 1;",
                },
                "crate-2": {
                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
                },
                "private": {
                    "passwords.txt": "the-password",
                }
            }),
        )
        .await;

        // Set up a fake language server.
        let mut language = Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        );
        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
        lang_registry.add(Arc::new(language));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/code/crate-1", true, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before reading its id.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join the project as client B.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Cause the language server to start.
        let _buffer = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
            .await
            .unwrap();

        let fake_language_server = fake_language_servers.next().await.unwrap();
        // Canned workspace-symbol response: a single constant `TWO` located in `crate-2`.
        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
            |_, _| async move {
                #[allow(deprecated)]
                Ok(Some(vec![lsp::SymbolInformation {
                    name: "TWO".into(),
                    location: lsp::Location {
                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
                    },
                    kind: lsp::SymbolKind::CONSTANT,
                    tags: None,
                    container_name: None,
                    deprecated: None,
                }]))
            },
        );

        // Request the project symbols matching "two" as the guest.
        let symbols = project_b
            .update(cx_b, |p, cx| p.symbols("two", cx))
            .await
            .unwrap();
        assert_eq!(symbols.len(), 1);
        assert_eq!(symbols[0].name, "TWO");

        // Open one of the returned symbols.
        let buffer_b_2 = project_b
            .update(cx_b, |project, cx| {
                project.open_buffer_for_symbol(&symbols[0], cx)
            })
            .await
            .unwrap();
        // The opened file's path is expected to be expressed relative to the `crate-1` worktree.
        buffer_b_2.read_with(cx_b, |buffer, _| {
            assert_eq!(
                buffer.file().unwrap().path().as_ref(),
                Path::new("../crate-2/two.rs")
            );
        });

        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
        let mut fake_symbol = symbols[0].clone();
        fake_symbol.path = Path::new("/code/secrets").into();
        let error = project_b
            .update(cx_b, |project, cx| {
                project.open_buffer_for_symbol(&fake_symbol, cx)
            })
            .await
            .unwrap_err();
        assert!(error.to_string().contains("invalid symbol signature"));
    }
3848
    // Checks that opening a buffer and requesting a definition that points into that same buffer
    // yield one and the same buffer handle when both operations are in flight together.
    //
    // Client B starts a `definition` request (whose canned answer targets `b.rs`) and an
    // `open_buffer` request for `b.rs` without awaiting either; `rng` decides which of the two is
    // started first. Once both have completed, the definition's buffer must equal the buffer
    // that was opened directly.
    #[gpui::test(iterations = 10)]
    async fn test_open_buffer_while_getting_definition_pointing_to_it(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        mut rng: StdRng,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        fs.insert_tree(
            "/root",
            json!({
                "a.rs": "const ONE: usize = b::TWO;",
                "b.rs": "const TWO: usize = 2",
            }),
        )
        .await;

        // Set up a fake language server.
        let mut language = Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        );
        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
        lang_registry.add(Arc::new(language));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });

        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/root", true, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree's scan to complete before reading its id.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join the project as client B.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Open `a.rs`, the buffer the definition request is issued from.
        let buffer_b1 = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        let fake_language_server = fake_language_servers.next().await.unwrap();
        // Canned response: columns 6..9 on line 0 of `b.rs`, i.e. the text `TWO`.
        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
            |_, _| async move {
                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
                    lsp::Location::new(
                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
                    ),
                )))
            },
        );

        // Start both operations without awaiting them, in an order chosen by `rng`, so that
        // both orderings can be exercised. Offset 23 falls inside `TWO` in the text of `a.rs`.
        let definitions;
        let buffer_b2;
        if rng.gen() {
            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
        } else {
            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
        }

        // Only now wait for the results; both must refer to the same buffer.
        let buffer_b2 = buffer_b2.await.unwrap();
        let definitions = definitions.await.unwrap();
        assert_eq!(definitions.len(), 1);
        assert_eq!(definitions[0].buffer, buffer_b2);
    }
3945
3946 #[gpui::test(iterations = 10)]
3947 async fn test_collaborating_with_code_actions(
3948 cx_a: &mut TestAppContext,
3949 cx_b: &mut TestAppContext,
3950 ) {
3951 cx_a.foreground().forbid_parking();
3952 let lang_registry = Arc::new(LanguageRegistry::test());
3953 let fs = FakeFs::new(cx_a.background());
3954 cx_b.update(|cx| editor::init(cx));
3955
3956 // Set up a fake language server.
3957 let mut language = Language::new(
3958 LanguageConfig {
3959 name: "Rust".into(),
3960 path_suffixes: vec!["rs".to_string()],
3961 ..Default::default()
3962 },
3963 Some(tree_sitter_rust::language()),
3964 );
3965 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3966 lang_registry.add(Arc::new(language));
3967
3968 // Connect to a server as 2 clients.
3969 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3970 let client_a = server.create_client(cx_a, "user_a").await;
3971 let mut client_b = server.create_client(cx_b, "user_b").await;
3972 server
3973 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3974 .await;
3975
3976 // Share a project as client A
3977 fs.insert_tree(
3978 "/a",
3979 json!({
3980 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3981 "other.rs": "pub fn foo() -> usize { 4 }",
3982 }),
3983 )
3984 .await;
3985 let project_a = cx_a.update(|cx| {
3986 Project::local(
3987 client_a.clone(),
3988 client_a.user_store.clone(),
3989 lang_registry.clone(),
3990 fs.clone(),
3991 cx,
3992 )
3993 });
3994 let (worktree_a, _) = project_a
3995 .update(cx_a, |p, cx| {
3996 p.find_or_create_local_worktree("/a", true, cx)
3997 })
3998 .await
3999 .unwrap();
4000 worktree_a
4001 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4002 .await;
4003 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4004
4005 // Join the project as client B.
4006 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4007 let mut params = cx_b.update(WorkspaceParams::test);
4008 params.languages = lang_registry.clone();
4009 params.project = project_b.clone();
4010 params.client = client_b.client.clone();
4011 params.user_store = client_b.user_store.clone();
4012
4013 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4014 let editor_b = workspace_b
4015 .update(cx_b, |workspace, cx| {
4016 workspace.open_path((worktree_id, "main.rs"), true, cx)
4017 })
4018 .await
4019 .unwrap()
4020 .downcast::<Editor>()
4021 .unwrap();
4022
4023 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4024 fake_language_server
4025 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4026 assert_eq!(
4027 params.text_document.uri,
4028 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4029 );
4030 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4031 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4032 Ok(None)
4033 })
4034 .next()
4035 .await;
4036
4037 // Move cursor to a location that contains code actions.
4038 editor_b.update(cx_b, |editor, cx| {
4039 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4040 cx.focus(&editor_b);
4041 });
4042
4043 fake_language_server
4044 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4045 assert_eq!(
4046 params.text_document.uri,
4047 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4048 );
4049 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4050 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4051
4052 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4053 lsp::CodeAction {
4054 title: "Inline into all callers".to_string(),
4055 edit: Some(lsp::WorkspaceEdit {
4056 changes: Some(
4057 [
4058 (
4059 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4060 vec![lsp::TextEdit::new(
4061 lsp::Range::new(
4062 lsp::Position::new(1, 22),
4063 lsp::Position::new(1, 34),
4064 ),
4065 "4".to_string(),
4066 )],
4067 ),
4068 (
4069 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4070 vec![lsp::TextEdit::new(
4071 lsp::Range::new(
4072 lsp::Position::new(0, 0),
4073 lsp::Position::new(0, 27),
4074 ),
4075 "".to_string(),
4076 )],
4077 ),
4078 ]
4079 .into_iter()
4080 .collect(),
4081 ),
4082 ..Default::default()
4083 }),
4084 data: Some(json!({
4085 "codeActionParams": {
4086 "range": {
4087 "start": {"line": 1, "column": 31},
4088 "end": {"line": 1, "column": 31},
4089 }
4090 }
4091 })),
4092 ..Default::default()
4093 },
4094 )]))
4095 })
4096 .next()
4097 .await;
4098
4099 // Toggle code actions and wait for them to display.
4100 editor_b.update(cx_b, |editor, cx| {
4101 editor.toggle_code_actions(
4102 &ToggleCodeActions {
4103 deployed_from_indicator: false,
4104 },
4105 cx,
4106 );
4107 });
4108 editor_b
4109 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4110 .await;
4111
4112 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4113
4114 // Confirming the code action will trigger a resolve request.
4115 let confirm_action = workspace_b
4116 .update(cx_b, |workspace, cx| {
4117 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4118 })
4119 .unwrap();
4120 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4121 |_, _| async move {
4122 Ok(lsp::CodeAction {
4123 title: "Inline into all callers".to_string(),
4124 edit: Some(lsp::WorkspaceEdit {
4125 changes: Some(
4126 [
4127 (
4128 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4129 vec![lsp::TextEdit::new(
4130 lsp::Range::new(
4131 lsp::Position::new(1, 22),
4132 lsp::Position::new(1, 34),
4133 ),
4134 "4".to_string(),
4135 )],
4136 ),
4137 (
4138 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4139 vec![lsp::TextEdit::new(
4140 lsp::Range::new(
4141 lsp::Position::new(0, 0),
4142 lsp::Position::new(0, 27),
4143 ),
4144 "".to_string(),
4145 )],
4146 ),
4147 ]
4148 .into_iter()
4149 .collect(),
4150 ),
4151 ..Default::default()
4152 }),
4153 ..Default::default()
4154 })
4155 },
4156 );
4157
4158 // After the action is confirmed, an editor containing both modified files is opened.
4159 confirm_action.await.unwrap();
4160 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4161 workspace
4162 .active_item(cx)
4163 .unwrap()
4164 .downcast::<Editor>()
4165 .unwrap()
4166 });
4167 code_action_editor.update(cx_b, |editor, cx| {
4168 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4169 editor.undo(&Undo, cx);
4170 assert_eq!(
4171 editor.text(cx),
4172 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4173 );
4174 editor.redo(&Redo, cx);
4175 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4176 });
4177 }
4178
4179 #[gpui::test(iterations = 10)]
4180 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4181 cx_a.foreground().forbid_parking();
4182 let lang_registry = Arc::new(LanguageRegistry::test());
4183 let fs = FakeFs::new(cx_a.background());
4184 cx_b.update(|cx| editor::init(cx));
4185
4186 // Set up a fake language server.
4187 let mut language = Language::new(
4188 LanguageConfig {
4189 name: "Rust".into(),
4190 path_suffixes: vec!["rs".to_string()],
4191 ..Default::default()
4192 },
4193 Some(tree_sitter_rust::language()),
4194 );
4195 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4196 capabilities: lsp::ServerCapabilities {
4197 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4198 prepare_provider: Some(true),
4199 work_done_progress_options: Default::default(),
4200 })),
4201 ..Default::default()
4202 },
4203 ..Default::default()
4204 });
4205 lang_registry.add(Arc::new(language));
4206
4207 // Connect to a server as 2 clients.
4208 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4209 let client_a = server.create_client(cx_a, "user_a").await;
4210 let mut client_b = server.create_client(cx_b, "user_b").await;
4211 server
4212 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4213 .await;
4214
4215 // Share a project as client A
4216 fs.insert_tree(
4217 "/dir",
4218 json!({
4219 "one.rs": "const ONE: usize = 1;",
4220 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4221 }),
4222 )
4223 .await;
4224 let project_a = cx_a.update(|cx| {
4225 Project::local(
4226 client_a.clone(),
4227 client_a.user_store.clone(),
4228 lang_registry.clone(),
4229 fs.clone(),
4230 cx,
4231 )
4232 });
4233 let (worktree_a, _) = project_a
4234 .update(cx_a, |p, cx| {
4235 p.find_or_create_local_worktree("/dir", true, cx)
4236 })
4237 .await
4238 .unwrap();
4239 worktree_a
4240 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4241 .await;
4242 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4243
4244 // Join the worktree as client B.
4245 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4246 let mut params = cx_b.update(WorkspaceParams::test);
4247 params.languages = lang_registry.clone();
4248 params.project = project_b.clone();
4249 params.client = client_b.client.clone();
4250 params.user_store = client_b.user_store.clone();
4251
4252 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4253 let editor_b = workspace_b
4254 .update(cx_b, |workspace, cx| {
4255 workspace.open_path((worktree_id, "one.rs"), true, cx)
4256 })
4257 .await
4258 .unwrap()
4259 .downcast::<Editor>()
4260 .unwrap();
4261 let fake_language_server = fake_language_servers.next().await.unwrap();
4262
4263 // Move cursor to a location that can be renamed.
4264 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4265 editor.select_ranges([7..7], None, cx);
4266 editor.rename(&Rename, cx).unwrap()
4267 });
4268
4269 fake_language_server
4270 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4271 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4272 assert_eq!(params.position, lsp::Position::new(0, 7));
4273 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4274 lsp::Position::new(0, 6),
4275 lsp::Position::new(0, 9),
4276 ))))
4277 })
4278 .next()
4279 .await
4280 .unwrap();
4281 prepare_rename.await.unwrap();
4282 editor_b.update(cx_b, |editor, cx| {
4283 let rename = editor.pending_rename().unwrap();
4284 let buffer = editor.buffer().read(cx).snapshot(cx);
4285 assert_eq!(
4286 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4287 6..9
4288 );
4289 rename.editor.update(cx, |rename_editor, cx| {
4290 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4291 rename_buffer.edit([(0..3, "THREE")], cx);
4292 });
4293 });
4294 });
4295
4296 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4297 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4298 });
4299 fake_language_server
4300 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4301 assert_eq!(
4302 params.text_document_position.text_document.uri.as_str(),
4303 "file:///dir/one.rs"
4304 );
4305 assert_eq!(
4306 params.text_document_position.position,
4307 lsp::Position::new(0, 6)
4308 );
4309 assert_eq!(params.new_name, "THREE");
4310 Ok(Some(lsp::WorkspaceEdit {
4311 changes: Some(
4312 [
4313 (
4314 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4315 vec![lsp::TextEdit::new(
4316 lsp::Range::new(
4317 lsp::Position::new(0, 6),
4318 lsp::Position::new(0, 9),
4319 ),
4320 "THREE".to_string(),
4321 )],
4322 ),
4323 (
4324 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4325 vec![
4326 lsp::TextEdit::new(
4327 lsp::Range::new(
4328 lsp::Position::new(0, 24),
4329 lsp::Position::new(0, 27),
4330 ),
4331 "THREE".to_string(),
4332 ),
4333 lsp::TextEdit::new(
4334 lsp::Range::new(
4335 lsp::Position::new(0, 35),
4336 lsp::Position::new(0, 38),
4337 ),
4338 "THREE".to_string(),
4339 ),
4340 ],
4341 ),
4342 ]
4343 .into_iter()
4344 .collect(),
4345 ),
4346 ..Default::default()
4347 }))
4348 })
4349 .next()
4350 .await
4351 .unwrap();
4352 confirm_rename.await.unwrap();
4353
4354 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4355 workspace
4356 .active_item(cx)
4357 .unwrap()
4358 .downcast::<Editor>()
4359 .unwrap()
4360 });
4361 rename_editor.update(cx_b, |editor, cx| {
4362 assert_eq!(
4363 editor.text(cx),
4364 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4365 );
4366 editor.undo(&Undo, cx);
4367 assert_eq!(
4368 editor.text(cx),
4369 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4370 );
4371 editor.redo(&Redo, cx);
4372 assert_eq!(
4373 editor.text(cx),
4374 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4375 );
4376 });
4377
4378 // Ensure temporary rename edits cannot be undone/redone.
4379 editor_b.update(cx_b, |editor, cx| {
4380 editor.undo(&Undo, cx);
4381 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4382 editor.undo(&Undo, cx);
4383 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4384 editor.redo(&Redo, cx);
4385 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4386 })
4387 }
4388
4389 #[gpui::test(iterations = 10)]
4390 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4391 cx_a.foreground().forbid_parking();
4392
4393 // Connect to a server as 2 clients.
4394 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4395 let client_a = server.create_client(cx_a, "user_a").await;
4396 let client_b = server.create_client(cx_b, "user_b").await;
4397
4398 // Create an org that includes these 2 users.
4399 let db = &server.app_state.db;
4400 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4401 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4402 .await
4403 .unwrap();
4404 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4405 .await
4406 .unwrap();
4407
4408 // Create a channel that includes all the users.
4409 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4410 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4411 .await
4412 .unwrap();
4413 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4414 .await
4415 .unwrap();
4416 db.create_channel_message(
4417 channel_id,
4418 client_b.current_user_id(&cx_b),
4419 "hello A, it's B.",
4420 OffsetDateTime::now_utc(),
4421 1,
4422 )
4423 .await
4424 .unwrap();
4425
4426 let channels_a = cx_a
4427 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4428 channels_a
4429 .condition(cx_a, |list, _| list.available_channels().is_some())
4430 .await;
4431 channels_a.read_with(cx_a, |list, _| {
4432 assert_eq!(
4433 list.available_channels().unwrap(),
4434 &[ChannelDetails {
4435 id: channel_id.to_proto(),
4436 name: "test-channel".to_string()
4437 }]
4438 )
4439 });
4440 let channel_a = channels_a.update(cx_a, |this, cx| {
4441 this.get_channel(channel_id.to_proto(), cx).unwrap()
4442 });
4443 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4444 channel_a
4445 .condition(&cx_a, |channel, _| {
4446 channel_messages(channel)
4447 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4448 })
4449 .await;
4450
4451 let channels_b = cx_b
4452 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4453 channels_b
4454 .condition(cx_b, |list, _| list.available_channels().is_some())
4455 .await;
4456 channels_b.read_with(cx_b, |list, _| {
4457 assert_eq!(
4458 list.available_channels().unwrap(),
4459 &[ChannelDetails {
4460 id: channel_id.to_proto(),
4461 name: "test-channel".to_string()
4462 }]
4463 )
4464 });
4465
4466 let channel_b = channels_b.update(cx_b, |this, cx| {
4467 this.get_channel(channel_id.to_proto(), cx).unwrap()
4468 });
4469 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4470 channel_b
4471 .condition(&cx_b, |channel, _| {
4472 channel_messages(channel)
4473 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4474 })
4475 .await;
4476
4477 channel_a
4478 .update(cx_a, |channel, cx| {
4479 channel
4480 .send_message("oh, hi B.".to_string(), cx)
4481 .unwrap()
4482 .detach();
4483 let task = channel.send_message("sup".to_string(), cx).unwrap();
4484 assert_eq!(
4485 channel_messages(channel),
4486 &[
4487 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4488 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4489 ("user_a".to_string(), "sup".to_string(), true)
4490 ]
4491 );
4492 task
4493 })
4494 .await
4495 .unwrap();
4496
4497 channel_b
4498 .condition(&cx_b, |channel, _| {
4499 channel_messages(channel)
4500 == [
4501 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4502 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4503 ("user_a".to_string(), "sup".to_string(), false),
4504 ]
4505 })
4506 .await;
4507
4508 assert_eq!(
4509 server
4510 .state()
4511 .await
4512 .channel(channel_id)
4513 .unwrap()
4514 .connection_ids
4515 .len(),
4516 2
4517 );
4518 cx_b.update(|_| drop(channel_b));
4519 server
4520 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4521 .await;
4522
4523 cx_a.update(|_| drop(channel_a));
4524 server
4525 .condition(|state| state.channel(channel_id).is_none())
4526 .await;
4527 }
4528
4529 #[gpui::test(iterations = 10)]
4530 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4531 cx_a.foreground().forbid_parking();
4532
4533 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4534 let client_a = server.create_client(cx_a, "user_a").await;
4535
4536 let db = &server.app_state.db;
4537 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4538 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4539 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4540 .await
4541 .unwrap();
4542 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4543 .await
4544 .unwrap();
4545
4546 let channels_a = cx_a
4547 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4548 channels_a
4549 .condition(cx_a, |list, _| list.available_channels().is_some())
4550 .await;
4551 let channel_a = channels_a.update(cx_a, |this, cx| {
4552 this.get_channel(channel_id.to_proto(), cx).unwrap()
4553 });
4554
4555 // Messages aren't allowed to be too long.
4556 channel_a
4557 .update(cx_a, |channel, cx| {
4558 let long_body = "this is long.\n".repeat(1024);
4559 channel.send_message(long_body, cx).unwrap()
4560 })
4561 .await
4562 .unwrap_err();
4563
4564 // Messages aren't allowed to be blank.
4565 channel_a.update(cx_a, |channel, cx| {
4566 channel.send_message(String::new(), cx).unwrap_err()
4567 });
4568
4569 // Leading and trailing whitespace are trimmed.
4570 channel_a
4571 .update(cx_a, |channel, cx| {
4572 channel
4573 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4574 .unwrap()
4575 })
4576 .await
4577 .unwrap();
4578 assert_eq!(
4579 db.get_channel_messages(channel_id, 10, None)
4580 .await
4581 .unwrap()
4582 .iter()
4583 .map(|m| &m.body)
4584 .collect::<Vec<_>>(),
4585 &["surrounded by whitespace"]
4586 );
4587 }
4588
4589 #[gpui::test(iterations = 10)]
4590 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4591 cx_a.foreground().forbid_parking();
4592
4593 // Connect to a server as 2 clients.
4594 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4595 let client_a = server.create_client(cx_a, "user_a").await;
4596 let client_b = server.create_client(cx_b, "user_b").await;
4597 let mut status_b = client_b.status();
4598
4599 // Create an org that includes these 2 users.
4600 let db = &server.app_state.db;
4601 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4602 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4603 .await
4604 .unwrap();
4605 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4606 .await
4607 .unwrap();
4608
4609 // Create a channel that includes all the users.
4610 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4611 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4612 .await
4613 .unwrap();
4614 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4615 .await
4616 .unwrap();
4617 db.create_channel_message(
4618 channel_id,
4619 client_b.current_user_id(&cx_b),
4620 "hello A, it's B.",
4621 OffsetDateTime::now_utc(),
4622 2,
4623 )
4624 .await
4625 .unwrap();
4626
4627 let channels_a = cx_a
4628 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4629 channels_a
4630 .condition(cx_a, |list, _| list.available_channels().is_some())
4631 .await;
4632
4633 channels_a.read_with(cx_a, |list, _| {
4634 assert_eq!(
4635 list.available_channels().unwrap(),
4636 &[ChannelDetails {
4637 id: channel_id.to_proto(),
4638 name: "test-channel".to_string()
4639 }]
4640 )
4641 });
4642 let channel_a = channels_a.update(cx_a, |this, cx| {
4643 this.get_channel(channel_id.to_proto(), cx).unwrap()
4644 });
4645 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4646 channel_a
4647 .condition(&cx_a, |channel, _| {
4648 channel_messages(channel)
4649 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4650 })
4651 .await;
4652
4653 let channels_b = cx_b
4654 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4655 channels_b
4656 .condition(cx_b, |list, _| list.available_channels().is_some())
4657 .await;
4658 channels_b.read_with(cx_b, |list, _| {
4659 assert_eq!(
4660 list.available_channels().unwrap(),
4661 &[ChannelDetails {
4662 id: channel_id.to_proto(),
4663 name: "test-channel".to_string()
4664 }]
4665 )
4666 });
4667
4668 let channel_b = channels_b.update(cx_b, |this, cx| {
4669 this.get_channel(channel_id.to_proto(), cx).unwrap()
4670 });
4671 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4672 channel_b
4673 .condition(&cx_b, |channel, _| {
4674 channel_messages(channel)
4675 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4676 })
4677 .await;
4678
4679 // Disconnect client B, ensuring we can still access its cached channel data.
4680 server.forbid_connections();
4681 server.disconnect_client(client_b.current_user_id(&cx_b));
4682 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4683 while !matches!(
4684 status_b.next().await,
4685 Some(client::Status::ReconnectionError { .. })
4686 ) {}
4687
4688 channels_b.read_with(cx_b, |channels, _| {
4689 assert_eq!(
4690 channels.available_channels().unwrap(),
4691 [ChannelDetails {
4692 id: channel_id.to_proto(),
4693 name: "test-channel".to_string()
4694 }]
4695 )
4696 });
4697 channel_b.read_with(cx_b, |channel, _| {
4698 assert_eq!(
4699 channel_messages(channel),
4700 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4701 )
4702 });
4703
4704 // Send a message from client B while it is disconnected.
4705 channel_b
4706 .update(cx_b, |channel, cx| {
4707 let task = channel
4708 .send_message("can you see this?".to_string(), cx)
4709 .unwrap();
4710 assert_eq!(
4711 channel_messages(channel),
4712 &[
4713 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4714 ("user_b".to_string(), "can you see this?".to_string(), true)
4715 ]
4716 );
4717 task
4718 })
4719 .await
4720 .unwrap_err();
4721
4722 // Send a message from client A while B is disconnected.
4723 channel_a
4724 .update(cx_a, |channel, cx| {
4725 channel
4726 .send_message("oh, hi B.".to_string(), cx)
4727 .unwrap()
4728 .detach();
4729 let task = channel.send_message("sup".to_string(), cx).unwrap();
4730 assert_eq!(
4731 channel_messages(channel),
4732 &[
4733 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4734 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4735 ("user_a".to_string(), "sup".to_string(), true)
4736 ]
4737 );
4738 task
4739 })
4740 .await
4741 .unwrap();
4742
4743 // Give client B a chance to reconnect.
4744 server.allow_connections();
4745 cx_b.foreground().advance_clock(Duration::from_secs(10));
4746
4747 // Verify that B sees the new messages upon reconnection, as well as the message client B
4748 // sent while offline.
4749 channel_b
4750 .condition(&cx_b, |channel, _| {
4751 channel_messages(channel)
4752 == [
4753 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4754 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4755 ("user_a".to_string(), "sup".to_string(), false),
4756 ("user_b".to_string(), "can you see this?".to_string(), false),
4757 ]
4758 })
4759 .await;
4760
4761 // Ensure client A and B can communicate normally after reconnection.
4762 channel_a
4763 .update(cx_a, |channel, cx| {
4764 channel.send_message("you online?".to_string(), cx).unwrap()
4765 })
4766 .await
4767 .unwrap();
4768 channel_b
4769 .condition(&cx_b, |channel, _| {
4770 channel_messages(channel)
4771 == [
4772 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4773 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4774 ("user_a".to_string(), "sup".to_string(), false),
4775 ("user_b".to_string(), "can you see this?".to_string(), false),
4776 ("user_a".to_string(), "you online?".to_string(), false),
4777 ]
4778 })
4779 .await;
4780
4781 channel_b
4782 .update(cx_b, |channel, cx| {
4783 channel.send_message("yep".to_string(), cx).unwrap()
4784 })
4785 .await
4786 .unwrap();
4787 channel_a
4788 .condition(&cx_a, |channel, _| {
4789 channel_messages(channel)
4790 == [
4791 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4792 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4793 ("user_a".to_string(), "sup".to_string(), false),
4794 ("user_b".to_string(), "can you see this?".to_string(), false),
4795 ("user_a".to_string(), "you online?".to_string(), false),
4796 ("user_b".to_string(), "yep".to_string(), false),
4797 ]
4798 })
4799 .await;
4800 }
4801
4802 #[gpui::test(iterations = 10)]
4803 async fn test_contacts(
4804 deterministic: Arc<Deterministic>,
4805 cx_a: &mut TestAppContext,
4806 cx_b: &mut TestAppContext,
4807 cx_c: &mut TestAppContext,
4808 ) {
4809 cx_a.foreground().forbid_parking();
4810
4811 // Connect to a server as 3 clients.
4812 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4813 let mut client_a = server.create_client(cx_a, "user_a").await;
4814 let mut client_b = server.create_client(cx_b, "user_b").await;
4815 let client_c = server.create_client(cx_c, "user_c").await;
4816 server
4817 .make_contacts(vec![
4818 (&client_a, cx_a),
4819 (&client_b, cx_b),
4820 (&client_c, cx_c),
4821 ])
4822 .await;
4823
4824 deterministic.run_until_parked();
4825 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4826 client.user_store.read_with(*cx, |store, _| {
4827 assert_eq!(
4828 contacts(store),
4829 [
4830 ("user_a", true, vec![]),
4831 ("user_b", true, vec![]),
4832 ("user_c", true, vec![])
4833 ],
4834 "{} has the wrong contacts",
4835 client.username
4836 )
4837 });
4838 }
4839
4840 // Share a project as client A.
4841 let fs = FakeFs::new(cx_a.background());
4842 fs.create_dir(Path::new("/a")).await.unwrap();
4843 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
4844
4845 deterministic.run_until_parked();
4846 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4847 client.user_store.read_with(*cx, |store, _| {
4848 assert_eq!(
4849 contacts(store),
4850 [
4851 ("user_a", true, vec![("a", vec![])]),
4852 ("user_b", true, vec![]),
4853 ("user_c", true, vec![])
4854 ],
4855 "{} has the wrong contacts",
4856 client.username
4857 )
4858 });
4859 }
4860
4861 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4862
4863 deterministic.run_until_parked();
4864 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4865 client.user_store.read_with(*cx, |store, _| {
4866 assert_eq!(
4867 contacts(store),
4868 [
4869 ("user_a", true, vec![("a", vec!["user_b"])]),
4870 ("user_b", true, vec![]),
4871 ("user_c", true, vec![])
4872 ],
4873 "{} has the wrong contacts",
4874 client.username
4875 )
4876 });
4877 }
4878
4879 // Add a local project as client B
4880 let fs = FakeFs::new(cx_b.background());
4881 fs.create_dir(Path::new("/b")).await.unwrap();
4882 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
4883
4884 deterministic.run_until_parked();
4885 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4886 client.user_store.read_with(*cx, |store, _| {
4887 assert_eq!(
4888 contacts(store),
4889 [
4890 ("user_a", true, vec![("a", vec!["user_b"])]),
4891 ("user_b", true, vec![("b", vec![])]),
4892 ("user_c", true, vec![])
4893 ],
4894 "{} has the wrong contacts",
4895 client.username
4896 )
4897 });
4898 }
4899
4900 project_a
4901 .condition(&cx_a, |project, _| {
4902 project.collaborators().contains_key(&client_b.peer_id)
4903 })
4904 .await;
4905
4906 client_a.project.take();
4907 cx_a.update(move |_| drop(project_a));
4908 deterministic.run_until_parked();
4909 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4910 client.user_store.read_with(*cx, |store, _| {
4911 assert_eq!(
4912 contacts(store),
4913 [
4914 ("user_a", true, vec![]),
4915 ("user_b", true, vec![("b", vec![])]),
4916 ("user_c", true, vec![])
4917 ],
4918 "{} has the wrong contacts",
4919 client.username
4920 )
4921 });
4922 }
4923
4924 server.disconnect_client(client_c.current_user_id(cx_c));
4925 server.forbid_connections();
4926 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
4927 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
4928 client.user_store.read_with(*cx, |store, _| {
4929 assert_eq!(
4930 contacts(store),
4931 [
4932 ("user_a", true, vec![]),
4933 ("user_b", true, vec![("b", vec![])]),
4934 ("user_c", false, vec![])
4935 ],
4936 "{} has the wrong contacts",
4937 client.username
4938 )
4939 });
4940 }
4941 client_c
4942 .user_store
4943 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
4944
4945 server.allow_connections();
4946 client_c
4947 .authenticate_and_connect(false, &cx_c.to_async())
4948 .await
4949 .unwrap();
4950
4951 deterministic.run_until_parked();
4952 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4953 client.user_store.read_with(*cx, |store, _| {
4954 assert_eq!(
4955 contacts(store),
4956 [
4957 ("user_a", true, vec![]),
4958 ("user_b", true, vec![("b", vec![])]),
4959 ("user_c", true, vec![])
4960 ],
4961 "{} has the wrong contacts",
4962 client.username
4963 )
4964 });
4965 }
4966
4967 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
4968 user_store
4969 .contacts()
4970 .iter()
4971 .map(|contact| {
4972 let projects = contact
4973 .projects
4974 .iter()
4975 .map(|p| {
4976 (
4977 p.worktree_root_names[0].as_str(),
4978 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4979 )
4980 })
4981 .collect();
4982 (contact.user.github_login.as_str(), contact.online, projects)
4983 })
4984 .collect()
4985 }
4986 }
4987
4988 #[gpui::test(iterations = 10)]
4989 async fn test_contact_requests(
4990 executor: Arc<Deterministic>,
4991 cx_a: &mut TestAppContext,
4992 cx_a2: &mut TestAppContext,
4993 cx_b: &mut TestAppContext,
4994 cx_b2: &mut TestAppContext,
4995 cx_c: &mut TestAppContext,
4996 cx_c2: &mut TestAppContext,
4997 ) {
4998 cx_a.foreground().forbid_parking();
4999
5000 // Connect to a server as 3 clients.
5001 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5002 let client_a = server.create_client(cx_a, "user_a").await;
5003 let client_a2 = server.create_client(cx_a2, "user_a").await;
5004 let client_b = server.create_client(cx_b, "user_b").await;
5005 let client_b2 = server.create_client(cx_b2, "user_b").await;
5006 let client_c = server.create_client(cx_c, "user_c").await;
5007 let client_c2 = server.create_client(cx_c2, "user_c").await;
5008
5009 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5010 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5011 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5012
5013 // User A and User C request that user B become their contact.
5014 client_a
5015 .user_store
5016 .update(cx_a, |store, cx| {
5017 store.request_contact(client_b.user_id().unwrap(), cx)
5018 })
5019 .await
5020 .unwrap();
5021 client_c
5022 .user_store
5023 .update(cx_c, |store, cx| {
5024 store.request_contact(client_b.user_id().unwrap(), cx)
5025 })
5026 .await
5027 .unwrap();
5028 executor.run_until_parked();
5029
5030 // All users see the pending request appear in all their clients.
5031 assert_eq!(
5032 client_a.summarize_contacts(&cx_a).outgoing_requests,
5033 &["user_b"]
5034 );
5035 assert_eq!(
5036 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5037 &["user_b"]
5038 );
5039 assert_eq!(
5040 client_b.summarize_contacts(&cx_b).incoming_requests,
5041 &["user_a", "user_c"]
5042 );
5043 assert_eq!(
5044 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5045 &["user_a", "user_c"]
5046 );
5047 assert_eq!(
5048 client_c.summarize_contacts(&cx_c).outgoing_requests,
5049 &["user_b"]
5050 );
5051 assert_eq!(
5052 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5053 &["user_b"]
5054 );
5055
5056 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5057 disconnect_and_reconnect(&client_a, cx_a).await;
5058 disconnect_and_reconnect(&client_b, cx_b).await;
5059 disconnect_and_reconnect(&client_c, cx_c).await;
5060 executor.run_until_parked();
5061 assert_eq!(
5062 client_a.summarize_contacts(&cx_a).outgoing_requests,
5063 &["user_b"]
5064 );
5065 assert_eq!(
5066 client_b.summarize_contacts(&cx_b).incoming_requests,
5067 &["user_a", "user_c"]
5068 );
5069 assert_eq!(
5070 client_c.summarize_contacts(&cx_c).outgoing_requests,
5071 &["user_b"]
5072 );
5073
5074 // User B accepts the request from user A.
5075 client_b
5076 .user_store
5077 .update(cx_b, |store, cx| {
5078 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5079 })
5080 .await
5081 .unwrap();
5082
5083 executor.run_until_parked();
5084
5085 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5086 let contacts_b = client_b.summarize_contacts(&cx_b);
5087 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5088 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5089 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5090 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5091 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5092
5093 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5094 let contacts_a = client_a.summarize_contacts(&cx_a);
5095 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5096 assert!(contacts_a.outgoing_requests.is_empty());
5097 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5098 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5099 assert!(contacts_a2.outgoing_requests.is_empty());
5100
5101 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5102 disconnect_and_reconnect(&client_a, cx_a).await;
5103 disconnect_and_reconnect(&client_b, cx_b).await;
5104 disconnect_and_reconnect(&client_c, cx_c).await;
5105 executor.run_until_parked();
5106 assert_eq!(
5107 client_a.summarize_contacts(&cx_a).current,
5108 &["user_a", "user_b"]
5109 );
5110 assert_eq!(
5111 client_b.summarize_contacts(&cx_b).current,
5112 &["user_a", "user_b"]
5113 );
5114 assert_eq!(
5115 client_b.summarize_contacts(&cx_b).incoming_requests,
5116 &["user_c"]
5117 );
5118 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5119 assert_eq!(
5120 client_c.summarize_contacts(&cx_c).outgoing_requests,
5121 &["user_b"]
5122 );
5123
5124 // User B rejects the request from user C.
5125 client_b
5126 .user_store
5127 .update(cx_b, |store, cx| {
5128 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5129 })
5130 .await
5131 .unwrap();
5132
5133 executor.run_until_parked();
5134
5135 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5136 let contacts_b = client_b.summarize_contacts(&cx_b);
5137 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5138 assert!(contacts_b.incoming_requests.is_empty());
5139 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5140 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5141 assert!(contacts_b2.incoming_requests.is_empty());
5142
5143 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5144 let contacts_c = client_c.summarize_contacts(&cx_c);
5145 assert_eq!(contacts_c.current, &["user_c"]);
5146 assert!(contacts_c.outgoing_requests.is_empty());
5147 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5148 assert_eq!(contacts_c2.current, &["user_c"]);
5149 assert!(contacts_c2.outgoing_requests.is_empty());
5150
5151 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5152 disconnect_and_reconnect(&client_a, cx_a).await;
5153 disconnect_and_reconnect(&client_b, cx_b).await;
5154 disconnect_and_reconnect(&client_c, cx_c).await;
5155 executor.run_until_parked();
5156 assert_eq!(
5157 client_a.summarize_contacts(&cx_a).current,
5158 &["user_a", "user_b"]
5159 );
5160 assert_eq!(
5161 client_b.summarize_contacts(&cx_b).current,
5162 &["user_a", "user_b"]
5163 );
5164 assert!(client_b
5165 .summarize_contacts(&cx_b)
5166 .incoming_requests
5167 .is_empty());
5168 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5169 assert!(client_c
5170 .summarize_contacts(&cx_c)
5171 .outgoing_requests
5172 .is_empty());
5173
5174 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5175 client.disconnect(&cx.to_async()).unwrap();
5176 client.clear_contacts(cx).await;
5177 client
5178 .authenticate_and_connect(false, &cx.to_async())
5179 .await
5180 .unwrap();
5181 }
5182 }
5183
5184 #[gpui::test(iterations = 10)]
5185 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5186 cx_a.foreground().forbid_parking();
5187 let fs = FakeFs::new(cx_a.background());
5188
5189 // 2 clients connect to a server.
5190 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5191 let mut client_a = server.create_client(cx_a, "user_a").await;
5192 let mut client_b = server.create_client(cx_b, "user_b").await;
5193 server
5194 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5195 .await;
5196 cx_a.update(editor::init);
5197 cx_b.update(editor::init);
5198
5199 // Client A shares a project.
5200 fs.insert_tree(
5201 "/a",
5202 json!({
5203 "1.txt": "one",
5204 "2.txt": "two",
5205 "3.txt": "three",
5206 }),
5207 )
5208 .await;
5209 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5210
5211 // Client B joins the project.
5212 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5213
5214 // Client A opens some editors.
5215 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5216 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5217 let editor_a1 = workspace_a
5218 .update(cx_a, |workspace, cx| {
5219 workspace.open_path((worktree_id, "1.txt"), true, cx)
5220 })
5221 .await
5222 .unwrap()
5223 .downcast::<Editor>()
5224 .unwrap();
5225 let editor_a2 = workspace_a
5226 .update(cx_a, |workspace, cx| {
5227 workspace.open_path((worktree_id, "2.txt"), true, cx)
5228 })
5229 .await
5230 .unwrap()
5231 .downcast::<Editor>()
5232 .unwrap();
5233
5234 // Client B opens an editor.
5235 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5236 let editor_b1 = workspace_b
5237 .update(cx_b, |workspace, cx| {
5238 workspace.open_path((worktree_id, "1.txt"), true, cx)
5239 })
5240 .await
5241 .unwrap()
5242 .downcast::<Editor>()
5243 .unwrap();
5244
5245 let client_a_id = project_b.read_with(cx_b, |project, _| {
5246 project.collaborators().values().next().unwrap().peer_id
5247 });
5248 let client_b_id = project_a.read_with(cx_a, |project, _| {
5249 project.collaborators().values().next().unwrap().peer_id
5250 });
5251
5252 // When client B starts following client A, all visible view states are replicated to client B.
5253 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5254 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5255 workspace_b
5256 .update(cx_b, |workspace, cx| {
5257 workspace
5258 .toggle_follow(&ToggleFollow(client_a_id), cx)
5259 .unwrap()
5260 })
5261 .await
5262 .unwrap();
5263
5264 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5265 workspace
5266 .active_item(cx)
5267 .unwrap()
5268 .downcast::<Editor>()
5269 .unwrap()
5270 });
5271 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5272 assert_eq!(
5273 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5274 Some((worktree_id, "2.txt").into())
5275 );
5276 assert_eq!(
5277 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5278 vec![2..3]
5279 );
5280 assert_eq!(
5281 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5282 vec![0..1]
5283 );
5284
5285 // When client A activates a different editor, client B does so as well.
5286 workspace_a.update(cx_a, |workspace, cx| {
5287 workspace.activate_item(&editor_a1, cx)
5288 });
5289 workspace_b
5290 .condition(cx_b, |workspace, cx| {
5291 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5292 })
5293 .await;
5294
5295 // When client A navigates back and forth, client B does so as well.
5296 workspace_a
5297 .update(cx_a, |workspace, cx| {
5298 workspace::Pane::go_back(workspace, None, cx)
5299 })
5300 .await;
5301 workspace_b
5302 .condition(cx_b, |workspace, cx| {
5303 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5304 })
5305 .await;
5306
5307 workspace_a
5308 .update(cx_a, |workspace, cx| {
5309 workspace::Pane::go_forward(workspace, None, cx)
5310 })
5311 .await;
5312 workspace_b
5313 .condition(cx_b, |workspace, cx| {
5314 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5315 })
5316 .await;
5317
5318 // Changes to client A's editor are reflected on client B.
5319 editor_a1.update(cx_a, |editor, cx| {
5320 editor.select_ranges([1..1, 2..2], None, cx);
5321 });
5322 editor_b1
5323 .condition(cx_b, |editor, cx| {
5324 editor.selected_ranges(cx) == vec![1..1, 2..2]
5325 })
5326 .await;
5327
5328 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5329 editor_b1
5330 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5331 .await;
5332
5333 editor_a1.update(cx_a, |editor, cx| {
5334 editor.select_ranges([3..3], None, cx);
5335 editor.set_scroll_position(vec2f(0., 100.), cx);
5336 });
5337 editor_b1
5338 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5339 .await;
5340
5341 // After unfollowing, client B stops receiving updates from client A.
5342 workspace_b.update(cx_b, |workspace, cx| {
5343 workspace.unfollow(&workspace.active_pane().clone(), cx)
5344 });
5345 workspace_a.update(cx_a, |workspace, cx| {
5346 workspace.activate_item(&editor_a2, cx)
5347 });
5348 cx_a.foreground().run_until_parked();
5349 assert_eq!(
5350 workspace_b.read_with(cx_b, |workspace, cx| workspace
5351 .active_item(cx)
5352 .unwrap()
5353 .id()),
5354 editor_b1.id()
5355 );
5356
5357 // Client A starts following client B.
5358 workspace_a
5359 .update(cx_a, |workspace, cx| {
5360 workspace
5361 .toggle_follow(&ToggleFollow(client_b_id), cx)
5362 .unwrap()
5363 })
5364 .await
5365 .unwrap();
5366 assert_eq!(
5367 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5368 Some(client_b_id)
5369 );
5370 assert_eq!(
5371 workspace_a.read_with(cx_a, |workspace, cx| workspace
5372 .active_item(cx)
5373 .unwrap()
5374 .id()),
5375 editor_a1.id()
5376 );
5377
5378 // Following interrupts when client B disconnects.
5379 client_b.disconnect(&cx_b.to_async()).unwrap();
5380 cx_a.foreground().run_until_parked();
5381 assert_eq!(
5382 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5383 None
5384 );
5385 }
5386
    // Checks that two clients can follow each other at the same time. Each client
    // keeps its original pane for its own work and uses a second (split) pane to
    // follow the other client. The test asserts that updates coming from the leader
    // never change which pane is active on the follower, and that mutual following
    // settles instead of looping forever.
    #[gpui::test(iterations = 10)]
    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // 2 clients connect to a server.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;
        cx_a.update(editor::init);
        cx_b.update(editor::init);

        // Client A shares a project.
        fs.insert_tree(
            "/a",
            json!({
                "1.txt": "one",
                "2.txt": "two",
                "3.txt": "three",
                "4.txt": "four",
            }),
        )
        .await;
        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;

        // Client B joins the project.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Client A opens some editors.
        // `pane_a1` is remembered so later assertions can tell the original pane
        // apart from the split pane that follows client B.
        let workspace_a = client_a.build_workspace(&project_a, cx_a);
        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
        let _editor_a1 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Client B opens an editor.
        let workspace_b = client_b.build_workspace(&project_b, cx_b);
        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
        let _editor_b1 = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "2.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Clients A and B follow each other in split panes
        // The `assert_ne!` shows that splitting made the new pane active. The leader
        // is the first collaborator of the project, which is the other client since
        // only two clients are connected.
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                assert_ne!(*workspace.active_pane(), pane_a1);
                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
                workspace
                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                assert_ne!(*workspace.active_pane(), pane_b1);
                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
                workspace
                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();

        // Each client switches back to its original pane and opens a new file there,
        // which produces leader activity for the other side to replicate.
        workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.activate_next_pane(cx);
                assert_eq!(*workspace.active_pane(), pane_a1);
                workspace.open_path((worktree_id, "3.txt"), true, cx)
            })
            .await
            .unwrap();
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.activate_next_pane(cx);
                assert_eq!(*workspace.active_pane(), pane_b1);
                workspace.open_path((worktree_id, "4.txt"), true, cx)
            })
            .await
            .unwrap();
        // Let all pending follow updates propagate before asserting.
        cx_a.foreground().run_until_parked();

        // Ensure leader updates don't change the active pane of followers
        workspace_a.read_with(cx_a, |workspace, _| {
            assert_eq!(*workspace.active_pane(), pane_a1);
        });
        workspace_b.read_with(cx_b, |workspace, _| {
            assert_eq!(*workspace.active_pane(), pane_b1);
        });

        // Ensure peers following each other doesn't cause an infinite loop.
        // Each original pane still shows the file its owner opened, while the other
        // (following) pane shows the file the peer opened.
        assert_eq!(
            workspace_a.read_with(cx_a, |workspace, cx| workspace
                .active_item(cx)
                .unwrap()
                .project_path(cx)),
            Some((worktree_id, "3.txt").into())
        );
        workspace_a.update(cx_a, |workspace, cx| {
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "3.txt").into())
            );
            workspace.activate_next_pane(cx);
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "4.txt").into())
            );
        });
        workspace_b.update(cx_b, |workspace, cx| {
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "4.txt").into())
            );
            workspace.activate_next_pane(cx);
            assert_eq!(
                workspace.active_item(cx).unwrap().project_path(cx),
                Some((worktree_id, "3.txt").into())
            );
        });
    }
5523
    // Checks which local actions on the follower end a follow session. Client B
    // follows client A and then, one scenario at a time, moves the cursor, edits,
    // scrolls, switches panes, and opens another item. After each action the test
    // asserts whether `leader_for_pane` still reports client A for the original pane.
    #[gpui::test(iterations = 10)]
    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // 2 clients connect to a server.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;
        cx_a.update(editor::init);
        cx_b.update(editor::init);

        // Client A shares a project.
        fs.insert_tree(
            "/a",
            json!({
                "1.txt": "one",
                "2.txt": "two",
                "3.txt": "three",
            }),
        )
        .await;
        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;

        // Client B joins the project.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // Client A opens some editors.
        let workspace_a = client_a.build_workspace(&project_a, cx_a);
        let _editor_a1 = workspace_a
            .update(cx_a, |workspace, cx| {
                workspace.open_path((worktree_id, "1.txt"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // Client B starts following client A.
        // `pane_b` is the pane in which following happens; every assertion below
        // queries the leader of this pane.
        let workspace_b = client_b.build_workspace(&project_b, cx_b);
        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
        let leader_id = project_b.read_with(cx_b, |project, _| {
            project.collaborators().values().next().unwrap().peer_id
        });
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );
        // The editor that became active on client B as a result of following.
        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });

        // When client B moves, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Follow again so the next scenario starts from a following state.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B edits, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Follow again so the next scenario starts from a following state.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B scrolls, it automatically stops following client A.
        editor_b2.update(cx_b, |editor, cx| {
            editor.set_scroll_position(vec2f(0., 3.), cx)
        });
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );

        // Follow again so the next scenario starts from a following state.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace
                    .toggle_follow(&ToggleFollow(leader_id), cx)
                    .unwrap()
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B activates a different pane, it continues following client A in the original pane.
        workspace_b.update(cx_b, |workspace, cx| {
            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
        });
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            Some(leader_id)
        );

        // When client B activates a different item in the original pane, it automatically stops following client A.
        workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "2.txt"), true, cx)
            })
            .await
            .unwrap();
        assert_eq!(
            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
            None
        );
    }
5680
5681 #[gpui::test(iterations = 100)]
5682 async fn test_random_collaboration(
5683 cx: &mut TestAppContext,
5684 deterministic: Arc<Deterministic>,
5685 rng: StdRng,
5686 ) {
5687 cx.foreground().forbid_parking();
5688 let max_peers = env::var("MAX_PEERS")
5689 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5690 .unwrap_or(5);
5691 assert!(max_peers <= 5);
5692
5693 let max_operations = env::var("OPERATIONS")
5694 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5695 .unwrap_or(10);
5696
5697 let rng = Arc::new(Mutex::new(rng));
5698
5699 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5700 let host_language_registry = Arc::new(LanguageRegistry::test());
5701
5702 let fs = FakeFs::new(cx.background());
5703 fs.insert_tree("/_collab", json!({"init": ""})).await;
5704
5705 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5706 let db = server.app_state.db.clone();
5707 let host_user_id = db.create_user("host", false).await.unwrap();
5708 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5709 let guest_user_id = db.create_user(username, false).await.unwrap();
5710 server
5711 .app_state
5712 .db
5713 .send_contact_request(guest_user_id, host_user_id)
5714 .await
5715 .unwrap();
5716 server
5717 .app_state
5718 .db
5719 .respond_to_contact_request(host_user_id, guest_user_id, true)
5720 .await
5721 .unwrap();
5722 }
5723
5724 let mut clients = Vec::new();
5725 let mut user_ids = Vec::new();
5726 let mut op_start_signals = Vec::new();
5727
5728 let mut next_entity_id = 100000;
5729 let mut host_cx = TestAppContext::new(
5730 cx.foreground_platform(),
5731 cx.platform(),
5732 deterministic.build_foreground(next_entity_id),
5733 deterministic.build_background(),
5734 cx.font_cache(),
5735 cx.leak_detector(),
5736 next_entity_id,
5737 );
5738 let host = server.create_client(&mut host_cx, "host").await;
5739 let host_project = host_cx.update(|cx| {
5740 Project::local(
5741 host.client.clone(),
5742 host.user_store.clone(),
5743 host_language_registry.clone(),
5744 fs.clone(),
5745 cx,
5746 )
5747 });
5748 let host_project_id = host_project
5749 .update(&mut host_cx, |p, _| p.next_remote_id())
5750 .await;
5751
5752 let (collab_worktree, _) = host_project
5753 .update(&mut host_cx, |project, cx| {
5754 project.find_or_create_local_worktree("/_collab", true, cx)
5755 })
5756 .await
5757 .unwrap();
5758 collab_worktree
5759 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5760 .await;
5761
5762 // Set up fake language servers.
5763 let mut language = Language::new(
5764 LanguageConfig {
5765 name: "Rust".into(),
5766 path_suffixes: vec!["rs".to_string()],
5767 ..Default::default()
5768 },
5769 None,
5770 );
5771 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5772 name: "the-fake-language-server",
5773 capabilities: lsp::LanguageServer::full_capabilities(),
5774 initializer: Some(Box::new({
5775 let rng = rng.clone();
5776 let fs = fs.clone();
5777 let project = host_project.downgrade();
5778 move |fake_server: &mut FakeLanguageServer| {
5779 fake_server.handle_request::<lsp::request::Completion, _, _>(
5780 |_, _| async move {
5781 Ok(Some(lsp::CompletionResponse::Array(vec![
5782 lsp::CompletionItem {
5783 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5784 range: lsp::Range::new(
5785 lsp::Position::new(0, 0),
5786 lsp::Position::new(0, 0),
5787 ),
5788 new_text: "the-new-text".to_string(),
5789 })),
5790 ..Default::default()
5791 },
5792 ])))
5793 },
5794 );
5795
5796 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5797 |_, _| async move {
5798 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5799 lsp::CodeAction {
5800 title: "the-code-action".to_string(),
5801 ..Default::default()
5802 },
5803 )]))
5804 },
5805 );
5806
5807 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5808 |params, _| async move {
5809 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5810 params.position,
5811 params.position,
5812 ))))
5813 },
5814 );
5815
5816 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5817 let fs = fs.clone();
5818 let rng = rng.clone();
5819 move |_, _| {
5820 let fs = fs.clone();
5821 let rng = rng.clone();
5822 async move {
5823 let files = fs.files().await;
5824 let mut rng = rng.lock();
5825 let count = rng.gen_range::<usize, _>(1..3);
5826 let files = (0..count)
5827 .map(|_| files.choose(&mut *rng).unwrap())
5828 .collect::<Vec<_>>();
5829 log::info!("LSP: Returning definitions in files {:?}", &files);
5830 Ok(Some(lsp::GotoDefinitionResponse::Array(
5831 files
5832 .into_iter()
5833 .map(|file| lsp::Location {
5834 uri: lsp::Url::from_file_path(file).unwrap(),
5835 range: Default::default(),
5836 })
5837 .collect(),
5838 )))
5839 }
5840 }
5841 });
5842
5843 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5844 let rng = rng.clone();
5845 let project = project.clone();
5846 move |params, mut cx| {
5847 let highlights = if let Some(project) = project.upgrade(&cx) {
5848 project.update(&mut cx, |project, cx| {
5849 let path = params
5850 .text_document_position_params
5851 .text_document
5852 .uri
5853 .to_file_path()
5854 .unwrap();
5855 let (worktree, relative_path) =
5856 project.find_local_worktree(&path, cx)?;
5857 let project_path =
5858 ProjectPath::from((worktree.read(cx).id(), relative_path));
5859 let buffer =
5860 project.get_open_buffer(&project_path, cx)?.read(cx);
5861
5862 let mut highlights = Vec::new();
5863 let highlight_count = rng.lock().gen_range(1..=5);
5864 let mut prev_end = 0;
5865 for _ in 0..highlight_count {
5866 let range =
5867 buffer.random_byte_range(prev_end, &mut *rng.lock());
5868
5869 highlights.push(lsp::DocumentHighlight {
5870 range: range_to_lsp(range.to_point_utf16(buffer)),
5871 kind: Some(lsp::DocumentHighlightKind::READ),
5872 });
5873 prev_end = range.end;
5874 }
5875 Some(highlights)
5876 })
5877 } else {
5878 None
5879 };
5880 async move { Ok(highlights) }
5881 }
5882 });
5883 }
5884 })),
5885 ..Default::default()
5886 });
5887 host_language_registry.add(Arc::new(language));
5888
5889 let op_start_signal = futures::channel::mpsc::unbounded();
5890 user_ids.push(host.current_user_id(&host_cx));
5891 op_start_signals.push(op_start_signal.0);
5892 clients.push(host_cx.foreground().spawn(host.simulate_host(
5893 host_project,
5894 op_start_signal.1,
5895 rng.clone(),
5896 host_cx,
5897 )));
5898
5899 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5900 rng.lock().gen_range(0..max_operations)
5901 } else {
5902 max_operations
5903 };
5904 let mut available_guests = vec![
5905 "guest-1".to_string(),
5906 "guest-2".to_string(),
5907 "guest-3".to_string(),
5908 "guest-4".to_string(),
5909 ];
5910 let mut operations = 0;
5911 while operations < max_operations {
5912 if operations == disconnect_host_at {
5913 server.disconnect_client(user_ids[0]);
5914 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5915 drop(op_start_signals);
5916 let mut clients = futures::future::join_all(clients).await;
5917 cx.foreground().run_until_parked();
5918
5919 let (host, mut host_cx, host_err) = clients.remove(0);
5920 if let Some(host_err) = host_err {
5921 log::error!("host error - {:?}", host_err);
5922 }
5923 host.project
5924 .as_ref()
5925 .unwrap()
5926 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5927 for (guest, mut guest_cx, guest_err) in clients {
5928 if let Some(guest_err) = guest_err {
5929 log::error!("{} error - {:?}", guest.username, guest_err);
5930 }
5931
5932 let contacts = server
5933 .app_state
5934 .db
5935 .get_contacts(guest.current_user_id(&guest_cx))
5936 .await
5937 .unwrap();
5938 let contacts = server
5939 .store
5940 .read()
5941 .await
5942 .build_initial_contacts_update(contacts)
5943 .contacts;
5944 assert!(!contacts
5945 .iter()
5946 .flat_map(|contact| &contact.projects)
5947 .any(|project| project.id == host_project_id));
5948 guest
5949 .project
5950 .as_ref()
5951 .unwrap()
5952 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5953 guest_cx.update(|_| drop(guest));
5954 }
5955 host_cx.update(|_| drop(host));
5956
5957 return;
5958 }
5959
5960 let distribution = rng.lock().gen_range(0..100);
5961 match distribution {
5962 0..=19 if !available_guests.is_empty() => {
5963 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5964 let guest_username = available_guests.remove(guest_ix);
5965 log::info!("Adding new connection for {}", guest_username);
5966 next_entity_id += 100000;
5967 let mut guest_cx = TestAppContext::new(
5968 cx.foreground_platform(),
5969 cx.platform(),
5970 deterministic.build_foreground(next_entity_id),
5971 deterministic.build_background(),
5972 cx.font_cache(),
5973 cx.leak_detector(),
5974 next_entity_id,
5975 );
5976 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5977 let guest_project = Project::remote(
5978 host_project_id,
5979 guest.client.clone(),
5980 guest.user_store.clone(),
5981 guest_lang_registry.clone(),
5982 FakeFs::new(cx.background()),
5983 &mut guest_cx.to_async(),
5984 )
5985 .await
5986 .unwrap();
5987 let op_start_signal = futures::channel::mpsc::unbounded();
5988 user_ids.push(guest.current_user_id(&guest_cx));
5989 op_start_signals.push(op_start_signal.0);
5990 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5991 guest_username.clone(),
5992 guest_project,
5993 op_start_signal.1,
5994 rng.clone(),
5995 guest_cx,
5996 )));
5997
5998 log::info!("Added connection for {}", guest_username);
5999 operations += 1;
6000 }
6001 20..=29 if clients.len() > 1 => {
6002 let guest_ix = rng.lock().gen_range(1..clients.len());
6003 log::info!("Removing guest {}", user_ids[guest_ix]);
6004 let removed_guest_id = user_ids.remove(guest_ix);
6005 let guest = clients.remove(guest_ix);
6006 op_start_signals.remove(guest_ix);
6007 server.forbid_connections();
6008 server.disconnect_client(removed_guest_id);
6009 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6010 let (guest, mut guest_cx, guest_err) = guest.await;
6011 server.allow_connections();
6012
6013 if let Some(guest_err) = guest_err {
6014 log::error!("{} error - {:?}", guest.username, guest_err);
6015 }
6016 guest
6017 .project
6018 .as_ref()
6019 .unwrap()
6020 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6021 for user_id in &user_ids {
6022 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6023 let contacts = server
6024 .store
6025 .read()
6026 .await
6027 .build_initial_contacts_update(contacts)
6028 .contacts;
6029 for contact in contacts {
6030 if contact.online {
6031 assert_ne!(
6032 contact.user_id, removed_guest_id.0 as u64,
6033 "removed guest is still a contact of another peer"
6034 );
6035 }
6036 for project in contact.projects {
6037 for project_guest_id in project.guests {
6038 assert_ne!(
6039 project_guest_id, removed_guest_id.0 as u64,
6040 "removed guest appears as still participating on a project"
6041 );
6042 }
6043 }
6044 }
6045 }
6046
6047 log::info!("{} removed", guest.username);
6048 available_guests.push(guest.username.clone());
6049 guest_cx.update(|_| drop(guest));
6050
6051 operations += 1;
6052 }
6053 _ => {
6054 while operations < max_operations && rng.lock().gen_bool(0.7) {
6055 op_start_signals
6056 .choose(&mut *rng.lock())
6057 .unwrap()
6058 .unbounded_send(())
6059 .unwrap();
6060 operations += 1;
6061 }
6062
6063 if rng.lock().gen_bool(0.8) {
6064 cx.foreground().run_until_parked();
6065 }
6066 }
6067 }
6068 }
6069
6070 drop(op_start_signals);
6071 let mut clients = futures::future::join_all(clients).await;
6072 cx.foreground().run_until_parked();
6073
6074 let (host_client, mut host_cx, host_err) = clients.remove(0);
6075 if let Some(host_err) = host_err {
6076 panic!("host error - {:?}", host_err);
6077 }
6078 let host_project = host_client.project.as_ref().unwrap();
6079 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6080 project
6081 .worktrees(cx)
6082 .map(|worktree| {
6083 let snapshot = worktree.read(cx).snapshot();
6084 (snapshot.id(), snapshot)
6085 })
6086 .collect::<BTreeMap<_, _>>()
6087 });
6088
6089 host_client
6090 .project
6091 .as_ref()
6092 .unwrap()
6093 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6094
6095 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6096 if let Some(guest_err) = guest_err {
6097 panic!("{} error - {:?}", guest_client.username, guest_err);
6098 }
6099 let worktree_snapshots =
6100 guest_client
6101 .project
6102 .as_ref()
6103 .unwrap()
6104 .read_with(&guest_cx, |project, cx| {
6105 project
6106 .worktrees(cx)
6107 .map(|worktree| {
6108 let worktree = worktree.read(cx);
6109 (worktree.id(), worktree.snapshot())
6110 })
6111 .collect::<BTreeMap<_, _>>()
6112 });
6113
6114 assert_eq!(
6115 worktree_snapshots.keys().collect::<Vec<_>>(),
6116 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6117 "{} has different worktrees than the host",
6118 guest_client.username
6119 );
6120 for (id, host_snapshot) in &host_worktree_snapshots {
6121 let guest_snapshot = &worktree_snapshots[id];
6122 assert_eq!(
6123 guest_snapshot.root_name(),
6124 host_snapshot.root_name(),
6125 "{} has different root name than the host for worktree {}",
6126 guest_client.username,
6127 id
6128 );
6129 assert_eq!(
6130 guest_snapshot.entries(false).collect::<Vec<_>>(),
6131 host_snapshot.entries(false).collect::<Vec<_>>(),
6132 "{} has different snapshot than the host for worktree {}",
6133 guest_client.username,
6134 id
6135 );
6136 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6137 }
6138
6139 guest_client
6140 .project
6141 .as_ref()
6142 .unwrap()
6143 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6144
6145 for guest_buffer in &guest_client.buffers {
6146 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6147 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6148 project.buffer_for_id(buffer_id, cx).expect(&format!(
6149 "host does not have buffer for guest:{}, peer:{}, id:{}",
6150 guest_client.username, guest_client.peer_id, buffer_id
6151 ))
6152 });
6153 let path = host_buffer
6154 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6155
6156 assert_eq!(
6157 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6158 0,
6159 "{}, buffer {}, path {:?} has deferred operations",
6160 guest_client.username,
6161 buffer_id,
6162 path,
6163 );
6164 assert_eq!(
6165 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6166 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6167 "{}, buffer {}, path {:?}, differs from the host's buffer",
6168 guest_client.username,
6169 buffer_id,
6170 path
6171 );
6172 }
6173
6174 guest_cx.update(|_| drop(guest_client));
6175 }
6176
6177 host_cx.update(|_| drop(host_client));
6178 }
6179
    // In-process server used by the tests in this module, together with the
    // handles the tests need to control and observe it.
    struct TestServer {
        // Created with `Peer::new()` in `start` and reset when the test server is dropped.
        peer: Arc<Peer>,
        // Application state built from the test database.
        app_state: Arc<AppState>,
        // The server under test; `TestServer` dereferences to it.
        server: Arc<Server>,
        // Foreground executor consulted by `condition` while waiting.
        foreground: Rc<executor::Foreground>,
        // Receiving half of the channel whose sender is handed to `Server::new`;
        // `condition` awaits it before re-checking its predicate.
        notifications: mpsc::UnboundedReceiver<()>,
        // Per-user flag returned by `Connection::in_memory`; `disconnect_client`
        // sets it to `true` for the given user.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        // While `true`, clients created by `create_client` fail to establish a connection.
        forbid_connections: Arc<AtomicBool>,
        // Never read; presumably held so the test database outlives the server
        // (fields drop in declaration order, so this one drops last) - TODO confirm.
        _test_db: TestDb,
    }
6190
6191 impl TestServer {
6192 async fn start(
6193 foreground: Rc<executor::Foreground>,
6194 background: Arc<executor::Background>,
6195 ) -> Self {
6196 let test_db = TestDb::fake(background);
6197 let app_state = Self::build_app_state(&test_db).await;
6198 let peer = Peer::new();
6199 let notifications = mpsc::unbounded();
6200 let server = Server::new(app_state.clone(), Some(notifications.0));
6201 Self {
6202 peer,
6203 app_state,
6204 server,
6205 foreground,
6206 notifications: notifications.1,
6207 connection_killers: Default::default(),
6208 forbid_connections: Default::default(),
6209 _test_db: test_db,
6210 }
6211 }
6212
6213 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6214 cx.update(|cx| {
6215 let settings = Settings::test(cx);
6216 cx.set_global(settings);
6217 });
6218
6219 let http = FakeHttpClient::with_404_response();
6220 let user_id =
6221 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6222 user.id
6223 } else {
6224 self.app_state.db.create_user(name, false).await.unwrap()
6225 };
6226 let client_name = name.to_string();
6227 let mut client = Client::new(http.clone());
6228 let server = self.server.clone();
6229 let connection_killers = self.connection_killers.clone();
6230 let forbid_connections = self.forbid_connections.clone();
6231 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6232
6233 Arc::get_mut(&mut client)
6234 .unwrap()
6235 .override_authenticate(move |cx| {
6236 cx.spawn(|_| async move {
6237 let access_token = "the-token".to_string();
6238 Ok(Credentials {
6239 user_id: user_id.0 as u64,
6240 access_token,
6241 })
6242 })
6243 })
6244 .override_establish_connection(move |credentials, cx| {
6245 assert_eq!(credentials.user_id, user_id.0 as u64);
6246 assert_eq!(credentials.access_token, "the-token");
6247
6248 let server = server.clone();
6249 let connection_killers = connection_killers.clone();
6250 let forbid_connections = forbid_connections.clone();
6251 let client_name = client_name.clone();
6252 let connection_id_tx = connection_id_tx.clone();
6253 cx.spawn(move |cx| async move {
6254 if forbid_connections.load(SeqCst) {
6255 Err(EstablishConnectionError::other(anyhow!(
6256 "server is forbidding connections"
6257 )))
6258 } else {
6259 let (client_conn, server_conn, killed) =
6260 Connection::in_memory(cx.background());
6261 connection_killers.lock().insert(user_id, killed);
6262 cx.background()
6263 .spawn(server.handle_connection(
6264 server_conn,
6265 client_name,
6266 user_id,
6267 Some(connection_id_tx),
6268 cx.background(),
6269 ))
6270 .detach();
6271 Ok(client_conn)
6272 }
6273 })
6274 });
6275
6276 Channel::init(&client);
6277 Project::init(&client);
6278 cx.update(|cx| {
6279 workspace::init(&client, cx);
6280 });
6281
6282 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6283 client
6284 .authenticate_and_connect(false, &cx.to_async())
6285 .await
6286 .unwrap();
6287 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6288
6289 let client = TestClient {
6290 client,
6291 peer_id,
6292 username: name.to_string(),
6293 user_store,
6294 language_registry: Arc::new(LanguageRegistry::test()),
6295 project: Default::default(),
6296 buffers: Default::default(),
6297 };
6298 client.wait_for_current_user(cx).await;
6299 client
6300 }
6301
6302 fn disconnect_client(&self, user_id: UserId) {
6303 self.connection_killers
6304 .lock()
6305 .remove(&user_id)
6306 .unwrap()
6307 .store(true, SeqCst);
6308 }
6309
6310 fn forbid_connections(&self) {
6311 self.forbid_connections.store(true, SeqCst);
6312 }
6313
6314 fn allow_connections(&self) {
6315 self.forbid_connections.store(false, SeqCst);
6316 }
6317
6318 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6319 while let Some((client_a, cx_a)) = clients.pop() {
6320 for (client_b, cx_b) in &mut clients {
6321 client_a
6322 .user_store
6323 .update(cx_a, |store, cx| {
6324 store.request_contact(client_b.user_id().unwrap(), cx)
6325 })
6326 .await
6327 .unwrap();
6328 cx_a.foreground().run_until_parked();
6329 client_b
6330 .user_store
6331 .update(*cx_b, |store, cx| {
6332 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6333 })
6334 .await
6335 .unwrap();
6336 }
6337 }
6338 }
6339
6340 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6341 Arc::new(AppState {
6342 db: test_db.db().clone(),
6343 api_token: Default::default(),
6344 })
6345 }
6346
6347 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6348 self.server.store.read().await
6349 }
6350
6351 async fn condition<F>(&mut self, mut predicate: F)
6352 where
6353 F: FnMut(&Store) -> bool,
6354 {
6355 assert!(
6356 self.foreground.parking_forbidden(),
6357 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6358 );
6359 while !(predicate)(&*self.server.store.read().await) {
6360 self.foreground.start_waiting();
6361 self.notifications.next().await;
6362 self.foreground.finish_waiting();
6363 }
6364 }
6365 }
6366
6367 impl Deref for TestServer {
6368 type Target = Server;
6369
6370 fn deref(&self) -> &Self::Target {
6371 &self.server
6372 }
6373 }
6374
/// Calls `reset` on the peer when the test server is dropped.
///
/// NOTE(review): presumably this tears down the peer's connections so tests
/// don't leak state — confirm against `Peer::reset`.
impl Drop for TestServer {
    fn drop(&mut self) {
        self.peer.reset();
    }
}
6380
/// A client used by the integration tests, bundling the RPC client with the
/// per-client state the test helpers and simulations operate on.
struct TestClient {
    /// The underlying client; `TestClient` dereferences to it.
    client: Arc<Client>,
    /// Name this client was created with.
    username: String,
    /// Peer id derived from the connection id reported when the client connected.
    pub peer_id: PeerId,
    /// Store used to look up the current user and manage contacts.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry handed to projects and workspaces built by this client.
    language_registry: Arc<LanguageRegistry>,
    /// The project most recently built or simulated by this client, if any.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulation routines have opened and are keeping alive.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6390
6391 impl Deref for TestClient {
6392 type Target = Arc<Client>;
6393
6394 fn deref(&self) -> &Self::Target {
6395 &self.client
6396 }
6397 }
6398
/// Snapshot of a client's contact state, expressed as GitHub logins
/// (see `TestClient::summarize_contacts`).
struct ContactsSummary {
    /// Logins of the users that are currently contacts.
    pub current: Vec<String>,
    /// Logins of users this client has sent a contact request to.
    pub outgoing_requests: Vec<String>,
    /// Logins of users that have sent this client a contact request.
    pub incoming_requests: Vec<String>,
}
6404
6405 impl TestClient {
6406 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6407 UserId::from_proto(
6408 self.user_store
6409 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6410 )
6411 }
6412
6413 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6414 let mut authed_user = self
6415 .user_store
6416 .read_with(cx, |user_store, _| user_store.watch_current_user());
6417 while authed_user.next().await.unwrap().is_none() {}
6418 }
6419
6420 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6421 self.user_store
6422 .update(cx, |store, _| store.clear_contacts())
6423 .await;
6424 }
6425
6426 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6427 self.user_store.read_with(cx, |store, _| ContactsSummary {
6428 current: store
6429 .contacts()
6430 .iter()
6431 .map(|contact| contact.user.github_login.clone())
6432 .collect(),
6433 outgoing_requests: store
6434 .outgoing_contact_requests()
6435 .iter()
6436 .map(|user| user.github_login.clone())
6437 .collect(),
6438 incoming_requests: store
6439 .incoming_contact_requests()
6440 .iter()
6441 .map(|user| user.github_login.clone())
6442 .collect(),
6443 })
6444 }
6445
6446 async fn build_local_project(
6447 &mut self,
6448 fs: Arc<FakeFs>,
6449 root_path: impl AsRef<Path>,
6450 cx: &mut TestAppContext,
6451 ) -> (ModelHandle<Project>, WorktreeId) {
6452 let project = cx.update(|cx| {
6453 Project::local(
6454 self.client.clone(),
6455 self.user_store.clone(),
6456 self.language_registry.clone(),
6457 fs,
6458 cx,
6459 )
6460 });
6461 self.project = Some(project.clone());
6462 let (worktree, _) = project
6463 .update(cx, |p, cx| {
6464 p.find_or_create_local_worktree(root_path, true, cx)
6465 })
6466 .await
6467 .unwrap();
6468 worktree
6469 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6470 .await;
6471 project
6472 .update(cx, |project, _| project.next_remote_id())
6473 .await;
6474 (project, worktree.read_with(cx, |tree, _| tree.id()))
6475 }
6476
6477 async fn build_remote_project(
6478 &mut self,
6479 host_project: &ModelHandle<Project>,
6480 host_cx: &mut TestAppContext,
6481 guest_cx: &mut TestAppContext,
6482 ) -> ModelHandle<Project> {
6483 let host_project_id = host_project
6484 .read_with(host_cx, |project, _| project.next_remote_id())
6485 .await;
6486 let guest_user_id = self.user_id().unwrap();
6487 let languages =
6488 host_project.read_with(host_cx, |project, _| project.languages().clone());
6489 let project_b = guest_cx.spawn(|mut cx| {
6490 let user_store = self.user_store.clone();
6491 let guest_client = self.client.clone();
6492 async move {
6493 Project::remote(
6494 host_project_id,
6495 guest_client,
6496 user_store.clone(),
6497 languages,
6498 FakeFs::new(cx.background()),
6499 &mut cx,
6500 )
6501 .await
6502 .unwrap()
6503 }
6504 });
6505 host_cx.foreground().run_until_parked();
6506 host_project.update(host_cx, |project, cx| {
6507 project.respond_to_join_request(guest_user_id, true, cx)
6508 });
6509 let project = project_b.await;
6510 self.project = Some(project.clone());
6511 project
6512 }
6513
6514 fn build_workspace(
6515 &self,
6516 project: &ModelHandle<Project>,
6517 cx: &mut TestAppContext,
6518 ) -> ViewHandle<Workspace> {
6519 let (window_id, _) = cx.add_window(|_| EmptyView);
6520 cx.add_view(window_id, |cx| {
6521 let fs = project.read(cx).fs().clone();
6522 Workspace::new(
6523 &WorkspaceParams {
6524 fs,
6525 project: project.clone(),
6526 user_store: self.user_store.clone(),
6527 languages: self.language_registry.clone(),
6528 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6529 channel_list: cx.add_model(|cx| {
6530 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6531 }),
6532 client: self.client.clone(),
6533 },
6534 cx,
6535 )
6536 })
6537 }
6538
/// Runs the host side of a randomized collaboration simulation.
///
/// Every message received on `op_start_signal` triggers one randomly chosen
/// host operation on `project`; the loop ends once the signal channel yields
/// `None`. All random decisions are drawn from the shared `rng`, so the order
/// of draws determines the behavior for a given seed.
///
/// Takes ownership of the client and its context and hands both back, along
/// with the error (if any) that stopped the simulation early. On return,
/// `self.project` is set to `project`.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The loop lives in an inner function returning `anyhow::Result` so that
    // `?` can be used; the outer function converts the result into the
    // `Option<anyhow::Error>` it returns.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        // Accept every join request for the project as it arrives.
        cx.update(|cx| {
            cx.subscribe(&project, move |project, event, cx| {
                if let project::Event::ContactRequestedJoin(user) = event {
                    log::info!("Host: accepting join request from {}", user.github_login);
                    project.update(cx, |project, cx| {
                        project.respond_to_join_request(user.id, true, cx)
                    });
                }
            })
            .detach();
        });

        while op_start_signal.next().await.is_some() {
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            let files = fs.as_fake().files().await;
            match distribution {
                // Find or create a local worktree rooted at a random ancestor
                // of a random existing file.
                0..=19 if !files.is_empty() => {
                    let path = files.choose(&mut *rng.lock()).unwrap();
                    let mut path = path.as_path();
                    // Always step to the parent at least once (when there is
                    // one), then keep walking up while the coin flips say so.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either let the operation finish in the
                    // background or wait for it (propagating failure).
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open (or reuse) a buffer, then either drop it or mutate it.
                20..=79 if !files.is_empty() => {
                    // Open a new buffer when none are held, or on a coin flip;
                    // otherwise pick one of the buffers already held.
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.choose(&mut *rng.lock()).unwrap();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // With probability 0.1 release the buffer; otherwise edit
                    // it (probability 0.7) or undo/redo.
                    if rng.lock().gen_bool(0.1) {
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Remaining draws (and every draw while no files exist):
                // create a new `.rs` file at a random path of 1 to 5
                // single-letter components, retrying with a fresh path until
                // both the directory and the file are created successfully.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result =
        simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
            .await;
    log::info!("Host done");
    self.project = Some(project);
    (self, cx, result.err())
}
6687
/// Runs the guest side of a randomized collaboration simulation.
///
/// Every message received on `op_start_signal` triggers one randomly chosen
/// guest operation against `project`; the loop ends once the signal channel
/// yields `None`. All random decisions are drawn from the shared `rng`, so the
/// order of draws determines the behavior for a given seed. `guest_username`
/// is only used to prefix log messages.
///
/// Takes ownership of the client and its context and hands both back, along
/// with the error (if any) that stopped the simulation early. On return,
/// `self.project` is set to `project`.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The loop lives in an inner function returning `anyhow::Result` so that
    // `?` can be used; the outer function converts the result into the
    // `Option<anyhow::Error>` it returns.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Pick the buffer this iteration operates on: open a new one when
            // none are held (or on a coin flip), otherwise reuse a held one.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                // Choose a visible worktree that contains at least one file;
                // if there is none yet, wait a bit and try again on the next
                // signal.
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                // Choose a random file entry within that worktree.
                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            // Pick the operation. For the request-style operations below, the
            // request runs on the background executor and is then either
            // detached (probability 0.3, errors only logged) or awaited, in
            // which case a failure aborts the simulation.
            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Release the chosen buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Request completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Request code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save the buffer, but only when it is dirty; a clean buffer
                // falls through to the catch-all edit arm below.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must include everything that was
                        // in the buffer when the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Request definitions at a random offset; when awaited, the
                // buffers of the returned locations are kept open.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Request document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide text search for a single random lowercase
                // letter; when awaited, the buffers in the results are kept
                // open.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Create a new `.rs` file with a random 10-letter name at the
                // root of a visible, directory-rooted worktree that contains
                // at least one file.
                60..=69 => {
                    // NOTE(review): this `unwrap` panics when no worktree
                    // satisfies the filter (e.g. only single-file worktrees
                    // exist) — confirm that cannot happen in the test setup.
                    let worktree = project
                        .read_with(cx, |project, cx| {
                            project
                                .worktrees(&cx)
                                .filter(|worktree| {
                                    let worktree = worktree.read(cx);
                                    worktree.is_visible()
                                        && worktree.entries(false).any(|e| e.is_file())
                                        && worktree
                                            .root_entry()
                                            .map_or(false, |e| e.is_dir())
                                })
                                .choose(&mut *rng.lock())
                        })
                        .unwrap();
                    let (worktree_id, worktree_root_name) = worktree
                        .read_with(cx, |worktree, _| {
                            (worktree.id(), worktree.root_name().to_string())
                        });

                    let mut new_name = String::new();
                    for _ in 0..10 {
                        let letter = rng.lock().gen_range('a'..='z');
                        new_name.push(letter);
                    }
                    let mut new_path = PathBuf::new();
                    new_path.push(new_name);
                    new_path.set_extension("rs");
                    log::info!(
                        "{}: creating {:?} in worktree {} ({})",
                        guest_username,
                        new_path,
                        worktree_id,
                        worktree_root_name,
                    );
                    project
                        .update(cx, |project, cx| {
                            project.create_entry((worktree_id, new_path), false, cx)
                        })
                        .unwrap()
                        .await?;
                }
                // Everything else: edit the buffer (probability 0.7) or
                // undo/redo.
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    self.project = Some(project);
    (self, cx, result.err())
}
7016 }
7017
/// Calls `tear_down` on the underlying client when the test client is dropped.
///
/// NOTE(review): presumably this releases the client's connection and
/// handlers so tests don't leak state — confirm against `Client::tear_down`.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
7023
/// Adapts gpui's background executor to the `Executor` trait, so it can be
/// used wherever an `Executor` is required (it is what the tests pass to
/// `handle_connection`).
impl Executor for Arc<gpui::executor::Background> {
    type Sleep = gpui::executor::Timer;

    /// Spawns `future` on the background executor and detaches the task.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        self.spawn(future).detach();
    }

    /// Returns the background executor's timer for `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep {
        self.as_ref().timer(duration)
    }
}
7035
7036 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7037 channel
7038 .messages()
7039 .cursor::<()>()
7040 .map(|m| {
7041 (
7042 m.sender.github_login.clone(),
7043 m.body.clone(),
7044 m.is_pending(),
7045 )
7046 })
7047 .collect()
7048 }
7049
/// Placeholder view that renders nothing; `build_workspace` uses it as the
/// root view of the window it opens.
struct EmptyView;
7051
/// `EmptyView` emits no meaningful events, so its event type is the unit type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
7055
/// Minimal `View` implementation: a fixed UI name and an empty element.
impl gpui::View for EmptyView {
    /// Name under which this view is identified.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Renders a boxed `Empty` element.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty::new())
    }
}
7065}