1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
54type MessageHandler =
55 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
57struct Response<R> {
58 server: Arc<Server>,
59 receipt: Receipt<R>,
60 responded: Arc<AtomicBool>,
61}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69
70 fn into_receipt(self) -> Receipt<R> {
71 self.responded.store(true, SeqCst);
72 self.receipt
73 }
74}
75
76pub struct Server {
77 peer: Arc<Peer>,
78 store: RwLock<Store>,
79 app_state: Arc<AppState>,
80 handlers: HashMap<TypeId, MessageHandler>,
81 notifications: Option<mpsc::UnboundedSender<()>>,
82}
83
/// Abstraction over the async runtime used by `Server::handle_connection`.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Future + Send;

    /// Runs `future` in the background without waiting for it.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;

    /// Returns a future for the given `duration`; `handle_connection` uses it as the
    /// connection's timer.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
89
/// Unit executor type.
// NOTE(review): presumably the production `Executor` implementation; its impl is not part
// of this excerpt — confirm.
#[derive(Clone)]
pub struct RealExecutor;
92
// NOTE(review): presumably the page size used when fetching channel messages — the use
// site is outside this excerpt; confirm.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body — the
// use site is outside this excerpt; confirm.
const MAX_MESSAGE_LEN: usize = 1024;
95
96struct StoreReadGuard<'a> {
97 guard: RwLockReadGuard<'a, Store>,
98 _not_send: PhantomData<Rc<()>>,
99}
100
101struct StoreWriteGuard<'a> {
102 guard: RwLockWriteGuard<'a, Store>,
103 _not_send: PhantomData<Rc<()>>,
104}
105
106impl Server {
107 pub fn new(
108 app_state: Arc<AppState>,
109 notifications: Option<mpsc::UnboundedSender<()>>,
110 ) -> Arc<Self> {
111 let mut server = Self {
112 peer: Peer::new(),
113 app_state,
114 store: Default::default(),
115 handlers: Default::default(),
116 notifications,
117 };
118
119 server
120 .add_request_handler(Server::ping)
121 .add_request_handler(Server::register_project)
122 .add_message_handler(Server::unregister_project)
123 .add_request_handler(Server::join_project)
124 .add_message_handler(Server::leave_project)
125 .add_message_handler(Server::respond_to_join_project_request)
126 .add_request_handler(Server::register_worktree)
127 .add_message_handler(Server::unregister_worktree)
128 .add_request_handler(Server::update_worktree)
129 .add_message_handler(Server::start_language_server)
130 .add_message_handler(Server::update_language_server)
131 .add_message_handler(Server::update_diagnostic_summary)
132 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
133 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
134 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
135 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
136 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
137 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
138 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
139 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
140 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
141 .add_request_handler(
142 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
143 )
144 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
145 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
146 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
147 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
148 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
149 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
150 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
151 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
152 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
153 .add_request_handler(Server::update_buffer)
154 .add_message_handler(Server::update_buffer_file)
155 .add_message_handler(Server::buffer_reloaded)
156 .add_message_handler(Server::buffer_saved)
157 .add_request_handler(Server::save_buffer)
158 .add_request_handler(Server::get_channels)
159 .add_request_handler(Server::get_users)
160 .add_request_handler(Server::fuzzy_search_users)
161 .add_request_handler(Server::request_contact)
162 .add_request_handler(Server::remove_contact)
163 .add_request_handler(Server::respond_to_contact_request)
164 .add_request_handler(Server::join_channel)
165 .add_message_handler(Server::leave_channel)
166 .add_request_handler(Server::send_channel_message)
167 .add_request_handler(Server::follow)
168 .add_message_handler(Server::unfollow)
169 .add_message_handler(Server::update_followers)
170 .add_request_handler(Server::get_channel_messages);
171
172 Arc::new(server)
173 }
174
175 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
176 where
177 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
178 Fut: 'static + Send + Future<Output = Result<()>>,
179 M: EnvelopedMessage,
180 {
181 let prev_handler = self.handlers.insert(
182 TypeId::of::<M>(),
183 Box::new(move |server, envelope| {
184 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
185 let span = info_span!(
186 "handle message",
187 payload_type = envelope.payload_type_name(),
188 payload = format!("{:?}", envelope.payload).as_str(),
189 );
190 let future = (handler)(server, *envelope);
191 async move {
192 if let Err(error) = future.await {
193 tracing::error!(%error, "error handling message");
194 }
195 }
196 .instrument(span)
197 .boxed()
198 }),
199 );
200 if prev_handler.is_some() {
201 panic!("registered a handler for the same message twice");
202 }
203 self
204 }
205
206 /// Handle a request while holding a lock to the store. This is useful when we're registering
207 /// a connection but we want to respond on the connection before anybody else can send on it.
208 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
209 where
210 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
211 Fut: Send + Future<Output = Result<()>>,
212 M: RequestMessage,
213 {
214 let handler = Arc::new(handler);
215 self.add_message_handler(move |server, envelope| {
216 let receipt = envelope.receipt();
217 let handler = handler.clone();
218 async move {
219 let responded = Arc::new(AtomicBool::default());
220 let response = Response {
221 server: server.clone(),
222 responded: responded.clone(),
223 receipt: envelope.receipt(),
224 };
225 match (handler)(server.clone(), envelope, response).await {
226 Ok(()) => {
227 if responded.load(std::sync::atomic::Ordering::SeqCst) {
228 Ok(())
229 } else {
230 Err(anyhow!("handler did not send a response"))?
231 }
232 }
233 Err(error) => {
234 server.peer.respond_with_error(
235 receipt,
236 proto::Error {
237 message: error.to_string(),
238 },
239 )?;
240 Err(error)
241 }
242 }
243 }
244 })
245 }
246
247 pub fn handle_connection<E: Executor>(
248 self: &Arc<Self>,
249 connection: Connection,
250 address: String,
251 user_id: UserId,
252 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
253 executor: E,
254 ) -> impl Future<Output = Result<()>> {
255 let mut this = self.clone();
256 let span = info_span!("handle connection", %user_id, %address);
257 async move {
258 let (connection_id, handle_io, mut incoming_rx) = this
259 .peer
260 .add_connection(connection, {
261 let executor = executor.clone();
262 move |duration| {
263 let timer = executor.sleep(duration);
264 async move {
265 timer.await;
266 }
267 }
268 })
269 .await;
270
271 tracing::info!(%user_id, %connection_id, %address, "connection opened");
272
273 if let Some(send_connection_id) = send_connection_id.as_mut() {
274 let _ = send_connection_id.send(connection_id).await;
275 }
276
277 let contacts = this.app_state.db.get_contacts(user_id).await?;
278
279 {
280 let mut store = this.store_mut().await;
281 store.add_connection(connection_id, user_id);
282 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
283 }
284 this.update_user_contacts(user_id).await?;
285
286 let handle_io = handle_io.fuse();
287 futures::pin_mut!(handle_io);
288 loop {
289 let next_message = incoming_rx.next().fuse();
290 futures::pin_mut!(next_message);
291 futures::select_biased! {
292 result = handle_io => {
293 if let Err(error) = result {
294 tracing::error!(%error, "error handling I/O");
295 }
296 break;
297 }
298 message = next_message => {
299 if let Some(message) = message {
300 let type_name = message.payload_type_name();
301 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
302 async {
303 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
304 let notifications = this.notifications.clone();
305 let is_background = message.is_background();
306 let handle_message = (handler)(this.clone(), message);
307 let handle_message = async move {
308 handle_message.await;
309 if let Some(mut notifications) = notifications {
310 let _ = notifications.send(()).await;
311 }
312 };
313 if is_background {
314 executor.spawn_detached(handle_message);
315 } else {
316 handle_message.await;
317 }
318 } else {
319 tracing::error!("no message handler");
320 }
321 }.instrument(span).await;
322 } else {
323 tracing::info!(%user_id, %connection_id, %address, "connection closed");
324 break;
325 }
326 }
327 }
328 }
329
330 if let Err(error) = this.sign_out(connection_id).await {
331 tracing::error!(%error, "error signing out");
332 }
333
334 Ok(())
335 }.instrument(span)
336 }
337
338 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
339 println!("Signing out {:?}", connection_id);
340 self.peer.disconnect(connection_id);
341 let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
342
343 for (project_id, project) in removed_connection.hosted_projects {
344 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
345 self.peer
346 .send(conn_id, proto::UnregisterProject { project_id })
347 });
348 }
349
350 for (project_id, peer_ids) in removed_connection.guest_project_ids {
351 broadcast(connection_id, peer_ids, |conn_id| {
352 self.peer.send(
353 conn_id,
354 proto::RemoveProjectCollaborator {
355 project_id,
356 peer_id: connection_id.0,
357 },
358 )
359 });
360 }
361
362 self.update_user_contacts(removed_connection.user_id)
363 .await?;
364
365 Ok(())
366 }
367
368 async fn ping(
369 self: Arc<Server>,
370 _: TypedEnvelope<proto::Ping>,
371 response: Response<proto::Ping>,
372 ) -> Result<()> {
373 response.send(proto::Ack {})?;
374 Ok(())
375 }
376
377 async fn register_project(
378 self: Arc<Server>,
379 request: TypedEnvelope<proto::RegisterProject>,
380 response: Response<proto::RegisterProject>,
381 ) -> Result<()> {
382 let user_id;
383 let project_id;
384 {
385 let mut state = self.store_mut().await;
386 user_id = state.user_id_for_connection(request.sender_id)?;
387 project_id = state.register_project(request.sender_id, user_id);
388 };
389 self.update_user_contacts(user_id).await?;
390 response.send(proto::RegisterProjectResponse { project_id })?;
391 Ok(())
392 }
393
394 async fn unregister_project(
395 self: Arc<Server>,
396 request: TypedEnvelope<proto::UnregisterProject>,
397 ) -> Result<()> {
398 let user_id = {
399 let mut state = self.store_mut().await;
400 state.unregister_project(request.payload.project_id, request.sender_id)?;
401 state.user_id_for_connection(request.sender_id)?
402 };
403
404 self.update_user_contacts(user_id).await?;
405 Ok(())
406 }
407
408 // async fn share_project(
409 // self: Arc<Server>,
410 // request: TypedEnvelope<proto::ShareProject>,
411 // response: Response<proto::ShareProject>,
412 // ) -> Result<()> {
413 // let user_id = {
414 // let mut state = self.store_mut().await;
415 // state.share_project(request.payload.project_id, request.sender_id)?;
416 // state.user_id_for_connection(request.sender_id)?
417 // };
418 // self.update_user_contacts(user_id).await?;
419 // response.send(proto::Ack {})?;
420 // Ok(())
421 // }
422
423 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
424 let contacts = self.app_state.db.get_contacts(user_id).await?;
425 let store = self.store().await;
426 let updated_contact = store.contact_for_user(user_id, false);
427 for contact in contacts {
428 if let db::Contact::Accepted {
429 user_id: contact_user_id,
430 ..
431 } = contact
432 {
433 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
434 self.peer
435 .send(
436 contact_conn_id,
437 proto::UpdateContacts {
438 contacts: vec![updated_contact.clone()],
439 remove_contacts: Default::default(),
440 incoming_requests: Default::default(),
441 remove_incoming_requests: Default::default(),
442 outgoing_requests: Default::default(),
443 remove_outgoing_requests: Default::default(),
444 },
445 )
446 .trace_err();
447 }
448 }
449 }
450 Ok(())
451 }
452
453 async fn join_project(
454 self: Arc<Server>,
455 request: TypedEnvelope<proto::JoinProject>,
456 response: Response<proto::JoinProject>,
457 ) -> Result<()> {
458 let project_id = request.payload.project_id;
459 let host_user_id;
460 let guest_user_id;
461 let host_connection_id;
462 {
463 let state = self.store().await;
464 let project = state.project(project_id)?;
465 host_user_id = project.host_user_id;
466 host_connection_id = project.host_connection_id;
467 guest_user_id = state.user_id_for_connection(request.sender_id)?;
468 };
469
470 let has_contact = self
471 .app_state
472 .db
473 .has_contact(guest_user_id, host_user_id)
474 .await?;
475 if !has_contact {
476 return Err(anyhow!("no such project"))?;
477 }
478
479 self.store_mut().await.request_join_project(
480 guest_user_id,
481 project_id,
482 response.into_receipt(),
483 )?;
484 self.peer.send(
485 host_connection_id,
486 proto::RequestJoinProject {
487 project_id,
488 requester_id: guest_user_id.to_proto(),
489 },
490 )?;
491 Ok(())
492 }
493
494 async fn respond_to_join_project_request(
495 self: Arc<Server>,
496 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
497 ) -> Result<()> {
498 let host_user_id;
499
500 {
501 let mut state = self.store_mut().await;
502 let project_id = request.payload.project_id;
503 let project = state.project(project_id)?;
504 if project.host_connection_id != request.sender_id {
505 Err(anyhow!("no such connection"))?;
506 }
507
508 host_user_id = project.host_user_id;
509 let guest_user_id = UserId::from_proto(request.payload.requester_id);
510
511 if !request.payload.allow {
512 let receipts = state
513 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
514 .ok_or_else(|| anyhow!("no such request"))?;
515 for receipt in receipts {
516 self.peer.respond(
517 receipt,
518 proto::JoinProjectResponse {
519 variant: Some(proto::join_project_response::Variant::Decline(
520 proto::join_project_response::Decline {},
521 )),
522 },
523 )?;
524 }
525 return Ok(());
526 }
527
528 let (receipts_with_replica_ids, project) = state
529 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
530 .ok_or_else(|| anyhow!("no such request"))?;
531
532 let peer_count = project.guests.len();
533 let mut collaborators = Vec::with_capacity(peer_count);
534 collaborators.push(proto::Collaborator {
535 peer_id: project.host_connection_id.0,
536 replica_id: 0,
537 user_id: project.host_user_id.to_proto(),
538 });
539 let worktrees = project
540 .worktrees
541 .iter()
542 .filter_map(|(id, shared_worktree)| {
543 let worktree = project.worktrees.get(&id)?;
544 Some(proto::Worktree {
545 id: *id,
546 root_name: worktree.root_name.clone(),
547 entries: shared_worktree.entries.values().cloned().collect(),
548 diagnostic_summaries: shared_worktree
549 .diagnostic_summaries
550 .values()
551 .cloned()
552 .collect(),
553 visible: worktree.visible,
554 scan_id: shared_worktree.scan_id,
555 })
556 })
557 .collect::<Vec<_>>();
558
559 // Add all guests other than the requesting user's own connections as collaborators
560 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
561 if receipts_with_replica_ids
562 .iter()
563 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
564 {
565 collaborators.push(proto::Collaborator {
566 peer_id: peer_conn_id.0,
567 replica_id: *peer_replica_id as u32,
568 user_id: peer_user_id.to_proto(),
569 });
570 }
571 }
572
573 for conn_id in project.connection_ids() {
574 for (receipt, replica_id) in &receipts_with_replica_ids {
575 if conn_id != receipt.sender_id {
576 self.peer.send(
577 conn_id,
578 proto::AddProjectCollaborator {
579 project_id,
580 collaborator: Some(proto::Collaborator {
581 peer_id: receipt.sender_id.0,
582 replica_id: *replica_id as u32,
583 user_id: guest_user_id.to_proto(),
584 }),
585 },
586 )?;
587 }
588 }
589 }
590
591 for (receipt, replica_id) in receipts_with_replica_ids {
592 self.peer.respond(
593 receipt,
594 proto::JoinProjectResponse {
595 variant: Some(proto::join_project_response::Variant::Accept(
596 proto::join_project_response::Accept {
597 worktrees: worktrees.clone(),
598 replica_id: replica_id as u32,
599 collaborators: collaborators.clone(),
600 language_servers: project.language_servers.clone(),
601 },
602 )),
603 },
604 )?;
605 }
606 }
607
608 self.update_user_contacts(host_user_id).await?;
609 Ok(())
610 }
611
612 async fn leave_project(
613 self: Arc<Server>,
614 request: TypedEnvelope<proto::LeaveProject>,
615 ) -> Result<()> {
616 let sender_id = request.sender_id;
617 let project_id = request.payload.project_id;
618 let project;
619 {
620 let mut state = self.store_mut().await;
621 project = state.leave_project(sender_id, project_id)?;
622 broadcast(sender_id, project.connection_ids, |conn_id| {
623 self.peer.send(
624 conn_id,
625 proto::RemoveProjectCollaborator {
626 project_id,
627 peer_id: sender_id.0,
628 },
629 )
630 });
631 }
632 self.update_user_contacts(project.host_user_id).await?;
633 Ok(())
634 }
635
636 async fn register_worktree(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::RegisterWorktree>,
639 response: Response<proto::RegisterWorktree>,
640 ) -> Result<()> {
641 let host_user_id;
642 {
643 let mut state = self.store_mut().await;
644 host_user_id = state.user_id_for_connection(request.sender_id)?;
645
646 let guest_connection_ids = state
647 .read_project(request.payload.project_id, request.sender_id)?
648 .guest_connection_ids();
649 state.register_worktree(
650 request.payload.project_id,
651 request.payload.worktree_id,
652 request.sender_id,
653 Worktree {
654 root_name: request.payload.root_name.clone(),
655 visible: request.payload.visible,
656 ..Default::default()
657 },
658 )?;
659
660 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
661 self.peer
662 .forward_send(request.sender_id, connection_id, request.payload.clone())
663 });
664 }
665 self.update_user_contacts(host_user_id).await?;
666 response.send(proto::Ack {})?;
667 Ok(())
668 }
669
670 async fn unregister_worktree(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::UnregisterWorktree>,
673 ) -> Result<()> {
674 let host_user_id;
675 let project_id = request.payload.project_id;
676 let worktree_id = request.payload.worktree_id;
677 {
678 let mut state = self.store_mut().await;
679 let (_, guest_connection_ids) =
680 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
681 host_user_id = state.user_id_for_connection(request.sender_id)?;
682 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
683 self.peer.send(
684 conn_id,
685 proto::UnregisterWorktree {
686 project_id,
687 worktree_id,
688 },
689 )
690 });
691 }
692 self.update_user_contacts(host_user_id).await?;
693 Ok(())
694 }
695
696 async fn update_worktree(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::UpdateWorktree>,
699 response: Response<proto::UpdateWorktree>,
700 ) -> Result<()> {
701 let connection_ids = self.store_mut().await.update_worktree(
702 request.sender_id,
703 request.payload.project_id,
704 request.payload.worktree_id,
705 &request.payload.removed_entries,
706 &request.payload.updated_entries,
707 request.payload.scan_id,
708 )?;
709
710 broadcast(request.sender_id, connection_ids, |connection_id| {
711 self.peer
712 .forward_send(request.sender_id, connection_id, request.payload.clone())
713 });
714 response.send(proto::Ack {})?;
715 Ok(())
716 }
717
718 async fn update_diagnostic_summary(
719 self: Arc<Server>,
720 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
721 ) -> Result<()> {
722 let summary = request
723 .payload
724 .summary
725 .clone()
726 .ok_or_else(|| anyhow!("invalid summary"))?;
727 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
728 request.payload.project_id,
729 request.payload.worktree_id,
730 request.sender_id,
731 summary,
732 )?;
733
734 broadcast(request.sender_id, receiver_ids, |connection_id| {
735 self.peer
736 .forward_send(request.sender_id, connection_id, request.payload.clone())
737 });
738 Ok(())
739 }
740
741 async fn start_language_server(
742 self: Arc<Server>,
743 request: TypedEnvelope<proto::StartLanguageServer>,
744 ) -> Result<()> {
745 let receiver_ids = self.store_mut().await.start_language_server(
746 request.payload.project_id,
747 request.sender_id,
748 request
749 .payload
750 .server
751 .clone()
752 .ok_or_else(|| anyhow!("invalid language server"))?,
753 )?;
754 broadcast(request.sender_id, receiver_ids, |connection_id| {
755 self.peer
756 .forward_send(request.sender_id, connection_id, request.payload.clone())
757 });
758 Ok(())
759 }
760
761 async fn update_language_server(
762 self: Arc<Server>,
763 request: TypedEnvelope<proto::UpdateLanguageServer>,
764 ) -> Result<()> {
765 let receiver_ids = self
766 .store()
767 .await
768 .project_connection_ids(request.payload.project_id, request.sender_id)?;
769 broadcast(request.sender_id, receiver_ids, |connection_id| {
770 self.peer
771 .forward_send(request.sender_id, connection_id, request.payload.clone())
772 });
773 Ok(())
774 }
775
776 async fn forward_project_request<T>(
777 self: Arc<Server>,
778 request: TypedEnvelope<T>,
779 response: Response<T>,
780 ) -> Result<()>
781 where
782 T: EntityMessage + RequestMessage,
783 {
784 let host_connection_id = self
785 .store()
786 .await
787 .read_project(request.payload.remote_entity_id(), request.sender_id)?
788 .host_connection_id;
789
790 response.send(
791 self.peer
792 .forward_request(request.sender_id, host_connection_id, request.payload)
793 .await?,
794 )?;
795 Ok(())
796 }
797
798 async fn save_buffer(
799 self: Arc<Server>,
800 request: TypedEnvelope<proto::SaveBuffer>,
801 response: Response<proto::SaveBuffer>,
802 ) -> Result<()> {
803 let host = self
804 .store()
805 .await
806 .read_project(request.payload.project_id, request.sender_id)?
807 .host_connection_id;
808 let response_payload = self
809 .peer
810 .forward_request(request.sender_id, host, request.payload.clone())
811 .await?;
812
813 let mut guests = self
814 .store()
815 .await
816 .read_project(request.payload.project_id, request.sender_id)?
817 .connection_ids();
818 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
819 broadcast(host, guests, |conn_id| {
820 self.peer
821 .forward_send(host, conn_id, response_payload.clone())
822 });
823 response.send(response_payload)?;
824 Ok(())
825 }
826
827 async fn update_buffer(
828 self: Arc<Server>,
829 request: TypedEnvelope<proto::UpdateBuffer>,
830 response: Response<proto::UpdateBuffer>,
831 ) -> Result<()> {
832 println!("{:?}: update buffer", request.sender_id);
833 let receiver_ids = self
834 .store()
835 .await
836 .project_connection_ids(request.payload.project_id, request.sender_id)?;
837 broadcast(request.sender_id, receiver_ids, |connection_id| {
838 self.peer
839 .forward_send(request.sender_id, connection_id, request.payload.clone())
840 });
841 response.send(proto::Ack {})?;
842 Ok(())
843 }
844
845 async fn update_buffer_file(
846 self: Arc<Server>,
847 request: TypedEnvelope<proto::UpdateBufferFile>,
848 ) -> Result<()> {
849 let receiver_ids = self
850 .store()
851 .await
852 .project_connection_ids(request.payload.project_id, request.sender_id)?;
853 broadcast(request.sender_id, receiver_ids, |connection_id| {
854 self.peer
855 .forward_send(request.sender_id, connection_id, request.payload.clone())
856 });
857 Ok(())
858 }
859
860 async fn buffer_reloaded(
861 self: Arc<Server>,
862 request: TypedEnvelope<proto::BufferReloaded>,
863 ) -> Result<()> {
864 let receiver_ids = self
865 .store()
866 .await
867 .project_connection_ids(request.payload.project_id, request.sender_id)?;
868 broadcast(request.sender_id, receiver_ids, |connection_id| {
869 self.peer
870 .forward_send(request.sender_id, connection_id, request.payload.clone())
871 });
872 Ok(())
873 }
874
875 async fn buffer_saved(
876 self: Arc<Server>,
877 request: TypedEnvelope<proto::BufferSaved>,
878 ) -> Result<()> {
879 let receiver_ids = self
880 .store()
881 .await
882 .project_connection_ids(request.payload.project_id, request.sender_id)?;
883 broadcast(request.sender_id, receiver_ids, |connection_id| {
884 self.peer
885 .forward_send(request.sender_id, connection_id, request.payload.clone())
886 });
887 Ok(())
888 }
889
890 async fn follow(
891 self: Arc<Self>,
892 request: TypedEnvelope<proto::Follow>,
893 response: Response<proto::Follow>,
894 ) -> Result<()> {
895 let leader_id = ConnectionId(request.payload.leader_id);
896 let follower_id = request.sender_id;
897 if !self
898 .store()
899 .await
900 .project_connection_ids(request.payload.project_id, follower_id)?
901 .contains(&leader_id)
902 {
903 Err(anyhow!("no such peer"))?;
904 }
905 let mut response_payload = self
906 .peer
907 .forward_request(request.sender_id, leader_id, request.payload)
908 .await?;
909 response_payload
910 .views
911 .retain(|view| view.leader_id != Some(follower_id.0));
912 response.send(response_payload)?;
913 Ok(())
914 }
915
916 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
917 let leader_id = ConnectionId(request.payload.leader_id);
918 if !self
919 .store()
920 .await
921 .project_connection_ids(request.payload.project_id, request.sender_id)?
922 .contains(&leader_id)
923 {
924 Err(anyhow!("no such peer"))?;
925 }
926 self.peer
927 .forward_send(request.sender_id, leader_id, request.payload)?;
928 Ok(())
929 }
930
931 async fn update_followers(
932 self: Arc<Self>,
933 request: TypedEnvelope<proto::UpdateFollowers>,
934 ) -> Result<()> {
935 let connection_ids = self
936 .store()
937 .await
938 .project_connection_ids(request.payload.project_id, request.sender_id)?;
939 let leader_id = request
940 .payload
941 .variant
942 .as_ref()
943 .and_then(|variant| match variant {
944 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
945 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
946 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
947 });
948 for follower_id in &request.payload.follower_ids {
949 let follower_id = ConnectionId(*follower_id);
950 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
951 self.peer
952 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
953 }
954 }
955 Ok(())
956 }
957
958 async fn get_channels(
959 self: Arc<Server>,
960 request: TypedEnvelope<proto::GetChannels>,
961 response: Response<proto::GetChannels>,
962 ) -> Result<()> {
963 let user_id = self
964 .store()
965 .await
966 .user_id_for_connection(request.sender_id)?;
967 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
968 response.send(proto::GetChannelsResponse {
969 channels: channels
970 .into_iter()
971 .map(|chan| proto::Channel {
972 id: chan.id.to_proto(),
973 name: chan.name,
974 })
975 .collect(),
976 })?;
977 Ok(())
978 }
979
980 async fn get_users(
981 self: Arc<Server>,
982 request: TypedEnvelope<proto::GetUsers>,
983 response: Response<proto::GetUsers>,
984 ) -> Result<()> {
985 let user_ids = request
986 .payload
987 .user_ids
988 .into_iter()
989 .map(UserId::from_proto)
990 .collect();
991 let users = self
992 .app_state
993 .db
994 .get_users_by_ids(user_ids)
995 .await?
996 .into_iter()
997 .map(|user| proto::User {
998 id: user.id.to_proto(),
999 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1000 github_login: user.github_login,
1001 })
1002 .collect();
1003 response.send(proto::UsersResponse { users })?;
1004 Ok(())
1005 }
1006
1007 async fn fuzzy_search_users(
1008 self: Arc<Server>,
1009 request: TypedEnvelope<proto::FuzzySearchUsers>,
1010 response: Response<proto::FuzzySearchUsers>,
1011 ) -> Result<()> {
1012 let user_id = self
1013 .store()
1014 .await
1015 .user_id_for_connection(request.sender_id)?;
1016 let query = request.payload.query;
1017 let db = &self.app_state.db;
1018 let users = match query.len() {
1019 0 => vec![],
1020 1 | 2 => db
1021 .get_user_by_github_login(&query)
1022 .await?
1023 .into_iter()
1024 .collect(),
1025 _ => db.fuzzy_search_users(&query, 10).await?,
1026 };
1027 let users = users
1028 .into_iter()
1029 .filter(|user| user.id != user_id)
1030 .map(|user| proto::User {
1031 id: user.id.to_proto(),
1032 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1033 github_login: user.github_login,
1034 })
1035 .collect();
1036 response.send(proto::UsersResponse { users })?;
1037 Ok(())
1038 }
1039
1040 async fn request_contact(
1041 self: Arc<Server>,
1042 request: TypedEnvelope<proto::RequestContact>,
1043 response: Response<proto::RequestContact>,
1044 ) -> Result<()> {
1045 let requester_id = self
1046 .store()
1047 .await
1048 .user_id_for_connection(request.sender_id)?;
1049 let responder_id = UserId::from_proto(request.payload.responder_id);
1050 if requester_id == responder_id {
1051 return Err(anyhow!("cannot add yourself as a contact"))?;
1052 }
1053
1054 self.app_state
1055 .db
1056 .send_contact_request(requester_id, responder_id)
1057 .await?;
1058
1059 // Update outgoing contact requests of requester
1060 let mut update = proto::UpdateContacts::default();
1061 update.outgoing_requests.push(responder_id.to_proto());
1062 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1063 self.peer.send(connection_id, update.clone())?;
1064 }
1065
1066 // Update incoming contact requests of responder
1067 let mut update = proto::UpdateContacts::default();
1068 update
1069 .incoming_requests
1070 .push(proto::IncomingContactRequest {
1071 requester_id: requester_id.to_proto(),
1072 should_notify: true,
1073 });
1074 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1075 self.peer.send(connection_id, update.clone())?;
1076 }
1077
1078 response.send(proto::Ack {})?;
1079 Ok(())
1080 }
1081
1082 async fn respond_to_contact_request(
1083 self: Arc<Server>,
1084 request: TypedEnvelope<proto::RespondToContactRequest>,
1085 response: Response<proto::RespondToContactRequest>,
1086 ) -> Result<()> {
1087 let responder_id = self
1088 .store()
1089 .await
1090 .user_id_for_connection(request.sender_id)?;
1091 let requester_id = UserId::from_proto(request.payload.requester_id);
1092 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1093 self.app_state
1094 .db
1095 .dismiss_contact_notification(responder_id, requester_id)
1096 .await?;
1097 } else {
1098 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1099 self.app_state
1100 .db
1101 .respond_to_contact_request(responder_id, requester_id, accept)
1102 .await?;
1103
1104 let store = self.store().await;
1105 // Update responder with new contact
1106 let mut update = proto::UpdateContacts::default();
1107 if accept {
1108 update
1109 .contacts
1110 .push(store.contact_for_user(requester_id, false));
1111 }
1112 update
1113 .remove_incoming_requests
1114 .push(requester_id.to_proto());
1115 for connection_id in store.connection_ids_for_user(responder_id) {
1116 self.peer.send(connection_id, update.clone())?;
1117 }
1118
1119 // Update requester with new contact
1120 let mut update = proto::UpdateContacts::default();
1121 if accept {
1122 update
1123 .contacts
1124 .push(store.contact_for_user(responder_id, true));
1125 }
1126 update
1127 .remove_outgoing_requests
1128 .push(responder_id.to_proto());
1129 for connection_id in store.connection_ids_for_user(requester_id) {
1130 self.peer.send(connection_id, update.clone())?;
1131 }
1132 }
1133
1134 response.send(proto::Ack {})?;
1135 Ok(())
1136 }
1137
1138 async fn remove_contact(
1139 self: Arc<Server>,
1140 request: TypedEnvelope<proto::RemoveContact>,
1141 response: Response<proto::RemoveContact>,
1142 ) -> Result<()> {
1143 let requester_id = self
1144 .store()
1145 .await
1146 .user_id_for_connection(request.sender_id)?;
1147 let responder_id = UserId::from_proto(request.payload.user_id);
1148 self.app_state
1149 .db
1150 .remove_contact(requester_id, responder_id)
1151 .await?;
1152
1153 // Update outgoing contact requests of requester
1154 let mut update = proto::UpdateContacts::default();
1155 update
1156 .remove_outgoing_requests
1157 .push(responder_id.to_proto());
1158 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1159 self.peer.send(connection_id, update.clone())?;
1160 }
1161
1162 // Update incoming contact requests of responder
1163 let mut update = proto::UpdateContacts::default();
1164 update
1165 .remove_incoming_requests
1166 .push(requester_id.to_proto());
1167 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1168 self.peer.send(connection_id, update.clone())?;
1169 }
1170
1171 response.send(proto::Ack {})?;
1172 Ok(())
1173 }
1174
1175 // #[instrument(skip(self, state, user_ids))]
1176 // fn update_contacts_for_users<'a>(
1177 // self: &Arc<Self>,
1178 // state: &Store,
1179 // user_ids: impl IntoIterator<Item = &'a UserId>,
1180 // ) {
1181 // for user_id in user_ids {
1182 // let contacts = state.contacts_for_user(*user_id);
1183 // for connection_id in state.connection_ids_for_user(*user_id) {
1184 // self.peer
1185 // .send(
1186 // connection_id,
1187 // proto::UpdateContacts {
1188 // contacts: contacts.clone(),
1189 // pending_requests_from_user_ids: Default::default(),
1190 // pending_requests_to_user_ids: Default::default(),
1191 // },
1192 // )
1193 // .trace_err();
1194 // }
1195 // }
1196 // }
1197
1198 async fn join_channel(
1199 self: Arc<Self>,
1200 request: TypedEnvelope<proto::JoinChannel>,
1201 response: Response<proto::JoinChannel>,
1202 ) -> Result<()> {
1203 let user_id = self
1204 .store()
1205 .await
1206 .user_id_for_connection(request.sender_id)?;
1207 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1208 if !self
1209 .app_state
1210 .db
1211 .can_user_access_channel(user_id, channel_id)
1212 .await?
1213 {
1214 Err(anyhow!("access denied"))?;
1215 }
1216
1217 self.store_mut()
1218 .await
1219 .join_channel(request.sender_id, channel_id);
1220 let messages = self
1221 .app_state
1222 .db
1223 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1224 .await?
1225 .into_iter()
1226 .map(|msg| proto::ChannelMessage {
1227 id: msg.id.to_proto(),
1228 body: msg.body,
1229 timestamp: msg.sent_at.unix_timestamp() as u64,
1230 sender_id: msg.sender_id.to_proto(),
1231 nonce: Some(msg.nonce.as_u128().into()),
1232 })
1233 .collect::<Vec<_>>();
1234 response.send(proto::JoinChannelResponse {
1235 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1236 messages,
1237 })?;
1238 Ok(())
1239 }
1240
1241 async fn leave_channel(
1242 self: Arc<Self>,
1243 request: TypedEnvelope<proto::LeaveChannel>,
1244 ) -> Result<()> {
1245 let user_id = self
1246 .store()
1247 .await
1248 .user_id_for_connection(request.sender_id)?;
1249 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1250 if !self
1251 .app_state
1252 .db
1253 .can_user_access_channel(user_id, channel_id)
1254 .await?
1255 {
1256 Err(anyhow!("access denied"))?;
1257 }
1258
1259 self.store_mut()
1260 .await
1261 .leave_channel(request.sender_id, channel_id);
1262
1263 Ok(())
1264 }
1265
1266 async fn send_channel_message(
1267 self: Arc<Self>,
1268 request: TypedEnvelope<proto::SendChannelMessage>,
1269 response: Response<proto::SendChannelMessage>,
1270 ) -> Result<()> {
1271 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1272 let user_id;
1273 let connection_ids;
1274 {
1275 let state = self.store().await;
1276 user_id = state.user_id_for_connection(request.sender_id)?;
1277 connection_ids = state.channel_connection_ids(channel_id)?;
1278 }
1279
1280 // Validate the message body.
1281 let body = request.payload.body.trim().to_string();
1282 if body.len() > MAX_MESSAGE_LEN {
1283 return Err(anyhow!("message is too long"))?;
1284 }
1285 if body.is_empty() {
1286 return Err(anyhow!("message can't be blank"))?;
1287 }
1288
1289 let timestamp = OffsetDateTime::now_utc();
1290 let nonce = request
1291 .payload
1292 .nonce
1293 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1294
1295 let message_id = self
1296 .app_state
1297 .db
1298 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1299 .await?
1300 .to_proto();
1301 let message = proto::ChannelMessage {
1302 sender_id: user_id.to_proto(),
1303 id: message_id,
1304 body,
1305 timestamp: timestamp.unix_timestamp() as u64,
1306 nonce: Some(nonce),
1307 };
1308 broadcast(request.sender_id, connection_ids, |conn_id| {
1309 self.peer.send(
1310 conn_id,
1311 proto::ChannelMessageSent {
1312 channel_id: channel_id.to_proto(),
1313 message: Some(message.clone()),
1314 },
1315 )
1316 });
1317 response.send(proto::SendChannelMessageResponse {
1318 message: Some(message),
1319 })?;
1320 Ok(())
1321 }
1322
1323 async fn get_channel_messages(
1324 self: Arc<Self>,
1325 request: TypedEnvelope<proto::GetChannelMessages>,
1326 response: Response<proto::GetChannelMessages>,
1327 ) -> Result<()> {
1328 let user_id = self
1329 .store()
1330 .await
1331 .user_id_for_connection(request.sender_id)?;
1332 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1333 if !self
1334 .app_state
1335 .db
1336 .can_user_access_channel(user_id, channel_id)
1337 .await?
1338 {
1339 Err(anyhow!("access denied"))?;
1340 }
1341
1342 let messages = self
1343 .app_state
1344 .db
1345 .get_channel_messages(
1346 channel_id,
1347 MESSAGE_COUNT_PER_PAGE,
1348 Some(MessageId::from_proto(request.payload.before_message_id)),
1349 )
1350 .await?
1351 .into_iter()
1352 .map(|msg| proto::ChannelMessage {
1353 id: msg.id.to_proto(),
1354 body: msg.body,
1355 timestamp: msg.sent_at.unix_timestamp() as u64,
1356 sender_id: msg.sender_id.to_proto(),
1357 nonce: Some(msg.nonce.as_u128().into()),
1358 })
1359 .collect::<Vec<_>>();
1360 response.send(proto::GetChannelMessagesResponse {
1361 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1362 messages,
1363 })?;
1364 Ok(())
1365 }
1366
1367 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1368 #[cfg(test)]
1369 tokio::task::yield_now().await;
1370 let guard = self.store.read().await;
1371 #[cfg(test)]
1372 tokio::task::yield_now().await;
1373 StoreReadGuard {
1374 guard,
1375 _not_send: PhantomData,
1376 }
1377 }
1378
1379 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1380 #[cfg(test)]
1381 tokio::task::yield_now().await;
1382 let guard = self.store.write().await;
1383 #[cfg(test)]
1384 tokio::task::yield_now().await;
1385 StoreWriteGuard {
1386 guard,
1387 _not_send: PhantomData,
1388 }
1389 }
1390}
1391
1392impl<'a> Deref for StoreReadGuard<'a> {
1393 type Target = Store;
1394
1395 fn deref(&self) -> &Self::Target {
1396 &*self.guard
1397 }
1398}
1399
1400impl<'a> Deref for StoreWriteGuard<'a> {
1401 type Target = Store;
1402
1403 fn deref(&self) -> &Self::Target {
1404 &*self.guard
1405 }
1406}
1407
1408impl<'a> DerefMut for StoreWriteGuard<'a> {
1409 fn deref_mut(&mut self) -> &mut Self::Target {
1410 &mut *self.guard
1411 }
1412}
1413
1414impl<'a> Drop for StoreWriteGuard<'a> {
1415 fn drop(&mut self) {
1416 #[cfg(test)]
1417 self.check_invariants();
1418
1419 let metrics = self.metrics();
1420 tracing::info!(
1421 connections = metrics.connections,
1422 registered_projects = metrics.registered_projects,
1423 shared_projects = metrics.shared_projects,
1424 "metrics"
1425 );
1426 }
1427}
1428
1429impl Executor for RealExecutor {
1430 type Sleep = Sleep;
1431
1432 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1433 tokio::task::spawn(future);
1434 }
1435
1436 fn sleep(&self, duration: Duration) -> Self::Sleep {
1437 tokio::time::sleep(duration)
1438 }
1439}
1440
1441fn broadcast<F>(
1442 sender_id: ConnectionId,
1443 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1444 mut f: F,
1445) where
1446 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1447{
1448 for receiver_id in receiver_ids {
1449 if receiver_id != sender_id {
1450 f(receiver_id).trace_err();
1451 }
1452 }
1453}
1454
// Name of the HTTP header that carries the client's RPC protocol version.
// Built lazily on first use because `HeaderName::from_static` is called at
// runtime here.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1458
/// Typed value of the `x-zed-protocol-version` header: the RPC protocol
/// version announced by a connecting client.
pub struct ProtocolVersion(u32);
1460
1461impl Header for ProtocolVersion {
1462 fn name() -> &'static HeaderName {
1463 &ZED_PROTOCOL_VERSION
1464 }
1465
1466 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1467 where
1468 Self: Sized,
1469 I: Iterator<Item = &'i axum::http::HeaderValue>,
1470 {
1471 let version = values
1472 .next()
1473 .ok_or_else(|| axum::headers::Error::invalid())?
1474 .to_str()
1475 .map_err(|_| axum::headers::Error::invalid())?
1476 .parse()
1477 .map_err(|_| axum::headers::Error::invalid())?;
1478 Ok(Self(version))
1479 }
1480
1481 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1482 values.extend([self.0.to_string().parse().unwrap()]);
1483 }
1484}
1485
1486pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1487 let server = Server::new(app_state.clone(), None);
1488 Router::new()
1489 .route("/rpc", get(handle_websocket_request))
1490 .layer(
1491 ServiceBuilder::new()
1492 .layer(Extension(app_state))
1493 .layer(middleware::from_fn(auth::validate_header))
1494 .layer(Extension(server)),
1495 )
1496}
1497
1498pub async fn handle_websocket_request(
1499 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1500 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1501 Extension(server): Extension<Arc<Server>>,
1502 Extension(user_id): Extension<UserId>,
1503 ws: WebSocketUpgrade,
1504) -> axum::response::Response {
1505 if protocol_version != rpc::PROTOCOL_VERSION {
1506 return (
1507 StatusCode::UPGRADE_REQUIRED,
1508 "client must be upgraded".to_string(),
1509 )
1510 .into_response();
1511 }
1512 let socket_address = socket_address.to_string();
1513 ws.on_upgrade(move |socket| {
1514 use util::ResultExt;
1515 let socket = socket
1516 .map_ok(to_tungstenite_message)
1517 .err_into()
1518 .with(|message| async move { Ok(to_axum_message(message)) });
1519 let connection = Connection::new(Box::pin(socket));
1520 async move {
1521 server
1522 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1523 .await
1524 .log_err();
1525 }
1526 })
1527}
1528
1529fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1530 match message {
1531 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1532 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1533 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1534 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1535 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1536 code: frame.code.into(),
1537 reason: frame.reason,
1538 })),
1539 }
1540}
1541
1542fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1543 match message {
1544 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1545 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1546 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1547 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1548 AxumMessage::Close(frame) => {
1549 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1550 code: frame.code.into(),
1551 reason: frame.reason,
1552 }))
1553 }
1554 }
1555}
1556
/// Extension trait for turning a fallible value into an `Option` while
/// reporting the failure through `tracing`.
pub trait ResultExt {
    /// The success type carried by the implementor.
    type Ok;

    /// Returns the success value, if any; implementations are expected to
    /// record the error before discarding it.
    fn trace_err(self) -> Option<Self::Ok>;
}
1562
1563impl<T, E> ResultExt for Result<T, E>
1564where
1565 E: std::fmt::Debug,
1566{
1567 type Ok = T;
1568
1569 fn trace_err(self) -> Option<T> {
1570 match self {
1571 Ok(value) => Some(value),
1572 Err(error) => {
1573 tracing::error!("{:?}", error);
1574 None
1575 }
1576 }
1577 }
1578}
1579
1580#[cfg(test)]
1581mod tests {
1582 use super::*;
1583 use crate::{
1584 db::{tests::TestDb, UserId},
1585 AppState,
1586 };
1587 use ::rpc::Peer;
1588 use client::{
1589 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1590 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1591 };
1592 use collections::{BTreeMap, HashSet};
1593 use editor::{
1594 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1595 ToOffset, ToggleCodeActions, Undo,
1596 };
1597 use gpui::{
1598 executor::{self, Deterministic},
1599 geometry::vector::vec2f,
1600 ModelHandle, TestAppContext, ViewHandle,
1601 };
1602 use language::{
1603 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1604 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1605 };
1606 use lsp::{self, FakeLanguageServer};
1607 use parking_lot::Mutex;
1608 use project::{
1609 fs::{FakeFs, Fs as _},
1610 search::SearchQuery,
1611 worktree::WorktreeHandle,
1612 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1613 };
1614 use rand::prelude::*;
1615 use rpc::PeerId;
1616 use serde_json::json;
1617 use settings::Settings;
1618 use sqlx::types::time::OffsetDateTime;
1619 use std::{
1620 env,
1621 ops::Deref,
1622 path::{Path, PathBuf},
1623 rc::Rc,
1624 sync::{
1625 atomic::{AtomicBool, Ordering::SeqCst},
1626 Arc,
1627 },
1628 time::Duration,
1629 };
1630 use theme::ThemeRegistry;
1631 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1632
1633 #[cfg(test)]
1634 #[ctor::ctor]
1635 fn init_logger() {
1636 if std::env::var("RUST_LOG").is_ok() {
1637 env_logger::init();
1638 }
1639 }
1640
    /// End-to-end check of project sharing between two clients: client B
    /// joins client A's project, both sides see each other as collaborators,
    /// an edit made by B reaches A's buffer, and dropping B's project removes
    /// B from A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan so the worktree contents are known.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Join that project as client B
        let client_b_peer_id = client_b.peer_id;
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // B must see A as a collaborator right after joining.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // A eventually sees B as a collaborator with B's replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b_peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // The host already has the buffer open as a result of B's request.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1758
    /// Checks that a host's worktree stops being shared once the only guest
    /// leaves the project, and is shared again when the guest rejoins.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan so the worktree contents are known.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // When client B leaves the project, it gets automatically unshared.
        cx_b.update(|_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        // Let all pending work settle before checking the sharing state.
        deterministic.run_until_parked();
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));

        // When client B joins again, the project gets re-shared.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1830
    /// Checks what happens when the host's connection drops: the host loses
    /// its collaborators and the project is no longer shared, the guest's
    /// copy becomes read-only, and the guest can join the project again
    /// afterwards.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan so the worktree contents are known.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the receive timeout after the disconnect.
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Ensure guests can still join.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1905
    /// Checks with one host and two guests that concurrent buffer edits
    /// converge on every replica, that a save issued by a guest is written to
    /// the host's file system and clears the dirty state everywhere, and that
    /// renames and new files on the host's file system show up in the guests'
    /// worktrees and open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let mut client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan so the worktree contents are known.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that worktree as clients B and C.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Both guest edits must have reached the host before the host appends.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
        save_b.await.unwrap();
        // The saved file is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
2070
    /// Checks that file-system operations issued by a guest (create file,
    /// rename, create directory, delete directory, delete file) are reflected
    /// in both the host's and the guest's worktree path listings.
    #[gpui::test(iterations = 10)]
    async fn test_fs_operations(
        executor: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        executor.forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;

        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        let worktree_a =
            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
        let worktree_b =
            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());

        // Guest creates a file; both worktrees must list it.
        let entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "c.txt"), false, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "c.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "c.txt"]
            );
        });

        // Guest renames the new file from c.txt to d.txt.
        project_b
            .update(cx_b, |project, cx| {
                project.rename_entry(entry.id, Path::new("d.txt"), cx)
            })
            .unwrap()
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "d.txt"]
            );
        });

        // Guest creates a directory.
        let dir_entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "DIR"), true, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["DIR", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["DIR", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Guest deletes the directory again.
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(dir_entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt", "d.txt"]
            );
        });

        // Guest deletes the entry it created and renamed (now d.txt).
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                ["a.txt", "b.txt"]
            );
        });
    }
2236
2237 #[gpui::test(iterations = 10)]
2238 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2239 cx_a.foreground().forbid_parking();
2240 let lang_registry = Arc::new(LanguageRegistry::test());
2241 let fs = FakeFs::new(cx_a.background());
2242
2243 // Connect to a server as 2 clients.
2244 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2245 let client_a = server.create_client(cx_a, "user_a").await;
2246 let mut client_b = server.create_client(cx_b, "user_b").await;
2247 server
2248 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2249 .await;
2250
2251 // Share a project as client A
2252 fs.insert_tree(
2253 "/dir",
2254 json!({
2255 "a.txt": "a-contents",
2256 }),
2257 )
2258 .await;
2259
2260 let project_a = cx_a.update(|cx| {
2261 Project::local(
2262 client_a.clone(),
2263 client_a.user_store.clone(),
2264 lang_registry.clone(),
2265 fs.clone(),
2266 cx,
2267 )
2268 });
2269 let (worktree_a, _) = project_a
2270 .update(cx_a, |p, cx| {
2271 p.find_or_create_local_worktree("/dir", true, cx)
2272 })
2273 .await
2274 .unwrap();
2275 worktree_a
2276 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2277 .await;
2278 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2279
2280 // Join that project as client B
2281 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2282
2283 // Open a buffer as client B
2284 let buffer_b = project_b
2285 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2286 .await
2287 .unwrap();
2288
2289 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2290 buffer_b.read_with(cx_b, |buf, _| {
2291 assert!(buf.is_dirty());
2292 assert!(!buf.has_conflict());
2293 });
2294
2295 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2296 buffer_b
2297 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2298 .await;
2299 buffer_b.read_with(cx_b, |buf, _| {
2300 assert!(!buf.has_conflict());
2301 });
2302
2303 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2304 buffer_b.read_with(cx_b, |buf, _| {
2305 assert!(buf.is_dirty());
2306 assert!(!buf.has_conflict());
2307 });
2308 }
2309
2310 #[gpui::test(iterations = 10)]
2311 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2312 cx_a.foreground().forbid_parking();
2313 let lang_registry = Arc::new(LanguageRegistry::test());
2314 let fs = FakeFs::new(cx_a.background());
2315
2316 // Connect to a server as 2 clients.
2317 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2318 let client_a = server.create_client(cx_a, "user_a").await;
2319 let mut client_b = server.create_client(cx_b, "user_b").await;
2320 server
2321 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2322 .await;
2323
2324 // Share a project as client A
2325 fs.insert_tree(
2326 "/dir",
2327 json!({
2328 "a.txt": "a-contents",
2329 }),
2330 )
2331 .await;
2332
2333 let project_a = cx_a.update(|cx| {
2334 Project::local(
2335 client_a.clone(),
2336 client_a.user_store.clone(),
2337 lang_registry.clone(),
2338 fs.clone(),
2339 cx,
2340 )
2341 });
2342 let (worktree_a, _) = project_a
2343 .update(cx_a, |p, cx| {
2344 p.find_or_create_local_worktree("/dir", true, cx)
2345 })
2346 .await
2347 .unwrap();
2348 worktree_a
2349 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2350 .await;
2351 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2352
2353 // Join that project as client B
2354 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2355 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2356
2357 // Open a buffer as client B
2358 let buffer_b = project_b
2359 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2360 .await
2361 .unwrap();
2362 buffer_b.read_with(cx_b, |buf, _| {
2363 assert!(!buf.is_dirty());
2364 assert!(!buf.has_conflict());
2365 });
2366
2367 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2368 .await
2369 .unwrap();
2370 buffer_b
2371 .condition(&cx_b, |buf, _| {
2372 buf.text() == "new contents" && !buf.is_dirty()
2373 })
2374 .await;
2375 buffer_b.read_with(cx_b, |buf, _| {
2376 assert!(!buf.has_conflict());
2377 });
2378 }
2379
2380 #[gpui::test(iterations = 10)]
2381 async fn test_editing_while_guest_opens_buffer(
2382 cx_a: &mut TestAppContext,
2383 cx_b: &mut TestAppContext,
2384 ) {
2385 cx_a.foreground().forbid_parking();
2386 let lang_registry = Arc::new(LanguageRegistry::test());
2387 let fs = FakeFs::new(cx_a.background());
2388
2389 // Connect to a server as 2 clients.
2390 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2391 let client_a = server.create_client(cx_a, "user_a").await;
2392 let mut client_b = server.create_client(cx_b, "user_b").await;
2393 server
2394 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2395 .await;
2396
2397 // Share a project as client A
2398 fs.insert_tree(
2399 "/dir",
2400 json!({
2401 "a.txt": "a-contents",
2402 }),
2403 )
2404 .await;
2405 let project_a = cx_a.update(|cx| {
2406 Project::local(
2407 client_a.clone(),
2408 client_a.user_store.clone(),
2409 lang_registry.clone(),
2410 fs.clone(),
2411 cx,
2412 )
2413 });
2414 let (worktree_a, _) = project_a
2415 .update(cx_a, |p, cx| {
2416 p.find_or_create_local_worktree("/dir", true, cx)
2417 })
2418 .await
2419 .unwrap();
2420 worktree_a
2421 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2422 .await;
2423 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2424
2425 // Join that project as client B
2426 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2427
2428 // Open a buffer as client A
2429 let buffer_a = project_a
2430 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2431 .await
2432 .unwrap();
2433
2434 // Start opening the same buffer as client B
2435 let buffer_b = cx_b
2436 .background()
2437 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2438
2439 // Edit the buffer as client A while client B is still opening it.
2440 cx_b.background().simulate_random_delay().await;
2441 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2442 cx_b.background().simulate_random_delay().await;
2443 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2444
2445 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2446 let buffer_b = buffer_b.await.unwrap();
2447 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2448 }
2449
2450 #[gpui::test(iterations = 10)]
2451 async fn test_leaving_worktree_while_opening_buffer(
2452 cx_a: &mut TestAppContext,
2453 cx_b: &mut TestAppContext,
2454 ) {
2455 cx_a.foreground().forbid_parking();
2456 let lang_registry = Arc::new(LanguageRegistry::test());
2457 let fs = FakeFs::new(cx_a.background());
2458
2459 // Connect to a server as 2 clients.
2460 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2461 let client_a = server.create_client(cx_a, "user_a").await;
2462 let mut client_b = server.create_client(cx_b, "user_b").await;
2463 server
2464 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2465 .await;
2466
2467 // Share a project as client A
2468 fs.insert_tree(
2469 "/dir",
2470 json!({
2471 "a.txt": "a-contents",
2472 }),
2473 )
2474 .await;
2475 let project_a = cx_a.update(|cx| {
2476 Project::local(
2477 client_a.clone(),
2478 client_a.user_store.clone(),
2479 lang_registry.clone(),
2480 fs.clone(),
2481 cx,
2482 )
2483 });
2484 let (worktree_a, _) = project_a
2485 .update(cx_a, |p, cx| {
2486 p.find_or_create_local_worktree("/dir", true, cx)
2487 })
2488 .await
2489 .unwrap();
2490 worktree_a
2491 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2492 .await;
2493 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2494
2495 // Join that project as client B
2496 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2497
2498 // See that a guest has joined as client A.
2499 project_a
2500 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2501 .await;
2502
2503 // Begin opening a buffer as client B, but leave the project before the open completes.
2504 let buffer_b = cx_b
2505 .background()
2506 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2507 cx_b.update(|_| {
2508 drop(client_b.project.take());
2509 drop(project_b);
2510 });
2511 drop(buffer_b);
2512
2513 // See that the guest has left.
2514 project_a
2515 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2516 .await;
2517 }
2518
2519 #[gpui::test(iterations = 10)]
2520 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2521 cx_a.foreground().forbid_parking();
2522 let lang_registry = Arc::new(LanguageRegistry::test());
2523 let fs = FakeFs::new(cx_a.background());
2524
2525 // Connect to a server as 2 clients.
2526 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2527 let client_a = server.create_client(cx_a, "user_a").await;
2528 let mut client_b = server.create_client(cx_b, "user_b").await;
2529 server
2530 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2531 .await;
2532
2533 // Share a project as client A
2534 fs.insert_tree(
2535 "/a",
2536 json!({
2537 "a.txt": "a-contents",
2538 "b.txt": "b-contents",
2539 }),
2540 )
2541 .await;
2542 let project_a = cx_a.update(|cx| {
2543 Project::local(
2544 client_a.clone(),
2545 client_a.user_store.clone(),
2546 lang_registry.clone(),
2547 fs.clone(),
2548 cx,
2549 )
2550 });
2551 let (worktree_a, _) = project_a
2552 .update(cx_a, |p, cx| {
2553 p.find_or_create_local_worktree("/a", true, cx)
2554 })
2555 .await
2556 .unwrap();
2557 worktree_a
2558 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2559 .await;
2560
2561 // Join that project as client B
2562 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2563
2564 // Client A sees that a guest has joined.
2565 project_a
2566 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2567 .await;
2568
2569 // Drop client B's connection and ensure client A observes client B leaving the project.
2570 client_b.disconnect(&cx_b.to_async()).unwrap();
2571 project_a
2572 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2573 .await;
2574
2575 // Rejoin the project as client B
2576 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2577
2578 // Client A sees that a guest has re-joined.
2579 project_a
2580 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2581 .await;
2582
2583 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2584 client_b.wait_for_current_user(cx_b).await;
2585 server.disconnect_client(client_b.current_user_id(cx_b));
2586 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2587 project_a
2588 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2589 .await;
2590 }
2591
2592 #[gpui::test(iterations = 10)]
2593 async fn test_collaborating_with_diagnostics(
2594 deterministic: Arc<Deterministic>,
2595 cx_a: &mut TestAppContext,
2596 cx_b: &mut TestAppContext,
2597 cx_c: &mut TestAppContext,
2598 ) {
2599 deterministic.forbid_parking();
2600 let lang_registry = Arc::new(LanguageRegistry::test());
2601 let fs = FakeFs::new(cx_a.background());
2602
2603 // Set up a fake language server.
2604 let mut language = Language::new(
2605 LanguageConfig {
2606 name: "Rust".into(),
2607 path_suffixes: vec!["rs".to_string()],
2608 ..Default::default()
2609 },
2610 Some(tree_sitter_rust::language()),
2611 );
2612 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2613 lang_registry.add(Arc::new(language));
2614
2615 // Connect to a server as 2 clients.
2616 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2617 let client_a = server.create_client(cx_a, "user_a").await;
2618 let mut client_b = server.create_client(cx_b, "user_b").await;
2619 let mut client_c = server.create_client(cx_c, "user_c").await;
2620 server
2621 .make_contacts(vec![
2622 (&client_a, cx_a),
2623 (&client_b, cx_b),
2624 (&client_c, cx_c),
2625 ])
2626 .await;
2627
2628 // Share a project as client A
2629 fs.insert_tree(
2630 "/a",
2631 json!({
2632 "a.rs": "let one = two",
2633 "other.rs": "",
2634 }),
2635 )
2636 .await;
2637 let project_a = cx_a.update(|cx| {
2638 Project::local(
2639 client_a.clone(),
2640 client_a.user_store.clone(),
2641 lang_registry.clone(),
2642 fs.clone(),
2643 cx,
2644 )
2645 });
2646 let (worktree_a, _) = project_a
2647 .update(cx_a, |p, cx| {
2648 p.find_or_create_local_worktree("/a", true, cx)
2649 })
2650 .await
2651 .unwrap();
2652 worktree_a
2653 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2654 .await;
2655 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2656 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2657
2658 // Cause the language server to start.
2659 let _buffer = cx_a
2660 .background()
2661 .spawn(project_a.update(cx_a, |project, cx| {
2662 project.open_buffer(
2663 ProjectPath {
2664 worktree_id,
2665 path: Path::new("other.rs").into(),
2666 },
2667 cx,
2668 )
2669 }))
2670 .await
2671 .unwrap();
2672
2673 // Join the worktree as client B.
2674 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2675
2676 // Simulate a language server reporting errors for a file.
2677 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2678 fake_language_server
2679 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2680 .await;
2681 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2682 lsp::PublishDiagnosticsParams {
2683 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2684 version: None,
2685 diagnostics: vec![lsp::Diagnostic {
2686 severity: Some(lsp::DiagnosticSeverity::ERROR),
2687 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2688 message: "message 1".to_string(),
2689 ..Default::default()
2690 }],
2691 },
2692 );
2693
2694 // Wait for server to see the diagnostics update.
2695 deterministic.run_until_parked();
2696 {
2697 let store = server.store.read().await;
2698 let project = store.project(project_id).unwrap();
2699 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2700 assert!(!worktree.diagnostic_summaries.is_empty());
2701 }
2702
2703 // Ensure client B observes the new diagnostics.
2704 project_b.read_with(cx_b, |project, cx| {
2705 assert_eq!(
2706 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2707 &[(
2708 ProjectPath {
2709 worktree_id,
2710 path: Arc::from(Path::new("a.rs")),
2711 },
2712 DiagnosticSummary {
2713 error_count: 1,
2714 warning_count: 0,
2715 ..Default::default()
2716 },
2717 )]
2718 )
2719 });
2720
2721 // Join project as client C and observe the diagnostics.
2722 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2723 project_c.read_with(cx_c, |project, cx| {
2724 assert_eq!(
2725 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2726 &[(
2727 ProjectPath {
2728 worktree_id,
2729 path: Arc::from(Path::new("a.rs")),
2730 },
2731 DiagnosticSummary {
2732 error_count: 1,
2733 warning_count: 0,
2734 ..Default::default()
2735 },
2736 )]
2737 )
2738 });
2739
2740 // Simulate a language server reporting more errors for a file.
2741 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2742 lsp::PublishDiagnosticsParams {
2743 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2744 version: None,
2745 diagnostics: vec![
2746 lsp::Diagnostic {
2747 severity: Some(lsp::DiagnosticSeverity::ERROR),
2748 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2749 message: "message 1".to_string(),
2750 ..Default::default()
2751 },
2752 lsp::Diagnostic {
2753 severity: Some(lsp::DiagnosticSeverity::WARNING),
2754 range: lsp::Range::new(
2755 lsp::Position::new(0, 10),
2756 lsp::Position::new(0, 13),
2757 ),
2758 message: "message 2".to_string(),
2759 ..Default::default()
2760 },
2761 ],
2762 },
2763 );
2764
2765 // Clients B and C get the updated summaries
2766 deterministic.run_until_parked();
2767 project_b.read_with(cx_b, |project, cx| {
2768 assert_eq!(
2769 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2770 [(
2771 ProjectPath {
2772 worktree_id,
2773 path: Arc::from(Path::new("a.rs")),
2774 },
2775 DiagnosticSummary {
2776 error_count: 1,
2777 warning_count: 1,
2778 ..Default::default()
2779 },
2780 )]
2781 );
2782 });
2783 project_c.read_with(cx_c, |project, cx| {
2784 assert_eq!(
2785 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2786 [(
2787 ProjectPath {
2788 worktree_id,
2789 path: Arc::from(Path::new("a.rs")),
2790 },
2791 DiagnosticSummary {
2792 error_count: 1,
2793 warning_count: 1,
2794 ..Default::default()
2795 },
2796 )]
2797 );
2798 });
2799
2800 // Open the file with the errors on client B. They should be present.
2801 let buffer_b = cx_b
2802 .background()
2803 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2804 .await
2805 .unwrap();
2806
2807 buffer_b.read_with(cx_b, |buffer, _| {
2808 assert_eq!(
2809 buffer
2810 .snapshot()
2811 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2812 .map(|entry| entry)
2813 .collect::<Vec<_>>(),
2814 &[
2815 DiagnosticEntry {
2816 range: Point::new(0, 4)..Point::new(0, 7),
2817 diagnostic: Diagnostic {
2818 group_id: 0,
2819 message: "message 1".to_string(),
2820 severity: lsp::DiagnosticSeverity::ERROR,
2821 is_primary: true,
2822 ..Default::default()
2823 }
2824 },
2825 DiagnosticEntry {
2826 range: Point::new(0, 10)..Point::new(0, 13),
2827 diagnostic: Diagnostic {
2828 group_id: 1,
2829 severity: lsp::DiagnosticSeverity::WARNING,
2830 message: "message 2".to_string(),
2831 is_primary: true,
2832 ..Default::default()
2833 }
2834 }
2835 ]
2836 );
2837 });
2838
2839 // Simulate a language server reporting no errors for a file.
2840 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2841 lsp::PublishDiagnosticsParams {
2842 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2843 version: None,
2844 diagnostics: vec![],
2845 },
2846 );
2847 deterministic.run_until_parked();
2848 project_a.read_with(cx_a, |project, cx| {
2849 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2850 });
2851 project_b.read_with(cx_b, |project, cx| {
2852 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2853 });
2854 project_c.read_with(cx_c, |project, cx| {
2855 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2856 });
2857 }
2858
2859 #[gpui::test(iterations = 10)]
2860 async fn test_collaborating_with_completion(
2861 cx_a: &mut TestAppContext,
2862 cx_b: &mut TestAppContext,
2863 ) {
2864 cx_a.foreground().forbid_parking();
2865 let lang_registry = Arc::new(LanguageRegistry::test());
2866 let fs = FakeFs::new(cx_a.background());
2867
2868 // Set up a fake language server.
2869 let mut language = Language::new(
2870 LanguageConfig {
2871 name: "Rust".into(),
2872 path_suffixes: vec!["rs".to_string()],
2873 ..Default::default()
2874 },
2875 Some(tree_sitter_rust::language()),
2876 );
2877 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2878 capabilities: lsp::ServerCapabilities {
2879 completion_provider: Some(lsp::CompletionOptions {
2880 trigger_characters: Some(vec![".".to_string()]),
2881 ..Default::default()
2882 }),
2883 ..Default::default()
2884 },
2885 ..Default::default()
2886 });
2887 lang_registry.add(Arc::new(language));
2888
2889 // Connect to a server as 2 clients.
2890 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2891 let client_a = server.create_client(cx_a, "user_a").await;
2892 let mut client_b = server.create_client(cx_b, "user_b").await;
2893 server
2894 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2895 .await;
2896
2897 // Share a project as client A
2898 fs.insert_tree(
2899 "/a",
2900 json!({
2901 "main.rs": "fn main() { a }",
2902 "other.rs": "",
2903 }),
2904 )
2905 .await;
2906 let project_a = cx_a.update(|cx| {
2907 Project::local(
2908 client_a.clone(),
2909 client_a.user_store.clone(),
2910 lang_registry.clone(),
2911 fs.clone(),
2912 cx,
2913 )
2914 });
2915 let (worktree_a, _) = project_a
2916 .update(cx_a, |p, cx| {
2917 p.find_or_create_local_worktree("/a", true, cx)
2918 })
2919 .await
2920 .unwrap();
2921 worktree_a
2922 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2923 .await;
2924 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2925
2926 // Join the worktree as client B.
2927 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2928
2929 // Open a file in an editor as the guest.
2930 let buffer_b = project_b
2931 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2932 .await
2933 .unwrap();
2934 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2935 let editor_b = cx_b.add_view(window_b, |cx| {
2936 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2937 });
2938
2939 let fake_language_server = fake_language_servers.next().await.unwrap();
2940 buffer_b
2941 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2942 .await;
2943
2944 // Type a completion trigger character as the guest.
2945 editor_b.update(cx_b, |editor, cx| {
2946 editor.select_ranges([13..13], None, cx);
2947 editor.handle_input(&Input(".".into()), cx);
2948 cx.focus(&editor_b);
2949 });
2950
2951 // Receive a completion request as the host's language server.
2952 // Return some completions from the host's language server.
2953 cx_a.foreground().start_waiting();
2954 fake_language_server
2955 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2956 assert_eq!(
2957 params.text_document_position.text_document.uri,
2958 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2959 );
2960 assert_eq!(
2961 params.text_document_position.position,
2962 lsp::Position::new(0, 14),
2963 );
2964
2965 Ok(Some(lsp::CompletionResponse::Array(vec![
2966 lsp::CompletionItem {
2967 label: "first_method(…)".into(),
2968 detail: Some("fn(&mut self, B) -> C".into()),
2969 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2970 new_text: "first_method($1)".to_string(),
2971 range: lsp::Range::new(
2972 lsp::Position::new(0, 14),
2973 lsp::Position::new(0, 14),
2974 ),
2975 })),
2976 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2977 ..Default::default()
2978 },
2979 lsp::CompletionItem {
2980 label: "second_method(…)".into(),
2981 detail: Some("fn(&mut self, C) -> D<E>".into()),
2982 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2983 new_text: "second_method()".to_string(),
2984 range: lsp::Range::new(
2985 lsp::Position::new(0, 14),
2986 lsp::Position::new(0, 14),
2987 ),
2988 })),
2989 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2990 ..Default::default()
2991 },
2992 ])))
2993 })
2994 .next()
2995 .await
2996 .unwrap();
2997 cx_a.foreground().finish_waiting();
2998
2999 // Open the buffer on the host.
3000 let buffer_a = project_a
3001 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3002 .await
3003 .unwrap();
3004 buffer_a
3005 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3006 .await;
3007
3008 // Confirm a completion on the guest.
3009 editor_b
3010 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3011 .await;
3012 editor_b.update(cx_b, |editor, cx| {
3013 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3014 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3015 });
3016
3017 // Return a resolved completion from the host's language server.
3018 // The resolved completion has an additional text edit.
3019 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3020 |params, _| async move {
3021 assert_eq!(params.label, "first_method(…)");
3022 Ok(lsp::CompletionItem {
3023 label: "first_method(…)".into(),
3024 detail: Some("fn(&mut self, B) -> C".into()),
3025 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3026 new_text: "first_method($1)".to_string(),
3027 range: lsp::Range::new(
3028 lsp::Position::new(0, 14),
3029 lsp::Position::new(0, 14),
3030 ),
3031 })),
3032 additional_text_edits: Some(vec![lsp::TextEdit {
3033 new_text: "use d::SomeTrait;\n".to_string(),
3034 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3035 }]),
3036 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3037 ..Default::default()
3038 })
3039 },
3040 );
3041
3042 // The additional edit is applied.
3043 buffer_a
3044 .condition(&cx_a, |buffer, _| {
3045 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3046 })
3047 .await;
3048 buffer_b
3049 .condition(&cx_b, |buffer, _| {
3050 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3051 })
3052 .await;
3053 }
3054
3055 #[gpui::test(iterations = 10)]
3056 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3057 cx_a.foreground().forbid_parking();
3058 let lang_registry = Arc::new(LanguageRegistry::test());
3059 let fs = FakeFs::new(cx_a.background());
3060
3061 // Connect to a server as 2 clients.
3062 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3063 let client_a = server.create_client(cx_a, "user_a").await;
3064 let mut client_b = server.create_client(cx_b, "user_b").await;
3065 server
3066 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3067 .await;
3068
3069 // Share a project as client A
3070 fs.insert_tree(
3071 "/a",
3072 json!({
3073 "a.rs": "let one = 1;",
3074 }),
3075 )
3076 .await;
3077 let project_a = cx_a.update(|cx| {
3078 Project::local(
3079 client_a.clone(),
3080 client_a.user_store.clone(),
3081 lang_registry.clone(),
3082 fs.clone(),
3083 cx,
3084 )
3085 });
3086 let (worktree_a, _) = project_a
3087 .update(cx_a, |p, cx| {
3088 p.find_or_create_local_worktree("/a", true, cx)
3089 })
3090 .await
3091 .unwrap();
3092 worktree_a
3093 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3094 .await;
3095 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3096 let buffer_a = project_a
3097 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3098 .await
3099 .unwrap();
3100
3101 // Join the worktree as client B.
3102 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3103
3104 let buffer_b = cx_b
3105 .background()
3106 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3107 .await
3108 .unwrap();
3109 buffer_b.update(cx_b, |buffer, cx| {
3110 buffer.edit([(4..7, "six")], cx);
3111 buffer.edit([(10..11, "6")], cx);
3112 assert_eq!(buffer.text(), "let six = 6;");
3113 assert!(buffer.is_dirty());
3114 assert!(!buffer.has_conflict());
3115 });
3116 buffer_a
3117 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3118 .await;
3119
3120 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3121 .await
3122 .unwrap();
3123 buffer_a
3124 .condition(cx_a, |buffer, _| buffer.has_conflict())
3125 .await;
3126 buffer_b
3127 .condition(cx_b, |buffer, _| buffer.has_conflict())
3128 .await;
3129
3130 project_b
3131 .update(cx_b, |project, cx| {
3132 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3133 })
3134 .await
3135 .unwrap();
3136 buffer_a.read_with(cx_a, |buffer, _| {
3137 assert_eq!(buffer.text(), "let seven = 7;");
3138 assert!(!buffer.is_dirty());
3139 assert!(!buffer.has_conflict());
3140 });
3141 buffer_b.read_with(cx_b, |buffer, _| {
3142 assert_eq!(buffer.text(), "let seven = 7;");
3143 assert!(!buffer.is_dirty());
3144 assert!(!buffer.has_conflict());
3145 });
3146
3147 buffer_a.update(cx_a, |buffer, cx| {
3148 // Undoing on the host is a no-op when the reload was initiated by the guest.
3149 buffer.undo(cx);
3150 assert_eq!(buffer.text(), "let seven = 7;");
3151 assert!(!buffer.is_dirty());
3152 assert!(!buffer.has_conflict());
3153 });
3154 buffer_b.update(cx_b, |buffer, cx| {
3155 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3156 buffer.undo(cx);
3157 assert_eq!(buffer.text(), "let six = 6;");
3158 assert!(buffer.is_dirty());
3159 assert!(!buffer.has_conflict());
3160 });
3161 }
3162
3163 #[gpui::test(iterations = 10)]
3164 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3165 cx_a.foreground().forbid_parking();
3166 let lang_registry = Arc::new(LanguageRegistry::test());
3167 let fs = FakeFs::new(cx_a.background());
3168
3169 // Set up a fake language server.
3170 let mut language = Language::new(
3171 LanguageConfig {
3172 name: "Rust".into(),
3173 path_suffixes: vec!["rs".to_string()],
3174 ..Default::default()
3175 },
3176 Some(tree_sitter_rust::language()),
3177 );
3178 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3179 lang_registry.add(Arc::new(language));
3180
3181 // Connect to a server as 2 clients.
3182 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3183 let client_a = server.create_client(cx_a, "user_a").await;
3184 let mut client_b = server.create_client(cx_b, "user_b").await;
3185 server
3186 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3187 .await;
3188
3189 // Share a project as client A
3190 fs.insert_tree(
3191 "/a",
3192 json!({
3193 "a.rs": "let one = two",
3194 }),
3195 )
3196 .await;
3197 let project_a = cx_a.update(|cx| {
3198 Project::local(
3199 client_a.clone(),
3200 client_a.user_store.clone(),
3201 lang_registry.clone(),
3202 fs.clone(),
3203 cx,
3204 )
3205 });
3206 let (worktree_a, _) = project_a
3207 .update(cx_a, |p, cx| {
3208 p.find_or_create_local_worktree("/a", true, cx)
3209 })
3210 .await
3211 .unwrap();
3212 worktree_a
3213 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3214 .await;
3215 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3216
3217 // Join the project as client B.
3218 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3219
3220 let buffer_b = cx_b
3221 .background()
3222 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3223 .await
3224 .unwrap();
3225
3226 let fake_language_server = fake_language_servers.next().await.unwrap();
3227 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3228 Ok(Some(vec![
3229 lsp::TextEdit {
3230 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3231 new_text: "h".to_string(),
3232 },
3233 lsp::TextEdit {
3234 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3235 new_text: "y".to_string(),
3236 },
3237 ]))
3238 });
3239
3240 project_b
3241 .update(cx_b, |project, cx| {
3242 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3243 })
3244 .await
3245 .unwrap();
3246 assert_eq!(
3247 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3248 "let honey = two"
3249 );
3250 }
3251
3252 #[gpui::test(iterations = 10)]
3253 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3254 cx_a.foreground().forbid_parking();
3255 let lang_registry = Arc::new(LanguageRegistry::test());
3256 let fs = FakeFs::new(cx_a.background());
3257 fs.insert_tree(
3258 "/root-1",
3259 json!({
3260 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3261 }),
3262 )
3263 .await;
3264 fs.insert_tree(
3265 "/root-2",
3266 json!({
3267 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3268 }),
3269 )
3270 .await;
3271
3272 // Set up a fake language server.
3273 let mut language = Language::new(
3274 LanguageConfig {
3275 name: "Rust".into(),
3276 path_suffixes: vec!["rs".to_string()],
3277 ..Default::default()
3278 },
3279 Some(tree_sitter_rust::language()),
3280 );
3281 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3282 lang_registry.add(Arc::new(language));
3283
3284 // Connect to a server as 2 clients.
3285 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3286 let client_a = server.create_client(cx_a, "user_a").await;
3287 let mut client_b = server.create_client(cx_b, "user_b").await;
3288 server
3289 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3290 .await;
3291
3292 // Share a project as client A
3293 let project_a = cx_a.update(|cx| {
3294 Project::local(
3295 client_a.clone(),
3296 client_a.user_store.clone(),
3297 lang_registry.clone(),
3298 fs.clone(),
3299 cx,
3300 )
3301 });
3302 let (worktree_a, _) = project_a
3303 .update(cx_a, |p, cx| {
3304 p.find_or_create_local_worktree("/root-1", true, cx)
3305 })
3306 .await
3307 .unwrap();
3308 worktree_a
3309 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3310 .await;
3311 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3312
3313 // Join the project as client B.
3314 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3315
3316 // Open the file on client B.
3317 let buffer_b = cx_b
3318 .background()
3319 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3320 .await
3321 .unwrap();
3322
3323 // Request the definition of a symbol as the guest.
3324 let fake_language_server = fake_language_servers.next().await.unwrap();
3325 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3326 |_, _| async move {
3327 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3328 lsp::Location::new(
3329 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3330 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3331 ),
3332 )))
3333 },
3334 );
3335
3336 let definitions_1 = project_b
3337 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3338 .await
3339 .unwrap();
3340 cx_b.read(|cx| {
3341 assert_eq!(definitions_1.len(), 1);
3342 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3343 let target_buffer = definitions_1[0].buffer.read(cx);
3344 assert_eq!(
3345 target_buffer.text(),
3346 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3347 );
3348 assert_eq!(
3349 definitions_1[0].range.to_point(target_buffer),
3350 Point::new(0, 6)..Point::new(0, 9)
3351 );
3352 });
3353
3354 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3355 // the previous call to `definition`.
3356 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3357 |_, _| async move {
3358 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3359 lsp::Location::new(
3360 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3361 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3362 ),
3363 )))
3364 },
3365 );
3366
3367 let definitions_2 = project_b
3368 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3369 .await
3370 .unwrap();
3371 cx_b.read(|cx| {
3372 assert_eq!(definitions_2.len(), 1);
3373 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3374 let target_buffer = definitions_2[0].buffer.read(cx);
3375 assert_eq!(
3376 target_buffer.text(),
3377 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3378 );
3379 assert_eq!(
3380 definitions_2[0].range.to_point(target_buffer),
3381 Point::new(1, 6)..Point::new(1, 11)
3382 );
3383 });
3384 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3385 }
3386
3387 #[gpui::test(iterations = 10)]
3388 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3389 cx_a.foreground().forbid_parking();
3390 let lang_registry = Arc::new(LanguageRegistry::test());
3391 let fs = FakeFs::new(cx_a.background());
3392 fs.insert_tree(
3393 "/root-1",
3394 json!({
3395 "one.rs": "const ONE: usize = 1;",
3396 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3397 }),
3398 )
3399 .await;
3400 fs.insert_tree(
3401 "/root-2",
3402 json!({
3403 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3404 }),
3405 )
3406 .await;
3407
3408 // Set up a fake language server.
3409 let mut language = Language::new(
3410 LanguageConfig {
3411 name: "Rust".into(),
3412 path_suffixes: vec!["rs".to_string()],
3413 ..Default::default()
3414 },
3415 Some(tree_sitter_rust::language()),
3416 );
3417 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3418 lang_registry.add(Arc::new(language));
3419
3420 // Connect to a server as 2 clients.
3421 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3422 let client_a = server.create_client(cx_a, "user_a").await;
3423 let mut client_b = server.create_client(cx_b, "user_b").await;
3424 server
3425 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3426 .await;
3427
3428 // Share a project as client A
3429 let project_a = cx_a.update(|cx| {
3430 Project::local(
3431 client_a.clone(),
3432 client_a.user_store.clone(),
3433 lang_registry.clone(),
3434 fs.clone(),
3435 cx,
3436 )
3437 });
3438 let (worktree_a, _) = project_a
3439 .update(cx_a, |p, cx| {
3440 p.find_or_create_local_worktree("/root-1", true, cx)
3441 })
3442 .await
3443 .unwrap();
3444 worktree_a
3445 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3446 .await;
3447 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3448
3449 // Join the worktree as client B.
3450 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3451
3452 // Open the file on client B.
3453 let buffer_b = cx_b
3454 .background()
3455 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3456 .await
3457 .unwrap();
3458
3459 // Request references to a symbol as the guest.
3460 let fake_language_server = fake_language_servers.next().await.unwrap();
3461 fake_language_server.handle_request::<lsp::request::References, _, _>(
3462 |params, _| async move {
3463 assert_eq!(
3464 params.text_document_position.text_document.uri.as_str(),
3465 "file:///root-1/one.rs"
3466 );
3467 Ok(Some(vec![
3468 lsp::Location {
3469 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3470 range: lsp::Range::new(
3471 lsp::Position::new(0, 24),
3472 lsp::Position::new(0, 27),
3473 ),
3474 },
3475 lsp::Location {
3476 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3477 range: lsp::Range::new(
3478 lsp::Position::new(0, 35),
3479 lsp::Position::new(0, 38),
3480 ),
3481 },
3482 lsp::Location {
3483 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3484 range: lsp::Range::new(
3485 lsp::Position::new(0, 37),
3486 lsp::Position::new(0, 40),
3487 ),
3488 },
3489 ]))
3490 },
3491 );
3492
3493 let references = project_b
3494 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3495 .await
3496 .unwrap();
3497 cx_b.read(|cx| {
3498 assert_eq!(references.len(), 3);
3499 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3500
3501 let two_buffer = references[0].buffer.read(cx);
3502 let three_buffer = references[2].buffer.read(cx);
3503 assert_eq!(
3504 two_buffer.file().unwrap().path().as_ref(),
3505 Path::new("two.rs")
3506 );
3507 assert_eq!(references[1].buffer, references[0].buffer);
3508 assert_eq!(
3509 three_buffer.file().unwrap().full_path(cx),
3510 Path::new("three.rs")
3511 );
3512
3513 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3514 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3515 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3516 });
3517 }
3518
3519 #[gpui::test(iterations = 10)]
3520 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3521 cx_a.foreground().forbid_parking();
3522 let lang_registry = Arc::new(LanguageRegistry::test());
3523 let fs = FakeFs::new(cx_a.background());
3524 fs.insert_tree(
3525 "/root-1",
3526 json!({
3527 "a": "hello world",
3528 "b": "goodnight moon",
3529 "c": "a world of goo",
3530 "d": "world champion of clown world",
3531 }),
3532 )
3533 .await;
3534 fs.insert_tree(
3535 "/root-2",
3536 json!({
3537 "e": "disney world is fun",
3538 }),
3539 )
3540 .await;
3541
3542 // Connect to a server as 2 clients.
3543 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3544 let client_a = server.create_client(cx_a, "user_a").await;
3545 let mut client_b = server.create_client(cx_b, "user_b").await;
3546 server
3547 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3548 .await;
3549
3550 // Share a project as client A
3551 let project_a = cx_a.update(|cx| {
3552 Project::local(
3553 client_a.clone(),
3554 client_a.user_store.clone(),
3555 lang_registry.clone(),
3556 fs.clone(),
3557 cx,
3558 )
3559 });
3560
3561 let (worktree_1, _) = project_a
3562 .update(cx_a, |p, cx| {
3563 p.find_or_create_local_worktree("/root-1", true, cx)
3564 })
3565 .await
3566 .unwrap();
3567 worktree_1
3568 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3569 .await;
3570 let (worktree_2, _) = project_a
3571 .update(cx_a, |p, cx| {
3572 p.find_or_create_local_worktree("/root-2", true, cx)
3573 })
3574 .await
3575 .unwrap();
3576 worktree_2
3577 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3578 .await;
3579
3580 // Join the worktree as client B.
3581 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3582 let results = project_b
3583 .update(cx_b, |project, cx| {
3584 project.search(SearchQuery::text("world", false, false), cx)
3585 })
3586 .await
3587 .unwrap();
3588
3589 let mut ranges_by_path = results
3590 .into_iter()
3591 .map(|(buffer, ranges)| {
3592 buffer.read_with(cx_b, |buffer, cx| {
3593 let path = buffer.file().unwrap().full_path(cx);
3594 let offset_ranges = ranges
3595 .into_iter()
3596 .map(|range| range.to_offset(buffer))
3597 .collect::<Vec<_>>();
3598 (path, offset_ranges)
3599 })
3600 })
3601 .collect::<Vec<_>>();
3602 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3603
3604 assert_eq!(
3605 ranges_by_path,
3606 &[
3607 (PathBuf::from("root-1/a"), vec![6..11]),
3608 (PathBuf::from("root-1/c"), vec![2..7]),
3609 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3610 (PathBuf::from("root-2/e"), vec![7..12]),
3611 ]
3612 );
3613 }
3614
3615 #[gpui::test(iterations = 10)]
3616 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3617 cx_a.foreground().forbid_parking();
3618 let lang_registry = Arc::new(LanguageRegistry::test());
3619 let fs = FakeFs::new(cx_a.background());
3620 fs.insert_tree(
3621 "/root-1",
3622 json!({
3623 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3624 }),
3625 )
3626 .await;
3627
3628 // Set up a fake language server.
3629 let mut language = Language::new(
3630 LanguageConfig {
3631 name: "Rust".into(),
3632 path_suffixes: vec!["rs".to_string()],
3633 ..Default::default()
3634 },
3635 Some(tree_sitter_rust::language()),
3636 );
3637 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3638 lang_registry.add(Arc::new(language));
3639
3640 // Connect to a server as 2 clients.
3641 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3642 let client_a = server.create_client(cx_a, "user_a").await;
3643 let mut client_b = server.create_client(cx_b, "user_b").await;
3644 server
3645 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3646 .await;
3647
3648 // Share a project as client A
3649 let project_a = cx_a.update(|cx| {
3650 Project::local(
3651 client_a.clone(),
3652 client_a.user_store.clone(),
3653 lang_registry.clone(),
3654 fs.clone(),
3655 cx,
3656 )
3657 });
3658 let (worktree_a, _) = project_a
3659 .update(cx_a, |p, cx| {
3660 p.find_or_create_local_worktree("/root-1", true, cx)
3661 })
3662 .await
3663 .unwrap();
3664 worktree_a
3665 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3666 .await;
3667 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3668
3669 // Join the worktree as client B.
3670 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3671
3672 // Open the file on client B.
3673 let buffer_b = cx_b
3674 .background()
3675 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3676 .await
3677 .unwrap();
3678
3679 // Request document highlights as the guest.
3680 let fake_language_server = fake_language_servers.next().await.unwrap();
3681 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3682 |params, _| async move {
3683 assert_eq!(
3684 params
3685 .text_document_position_params
3686 .text_document
3687 .uri
3688 .as_str(),
3689 "file:///root-1/main.rs"
3690 );
3691 assert_eq!(
3692 params.text_document_position_params.position,
3693 lsp::Position::new(0, 34)
3694 );
3695 Ok(Some(vec![
3696 lsp::DocumentHighlight {
3697 kind: Some(lsp::DocumentHighlightKind::WRITE),
3698 range: lsp::Range::new(
3699 lsp::Position::new(0, 10),
3700 lsp::Position::new(0, 16),
3701 ),
3702 },
3703 lsp::DocumentHighlight {
3704 kind: Some(lsp::DocumentHighlightKind::READ),
3705 range: lsp::Range::new(
3706 lsp::Position::new(0, 32),
3707 lsp::Position::new(0, 38),
3708 ),
3709 },
3710 lsp::DocumentHighlight {
3711 kind: Some(lsp::DocumentHighlightKind::READ),
3712 range: lsp::Range::new(
3713 lsp::Position::new(0, 41),
3714 lsp::Position::new(0, 47),
3715 ),
3716 },
3717 ]))
3718 },
3719 );
3720
3721 let highlights = project_b
3722 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3723 .await
3724 .unwrap();
3725 buffer_b.read_with(cx_b, |buffer, _| {
3726 let snapshot = buffer.snapshot();
3727
3728 let highlights = highlights
3729 .into_iter()
3730 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3731 .collect::<Vec<_>>();
3732 assert_eq!(
3733 highlights,
3734 &[
3735 (lsp::DocumentHighlightKind::WRITE, 10..16),
3736 (lsp::DocumentHighlightKind::READ, 32..38),
3737 (lsp::DocumentHighlightKind::READ, 41..47)
3738 ]
3739 )
3740 });
3741 }
3742
3743 #[gpui::test(iterations = 10)]
3744 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3745 cx_a.foreground().forbid_parking();
3746 let lang_registry = Arc::new(LanguageRegistry::test());
3747 let fs = FakeFs::new(cx_a.background());
3748 fs.insert_tree(
3749 "/code",
3750 json!({
3751 "crate-1": {
3752 "one.rs": "const ONE: usize = 1;",
3753 },
3754 "crate-2": {
3755 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3756 },
3757 "private": {
3758 "passwords.txt": "the-password",
3759 }
3760 }),
3761 )
3762 .await;
3763
3764 // Set up a fake language server.
3765 let mut language = Language::new(
3766 LanguageConfig {
3767 name: "Rust".into(),
3768 path_suffixes: vec!["rs".to_string()],
3769 ..Default::default()
3770 },
3771 Some(tree_sitter_rust::language()),
3772 );
3773 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3774 lang_registry.add(Arc::new(language));
3775
3776 // Connect to a server as 2 clients.
3777 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3778 let client_a = server.create_client(cx_a, "user_a").await;
3779 let mut client_b = server.create_client(cx_b, "user_b").await;
3780 server
3781 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3782 .await;
3783
3784 // Share a project as client A
3785 let project_a = cx_a.update(|cx| {
3786 Project::local(
3787 client_a.clone(),
3788 client_a.user_store.clone(),
3789 lang_registry.clone(),
3790 fs.clone(),
3791 cx,
3792 )
3793 });
3794 let (worktree_a, _) = project_a
3795 .update(cx_a, |p, cx| {
3796 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3797 })
3798 .await
3799 .unwrap();
3800 worktree_a
3801 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3802 .await;
3803 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3804
3805 // Join the worktree as client B.
3806 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3807
3808 // Cause the language server to start.
3809 let _buffer = cx_b
3810 .background()
3811 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3812 .await
3813 .unwrap();
3814
3815 let fake_language_server = fake_language_servers.next().await.unwrap();
3816 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3817 |_, _| async move {
3818 #[allow(deprecated)]
3819 Ok(Some(vec![lsp::SymbolInformation {
3820 name: "TWO".into(),
3821 location: lsp::Location {
3822 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3823 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3824 },
3825 kind: lsp::SymbolKind::CONSTANT,
3826 tags: None,
3827 container_name: None,
3828 deprecated: None,
3829 }]))
3830 },
3831 );
3832
3833 // Request the definition of a symbol as the guest.
3834 let symbols = project_b
3835 .update(cx_b, |p, cx| p.symbols("two", cx))
3836 .await
3837 .unwrap();
3838 assert_eq!(symbols.len(), 1);
3839 assert_eq!(symbols[0].name, "TWO");
3840
3841 // Open one of the returned symbols.
3842 let buffer_b_2 = project_b
3843 .update(cx_b, |project, cx| {
3844 project.open_buffer_for_symbol(&symbols[0], cx)
3845 })
3846 .await
3847 .unwrap();
3848 buffer_b_2.read_with(cx_b, |buffer, _| {
3849 assert_eq!(
3850 buffer.file().unwrap().path().as_ref(),
3851 Path::new("../crate-2/two.rs")
3852 );
3853 });
3854
3855 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3856 let mut fake_symbol = symbols[0].clone();
3857 fake_symbol.path = Path::new("/code/secrets").into();
3858 let error = project_b
3859 .update(cx_b, |project, cx| {
3860 project.open_buffer_for_symbol(&fake_symbol, cx)
3861 })
3862 .await
3863 .unwrap_err();
3864 assert!(error.to_string().contains("invalid symbol signature"));
3865 }
3866
3867 #[gpui::test(iterations = 10)]
3868 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3869 cx_a: &mut TestAppContext,
3870 cx_b: &mut TestAppContext,
3871 mut rng: StdRng,
3872 ) {
3873 cx_a.foreground().forbid_parking();
3874 let lang_registry = Arc::new(LanguageRegistry::test());
3875 let fs = FakeFs::new(cx_a.background());
3876 fs.insert_tree(
3877 "/root",
3878 json!({
3879 "a.rs": "const ONE: usize = b::TWO;",
3880 "b.rs": "const TWO: usize = 2",
3881 }),
3882 )
3883 .await;
3884
3885 // Set up a fake language server.
3886 let mut language = Language::new(
3887 LanguageConfig {
3888 name: "Rust".into(),
3889 path_suffixes: vec!["rs".to_string()],
3890 ..Default::default()
3891 },
3892 Some(tree_sitter_rust::language()),
3893 );
3894 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3895 lang_registry.add(Arc::new(language));
3896
3897 // Connect to a server as 2 clients.
3898 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3899 let client_a = server.create_client(cx_a, "user_a").await;
3900 let mut client_b = server.create_client(cx_b, "user_b").await;
3901 server
3902 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3903 .await;
3904
3905 // Share a project as client A
3906 let project_a = cx_a.update(|cx| {
3907 Project::local(
3908 client_a.clone(),
3909 client_a.user_store.clone(),
3910 lang_registry.clone(),
3911 fs.clone(),
3912 cx,
3913 )
3914 });
3915
3916 let (worktree_a, _) = project_a
3917 .update(cx_a, |p, cx| {
3918 p.find_or_create_local_worktree("/root", true, cx)
3919 })
3920 .await
3921 .unwrap();
3922 worktree_a
3923 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3924 .await;
3925 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3926
3927 // Join the project as client B.
3928 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3929
3930 let buffer_b1 = cx_b
3931 .background()
3932 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3933 .await
3934 .unwrap();
3935
3936 let fake_language_server = fake_language_servers.next().await.unwrap();
3937 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3938 |_, _| async move {
3939 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3940 lsp::Location::new(
3941 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3942 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3943 ),
3944 )))
3945 },
3946 );
3947
3948 let definitions;
3949 let buffer_b2;
3950 if rng.gen() {
3951 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3952 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3953 } else {
3954 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3955 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3956 }
3957
3958 let buffer_b2 = buffer_b2.await.unwrap();
3959 let definitions = definitions.await.unwrap();
3960 assert_eq!(definitions.len(), 1);
3961 assert_eq!(definitions[0].buffer, buffer_b2);
3962 }
3963
3964 #[gpui::test(iterations = 10)]
3965 async fn test_collaborating_with_code_actions(
3966 cx_a: &mut TestAppContext,
3967 cx_b: &mut TestAppContext,
3968 ) {
3969 cx_a.foreground().forbid_parking();
3970 let lang_registry = Arc::new(LanguageRegistry::test());
3971 let fs = FakeFs::new(cx_a.background());
3972 cx_b.update(|cx| editor::init(cx));
3973
3974 // Set up a fake language server.
3975 let mut language = Language::new(
3976 LanguageConfig {
3977 name: "Rust".into(),
3978 path_suffixes: vec!["rs".to_string()],
3979 ..Default::default()
3980 },
3981 Some(tree_sitter_rust::language()),
3982 );
3983 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3984 lang_registry.add(Arc::new(language));
3985
3986 // Connect to a server as 2 clients.
3987 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3988 let client_a = server.create_client(cx_a, "user_a").await;
3989 let mut client_b = server.create_client(cx_b, "user_b").await;
3990 server
3991 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3992 .await;
3993
3994 // Share a project as client A
3995 fs.insert_tree(
3996 "/a",
3997 json!({
3998 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3999 "other.rs": "pub fn foo() -> usize { 4 }",
4000 }),
4001 )
4002 .await;
4003 let project_a = cx_a.update(|cx| {
4004 Project::local(
4005 client_a.clone(),
4006 client_a.user_store.clone(),
4007 lang_registry.clone(),
4008 fs.clone(),
4009 cx,
4010 )
4011 });
4012 let (worktree_a, _) = project_a
4013 .update(cx_a, |p, cx| {
4014 p.find_or_create_local_worktree("/a", true, cx)
4015 })
4016 .await
4017 .unwrap();
4018 worktree_a
4019 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4020 .await;
4021 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4022
4023 // Join the project as client B.
4024 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4025 let mut params = cx_b.update(WorkspaceParams::test);
4026 params.languages = lang_registry.clone();
4027 params.project = project_b.clone();
4028 params.client = client_b.client.clone();
4029 params.user_store = client_b.user_store.clone();
4030
4031 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4032 let editor_b = workspace_b
4033 .update(cx_b, |workspace, cx| {
4034 workspace.open_path((worktree_id, "main.rs"), true, cx)
4035 })
4036 .await
4037 .unwrap()
4038 .downcast::<Editor>()
4039 .unwrap();
4040
4041 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4042 fake_language_server
4043 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4044 assert_eq!(
4045 params.text_document.uri,
4046 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4047 );
4048 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4049 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4050 Ok(None)
4051 })
4052 .next()
4053 .await;
4054
4055 // Move cursor to a location that contains code actions.
4056 editor_b.update(cx_b, |editor, cx| {
4057 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4058 cx.focus(&editor_b);
4059 });
4060
4061 fake_language_server
4062 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4063 assert_eq!(
4064 params.text_document.uri,
4065 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4066 );
4067 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4068 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4069
4070 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4071 lsp::CodeAction {
4072 title: "Inline into all callers".to_string(),
4073 edit: Some(lsp::WorkspaceEdit {
4074 changes: Some(
4075 [
4076 (
4077 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4078 vec![lsp::TextEdit::new(
4079 lsp::Range::new(
4080 lsp::Position::new(1, 22),
4081 lsp::Position::new(1, 34),
4082 ),
4083 "4".to_string(),
4084 )],
4085 ),
4086 (
4087 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4088 vec![lsp::TextEdit::new(
4089 lsp::Range::new(
4090 lsp::Position::new(0, 0),
4091 lsp::Position::new(0, 27),
4092 ),
4093 "".to_string(),
4094 )],
4095 ),
4096 ]
4097 .into_iter()
4098 .collect(),
4099 ),
4100 ..Default::default()
4101 }),
4102 data: Some(json!({
4103 "codeActionParams": {
4104 "range": {
4105 "start": {"line": 1, "column": 31},
4106 "end": {"line": 1, "column": 31},
4107 }
4108 }
4109 })),
4110 ..Default::default()
4111 },
4112 )]))
4113 })
4114 .next()
4115 .await;
4116
4117 // Toggle code actions and wait for them to display.
4118 editor_b.update(cx_b, |editor, cx| {
4119 editor.toggle_code_actions(
4120 &ToggleCodeActions {
4121 deployed_from_indicator: false,
4122 },
4123 cx,
4124 );
4125 });
4126 editor_b
4127 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4128 .await;
4129
4130 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4131
4132 // Confirming the code action will trigger a resolve request.
4133 let confirm_action = workspace_b
4134 .update(cx_b, |workspace, cx| {
4135 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4136 })
4137 .unwrap();
4138 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4139 |_, _| async move {
4140 Ok(lsp::CodeAction {
4141 title: "Inline into all callers".to_string(),
4142 edit: Some(lsp::WorkspaceEdit {
4143 changes: Some(
4144 [
4145 (
4146 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4147 vec![lsp::TextEdit::new(
4148 lsp::Range::new(
4149 lsp::Position::new(1, 22),
4150 lsp::Position::new(1, 34),
4151 ),
4152 "4".to_string(),
4153 )],
4154 ),
4155 (
4156 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4157 vec![lsp::TextEdit::new(
4158 lsp::Range::new(
4159 lsp::Position::new(0, 0),
4160 lsp::Position::new(0, 27),
4161 ),
4162 "".to_string(),
4163 )],
4164 ),
4165 ]
4166 .into_iter()
4167 .collect(),
4168 ),
4169 ..Default::default()
4170 }),
4171 ..Default::default()
4172 })
4173 },
4174 );
4175
4176 // After the action is confirmed, an editor containing both modified files is opened.
4177 confirm_action.await.unwrap();
4178 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4179 workspace
4180 .active_item(cx)
4181 .unwrap()
4182 .downcast::<Editor>()
4183 .unwrap()
4184 });
4185 code_action_editor.update(cx_b, |editor, cx| {
4186 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4187 editor.undo(&Undo, cx);
4188 assert_eq!(
4189 editor.text(cx),
4190 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4191 );
4192 editor.redo(&Redo, cx);
4193 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4194 });
4195 }
4196
4197 #[gpui::test(iterations = 10)]
4198 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4199 cx_a.foreground().forbid_parking();
4200 let lang_registry = Arc::new(LanguageRegistry::test());
4201 let fs = FakeFs::new(cx_a.background());
4202 cx_b.update(|cx| editor::init(cx));
4203
4204 // Set up a fake language server.
4205 let mut language = Language::new(
4206 LanguageConfig {
4207 name: "Rust".into(),
4208 path_suffixes: vec!["rs".to_string()],
4209 ..Default::default()
4210 },
4211 Some(tree_sitter_rust::language()),
4212 );
4213 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4214 capabilities: lsp::ServerCapabilities {
4215 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4216 prepare_provider: Some(true),
4217 work_done_progress_options: Default::default(),
4218 })),
4219 ..Default::default()
4220 },
4221 ..Default::default()
4222 });
4223 lang_registry.add(Arc::new(language));
4224
4225 // Connect to a server as 2 clients.
4226 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4227 let client_a = server.create_client(cx_a, "user_a").await;
4228 let mut client_b = server.create_client(cx_b, "user_b").await;
4229 server
4230 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4231 .await;
4232
4233 // Share a project as client A
4234 fs.insert_tree(
4235 "/dir",
4236 json!({
4237 "one.rs": "const ONE: usize = 1;",
4238 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4239 }),
4240 )
4241 .await;
4242 let project_a = cx_a.update(|cx| {
4243 Project::local(
4244 client_a.clone(),
4245 client_a.user_store.clone(),
4246 lang_registry.clone(),
4247 fs.clone(),
4248 cx,
4249 )
4250 });
4251 let (worktree_a, _) = project_a
4252 .update(cx_a, |p, cx| {
4253 p.find_or_create_local_worktree("/dir", true, cx)
4254 })
4255 .await
4256 .unwrap();
4257 worktree_a
4258 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4259 .await;
4260 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4261
4262 // Join the worktree as client B.
4263 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4264 let mut params = cx_b.update(WorkspaceParams::test);
4265 params.languages = lang_registry.clone();
4266 params.project = project_b.clone();
4267 params.client = client_b.client.clone();
4268 params.user_store = client_b.user_store.clone();
4269
4270 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4271 let editor_b = workspace_b
4272 .update(cx_b, |workspace, cx| {
4273 workspace.open_path((worktree_id, "one.rs"), true, cx)
4274 })
4275 .await
4276 .unwrap()
4277 .downcast::<Editor>()
4278 .unwrap();
4279 let fake_language_server = fake_language_servers.next().await.unwrap();
4280
4281 // Move cursor to a location that can be renamed.
4282 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4283 editor.select_ranges([7..7], None, cx);
4284 editor.rename(&Rename, cx).unwrap()
4285 });
4286
4287 fake_language_server
4288 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4289 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4290 assert_eq!(params.position, lsp::Position::new(0, 7));
4291 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4292 lsp::Position::new(0, 6),
4293 lsp::Position::new(0, 9),
4294 ))))
4295 })
4296 .next()
4297 .await
4298 .unwrap();
4299 prepare_rename.await.unwrap();
4300 editor_b.update(cx_b, |editor, cx| {
4301 let rename = editor.pending_rename().unwrap();
4302 let buffer = editor.buffer().read(cx).snapshot(cx);
4303 assert_eq!(
4304 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4305 6..9
4306 );
4307 rename.editor.update(cx, |rename_editor, cx| {
4308 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4309 rename_buffer.edit([(0..3, "THREE")], cx);
4310 });
4311 });
4312 });
4313
4314 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4315 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4316 });
4317 fake_language_server
4318 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4319 assert_eq!(
4320 params.text_document_position.text_document.uri.as_str(),
4321 "file:///dir/one.rs"
4322 );
4323 assert_eq!(
4324 params.text_document_position.position,
4325 lsp::Position::new(0, 6)
4326 );
4327 assert_eq!(params.new_name, "THREE");
4328 Ok(Some(lsp::WorkspaceEdit {
4329 changes: Some(
4330 [
4331 (
4332 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4333 vec![lsp::TextEdit::new(
4334 lsp::Range::new(
4335 lsp::Position::new(0, 6),
4336 lsp::Position::new(0, 9),
4337 ),
4338 "THREE".to_string(),
4339 )],
4340 ),
4341 (
4342 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4343 vec![
4344 lsp::TextEdit::new(
4345 lsp::Range::new(
4346 lsp::Position::new(0, 24),
4347 lsp::Position::new(0, 27),
4348 ),
4349 "THREE".to_string(),
4350 ),
4351 lsp::TextEdit::new(
4352 lsp::Range::new(
4353 lsp::Position::new(0, 35),
4354 lsp::Position::new(0, 38),
4355 ),
4356 "THREE".to_string(),
4357 ),
4358 ],
4359 ),
4360 ]
4361 .into_iter()
4362 .collect(),
4363 ),
4364 ..Default::default()
4365 }))
4366 })
4367 .next()
4368 .await
4369 .unwrap();
4370 confirm_rename.await.unwrap();
4371
4372 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4373 workspace
4374 .active_item(cx)
4375 .unwrap()
4376 .downcast::<Editor>()
4377 .unwrap()
4378 });
4379 rename_editor.update(cx_b, |editor, cx| {
4380 assert_eq!(
4381 editor.text(cx),
4382 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4383 );
4384 editor.undo(&Undo, cx);
4385 assert_eq!(
4386 editor.text(cx),
4387 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4388 );
4389 editor.redo(&Redo, cx);
4390 assert_eq!(
4391 editor.text(cx),
4392 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4393 );
4394 });
4395
4396 // Ensure temporary rename edits cannot be undone/redone.
4397 editor_b.update(cx_b, |editor, cx| {
4398 editor.undo(&Undo, cx);
4399 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4400 editor.undo(&Undo, cx);
4401 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4402 editor.redo(&Redo, cx);
4403 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4404 })
4405 }
4406
4407 #[gpui::test(iterations = 10)]
4408 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4409 cx_a.foreground().forbid_parking();
4410
4411 // Connect to a server as 2 clients.
4412 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4413 let client_a = server.create_client(cx_a, "user_a").await;
4414 let client_b = server.create_client(cx_b, "user_b").await;
4415
4416 // Create an org that includes these 2 users.
4417 let db = &server.app_state.db;
4418 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4419 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4420 .await
4421 .unwrap();
4422 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4423 .await
4424 .unwrap();
4425
4426 // Create a channel that includes all the users.
4427 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4428 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4429 .await
4430 .unwrap();
4431 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4432 .await
4433 .unwrap();
4434 db.create_channel_message(
4435 channel_id,
4436 client_b.current_user_id(&cx_b),
4437 "hello A, it's B.",
4438 OffsetDateTime::now_utc(),
4439 1,
4440 )
4441 .await
4442 .unwrap();
4443
4444 let channels_a = cx_a
4445 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4446 channels_a
4447 .condition(cx_a, |list, _| list.available_channels().is_some())
4448 .await;
4449 channels_a.read_with(cx_a, |list, _| {
4450 assert_eq!(
4451 list.available_channels().unwrap(),
4452 &[ChannelDetails {
4453 id: channel_id.to_proto(),
4454 name: "test-channel".to_string()
4455 }]
4456 )
4457 });
4458 let channel_a = channels_a.update(cx_a, |this, cx| {
4459 this.get_channel(channel_id.to_proto(), cx).unwrap()
4460 });
4461 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4462 channel_a
4463 .condition(&cx_a, |channel, _| {
4464 channel_messages(channel)
4465 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4466 })
4467 .await;
4468
4469 let channels_b = cx_b
4470 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4471 channels_b
4472 .condition(cx_b, |list, _| list.available_channels().is_some())
4473 .await;
4474 channels_b.read_with(cx_b, |list, _| {
4475 assert_eq!(
4476 list.available_channels().unwrap(),
4477 &[ChannelDetails {
4478 id: channel_id.to_proto(),
4479 name: "test-channel".to_string()
4480 }]
4481 )
4482 });
4483
4484 let channel_b = channels_b.update(cx_b, |this, cx| {
4485 this.get_channel(channel_id.to_proto(), cx).unwrap()
4486 });
4487 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4488 channel_b
4489 .condition(&cx_b, |channel, _| {
4490 channel_messages(channel)
4491 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4492 })
4493 .await;
4494
4495 channel_a
4496 .update(cx_a, |channel, cx| {
4497 channel
4498 .send_message("oh, hi B.".to_string(), cx)
4499 .unwrap()
4500 .detach();
4501 let task = channel.send_message("sup".to_string(), cx).unwrap();
4502 assert_eq!(
4503 channel_messages(channel),
4504 &[
4505 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4506 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4507 ("user_a".to_string(), "sup".to_string(), true)
4508 ]
4509 );
4510 task
4511 })
4512 .await
4513 .unwrap();
4514
4515 channel_b
4516 .condition(&cx_b, |channel, _| {
4517 channel_messages(channel)
4518 == [
4519 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4520 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4521 ("user_a".to_string(), "sup".to_string(), false),
4522 ]
4523 })
4524 .await;
4525
4526 assert_eq!(
4527 server
4528 .state()
4529 .await
4530 .channel(channel_id)
4531 .unwrap()
4532 .connection_ids
4533 .len(),
4534 2
4535 );
4536 cx_b.update(|_| drop(channel_b));
4537 server
4538 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4539 .await;
4540
4541 cx_a.update(|_| drop(channel_a));
4542 server
4543 .condition(|state| state.channel(channel_id).is_none())
4544 .await;
4545 }
4546
4547 #[gpui::test(iterations = 10)]
4548 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4549 cx_a.foreground().forbid_parking();
4550
4551 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4552 let client_a = server.create_client(cx_a, "user_a").await;
4553
4554 let db = &server.app_state.db;
4555 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4556 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4557 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4558 .await
4559 .unwrap();
4560 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4561 .await
4562 .unwrap();
4563
4564 let channels_a = cx_a
4565 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4566 channels_a
4567 .condition(cx_a, |list, _| list.available_channels().is_some())
4568 .await;
4569 let channel_a = channels_a.update(cx_a, |this, cx| {
4570 this.get_channel(channel_id.to_proto(), cx).unwrap()
4571 });
4572
4573 // Messages aren't allowed to be too long.
4574 channel_a
4575 .update(cx_a, |channel, cx| {
4576 let long_body = "this is long.\n".repeat(1024);
4577 channel.send_message(long_body, cx).unwrap()
4578 })
4579 .await
4580 .unwrap_err();
4581
4582 // Messages aren't allowed to be blank.
4583 channel_a.update(cx_a, |channel, cx| {
4584 channel.send_message(String::new(), cx).unwrap_err()
4585 });
4586
4587 // Leading and trailing whitespace are trimmed.
4588 channel_a
4589 .update(cx_a, |channel, cx| {
4590 channel
4591 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4592 .unwrap()
4593 })
4594 .await
4595 .unwrap();
4596 assert_eq!(
4597 db.get_channel_messages(channel_id, 10, None)
4598 .await
4599 .unwrap()
4600 .iter()
4601 .map(|m| &m.body)
4602 .collect::<Vec<_>>(),
4603 &["surrounded by whitespace"]
4604 );
4605 }
4606
4607 #[gpui::test(iterations = 10)]
4608 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4609 cx_a.foreground().forbid_parking();
4610
4611 // Connect to a server as 2 clients.
4612 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4613 let client_a = server.create_client(cx_a, "user_a").await;
4614 let client_b = server.create_client(cx_b, "user_b").await;
4615 let mut status_b = client_b.status();
4616
4617 // Create an org that includes these 2 users.
4618 let db = &server.app_state.db;
4619 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4620 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4621 .await
4622 .unwrap();
4623 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4624 .await
4625 .unwrap();
4626
4627 // Create a channel that includes all the users.
4628 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4629 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4630 .await
4631 .unwrap();
4632 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4633 .await
4634 .unwrap();
4635 db.create_channel_message(
4636 channel_id,
4637 client_b.current_user_id(&cx_b),
4638 "hello A, it's B.",
4639 OffsetDateTime::now_utc(),
4640 2,
4641 )
4642 .await
4643 .unwrap();
4644
4645 let channels_a = cx_a
4646 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4647 channels_a
4648 .condition(cx_a, |list, _| list.available_channels().is_some())
4649 .await;
4650
4651 channels_a.read_with(cx_a, |list, _| {
4652 assert_eq!(
4653 list.available_channels().unwrap(),
4654 &[ChannelDetails {
4655 id: channel_id.to_proto(),
4656 name: "test-channel".to_string()
4657 }]
4658 )
4659 });
4660 let channel_a = channels_a.update(cx_a, |this, cx| {
4661 this.get_channel(channel_id.to_proto(), cx).unwrap()
4662 });
4663 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4664 channel_a
4665 .condition(&cx_a, |channel, _| {
4666 channel_messages(channel)
4667 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4668 })
4669 .await;
4670
4671 let channels_b = cx_b
4672 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4673 channels_b
4674 .condition(cx_b, |list, _| list.available_channels().is_some())
4675 .await;
4676 channels_b.read_with(cx_b, |list, _| {
4677 assert_eq!(
4678 list.available_channels().unwrap(),
4679 &[ChannelDetails {
4680 id: channel_id.to_proto(),
4681 name: "test-channel".to_string()
4682 }]
4683 )
4684 });
4685
4686 let channel_b = channels_b.update(cx_b, |this, cx| {
4687 this.get_channel(channel_id.to_proto(), cx).unwrap()
4688 });
4689 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4690 channel_b
4691 .condition(&cx_b, |channel, _| {
4692 channel_messages(channel)
4693 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4694 })
4695 .await;
4696
4697 // Disconnect client B, ensuring we can still access its cached channel data.
4698 server.forbid_connections();
4699 server.disconnect_client(client_b.current_user_id(&cx_b));
4700 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4701 while !matches!(
4702 status_b.next().await,
4703 Some(client::Status::ReconnectionError { .. })
4704 ) {}
4705
4706 channels_b.read_with(cx_b, |channels, _| {
4707 assert_eq!(
4708 channels.available_channels().unwrap(),
4709 [ChannelDetails {
4710 id: channel_id.to_proto(),
4711 name: "test-channel".to_string()
4712 }]
4713 )
4714 });
4715 channel_b.read_with(cx_b, |channel, _| {
4716 assert_eq!(
4717 channel_messages(channel),
4718 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4719 )
4720 });
4721
4722 // Send a message from client B while it is disconnected.
4723 channel_b
4724 .update(cx_b, |channel, cx| {
4725 let task = channel
4726 .send_message("can you see this?".to_string(), cx)
4727 .unwrap();
4728 assert_eq!(
4729 channel_messages(channel),
4730 &[
4731 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4732 ("user_b".to_string(), "can you see this?".to_string(), true)
4733 ]
4734 );
4735 task
4736 })
4737 .await
4738 .unwrap_err();
4739
4740 // Send a message from client A while B is disconnected.
4741 channel_a
4742 .update(cx_a, |channel, cx| {
4743 channel
4744 .send_message("oh, hi B.".to_string(), cx)
4745 .unwrap()
4746 .detach();
4747 let task = channel.send_message("sup".to_string(), cx).unwrap();
4748 assert_eq!(
4749 channel_messages(channel),
4750 &[
4751 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4752 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4753 ("user_a".to_string(), "sup".to_string(), true)
4754 ]
4755 );
4756 task
4757 })
4758 .await
4759 .unwrap();
4760
4761 // Give client B a chance to reconnect.
4762 server.allow_connections();
4763 cx_b.foreground().advance_clock(Duration::from_secs(10));
4764
4765 // Verify that B sees the new messages upon reconnection, as well as the message client B
4766 // sent while offline.
4767 channel_b
4768 .condition(&cx_b, |channel, _| {
4769 channel_messages(channel)
4770 == [
4771 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4772 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4773 ("user_a".to_string(), "sup".to_string(), false),
4774 ("user_b".to_string(), "can you see this?".to_string(), false),
4775 ]
4776 })
4777 .await;
4778
4779 // Ensure client A and B can communicate normally after reconnection.
4780 channel_a
4781 .update(cx_a, |channel, cx| {
4782 channel.send_message("you online?".to_string(), cx).unwrap()
4783 })
4784 .await
4785 .unwrap();
4786 channel_b
4787 .condition(&cx_b, |channel, _| {
4788 channel_messages(channel)
4789 == [
4790 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4791 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4792 ("user_a".to_string(), "sup".to_string(), false),
4793 ("user_b".to_string(), "can you see this?".to_string(), false),
4794 ("user_a".to_string(), "you online?".to_string(), false),
4795 ]
4796 })
4797 .await;
4798
4799 channel_b
4800 .update(cx_b, |channel, cx| {
4801 channel.send_message("yep".to_string(), cx).unwrap()
4802 })
4803 .await
4804 .unwrap();
4805 channel_a
4806 .condition(&cx_a, |channel, _| {
4807 channel_messages(channel)
4808 == [
4809 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4810 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4811 ("user_a".to_string(), "sup".to_string(), false),
4812 ("user_b".to_string(), "can you see this?".to_string(), false),
4813 ("user_a".to_string(), "you online?".to_string(), false),
4814 ("user_b".to_string(), "yep".to_string(), false),
4815 ]
4816 })
4817 .await;
4818 }
4819
4820 #[gpui::test(iterations = 10)]
4821 async fn test_contacts(
4822 deterministic: Arc<Deterministic>,
4823 cx_a: &mut TestAppContext,
4824 cx_b: &mut TestAppContext,
4825 cx_c: &mut TestAppContext,
4826 ) {
4827 cx_a.foreground().forbid_parking();
4828
4829 // Connect to a server as 3 clients.
4830 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4831 let mut client_a = server.create_client(cx_a, "user_a").await;
4832 let mut client_b = server.create_client(cx_b, "user_b").await;
4833 let client_c = server.create_client(cx_c, "user_c").await;
4834 server
4835 .make_contacts(vec![
4836 (&client_a, cx_a),
4837 (&client_b, cx_b),
4838 (&client_c, cx_c),
4839 ])
4840 .await;
4841
4842 deterministic.run_until_parked();
4843 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4844 client.user_store.read_with(*cx, |store, _| {
4845 assert_eq!(
4846 contacts(store),
4847 [
4848 ("user_a", true, vec![]),
4849 ("user_b", true, vec![]),
4850 ("user_c", true, vec![])
4851 ]
4852 )
4853 });
4854 }
4855
4856 // Share a project as client A.
4857 let fs = FakeFs::new(cx_a.background());
4858 fs.create_dir(Path::new("/a")).await.unwrap();
4859 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
4860
4861 deterministic.run_until_parked();
4862 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4863 client.user_store.read_with(*cx, |store, _| {
4864 assert_eq!(
4865 contacts(store),
4866 [
4867 ("user_a", true, vec![("a", vec![])]),
4868 ("user_b", true, vec![]),
4869 ("user_c", true, vec![])
4870 ]
4871 )
4872 });
4873 }
4874
4875 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4876
4877 deterministic.run_until_parked();
4878 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4879 client.user_store.read_with(*cx, |store, _| {
4880 assert_eq!(
4881 contacts(store),
4882 [
4883 ("user_a", true, vec![("a", vec!["user_b"])]),
4884 ("user_b", true, vec![]),
4885 ("user_c", true, vec![])
4886 ]
4887 )
4888 });
4889 }
4890
4891 // Add a local project as client B
4892 let fs = FakeFs::new(cx_b.background());
4893 fs.create_dir(Path::new("/b")).await.unwrap();
4894 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
4895
4896 deterministic.run_until_parked();
4897 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4898 client.user_store.read_with(*cx, |store, _| {
4899 assert_eq!(
4900 contacts(store),
4901 [
4902 ("user_a", true, vec![("a", vec!["user_b"])]),
4903 ("user_b", true, vec![("b", vec![])]),
4904 ("user_c", true, vec![])
4905 ]
4906 )
4907 });
4908 }
4909
4910 project_a
4911 .condition(&cx_a, |project, _| {
4912 project.collaborators().contains_key(&client_b.peer_id)
4913 })
4914 .await;
4915
4916 client_a.project.take();
4917 cx_a.update(move |_| drop(project_a));
4918 deterministic.run_until_parked();
4919 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4920 client.user_store.read_with(*cx, |store, _| {
4921 assert_eq!(
4922 contacts(store),
4923 [
4924 ("user_a", true, vec![]),
4925 ("user_b", true, vec![("b", vec![])]),
4926 ("user_c", true, vec![])
4927 ]
4928 )
4929 });
4930 }
4931
4932 server.disconnect_client(client_c.current_user_id(cx_c));
4933 server.forbid_connections();
4934 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
4935 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
4936 client.user_store.read_with(*cx, |store, _| {
4937 assert_eq!(
4938 contacts(store),
4939 [
4940 ("user_a", true, vec![]),
4941 ("user_b", true, vec![("b", vec![])]),
4942 ("user_c", false, vec![])
4943 ]
4944 )
4945 });
4946 }
4947 client_c
4948 .user_store
4949 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
4950
4951 server.allow_connections();
4952 client_c
4953 .authenticate_and_connect(false, &cx_c.to_async())
4954 .await
4955 .unwrap();
4956
4957 deterministic.run_until_parked();
4958 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4959 client.user_store.read_with(*cx, |store, _| {
4960 assert_eq!(
4961 contacts(store),
4962 [
4963 ("user_a", true, vec![]),
4964 ("user_b", true, vec![("b", vec![])]),
4965 ("user_c", true, vec![])
4966 ]
4967 )
4968 });
4969 }
4970
4971 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
4972 user_store
4973 .contacts()
4974 .iter()
4975 .map(|contact| {
4976 let projects = contact
4977 .projects
4978 .iter()
4979 .map(|p| {
4980 (
4981 p.worktree_root_names[0].as_str(),
4982 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4983 )
4984 })
4985 .collect();
4986 (contact.user.github_login.as_str(), contact.online, projects)
4987 })
4988 .collect()
4989 }
4990 }
4991
4992 #[gpui::test(iterations = 10)]
4993 async fn test_contact_requests(
4994 executor: Arc<Deterministic>,
4995 cx_a: &mut TestAppContext,
4996 cx_a2: &mut TestAppContext,
4997 cx_b: &mut TestAppContext,
4998 cx_b2: &mut TestAppContext,
4999 cx_c: &mut TestAppContext,
5000 cx_c2: &mut TestAppContext,
5001 ) {
5002 cx_a.foreground().forbid_parking();
5003
5004 // Connect to a server as 3 clients.
5005 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5006 let client_a = server.create_client(cx_a, "user_a").await;
5007 let client_a2 = server.create_client(cx_a2, "user_a").await;
5008 let client_b = server.create_client(cx_b, "user_b").await;
5009 let client_b2 = server.create_client(cx_b2, "user_b").await;
5010 let client_c = server.create_client(cx_c, "user_c").await;
5011 let client_c2 = server.create_client(cx_c2, "user_c").await;
5012
5013 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5014 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5015 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5016
5017 // User A and User C request that user B become their contact.
5018 client_a
5019 .user_store
5020 .update(cx_a, |store, cx| {
5021 store.request_contact(client_b.user_id().unwrap(), cx)
5022 })
5023 .await
5024 .unwrap();
5025 client_c
5026 .user_store
5027 .update(cx_c, |store, cx| {
5028 store.request_contact(client_b.user_id().unwrap(), cx)
5029 })
5030 .await
5031 .unwrap();
5032 executor.run_until_parked();
5033
5034 // All users see the pending request appear in all their clients.
5035 assert_eq!(
5036 client_a.summarize_contacts(&cx_a).outgoing_requests,
5037 &["user_b"]
5038 );
5039 assert_eq!(
5040 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5041 &["user_b"]
5042 );
5043 assert_eq!(
5044 client_b.summarize_contacts(&cx_b).incoming_requests,
5045 &["user_a", "user_c"]
5046 );
5047 assert_eq!(
5048 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5049 &["user_a", "user_c"]
5050 );
5051 assert_eq!(
5052 client_c.summarize_contacts(&cx_c).outgoing_requests,
5053 &["user_b"]
5054 );
5055 assert_eq!(
5056 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5057 &["user_b"]
5058 );
5059
5060 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5061 disconnect_and_reconnect(&client_a, cx_a).await;
5062 disconnect_and_reconnect(&client_b, cx_b).await;
5063 disconnect_and_reconnect(&client_c, cx_c).await;
5064 executor.run_until_parked();
5065 assert_eq!(
5066 client_a.summarize_contacts(&cx_a).outgoing_requests,
5067 &["user_b"]
5068 );
5069 assert_eq!(
5070 client_b.summarize_contacts(&cx_b).incoming_requests,
5071 &["user_a", "user_c"]
5072 );
5073 assert_eq!(
5074 client_c.summarize_contacts(&cx_c).outgoing_requests,
5075 &["user_b"]
5076 );
5077
5078 // User B accepts the request from user A.
5079 client_b
5080 .user_store
5081 .update(cx_b, |store, cx| {
5082 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5083 })
5084 .await
5085 .unwrap();
5086
5087 executor.run_until_parked();
5088
5089 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5090 let contacts_b = client_b.summarize_contacts(&cx_b);
5091 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5092 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5093 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5094 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5095 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5096
5097 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5098 let contacts_a = client_a.summarize_contacts(&cx_a);
5099 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5100 assert!(contacts_a.outgoing_requests.is_empty());
5101 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5102 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5103 assert!(contacts_a2.outgoing_requests.is_empty());
5104
5105 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5106 disconnect_and_reconnect(&client_a, cx_a).await;
5107 disconnect_and_reconnect(&client_b, cx_b).await;
5108 disconnect_and_reconnect(&client_c, cx_c).await;
5109 executor.run_until_parked();
5110 assert_eq!(
5111 client_a.summarize_contacts(&cx_a).current,
5112 &["user_a", "user_b"]
5113 );
5114 assert_eq!(
5115 client_b.summarize_contacts(&cx_b).current,
5116 &["user_a", "user_b"]
5117 );
5118 assert_eq!(
5119 client_b.summarize_contacts(&cx_b).incoming_requests,
5120 &["user_c"]
5121 );
5122 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5123 assert_eq!(
5124 client_c.summarize_contacts(&cx_c).outgoing_requests,
5125 &["user_b"]
5126 );
5127
5128 // User B rejects the request from user C.
5129 client_b
5130 .user_store
5131 .update(cx_b, |store, cx| {
5132 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5133 })
5134 .await
5135 .unwrap();
5136
5137 executor.run_until_parked();
5138
5139 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5140 let contacts_b = client_b.summarize_contacts(&cx_b);
5141 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5142 assert!(contacts_b.incoming_requests.is_empty());
5143 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5144 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5145 assert!(contacts_b2.incoming_requests.is_empty());
5146
5147 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5148 let contacts_c = client_c.summarize_contacts(&cx_c);
5149 assert_eq!(contacts_c.current, &["user_c"]);
5150 assert!(contacts_c.outgoing_requests.is_empty());
5151 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5152 assert_eq!(contacts_c2.current, &["user_c"]);
5153 assert!(contacts_c2.outgoing_requests.is_empty());
5154
5155 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5156 disconnect_and_reconnect(&client_a, cx_a).await;
5157 disconnect_and_reconnect(&client_b, cx_b).await;
5158 disconnect_and_reconnect(&client_c, cx_c).await;
5159 executor.run_until_parked();
5160 assert_eq!(
5161 client_a.summarize_contacts(&cx_a).current,
5162 &["user_a", "user_b"]
5163 );
5164 assert_eq!(
5165 client_b.summarize_contacts(&cx_b).current,
5166 &["user_a", "user_b"]
5167 );
5168 assert!(client_b
5169 .summarize_contacts(&cx_b)
5170 .incoming_requests
5171 .is_empty());
5172 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5173 assert!(client_c
5174 .summarize_contacts(&cx_c)
5175 .outgoing_requests
5176 .is_empty());
5177
5178 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5179 client.disconnect(&cx.to_async()).unwrap();
5180 client.clear_contacts(cx).await;
5181 client
5182 .authenticate_and_connect(false, &cx.to_async())
5183 .await
5184 .unwrap();
5185 }
5186 }
5187
5188 #[gpui::test(iterations = 10)]
5189 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5190 cx_a.foreground().forbid_parking();
5191 let fs = FakeFs::new(cx_a.background());
5192
5193 // 2 clients connect to a server.
5194 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5195 let mut client_a = server.create_client(cx_a, "user_a").await;
5196 let mut client_b = server.create_client(cx_b, "user_b").await;
5197 server
5198 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5199 .await;
5200 cx_a.update(editor::init);
5201 cx_b.update(editor::init);
5202
5203 // Client A shares a project.
5204 fs.insert_tree(
5205 "/a",
5206 json!({
5207 "1.txt": "one",
5208 "2.txt": "two",
5209 "3.txt": "three",
5210 }),
5211 )
5212 .await;
5213 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5214
5215 // Client B joins the project.
5216 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5217
5218 // Client A opens some editors.
5219 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5220 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5221 let editor_a1 = workspace_a
5222 .update(cx_a, |workspace, cx| {
5223 workspace.open_path((worktree_id, "1.txt"), true, cx)
5224 })
5225 .await
5226 .unwrap()
5227 .downcast::<Editor>()
5228 .unwrap();
5229 let editor_a2 = workspace_a
5230 .update(cx_a, |workspace, cx| {
5231 workspace.open_path((worktree_id, "2.txt"), true, cx)
5232 })
5233 .await
5234 .unwrap()
5235 .downcast::<Editor>()
5236 .unwrap();
5237
5238 // Client B opens an editor.
5239 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5240 let editor_b1 = workspace_b
5241 .update(cx_b, |workspace, cx| {
5242 workspace.open_path((worktree_id, "1.txt"), true, cx)
5243 })
5244 .await
5245 .unwrap()
5246 .downcast::<Editor>()
5247 .unwrap();
5248
5249 let client_a_id = project_b.read_with(cx_b, |project, _| {
5250 project.collaborators().values().next().unwrap().peer_id
5251 });
5252 let client_b_id = project_a.read_with(cx_a, |project, _| {
5253 project.collaborators().values().next().unwrap().peer_id
5254 });
5255
5256 // When client B starts following client A, all visible view states are replicated to client B.
5257 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5258 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5259 workspace_b
5260 .update(cx_b, |workspace, cx| {
5261 workspace
5262 .toggle_follow(&ToggleFollow(client_a_id), cx)
5263 .unwrap()
5264 })
5265 .await
5266 .unwrap();
5267
5268 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5269 workspace
5270 .active_item(cx)
5271 .unwrap()
5272 .downcast::<Editor>()
5273 .unwrap()
5274 });
5275 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5276 assert_eq!(
5277 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5278 Some((worktree_id, "2.txt").into())
5279 );
5280 assert_eq!(
5281 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5282 vec![2..3]
5283 );
5284 assert_eq!(
5285 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5286 vec![0..1]
5287 );
5288
5289 // When client A activates a different editor, client B does so as well.
5290 workspace_a.update(cx_a, |workspace, cx| {
5291 workspace.activate_item(&editor_a1, cx)
5292 });
5293 workspace_b
5294 .condition(cx_b, |workspace, cx| {
5295 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5296 })
5297 .await;
5298
5299 // When client A navigates back and forth, client B does so as well.
5300 workspace_a
5301 .update(cx_a, |workspace, cx| {
5302 workspace::Pane::go_back(workspace, None, cx)
5303 })
5304 .await;
5305 workspace_b
5306 .condition(cx_b, |workspace, cx| {
5307 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5308 })
5309 .await;
5310
5311 workspace_a
5312 .update(cx_a, |workspace, cx| {
5313 workspace::Pane::go_forward(workspace, None, cx)
5314 })
5315 .await;
5316 workspace_b
5317 .condition(cx_b, |workspace, cx| {
5318 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5319 })
5320 .await;
5321
5322 // Changes to client A's editor are reflected on client B.
5323 editor_a1.update(cx_a, |editor, cx| {
5324 editor.select_ranges([1..1, 2..2], None, cx);
5325 });
5326 editor_b1
5327 .condition(cx_b, |editor, cx| {
5328 editor.selected_ranges(cx) == vec![1..1, 2..2]
5329 })
5330 .await;
5331
5332 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5333 editor_b1
5334 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5335 .await;
5336
5337 editor_a1.update(cx_a, |editor, cx| {
5338 editor.select_ranges([3..3], None, cx);
5339 editor.set_scroll_position(vec2f(0., 100.), cx);
5340 });
5341 editor_b1
5342 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5343 .await;
5344
5345 // After unfollowing, client B stops receiving updates from client A.
5346 workspace_b.update(cx_b, |workspace, cx| {
5347 workspace.unfollow(&workspace.active_pane().clone(), cx)
5348 });
5349 workspace_a.update(cx_a, |workspace, cx| {
5350 workspace.activate_item(&editor_a2, cx)
5351 });
5352 cx_a.foreground().run_until_parked();
5353 assert_eq!(
5354 workspace_b.read_with(cx_b, |workspace, cx| workspace
5355 .active_item(cx)
5356 .unwrap()
5357 .id()),
5358 editor_b1.id()
5359 );
5360
5361 // Client A starts following client B.
5362 workspace_a
5363 .update(cx_a, |workspace, cx| {
5364 workspace
5365 .toggle_follow(&ToggleFollow(client_b_id), cx)
5366 .unwrap()
5367 })
5368 .await
5369 .unwrap();
5370 assert_eq!(
5371 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5372 Some(client_b_id)
5373 );
5374 assert_eq!(
5375 workspace_a.read_with(cx_a, |workspace, cx| workspace
5376 .active_item(cx)
5377 .unwrap()
5378 .id()),
5379 editor_a1.id()
5380 );
5381
5382 // Following interrupts when client B disconnects.
5383 client_b.disconnect(&cx_b.to_async()).unwrap();
5384 cx_a.foreground().run_until_parked();
5385 assert_eq!(
5386 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5387 None
5388 );
5389 }
5390
5391 #[gpui::test(iterations = 10)]
5392 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5393 cx_a.foreground().forbid_parking();
5394 let fs = FakeFs::new(cx_a.background());
5395
5396 // 2 clients connect to a server.
5397 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5398 let mut client_a = server.create_client(cx_a, "user_a").await;
5399 let mut client_b = server.create_client(cx_b, "user_b").await;
5400 server
5401 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5402 .await;
5403 cx_a.update(editor::init);
5404 cx_b.update(editor::init);
5405
5406 // Client A shares a project.
5407 fs.insert_tree(
5408 "/a",
5409 json!({
5410 "1.txt": "one",
5411 "2.txt": "two",
5412 "3.txt": "three",
5413 "4.txt": "four",
5414 }),
5415 )
5416 .await;
5417 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5418
5419 // Client B joins the project.
5420 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5421
5422 // Client A opens some editors.
5423 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5424 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5425 let _editor_a1 = workspace_a
5426 .update(cx_a, |workspace, cx| {
5427 workspace.open_path((worktree_id, "1.txt"), true, cx)
5428 })
5429 .await
5430 .unwrap()
5431 .downcast::<Editor>()
5432 .unwrap();
5433
5434 // Client B opens an editor.
5435 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5436 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5437 let _editor_b1 = workspace_b
5438 .update(cx_b, |workspace, cx| {
5439 workspace.open_path((worktree_id, "2.txt"), true, cx)
5440 })
5441 .await
5442 .unwrap()
5443 .downcast::<Editor>()
5444 .unwrap();
5445
5446 // Clients A and B follow each other in split panes
5447 workspace_a
5448 .update(cx_a, |workspace, cx| {
5449 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5450 assert_ne!(*workspace.active_pane(), pane_a1);
5451 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5452 workspace
5453 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5454 .unwrap()
5455 })
5456 .await
5457 .unwrap();
5458 workspace_b
5459 .update(cx_b, |workspace, cx| {
5460 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5461 assert_ne!(*workspace.active_pane(), pane_b1);
5462 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5463 workspace
5464 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5465 .unwrap()
5466 })
5467 .await
5468 .unwrap();
5469
5470 workspace_a
5471 .update(cx_a, |workspace, cx| {
5472 workspace.activate_next_pane(cx);
5473 assert_eq!(*workspace.active_pane(), pane_a1);
5474 workspace.open_path((worktree_id, "3.txt"), true, cx)
5475 })
5476 .await
5477 .unwrap();
5478 workspace_b
5479 .update(cx_b, |workspace, cx| {
5480 workspace.activate_next_pane(cx);
5481 assert_eq!(*workspace.active_pane(), pane_b1);
5482 workspace.open_path((worktree_id, "4.txt"), true, cx)
5483 })
5484 .await
5485 .unwrap();
5486 cx_a.foreground().run_until_parked();
5487
5488 // Ensure leader updates don't change the active pane of followers
5489 workspace_a.read_with(cx_a, |workspace, _| {
5490 assert_eq!(*workspace.active_pane(), pane_a1);
5491 });
5492 workspace_b.read_with(cx_b, |workspace, _| {
5493 assert_eq!(*workspace.active_pane(), pane_b1);
5494 });
5495
5496 // Ensure peers following each other doesn't cause an infinite loop.
5497 assert_eq!(
5498 workspace_a.read_with(cx_a, |workspace, cx| workspace
5499 .active_item(cx)
5500 .unwrap()
5501 .project_path(cx)),
5502 Some((worktree_id, "3.txt").into())
5503 );
5504 workspace_a.update(cx_a, |workspace, cx| {
5505 assert_eq!(
5506 workspace.active_item(cx).unwrap().project_path(cx),
5507 Some((worktree_id, "3.txt").into())
5508 );
5509 workspace.activate_next_pane(cx);
5510 assert_eq!(
5511 workspace.active_item(cx).unwrap().project_path(cx),
5512 Some((worktree_id, "4.txt").into())
5513 );
5514 });
5515 workspace_b.update(cx_b, |workspace, cx| {
5516 assert_eq!(
5517 workspace.active_item(cx).unwrap().project_path(cx),
5518 Some((worktree_id, "4.txt").into())
5519 );
5520 workspace.activate_next_pane(cx);
5521 assert_eq!(
5522 workspace.active_item(cx).unwrap().project_path(cx),
5523 Some((worktree_id, "3.txt").into())
5524 );
5525 });
5526 }
5527
5528 #[gpui::test(iterations = 10)]
5529 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5530 cx_a.foreground().forbid_parking();
5531 let fs = FakeFs::new(cx_a.background());
5532
5533 // 2 clients connect to a server.
5534 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5535 let mut client_a = server.create_client(cx_a, "user_a").await;
5536 let mut client_b = server.create_client(cx_b, "user_b").await;
5537 server
5538 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5539 .await;
5540 cx_a.update(editor::init);
5541 cx_b.update(editor::init);
5542
5543 // Client A shares a project.
5544 fs.insert_tree(
5545 "/a",
5546 json!({
5547 "1.txt": "one",
5548 "2.txt": "two",
5549 "3.txt": "three",
5550 }),
5551 )
5552 .await;
5553 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5554
5555 // Client B joins the project.
5556 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5557
5558 // Client A opens some editors.
5559 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5560 let _editor_a1 = workspace_a
5561 .update(cx_a, |workspace, cx| {
5562 workspace.open_path((worktree_id, "1.txt"), true, cx)
5563 })
5564 .await
5565 .unwrap()
5566 .downcast::<Editor>()
5567 .unwrap();
5568
5569 // Client B starts following client A.
5570 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5571 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5572 let leader_id = project_b.read_with(cx_b, |project, _| {
5573 project.collaborators().values().next().unwrap().peer_id
5574 });
5575 workspace_b
5576 .update(cx_b, |workspace, cx| {
5577 workspace
5578 .toggle_follow(&ToggleFollow(leader_id), cx)
5579 .unwrap()
5580 })
5581 .await
5582 .unwrap();
5583 assert_eq!(
5584 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5585 Some(leader_id)
5586 );
5587 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5588 workspace
5589 .active_item(cx)
5590 .unwrap()
5591 .downcast::<Editor>()
5592 .unwrap()
5593 });
5594
5595 // When client B moves, it automatically stops following client A.
5596 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5597 assert_eq!(
5598 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5599 None
5600 );
5601
5602 workspace_b
5603 .update(cx_b, |workspace, cx| {
5604 workspace
5605 .toggle_follow(&ToggleFollow(leader_id), cx)
5606 .unwrap()
5607 })
5608 .await
5609 .unwrap();
5610 assert_eq!(
5611 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5612 Some(leader_id)
5613 );
5614
5615 // When client B edits, it automatically stops following client A.
5616 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5617 assert_eq!(
5618 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5619 None
5620 );
5621
5622 workspace_b
5623 .update(cx_b, |workspace, cx| {
5624 workspace
5625 .toggle_follow(&ToggleFollow(leader_id), cx)
5626 .unwrap()
5627 })
5628 .await
5629 .unwrap();
5630 assert_eq!(
5631 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5632 Some(leader_id)
5633 );
5634
5635 // When client B scrolls, it automatically stops following client A.
5636 editor_b2.update(cx_b, |editor, cx| {
5637 editor.set_scroll_position(vec2f(0., 3.), cx)
5638 });
5639 assert_eq!(
5640 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5641 None
5642 );
5643
5644 workspace_b
5645 .update(cx_b, |workspace, cx| {
5646 workspace
5647 .toggle_follow(&ToggleFollow(leader_id), cx)
5648 .unwrap()
5649 })
5650 .await
5651 .unwrap();
5652 assert_eq!(
5653 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5654 Some(leader_id)
5655 );
5656
5657 // When client B activates a different pane, it continues following client A in the original pane.
5658 workspace_b.update(cx_b, |workspace, cx| {
5659 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5660 });
5661 assert_eq!(
5662 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5663 Some(leader_id)
5664 );
5665
5666 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5667 assert_eq!(
5668 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5669 Some(leader_id)
5670 );
5671
5672 // When client B activates a different item in the original pane, it automatically stops following client A.
5673 workspace_b
5674 .update(cx_b, |workspace, cx| {
5675 workspace.open_path((worktree_id, "2.txt"), true, cx)
5676 })
5677 .await
5678 .unwrap();
5679 assert_eq!(
5680 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5681 None
5682 );
5683 }
5684
5685 #[gpui::test(iterations = 100)]
5686 async fn test_random_collaboration(
5687 cx: &mut TestAppContext,
5688 deterministic: Arc<Deterministic>,
5689 rng: StdRng,
5690 ) {
5691 cx.foreground().forbid_parking();
5692 let max_peers = env::var("MAX_PEERS")
5693 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5694 .unwrap_or(5);
5695 assert!(max_peers <= 5);
5696
5697 let max_operations = env::var("OPERATIONS")
5698 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5699 .unwrap_or(10);
5700
5701 let rng = Arc::new(Mutex::new(rng));
5702
5703 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5704 let host_language_registry = Arc::new(LanguageRegistry::test());
5705
5706 let fs = FakeFs::new(cx.background());
5707 fs.insert_tree("/_collab", json!({"init": ""})).await;
5708
5709 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5710 let db = server.app_state.db.clone();
5711 let host_user_id = db.create_user("host", false).await.unwrap();
5712 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5713 let guest_user_id = db.create_user(username, false).await.unwrap();
5714 server
5715 .app_state
5716 .db
5717 .send_contact_request(guest_user_id, host_user_id)
5718 .await
5719 .unwrap();
5720 server
5721 .app_state
5722 .db
5723 .respond_to_contact_request(host_user_id, guest_user_id, true)
5724 .await
5725 .unwrap();
5726 }
5727
5728 let mut clients = Vec::new();
5729 let mut user_ids = Vec::new();
5730 let mut op_start_signals = Vec::new();
5731
5732 let mut next_entity_id = 100000;
5733 let mut host_cx = TestAppContext::new(
5734 cx.foreground_platform(),
5735 cx.platform(),
5736 deterministic.build_foreground(next_entity_id),
5737 deterministic.build_background(),
5738 cx.font_cache(),
5739 cx.leak_detector(),
5740 next_entity_id,
5741 );
5742 let host = server.create_client(&mut host_cx, "host").await;
5743 let host_project = host_cx.update(|cx| {
5744 Project::local(
5745 host.client.clone(),
5746 host.user_store.clone(),
5747 host_language_registry.clone(),
5748 fs.clone(),
5749 cx,
5750 )
5751 });
5752 let host_project_id = host_project
5753 .update(&mut host_cx, |p, _| p.next_remote_id())
5754 .await;
5755
5756 let (collab_worktree, _) = host_project
5757 .update(&mut host_cx, |project, cx| {
5758 project.find_or_create_local_worktree("/_collab", true, cx)
5759 })
5760 .await
5761 .unwrap();
5762 collab_worktree
5763 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5764 .await;
5765
5766 // Set up fake language servers.
5767 let mut language = Language::new(
5768 LanguageConfig {
5769 name: "Rust".into(),
5770 path_suffixes: vec!["rs".to_string()],
5771 ..Default::default()
5772 },
5773 None,
5774 );
5775 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5776 name: "the-fake-language-server",
5777 capabilities: lsp::LanguageServer::full_capabilities(),
5778 initializer: Some(Box::new({
5779 let rng = rng.clone();
5780 let fs = fs.clone();
5781 let project = host_project.downgrade();
5782 move |fake_server: &mut FakeLanguageServer| {
5783 fake_server.handle_request::<lsp::request::Completion, _, _>(
5784 |_, _| async move {
5785 Ok(Some(lsp::CompletionResponse::Array(vec![
5786 lsp::CompletionItem {
5787 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5788 range: lsp::Range::new(
5789 lsp::Position::new(0, 0),
5790 lsp::Position::new(0, 0),
5791 ),
5792 new_text: "the-new-text".to_string(),
5793 })),
5794 ..Default::default()
5795 },
5796 ])))
5797 },
5798 );
5799
5800 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5801 |_, _| async move {
5802 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5803 lsp::CodeAction {
5804 title: "the-code-action".to_string(),
5805 ..Default::default()
5806 },
5807 )]))
5808 },
5809 );
5810
5811 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5812 |params, _| async move {
5813 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5814 params.position,
5815 params.position,
5816 ))))
5817 },
5818 );
5819
5820 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5821 let fs = fs.clone();
5822 let rng = rng.clone();
5823 move |_, _| {
5824 let fs = fs.clone();
5825 let rng = rng.clone();
5826 async move {
5827 let files = fs.files().await;
5828 let mut rng = rng.lock();
5829 let count = rng.gen_range::<usize, _>(1..3);
5830 let files = (0..count)
5831 .map(|_| files.choose(&mut *rng).unwrap())
5832 .collect::<Vec<_>>();
5833 log::info!("LSP: Returning definitions in files {:?}", &files);
5834 Ok(Some(lsp::GotoDefinitionResponse::Array(
5835 files
5836 .into_iter()
5837 .map(|file| lsp::Location {
5838 uri: lsp::Url::from_file_path(file).unwrap(),
5839 range: Default::default(),
5840 })
5841 .collect(),
5842 )))
5843 }
5844 }
5845 });
5846
5847 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5848 let rng = rng.clone();
5849 let project = project.clone();
5850 move |params, mut cx| {
5851 let highlights = if let Some(project) = project.upgrade(&cx) {
5852 project.update(&mut cx, |project, cx| {
5853 let path = params
5854 .text_document_position_params
5855 .text_document
5856 .uri
5857 .to_file_path()
5858 .unwrap();
5859 let (worktree, relative_path) =
5860 project.find_local_worktree(&path, cx)?;
5861 let project_path =
5862 ProjectPath::from((worktree.read(cx).id(), relative_path));
5863 let buffer =
5864 project.get_open_buffer(&project_path, cx)?.read(cx);
5865
5866 let mut highlights = Vec::new();
5867 let highlight_count = rng.lock().gen_range(1..=5);
5868 let mut prev_end = 0;
5869 for _ in 0..highlight_count {
5870 let range =
5871 buffer.random_byte_range(prev_end, &mut *rng.lock());
5872
5873 highlights.push(lsp::DocumentHighlight {
5874 range: range_to_lsp(range.to_point_utf16(buffer)),
5875 kind: Some(lsp::DocumentHighlightKind::READ),
5876 });
5877 prev_end = range.end;
5878 }
5879 Some(highlights)
5880 })
5881 } else {
5882 None
5883 };
5884 async move { Ok(highlights) }
5885 }
5886 });
5887 }
5888 })),
5889 ..Default::default()
5890 });
5891 host_language_registry.add(Arc::new(language));
5892
5893 let op_start_signal = futures::channel::mpsc::unbounded();
5894 user_ids.push(host.current_user_id(&host_cx));
5895 op_start_signals.push(op_start_signal.0);
5896 clients.push(host_cx.foreground().spawn(host.simulate_host(
5897 host_project,
5898 op_start_signal.1,
5899 rng.clone(),
5900 host_cx,
5901 )));
5902
5903 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5904 rng.lock().gen_range(0..max_operations)
5905 } else {
5906 max_operations
5907 };
5908 let mut available_guests = vec![
5909 "guest-1".to_string(),
5910 "guest-2".to_string(),
5911 "guest-3".to_string(),
5912 "guest-4".to_string(),
5913 ];
5914 let mut operations = 0;
5915 while operations < max_operations {
5916 if operations == disconnect_host_at {
5917 server.disconnect_client(user_ids[0]);
5918 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5919 drop(op_start_signals);
5920 let mut clients = futures::future::join_all(clients).await;
5921 cx.foreground().run_until_parked();
5922
5923 let (host, mut host_cx, host_err) = clients.remove(0);
5924 if let Some(host_err) = host_err {
5925 log::error!("host error - {:?}", host_err);
5926 }
5927 host.project
5928 .as_ref()
5929 .unwrap()
5930 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5931 for (guest, mut guest_cx, guest_err) in clients {
5932 if let Some(guest_err) = guest_err {
5933 log::error!("{} error - {:?}", guest.username, guest_err);
5934 }
5935 // TODO
5936 // let contacts = server
5937 // .store
5938 // .read()
5939 // .await
5940 // .contacts_for_user(guest.current_user_id(&guest_cx));
5941 // assert!(!contacts
5942 // .iter()
5943 // .flat_map(|contact| &contact.projects)
5944 // .any(|project| project.id == host_project_id));
5945 guest
5946 .project
5947 .as_ref()
5948 .unwrap()
5949 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5950 guest_cx.update(|_| drop(guest));
5951 }
5952 host_cx.update(|_| drop(host));
5953
5954 return;
5955 }
5956
5957 let distribution = rng.lock().gen_range(0..100);
5958 match distribution {
5959 0..=19 if !available_guests.is_empty() => {
5960 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5961 let guest_username = available_guests.remove(guest_ix);
5962 log::info!("Adding new connection for {}", guest_username);
5963 next_entity_id += 100000;
5964 let mut guest_cx = TestAppContext::new(
5965 cx.foreground_platform(),
5966 cx.platform(),
5967 deterministic.build_foreground(next_entity_id),
5968 deterministic.build_background(),
5969 cx.font_cache(),
5970 cx.leak_detector(),
5971 next_entity_id,
5972 );
5973 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5974 let guest_project = Project::remote(
5975 host_project_id,
5976 guest.client.clone(),
5977 guest.user_store.clone(),
5978 guest_lang_registry.clone(),
5979 FakeFs::new(cx.background()),
5980 &mut guest_cx.to_async(),
5981 )
5982 .await
5983 .unwrap();
5984 let op_start_signal = futures::channel::mpsc::unbounded();
5985 user_ids.push(guest.current_user_id(&guest_cx));
5986 op_start_signals.push(op_start_signal.0);
5987 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5988 guest_username.clone(),
5989 guest_project,
5990 op_start_signal.1,
5991 rng.clone(),
5992 guest_cx,
5993 )));
5994
5995 log::info!("Added connection for {}", guest_username);
5996 operations += 1;
5997 }
5998 20..=29 if clients.len() > 1 => {
5999 let guest_ix = rng.lock().gen_range(1..clients.len());
6000 log::info!("Removing guest {}", user_ids[guest_ix]);
6001 let removed_guest_id = user_ids.remove(guest_ix);
6002 let guest = clients.remove(guest_ix);
6003 op_start_signals.remove(guest_ix);
6004 server.disconnect_client(removed_guest_id);
6005 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6006 let (guest, mut guest_cx, guest_err) = guest.await;
6007 if let Some(guest_err) = guest_err {
6008 log::error!("{} error - {:?}", guest.username, guest_err);
6009 }
6010 guest
6011 .project
6012 .as_ref()
6013 .unwrap()
6014 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6015 // TODO
6016 // for user_id in &user_ids {
6017 // for contact in server.store.read().await.contacts_for_user(*user_id) {
6018 // assert_ne!(
6019 // contact.user_id, removed_guest_id.0 as u64,
6020 // "removed guest is still a contact of another peer"
6021 // );
6022 // for project in contact.projects {
6023 // for project_guest_id in project.guests {
6024 // assert_ne!(
6025 // project_guest_id, removed_guest_id.0 as u64,
6026 // "removed guest appears as still participating on a project"
6027 // );
6028 // }
6029 // }
6030 // }
6031 // }
6032
6033 log::info!("{} removed", guest.username);
6034 available_guests.push(guest.username.clone());
6035 guest_cx.update(|_| drop(guest));
6036
6037 operations += 1;
6038 }
6039 _ => {
6040 while operations < max_operations && rng.lock().gen_bool(0.7) {
6041 op_start_signals
6042 .choose(&mut *rng.lock())
6043 .unwrap()
6044 .unbounded_send(())
6045 .unwrap();
6046 operations += 1;
6047 }
6048
6049 if rng.lock().gen_bool(0.8) {
6050 cx.foreground().run_until_parked();
6051 }
6052 }
6053 }
6054 }
6055
6056 drop(op_start_signals);
6057 let mut clients = futures::future::join_all(clients).await;
6058 cx.foreground().run_until_parked();
6059
6060 let (host_client, mut host_cx, host_err) = clients.remove(0);
6061 if let Some(host_err) = host_err {
6062 panic!("host error - {:?}", host_err);
6063 }
6064 let host_project = host_client.project.as_ref().unwrap();
6065 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6066 project
6067 .worktrees(cx)
6068 .map(|worktree| {
6069 let snapshot = worktree.read(cx).snapshot();
6070 (snapshot.id(), snapshot)
6071 })
6072 .collect::<BTreeMap<_, _>>()
6073 });
6074
6075 host_client
6076 .project
6077 .as_ref()
6078 .unwrap()
6079 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6080
6081 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6082 if let Some(guest_err) = guest_err {
6083 panic!("{} error - {:?}", guest_client.username, guest_err);
6084 }
6085 let worktree_snapshots =
6086 guest_client
6087 .project
6088 .as_ref()
6089 .unwrap()
6090 .read_with(&guest_cx, |project, cx| {
6091 project
6092 .worktrees(cx)
6093 .map(|worktree| {
6094 let worktree = worktree.read(cx);
6095 (worktree.id(), worktree.snapshot())
6096 })
6097 .collect::<BTreeMap<_, _>>()
6098 });
6099
6100 assert_eq!(
6101 worktree_snapshots.keys().collect::<Vec<_>>(),
6102 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6103 "{} has different worktrees than the host",
6104 guest_client.username
6105 );
6106 for (id, host_snapshot) in &host_worktree_snapshots {
6107 let guest_snapshot = &worktree_snapshots[id];
6108 assert_eq!(
6109 guest_snapshot.root_name(),
6110 host_snapshot.root_name(),
6111 "{} has different root name than the host for worktree {}",
6112 guest_client.username,
6113 id
6114 );
6115 assert_eq!(
6116 guest_snapshot.entries(false).collect::<Vec<_>>(),
6117 host_snapshot.entries(false).collect::<Vec<_>>(),
6118 "{} has different snapshot than the host for worktree {}",
6119 guest_client.username,
6120 id
6121 );
6122 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6123 }
6124
6125 guest_client
6126 .project
6127 .as_ref()
6128 .unwrap()
6129 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6130
6131 for guest_buffer in &guest_client.buffers {
6132 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6133 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6134 project.buffer_for_id(buffer_id, cx).expect(&format!(
6135 "host does not have buffer for guest:{}, peer:{}, id:{}",
6136 guest_client.username, guest_client.peer_id, buffer_id
6137 ))
6138 });
6139 let path = host_buffer
6140 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6141
6142 assert_eq!(
6143 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6144 0,
6145 "{}, buffer {}, path {:?} has deferred operations",
6146 guest_client.username,
6147 buffer_id,
6148 path,
6149 );
6150 assert_eq!(
6151 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6152 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6153 "{}, buffer {}, path {:?}, differs from the host's buffer",
6154 guest_client.username,
6155 buffer_id,
6156 path
6157 );
6158 }
6159
6160 guest_cx.update(|_| drop(guest_client));
6161 }
6162
6163 host_cx.update(|_| drop(host_client));
6164 }
6165
/// In-process collaboration server used by the integration tests, together with
/// the switches the tests use to kill or refuse client connections.
6166 struct TestServer {
/// A peer created with `Peer::new()` in `start`; it is reset when the harness is dropped.
6167 peer: Arc<Peer>,
/// Application state handed to `Server::new`; built from the test database.
6168 app_state: Arc<AppState>,
/// The server under test. `TestServer` derefs to it.
6169 server: Arc<Server>,
/// Foreground executor, used by `condition` to check that parking is forbidden
/// and to mark the periods in which it is waiting.
6170 foreground: Rc<executor::Foreground>,
/// Receiving half of the channel whose sender is given to `Server::new`;
/// `condition` waits on it between predicate checks.
6171 notifications: mpsc::UnboundedReceiver<()>,
/// Per-user flag returned by `Connection::in_memory`; `disconnect_client` sets it to `true`.
6172 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
/// While `true`, attempts by test clients to establish a connection fail with an error.
6173 forbid_connections: Arc<AtomicBool>,
/// Not read after construction. NOTE(review): presumably held so the test
/// database outlives the server — confirm against `TestDb`.
6174 _test_db: TestDb,
6175 }
6176
6177 impl TestServer {
6178 async fn start(
6179 foreground: Rc<executor::Foreground>,
6180 background: Arc<executor::Background>,
6181 ) -> Self {
6182 let test_db = TestDb::fake(background);
6183 let app_state = Self::build_app_state(&test_db).await;
6184 let peer = Peer::new();
6185 let notifications = mpsc::unbounded();
6186 let server = Server::new(app_state.clone(), Some(notifications.0));
6187 Self {
6188 peer,
6189 app_state,
6190 server,
6191 foreground,
6192 notifications: notifications.1,
6193 connection_killers: Default::default(),
6194 forbid_connections: Default::default(),
6195 _test_db: test_db,
6196 }
6197 }
6198
6199 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6200 cx.update(|cx| {
6201 let settings = Settings::test(cx);
6202 cx.set_global(settings);
6203 });
6204
6205 let http = FakeHttpClient::with_404_response();
6206 let user_id =
6207 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6208 user.id
6209 } else {
6210 self.app_state.db.create_user(name, false).await.unwrap()
6211 };
6212 let client_name = name.to_string();
6213 let mut client = Client::new(http.clone());
6214 let server = self.server.clone();
6215 let connection_killers = self.connection_killers.clone();
6216 let forbid_connections = self.forbid_connections.clone();
6217 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6218
6219 Arc::get_mut(&mut client)
6220 .unwrap()
6221 .override_authenticate(move |cx| {
6222 cx.spawn(|_| async move {
6223 let access_token = "the-token".to_string();
6224 Ok(Credentials {
6225 user_id: user_id.0 as u64,
6226 access_token,
6227 })
6228 })
6229 })
6230 .override_establish_connection(move |credentials, cx| {
6231 assert_eq!(credentials.user_id, user_id.0 as u64);
6232 assert_eq!(credentials.access_token, "the-token");
6233
6234 let server = server.clone();
6235 let connection_killers = connection_killers.clone();
6236 let forbid_connections = forbid_connections.clone();
6237 let client_name = client_name.clone();
6238 let connection_id_tx = connection_id_tx.clone();
6239 cx.spawn(move |cx| async move {
6240 if forbid_connections.load(SeqCst) {
6241 Err(EstablishConnectionError::other(anyhow!(
6242 "server is forbidding connections"
6243 )))
6244 } else {
6245 let (client_conn, server_conn, killed) =
6246 Connection::in_memory(cx.background());
6247 connection_killers.lock().insert(user_id, killed);
6248 cx.background()
6249 .spawn(server.handle_connection(
6250 server_conn,
6251 client_name,
6252 user_id,
6253 Some(connection_id_tx),
6254 cx.background(),
6255 ))
6256 .detach();
6257 Ok(client_conn)
6258 }
6259 })
6260 });
6261
6262 client
6263 .authenticate_and_connect(false, &cx.to_async())
6264 .await
6265 .unwrap();
6266
6267 Channel::init(&client);
6268 Project::init(&client);
6269 cx.update(|cx| {
6270 workspace::init(&client, cx);
6271 });
6272
6273 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6274 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6275
6276 let client = TestClient {
6277 client,
6278 peer_id,
6279 username: name.to_string(),
6280 user_store,
6281 language_registry: Arc::new(LanguageRegistry::test()),
6282 project: Default::default(),
6283 buffers: Default::default(),
6284 };
6285 client.wait_for_current_user(cx).await;
6286 client
6287 }
6288
6289 fn disconnect_client(&self, user_id: UserId) {
6290 self.connection_killers
6291 .lock()
6292 .remove(&user_id)
6293 .unwrap()
6294 .store(true, SeqCst);
6295 }
6296
6297 fn forbid_connections(&self) {
6298 self.forbid_connections.store(true, SeqCst);
6299 }
6300
6301 fn allow_connections(&self) {
6302 self.forbid_connections.store(false, SeqCst);
6303 }
6304
6305 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6306 while let Some((client_a, cx_a)) = clients.pop() {
6307 for (client_b, cx_b) in &mut clients {
6308 client_a
6309 .user_store
6310 .update(cx_a, |store, cx| {
6311 store.request_contact(client_b.user_id().unwrap(), cx)
6312 })
6313 .await
6314 .unwrap();
6315 cx_a.foreground().run_until_parked();
6316 client_b
6317 .user_store
6318 .update(*cx_b, |store, cx| {
6319 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6320 })
6321 .await
6322 .unwrap();
6323 }
6324 }
6325 }
6326
6327 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6328 Arc::new(AppState {
6329 db: test_db.db().clone(),
6330 api_token: Default::default(),
6331 })
6332 }
6333
        /// Acquires a read guard on the server's `Store`.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read().await
        }
6337
6338 async fn condition<F>(&mut self, mut predicate: F)
6339 where
6340 F: FnMut(&Store) -> bool,
6341 {
6342 assert!(
6343 self.foreground.parking_forbidden(),
6344 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6345 );
6346 while !(predicate)(&*self.server.store.read().await) {
6347 self.foreground.start_waiting();
6348 self.notifications.next().await;
6349 self.foreground.finish_waiting();
6350 }
6351 }
6352 }
6353
    /// Lets a `TestServer` be used wherever a `&Server` is expected by
    /// dereferencing to the wrapped `server` field.
    impl Deref for TestServer {
        type Target = Server;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }
6361
    /// Calls `reset` on the peer when the test server is dropped.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably this tears down any remaining connections;
            // confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
6367
    /// A client connected to the test server, together with the per-client state
    /// used by the tests in this module.
    struct TestClient {
        // The underlying client; `TestClient` derefs to this.
        client: Arc<Client>,
        // The name this client was created with.
        username: String,
        // Peer id built from the connection id received when the client connected.
        pub peer_id: PeerId,
        pub user_store: ModelHandle<UserStore>,
        language_registry: Arc<LanguageRegistry>,
        // Most recent project built or simulated by this client, if any.
        project: Option<ModelHandle<Project>>,
        // Buffers kept open by the `simulate_host` / `simulate_guest` routines.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6377
    /// Exposes the wrapped `Arc<Client>` so its methods can be called directly on
    /// a `TestClient`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
6385
    /// Snapshot of a client's contact state, expressed as GitHub logins
    /// (see `TestClient::summarize_contacts`).
    struct ContactsSummary {
        // Logins of the users that are currently contacts.
        pub current: Vec<String>,
        // Logins of the users in the store's outgoing contact requests.
        pub outgoing_requests: Vec<String>,
        // Logins of the users in the store's incoming contact requests.
        pub incoming_requests: Vec<String>,
    }
6391
6392 impl TestClient {
6393 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6394 UserId::from_proto(
6395 self.user_store
6396 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6397 )
6398 }
6399
6400 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6401 let mut authed_user = self
6402 .user_store
6403 .read_with(cx, |user_store, _| user_store.watch_current_user());
6404 while authed_user.next().await.unwrap().is_none() {}
6405 }
6406
6407 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6408 self.user_store
6409 .update(cx, |store, _| store.clear_contacts())
6410 .await;
6411 }
6412
6413 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6414 self.user_store.read_with(cx, |store, _| ContactsSummary {
6415 current: store
6416 .contacts()
6417 .iter()
6418 .map(|contact| contact.user.github_login.clone())
6419 .collect(),
6420 outgoing_requests: store
6421 .outgoing_contact_requests()
6422 .iter()
6423 .map(|user| user.github_login.clone())
6424 .collect(),
6425 incoming_requests: store
6426 .incoming_contact_requests()
6427 .iter()
6428 .map(|user| user.github_login.clone())
6429 .collect(),
6430 })
6431 }
6432
6433 async fn build_local_project(
6434 &mut self,
6435 fs: Arc<FakeFs>,
6436 root_path: impl AsRef<Path>,
6437 cx: &mut TestAppContext,
6438 ) -> (ModelHandle<Project>, WorktreeId) {
6439 let project = cx.update(|cx| {
6440 Project::local(
6441 self.client.clone(),
6442 self.user_store.clone(),
6443 self.language_registry.clone(),
6444 fs,
6445 cx,
6446 )
6447 });
6448 self.project = Some(project.clone());
6449 let (worktree, _) = project
6450 .update(cx, |p, cx| {
6451 p.find_or_create_local_worktree(root_path, true, cx)
6452 })
6453 .await
6454 .unwrap();
6455 worktree
6456 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6457 .await;
6458 project
6459 .update(cx, |project, _| project.next_remote_id())
6460 .await;
6461 (project, worktree.read_with(cx, |tree, _| tree.id()))
6462 }
6463
6464 async fn build_remote_project(
6465 &mut self,
6466 host_project: &ModelHandle<Project>,
6467 host_cx: &mut TestAppContext,
6468 guest_cx: &mut TestAppContext,
6469 ) -> ModelHandle<Project> {
6470 let host_project_id = host_project
6471 .read_with(host_cx, |project, _| project.next_remote_id())
6472 .await;
6473 let guest_user_id = self.user_id().unwrap();
6474 let languages =
6475 host_project.read_with(host_cx, |project, _| project.languages().clone());
6476 let project_b = guest_cx.spawn(|mut cx| {
6477 let user_store = self.user_store.clone();
6478 let guest_client = self.client.clone();
6479 async move {
6480 Project::remote(
6481 host_project_id,
6482 guest_client,
6483 user_store.clone(),
6484 languages,
6485 FakeFs::new(cx.background()),
6486 &mut cx,
6487 )
6488 .await
6489 .unwrap()
6490 }
6491 });
6492 host_cx.foreground().run_until_parked();
6493 host_project.update(host_cx, |project, cx| {
6494 project.respond_to_join_request(guest_user_id, true, cx)
6495 });
6496 let project = project_b.await;
6497 self.project = Some(project.clone());
6498 project
6499 }
6500
6501 fn build_workspace(
6502 &self,
6503 project: &ModelHandle<Project>,
6504 cx: &mut TestAppContext,
6505 ) -> ViewHandle<Workspace> {
6506 let (window_id, _) = cx.add_window(|_| EmptyView);
6507 cx.add_view(window_id, |cx| {
6508 let fs = project.read(cx).fs().clone();
6509 Workspace::new(
6510 &WorkspaceParams {
6511 fs,
6512 project: project.clone(),
6513 user_store: self.user_store.clone(),
6514 languages: self.language_registry.clone(),
6515 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6516 channel_list: cx.add_model(|cx| {
6517 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6518 }),
6519 client: self.client.clone(),
6520 },
6521 cx,
6522 )
6523 })
6524 }
6525
        /// Drives this client as the host of `project`, performing one randomly
        /// chosen operation each time `op_start_signal` yields an item and stopping
        /// once that stream ends.
        ///
        /// Returns the client, the app context, and the error that aborted the
        /// simulation, if any. Before returning, `self.project` is set to `project`.
        ///
        /// Every random decision is drawn from the shared `rng`, so the order of the
        /// `rng.lock()` calls below determines how a given seed plays out.
        async fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // The loop lives in an inner function returning `Result` so that `?` can
            // be used; the outer function turns that result into an `Option` error.
            async fn simulate_host_internal(
                client: &mut TestClient,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                let fs = project.read_with(cx, |project, _| project.fs().clone());

                // Accept every join request the project reports.
                cx.update(|cx| {
                    cx.subscribe(&project, move |project, event, cx| {
                        if let project::Event::ContactRequestedJoin(user) = event {
                            log::info!("Host: accepting join request from {}", user.github_login);
                            project.update(cx, |project, cx| {
                                project.respond_to_join_request(user.id, true, cx)
                            });
                        }
                    })
                    .detach();
                });

                while op_start_signal.next().await.is_some() {
                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    let files = fs.as_fake().files().await;
                    match distribution {
                        // Find or create a worktree at a random ancestor of an
                        // existing file.
                        0..=19 if !files.is_empty() => {
                            let path = files.choose(&mut *rng.lock()).unwrap();
                            let mut path = path.as_path();
                            // Walk up at least one level, stopping at a random ancestor
                            // (or at the top once there is no parent left).
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            // Randomly either let the task finish in the background or
                            // wait for it (propagating its error).
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await?;
                            }
                        }
                        // Pick a buffer (newly opened or already tracked), then either
                        // drop it or mutate it.
                        20..=79 if !files.is_empty() => {
                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                                let file = files.choose(&mut *rng.lock()).unwrap();
                                let (worktree, path) = project
                                    .update(cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await?;
                                let project_path =
                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
                                    .await
                                    .unwrap();
                                client.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                client
                                    .buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            // 10% of the time stop tracking the buffer and drop this
                            // handle; otherwise edit it (70%) or undo/redo (30%).
                            if rng.lock().gen_bool(0.1) {
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    client.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );

                                    if rng.lock().gen_bool(0.7) {
                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                    } else {
                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                    }
                                });
                            }
                        }
                        // Otherwise (including whenever no files exist yet) create a
                        // new `.rs` file at a random path made of 1 to 5 single-letter
                        // components, retrying with a fresh path until both the
                        // parent directory and the file are created successfully.
                        _ => loop {
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                Ok(())
            }

            let result =
                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
                    .await;
            log::info!("Host done");
            self.project = Some(project);
            (self, cx, result.err())
        }
6674
        /// Drives this client as a guest of `project`, performing one randomly
        /// chosen operation each time `op_start_signal` yields an item and stopping
        /// once that stream ends.
        ///
        /// `guest_username` is only used to prefix log messages. Returns the client,
        /// the app context, and the error that aborted the simulation, if any.
        /// Before returning, `self.project` is set to `project`.
        ///
        /// Every random decision is drawn from the shared `rng`, so the order of the
        /// `rng.lock()` calls below determines how a given seed plays out.
        pub async fn simulate_guest(
            mut self,
            guest_username: String,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // The loop lives in an inner function returning `Result` so that `?` can
            // be used; the outer function turns that result into an `Option` error.
            async fn simulate_guest_internal(
                client: &mut TestClient,
                guest_username: &str,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                while op_start_signal.next().await.is_some() {
                    // Choose the buffer this operation acts on: open a random file
                    // from a visible worktree, or reuse an already tracked buffer.
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let worktree = if let Some(worktree) =
                            project.read_with(cx, |project, cx| {
                                project
                                    .worktrees(&cx)
                                    .filter(|worktree| {
                                        let worktree = worktree.read(cx);
                                        worktree.is_visible()
                                            && worktree.entries(false).any(|e| e.is_file())
                                    })
                                    .choose(&mut *rng.lock())
                            }) {
                            worktree
                        } else {
                            // No visible worktree contains a file yet: skip this
                            // operation after a random delay.
                            cx.background().simulate_random_delay().await;
                            continue;
                        };

                        let (worktree_root_name, project_path) =
                            worktree.read_with(cx, |worktree, _| {
                                let entry = worktree
                                    .entries(false)
                                    .filter(|e| e.is_file())
                                    .choose(&mut *rng.lock())
                                    .unwrap();
                                (
                                    worktree.root_name().to_string(),
                                    (worktree.id(), entry.path.clone()),
                                )
                            });
                        log::info!(
                            "{}: opening path {:?} in worktree {} ({})",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                        );
                        let buffer = project
                            .update(cx, |project, cx| {
                                project.open_buffer(project_path.clone(), cx)
                            })
                            .await?;
                        log::info!(
                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
                        );
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // Each request-style branch below wraps its task so failures carry
                    // a descriptive message, then 30% of the time detaches the task
                    // (errors are logged) and otherwise awaits it (errors propagate).
                    let choice = rng.lock().gen_range(0..100);
                    match choice {
                        // Stop tracking the buffer and drop this handle.
                        0..=9 => {
                            cx.update(|cx| {
                                log::info!(
                                    "{}: dropping buffer {:?}",
                                    guest_username,
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                client.buffers.remove(&buffer);
                                drop(buffer);
                            });
                        }
                        // Request completions at a random offset.
                        10..=19 => {
                            let completions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting completions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.completions(&buffer, offset, cx)
                            });
                            let completions = cx.background().spawn(async move {
                                completions
                                    .await
                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching completions request", guest_username);
                                cx.update(|cx| completions.detach_and_log_err(cx));
                            } else {
                                completions.await?;
                            }
                        }
                        // Request code actions for a random byte range.
                        20..=29 => {
                            let code_actions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting code actions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                                project.code_actions(&buffer, range, cx)
                            });
                            let code_actions = cx.background().spawn(async move {
                                code_actions.await.map_err(|err| {
                                    anyhow!("code actions request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching code actions request", guest_username);
                                cx.update(|cx| code_actions.detach_and_log_err(cx));
                            } else {
                                code_actions.await?;
                            }
                        }
                        // Save the buffer, but only if it is dirty; a clean buffer
                        // falls through to the catch-all edit branch at the bottom.
                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: saving buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                (buffer.version(), buffer.save(cx))
                            });
                            let save = cx.background().spawn(async move {
                                let (saved_version, _) = save
                                    .await
                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                                // The saved version must include everything that was
                                // in the buffer when the save was requested.
                                assert!(saved_version.observed_all(&requested_version));
                                Ok::<_, anyhow::Error>(())
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching save request", guest_username);
                                cx.update(|cx| save.detach_and_log_err(cx));
                            } else {
                                save.await?;
                            }
                        }
                        // Prepare a rename at a random offset.
                        40..=44 => {
                            let prepare_rename = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: preparing rename for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.prepare_rename(buffer, offset, cx)
                            });
                            let prepare_rename = cx.background().spawn(async move {
                                prepare_rename.await.map_err(|err| {
                                    anyhow!("prepare rename request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching prepare rename request", guest_username);
                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                            } else {
                                prepare_rename.await?;
                            }
                        }
                        // Request definitions at a random offset; when awaited, the
                        // buffers of the returned locations are tracked as well.
                        45..=49 => {
                            let definitions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting definitions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.definition(&buffer, offset, cx)
                            });
                            let definitions = cx.background().spawn(async move {
                                definitions
                                    .await
                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching definitions request", guest_username);
                                cx.update(|cx| definitions.detach_and_log_err(cx));
                            } else {
                                client
                                    .buffers
                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                            }
                        }
                        // Request document highlights at a random offset.
                        50..=54 => {
                            let highlights = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting highlights for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.document_highlights(&buffer, offset, cx)
                            });
                            let highlights = cx.background().spawn(async move {
                                highlights
                                    .await
                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching highlights request", guest_username);
                                cx.update(|cx| highlights.detach_and_log_err(cx));
                            } else {
                                highlights.await?;
                            }
                        }
                        // Project-wide text search for a single random lowercase
                        // letter; when awaited, the buffers keyed in the result are
                        // tracked as well.
                        55..=59 => {
                            let search = project.update(cx, |project, cx| {
                                let query = rng.lock().gen_range('a'..='z');
                                log::info!("{}: project-wide search {:?}", guest_username, query);
                                project.search(SearchQuery::text(query, false, false), cx)
                            });
                            let search = cx.background().spawn(async move {
                                search
                                    .await
                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching search request", guest_username);
                                cx.update(|cx| search.detach_and_log_err(cx));
                            } else {
                                client.buffers.extend(search.await?.into_keys());
                            }
                        }
                        // Create a new `.rs` entry with a random 10-letter name in a
                        // visible worktree that contains a file and whose root entry
                        // is a directory.
                        60..=69 => {
                            // NOTE(review): this `unwrap` panics when no worktree
                            // passes the filter (e.g. if every visible worktree had a
                            // file as its root) — confirm the test setup rules that out.
                            let worktree = project
                                .read_with(cx, |project, cx| {
                                    project
                                        .worktrees(&cx)
                                        .filter(|worktree| {
                                            let worktree = worktree.read(cx);
                                            worktree.is_visible()
                                                && worktree.entries(false).any(|e| e.is_file())
                                                && worktree
                                                    .root_entry()
                                                    .map_or(false, |e| e.is_dir())
                                        })
                                        .choose(&mut *rng.lock())
                                })
                                .unwrap();
                            let (worktree_id, worktree_root_name) = worktree
                                .read_with(cx, |worktree, _| {
                                    (worktree.id(), worktree.root_name().to_string())
                                });

                            let mut new_name = String::new();
                            for _ in 0..10 {
                                let letter = rng.lock().gen_range('a'..='z');
                                new_name.push(letter);
                            }
                            let mut new_path = PathBuf::new();
                            new_path.push(new_name);
                            new_path.set_extension("rs");
                            log::info!(
                                "{}: creating {:?} in worktree {} ({})",
                                guest_username,
                                new_path,
                                worktree_id,
                                worktree_root_name,
                            );
                            project
                                .update(cx, |project, cx| {
                                    project.create_entry((worktree_id, new_path), false, cx)
                                })
                                .unwrap()
                                .await?;
                        }
                        // Everything else: edit the buffer (70%) or undo/redo (30%).
                        _ => {
                            buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: updating buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                if rng.lock().gen_bool(0.7) {
                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                } else {
                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                }
                            });
                        }
                    }
                    cx.background().simulate_random_delay().await;
                }
                Ok(())
            }

            let result = simulate_guest_internal(
                &mut self,
                &guest_username,
                project.clone(),
                op_start_signal,
                rng,
                &mut cx,
            )
            .await;
            log::info!("{}: done", guest_username);

            self.project = Some(project);
            (self, cx, result.err())
        }
7003 }
7004
    /// Calls `tear_down` on the underlying client when the test client is dropped.
    impl Drop for TestClient {
        fn drop(&mut self) {
            self.client.tear_down();
        }
    }
7010
    /// Implements `Executor` for gpui's background executor so it can be passed
    /// where an `Executor` is required (e.g. to `handle_connection` in this module).
    impl Executor for Arc<gpui::executor::Background> {
        type Sleep = gpui::executor::Timer;

        /// Spawns `future` on the background executor and detaches the task.
        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
            self.spawn(future).detach();
        }

        /// Returns the background executor's timer for `duration`.
        fn sleep(&self, duration: Duration) -> Self::Sleep {
            self.as_ref().timer(duration)
        }
    }
7022
7023 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7024 channel
7025 .messages()
7026 .cursor::<()>()
7027 .map(|m| {
7028 (
7029 m.sender.github_login.clone(),
7030 m.body.clone(),
7031 m.is_pending(),
7032 )
7033 })
7034 .collect()
7035 }
7036
    /// Placeholder view with no state; `build_workspace` uses it as the root view
    /// of the window it opens.
    struct EmptyView;
7038
    /// `EmptyView` has no event payload, so its event type is `()`.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
7042
    /// Renders `EmptyView` as a single `Empty` element.
    impl gpui::View for EmptyView {
        fn ui_name() -> &'static str {
            "empty view"
        }

        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty::new())
        }
    }
7052}