1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
54type MessageHandler =
55 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
57struct Response<R> {
58 server: Arc<Server>,
59 receipt: Receipt<R>,
60 responded: Arc<AtomicBool>,
61}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69}
70
71pub struct Server {
72 peer: Arc<Peer>,
73 store: RwLock<Store>,
74 app_state: Arc<AppState>,
75 handlers: HashMap<TypeId, MessageHandler>,
76 notifications: Option<mpsc::UnboundedSender<()>>,
77}
78
79pub trait Executor: Send + Clone {
80 type Sleep: Send + Future;
81 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
82 fn sleep(&self, duration: Duration) -> Self::Sleep;
83}
84
85#[derive(Clone)]
86pub struct RealExecutor;
87
88const MESSAGE_COUNT_PER_PAGE: usize = 100;
89const MAX_MESSAGE_LEN: usize = 1024;
90
91struct StoreReadGuard<'a> {
92 guard: RwLockReadGuard<'a, Store>,
93 _not_send: PhantomData<Rc<()>>,
94}
95
96struct StoreWriteGuard<'a> {
97 guard: RwLockWriteGuard<'a, Store>,
98 _not_send: PhantomData<Rc<()>>,
99}
100
101impl Server {
102 pub fn new(
103 app_state: Arc<AppState>,
104 notifications: Option<mpsc::UnboundedSender<()>>,
105 ) -> Arc<Self> {
106 let mut server = Self {
107 peer: Peer::new(),
108 app_state,
109 store: Default::default(),
110 handlers: Default::default(),
111 notifications,
112 };
113
114 server
115 .add_request_handler(Server::ping)
116 .add_request_handler(Server::register_project)
117 .add_message_handler(Server::unregister_project)
118 .add_request_handler(Server::share_project)
119 .add_message_handler(Server::unshare_project)
120 .add_request_handler(Server::join_project)
121 .add_message_handler(Server::leave_project)
122 .add_request_handler(Server::register_worktree)
123 .add_message_handler(Server::unregister_worktree)
124 .add_request_handler(Server::update_worktree)
125 .add_message_handler(Server::start_language_server)
126 .add_message_handler(Server::update_language_server)
127 .add_message_handler(Server::update_diagnostic_summary)
128 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
129 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
130 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
131 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
132 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
133 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
134 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
135 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
136 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
137 .add_request_handler(
138 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
139 )
140 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
141 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
142 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
143 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
144 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
145 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
146 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
147 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
148 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
149 .add_request_handler(Server::update_buffer)
150 .add_message_handler(Server::update_buffer_file)
151 .add_message_handler(Server::buffer_reloaded)
152 .add_message_handler(Server::buffer_saved)
153 .add_request_handler(Server::save_buffer)
154 .add_request_handler(Server::get_channels)
155 .add_request_handler(Server::get_users)
156 .add_request_handler(Server::fuzzy_search_users)
157 .add_request_handler(Server::request_contact)
158 .add_request_handler(Server::remove_contact)
159 .add_request_handler(Server::respond_to_contact_request)
160 .add_request_handler(Server::join_channel)
161 .add_message_handler(Server::leave_channel)
162 .add_request_handler(Server::send_channel_message)
163 .add_request_handler(Server::follow)
164 .add_message_handler(Server::unfollow)
165 .add_message_handler(Server::update_followers)
166 .add_request_handler(Server::get_channel_messages);
167
168 Arc::new(server)
169 }
170
171 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
172 where
173 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
174 Fut: 'static + Send + Future<Output = Result<()>>,
175 M: EnvelopedMessage,
176 {
177 let prev_handler = self.handlers.insert(
178 TypeId::of::<M>(),
179 Box::new(move |server, envelope| {
180 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
181 let span = info_span!(
182 "handle message",
183 payload_type = envelope.payload_type_name(),
184 payload = format!("{:?}", envelope.payload).as_str(),
185 );
186 let future = (handler)(server, *envelope);
187 async move {
188 if let Err(error) = future.await {
189 tracing::error!(%error, "error handling message");
190 }
191 }
192 .instrument(span)
193 .boxed()
194 }),
195 );
196 if prev_handler.is_some() {
197 panic!("registered a handler for the same message twice");
198 }
199 self
200 }
201
202 /// Handle a request while holding a lock to the store. This is useful when we're registering
203 /// a connection but we want to respond on the connection before anybody else can send on it.
204 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
205 where
206 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
207 Fut: Send + Future<Output = Result<()>>,
208 M: RequestMessage,
209 {
210 let handler = Arc::new(handler);
211 self.add_message_handler(move |server, envelope| {
212 let receipt = envelope.receipt();
213 let handler = handler.clone();
214 async move {
215 let responded = Arc::new(AtomicBool::default());
216 let response = Response {
217 server: server.clone(),
218 responded: responded.clone(),
219 receipt: envelope.receipt(),
220 };
221 match (handler)(server.clone(), envelope, response).await {
222 Ok(()) => {
223 if responded.load(std::sync::atomic::Ordering::SeqCst) {
224 Ok(())
225 } else {
226 Err(anyhow!("handler did not send a response"))?
227 }
228 }
229 Err(error) => {
230 server.peer.respond_with_error(
231 receipt,
232 proto::Error {
233 message: error.to_string(),
234 },
235 )?;
236 Err(error)
237 }
238 }
239 }
240 })
241 }
242
243 pub fn handle_connection<E: Executor>(
244 self: &Arc<Self>,
245 connection: Connection,
246 address: String,
247 user_id: UserId,
248 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
249 executor: E,
250 ) -> impl Future<Output = Result<()>> {
251 let mut this = self.clone();
252 let span = info_span!("handle connection", %user_id, %address);
253 async move {
254 let (connection_id, handle_io, mut incoming_rx) = this
255 .peer
256 .add_connection(connection, {
257 let executor = executor.clone();
258 move |duration| {
259 let timer = executor.sleep(duration);
260 async move {
261 timer.await;
262 }
263 }
264 })
265 .await;
266
267 tracing::info!(%user_id, %connection_id, %address, "connection opened");
268
269 if let Some(send_connection_id) = send_connection_id.as_mut() {
270 let _ = send_connection_id.send(connection_id).await;
271 }
272
273 let contacts = this.app_state.db.get_contacts(user_id).await?;
274
275 {
276 let mut store = this.store_mut().await;
277 store.add_connection(connection_id, user_id);
278 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
279 }
280 this.update_user_contacts(user_id).await?;
281
282 let handle_io = handle_io.fuse();
283 futures::pin_mut!(handle_io);
284 loop {
285 let next_message = incoming_rx.next().fuse();
286 futures::pin_mut!(next_message);
287 futures::select_biased! {
288 result = handle_io => {
289 if let Err(error) = result {
290 tracing::error!(%error, "error handling I/O");
291 }
292 break;
293 }
294 message = next_message => {
295 if let Some(message) = message {
296 let type_name = message.payload_type_name();
297 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
298 async {
299 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
300 let notifications = this.notifications.clone();
301 let is_background = message.is_background();
302 let handle_message = (handler)(this.clone(), message);
303 let handle_message = async move {
304 handle_message.await;
305 if let Some(mut notifications) = notifications {
306 let _ = notifications.send(()).await;
307 }
308 };
309 if is_background {
310 executor.spawn_detached(handle_message);
311 } else {
312 handle_message.await;
313 }
314 } else {
315 tracing::error!("no message handler");
316 }
317 }.instrument(span).await;
318 } else {
319 tracing::info!(%user_id, %connection_id, %address, "connection closed");
320 break;
321 }
322 }
323 }
324 }
325
326 if let Err(error) = this.sign_out(connection_id).await {
327 tracing::error!(%error, "error signing out");
328 }
329
330 Ok(())
331 }.instrument(span)
332 }
333
334 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
335 self.peer.disconnect(connection_id);
336 let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
337
338 for (project_id, project) in removed_connection.hosted_projects {
339 if let Some(share) = project.share {
340 broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
341 self.peer
342 .send(conn_id, proto::UnshareProject { project_id })
343 });
344 }
345 }
346
347 for (project_id, peer_ids) in removed_connection.guest_project_ids {
348 broadcast(connection_id, peer_ids, |conn_id| {
349 self.peer.send(
350 conn_id,
351 proto::RemoveProjectCollaborator {
352 project_id,
353 peer_id: connection_id.0,
354 },
355 )
356 });
357 }
358
359 self.update_user_contacts(removed_connection.user_id)
360 .await?;
361
362 Ok(())
363 }
364
365 async fn ping(
366 self: Arc<Server>,
367 _: TypedEnvelope<proto::Ping>,
368 response: Response<proto::Ping>,
369 ) -> Result<()> {
370 response.send(proto::Ack {})?;
371 Ok(())
372 }
373
374 async fn register_project(
375 self: Arc<Server>,
376 request: TypedEnvelope<proto::RegisterProject>,
377 response: Response<proto::RegisterProject>,
378 ) -> Result<()> {
379 let user_id;
380 let project_id;
381 {
382 let mut state = self.store_mut().await;
383 user_id = state.user_id_for_connection(request.sender_id)?;
384 project_id = state.register_project(request.sender_id, user_id);
385 };
386 self.update_user_contacts(user_id).await?;
387 response.send(proto::RegisterProjectResponse { project_id })?;
388 Ok(())
389 }
390
391 async fn unregister_project(
392 self: Arc<Server>,
393 request: TypedEnvelope<proto::UnregisterProject>,
394 ) -> Result<()> {
395 let user_id = {
396 let mut state = self.store_mut().await;
397 state.unregister_project(request.payload.project_id, request.sender_id)?;
398 state.user_id_for_connection(request.sender_id)?
399 };
400
401 self.update_user_contacts(user_id).await?;
402 Ok(())
403 }
404
405 async fn share_project(
406 self: Arc<Server>,
407 request: TypedEnvelope<proto::ShareProject>,
408 response: Response<proto::ShareProject>,
409 ) -> Result<()> {
410 let user_id = {
411 let mut state = self.store_mut().await;
412 state.share_project(request.payload.project_id, request.sender_id)?;
413 state.user_id_for_connection(request.sender_id)?
414 };
415 self.update_user_contacts(user_id).await?;
416 response.send(proto::Ack {})?;
417 Ok(())
418 }
419
420 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
421 let contacts = self.app_state.db.get_contacts(user_id).await?;
422 let store = self.store().await;
423 let updated_contact = store.contact_for_user(user_id);
424 for contact_user_id in contacts.current {
425 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
426 self.peer
427 .send(
428 contact_conn_id,
429 proto::UpdateContacts {
430 contacts: vec![updated_contact.clone()],
431 remove_contacts: Default::default(),
432 incoming_requests: Default::default(),
433 remove_incoming_requests: Default::default(),
434 outgoing_requests: Default::default(),
435 remove_outgoing_requests: Default::default(),
436 },
437 )
438 .trace_err();
439 }
440 }
441 Ok(())
442 }
443
444 async fn unshare_project(
445 self: Arc<Server>,
446 request: TypedEnvelope<proto::UnshareProject>,
447 ) -> Result<()> {
448 let project_id = request.payload.project_id;
449 let project;
450 {
451 let mut state = self.store_mut().await;
452 project = state.unshare_project(project_id, request.sender_id)?;
453 broadcast(request.sender_id, project.connection_ids, |conn_id| {
454 self.peer
455 .send(conn_id, proto::UnshareProject { project_id })
456 });
457 }
458 self.update_user_contacts(project.host_user_id).await?;
459 Ok(())
460 }
461
462 async fn join_project(
463 self: Arc<Server>,
464 request: TypedEnvelope<proto::JoinProject>,
465 response: Response<proto::JoinProject>,
466 ) -> Result<()> {
467 let project_id = request.payload.project_id;
468 let host_user_id;
469 let guest_user_id;
470 {
471 let state = self.store().await;
472 host_user_id = state.project(project_id)?.host_user_id;
473 guest_user_id = state.user_id_for_connection(request.sender_id)?;
474 };
475
476 let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
477 if !guest_contacts.current.contains(&host_user_id) {
478 return Err(anyhow!("no such project"))?;
479 }
480
481 {
482 let state = &mut *self.store_mut().await;
483 let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
484 let share = joined.project.share()?;
485 let peer_count = share.guests.len();
486 let mut collaborators = Vec::with_capacity(peer_count);
487 collaborators.push(proto::Collaborator {
488 peer_id: joined.project.host_connection_id.0,
489 replica_id: 0,
490 user_id: joined.project.host_user_id.to_proto(),
491 });
492 let worktrees = share
493 .worktrees
494 .iter()
495 .filter_map(|(id, shared_worktree)| {
496 let worktree = joined.project.worktrees.get(&id)?;
497 Some(proto::Worktree {
498 id: *id,
499 root_name: worktree.root_name.clone(),
500 entries: shared_worktree.entries.values().cloned().collect(),
501 diagnostic_summaries: shared_worktree
502 .diagnostic_summaries
503 .values()
504 .cloned()
505 .collect(),
506 visible: worktree.visible,
507 scan_id: shared_worktree.scan_id,
508 })
509 })
510 .collect();
511 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
512 if *peer_conn_id != request.sender_id {
513 collaborators.push(proto::Collaborator {
514 peer_id: peer_conn_id.0,
515 replica_id: *peer_replica_id as u32,
516 user_id: peer_user_id.to_proto(),
517 });
518 }
519 }
520 broadcast(
521 request.sender_id,
522 joined.project.connection_ids(),
523 |conn_id| {
524 self.peer.send(
525 conn_id,
526 proto::AddProjectCollaborator {
527 project_id,
528 collaborator: Some(proto::Collaborator {
529 peer_id: request.sender_id.0,
530 replica_id: joined.replica_id as u32,
531 user_id: guest_user_id.to_proto(),
532 }),
533 },
534 )
535 },
536 );
537 response.send(proto::JoinProjectResponse {
538 worktrees,
539 replica_id: joined.replica_id as u32,
540 collaborators,
541 language_servers: joined.project.language_servers.clone(),
542 })?;
543 }
544 self.update_user_contacts(host_user_id).await?;
545 Ok(())
546 }
547
548 async fn leave_project(
549 self: Arc<Server>,
550 request: TypedEnvelope<proto::LeaveProject>,
551 ) -> Result<()> {
552 let sender_id = request.sender_id;
553 let project_id = request.payload.project_id;
554 let project;
555 {
556 let mut state = self.store_mut().await;
557 project = state.leave_project(sender_id, project_id)?;
558 broadcast(sender_id, project.connection_ids, |conn_id| {
559 self.peer.send(
560 conn_id,
561 proto::RemoveProjectCollaborator {
562 project_id,
563 peer_id: sender_id.0,
564 },
565 )
566 });
567 }
568 self.update_user_contacts(project.host_user_id).await?;
569 Ok(())
570 }
571
572 async fn register_worktree(
573 self: Arc<Server>,
574 request: TypedEnvelope<proto::RegisterWorktree>,
575 response: Response<proto::RegisterWorktree>,
576 ) -> Result<()> {
577 let host_user_id;
578 {
579 let mut state = self.store_mut().await;
580 host_user_id = state.user_id_for_connection(request.sender_id)?;
581
582 let guest_connection_ids = state
583 .read_project(request.payload.project_id, request.sender_id)?
584 .guest_connection_ids();
585 state.register_worktree(
586 request.payload.project_id,
587 request.payload.worktree_id,
588 request.sender_id,
589 Worktree {
590 root_name: request.payload.root_name.clone(),
591 visible: request.payload.visible,
592 },
593 )?;
594
595 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
596 self.peer
597 .forward_send(request.sender_id, connection_id, request.payload.clone())
598 });
599 }
600 self.update_user_contacts(host_user_id).await?;
601 response.send(proto::Ack {})?;
602 Ok(())
603 }
604
605 async fn unregister_worktree(
606 self: Arc<Server>,
607 request: TypedEnvelope<proto::UnregisterWorktree>,
608 ) -> Result<()> {
609 let host_user_id;
610 let project_id = request.payload.project_id;
611 let worktree_id = request.payload.worktree_id;
612 {
613 let mut state = self.store_mut().await;
614 let (_, guest_connection_ids) =
615 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
616 host_user_id = state.user_id_for_connection(request.sender_id)?;
617 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
618 self.peer.send(
619 conn_id,
620 proto::UnregisterWorktree {
621 project_id,
622 worktree_id,
623 },
624 )
625 });
626 }
627 self.update_user_contacts(host_user_id).await?;
628 Ok(())
629 }
630
631 async fn update_worktree(
632 self: Arc<Server>,
633 request: TypedEnvelope<proto::UpdateWorktree>,
634 response: Response<proto::UpdateWorktree>,
635 ) -> Result<()> {
636 let connection_ids = self.store_mut().await.update_worktree(
637 request.sender_id,
638 request.payload.project_id,
639 request.payload.worktree_id,
640 &request.payload.removed_entries,
641 &request.payload.updated_entries,
642 request.payload.scan_id,
643 )?;
644
645 broadcast(request.sender_id, connection_ids, |connection_id| {
646 self.peer
647 .forward_send(request.sender_id, connection_id, request.payload.clone())
648 });
649 response.send(proto::Ack {})?;
650 Ok(())
651 }
652
653 async fn update_diagnostic_summary(
654 self: Arc<Server>,
655 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
656 ) -> Result<()> {
657 let summary = request
658 .payload
659 .summary
660 .clone()
661 .ok_or_else(|| anyhow!("invalid summary"))?;
662 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
663 request.payload.project_id,
664 request.payload.worktree_id,
665 request.sender_id,
666 summary,
667 )?;
668
669 broadcast(request.sender_id, receiver_ids, |connection_id| {
670 self.peer
671 .forward_send(request.sender_id, connection_id, request.payload.clone())
672 });
673 Ok(())
674 }
675
676 async fn start_language_server(
677 self: Arc<Server>,
678 request: TypedEnvelope<proto::StartLanguageServer>,
679 ) -> Result<()> {
680 let receiver_ids = self.store_mut().await.start_language_server(
681 request.payload.project_id,
682 request.sender_id,
683 request
684 .payload
685 .server
686 .clone()
687 .ok_or_else(|| anyhow!("invalid language server"))?,
688 )?;
689 broadcast(request.sender_id, receiver_ids, |connection_id| {
690 self.peer
691 .forward_send(request.sender_id, connection_id, request.payload.clone())
692 });
693 Ok(())
694 }
695
696 async fn update_language_server(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::UpdateLanguageServer>,
699 ) -> Result<()> {
700 let receiver_ids = self
701 .store()
702 .await
703 .project_connection_ids(request.payload.project_id, request.sender_id)?;
704 broadcast(request.sender_id, receiver_ids, |connection_id| {
705 self.peer
706 .forward_send(request.sender_id, connection_id, request.payload.clone())
707 });
708 Ok(())
709 }
710
711 async fn forward_project_request<T>(
712 self: Arc<Server>,
713 request: TypedEnvelope<T>,
714 response: Response<T>,
715 ) -> Result<()>
716 where
717 T: EntityMessage + RequestMessage,
718 {
719 let host_connection_id = self
720 .store()
721 .await
722 .read_project(request.payload.remote_entity_id(), request.sender_id)?
723 .host_connection_id;
724
725 response.send(
726 self.peer
727 .forward_request(request.sender_id, host_connection_id, request.payload)
728 .await?,
729 )?;
730 Ok(())
731 }
732
733 async fn save_buffer(
734 self: Arc<Server>,
735 request: TypedEnvelope<proto::SaveBuffer>,
736 response: Response<proto::SaveBuffer>,
737 ) -> Result<()> {
738 let host = self
739 .store()
740 .await
741 .read_project(request.payload.project_id, request.sender_id)?
742 .host_connection_id;
743 let response_payload = self
744 .peer
745 .forward_request(request.sender_id, host, request.payload.clone())
746 .await?;
747
748 let mut guests = self
749 .store()
750 .await
751 .read_project(request.payload.project_id, request.sender_id)?
752 .connection_ids();
753 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
754 broadcast(host, guests, |conn_id| {
755 self.peer
756 .forward_send(host, conn_id, response_payload.clone())
757 });
758 response.send(response_payload)?;
759 Ok(())
760 }
761
762 async fn update_buffer(
763 self: Arc<Server>,
764 request: TypedEnvelope<proto::UpdateBuffer>,
765 response: Response<proto::UpdateBuffer>,
766 ) -> Result<()> {
767 let receiver_ids = self
768 .store()
769 .await
770 .project_connection_ids(request.payload.project_id, request.sender_id)?;
771 broadcast(request.sender_id, receiver_ids, |connection_id| {
772 self.peer
773 .forward_send(request.sender_id, connection_id, request.payload.clone())
774 });
775 response.send(proto::Ack {})?;
776 Ok(())
777 }
778
779 async fn update_buffer_file(
780 self: Arc<Server>,
781 request: TypedEnvelope<proto::UpdateBufferFile>,
782 ) -> Result<()> {
783 let receiver_ids = self
784 .store()
785 .await
786 .project_connection_ids(request.payload.project_id, request.sender_id)?;
787 broadcast(request.sender_id, receiver_ids, |connection_id| {
788 self.peer
789 .forward_send(request.sender_id, connection_id, request.payload.clone())
790 });
791 Ok(())
792 }
793
794 async fn buffer_reloaded(
795 self: Arc<Server>,
796 request: TypedEnvelope<proto::BufferReloaded>,
797 ) -> Result<()> {
798 let receiver_ids = self
799 .store()
800 .await
801 .project_connection_ids(request.payload.project_id, request.sender_id)?;
802 broadcast(request.sender_id, receiver_ids, |connection_id| {
803 self.peer
804 .forward_send(request.sender_id, connection_id, request.payload.clone())
805 });
806 Ok(())
807 }
808
809 async fn buffer_saved(
810 self: Arc<Server>,
811 request: TypedEnvelope<proto::BufferSaved>,
812 ) -> Result<()> {
813 let receiver_ids = self
814 .store()
815 .await
816 .project_connection_ids(request.payload.project_id, request.sender_id)?;
817 broadcast(request.sender_id, receiver_ids, |connection_id| {
818 self.peer
819 .forward_send(request.sender_id, connection_id, request.payload.clone())
820 });
821 Ok(())
822 }
823
824 async fn follow(
825 self: Arc<Self>,
826 request: TypedEnvelope<proto::Follow>,
827 response: Response<proto::Follow>,
828 ) -> Result<()> {
829 let leader_id = ConnectionId(request.payload.leader_id);
830 let follower_id = request.sender_id;
831 if !self
832 .store()
833 .await
834 .project_connection_ids(request.payload.project_id, follower_id)?
835 .contains(&leader_id)
836 {
837 Err(anyhow!("no such peer"))?;
838 }
839 let mut response_payload = self
840 .peer
841 .forward_request(request.sender_id, leader_id, request.payload)
842 .await?;
843 response_payload
844 .views
845 .retain(|view| view.leader_id != Some(follower_id.0));
846 response.send(response_payload)?;
847 Ok(())
848 }
849
850 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
851 let leader_id = ConnectionId(request.payload.leader_id);
852 if !self
853 .store()
854 .await
855 .project_connection_ids(request.payload.project_id, request.sender_id)?
856 .contains(&leader_id)
857 {
858 Err(anyhow!("no such peer"))?;
859 }
860 self.peer
861 .forward_send(request.sender_id, leader_id, request.payload)?;
862 Ok(())
863 }
864
865 async fn update_followers(
866 self: Arc<Self>,
867 request: TypedEnvelope<proto::UpdateFollowers>,
868 ) -> Result<()> {
869 let connection_ids = self
870 .store()
871 .await
872 .project_connection_ids(request.payload.project_id, request.sender_id)?;
873 let leader_id = request
874 .payload
875 .variant
876 .as_ref()
877 .and_then(|variant| match variant {
878 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
879 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
880 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
881 });
882 for follower_id in &request.payload.follower_ids {
883 let follower_id = ConnectionId(*follower_id);
884 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
885 self.peer
886 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
887 }
888 }
889 Ok(())
890 }
891
892 async fn get_channels(
893 self: Arc<Server>,
894 request: TypedEnvelope<proto::GetChannels>,
895 response: Response<proto::GetChannels>,
896 ) -> Result<()> {
897 let user_id = self
898 .store()
899 .await
900 .user_id_for_connection(request.sender_id)?;
901 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
902 response.send(proto::GetChannelsResponse {
903 channels: channels
904 .into_iter()
905 .map(|chan| proto::Channel {
906 id: chan.id.to_proto(),
907 name: chan.name,
908 })
909 .collect(),
910 })?;
911 Ok(())
912 }
913
914 async fn get_users(
915 self: Arc<Server>,
916 request: TypedEnvelope<proto::GetUsers>,
917 response: Response<proto::GetUsers>,
918 ) -> Result<()> {
919 let user_ids = request
920 .payload
921 .user_ids
922 .into_iter()
923 .map(UserId::from_proto)
924 .collect();
925 let users = self
926 .app_state
927 .db
928 .get_users_by_ids(user_ids)
929 .await?
930 .into_iter()
931 .map(|user| proto::User {
932 id: user.id.to_proto(),
933 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
934 github_login: user.github_login,
935 })
936 .collect();
937 response.send(proto::UsersResponse { users })?;
938 Ok(())
939 }
940
941 async fn fuzzy_search_users(
942 self: Arc<Server>,
943 request: TypedEnvelope<proto::FuzzySearchUsers>,
944 response: Response<proto::FuzzySearchUsers>,
945 ) -> Result<()> {
946 let user_id = self
947 .store()
948 .await
949 .user_id_for_connection(request.sender_id)?;
950 let query = request.payload.query;
951 let db = &self.app_state.db;
952 let users = match query.len() {
953 0 => vec![],
954 1 | 2 => db
955 .get_user_by_github_login(&query)
956 .await?
957 .into_iter()
958 .collect(),
959 _ => db.fuzzy_search_users(&query, 10).await?,
960 };
961 let users = users
962 .into_iter()
963 .filter(|user| user.id != user_id)
964 .map(|user| proto::User {
965 id: user.id.to_proto(),
966 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
967 github_login: user.github_login,
968 })
969 .collect();
970 response.send(proto::UsersResponse { users })?;
971 Ok(())
972 }
973
974 async fn request_contact(
975 self: Arc<Server>,
976 request: TypedEnvelope<proto::RequestContact>,
977 response: Response<proto::RequestContact>,
978 ) -> Result<()> {
979 let requester_id = self
980 .store()
981 .await
982 .user_id_for_connection(request.sender_id)?;
983 let responder_id = UserId::from_proto(request.payload.responder_id);
984 if requester_id == responder_id {
985 return Err(anyhow!("cannot add yourself as a contact"))?;
986 }
987
988 self.app_state
989 .db
990 .send_contact_request(requester_id, responder_id)
991 .await?;
992
993 // Update outgoing contact requests of requester
994 let mut update = proto::UpdateContacts::default();
995 update.outgoing_requests.push(responder_id.to_proto());
996 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
997 self.peer.send(connection_id, update.clone())?;
998 }
999
1000 // Update incoming contact requests of responder
1001 let mut update = proto::UpdateContacts::default();
1002 update
1003 .incoming_requests
1004 .push(proto::IncomingContactRequest {
1005 requester_id: requester_id.to_proto(),
1006 should_notify: true,
1007 });
1008 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1009 self.peer.send(connection_id, update.clone())?;
1010 }
1011
1012 response.send(proto::Ack {})?;
1013 Ok(())
1014 }
1015
1016 async fn respond_to_contact_request(
1017 self: Arc<Server>,
1018 request: TypedEnvelope<proto::RespondToContactRequest>,
1019 response: Response<proto::RespondToContactRequest>,
1020 ) -> Result<()> {
1021 let responder_id = self
1022 .store()
1023 .await
1024 .user_id_for_connection(request.sender_id)?;
1025 let requester_id = UserId::from_proto(request.payload.requester_id);
1026 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1027 self.app_state
1028 .db
1029 .dismiss_contact_request(responder_id, requester_id)
1030 .await?;
1031 } else {
1032 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1033 self.app_state
1034 .db
1035 .respond_to_contact_request(responder_id, requester_id, accept)
1036 .await?;
1037
1038 let store = self.store().await;
1039 // Update responder with new contact
1040 let mut update = proto::UpdateContacts::default();
1041 if accept {
1042 update.contacts.push(store.contact_for_user(requester_id));
1043 }
1044 update
1045 .remove_incoming_requests
1046 .push(requester_id.to_proto());
1047 for connection_id in store.connection_ids_for_user(responder_id) {
1048 self.peer.send(connection_id, update.clone())?;
1049 }
1050
1051 // Update requester with new contact
1052 let mut update = proto::UpdateContacts::default();
1053 if accept {
1054 update.contacts.push(store.contact_for_user(responder_id));
1055 }
1056 update
1057 .remove_outgoing_requests
1058 .push(responder_id.to_proto());
1059 for connection_id in store.connection_ids_for_user(requester_id) {
1060 self.peer.send(connection_id, update.clone())?;
1061 }
1062 }
1063
1064 response.send(proto::Ack {})?;
1065 Ok(())
1066 }
1067
1068 async fn remove_contact(
1069 self: Arc<Server>,
1070 request: TypedEnvelope<proto::RemoveContact>,
1071 response: Response<proto::RemoveContact>,
1072 ) -> Result<()> {
1073 let requester_id = self
1074 .store()
1075 .await
1076 .user_id_for_connection(request.sender_id)?;
1077 let responder_id = UserId::from_proto(request.payload.user_id);
1078 self.app_state
1079 .db
1080 .remove_contact(requester_id, responder_id)
1081 .await?;
1082
1083 // Update outgoing contact requests of requester
1084 let mut update = proto::UpdateContacts::default();
1085 update
1086 .remove_outgoing_requests
1087 .push(responder_id.to_proto());
1088 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1089 self.peer.send(connection_id, update.clone())?;
1090 }
1091
1092 // Update incoming contact requests of responder
1093 let mut update = proto::UpdateContacts::default();
1094 update
1095 .remove_incoming_requests
1096 .push(requester_id.to_proto());
1097 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1098 self.peer.send(connection_id, update.clone())?;
1099 }
1100
1101 response.send(proto::Ack {})?;
1102 Ok(())
1103 }
1104
1105 // #[instrument(skip(self, state, user_ids))]
1106 // fn update_contacts_for_users<'a>(
1107 // self: &Arc<Self>,
1108 // state: &Store,
1109 // user_ids: impl IntoIterator<Item = &'a UserId>,
1110 // ) {
1111 // for user_id in user_ids {
1112 // let contacts = state.contacts_for_user(*user_id);
1113 // for connection_id in state.connection_ids_for_user(*user_id) {
1114 // self.peer
1115 // .send(
1116 // connection_id,
1117 // proto::UpdateContacts {
1118 // contacts: contacts.clone(),
1119 // pending_requests_from_user_ids: Default::default(),
1120 // pending_requests_to_user_ids: Default::default(),
1121 // },
1122 // )
1123 // .trace_err();
1124 // }
1125 // }
1126 // }
1127
1128 async fn join_channel(
1129 self: Arc<Self>,
1130 request: TypedEnvelope<proto::JoinChannel>,
1131 response: Response<proto::JoinChannel>,
1132 ) -> Result<()> {
1133 let user_id = self
1134 .store()
1135 .await
1136 .user_id_for_connection(request.sender_id)?;
1137 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1138 if !self
1139 .app_state
1140 .db
1141 .can_user_access_channel(user_id, channel_id)
1142 .await?
1143 {
1144 Err(anyhow!("access denied"))?;
1145 }
1146
1147 self.store_mut()
1148 .await
1149 .join_channel(request.sender_id, channel_id);
1150 let messages = self
1151 .app_state
1152 .db
1153 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1154 .await?
1155 .into_iter()
1156 .map(|msg| proto::ChannelMessage {
1157 id: msg.id.to_proto(),
1158 body: msg.body,
1159 timestamp: msg.sent_at.unix_timestamp() as u64,
1160 sender_id: msg.sender_id.to_proto(),
1161 nonce: Some(msg.nonce.as_u128().into()),
1162 })
1163 .collect::<Vec<_>>();
1164 response.send(proto::JoinChannelResponse {
1165 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1166 messages,
1167 })?;
1168 Ok(())
1169 }
1170
1171 async fn leave_channel(
1172 self: Arc<Self>,
1173 request: TypedEnvelope<proto::LeaveChannel>,
1174 ) -> Result<()> {
1175 let user_id = self
1176 .store()
1177 .await
1178 .user_id_for_connection(request.sender_id)?;
1179 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1180 if !self
1181 .app_state
1182 .db
1183 .can_user_access_channel(user_id, channel_id)
1184 .await?
1185 {
1186 Err(anyhow!("access denied"))?;
1187 }
1188
1189 self.store_mut()
1190 .await
1191 .leave_channel(request.sender_id, channel_id);
1192
1193 Ok(())
1194 }
1195
1196 async fn send_channel_message(
1197 self: Arc<Self>,
1198 request: TypedEnvelope<proto::SendChannelMessage>,
1199 response: Response<proto::SendChannelMessage>,
1200 ) -> Result<()> {
1201 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1202 let user_id;
1203 let connection_ids;
1204 {
1205 let state = self.store().await;
1206 user_id = state.user_id_for_connection(request.sender_id)?;
1207 connection_ids = state.channel_connection_ids(channel_id)?;
1208 }
1209
1210 // Validate the message body.
1211 let body = request.payload.body.trim().to_string();
1212 if body.len() > MAX_MESSAGE_LEN {
1213 return Err(anyhow!("message is too long"))?;
1214 }
1215 if body.is_empty() {
1216 return Err(anyhow!("message can't be blank"))?;
1217 }
1218
1219 let timestamp = OffsetDateTime::now_utc();
1220 let nonce = request
1221 .payload
1222 .nonce
1223 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1224
1225 let message_id = self
1226 .app_state
1227 .db
1228 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1229 .await?
1230 .to_proto();
1231 let message = proto::ChannelMessage {
1232 sender_id: user_id.to_proto(),
1233 id: message_id,
1234 body,
1235 timestamp: timestamp.unix_timestamp() as u64,
1236 nonce: Some(nonce),
1237 };
1238 broadcast(request.sender_id, connection_ids, |conn_id| {
1239 self.peer.send(
1240 conn_id,
1241 proto::ChannelMessageSent {
1242 channel_id: channel_id.to_proto(),
1243 message: Some(message.clone()),
1244 },
1245 )
1246 });
1247 response.send(proto::SendChannelMessageResponse {
1248 message: Some(message),
1249 })?;
1250 Ok(())
1251 }
1252
1253 async fn get_channel_messages(
1254 self: Arc<Self>,
1255 request: TypedEnvelope<proto::GetChannelMessages>,
1256 response: Response<proto::GetChannelMessages>,
1257 ) -> Result<()> {
1258 let user_id = self
1259 .store()
1260 .await
1261 .user_id_for_connection(request.sender_id)?;
1262 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1263 if !self
1264 .app_state
1265 .db
1266 .can_user_access_channel(user_id, channel_id)
1267 .await?
1268 {
1269 Err(anyhow!("access denied"))?;
1270 }
1271
1272 let messages = self
1273 .app_state
1274 .db
1275 .get_channel_messages(
1276 channel_id,
1277 MESSAGE_COUNT_PER_PAGE,
1278 Some(MessageId::from_proto(request.payload.before_message_id)),
1279 )
1280 .await?
1281 .into_iter()
1282 .map(|msg| proto::ChannelMessage {
1283 id: msg.id.to_proto(),
1284 body: msg.body,
1285 timestamp: msg.sent_at.unix_timestamp() as u64,
1286 sender_id: msg.sender_id.to_proto(),
1287 nonce: Some(msg.nonce.as_u128().into()),
1288 })
1289 .collect::<Vec<_>>();
1290 response.send(proto::GetChannelMessagesResponse {
1291 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1292 messages,
1293 })?;
1294 Ok(())
1295 }
1296
1297 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1298 #[cfg(test)]
1299 tokio::task::yield_now().await;
1300 let guard = self.store.read().await;
1301 #[cfg(test)]
1302 tokio::task::yield_now().await;
1303 StoreReadGuard {
1304 guard,
1305 _not_send: PhantomData,
1306 }
1307 }
1308
1309 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1310 #[cfg(test)]
1311 tokio::task::yield_now().await;
1312 let guard = self.store.write().await;
1313 #[cfg(test)]
1314 tokio::task::yield_now().await;
1315 StoreWriteGuard {
1316 guard,
1317 _not_send: PhantomData,
1318 }
1319 }
1320}
1321
1322impl<'a> Deref for StoreReadGuard<'a> {
1323 type Target = Store;
1324
1325 fn deref(&self) -> &Self::Target {
1326 &*self.guard
1327 }
1328}
1329
1330impl<'a> Deref for StoreWriteGuard<'a> {
1331 type Target = Store;
1332
1333 fn deref(&self) -> &Self::Target {
1334 &*self.guard
1335 }
1336}
1337
1338impl<'a> DerefMut for StoreWriteGuard<'a> {
1339 fn deref_mut(&mut self) -> &mut Self::Target {
1340 &mut *self.guard
1341 }
1342}
1343
1344impl<'a> Drop for StoreWriteGuard<'a> {
1345 fn drop(&mut self) {
1346 #[cfg(test)]
1347 self.check_invariants();
1348
1349 let metrics = self.metrics();
1350 tracing::info!(
1351 connections = metrics.connections,
1352 registered_projects = metrics.registered_projects,
1353 shared_projects = metrics.shared_projects,
1354 "metrics"
1355 );
1356 }
1357}
1358
1359impl Executor for RealExecutor {
1360 type Sleep = Sleep;
1361
1362 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1363 tokio::task::spawn(future);
1364 }
1365
1366 fn sleep(&self, duration: Duration) -> Self::Sleep {
1367 tokio::time::sleep(duration)
1368 }
1369}
1370
1371fn broadcast<F>(
1372 sender_id: ConnectionId,
1373 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1374 mut f: F,
1375) where
1376 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1377{
1378 for receiver_id in receiver_ids {
1379 if receiver_id != sender_id {
1380 f(receiver_id).trace_err();
1381 }
1382 }
1383}
1384
1385lazy_static! {
1386 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1387}
1388
1389pub struct ProtocolVersion(u32);
1390
1391impl Header for ProtocolVersion {
1392 fn name() -> &'static HeaderName {
1393 &ZED_PROTOCOL_VERSION
1394 }
1395
1396 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1397 where
1398 Self: Sized,
1399 I: Iterator<Item = &'i axum::http::HeaderValue>,
1400 {
1401 let version = values
1402 .next()
1403 .ok_or_else(|| axum::headers::Error::invalid())?
1404 .to_str()
1405 .map_err(|_| axum::headers::Error::invalid())?
1406 .parse()
1407 .map_err(|_| axum::headers::Error::invalid())?;
1408 Ok(Self(version))
1409 }
1410
1411 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1412 values.extend([self.0.to_string().parse().unwrap()]);
1413 }
1414}
1415
1416pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1417 let server = Server::new(app_state.clone(), None);
1418 Router::new()
1419 .route("/rpc", get(handle_websocket_request))
1420 .layer(
1421 ServiceBuilder::new()
1422 .layer(Extension(app_state))
1423 .layer(middleware::from_fn(auth::validate_header))
1424 .layer(Extension(server)),
1425 )
1426}
1427
1428pub async fn handle_websocket_request(
1429 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1430 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1431 Extension(server): Extension<Arc<Server>>,
1432 Extension(user_id): Extension<UserId>,
1433 ws: WebSocketUpgrade,
1434) -> axum::response::Response {
1435 if protocol_version != rpc::PROTOCOL_VERSION {
1436 return (
1437 StatusCode::UPGRADE_REQUIRED,
1438 "client must be upgraded".to_string(),
1439 )
1440 .into_response();
1441 }
1442 let socket_address = socket_address.to_string();
1443 ws.on_upgrade(move |socket| {
1444 use util::ResultExt;
1445 let socket = socket
1446 .map_ok(to_tungstenite_message)
1447 .err_into()
1448 .with(|message| async move { Ok(to_axum_message(message)) });
1449 let connection = Connection::new(Box::pin(socket));
1450 async move {
1451 server
1452 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1453 .await
1454 .log_err();
1455 }
1456 })
1457}
1458
1459fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1460 match message {
1461 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1462 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1463 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1464 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1465 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1466 code: frame.code.into(),
1467 reason: frame.reason,
1468 })),
1469 }
1470}
1471
1472fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1473 match message {
1474 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1475 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1476 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1477 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1478 AxumMessage::Close(frame) => {
1479 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1480 code: frame.code.into(),
1481 reason: frame.reason,
1482 }))
1483 }
1484 }
1485}
1486
1487pub trait ResultExt {
1488 type Ok;
1489
1490 fn trace_err(self) -> Option<Self::Ok>;
1491}
1492
1493impl<T, E> ResultExt for Result<T, E>
1494where
1495 E: std::fmt::Debug,
1496{
1497 type Ok = T;
1498
1499 fn trace_err(self) -> Option<T> {
1500 match self {
1501 Ok(value) => Some(value),
1502 Err(error) => {
1503 tracing::error!("{:?}", error);
1504 None
1505 }
1506 }
1507 }
1508}
1509
1510#[cfg(test)]
1511mod tests {
1512 use super::*;
1513 use crate::{
1514 db::{tests::TestDb, UserId},
1515 AppState,
1516 };
1517 use ::rpc::Peer;
1518 use client::{
1519 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1520 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1521 };
1522 use collections::{BTreeMap, HashSet};
1523 use editor::{
1524 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1525 ToOffset, ToggleCodeActions, Undo,
1526 };
1527 use gpui::{
1528 executor::{self, Deterministic},
1529 geometry::vector::vec2f,
1530 ModelHandle, TestAppContext, ViewHandle,
1531 };
1532 use language::{
1533 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1534 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1535 };
1536 use lsp::{self, FakeLanguageServer};
1537 use parking_lot::Mutex;
1538 use project::{
1539 fs::{FakeFs, Fs as _},
1540 search::SearchQuery,
1541 worktree::WorktreeHandle,
1542 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1543 };
1544 use rand::prelude::*;
1545 use rpc::PeerId;
1546 use serde_json::json;
1547 use settings::Settings;
1548 use sqlx::types::time::OffsetDateTime;
1549 use std::{
1550 env,
1551 ops::Deref,
1552 path::{Path, PathBuf},
1553 rc::Rc,
1554 sync::{
1555 atomic::{AtomicBool, Ordering::SeqCst},
1556 Arc,
1557 },
1558 time::Duration,
1559 };
1560 use theme::ThemeRegistry;
1561 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1562
1563 #[cfg(test)]
1564 #[ctor::ctor]
1565 fn init_logger() {
1566 if std::env::var("RUST_LOG").is_ok() {
1567 env_logger::init();
1568 }
1569 }
1570
1571 #[gpui::test(iterations = 10)]
1572 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1573 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1574 let lang_registry = Arc::new(LanguageRegistry::test());
1575 let fs = FakeFs::new(cx_a.background());
1576 cx_a.foreground().forbid_parking();
1577
1578 // Connect to a server as 2 clients.
1579 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1580 let client_a = server.create_client(cx_a, "user_a").await;
1581 let client_b = server.create_client(cx_b, "user_b").await;
1582 server
1583 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1584 .await;
1585
1586 // Share a project as client A
1587 fs.insert_tree(
1588 "/a",
1589 json!({
1590 "a.txt": "a-contents",
1591 "b.txt": "b-contents",
1592 }),
1593 )
1594 .await;
1595 let project_a = cx_a.update(|cx| {
1596 Project::local(
1597 client_a.clone(),
1598 client_a.user_store.clone(),
1599 lang_registry.clone(),
1600 fs.clone(),
1601 cx,
1602 )
1603 });
1604 let (worktree_a, _) = project_a
1605 .update(cx_a, |p, cx| {
1606 p.find_or_create_local_worktree("/a", true, cx)
1607 })
1608 .await
1609 .unwrap();
1610 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1611 worktree_a
1612 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1613 .await;
1614 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1615 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1616
1617 // Join that project as client B
1618 let project_b = Project::remote(
1619 project_id,
1620 client_b.clone(),
1621 client_b.user_store.clone(),
1622 lang_registry.clone(),
1623 fs.clone(),
1624 &mut cx_b.to_async(),
1625 )
1626 .await
1627 .unwrap();
1628
1629 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1630 assert_eq!(
1631 project
1632 .collaborators()
1633 .get(&client_a.peer_id)
1634 .unwrap()
1635 .user
1636 .github_login,
1637 "user_a"
1638 );
1639 project.replica_id()
1640 });
1641 project_a
1642 .condition(&cx_a, |tree, _| {
1643 tree.collaborators()
1644 .get(&client_b.peer_id)
1645 .map_or(false, |collaborator| {
1646 collaborator.replica_id == replica_id_b
1647 && collaborator.user.github_login == "user_b"
1648 })
1649 })
1650 .await;
1651
1652 // Open the same file as client B and client A.
1653 let buffer_b = project_b
1654 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1655 .await
1656 .unwrap();
1657 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1658 project_a.read_with(cx_a, |project, cx| {
1659 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1660 });
1661 let buffer_a = project_a
1662 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1663 .await
1664 .unwrap();
1665
1666 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1667
1668 // TODO
1669 // // Create a selection set as client B and see that selection set as client A.
1670 // buffer_a
1671 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1672 // .await;
1673
1674 // Edit the buffer as client B and see that edit as client A.
1675 editor_b.update(cx_b, |editor, cx| {
1676 editor.handle_input(&Input("ok, ".into()), cx)
1677 });
1678 buffer_a
1679 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1680 .await;
1681
1682 // TODO
1683 // // Remove the selection set as client B, see those selections disappear as client A.
1684 cx_b.update(move |_| drop(editor_b));
1685 // buffer_a
1686 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1687 // .await;
1688
1689 // Dropping the client B's project removes client B from client A's collaborators.
1690 cx_b.update(move |_| drop(project_b));
1691 project_a
1692 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1693 .await;
1694 }
1695
1696 #[gpui::test(iterations = 10)]
1697 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1698 let lang_registry = Arc::new(LanguageRegistry::test());
1699 let fs = FakeFs::new(cx_a.background());
1700 cx_a.foreground().forbid_parking();
1701
1702 // Connect to a server as 2 clients.
1703 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1704 let client_a = server.create_client(cx_a, "user_a").await;
1705 let client_b = server.create_client(cx_b, "user_b").await;
1706 server
1707 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1708 .await;
1709
1710 // Share a project as client A
1711 fs.insert_tree(
1712 "/a",
1713 json!({
1714 "a.txt": "a-contents",
1715 "b.txt": "b-contents",
1716 }),
1717 )
1718 .await;
1719 let project_a = cx_a.update(|cx| {
1720 Project::local(
1721 client_a.clone(),
1722 client_a.user_store.clone(),
1723 lang_registry.clone(),
1724 fs.clone(),
1725 cx,
1726 )
1727 });
1728 let (worktree_a, _) = project_a
1729 .update(cx_a, |p, cx| {
1730 p.find_or_create_local_worktree("/a", true, cx)
1731 })
1732 .await
1733 .unwrap();
1734 worktree_a
1735 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1736 .await;
1737 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1738 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1739 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1740 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1741
1742 // Join that project as client B
1743 let project_b = Project::remote(
1744 project_id,
1745 client_b.clone(),
1746 client_b.user_store.clone(),
1747 lang_registry.clone(),
1748 fs.clone(),
1749 &mut cx_b.to_async(),
1750 )
1751 .await
1752 .unwrap();
1753 project_b
1754 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1755 .await
1756 .unwrap();
1757
1758 // Unshare the project as client A
1759 project_a.update(cx_a, |project, cx| project.unshare(cx));
1760 project_b
1761 .condition(cx_b, |project, _| project.is_read_only())
1762 .await;
1763 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1764 cx_b.update(|_| {
1765 drop(project_b);
1766 });
1767
1768 // Share the project again and ensure guests can still join.
1769 project_a
1770 .update(cx_a, |project, cx| project.share(cx))
1771 .await
1772 .unwrap();
1773 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1774
1775 let project_b2 = Project::remote(
1776 project_id,
1777 client_b.clone(),
1778 client_b.user_store.clone(),
1779 lang_registry.clone(),
1780 fs.clone(),
1781 &mut cx_b.to_async(),
1782 )
1783 .await
1784 .unwrap();
1785 project_b2
1786 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787 .await
1788 .unwrap();
1789 }
1790
1791 #[gpui::test(iterations = 10)]
1792 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1793 let lang_registry = Arc::new(LanguageRegistry::test());
1794 let fs = FakeFs::new(cx_a.background());
1795 cx_a.foreground().forbid_parking();
1796
1797 // Connect to a server as 2 clients.
1798 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1799 let client_a = server.create_client(cx_a, "user_a").await;
1800 let client_b = server.create_client(cx_b, "user_b").await;
1801 server
1802 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1803 .await;
1804
1805 // Share a project as client A
1806 fs.insert_tree(
1807 "/a",
1808 json!({
1809 "a.txt": "a-contents",
1810 "b.txt": "b-contents",
1811 }),
1812 )
1813 .await;
1814 let project_a = cx_a.update(|cx| {
1815 Project::local(
1816 client_a.clone(),
1817 client_a.user_store.clone(),
1818 lang_registry.clone(),
1819 fs.clone(),
1820 cx,
1821 )
1822 });
1823 let (worktree_a, _) = project_a
1824 .update(cx_a, |p, cx| {
1825 p.find_or_create_local_worktree("/a", true, cx)
1826 })
1827 .await
1828 .unwrap();
1829 worktree_a
1830 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1831 .await;
1832 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1833 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1834 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1835 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1836
1837 // Join that project as client B
1838 let project_b = Project::remote(
1839 project_id,
1840 client_b.clone(),
1841 client_b.user_store.clone(),
1842 lang_registry.clone(),
1843 fs.clone(),
1844 &mut cx_b.to_async(),
1845 )
1846 .await
1847 .unwrap();
1848 project_b
1849 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1850 .await
1851 .unwrap();
1852
1853 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1854 server.disconnect_client(client_a.current_user_id(cx_a));
1855 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1856 project_a
1857 .condition(cx_a, |project, _| project.collaborators().is_empty())
1858 .await;
1859 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1860 project_b
1861 .condition(cx_b, |project, _| project.is_read_only())
1862 .await;
1863 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1864 cx_b.update(|_| {
1865 drop(project_b);
1866 });
1867
1868 // Await reconnection
1869 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1870
1871 // Share the project again and ensure guests can still join.
1872 project_a
1873 .update(cx_a, |project, cx| project.share(cx))
1874 .await
1875 .unwrap();
1876 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1877
1878 let project_b2 = Project::remote(
1879 project_id,
1880 client_b.clone(),
1881 client_b.user_store.clone(),
1882 lang_registry.clone(),
1883 fs.clone(),
1884 &mut cx_b.to_async(),
1885 )
1886 .await
1887 .unwrap();
1888 project_b2
1889 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1890 .await
1891 .unwrap();
1892 }
1893
1894 #[gpui::test(iterations = 10)]
1895 async fn test_propagate_saves_and_fs_changes(
1896 cx_a: &mut TestAppContext,
1897 cx_b: &mut TestAppContext,
1898 cx_c: &mut TestAppContext,
1899 ) {
1900 let lang_registry = Arc::new(LanguageRegistry::test());
1901 let fs = FakeFs::new(cx_a.background());
1902 cx_a.foreground().forbid_parking();
1903
1904 // Connect to a server as 3 clients.
1905 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1906 let client_a = server.create_client(cx_a, "user_a").await;
1907 let client_b = server.create_client(cx_b, "user_b").await;
1908 let client_c = server.create_client(cx_c, "user_c").await;
1909 server
1910 .make_contacts(vec![
1911 (&client_a, cx_a),
1912 (&client_b, cx_b),
1913 (&client_c, cx_c),
1914 ])
1915 .await;
1916
1917 // Share a worktree as client A.
1918 fs.insert_tree(
1919 "/a",
1920 json!({
1921 "file1": "",
1922 "file2": ""
1923 }),
1924 )
1925 .await;
1926 let project_a = cx_a.update(|cx| {
1927 Project::local(
1928 client_a.clone(),
1929 client_a.user_store.clone(),
1930 lang_registry.clone(),
1931 fs.clone(),
1932 cx,
1933 )
1934 });
1935 let (worktree_a, _) = project_a
1936 .update(cx_a, |p, cx| {
1937 p.find_or_create_local_worktree("/a", true, cx)
1938 })
1939 .await
1940 .unwrap();
1941 worktree_a
1942 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1943 .await;
1944 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1945 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1946 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1947
1948 // Join that worktree as clients B and C.
1949 let project_b = Project::remote(
1950 project_id,
1951 client_b.clone(),
1952 client_b.user_store.clone(),
1953 lang_registry.clone(),
1954 fs.clone(),
1955 &mut cx_b.to_async(),
1956 )
1957 .await
1958 .unwrap();
1959 let project_c = Project::remote(
1960 project_id,
1961 client_c.clone(),
1962 client_c.user_store.clone(),
1963 lang_registry.clone(),
1964 fs.clone(),
1965 &mut cx_c.to_async(),
1966 )
1967 .await
1968 .unwrap();
1969 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1970 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1971
1972 // Open and edit a buffer as both guests B and C.
1973 let buffer_b = project_b
1974 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1975 .await
1976 .unwrap();
1977 let buffer_c = project_c
1978 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1979 .await
1980 .unwrap();
1981 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1982 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1983
1984 // Open and edit that buffer as the host.
1985 let buffer_a = project_a
1986 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1987 .await
1988 .unwrap();
1989
1990 buffer_a
1991 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1992 .await;
1993 buffer_a.update(cx_a, |buf, cx| {
1994 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1995 });
1996
1997 // Wait for edits to propagate
1998 buffer_a
1999 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2000 .await;
2001 buffer_b
2002 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2003 .await;
2004 buffer_c
2005 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2006 .await;
2007
2008 // Edit the buffer as the host and concurrently save as guest B.
2009 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2010 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2011 save_b.await.unwrap();
2012 assert_eq!(
2013 fs.load("/a/file1".as_ref()).await.unwrap(),
2014 "hi-a, i-am-c, i-am-b, i-am-a"
2015 );
2016 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2017 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2018 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2019
2020 worktree_a.flush_fs_events(cx_a).await;
2021
2022 // Make changes on host's file system, see those changes on guest worktrees.
2023 fs.rename(
2024 "/a/file1".as_ref(),
2025 "/a/file1-renamed".as_ref(),
2026 Default::default(),
2027 )
2028 .await
2029 .unwrap();
2030
2031 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2032 .await
2033 .unwrap();
2034 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2035
2036 worktree_a
2037 .condition(&cx_a, |tree, _| {
2038 tree.paths()
2039 .map(|p| p.to_string_lossy())
2040 .collect::<Vec<_>>()
2041 == ["file1-renamed", "file3", "file4"]
2042 })
2043 .await;
2044 worktree_b
2045 .condition(&cx_b, |tree, _| {
2046 tree.paths()
2047 .map(|p| p.to_string_lossy())
2048 .collect::<Vec<_>>()
2049 == ["file1-renamed", "file3", "file4"]
2050 })
2051 .await;
2052 worktree_c
2053 .condition(&cx_c, |tree, _| {
2054 tree.paths()
2055 .map(|p| p.to_string_lossy())
2056 .collect::<Vec<_>>()
2057 == ["file1-renamed", "file3", "file4"]
2058 })
2059 .await;
2060
2061 // Ensure buffer files are updated as well.
2062 buffer_a
2063 .condition(&cx_a, |buf, _| {
2064 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065 })
2066 .await;
2067 buffer_b
2068 .condition(&cx_b, |buf, _| {
2069 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2070 })
2071 .await;
2072 buffer_c
2073 .condition(&cx_c, |buf, _| {
2074 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2075 })
2076 .await;
2077 }
2078
2079 #[gpui::test(iterations = 10)]
2080 async fn test_fs_operations(
2081 executor: Arc<Deterministic>,
2082 cx_a: &mut TestAppContext,
2083 cx_b: &mut TestAppContext,
2084 ) {
2085 executor.forbid_parking();
2086 let fs = FakeFs::new(cx_a.background());
2087
2088 // Connect to a server as 2 clients.
2089 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2090 let mut client_a = server.create_client(cx_a, "user_a").await;
2091 let mut client_b = server.create_client(cx_b, "user_b").await;
2092 server
2093 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2094 .await;
2095
2096 // Share a project as client A
2097 fs.insert_tree(
2098 "/dir",
2099 json!({
2100 "a.txt": "a-contents",
2101 "b.txt": "b-contents",
2102 }),
2103 )
2104 .await;
2105
2106 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2107 let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2108 project_a
2109 .update(cx_a, |project, cx| project.share(cx))
2110 .await
2111 .unwrap();
2112
2113 let project_b = client_b.build_remote_project(project_id, cx_b).await;
2114
2115 let worktree_a =
2116 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2117 let worktree_b =
2118 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2119
2120 let entry = project_b
2121 .update(cx_b, |project, cx| {
2122 project
2123 .create_entry((worktree_id, "c.txt"), false, cx)
2124 .unwrap()
2125 })
2126 .await
2127 .unwrap();
2128 worktree_a.read_with(cx_a, |worktree, _| {
2129 assert_eq!(
2130 worktree
2131 .paths()
2132 .map(|p| p.to_string_lossy())
2133 .collect::<Vec<_>>(),
2134 ["a.txt", "b.txt", "c.txt"]
2135 );
2136 });
2137 worktree_b.read_with(cx_b, |worktree, _| {
2138 assert_eq!(
2139 worktree
2140 .paths()
2141 .map(|p| p.to_string_lossy())
2142 .collect::<Vec<_>>(),
2143 ["a.txt", "b.txt", "c.txt"]
2144 );
2145 });
2146
2147 project_b
2148 .update(cx_b, |project, cx| {
2149 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2150 })
2151 .unwrap()
2152 .await
2153 .unwrap();
2154 worktree_a.read_with(cx_a, |worktree, _| {
2155 assert_eq!(
2156 worktree
2157 .paths()
2158 .map(|p| p.to_string_lossy())
2159 .collect::<Vec<_>>(),
2160 ["a.txt", "b.txt", "d.txt"]
2161 );
2162 });
2163 worktree_b.read_with(cx_b, |worktree, _| {
2164 assert_eq!(
2165 worktree
2166 .paths()
2167 .map(|p| p.to_string_lossy())
2168 .collect::<Vec<_>>(),
2169 ["a.txt", "b.txt", "d.txt"]
2170 );
2171 });
2172
2173 let dir_entry = project_b
2174 .update(cx_b, |project, cx| {
2175 project
2176 .create_entry((worktree_id, "DIR"), true, cx)
2177 .unwrap()
2178 })
2179 .await
2180 .unwrap();
2181 worktree_a.read_with(cx_a, |worktree, _| {
2182 assert_eq!(
2183 worktree
2184 .paths()
2185 .map(|p| p.to_string_lossy())
2186 .collect::<Vec<_>>(),
2187 ["DIR", "a.txt", "b.txt", "d.txt"]
2188 );
2189 });
2190 worktree_b.read_with(cx_b, |worktree, _| {
2191 assert_eq!(
2192 worktree
2193 .paths()
2194 .map(|p| p.to_string_lossy())
2195 .collect::<Vec<_>>(),
2196 ["DIR", "a.txt", "b.txt", "d.txt"]
2197 );
2198 });
2199
2200 project_b
2201 .update(cx_b, |project, cx| {
2202 project.delete_entry(dir_entry.id, cx).unwrap()
2203 })
2204 .await
2205 .unwrap();
2206 worktree_a.read_with(cx_a, |worktree, _| {
2207 assert_eq!(
2208 worktree
2209 .paths()
2210 .map(|p| p.to_string_lossy())
2211 .collect::<Vec<_>>(),
2212 ["a.txt", "b.txt", "d.txt"]
2213 );
2214 });
2215 worktree_b.read_with(cx_b, |worktree, _| {
2216 assert_eq!(
2217 worktree
2218 .paths()
2219 .map(|p| p.to_string_lossy())
2220 .collect::<Vec<_>>(),
2221 ["a.txt", "b.txt", "d.txt"]
2222 );
2223 });
2224
2225 project_b
2226 .update(cx_b, |project, cx| {
2227 project.delete_entry(entry.id, cx).unwrap()
2228 })
2229 .await
2230 .unwrap();
2231 worktree_a.read_with(cx_a, |worktree, _| {
2232 assert_eq!(
2233 worktree
2234 .paths()
2235 .map(|p| p.to_string_lossy())
2236 .collect::<Vec<_>>(),
2237 ["a.txt", "b.txt"]
2238 );
2239 });
2240 worktree_b.read_with(cx_b, |worktree, _| {
2241 assert_eq!(
2242 worktree
2243 .paths()
2244 .map(|p| p.to_string_lossy())
2245 .collect::<Vec<_>>(),
2246 ["a.txt", "b.txt"]
2247 );
2248 });
2249 }
2250
2251 #[gpui::test(iterations = 10)]
2252 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2253 cx_a.foreground().forbid_parking();
2254 let lang_registry = Arc::new(LanguageRegistry::test());
2255 let fs = FakeFs::new(cx_a.background());
2256
2257 // Connect to a server as 2 clients.
2258 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2259 let client_a = server.create_client(cx_a, "user_a").await;
2260 let client_b = server.create_client(cx_b, "user_b").await;
2261 server
2262 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2263 .await;
2264
2265 // Share a project as client A
2266 fs.insert_tree(
2267 "/dir",
2268 json!({
2269 "a.txt": "a-contents",
2270 }),
2271 )
2272 .await;
2273
2274 let project_a = cx_a.update(|cx| {
2275 Project::local(
2276 client_a.clone(),
2277 client_a.user_store.clone(),
2278 lang_registry.clone(),
2279 fs.clone(),
2280 cx,
2281 )
2282 });
2283 let (worktree_a, _) = project_a
2284 .update(cx_a, |p, cx| {
2285 p.find_or_create_local_worktree("/dir", true, cx)
2286 })
2287 .await
2288 .unwrap();
2289 worktree_a
2290 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2291 .await;
2292 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2293 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2294 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2295
2296 // Join that project as client B
2297 let project_b = Project::remote(
2298 project_id,
2299 client_b.clone(),
2300 client_b.user_store.clone(),
2301 lang_registry.clone(),
2302 fs.clone(),
2303 &mut cx_b.to_async(),
2304 )
2305 .await
2306 .unwrap();
2307
2308 // Open a buffer as client B
2309 let buffer_b = project_b
2310 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2311 .await
2312 .unwrap();
2313
2314 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2315 buffer_b.read_with(cx_b, |buf, _| {
2316 assert!(buf.is_dirty());
2317 assert!(!buf.has_conflict());
2318 });
2319
2320 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2321 buffer_b
2322 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2323 .await;
2324 buffer_b.read_with(cx_b, |buf, _| {
2325 assert!(!buf.has_conflict());
2326 });
2327
2328 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2329 buffer_b.read_with(cx_b, |buf, _| {
2330 assert!(buf.is_dirty());
2331 assert!(!buf.has_conflict());
2332 });
2333 }
2334
2335 #[gpui::test(iterations = 10)]
2336 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2337 cx_a.foreground().forbid_parking();
2338 let lang_registry = Arc::new(LanguageRegistry::test());
2339 let fs = FakeFs::new(cx_a.background());
2340
2341 // Connect to a server as 2 clients.
2342 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2343 let client_a = server.create_client(cx_a, "user_a").await;
2344 let client_b = server.create_client(cx_b, "user_b").await;
2345 server
2346 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2347 .await;
2348
2349 // Share a project as client A
2350 fs.insert_tree(
2351 "/dir",
2352 json!({
2353 "a.txt": "a-contents",
2354 }),
2355 )
2356 .await;
2357
2358 let project_a = cx_a.update(|cx| {
2359 Project::local(
2360 client_a.clone(),
2361 client_a.user_store.clone(),
2362 lang_registry.clone(),
2363 fs.clone(),
2364 cx,
2365 )
2366 });
2367 let (worktree_a, _) = project_a
2368 .update(cx_a, |p, cx| {
2369 p.find_or_create_local_worktree("/dir", true, cx)
2370 })
2371 .await
2372 .unwrap();
2373 worktree_a
2374 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2375 .await;
2376 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2377 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2378 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2379
2380 // Join that project as client B
2381 let project_b = Project::remote(
2382 project_id,
2383 client_b.clone(),
2384 client_b.user_store.clone(),
2385 lang_registry.clone(),
2386 fs.clone(),
2387 &mut cx_b.to_async(),
2388 )
2389 .await
2390 .unwrap();
2391 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2392
2393 // Open a buffer as client B
2394 let buffer_b = project_b
2395 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2396 .await
2397 .unwrap();
2398 buffer_b.read_with(cx_b, |buf, _| {
2399 assert!(!buf.is_dirty());
2400 assert!(!buf.has_conflict());
2401 });
2402
2403 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2404 .await
2405 .unwrap();
2406 buffer_b
2407 .condition(&cx_b, |buf, _| {
2408 buf.text() == "new contents" && !buf.is_dirty()
2409 })
2410 .await;
2411 buffer_b.read_with(cx_b, |buf, _| {
2412 assert!(!buf.has_conflict());
2413 });
2414 }
2415
2416 #[gpui::test(iterations = 10)]
2417 async fn test_editing_while_guest_opens_buffer(
2418 cx_a: &mut TestAppContext,
2419 cx_b: &mut TestAppContext,
2420 ) {
2421 cx_a.foreground().forbid_parking();
2422 let lang_registry = Arc::new(LanguageRegistry::test());
2423 let fs = FakeFs::new(cx_a.background());
2424
2425 // Connect to a server as 2 clients.
2426 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2427 let client_a = server.create_client(cx_a, "user_a").await;
2428 let client_b = server.create_client(cx_b, "user_b").await;
2429 server
2430 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2431 .await;
2432
2433 // Share a project as client A
2434 fs.insert_tree(
2435 "/dir",
2436 json!({
2437 "a.txt": "a-contents",
2438 }),
2439 )
2440 .await;
2441 let project_a = cx_a.update(|cx| {
2442 Project::local(
2443 client_a.clone(),
2444 client_a.user_store.clone(),
2445 lang_registry.clone(),
2446 fs.clone(),
2447 cx,
2448 )
2449 });
2450 let (worktree_a, _) = project_a
2451 .update(cx_a, |p, cx| {
2452 p.find_or_create_local_worktree("/dir", true, cx)
2453 })
2454 .await
2455 .unwrap();
2456 worktree_a
2457 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2458 .await;
2459 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2460 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2461 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2462
2463 // Join that project as client B
2464 let project_b = Project::remote(
2465 project_id,
2466 client_b.clone(),
2467 client_b.user_store.clone(),
2468 lang_registry.clone(),
2469 fs.clone(),
2470 &mut cx_b.to_async(),
2471 )
2472 .await
2473 .unwrap();
2474
2475 // Open a buffer as client A
2476 let buffer_a = project_a
2477 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2478 .await
2479 .unwrap();
2480
2481 // Start opening the same buffer as client B
2482 let buffer_b = cx_b
2483 .background()
2484 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2485
2486 // Edit the buffer as client A while client B is still opening it.
2487 cx_b.background().simulate_random_delay().await;
2488 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2489 cx_b.background().simulate_random_delay().await;
2490 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2491
2492 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2493 let buffer_b = buffer_b.await.unwrap();
2494 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2495 }
2496
2497 #[gpui::test(iterations = 10)]
2498 async fn test_leaving_worktree_while_opening_buffer(
2499 cx_a: &mut TestAppContext,
2500 cx_b: &mut TestAppContext,
2501 ) {
2502 cx_a.foreground().forbid_parking();
2503 let lang_registry = Arc::new(LanguageRegistry::test());
2504 let fs = FakeFs::new(cx_a.background());
2505
2506 // Connect to a server as 2 clients.
2507 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2508 let client_a = server.create_client(cx_a, "user_a").await;
2509 let client_b = server.create_client(cx_b, "user_b").await;
2510 server
2511 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2512 .await;
2513
2514 // Share a project as client A
2515 fs.insert_tree(
2516 "/dir",
2517 json!({
2518 "a.txt": "a-contents",
2519 }),
2520 )
2521 .await;
2522 let project_a = cx_a.update(|cx| {
2523 Project::local(
2524 client_a.clone(),
2525 client_a.user_store.clone(),
2526 lang_registry.clone(),
2527 fs.clone(),
2528 cx,
2529 )
2530 });
2531 let (worktree_a, _) = project_a
2532 .update(cx_a, |p, cx| {
2533 p.find_or_create_local_worktree("/dir", true, cx)
2534 })
2535 .await
2536 .unwrap();
2537 worktree_a
2538 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2539 .await;
2540 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2541 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2542 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2543
2544 // Join that project as client B
2545 let project_b = Project::remote(
2546 project_id,
2547 client_b.clone(),
2548 client_b.user_store.clone(),
2549 lang_registry.clone(),
2550 fs.clone(),
2551 &mut cx_b.to_async(),
2552 )
2553 .await
2554 .unwrap();
2555
2556 // See that a guest has joined as client A.
2557 project_a
2558 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2559 .await;
2560
2561 // Begin opening a buffer as client B, but leave the project before the open completes.
2562 let buffer_b = cx_b
2563 .background()
2564 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2565 cx_b.update(|_| drop(project_b));
2566 drop(buffer_b);
2567
2568 // See that the guest has left.
2569 project_a
2570 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2571 .await;
2572 }
2573
2574 #[gpui::test(iterations = 10)]
2575 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2576 cx_a.foreground().forbid_parking();
2577 let lang_registry = Arc::new(LanguageRegistry::test());
2578 let fs = FakeFs::new(cx_a.background());
2579
2580 // Connect to a server as 2 clients.
2581 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2582 let client_a = server.create_client(cx_a, "user_a").await;
2583 let client_b = server.create_client(cx_b, "user_b").await;
2584 server
2585 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2586 .await;
2587
2588 // Share a project as client A
2589 fs.insert_tree(
2590 "/a",
2591 json!({
2592 "a.txt": "a-contents",
2593 "b.txt": "b-contents",
2594 }),
2595 )
2596 .await;
2597 let project_a = cx_a.update(|cx| {
2598 Project::local(
2599 client_a.clone(),
2600 client_a.user_store.clone(),
2601 lang_registry.clone(),
2602 fs.clone(),
2603 cx,
2604 )
2605 });
2606 let (worktree_a, _) = project_a
2607 .update(cx_a, |p, cx| {
2608 p.find_or_create_local_worktree("/a", true, cx)
2609 })
2610 .await
2611 .unwrap();
2612 worktree_a
2613 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2614 .await;
2615 let project_id = project_a
2616 .update(cx_a, |project, _| project.next_remote_id())
2617 .await;
2618 project_a
2619 .update(cx_a, |project, cx| project.share(cx))
2620 .await
2621 .unwrap();
2622
2623 // Join that project as client B
2624 let _project_b = Project::remote(
2625 project_id,
2626 client_b.clone(),
2627 client_b.user_store.clone(),
2628 lang_registry.clone(),
2629 fs.clone(),
2630 &mut cx_b.to_async(),
2631 )
2632 .await
2633 .unwrap();
2634
2635 // Client A sees that a guest has joined.
2636 project_a
2637 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2638 .await;
2639
2640 // Drop client B's connection and ensure client A observes client B leaving the project.
2641 client_b.disconnect(&cx_b.to_async()).unwrap();
2642 project_a
2643 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2644 .await;
2645
2646 // Rejoin the project as client B
2647 let _project_b = Project::remote(
2648 project_id,
2649 client_b.clone(),
2650 client_b.user_store.clone(),
2651 lang_registry.clone(),
2652 fs.clone(),
2653 &mut cx_b.to_async(),
2654 )
2655 .await
2656 .unwrap();
2657
2658 // Client A sees that a guest has re-joined.
2659 project_a
2660 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2661 .await;
2662
2663 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2664 client_b.wait_for_current_user(cx_b).await;
2665 server.disconnect_client(client_b.current_user_id(cx_b));
2666 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2667 project_a
2668 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2669 .await;
2670 }
2671
2672 #[gpui::test(iterations = 10)]
2673 async fn test_collaborating_with_diagnostics(
2674 cx_a: &mut TestAppContext,
2675 cx_b: &mut TestAppContext,
2676 ) {
2677 cx_a.foreground().forbid_parking();
2678 let lang_registry = Arc::new(LanguageRegistry::test());
2679 let fs = FakeFs::new(cx_a.background());
2680
2681 // Set up a fake language server.
2682 let mut language = Language::new(
2683 LanguageConfig {
2684 name: "Rust".into(),
2685 path_suffixes: vec!["rs".to_string()],
2686 ..Default::default()
2687 },
2688 Some(tree_sitter_rust::language()),
2689 );
2690 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2691 lang_registry.add(Arc::new(language));
2692
2693 // Connect to a server as 2 clients.
2694 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2695 let client_a = server.create_client(cx_a, "user_a").await;
2696 let client_b = server.create_client(cx_b, "user_b").await;
2697 server
2698 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2699 .await;
2700
2701 // Share a project as client A
2702 fs.insert_tree(
2703 "/a",
2704 json!({
2705 "a.rs": "let one = two",
2706 "other.rs": "",
2707 }),
2708 )
2709 .await;
2710 let project_a = cx_a.update(|cx| {
2711 Project::local(
2712 client_a.clone(),
2713 client_a.user_store.clone(),
2714 lang_registry.clone(),
2715 fs.clone(),
2716 cx,
2717 )
2718 });
2719 let (worktree_a, _) = project_a
2720 .update(cx_a, |p, cx| {
2721 p.find_or_create_local_worktree("/a", true, cx)
2722 })
2723 .await
2724 .unwrap();
2725 worktree_a
2726 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2727 .await;
2728 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2729 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2730 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2731
2732 // Cause the language server to start.
2733 let _ = cx_a
2734 .background()
2735 .spawn(project_a.update(cx_a, |project, cx| {
2736 project.open_buffer(
2737 ProjectPath {
2738 worktree_id,
2739 path: Path::new("other.rs").into(),
2740 },
2741 cx,
2742 )
2743 }))
2744 .await
2745 .unwrap();
2746
2747 // Simulate a language server reporting errors for a file.
2748 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2749 fake_language_server
2750 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2751 .await;
2752 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2753 lsp::PublishDiagnosticsParams {
2754 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2755 version: None,
2756 diagnostics: vec![lsp::Diagnostic {
2757 severity: Some(lsp::DiagnosticSeverity::ERROR),
2758 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2759 message: "message 1".to_string(),
2760 ..Default::default()
2761 }],
2762 },
2763 );
2764
2765 // Wait for server to see the diagnostics update.
2766 server
2767 .condition(|store| {
2768 let worktree = store
2769 .project(project_id)
2770 .unwrap()
2771 .share
2772 .as_ref()
2773 .unwrap()
2774 .worktrees
2775 .get(&worktree_id.to_proto())
2776 .unwrap();
2777
2778 !worktree.diagnostic_summaries.is_empty()
2779 })
2780 .await;
2781
2782 // Join the worktree as client B.
2783 let project_b = Project::remote(
2784 project_id,
2785 client_b.clone(),
2786 client_b.user_store.clone(),
2787 lang_registry.clone(),
2788 fs.clone(),
2789 &mut cx_b.to_async(),
2790 )
2791 .await
2792 .unwrap();
2793
2794 project_b.read_with(cx_b, |project, cx| {
2795 assert_eq!(
2796 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2797 &[(
2798 ProjectPath {
2799 worktree_id,
2800 path: Arc::from(Path::new("a.rs")),
2801 },
2802 DiagnosticSummary {
2803 error_count: 1,
2804 warning_count: 0,
2805 ..Default::default()
2806 },
2807 )]
2808 )
2809 });
2810
2811 // Simulate a language server reporting more errors for a file.
2812 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2813 lsp::PublishDiagnosticsParams {
2814 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2815 version: None,
2816 diagnostics: vec![
2817 lsp::Diagnostic {
2818 severity: Some(lsp::DiagnosticSeverity::ERROR),
2819 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2820 message: "message 1".to_string(),
2821 ..Default::default()
2822 },
2823 lsp::Diagnostic {
2824 severity: Some(lsp::DiagnosticSeverity::WARNING),
2825 range: lsp::Range::new(
2826 lsp::Position::new(0, 10),
2827 lsp::Position::new(0, 13),
2828 ),
2829 message: "message 2".to_string(),
2830 ..Default::default()
2831 },
2832 ],
2833 },
2834 );
2835
2836 // Client b gets the updated summaries
2837 project_b
2838 .condition(&cx_b, |project, cx| {
2839 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2840 == &[(
2841 ProjectPath {
2842 worktree_id,
2843 path: Arc::from(Path::new("a.rs")),
2844 },
2845 DiagnosticSummary {
2846 error_count: 1,
2847 warning_count: 1,
2848 ..Default::default()
2849 },
2850 )]
2851 })
2852 .await;
2853
2854 // Open the file with the errors on client B. They should be present.
2855 let buffer_b = cx_b
2856 .background()
2857 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2858 .await
2859 .unwrap();
2860
2861 buffer_b.read_with(cx_b, |buffer, _| {
2862 assert_eq!(
2863 buffer
2864 .snapshot()
2865 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2866 .map(|entry| entry)
2867 .collect::<Vec<_>>(),
2868 &[
2869 DiagnosticEntry {
2870 range: Point::new(0, 4)..Point::new(0, 7),
2871 diagnostic: Diagnostic {
2872 group_id: 0,
2873 message: "message 1".to_string(),
2874 severity: lsp::DiagnosticSeverity::ERROR,
2875 is_primary: true,
2876 ..Default::default()
2877 }
2878 },
2879 DiagnosticEntry {
2880 range: Point::new(0, 10)..Point::new(0, 13),
2881 diagnostic: Diagnostic {
2882 group_id: 1,
2883 severity: lsp::DiagnosticSeverity::WARNING,
2884 message: "message 2".to_string(),
2885 is_primary: true,
2886 ..Default::default()
2887 }
2888 }
2889 ]
2890 );
2891 });
2892
2893 // Simulate a language server reporting no errors for a file.
2894 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2895 lsp::PublishDiagnosticsParams {
2896 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2897 version: None,
2898 diagnostics: vec![],
2899 },
2900 );
2901 project_a
2902 .condition(cx_a, |project, cx| {
2903 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2904 })
2905 .await;
2906 project_b
2907 .condition(cx_b, |project, cx| {
2908 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2909 })
2910 .await;
2911 }
2912
2913 #[gpui::test(iterations = 10)]
2914 async fn test_collaborating_with_completion(
2915 cx_a: &mut TestAppContext,
2916 cx_b: &mut TestAppContext,
2917 ) {
2918 cx_a.foreground().forbid_parking();
2919 let lang_registry = Arc::new(LanguageRegistry::test());
2920 let fs = FakeFs::new(cx_a.background());
2921
2922 // Set up a fake language server.
2923 let mut language = Language::new(
2924 LanguageConfig {
2925 name: "Rust".into(),
2926 path_suffixes: vec!["rs".to_string()],
2927 ..Default::default()
2928 },
2929 Some(tree_sitter_rust::language()),
2930 );
2931 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2932 capabilities: lsp::ServerCapabilities {
2933 completion_provider: Some(lsp::CompletionOptions {
2934 trigger_characters: Some(vec![".".to_string()]),
2935 ..Default::default()
2936 }),
2937 ..Default::default()
2938 },
2939 ..Default::default()
2940 });
2941 lang_registry.add(Arc::new(language));
2942
2943 // Connect to a server as 2 clients.
2944 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2945 let client_a = server.create_client(cx_a, "user_a").await;
2946 let client_b = server.create_client(cx_b, "user_b").await;
2947 server
2948 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2949 .await;
2950
2951 // Share a project as client A
2952 fs.insert_tree(
2953 "/a",
2954 json!({
2955 "main.rs": "fn main() { a }",
2956 "other.rs": "",
2957 }),
2958 )
2959 .await;
2960 let project_a = cx_a.update(|cx| {
2961 Project::local(
2962 client_a.clone(),
2963 client_a.user_store.clone(),
2964 lang_registry.clone(),
2965 fs.clone(),
2966 cx,
2967 )
2968 });
2969 let (worktree_a, _) = project_a
2970 .update(cx_a, |p, cx| {
2971 p.find_or_create_local_worktree("/a", true, cx)
2972 })
2973 .await
2974 .unwrap();
2975 worktree_a
2976 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2977 .await;
2978 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2979 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2980 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2981
2982 // Join the worktree as client B.
2983 let project_b = Project::remote(
2984 project_id,
2985 client_b.clone(),
2986 client_b.user_store.clone(),
2987 lang_registry.clone(),
2988 fs.clone(),
2989 &mut cx_b.to_async(),
2990 )
2991 .await
2992 .unwrap();
2993
2994 // Open a file in an editor as the guest.
2995 let buffer_b = project_b
2996 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2997 .await
2998 .unwrap();
2999 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3000 let editor_b = cx_b.add_view(window_b, |cx| {
3001 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3002 });
3003
3004 let fake_language_server = fake_language_servers.next().await.unwrap();
3005 buffer_b
3006 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3007 .await;
3008
3009 // Type a completion trigger character as the guest.
3010 editor_b.update(cx_b, |editor, cx| {
3011 editor.select_ranges([13..13], None, cx);
3012 editor.handle_input(&Input(".".into()), cx);
3013 cx.focus(&editor_b);
3014 });
3015
3016 // Receive a completion request as the host's language server.
3017 // Return some completions from the host's language server.
3018 cx_a.foreground().start_waiting();
3019 fake_language_server
3020 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3021 assert_eq!(
3022 params.text_document_position.text_document.uri,
3023 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3024 );
3025 assert_eq!(
3026 params.text_document_position.position,
3027 lsp::Position::new(0, 14),
3028 );
3029
3030 Ok(Some(lsp::CompletionResponse::Array(vec![
3031 lsp::CompletionItem {
3032 label: "first_method(…)".into(),
3033 detail: Some("fn(&mut self, B) -> C".into()),
3034 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3035 new_text: "first_method($1)".to_string(),
3036 range: lsp::Range::new(
3037 lsp::Position::new(0, 14),
3038 lsp::Position::new(0, 14),
3039 ),
3040 })),
3041 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3042 ..Default::default()
3043 },
3044 lsp::CompletionItem {
3045 label: "second_method(…)".into(),
3046 detail: Some("fn(&mut self, C) -> D<E>".into()),
3047 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3048 new_text: "second_method()".to_string(),
3049 range: lsp::Range::new(
3050 lsp::Position::new(0, 14),
3051 lsp::Position::new(0, 14),
3052 ),
3053 })),
3054 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3055 ..Default::default()
3056 },
3057 ])))
3058 })
3059 .next()
3060 .await
3061 .unwrap();
3062 cx_a.foreground().finish_waiting();
3063
3064 // Open the buffer on the host.
3065 let buffer_a = project_a
3066 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3067 .await
3068 .unwrap();
3069 buffer_a
3070 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3071 .await;
3072
3073 // Confirm a completion on the guest.
3074 editor_b
3075 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3076 .await;
3077 editor_b.update(cx_b, |editor, cx| {
3078 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3079 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3080 });
3081
3082 // Return a resolved completion from the host's language server.
3083 // The resolved completion has an additional text edit.
3084 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3085 |params, _| async move {
3086 assert_eq!(params.label, "first_method(…)");
3087 Ok(lsp::CompletionItem {
3088 label: "first_method(…)".into(),
3089 detail: Some("fn(&mut self, B) -> C".into()),
3090 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3091 new_text: "first_method($1)".to_string(),
3092 range: lsp::Range::new(
3093 lsp::Position::new(0, 14),
3094 lsp::Position::new(0, 14),
3095 ),
3096 })),
3097 additional_text_edits: Some(vec![lsp::TextEdit {
3098 new_text: "use d::SomeTrait;\n".to_string(),
3099 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3100 }]),
3101 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3102 ..Default::default()
3103 })
3104 },
3105 );
3106
3107 // The additional edit is applied.
3108 buffer_a
3109 .condition(&cx_a, |buffer, _| {
3110 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3111 })
3112 .await;
3113 buffer_b
3114 .condition(&cx_b, |buffer, _| {
3115 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3116 })
3117 .await;
3118 }
3119
3120 #[gpui::test(iterations = 10)]
3121 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3122 cx_a.foreground().forbid_parking();
3123 let lang_registry = Arc::new(LanguageRegistry::test());
3124 let fs = FakeFs::new(cx_a.background());
3125
3126 // Connect to a server as 2 clients.
3127 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3128 let client_a = server.create_client(cx_a, "user_a").await;
3129 let client_b = server.create_client(cx_b, "user_b").await;
3130 server
3131 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3132 .await;
3133
3134 // Share a project as client A
3135 fs.insert_tree(
3136 "/a",
3137 json!({
3138 "a.rs": "let one = 1;",
3139 }),
3140 )
3141 .await;
3142 let project_a = cx_a.update(|cx| {
3143 Project::local(
3144 client_a.clone(),
3145 client_a.user_store.clone(),
3146 lang_registry.clone(),
3147 fs.clone(),
3148 cx,
3149 )
3150 });
3151 let (worktree_a, _) = project_a
3152 .update(cx_a, |p, cx| {
3153 p.find_or_create_local_worktree("/a", true, cx)
3154 })
3155 .await
3156 .unwrap();
3157 worktree_a
3158 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3159 .await;
3160 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3161 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3162 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3163 let buffer_a = project_a
3164 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3165 .await
3166 .unwrap();
3167
3168 // Join the worktree as client B.
3169 let project_b = Project::remote(
3170 project_id,
3171 client_b.clone(),
3172 client_b.user_store.clone(),
3173 lang_registry.clone(),
3174 fs.clone(),
3175 &mut cx_b.to_async(),
3176 )
3177 .await
3178 .unwrap();
3179
3180 let buffer_b = cx_b
3181 .background()
3182 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3183 .await
3184 .unwrap();
3185 buffer_b.update(cx_b, |buffer, cx| {
3186 buffer.edit([(4..7, "six")], cx);
3187 buffer.edit([(10..11, "6")], cx);
3188 assert_eq!(buffer.text(), "let six = 6;");
3189 assert!(buffer.is_dirty());
3190 assert!(!buffer.has_conflict());
3191 });
3192 buffer_a
3193 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3194 .await;
3195
3196 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3197 .await
3198 .unwrap();
3199 buffer_a
3200 .condition(cx_a, |buffer, _| buffer.has_conflict())
3201 .await;
3202 buffer_b
3203 .condition(cx_b, |buffer, _| buffer.has_conflict())
3204 .await;
3205
3206 project_b
3207 .update(cx_b, |project, cx| {
3208 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3209 })
3210 .await
3211 .unwrap();
3212 buffer_a.read_with(cx_a, |buffer, _| {
3213 assert_eq!(buffer.text(), "let seven = 7;");
3214 assert!(!buffer.is_dirty());
3215 assert!(!buffer.has_conflict());
3216 });
3217 buffer_b.read_with(cx_b, |buffer, _| {
3218 assert_eq!(buffer.text(), "let seven = 7;");
3219 assert!(!buffer.is_dirty());
3220 assert!(!buffer.has_conflict());
3221 });
3222
3223 buffer_a.update(cx_a, |buffer, cx| {
3224 // Undoing on the host is a no-op when the reload was initiated by the guest.
3225 buffer.undo(cx);
3226 assert_eq!(buffer.text(), "let seven = 7;");
3227 assert!(!buffer.is_dirty());
3228 assert!(!buffer.has_conflict());
3229 });
3230 buffer_b.update(cx_b, |buffer, cx| {
3231 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3232 buffer.undo(cx);
3233 assert_eq!(buffer.text(), "let six = 6;");
3234 assert!(buffer.is_dirty());
3235 assert!(!buffer.has_conflict());
3236 });
3237 }
3238
3239 #[gpui::test(iterations = 10)]
3240 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3241 cx_a.foreground().forbid_parking();
3242 let lang_registry = Arc::new(LanguageRegistry::test());
3243 let fs = FakeFs::new(cx_a.background());
3244
3245 // Set up a fake language server.
3246 let mut language = Language::new(
3247 LanguageConfig {
3248 name: "Rust".into(),
3249 path_suffixes: vec!["rs".to_string()],
3250 ..Default::default()
3251 },
3252 Some(tree_sitter_rust::language()),
3253 );
3254 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3255 lang_registry.add(Arc::new(language));
3256
3257 // Connect to a server as 2 clients.
3258 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3259 let client_a = server.create_client(cx_a, "user_a").await;
3260 let client_b = server.create_client(cx_b, "user_b").await;
3261 server
3262 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3263 .await;
3264
3265 // Share a project as client A
3266 fs.insert_tree(
3267 "/a",
3268 json!({
3269 "a.rs": "let one = two",
3270 }),
3271 )
3272 .await;
3273 let project_a = cx_a.update(|cx| {
3274 Project::local(
3275 client_a.clone(),
3276 client_a.user_store.clone(),
3277 lang_registry.clone(),
3278 fs.clone(),
3279 cx,
3280 )
3281 });
3282 let (worktree_a, _) = project_a
3283 .update(cx_a, |p, cx| {
3284 p.find_or_create_local_worktree("/a", true, cx)
3285 })
3286 .await
3287 .unwrap();
3288 worktree_a
3289 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3290 .await;
3291 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3292 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3293 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3294
3295 // Join the worktree as client B.
3296 let project_b = Project::remote(
3297 project_id,
3298 client_b.clone(),
3299 client_b.user_store.clone(),
3300 lang_registry.clone(),
3301 fs.clone(),
3302 &mut cx_b.to_async(),
3303 )
3304 .await
3305 .unwrap();
3306
3307 let buffer_b = cx_b
3308 .background()
3309 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3310 .await
3311 .unwrap();
3312
3313 let fake_language_server = fake_language_servers.next().await.unwrap();
3314 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3315 Ok(Some(vec![
3316 lsp::TextEdit {
3317 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3318 new_text: "h".to_string(),
3319 },
3320 lsp::TextEdit {
3321 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3322 new_text: "y".to_string(),
3323 },
3324 ]))
3325 });
3326
3327 project_b
3328 .update(cx_b, |project, cx| {
3329 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3330 })
3331 .await
3332 .unwrap();
3333 assert_eq!(
3334 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3335 "let honey = two"
3336 );
3337 }
3338
3339 #[gpui::test(iterations = 10)]
3340 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3341 cx_a.foreground().forbid_parking();
3342 let lang_registry = Arc::new(LanguageRegistry::test());
3343 let fs = FakeFs::new(cx_a.background());
3344 fs.insert_tree(
3345 "/root-1",
3346 json!({
3347 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3348 }),
3349 )
3350 .await;
3351 fs.insert_tree(
3352 "/root-2",
3353 json!({
3354 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3355 }),
3356 )
3357 .await;
3358
3359 // Set up a fake language server.
3360 let mut language = Language::new(
3361 LanguageConfig {
3362 name: "Rust".into(),
3363 path_suffixes: vec!["rs".to_string()],
3364 ..Default::default()
3365 },
3366 Some(tree_sitter_rust::language()),
3367 );
3368 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3369 lang_registry.add(Arc::new(language));
3370
3371 // Connect to a server as 2 clients.
3372 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3373 let client_a = server.create_client(cx_a, "user_a").await;
3374 let client_b = server.create_client(cx_b, "user_b").await;
3375 server
3376 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3377 .await;
3378
3379 // Share a project as client A
3380 let project_a = cx_a.update(|cx| {
3381 Project::local(
3382 client_a.clone(),
3383 client_a.user_store.clone(),
3384 lang_registry.clone(),
3385 fs.clone(),
3386 cx,
3387 )
3388 });
3389 let (worktree_a, _) = project_a
3390 .update(cx_a, |p, cx| {
3391 p.find_or_create_local_worktree("/root-1", true, cx)
3392 })
3393 .await
3394 .unwrap();
3395 worktree_a
3396 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3397 .await;
3398 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3399 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3400 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3401
3402 // Join the worktree as client B.
3403 let project_b = Project::remote(
3404 project_id,
3405 client_b.clone(),
3406 client_b.user_store.clone(),
3407 lang_registry.clone(),
3408 fs.clone(),
3409 &mut cx_b.to_async(),
3410 )
3411 .await
3412 .unwrap();
3413
3414 // Open the file on client B.
3415 let buffer_b = cx_b
3416 .background()
3417 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3418 .await
3419 .unwrap();
3420
3421 // Request the definition of a symbol as the guest.
3422 let fake_language_server = fake_language_servers.next().await.unwrap();
3423 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3424 |_, _| async move {
3425 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3426 lsp::Location::new(
3427 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3428 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3429 ),
3430 )))
3431 },
3432 );
3433
3434 let definitions_1 = project_b
3435 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3436 .await
3437 .unwrap();
3438 cx_b.read(|cx| {
3439 assert_eq!(definitions_1.len(), 1);
3440 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3441 let target_buffer = definitions_1[0].buffer.read(cx);
3442 assert_eq!(
3443 target_buffer.text(),
3444 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3445 );
3446 assert_eq!(
3447 definitions_1[0].range.to_point(target_buffer),
3448 Point::new(0, 6)..Point::new(0, 9)
3449 );
3450 });
3451
3452 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3453 // the previous call to `definition`.
3454 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3455 |_, _| async move {
3456 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3457 lsp::Location::new(
3458 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3459 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3460 ),
3461 )))
3462 },
3463 );
3464
3465 let definitions_2 = project_b
3466 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3467 .await
3468 .unwrap();
3469 cx_b.read(|cx| {
3470 assert_eq!(definitions_2.len(), 1);
3471 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3472 let target_buffer = definitions_2[0].buffer.read(cx);
3473 assert_eq!(
3474 target_buffer.text(),
3475 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3476 );
3477 assert_eq!(
3478 definitions_2[0].range.to_point(target_buffer),
3479 Point::new(1, 6)..Point::new(1, 11)
3480 );
3481 });
3482 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3483 }
3484
3485 #[gpui::test(iterations = 10)]
3486 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3487 cx_a.foreground().forbid_parking();
3488 let lang_registry = Arc::new(LanguageRegistry::test());
3489 let fs = FakeFs::new(cx_a.background());
3490 fs.insert_tree(
3491 "/root-1",
3492 json!({
3493 "one.rs": "const ONE: usize = 1;",
3494 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3495 }),
3496 )
3497 .await;
3498 fs.insert_tree(
3499 "/root-2",
3500 json!({
3501 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3502 }),
3503 )
3504 .await;
3505
3506 // Set up a fake language server.
3507 let mut language = Language::new(
3508 LanguageConfig {
3509 name: "Rust".into(),
3510 path_suffixes: vec!["rs".to_string()],
3511 ..Default::default()
3512 },
3513 Some(tree_sitter_rust::language()),
3514 );
3515 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3516 lang_registry.add(Arc::new(language));
3517
3518 // Connect to a server as 2 clients.
3519 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3520 let client_a = server.create_client(cx_a, "user_a").await;
3521 let client_b = server.create_client(cx_b, "user_b").await;
3522 server
3523 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3524 .await;
3525
3526 // Share a project as client A
3527 let project_a = cx_a.update(|cx| {
3528 Project::local(
3529 client_a.clone(),
3530 client_a.user_store.clone(),
3531 lang_registry.clone(),
3532 fs.clone(),
3533 cx,
3534 )
3535 });
3536 let (worktree_a, _) = project_a
3537 .update(cx_a, |p, cx| {
3538 p.find_or_create_local_worktree("/root-1", true, cx)
3539 })
3540 .await
3541 .unwrap();
3542 worktree_a
3543 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3544 .await;
3545 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3546 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3547 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3548
3549 // Join the worktree as client B.
3550 let project_b = Project::remote(
3551 project_id,
3552 client_b.clone(),
3553 client_b.user_store.clone(),
3554 lang_registry.clone(),
3555 fs.clone(),
3556 &mut cx_b.to_async(),
3557 )
3558 .await
3559 .unwrap();
3560
3561 // Open the file on client B.
3562 let buffer_b = cx_b
3563 .background()
3564 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3565 .await
3566 .unwrap();
3567
3568 // Request references to a symbol as the guest.
3569 let fake_language_server = fake_language_servers.next().await.unwrap();
3570 fake_language_server.handle_request::<lsp::request::References, _, _>(
3571 |params, _| async move {
3572 assert_eq!(
3573 params.text_document_position.text_document.uri.as_str(),
3574 "file:///root-1/one.rs"
3575 );
3576 Ok(Some(vec![
3577 lsp::Location {
3578 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3579 range: lsp::Range::new(
3580 lsp::Position::new(0, 24),
3581 lsp::Position::new(0, 27),
3582 ),
3583 },
3584 lsp::Location {
3585 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3586 range: lsp::Range::new(
3587 lsp::Position::new(0, 35),
3588 lsp::Position::new(0, 38),
3589 ),
3590 },
3591 lsp::Location {
3592 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3593 range: lsp::Range::new(
3594 lsp::Position::new(0, 37),
3595 lsp::Position::new(0, 40),
3596 ),
3597 },
3598 ]))
3599 },
3600 );
3601
3602 let references = project_b
3603 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3604 .await
3605 .unwrap();
3606 cx_b.read(|cx| {
3607 assert_eq!(references.len(), 3);
3608 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3609
3610 let two_buffer = references[0].buffer.read(cx);
3611 let three_buffer = references[2].buffer.read(cx);
3612 assert_eq!(
3613 two_buffer.file().unwrap().path().as_ref(),
3614 Path::new("two.rs")
3615 );
3616 assert_eq!(references[1].buffer, references[0].buffer);
3617 assert_eq!(
3618 three_buffer.file().unwrap().full_path(cx),
3619 Path::new("three.rs")
3620 );
3621
3622 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3623 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3624 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3625 });
3626 }
3627
3628 #[gpui::test(iterations = 10)]
3629 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3630 cx_a.foreground().forbid_parking();
3631 let lang_registry = Arc::new(LanguageRegistry::test());
3632 let fs = FakeFs::new(cx_a.background());
3633 fs.insert_tree(
3634 "/root-1",
3635 json!({
3636 "a": "hello world",
3637 "b": "goodnight moon",
3638 "c": "a world of goo",
3639 "d": "world champion of clown world",
3640 }),
3641 )
3642 .await;
3643 fs.insert_tree(
3644 "/root-2",
3645 json!({
3646 "e": "disney world is fun",
3647 }),
3648 )
3649 .await;
3650
3651 // Connect to a server as 2 clients.
3652 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3653 let client_a = server.create_client(cx_a, "user_a").await;
3654 let client_b = server.create_client(cx_b, "user_b").await;
3655 server
3656 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3657 .await;
3658
3659 // Share a project as client A
3660 let project_a = cx_a.update(|cx| {
3661 Project::local(
3662 client_a.clone(),
3663 client_a.user_store.clone(),
3664 lang_registry.clone(),
3665 fs.clone(),
3666 cx,
3667 )
3668 });
3669 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3670
3671 let (worktree_1, _) = project_a
3672 .update(cx_a, |p, cx| {
3673 p.find_or_create_local_worktree("/root-1", true, cx)
3674 })
3675 .await
3676 .unwrap();
3677 worktree_1
3678 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3679 .await;
3680 let (worktree_2, _) = project_a
3681 .update(cx_a, |p, cx| {
3682 p.find_or_create_local_worktree("/root-2", true, cx)
3683 })
3684 .await
3685 .unwrap();
3686 worktree_2
3687 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3688 .await;
3689
3690 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3691
3692 // Join the worktree as client B.
3693 let project_b = Project::remote(
3694 project_id,
3695 client_b.clone(),
3696 client_b.user_store.clone(),
3697 lang_registry.clone(),
3698 fs.clone(),
3699 &mut cx_b.to_async(),
3700 )
3701 .await
3702 .unwrap();
3703
3704 let results = project_b
3705 .update(cx_b, |project, cx| {
3706 project.search(SearchQuery::text("world", false, false), cx)
3707 })
3708 .await
3709 .unwrap();
3710
3711 let mut ranges_by_path = results
3712 .into_iter()
3713 .map(|(buffer, ranges)| {
3714 buffer.read_with(cx_b, |buffer, cx| {
3715 let path = buffer.file().unwrap().full_path(cx);
3716 let offset_ranges = ranges
3717 .into_iter()
3718 .map(|range| range.to_offset(buffer))
3719 .collect::<Vec<_>>();
3720 (path, offset_ranges)
3721 })
3722 })
3723 .collect::<Vec<_>>();
3724 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3725
3726 assert_eq!(
3727 ranges_by_path,
3728 &[
3729 (PathBuf::from("root-1/a"), vec![6..11]),
3730 (PathBuf::from("root-1/c"), vec![2..7]),
3731 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3732 (PathBuf::from("root-2/e"), vec![7..12]),
3733 ]
3734 );
3735 }
3736
3737 #[gpui::test(iterations = 10)]
3738 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3739 cx_a.foreground().forbid_parking();
3740 let lang_registry = Arc::new(LanguageRegistry::test());
3741 let fs = FakeFs::new(cx_a.background());
3742 fs.insert_tree(
3743 "/root-1",
3744 json!({
3745 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3746 }),
3747 )
3748 .await;
3749
3750 // Set up a fake language server.
3751 let mut language = Language::new(
3752 LanguageConfig {
3753 name: "Rust".into(),
3754 path_suffixes: vec!["rs".to_string()],
3755 ..Default::default()
3756 },
3757 Some(tree_sitter_rust::language()),
3758 );
3759 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3760 lang_registry.add(Arc::new(language));
3761
3762 // Connect to a server as 2 clients.
3763 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3764 let client_a = server.create_client(cx_a, "user_a").await;
3765 let client_b = server.create_client(cx_b, "user_b").await;
3766 server
3767 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3768 .await;
3769
3770 // Share a project as client A
3771 let project_a = cx_a.update(|cx| {
3772 Project::local(
3773 client_a.clone(),
3774 client_a.user_store.clone(),
3775 lang_registry.clone(),
3776 fs.clone(),
3777 cx,
3778 )
3779 });
3780 let (worktree_a, _) = project_a
3781 .update(cx_a, |p, cx| {
3782 p.find_or_create_local_worktree("/root-1", true, cx)
3783 })
3784 .await
3785 .unwrap();
3786 worktree_a
3787 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3788 .await;
3789 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3790 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3791 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3792
3793 // Join the worktree as client B.
3794 let project_b = Project::remote(
3795 project_id,
3796 client_b.clone(),
3797 client_b.user_store.clone(),
3798 lang_registry.clone(),
3799 fs.clone(),
3800 &mut cx_b.to_async(),
3801 )
3802 .await
3803 .unwrap();
3804
3805 // Open the file on client B.
3806 let buffer_b = cx_b
3807 .background()
3808 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3809 .await
3810 .unwrap();
3811
3812 // Request document highlights as the guest.
3813 let fake_language_server = fake_language_servers.next().await.unwrap();
3814 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3815 |params, _| async move {
3816 assert_eq!(
3817 params
3818 .text_document_position_params
3819 .text_document
3820 .uri
3821 .as_str(),
3822 "file:///root-1/main.rs"
3823 );
3824 assert_eq!(
3825 params.text_document_position_params.position,
3826 lsp::Position::new(0, 34)
3827 );
3828 Ok(Some(vec![
3829 lsp::DocumentHighlight {
3830 kind: Some(lsp::DocumentHighlightKind::WRITE),
3831 range: lsp::Range::new(
3832 lsp::Position::new(0, 10),
3833 lsp::Position::new(0, 16),
3834 ),
3835 },
3836 lsp::DocumentHighlight {
3837 kind: Some(lsp::DocumentHighlightKind::READ),
3838 range: lsp::Range::new(
3839 lsp::Position::new(0, 32),
3840 lsp::Position::new(0, 38),
3841 ),
3842 },
3843 lsp::DocumentHighlight {
3844 kind: Some(lsp::DocumentHighlightKind::READ),
3845 range: lsp::Range::new(
3846 lsp::Position::new(0, 41),
3847 lsp::Position::new(0, 47),
3848 ),
3849 },
3850 ]))
3851 },
3852 );
3853
3854 let highlights = project_b
3855 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3856 .await
3857 .unwrap();
3858 buffer_b.read_with(cx_b, |buffer, _| {
3859 let snapshot = buffer.snapshot();
3860
3861 let highlights = highlights
3862 .into_iter()
3863 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3864 .collect::<Vec<_>>();
3865 assert_eq!(
3866 highlights,
3867 &[
3868 (lsp::DocumentHighlightKind::WRITE, 10..16),
3869 (lsp::DocumentHighlightKind::READ, 32..38),
3870 (lsp::DocumentHighlightKind::READ, 41..47)
3871 ]
3872 )
3873 });
3874 }
3875
3876 #[gpui::test(iterations = 10)]
3877 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3878 cx_a.foreground().forbid_parking();
3879 let lang_registry = Arc::new(LanguageRegistry::test());
3880 let fs = FakeFs::new(cx_a.background());
3881 fs.insert_tree(
3882 "/code",
3883 json!({
3884 "crate-1": {
3885 "one.rs": "const ONE: usize = 1;",
3886 },
3887 "crate-2": {
3888 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3889 },
3890 "private": {
3891 "passwords.txt": "the-password",
3892 }
3893 }),
3894 )
3895 .await;
3896
3897 // Set up a fake language server.
3898 let mut language = Language::new(
3899 LanguageConfig {
3900 name: "Rust".into(),
3901 path_suffixes: vec!["rs".to_string()],
3902 ..Default::default()
3903 },
3904 Some(tree_sitter_rust::language()),
3905 );
3906 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3907 lang_registry.add(Arc::new(language));
3908
3909 // Connect to a server as 2 clients.
3910 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3911 let client_a = server.create_client(cx_a, "user_a").await;
3912 let client_b = server.create_client(cx_b, "user_b").await;
3913 server
3914 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3915 .await;
3916
3917 // Share a project as client A
3918 let project_a = cx_a.update(|cx| {
3919 Project::local(
3920 client_a.clone(),
3921 client_a.user_store.clone(),
3922 lang_registry.clone(),
3923 fs.clone(),
3924 cx,
3925 )
3926 });
3927 let (worktree_a, _) = project_a
3928 .update(cx_a, |p, cx| {
3929 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3930 })
3931 .await
3932 .unwrap();
3933 worktree_a
3934 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3935 .await;
3936 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3937 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3938 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3939
3940 // Join the worktree as client B.
3941 let project_b = Project::remote(
3942 project_id,
3943 client_b.clone(),
3944 client_b.user_store.clone(),
3945 lang_registry.clone(),
3946 fs.clone(),
3947 &mut cx_b.to_async(),
3948 )
3949 .await
3950 .unwrap();
3951
3952 // Cause the language server to start.
3953 let _buffer = cx_b
3954 .background()
3955 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3956 .await
3957 .unwrap();
3958
3959 let fake_language_server = fake_language_servers.next().await.unwrap();
3960 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3961 |_, _| async move {
3962 #[allow(deprecated)]
3963 Ok(Some(vec![lsp::SymbolInformation {
3964 name: "TWO".into(),
3965 location: lsp::Location {
3966 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3967 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3968 },
3969 kind: lsp::SymbolKind::CONSTANT,
3970 tags: None,
3971 container_name: None,
3972 deprecated: None,
3973 }]))
3974 },
3975 );
3976
3977 // Request the definition of a symbol as the guest.
3978 let symbols = project_b
3979 .update(cx_b, |p, cx| p.symbols("two", cx))
3980 .await
3981 .unwrap();
3982 assert_eq!(symbols.len(), 1);
3983 assert_eq!(symbols[0].name, "TWO");
3984
3985 // Open one of the returned symbols.
3986 let buffer_b_2 = project_b
3987 .update(cx_b, |project, cx| {
3988 project.open_buffer_for_symbol(&symbols[0], cx)
3989 })
3990 .await
3991 .unwrap();
3992 buffer_b_2.read_with(cx_b, |buffer, _| {
3993 assert_eq!(
3994 buffer.file().unwrap().path().as_ref(),
3995 Path::new("../crate-2/two.rs")
3996 );
3997 });
3998
3999 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4000 let mut fake_symbol = symbols[0].clone();
4001 fake_symbol.path = Path::new("/code/secrets").into();
4002 let error = project_b
4003 .update(cx_b, |project, cx| {
4004 project.open_buffer_for_symbol(&fake_symbol, cx)
4005 })
4006 .await
4007 .unwrap_err();
4008 assert!(error.to_string().contains("invalid symbol signature"));
4009 }
4010
4011 #[gpui::test(iterations = 10)]
4012 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4013 cx_a: &mut TestAppContext,
4014 cx_b: &mut TestAppContext,
4015 mut rng: StdRng,
4016 ) {
4017 cx_a.foreground().forbid_parking();
4018 let lang_registry = Arc::new(LanguageRegistry::test());
4019 let fs = FakeFs::new(cx_a.background());
4020 fs.insert_tree(
4021 "/root",
4022 json!({
4023 "a.rs": "const ONE: usize = b::TWO;",
4024 "b.rs": "const TWO: usize = 2",
4025 }),
4026 )
4027 .await;
4028
4029 // Set up a fake language server.
4030 let mut language = Language::new(
4031 LanguageConfig {
4032 name: "Rust".into(),
4033 path_suffixes: vec!["rs".to_string()],
4034 ..Default::default()
4035 },
4036 Some(tree_sitter_rust::language()),
4037 );
4038 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4039 lang_registry.add(Arc::new(language));
4040
4041 // Connect to a server as 2 clients.
4042 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4043 let client_a = server.create_client(cx_a, "user_a").await;
4044 let client_b = server.create_client(cx_b, "user_b").await;
4045 server
4046 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4047 .await;
4048
4049 // Share a project as client A
4050 let project_a = cx_a.update(|cx| {
4051 Project::local(
4052 client_a.clone(),
4053 client_a.user_store.clone(),
4054 lang_registry.clone(),
4055 fs.clone(),
4056 cx,
4057 )
4058 });
4059
4060 let (worktree_a, _) = project_a
4061 .update(cx_a, |p, cx| {
4062 p.find_or_create_local_worktree("/root", true, cx)
4063 })
4064 .await
4065 .unwrap();
4066 worktree_a
4067 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4068 .await;
4069 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4070 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4071 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4072
4073 // Join the worktree as client B.
4074 let project_b = Project::remote(
4075 project_id,
4076 client_b.clone(),
4077 client_b.user_store.clone(),
4078 lang_registry.clone(),
4079 fs.clone(),
4080 &mut cx_b.to_async(),
4081 )
4082 .await
4083 .unwrap();
4084
4085 let buffer_b1 = cx_b
4086 .background()
4087 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4088 .await
4089 .unwrap();
4090
4091 let fake_language_server = fake_language_servers.next().await.unwrap();
4092 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4093 |_, _| async move {
4094 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4095 lsp::Location::new(
4096 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4097 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4098 ),
4099 )))
4100 },
4101 );
4102
4103 let definitions;
4104 let buffer_b2;
4105 if rng.gen() {
4106 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4107 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4108 } else {
4109 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4110 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4111 }
4112
4113 let buffer_b2 = buffer_b2.await.unwrap();
4114 let definitions = definitions.await.unwrap();
4115 assert_eq!(definitions.len(), 1);
4116 assert_eq!(definitions[0].buffer, buffer_b2);
4117 }
4118
4119 #[gpui::test(iterations = 10)]
4120 async fn test_collaborating_with_code_actions(
4121 cx_a: &mut TestAppContext,
4122 cx_b: &mut TestAppContext,
4123 ) {
4124 cx_a.foreground().forbid_parking();
4125 let lang_registry = Arc::new(LanguageRegistry::test());
4126 let fs = FakeFs::new(cx_a.background());
4127 cx_b.update(|cx| editor::init(cx));
4128
4129 // Set up a fake language server.
4130 let mut language = Language::new(
4131 LanguageConfig {
4132 name: "Rust".into(),
4133 path_suffixes: vec!["rs".to_string()],
4134 ..Default::default()
4135 },
4136 Some(tree_sitter_rust::language()),
4137 );
4138 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4139 lang_registry.add(Arc::new(language));
4140
4141 // Connect to a server as 2 clients.
4142 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4143 let client_a = server.create_client(cx_a, "user_a").await;
4144 let client_b = server.create_client(cx_b, "user_b").await;
4145 server
4146 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4147 .await;
4148
4149 // Share a project as client A
4150 fs.insert_tree(
4151 "/a",
4152 json!({
4153 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4154 "other.rs": "pub fn foo() -> usize { 4 }",
4155 }),
4156 )
4157 .await;
4158 let project_a = cx_a.update(|cx| {
4159 Project::local(
4160 client_a.clone(),
4161 client_a.user_store.clone(),
4162 lang_registry.clone(),
4163 fs.clone(),
4164 cx,
4165 )
4166 });
4167 let (worktree_a, _) = project_a
4168 .update(cx_a, |p, cx| {
4169 p.find_or_create_local_worktree("/a", true, cx)
4170 })
4171 .await
4172 .unwrap();
4173 worktree_a
4174 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4175 .await;
4176 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4177 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4178 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4179
4180 // Join the worktree as client B.
4181 let project_b = Project::remote(
4182 project_id,
4183 client_b.clone(),
4184 client_b.user_store.clone(),
4185 lang_registry.clone(),
4186 fs.clone(),
4187 &mut cx_b.to_async(),
4188 )
4189 .await
4190 .unwrap();
4191 let mut params = cx_b.update(WorkspaceParams::test);
4192 params.languages = lang_registry.clone();
4193 params.client = client_b.client.clone();
4194 params.user_store = client_b.user_store.clone();
4195 params.project = project_b;
4196
4197 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4198 let editor_b = workspace_b
4199 .update(cx_b, |workspace, cx| {
4200 workspace.open_path((worktree_id, "main.rs"), true, cx)
4201 })
4202 .await
4203 .unwrap()
4204 .downcast::<Editor>()
4205 .unwrap();
4206
4207 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4208 fake_language_server
4209 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4210 assert_eq!(
4211 params.text_document.uri,
4212 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4213 );
4214 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4215 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4216 Ok(None)
4217 })
4218 .next()
4219 .await;
4220
4221 // Move cursor to a location that contains code actions.
4222 editor_b.update(cx_b, |editor, cx| {
4223 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4224 cx.focus(&editor_b);
4225 });
4226
4227 fake_language_server
4228 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4229 assert_eq!(
4230 params.text_document.uri,
4231 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4232 );
4233 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4234 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4235
4236 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4237 lsp::CodeAction {
4238 title: "Inline into all callers".to_string(),
4239 edit: Some(lsp::WorkspaceEdit {
4240 changes: Some(
4241 [
4242 (
4243 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4244 vec![lsp::TextEdit::new(
4245 lsp::Range::new(
4246 lsp::Position::new(1, 22),
4247 lsp::Position::new(1, 34),
4248 ),
4249 "4".to_string(),
4250 )],
4251 ),
4252 (
4253 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4254 vec![lsp::TextEdit::new(
4255 lsp::Range::new(
4256 lsp::Position::new(0, 0),
4257 lsp::Position::new(0, 27),
4258 ),
4259 "".to_string(),
4260 )],
4261 ),
4262 ]
4263 .into_iter()
4264 .collect(),
4265 ),
4266 ..Default::default()
4267 }),
4268 data: Some(json!({
4269 "codeActionParams": {
4270 "range": {
4271 "start": {"line": 1, "column": 31},
4272 "end": {"line": 1, "column": 31},
4273 }
4274 }
4275 })),
4276 ..Default::default()
4277 },
4278 )]))
4279 })
4280 .next()
4281 .await;
4282
4283 // Toggle code actions and wait for them to display.
4284 editor_b.update(cx_b, |editor, cx| {
4285 editor.toggle_code_actions(
4286 &ToggleCodeActions {
4287 deployed_from_indicator: false,
4288 },
4289 cx,
4290 );
4291 });
4292 editor_b
4293 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4294 .await;
4295
4296 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4297
4298 // Confirming the code action will trigger a resolve request.
4299 let confirm_action = workspace_b
4300 .update(cx_b, |workspace, cx| {
4301 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4302 })
4303 .unwrap();
4304 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4305 |_, _| async move {
4306 Ok(lsp::CodeAction {
4307 title: "Inline into all callers".to_string(),
4308 edit: Some(lsp::WorkspaceEdit {
4309 changes: Some(
4310 [
4311 (
4312 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4313 vec![lsp::TextEdit::new(
4314 lsp::Range::new(
4315 lsp::Position::new(1, 22),
4316 lsp::Position::new(1, 34),
4317 ),
4318 "4".to_string(),
4319 )],
4320 ),
4321 (
4322 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4323 vec![lsp::TextEdit::new(
4324 lsp::Range::new(
4325 lsp::Position::new(0, 0),
4326 lsp::Position::new(0, 27),
4327 ),
4328 "".to_string(),
4329 )],
4330 ),
4331 ]
4332 .into_iter()
4333 .collect(),
4334 ),
4335 ..Default::default()
4336 }),
4337 ..Default::default()
4338 })
4339 },
4340 );
4341
4342 // After the action is confirmed, an editor containing both modified files is opened.
4343 confirm_action.await.unwrap();
4344 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4345 workspace
4346 .active_item(cx)
4347 .unwrap()
4348 .downcast::<Editor>()
4349 .unwrap()
4350 });
4351 code_action_editor.update(cx_b, |editor, cx| {
4352 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4353 editor.undo(&Undo, cx);
4354 assert_eq!(
4355 editor.text(cx),
4356 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4357 );
4358 editor.redo(&Redo, cx);
4359 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4360 });
4361 }
4362
4363 #[gpui::test(iterations = 10)]
4364 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4365 cx_a.foreground().forbid_parking();
4366 let lang_registry = Arc::new(LanguageRegistry::test());
4367 let fs = FakeFs::new(cx_a.background());
4368 cx_b.update(|cx| editor::init(cx));
4369
4370 // Set up a fake language server.
4371 let mut language = Language::new(
4372 LanguageConfig {
4373 name: "Rust".into(),
4374 path_suffixes: vec!["rs".to_string()],
4375 ..Default::default()
4376 },
4377 Some(tree_sitter_rust::language()),
4378 );
4379 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4380 capabilities: lsp::ServerCapabilities {
4381 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4382 prepare_provider: Some(true),
4383 work_done_progress_options: Default::default(),
4384 })),
4385 ..Default::default()
4386 },
4387 ..Default::default()
4388 });
4389 lang_registry.add(Arc::new(language));
4390
4391 // Connect to a server as 2 clients.
4392 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4393 let client_a = server.create_client(cx_a, "user_a").await;
4394 let client_b = server.create_client(cx_b, "user_b").await;
4395 server
4396 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4397 .await;
4398
4399 // Share a project as client A
4400 fs.insert_tree(
4401 "/dir",
4402 json!({
4403 "one.rs": "const ONE: usize = 1;",
4404 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4405 }),
4406 )
4407 .await;
4408 let project_a = cx_a.update(|cx| {
4409 Project::local(
4410 client_a.clone(),
4411 client_a.user_store.clone(),
4412 lang_registry.clone(),
4413 fs.clone(),
4414 cx,
4415 )
4416 });
4417 let (worktree_a, _) = project_a
4418 .update(cx_a, |p, cx| {
4419 p.find_or_create_local_worktree("/dir", true, cx)
4420 })
4421 .await
4422 .unwrap();
4423 worktree_a
4424 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4425 .await;
4426 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4427 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4428 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4429
4430 // Join the worktree as client B.
4431 let project_b = Project::remote(
4432 project_id,
4433 client_b.clone(),
4434 client_b.user_store.clone(),
4435 lang_registry.clone(),
4436 fs.clone(),
4437 &mut cx_b.to_async(),
4438 )
4439 .await
4440 .unwrap();
4441 let mut params = cx_b.update(WorkspaceParams::test);
4442 params.languages = lang_registry.clone();
4443 params.client = client_b.client.clone();
4444 params.user_store = client_b.user_store.clone();
4445 params.project = project_b;
4446
4447 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4448 let editor_b = workspace_b
4449 .update(cx_b, |workspace, cx| {
4450 workspace.open_path((worktree_id, "one.rs"), true, cx)
4451 })
4452 .await
4453 .unwrap()
4454 .downcast::<Editor>()
4455 .unwrap();
4456 let fake_language_server = fake_language_servers.next().await.unwrap();
4457
4458 // Move cursor to a location that can be renamed.
4459 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4460 editor.select_ranges([7..7], None, cx);
4461 editor.rename(&Rename, cx).unwrap()
4462 });
4463
4464 fake_language_server
4465 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4466 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4467 assert_eq!(params.position, lsp::Position::new(0, 7));
4468 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4469 lsp::Position::new(0, 6),
4470 lsp::Position::new(0, 9),
4471 ))))
4472 })
4473 .next()
4474 .await
4475 .unwrap();
4476 prepare_rename.await.unwrap();
4477 editor_b.update(cx_b, |editor, cx| {
4478 let rename = editor.pending_rename().unwrap();
4479 let buffer = editor.buffer().read(cx).snapshot(cx);
4480 assert_eq!(
4481 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4482 6..9
4483 );
4484 rename.editor.update(cx, |rename_editor, cx| {
4485 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4486 rename_buffer.edit([(0..3, "THREE")], cx);
4487 });
4488 });
4489 });
4490
4491 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4492 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4493 });
4494 fake_language_server
4495 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4496 assert_eq!(
4497 params.text_document_position.text_document.uri.as_str(),
4498 "file:///dir/one.rs"
4499 );
4500 assert_eq!(
4501 params.text_document_position.position,
4502 lsp::Position::new(0, 6)
4503 );
4504 assert_eq!(params.new_name, "THREE");
4505 Ok(Some(lsp::WorkspaceEdit {
4506 changes: Some(
4507 [
4508 (
4509 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4510 vec![lsp::TextEdit::new(
4511 lsp::Range::new(
4512 lsp::Position::new(0, 6),
4513 lsp::Position::new(0, 9),
4514 ),
4515 "THREE".to_string(),
4516 )],
4517 ),
4518 (
4519 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4520 vec![
4521 lsp::TextEdit::new(
4522 lsp::Range::new(
4523 lsp::Position::new(0, 24),
4524 lsp::Position::new(0, 27),
4525 ),
4526 "THREE".to_string(),
4527 ),
4528 lsp::TextEdit::new(
4529 lsp::Range::new(
4530 lsp::Position::new(0, 35),
4531 lsp::Position::new(0, 38),
4532 ),
4533 "THREE".to_string(),
4534 ),
4535 ],
4536 ),
4537 ]
4538 .into_iter()
4539 .collect(),
4540 ),
4541 ..Default::default()
4542 }))
4543 })
4544 .next()
4545 .await
4546 .unwrap();
4547 confirm_rename.await.unwrap();
4548
4549 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4550 workspace
4551 .active_item(cx)
4552 .unwrap()
4553 .downcast::<Editor>()
4554 .unwrap()
4555 });
4556 rename_editor.update(cx_b, |editor, cx| {
4557 assert_eq!(
4558 editor.text(cx),
4559 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4560 );
4561 editor.undo(&Undo, cx);
4562 assert_eq!(
4563 editor.text(cx),
4564 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4565 );
4566 editor.redo(&Redo, cx);
4567 assert_eq!(
4568 editor.text(cx),
4569 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4570 );
4571 });
4572
4573 // Ensure temporary rename edits cannot be undone/redone.
4574 editor_b.update(cx_b, |editor, cx| {
4575 editor.undo(&Undo, cx);
4576 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4577 editor.undo(&Undo, cx);
4578 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4579 editor.redo(&Redo, cx);
4580 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4581 })
4582 }
4583
4584 #[gpui::test(iterations = 10)]
4585 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4586 cx_a.foreground().forbid_parking();
4587
4588 // Connect to a server as 2 clients.
4589 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4590 let client_a = server.create_client(cx_a, "user_a").await;
4591 let client_b = server.create_client(cx_b, "user_b").await;
4592
4593 // Create an org that includes these 2 users.
4594 let db = &server.app_state.db;
4595 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4596 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4597 .await
4598 .unwrap();
4599 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4600 .await
4601 .unwrap();
4602
4603 // Create a channel that includes all the users.
4604 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4605 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4606 .await
4607 .unwrap();
4608 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4609 .await
4610 .unwrap();
4611 db.create_channel_message(
4612 channel_id,
4613 client_b.current_user_id(&cx_b),
4614 "hello A, it's B.",
4615 OffsetDateTime::now_utc(),
4616 1,
4617 )
4618 .await
4619 .unwrap();
4620
4621 let channels_a = cx_a
4622 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4623 channels_a
4624 .condition(cx_a, |list, _| list.available_channels().is_some())
4625 .await;
4626 channels_a.read_with(cx_a, |list, _| {
4627 assert_eq!(
4628 list.available_channels().unwrap(),
4629 &[ChannelDetails {
4630 id: channel_id.to_proto(),
4631 name: "test-channel".to_string()
4632 }]
4633 )
4634 });
4635 let channel_a = channels_a.update(cx_a, |this, cx| {
4636 this.get_channel(channel_id.to_proto(), cx).unwrap()
4637 });
4638 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4639 channel_a
4640 .condition(&cx_a, |channel, _| {
4641 channel_messages(channel)
4642 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4643 })
4644 .await;
4645
4646 let channels_b = cx_b
4647 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4648 channels_b
4649 .condition(cx_b, |list, _| list.available_channels().is_some())
4650 .await;
4651 channels_b.read_with(cx_b, |list, _| {
4652 assert_eq!(
4653 list.available_channels().unwrap(),
4654 &[ChannelDetails {
4655 id: channel_id.to_proto(),
4656 name: "test-channel".to_string()
4657 }]
4658 )
4659 });
4660
4661 let channel_b = channels_b.update(cx_b, |this, cx| {
4662 this.get_channel(channel_id.to_proto(), cx).unwrap()
4663 });
4664 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4665 channel_b
4666 .condition(&cx_b, |channel, _| {
4667 channel_messages(channel)
4668 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4669 })
4670 .await;
4671
4672 channel_a
4673 .update(cx_a, |channel, cx| {
4674 channel
4675 .send_message("oh, hi B.".to_string(), cx)
4676 .unwrap()
4677 .detach();
4678 let task = channel.send_message("sup".to_string(), cx).unwrap();
4679 assert_eq!(
4680 channel_messages(channel),
4681 &[
4682 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4683 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4684 ("user_a".to_string(), "sup".to_string(), true)
4685 ]
4686 );
4687 task
4688 })
4689 .await
4690 .unwrap();
4691
4692 channel_b
4693 .condition(&cx_b, |channel, _| {
4694 channel_messages(channel)
4695 == [
4696 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4697 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4698 ("user_a".to_string(), "sup".to_string(), false),
4699 ]
4700 })
4701 .await;
4702
4703 assert_eq!(
4704 server
4705 .state()
4706 .await
4707 .channel(channel_id)
4708 .unwrap()
4709 .connection_ids
4710 .len(),
4711 2
4712 );
4713 cx_b.update(|_| drop(channel_b));
4714 server
4715 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4716 .await;
4717
4718 cx_a.update(|_| drop(channel_a));
4719 server
4720 .condition(|state| state.channel(channel_id).is_none())
4721 .await;
4722 }
4723
4724 #[gpui::test(iterations = 10)]
4725 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4726 cx_a.foreground().forbid_parking();
4727
4728 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4729 let client_a = server.create_client(cx_a, "user_a").await;
4730
4731 let db = &server.app_state.db;
4732 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4733 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4734 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4735 .await
4736 .unwrap();
4737 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4738 .await
4739 .unwrap();
4740
4741 let channels_a = cx_a
4742 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4743 channels_a
4744 .condition(cx_a, |list, _| list.available_channels().is_some())
4745 .await;
4746 let channel_a = channels_a.update(cx_a, |this, cx| {
4747 this.get_channel(channel_id.to_proto(), cx).unwrap()
4748 });
4749
4750 // Messages aren't allowed to be too long.
4751 channel_a
4752 .update(cx_a, |channel, cx| {
4753 let long_body = "this is long.\n".repeat(1024);
4754 channel.send_message(long_body, cx).unwrap()
4755 })
4756 .await
4757 .unwrap_err();
4758
4759 // Messages aren't allowed to be blank.
4760 channel_a.update(cx_a, |channel, cx| {
4761 channel.send_message(String::new(), cx).unwrap_err()
4762 });
4763
4764 // Leading and trailing whitespace are trimmed.
4765 channel_a
4766 .update(cx_a, |channel, cx| {
4767 channel
4768 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4769 .unwrap()
4770 })
4771 .await
4772 .unwrap();
4773 assert_eq!(
4774 db.get_channel_messages(channel_id, 10, None)
4775 .await
4776 .unwrap()
4777 .iter()
4778 .map(|m| &m.body)
4779 .collect::<Vec<_>>(),
4780 &["surrounded by whitespace"]
4781 );
4782 }
4783
4784 #[gpui::test(iterations = 10)]
4785 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4786 cx_a.foreground().forbid_parking();
4787
4788 // Connect to a server as 2 clients.
4789 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4790 let client_a = server.create_client(cx_a, "user_a").await;
4791 let client_b = server.create_client(cx_b, "user_b").await;
4792 let mut status_b = client_b.status();
4793
4794 // Create an org that includes these 2 users.
4795 let db = &server.app_state.db;
4796 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4797 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4798 .await
4799 .unwrap();
4800 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4801 .await
4802 .unwrap();
4803
4804 // Create a channel that includes all the users.
4805 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4806 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4807 .await
4808 .unwrap();
4809 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4810 .await
4811 .unwrap();
4812 db.create_channel_message(
4813 channel_id,
4814 client_b.current_user_id(&cx_b),
4815 "hello A, it's B.",
4816 OffsetDateTime::now_utc(),
4817 2,
4818 )
4819 .await
4820 .unwrap();
4821
4822 let channels_a = cx_a
4823 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4824 channels_a
4825 .condition(cx_a, |list, _| list.available_channels().is_some())
4826 .await;
4827
4828 channels_a.read_with(cx_a, |list, _| {
4829 assert_eq!(
4830 list.available_channels().unwrap(),
4831 &[ChannelDetails {
4832 id: channel_id.to_proto(),
4833 name: "test-channel".to_string()
4834 }]
4835 )
4836 });
4837 let channel_a = channels_a.update(cx_a, |this, cx| {
4838 this.get_channel(channel_id.to_proto(), cx).unwrap()
4839 });
4840 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4841 channel_a
4842 .condition(&cx_a, |channel, _| {
4843 channel_messages(channel)
4844 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4845 })
4846 .await;
4847
4848 let channels_b = cx_b
4849 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4850 channels_b
4851 .condition(cx_b, |list, _| list.available_channels().is_some())
4852 .await;
4853 channels_b.read_with(cx_b, |list, _| {
4854 assert_eq!(
4855 list.available_channels().unwrap(),
4856 &[ChannelDetails {
4857 id: channel_id.to_proto(),
4858 name: "test-channel".to_string()
4859 }]
4860 )
4861 });
4862
4863 let channel_b = channels_b.update(cx_b, |this, cx| {
4864 this.get_channel(channel_id.to_proto(), cx).unwrap()
4865 });
4866 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4867 channel_b
4868 .condition(&cx_b, |channel, _| {
4869 channel_messages(channel)
4870 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4871 })
4872 .await;
4873
4874 // Disconnect client B, ensuring we can still access its cached channel data.
4875 server.forbid_connections();
4876 server.disconnect_client(client_b.current_user_id(&cx_b));
4877 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4878 while !matches!(
4879 status_b.next().await,
4880 Some(client::Status::ReconnectionError { .. })
4881 ) {}
4882
4883 channels_b.read_with(cx_b, |channels, _| {
4884 assert_eq!(
4885 channels.available_channels().unwrap(),
4886 [ChannelDetails {
4887 id: channel_id.to_proto(),
4888 name: "test-channel".to_string()
4889 }]
4890 )
4891 });
4892 channel_b.read_with(cx_b, |channel, _| {
4893 assert_eq!(
4894 channel_messages(channel),
4895 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4896 )
4897 });
4898
4899 // Send a message from client B while it is disconnected.
4900 channel_b
4901 .update(cx_b, |channel, cx| {
4902 let task = channel
4903 .send_message("can you see this?".to_string(), cx)
4904 .unwrap();
4905 assert_eq!(
4906 channel_messages(channel),
4907 &[
4908 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4909 ("user_b".to_string(), "can you see this?".to_string(), true)
4910 ]
4911 );
4912 task
4913 })
4914 .await
4915 .unwrap_err();
4916
4917 // Send a message from client A while B is disconnected.
4918 channel_a
4919 .update(cx_a, |channel, cx| {
4920 channel
4921 .send_message("oh, hi B.".to_string(), cx)
4922 .unwrap()
4923 .detach();
4924 let task = channel.send_message("sup".to_string(), cx).unwrap();
4925 assert_eq!(
4926 channel_messages(channel),
4927 &[
4928 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4929 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4930 ("user_a".to_string(), "sup".to_string(), true)
4931 ]
4932 );
4933 task
4934 })
4935 .await
4936 .unwrap();
4937
4938 // Give client B a chance to reconnect.
4939 server.allow_connections();
4940 cx_b.foreground().advance_clock(Duration::from_secs(10));
4941
4942 // Verify that B sees the new messages upon reconnection, as well as the message client B
4943 // sent while offline.
4944 channel_b
4945 .condition(&cx_b, |channel, _| {
4946 channel_messages(channel)
4947 == [
4948 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4949 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4950 ("user_a".to_string(), "sup".to_string(), false),
4951 ("user_b".to_string(), "can you see this?".to_string(), false),
4952 ]
4953 })
4954 .await;
4955
4956 // Ensure client A and B can communicate normally after reconnection.
4957 channel_a
4958 .update(cx_a, |channel, cx| {
4959 channel.send_message("you online?".to_string(), cx).unwrap()
4960 })
4961 .await
4962 .unwrap();
4963 channel_b
4964 .condition(&cx_b, |channel, _| {
4965 channel_messages(channel)
4966 == [
4967 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4968 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4969 ("user_a".to_string(), "sup".to_string(), false),
4970 ("user_b".to_string(), "can you see this?".to_string(), false),
4971 ("user_a".to_string(), "you online?".to_string(), false),
4972 ]
4973 })
4974 .await;
4975
4976 channel_b
4977 .update(cx_b, |channel, cx| {
4978 channel.send_message("yep".to_string(), cx).unwrap()
4979 })
4980 .await
4981 .unwrap();
4982 channel_a
4983 .condition(&cx_a, |channel, _| {
4984 channel_messages(channel)
4985 == [
4986 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4987 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4988 ("user_a".to_string(), "sup".to_string(), false),
4989 ("user_b".to_string(), "can you see this?".to_string(), false),
4990 ("user_a".to_string(), "you online?".to_string(), false),
4991 ("user_b".to_string(), "yep".to_string(), false),
4992 ]
4993 })
4994 .await;
4995 }
4996
4997 #[gpui::test(iterations = 10)]
4998 async fn test_contacts(
4999 deterministic: Arc<Deterministic>,
5000 cx_a: &mut TestAppContext,
5001 cx_b: &mut TestAppContext,
5002 cx_c: &mut TestAppContext,
5003 ) {
5004 cx_a.foreground().forbid_parking();
5005 let lang_registry = Arc::new(LanguageRegistry::test());
5006 let fs = FakeFs::new(cx_a.background());
5007
5008 // Connect to a server as 3 clients.
5009 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5010 let client_a = server.create_client(cx_a, "user_a").await;
5011 let client_b = server.create_client(cx_b, "user_b").await;
5012 let client_c = server.create_client(cx_c, "user_c").await;
5013 server
5014 .make_contacts(vec![
5015 (&client_a, cx_a),
5016 (&client_b, cx_b),
5017 (&client_c, cx_c),
5018 ])
5019 .await;
5020
5021 deterministic.run_until_parked();
5022 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5023 client.user_store.read_with(*cx, |store, _| {
5024 assert_eq!(
5025 contacts(store),
5026 [
5027 ("user_a", true, vec![]),
5028 ("user_b", true, vec![]),
5029 ("user_c", true, vec![])
5030 ]
5031 )
5032 });
5033 }
5034
5035 // Share a worktree as client A.
5036 fs.create_dir(Path::new("/a")).await.unwrap();
5037
5038 let project_a = cx_a.update(|cx| {
5039 Project::local(
5040 client_a.clone(),
5041 client_a.user_store.clone(),
5042 lang_registry.clone(),
5043 fs.clone(),
5044 cx,
5045 )
5046 });
5047 let (worktree_a, _) = project_a
5048 .update(cx_a, |p, cx| {
5049 p.find_or_create_local_worktree("/a", true, cx)
5050 })
5051 .await
5052 .unwrap();
5053 worktree_a
5054 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5055 .await;
5056
5057 deterministic.run_until_parked();
5058 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5059 client.user_store.read_with(*cx, |store, _| {
5060 assert_eq!(
5061 contacts(store),
5062 [
5063 ("user_a", true, vec![("a", false, vec![])]),
5064 ("user_b", true, vec![]),
5065 ("user_c", true, vec![])
5066 ]
5067 )
5068 });
5069 }
5070
5071 let project_id = project_a
5072 .update(cx_a, |project, _| project.next_remote_id())
5073 .await;
5074 project_a
5075 .update(cx_a, |project, cx| project.share(cx))
5076 .await
5077 .unwrap();
5078
5079 deterministic.run_until_parked();
5080 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5081 client.user_store.read_with(*cx, |store, _| {
5082 assert_eq!(
5083 contacts(store),
5084 [
5085 ("user_a", true, vec![("a", true, vec![])]),
5086 ("user_b", true, vec![]),
5087 ("user_c", true, vec![])
5088 ]
5089 )
5090 });
5091 }
5092
5093 let _project_b = Project::remote(
5094 project_id,
5095 client_b.clone(),
5096 client_b.user_store.clone(),
5097 lang_registry.clone(),
5098 fs.clone(),
5099 &mut cx_b.to_async(),
5100 )
5101 .await
5102 .unwrap();
5103
5104 deterministic.run_until_parked();
5105 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5106 client.user_store.read_with(*cx, |store, _| {
5107 assert_eq!(
5108 contacts(store),
5109 [
5110 ("user_a", true, vec![("a", true, vec!["user_b"])]),
5111 ("user_b", true, vec![]),
5112 ("user_c", true, vec![])
5113 ]
5114 )
5115 });
5116 }
5117
5118 project_a
5119 .condition(&cx_a, |project, _| {
5120 project.collaborators().contains_key(&client_b.peer_id)
5121 })
5122 .await;
5123
5124 cx_a.update(move |_| drop(project_a));
5125 deterministic.run_until_parked();
5126 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5127 client.user_store.read_with(*cx, |store, _| {
5128 assert_eq!(
5129 contacts(store),
5130 [
5131 ("user_a", true, vec![]),
5132 ("user_b", true, vec![]),
5133 ("user_c", true, vec![])
5134 ]
5135 )
5136 });
5137 }
5138
5139 server.disconnect_client(client_c.current_user_id(cx_c));
5140 server.forbid_connections();
5141 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5142 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5143 client.user_store.read_with(*cx, |store, _| {
5144 assert_eq!(
5145 contacts(store),
5146 [
5147 ("user_a", true, vec![]),
5148 ("user_b", true, vec![]),
5149 ("user_c", false, vec![])
5150 ]
5151 )
5152 });
5153 }
5154 client_c
5155 .user_store
5156 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5157
5158 server.allow_connections();
5159 client_c
5160 .authenticate_and_connect(false, &cx_c.to_async())
5161 .await
5162 .unwrap();
5163
5164 deterministic.run_until_parked();
5165 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5166 client.user_store.read_with(*cx, |store, _| {
5167 assert_eq!(
5168 contacts(store),
5169 [
5170 ("user_a", true, vec![]),
5171 ("user_b", true, vec![]),
5172 ("user_c", true, vec![])
5173 ]
5174 )
5175 });
5176 }
5177
5178 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5179 user_store
5180 .contacts()
5181 .iter()
5182 .map(|contact| {
5183 let worktrees = contact
5184 .projects
5185 .iter()
5186 .map(|p| {
5187 (
5188 p.worktree_root_names[0].as_str(),
5189 p.is_shared,
5190 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5191 )
5192 })
5193 .collect();
5194 (
5195 contact.user.github_login.as_str(),
5196 contact.online,
5197 worktrees,
5198 )
5199 })
5200 .collect()
5201 }
5202 }
5203
5204 #[gpui::test(iterations = 10)]
5205 async fn test_contact_requests(
5206 executor: Arc<Deterministic>,
5207 cx_a: &mut TestAppContext,
5208 cx_a2: &mut TestAppContext,
5209 cx_b: &mut TestAppContext,
5210 cx_b2: &mut TestAppContext,
5211 cx_c: &mut TestAppContext,
5212 cx_c2: &mut TestAppContext,
5213 ) {
5214 cx_a.foreground().forbid_parking();
5215
5216 // Connect to a server as 3 clients.
5217 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5218 let client_a = server.create_client(cx_a, "user_a").await;
5219 let client_a2 = server.create_client(cx_a2, "user_a").await;
5220 let client_b = server.create_client(cx_b, "user_b").await;
5221 let client_b2 = server.create_client(cx_b2, "user_b").await;
5222 let client_c = server.create_client(cx_c, "user_c").await;
5223 let client_c2 = server.create_client(cx_c2, "user_c").await;
5224
5225 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5226 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5227 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5228
5229 // User A and User C request that user B become their contact.
5230 client_a
5231 .user_store
5232 .update(cx_a, |store, cx| {
5233 store.request_contact(client_b.user_id().unwrap(), cx)
5234 })
5235 .await
5236 .unwrap();
5237 client_c
5238 .user_store
5239 .update(cx_c, |store, cx| {
5240 store.request_contact(client_b.user_id().unwrap(), cx)
5241 })
5242 .await
5243 .unwrap();
5244 executor.run_until_parked();
5245
5246 // All users see the pending request appear in all their clients.
5247 assert_eq!(
5248 client_a.summarize_contacts(&cx_a).outgoing_requests,
5249 &["user_b"]
5250 );
5251 assert_eq!(
5252 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5253 &["user_b"]
5254 );
5255 assert_eq!(
5256 client_b.summarize_contacts(&cx_b).incoming_requests,
5257 &["user_a", "user_c"]
5258 );
5259 assert_eq!(
5260 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5261 &["user_a", "user_c"]
5262 );
5263 assert_eq!(
5264 client_c.summarize_contacts(&cx_c).outgoing_requests,
5265 &["user_b"]
5266 );
5267 assert_eq!(
5268 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5269 &["user_b"]
5270 );
5271
5272 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5273 disconnect_and_reconnect(&client_a, cx_a).await;
5274 disconnect_and_reconnect(&client_b, cx_b).await;
5275 disconnect_and_reconnect(&client_c, cx_c).await;
5276 executor.run_until_parked();
5277 assert_eq!(
5278 client_a.summarize_contacts(&cx_a).outgoing_requests,
5279 &["user_b"]
5280 );
5281 assert_eq!(
5282 client_b.summarize_contacts(&cx_b).incoming_requests,
5283 &["user_a", "user_c"]
5284 );
5285 assert_eq!(
5286 client_c.summarize_contacts(&cx_c).outgoing_requests,
5287 &["user_b"]
5288 );
5289
5290 // User B accepts the request from user A.
5291 client_b
5292 .user_store
5293 .update(cx_b, |store, cx| {
5294 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5295 })
5296 .await
5297 .unwrap();
5298
5299 executor.run_until_parked();
5300
5301 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5302 let contacts_b = client_b.summarize_contacts(&cx_b);
5303 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5304 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5305 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5306 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5307 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5308
5309 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5310 let contacts_a = client_a.summarize_contacts(&cx_a);
5311 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5312 assert!(contacts_a.outgoing_requests.is_empty());
5313 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5314 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5315 assert!(contacts_a2.outgoing_requests.is_empty());
5316
5317 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5318 disconnect_and_reconnect(&client_a, cx_a).await;
5319 disconnect_and_reconnect(&client_b, cx_b).await;
5320 disconnect_and_reconnect(&client_c, cx_c).await;
5321 executor.run_until_parked();
5322 assert_eq!(
5323 client_a.summarize_contacts(&cx_a).current,
5324 &["user_a", "user_b"]
5325 );
5326 assert_eq!(
5327 client_b.summarize_contacts(&cx_b).current,
5328 &["user_a", "user_b"]
5329 );
5330 assert_eq!(
5331 client_b.summarize_contacts(&cx_b).incoming_requests,
5332 &["user_c"]
5333 );
5334 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5335 assert_eq!(
5336 client_c.summarize_contacts(&cx_c).outgoing_requests,
5337 &["user_b"]
5338 );
5339
5340 // User B rejects the request from user C.
5341 client_b
5342 .user_store
5343 .update(cx_b, |store, cx| {
5344 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5345 })
5346 .await
5347 .unwrap();
5348
5349 executor.run_until_parked();
5350
5351 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5352 let contacts_b = client_b.summarize_contacts(&cx_b);
5353 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5354 assert!(contacts_b.incoming_requests.is_empty());
5355 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5356 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5357 assert!(contacts_b2.incoming_requests.is_empty());
5358
5359 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5360 let contacts_c = client_c.summarize_contacts(&cx_c);
5361 assert_eq!(contacts_c.current, &["user_c"]);
5362 assert!(contacts_c.outgoing_requests.is_empty());
5363 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5364 assert_eq!(contacts_c2.current, &["user_c"]);
5365 assert!(contacts_c2.outgoing_requests.is_empty());
5366
5367 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5368 disconnect_and_reconnect(&client_a, cx_a).await;
5369 disconnect_and_reconnect(&client_b, cx_b).await;
5370 disconnect_and_reconnect(&client_c, cx_c).await;
5371 executor.run_until_parked();
5372 assert_eq!(
5373 client_a.summarize_contacts(&cx_a).current,
5374 &["user_a", "user_b"]
5375 );
5376 assert_eq!(
5377 client_b.summarize_contacts(&cx_b).current,
5378 &["user_a", "user_b"]
5379 );
5380 assert!(client_b
5381 .summarize_contacts(&cx_b)
5382 .incoming_requests
5383 .is_empty());
5384 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5385 assert!(client_c
5386 .summarize_contacts(&cx_c)
5387 .outgoing_requests
5388 .is_empty());
5389
5390 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5391 client.disconnect(&cx.to_async()).unwrap();
5392 client.clear_contacts(cx).await;
5393 client
5394 .authenticate_and_connect(false, &cx.to_async())
5395 .await
5396 .unwrap();
5397 }
5398 }
5399
5400 #[gpui::test(iterations = 10)]
5401 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5402 cx_a.foreground().forbid_parking();
5403 let fs = FakeFs::new(cx_a.background());
5404
5405 // 2 clients connect to a server.
5406 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5407 let mut client_a = server.create_client(cx_a, "user_a").await;
5408 let mut client_b = server.create_client(cx_b, "user_b").await;
5409 server
5410 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5411 .await;
5412 cx_a.update(editor::init);
5413 cx_b.update(editor::init);
5414
5415 // Client A shares a project.
5416 fs.insert_tree(
5417 "/a",
5418 json!({
5419 "1.txt": "one",
5420 "2.txt": "two",
5421 "3.txt": "three",
5422 }),
5423 )
5424 .await;
5425 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5426 project_a
5427 .update(cx_a, |project, cx| project.share(cx))
5428 .await
5429 .unwrap();
5430
5431 // Client B joins the project.
5432 let project_b = client_b
5433 .build_remote_project(
5434 project_a
5435 .read_with(cx_a, |project, _| project.remote_id())
5436 .unwrap(),
5437 cx_b,
5438 )
5439 .await;
5440
5441 // Client A opens some editors.
5442 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5443 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5444 let editor_a1 = workspace_a
5445 .update(cx_a, |workspace, cx| {
5446 workspace.open_path((worktree_id, "1.txt"), true, cx)
5447 })
5448 .await
5449 .unwrap()
5450 .downcast::<Editor>()
5451 .unwrap();
5452 let editor_a2 = workspace_a
5453 .update(cx_a, |workspace, cx| {
5454 workspace.open_path((worktree_id, "2.txt"), true, cx)
5455 })
5456 .await
5457 .unwrap()
5458 .downcast::<Editor>()
5459 .unwrap();
5460
5461 // Client B opens an editor.
5462 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5463 let editor_b1 = workspace_b
5464 .update(cx_b, |workspace, cx| {
5465 workspace.open_path((worktree_id, "1.txt"), true, cx)
5466 })
5467 .await
5468 .unwrap()
5469 .downcast::<Editor>()
5470 .unwrap();
5471
5472 let client_a_id = project_b.read_with(cx_b, |project, _| {
5473 project.collaborators().values().next().unwrap().peer_id
5474 });
5475 let client_b_id = project_a.read_with(cx_a, |project, _| {
5476 project.collaborators().values().next().unwrap().peer_id
5477 });
5478
5479 // When client B starts following client A, all visible view states are replicated to client B.
5480 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5481 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5482 workspace_b
5483 .update(cx_b, |workspace, cx| {
5484 workspace
5485 .toggle_follow(&ToggleFollow(client_a_id), cx)
5486 .unwrap()
5487 })
5488 .await
5489 .unwrap();
5490 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5491 workspace
5492 .active_item(cx)
5493 .unwrap()
5494 .downcast::<Editor>()
5495 .unwrap()
5496 });
5497 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5498 assert_eq!(
5499 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5500 Some((worktree_id, "2.txt").into())
5501 );
5502 assert_eq!(
5503 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5504 vec![2..3]
5505 );
5506 assert_eq!(
5507 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5508 vec![0..1]
5509 );
5510
5511 // When client A activates a different editor, client B does so as well.
5512 workspace_a.update(cx_a, |workspace, cx| {
5513 workspace.activate_item(&editor_a1, cx)
5514 });
5515 workspace_b
5516 .condition(cx_b, |workspace, cx| {
5517 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5518 })
5519 .await;
5520
5521 // When client A navigates back and forth, client B does so as well.
5522 workspace_a
5523 .update(cx_a, |workspace, cx| {
5524 workspace::Pane::go_back(workspace, None, cx)
5525 })
5526 .await;
5527 workspace_b
5528 .condition(cx_b, |workspace, cx| {
5529 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5530 })
5531 .await;
5532
5533 workspace_a
5534 .update(cx_a, |workspace, cx| {
5535 workspace::Pane::go_forward(workspace, None, cx)
5536 })
5537 .await;
5538 workspace_b
5539 .condition(cx_b, |workspace, cx| {
5540 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5541 })
5542 .await;
5543
5544 // Changes to client A's editor are reflected on client B.
5545 editor_a1.update(cx_a, |editor, cx| {
5546 editor.select_ranges([1..1, 2..2], None, cx);
5547 });
5548 editor_b1
5549 .condition(cx_b, |editor, cx| {
5550 editor.selected_ranges(cx) == vec![1..1, 2..2]
5551 })
5552 .await;
5553
5554 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5555 editor_b1
5556 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5557 .await;
5558
5559 editor_a1.update(cx_a, |editor, cx| {
5560 editor.select_ranges([3..3], None, cx);
5561 editor.set_scroll_position(vec2f(0., 100.), cx);
5562 });
5563 editor_b1
5564 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5565 .await;
5566
5567 // After unfollowing, client B stops receiving updates from client A.
5568 workspace_b.update(cx_b, |workspace, cx| {
5569 workspace.unfollow(&workspace.active_pane().clone(), cx)
5570 });
5571 workspace_a.update(cx_a, |workspace, cx| {
5572 workspace.activate_item(&editor_a2, cx)
5573 });
5574 cx_a.foreground().run_until_parked();
5575 assert_eq!(
5576 workspace_b.read_with(cx_b, |workspace, cx| workspace
5577 .active_item(cx)
5578 .unwrap()
5579 .id()),
5580 editor_b1.id()
5581 );
5582
5583 // Client A starts following client B.
5584 workspace_a
5585 .update(cx_a, |workspace, cx| {
5586 workspace
5587 .toggle_follow(&ToggleFollow(client_b_id), cx)
5588 .unwrap()
5589 })
5590 .await
5591 .unwrap();
5592 assert_eq!(
5593 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5594 Some(client_b_id)
5595 );
5596 assert_eq!(
5597 workspace_a.read_with(cx_a, |workspace, cx| workspace
5598 .active_item(cx)
5599 .unwrap()
5600 .id()),
5601 editor_a1.id()
5602 );
5603
5604 // Following interrupts when client B disconnects.
5605 client_b.disconnect(&cx_b.to_async()).unwrap();
5606 cx_a.foreground().run_until_parked();
5607 assert_eq!(
5608 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5609 None
5610 );
5611 }
5612
5613 #[gpui::test(iterations = 10)]
5614 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5615 cx_a.foreground().forbid_parking();
5616 let fs = FakeFs::new(cx_a.background());
5617
5618 // 2 clients connect to a server.
5619 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5620 let mut client_a = server.create_client(cx_a, "user_a").await;
5621 let mut client_b = server.create_client(cx_b, "user_b").await;
5622 server
5623 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5624 .await;
5625 cx_a.update(editor::init);
5626 cx_b.update(editor::init);
5627
5628 // Client A shares a project.
5629 fs.insert_tree(
5630 "/a",
5631 json!({
5632 "1.txt": "one",
5633 "2.txt": "two",
5634 "3.txt": "three",
5635 "4.txt": "four",
5636 }),
5637 )
5638 .await;
5639 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5640 project_a
5641 .update(cx_a, |project, cx| project.share(cx))
5642 .await
5643 .unwrap();
5644
5645 // Client B joins the project.
5646 let project_b = client_b
5647 .build_remote_project(
5648 project_a
5649 .read_with(cx_a, |project, _| project.remote_id())
5650 .unwrap(),
5651 cx_b,
5652 )
5653 .await;
5654
5655 // Client A opens some editors.
5656 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5657 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5658 let _editor_a1 = workspace_a
5659 .update(cx_a, |workspace, cx| {
5660 workspace.open_path((worktree_id, "1.txt"), true, cx)
5661 })
5662 .await
5663 .unwrap()
5664 .downcast::<Editor>()
5665 .unwrap();
5666
5667 // Client B opens an editor.
5668 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5669 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5670 let _editor_b1 = workspace_b
5671 .update(cx_b, |workspace, cx| {
5672 workspace.open_path((worktree_id, "2.txt"), true, cx)
5673 })
5674 .await
5675 .unwrap()
5676 .downcast::<Editor>()
5677 .unwrap();
5678
5679 // Clients A and B follow each other in split panes
5680 workspace_a
5681 .update(cx_a, |workspace, cx| {
5682 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5683 assert_ne!(*workspace.active_pane(), pane_a1);
5684 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5685 workspace
5686 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5687 .unwrap()
5688 })
5689 .await
5690 .unwrap();
5691 workspace_b
5692 .update(cx_b, |workspace, cx| {
5693 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5694 assert_ne!(*workspace.active_pane(), pane_b1);
5695 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5696 workspace
5697 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5698 .unwrap()
5699 })
5700 .await
5701 .unwrap();
5702
5703 workspace_a
5704 .update(cx_a, |workspace, cx| {
5705 workspace.activate_next_pane(cx);
5706 assert_eq!(*workspace.active_pane(), pane_a1);
5707 workspace.open_path((worktree_id, "3.txt"), true, cx)
5708 })
5709 .await
5710 .unwrap();
5711 workspace_b
5712 .update(cx_b, |workspace, cx| {
5713 workspace.activate_next_pane(cx);
5714 assert_eq!(*workspace.active_pane(), pane_b1);
5715 workspace.open_path((worktree_id, "4.txt"), true, cx)
5716 })
5717 .await
5718 .unwrap();
5719 cx_a.foreground().run_until_parked();
5720
5721 // Ensure leader updates don't change the active pane of followers
5722 workspace_a.read_with(cx_a, |workspace, _| {
5723 assert_eq!(*workspace.active_pane(), pane_a1);
5724 });
5725 workspace_b.read_with(cx_b, |workspace, _| {
5726 assert_eq!(*workspace.active_pane(), pane_b1);
5727 });
5728
5729 // Ensure peers following each other doesn't cause an infinite loop.
5730 assert_eq!(
5731 workspace_a.read_with(cx_a, |workspace, cx| workspace
5732 .active_item(cx)
5733 .unwrap()
5734 .project_path(cx)),
5735 Some((worktree_id, "3.txt").into())
5736 );
5737 workspace_a.update(cx_a, |workspace, cx| {
5738 assert_eq!(
5739 workspace.active_item(cx).unwrap().project_path(cx),
5740 Some((worktree_id, "3.txt").into())
5741 );
5742 workspace.activate_next_pane(cx);
5743 assert_eq!(
5744 workspace.active_item(cx).unwrap().project_path(cx),
5745 Some((worktree_id, "4.txt").into())
5746 );
5747 });
5748 workspace_b.update(cx_b, |workspace, cx| {
5749 assert_eq!(
5750 workspace.active_item(cx).unwrap().project_path(cx),
5751 Some((worktree_id, "4.txt").into())
5752 );
5753 workspace.activate_next_pane(cx);
5754 assert_eq!(
5755 workspace.active_item(cx).unwrap().project_path(cx),
5756 Some((worktree_id, "3.txt").into())
5757 );
5758 });
5759 }
5760
5761 #[gpui::test(iterations = 10)]
5762 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5763 cx_a.foreground().forbid_parking();
5764 let fs = FakeFs::new(cx_a.background());
5765
5766 // 2 clients connect to a server.
5767 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5768 let mut client_a = server.create_client(cx_a, "user_a").await;
5769 let mut client_b = server.create_client(cx_b, "user_b").await;
5770 server
5771 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5772 .await;
5773 cx_a.update(editor::init);
5774 cx_b.update(editor::init);
5775
5776 // Client A shares a project.
5777 fs.insert_tree(
5778 "/a",
5779 json!({
5780 "1.txt": "one",
5781 "2.txt": "two",
5782 "3.txt": "three",
5783 }),
5784 )
5785 .await;
5786 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5787 project_a
5788 .update(cx_a, |project, cx| project.share(cx))
5789 .await
5790 .unwrap();
5791
5792 // Client B joins the project.
5793 let project_b = client_b
5794 .build_remote_project(
5795 project_a
5796 .read_with(cx_a, |project, _| project.remote_id())
5797 .unwrap(),
5798 cx_b,
5799 )
5800 .await;
5801
5802 // Client A opens some editors.
5803 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5804 let _editor_a1 = workspace_a
5805 .update(cx_a, |workspace, cx| {
5806 workspace.open_path((worktree_id, "1.txt"), true, cx)
5807 })
5808 .await
5809 .unwrap()
5810 .downcast::<Editor>()
5811 .unwrap();
5812
5813 // Client B starts following client A.
5814 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5815 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5816 let leader_id = project_b.read_with(cx_b, |project, _| {
5817 project.collaborators().values().next().unwrap().peer_id
5818 });
5819 workspace_b
5820 .update(cx_b, |workspace, cx| {
5821 workspace
5822 .toggle_follow(&ToggleFollow(leader_id), cx)
5823 .unwrap()
5824 })
5825 .await
5826 .unwrap();
5827 assert_eq!(
5828 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5829 Some(leader_id)
5830 );
5831 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5832 workspace
5833 .active_item(cx)
5834 .unwrap()
5835 .downcast::<Editor>()
5836 .unwrap()
5837 });
5838
5839 // When client B moves, it automatically stops following client A.
5840 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5841 assert_eq!(
5842 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5843 None
5844 );
5845
5846 workspace_b
5847 .update(cx_b, |workspace, cx| {
5848 workspace
5849 .toggle_follow(&ToggleFollow(leader_id), cx)
5850 .unwrap()
5851 })
5852 .await
5853 .unwrap();
5854 assert_eq!(
5855 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5856 Some(leader_id)
5857 );
5858
5859 // When client B edits, it automatically stops following client A.
5860 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5861 assert_eq!(
5862 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5863 None
5864 );
5865
5866 workspace_b
5867 .update(cx_b, |workspace, cx| {
5868 workspace
5869 .toggle_follow(&ToggleFollow(leader_id), cx)
5870 .unwrap()
5871 })
5872 .await
5873 .unwrap();
5874 assert_eq!(
5875 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5876 Some(leader_id)
5877 );
5878
5879 // When client B scrolls, it automatically stops following client A.
5880 editor_b2.update(cx_b, |editor, cx| {
5881 editor.set_scroll_position(vec2f(0., 3.), cx)
5882 });
5883 assert_eq!(
5884 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5885 None
5886 );
5887
5888 workspace_b
5889 .update(cx_b, |workspace, cx| {
5890 workspace
5891 .toggle_follow(&ToggleFollow(leader_id), cx)
5892 .unwrap()
5893 })
5894 .await
5895 .unwrap();
5896 assert_eq!(
5897 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5898 Some(leader_id)
5899 );
5900
5901 // When client B activates a different pane, it continues following client A in the original pane.
5902 workspace_b.update(cx_b, |workspace, cx| {
5903 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5904 });
5905 assert_eq!(
5906 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5907 Some(leader_id)
5908 );
5909
5910 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5911 assert_eq!(
5912 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5913 Some(leader_id)
5914 );
5915
5916 // When client B activates a different item in the original pane, it automatically stops following client A.
5917 workspace_b
5918 .update(cx_b, |workspace, cx| {
5919 workspace.open_path((worktree_id, "2.txt"), true, cx)
5920 })
5921 .await
5922 .unwrap();
5923 assert_eq!(
5924 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5925 None
5926 );
5927 }
5928
5929 #[gpui::test(iterations = 100)]
5930 async fn test_random_collaboration(
5931 cx: &mut TestAppContext,
5932 deterministic: Arc<Deterministic>,
5933 rng: StdRng,
5934 ) {
5935 cx.foreground().forbid_parking();
5936 let max_peers = env::var("MAX_PEERS")
5937 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5938 .unwrap_or(5);
5939 assert!(max_peers <= 5);
5940
5941 let max_operations = env::var("OPERATIONS")
5942 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5943 .unwrap_or(10);
5944
5945 let rng = Arc::new(Mutex::new(rng));
5946
5947 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5948 let host_language_registry = Arc::new(LanguageRegistry::test());
5949
5950 let fs = FakeFs::new(cx.background());
5951 fs.insert_tree("/_collab", json!({"init": ""})).await;
5952
5953 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5954 let db = server.app_state.db.clone();
5955 let host_user_id = db.create_user("host", false).await.unwrap();
5956 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5957 let guest_user_id = db.create_user(username, false).await.unwrap();
5958 server
5959 .app_state
5960 .db
5961 .send_contact_request(guest_user_id, host_user_id)
5962 .await
5963 .unwrap();
5964 server
5965 .app_state
5966 .db
5967 .respond_to_contact_request(host_user_id, guest_user_id, true)
5968 .await
5969 .unwrap();
5970 }
5971
5972 let mut clients = Vec::new();
5973 let mut user_ids = Vec::new();
5974 let mut op_start_signals = Vec::new();
5975
5976 let mut next_entity_id = 100000;
5977 let mut host_cx = TestAppContext::new(
5978 cx.foreground_platform(),
5979 cx.platform(),
5980 deterministic.build_foreground(next_entity_id),
5981 deterministic.build_background(),
5982 cx.font_cache(),
5983 cx.leak_detector(),
5984 next_entity_id,
5985 );
5986 let host = server.create_client(&mut host_cx, "host").await;
5987 let host_project = host_cx.update(|cx| {
5988 Project::local(
5989 host.client.clone(),
5990 host.user_store.clone(),
5991 host_language_registry.clone(),
5992 fs.clone(),
5993 cx,
5994 )
5995 });
5996 let host_project_id = host_project
5997 .update(&mut host_cx, |p, _| p.next_remote_id())
5998 .await;
5999
6000 let (collab_worktree, _) = host_project
6001 .update(&mut host_cx, |project, cx| {
6002 project.find_or_create_local_worktree("/_collab", true, cx)
6003 })
6004 .await
6005 .unwrap();
6006 collab_worktree
6007 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6008 .await;
6009 host_project
6010 .update(&mut host_cx, |project, cx| project.share(cx))
6011 .await
6012 .unwrap();
6013
6014 // Set up fake language servers.
6015 let mut language = Language::new(
6016 LanguageConfig {
6017 name: "Rust".into(),
6018 path_suffixes: vec!["rs".to_string()],
6019 ..Default::default()
6020 },
6021 None,
6022 );
6023 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6024 name: "the-fake-language-server",
6025 capabilities: lsp::LanguageServer::full_capabilities(),
6026 initializer: Some(Box::new({
6027 let rng = rng.clone();
6028 let fs = fs.clone();
6029 let project = host_project.downgrade();
6030 move |fake_server: &mut FakeLanguageServer| {
6031 fake_server.handle_request::<lsp::request::Completion, _, _>(
6032 |_, _| async move {
6033 Ok(Some(lsp::CompletionResponse::Array(vec![
6034 lsp::CompletionItem {
6035 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6036 range: lsp::Range::new(
6037 lsp::Position::new(0, 0),
6038 lsp::Position::new(0, 0),
6039 ),
6040 new_text: "the-new-text".to_string(),
6041 })),
6042 ..Default::default()
6043 },
6044 ])))
6045 },
6046 );
6047
6048 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6049 |_, _| async move {
6050 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6051 lsp::CodeAction {
6052 title: "the-code-action".to_string(),
6053 ..Default::default()
6054 },
6055 )]))
6056 },
6057 );
6058
6059 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6060 |params, _| async move {
6061 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6062 params.position,
6063 params.position,
6064 ))))
6065 },
6066 );
6067
6068 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6069 let fs = fs.clone();
6070 let rng = rng.clone();
6071 move |_, _| {
6072 let fs = fs.clone();
6073 let rng = rng.clone();
6074 async move {
6075 let files = fs.files().await;
6076 let mut rng = rng.lock();
6077 let count = rng.gen_range::<usize, _>(1..3);
6078 let files = (0..count)
6079 .map(|_| files.choose(&mut *rng).unwrap())
6080 .collect::<Vec<_>>();
6081 log::info!("LSP: Returning definitions in files {:?}", &files);
6082 Ok(Some(lsp::GotoDefinitionResponse::Array(
6083 files
6084 .into_iter()
6085 .map(|file| lsp::Location {
6086 uri: lsp::Url::from_file_path(file).unwrap(),
6087 range: Default::default(),
6088 })
6089 .collect(),
6090 )))
6091 }
6092 }
6093 });
6094
6095 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6096 let rng = rng.clone();
6097 let project = project.clone();
6098 move |params, mut cx| {
6099 let highlights = if let Some(project) = project.upgrade(&cx) {
6100 project.update(&mut cx, |project, cx| {
6101 let path = params
6102 .text_document_position_params
6103 .text_document
6104 .uri
6105 .to_file_path()
6106 .unwrap();
6107 let (worktree, relative_path) =
6108 project.find_local_worktree(&path, cx)?;
6109 let project_path =
6110 ProjectPath::from((worktree.read(cx).id(), relative_path));
6111 let buffer =
6112 project.get_open_buffer(&project_path, cx)?.read(cx);
6113
6114 let mut highlights = Vec::new();
6115 let highlight_count = rng.lock().gen_range(1..=5);
6116 let mut prev_end = 0;
6117 for _ in 0..highlight_count {
6118 let range =
6119 buffer.random_byte_range(prev_end, &mut *rng.lock());
6120
6121 highlights.push(lsp::DocumentHighlight {
6122 range: range_to_lsp(range.to_point_utf16(buffer)),
6123 kind: Some(lsp::DocumentHighlightKind::READ),
6124 });
6125 prev_end = range.end;
6126 }
6127 Some(highlights)
6128 })
6129 } else {
6130 None
6131 };
6132 async move { Ok(highlights) }
6133 }
6134 });
6135 }
6136 })),
6137 ..Default::default()
6138 });
6139 host_language_registry.add(Arc::new(language));
6140
6141 let op_start_signal = futures::channel::mpsc::unbounded();
6142 user_ids.push(host.current_user_id(&host_cx));
6143 op_start_signals.push(op_start_signal.0);
6144 clients.push(host_cx.foreground().spawn(host.simulate_host(
6145 host_project,
6146 op_start_signal.1,
6147 rng.clone(),
6148 host_cx,
6149 )));
6150
6151 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6152 rng.lock().gen_range(0..max_operations)
6153 } else {
6154 max_operations
6155 };
6156 let mut available_guests = vec![
6157 "guest-1".to_string(),
6158 "guest-2".to_string(),
6159 "guest-3".to_string(),
6160 "guest-4".to_string(),
6161 ];
6162 let mut operations = 0;
6163 while operations < max_operations {
6164 if operations == disconnect_host_at {
6165 server.disconnect_client(user_ids[0]);
6166 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6167 drop(op_start_signals);
6168 let mut clients = futures::future::join_all(clients).await;
6169 cx.foreground().run_until_parked();
6170
6171 let (host, mut host_cx, host_err) = clients.remove(0);
6172 if let Some(host_err) = host_err {
6173 log::error!("host error - {}", host_err);
6174 }
6175 host.project
6176 .as_ref()
6177 .unwrap()
6178 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6179 for (guest, mut guest_cx, guest_err) in clients {
6180 if let Some(guest_err) = guest_err {
6181 log::error!("{} error - {}", guest.username, guest_err);
6182 }
6183 // TODO
6184 // let contacts = server
6185 // .store
6186 // .read()
6187 // .await
6188 // .contacts_for_user(guest.current_user_id(&guest_cx));
6189 // assert!(!contacts
6190 // .iter()
6191 // .flat_map(|contact| &contact.projects)
6192 // .any(|project| project.id == host_project_id));
6193 guest
6194 .project
6195 .as_ref()
6196 .unwrap()
6197 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6198 guest_cx.update(|_| drop(guest));
6199 }
6200 host_cx.update(|_| drop(host));
6201
6202 return;
6203 }
6204
6205 let distribution = rng.lock().gen_range(0..100);
6206 match distribution {
6207 0..=19 if !available_guests.is_empty() => {
6208 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6209 let guest_username = available_guests.remove(guest_ix);
6210 log::info!("Adding new connection for {}", guest_username);
6211 next_entity_id += 100000;
6212 let mut guest_cx = TestAppContext::new(
6213 cx.foreground_platform(),
6214 cx.platform(),
6215 deterministic.build_foreground(next_entity_id),
6216 deterministic.build_background(),
6217 cx.font_cache(),
6218 cx.leak_detector(),
6219 next_entity_id,
6220 );
6221 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6222 let guest_project = Project::remote(
6223 host_project_id,
6224 guest.client.clone(),
6225 guest.user_store.clone(),
6226 guest_lang_registry.clone(),
6227 FakeFs::new(cx.background()),
6228 &mut guest_cx.to_async(),
6229 )
6230 .await
6231 .unwrap();
6232 let op_start_signal = futures::channel::mpsc::unbounded();
6233 user_ids.push(guest.current_user_id(&guest_cx));
6234 op_start_signals.push(op_start_signal.0);
6235 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6236 guest_username.clone(),
6237 guest_project,
6238 op_start_signal.1,
6239 rng.clone(),
6240 guest_cx,
6241 )));
6242
6243 log::info!("Added connection for {}", guest_username);
6244 operations += 1;
6245 }
6246 20..=29 if clients.len() > 1 => {
6247 let guest_ix = rng.lock().gen_range(1..clients.len());
6248 log::info!("Removing guest {}", user_ids[guest_ix]);
6249 let removed_guest_id = user_ids.remove(guest_ix);
6250 let guest = clients.remove(guest_ix);
6251 op_start_signals.remove(guest_ix);
6252 server.disconnect_client(removed_guest_id);
6253 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6254 let (guest, mut guest_cx, guest_err) = guest.await;
6255 if let Some(guest_err) = guest_err {
6256 log::error!("{} error - {}", guest.username, guest_err);
6257 }
6258 guest
6259 .project
6260 .as_ref()
6261 .unwrap()
6262 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6263 // TODO
6264 // for user_id in &user_ids {
6265 // for contact in server.store.read().await.contacts_for_user(*user_id) {
6266 // assert_ne!(
6267 // contact.user_id, removed_guest_id.0 as u64,
6268 // "removed guest is still a contact of another peer"
6269 // );
6270 // for project in contact.projects {
6271 // for project_guest_id in project.guests {
6272 // assert_ne!(
6273 // project_guest_id, removed_guest_id.0 as u64,
6274 // "removed guest appears as still participating on a project"
6275 // );
6276 // }
6277 // }
6278 // }
6279 // }
6280
6281 log::info!("{} removed", guest.username);
6282 available_guests.push(guest.username.clone());
6283 guest_cx.update(|_| drop(guest));
6284
6285 operations += 1;
6286 }
6287 _ => {
6288 while operations < max_operations && rng.lock().gen_bool(0.7) {
6289 op_start_signals
6290 .choose(&mut *rng.lock())
6291 .unwrap()
6292 .unbounded_send(())
6293 .unwrap();
6294 operations += 1;
6295 }
6296
6297 if rng.lock().gen_bool(0.8) {
6298 cx.foreground().run_until_parked();
6299 }
6300 }
6301 }
6302 }
6303
6304 drop(op_start_signals);
6305 let mut clients = futures::future::join_all(clients).await;
6306 cx.foreground().run_until_parked();
6307
6308 let (host_client, mut host_cx, host_err) = clients.remove(0);
6309 if let Some(host_err) = host_err {
6310 panic!("host error - {}", host_err);
6311 }
6312 let host_project = host_client.project.as_ref().unwrap();
6313 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6314 project
6315 .worktrees(cx)
6316 .map(|worktree| {
6317 let snapshot = worktree.read(cx).snapshot();
6318 (snapshot.id(), snapshot)
6319 })
6320 .collect::<BTreeMap<_, _>>()
6321 });
6322
6323 host_client
6324 .project
6325 .as_ref()
6326 .unwrap()
6327 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6328
6329 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6330 if let Some(guest_err) = guest_err {
6331 panic!("{} error - {}", guest_client.username, guest_err);
6332 }
6333 let worktree_snapshots =
6334 guest_client
6335 .project
6336 .as_ref()
6337 .unwrap()
6338 .read_with(&guest_cx, |project, cx| {
6339 project
6340 .worktrees(cx)
6341 .map(|worktree| {
6342 let worktree = worktree.read(cx);
6343 (worktree.id(), worktree.snapshot())
6344 })
6345 .collect::<BTreeMap<_, _>>()
6346 });
6347
6348 assert_eq!(
6349 worktree_snapshots.keys().collect::<Vec<_>>(),
6350 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6351 "{} has different worktrees than the host",
6352 guest_client.username
6353 );
6354 for (id, host_snapshot) in &host_worktree_snapshots {
6355 let guest_snapshot = &worktree_snapshots[id];
6356 assert_eq!(
6357 guest_snapshot.root_name(),
6358 host_snapshot.root_name(),
6359 "{} has different root name than the host for worktree {}",
6360 guest_client.username,
6361 id
6362 );
6363 assert_eq!(
6364 guest_snapshot.entries(false).collect::<Vec<_>>(),
6365 host_snapshot.entries(false).collect::<Vec<_>>(),
6366 "{} has different snapshot than the host for worktree {}",
6367 guest_client.username,
6368 id
6369 );
6370 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6371 }
6372
6373 guest_client
6374 .project
6375 .as_ref()
6376 .unwrap()
6377 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6378
6379 for guest_buffer in &guest_client.buffers {
6380 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6381 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6382 project.buffer_for_id(buffer_id, cx).expect(&format!(
6383 "host does not have buffer for guest:{}, peer:{}, id:{}",
6384 guest_client.username, guest_client.peer_id, buffer_id
6385 ))
6386 });
6387 let path = host_buffer
6388 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6389
6390 assert_eq!(
6391 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6392 0,
6393 "{}, buffer {}, path {:?} has deferred operations",
6394 guest_client.username,
6395 buffer_id,
6396 path,
6397 );
6398 assert_eq!(
6399 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6400 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6401 "{}, buffer {}, path {:?}, differs from the host's buffer",
6402 guest_client.username,
6403 buffer_id,
6404 path
6405 );
6406 }
6407
6408 guest_cx.update(|_| drop(guest_client));
6409 }
6410
6411 host_cx.update(|_| drop(host_client));
6412 }
6413
6414 struct TestServer {
6415 peer: Arc<Peer>,
6416 app_state: Arc<AppState>,
6417 server: Arc<Server>,
6418 foreground: Rc<executor::Foreground>,
6419 notifications: mpsc::UnboundedReceiver<()>,
6420 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6421 forbid_connections: Arc<AtomicBool>,
6422 _test_db: TestDb,
6423 }
6424
6425 impl TestServer {
6426 async fn start(
6427 foreground: Rc<executor::Foreground>,
6428 background: Arc<executor::Background>,
6429 ) -> Self {
6430 let test_db = TestDb::fake(background);
6431 let app_state = Self::build_app_state(&test_db).await;
6432 let peer = Peer::new();
6433 let notifications = mpsc::unbounded();
6434 let server = Server::new(app_state.clone(), Some(notifications.0));
6435 Self {
6436 peer,
6437 app_state,
6438 server,
6439 foreground,
6440 notifications: notifications.1,
6441 connection_killers: Default::default(),
6442 forbid_connections: Default::default(),
6443 _test_db: test_db,
6444 }
6445 }
6446
6447 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6448 cx.update(|cx| {
6449 let settings = Settings::test(cx);
6450 cx.set_global(settings);
6451 });
6452
6453 let http = FakeHttpClient::with_404_response();
6454 let user_id =
6455 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6456 user.id
6457 } else {
6458 self.app_state.db.create_user(name, false).await.unwrap()
6459 };
6460 let client_name = name.to_string();
6461 let mut client = Client::new(http.clone());
6462 let server = self.server.clone();
6463 let connection_killers = self.connection_killers.clone();
6464 let forbid_connections = self.forbid_connections.clone();
6465 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6466
6467 Arc::get_mut(&mut client)
6468 .unwrap()
6469 .override_authenticate(move |cx| {
6470 cx.spawn(|_| async move {
6471 let access_token = "the-token".to_string();
6472 Ok(Credentials {
6473 user_id: user_id.0 as u64,
6474 access_token,
6475 })
6476 })
6477 })
6478 .override_establish_connection(move |credentials, cx| {
6479 assert_eq!(credentials.user_id, user_id.0 as u64);
6480 assert_eq!(credentials.access_token, "the-token");
6481
6482 let server = server.clone();
6483 let connection_killers = connection_killers.clone();
6484 let forbid_connections = forbid_connections.clone();
6485 let client_name = client_name.clone();
6486 let connection_id_tx = connection_id_tx.clone();
6487 cx.spawn(move |cx| async move {
6488 if forbid_connections.load(SeqCst) {
6489 Err(EstablishConnectionError::other(anyhow!(
6490 "server is forbidding connections"
6491 )))
6492 } else {
6493 let (client_conn, server_conn, killed) =
6494 Connection::in_memory(cx.background());
6495 connection_killers.lock().insert(user_id, killed);
6496 cx.background()
6497 .spawn(server.handle_connection(
6498 server_conn,
6499 client_name,
6500 user_id,
6501 Some(connection_id_tx),
6502 cx.background(),
6503 ))
6504 .detach();
6505 Ok(client_conn)
6506 }
6507 })
6508 });
6509
6510 client
6511 .authenticate_and_connect(false, &cx.to_async())
6512 .await
6513 .unwrap();
6514
6515 Channel::init(&client);
6516 Project::init(&client);
6517 cx.update(|cx| {
6518 workspace::init(&client, cx);
6519 });
6520
6521 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6522 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6523
6524 let client = TestClient {
6525 client,
6526 peer_id,
6527 username: name.to_string(),
6528 user_store,
6529 language_registry: Arc::new(LanguageRegistry::test()),
6530 project: Default::default(),
6531 buffers: Default::default(),
6532 };
6533 client.wait_for_current_user(cx).await;
6534 client
6535 }
6536
6537 fn disconnect_client(&self, user_id: UserId) {
6538 self.connection_killers
6539 .lock()
6540 .remove(&user_id)
6541 .unwrap()
6542 .store(true, SeqCst);
6543 }
6544
6545 fn forbid_connections(&self) {
6546 self.forbid_connections.store(true, SeqCst);
6547 }
6548
6549 fn allow_connections(&self) {
6550 self.forbid_connections.store(false, SeqCst);
6551 }
6552
6553 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6554 while let Some((client_a, cx_a)) = clients.pop() {
6555 for (client_b, cx_b) in &mut clients {
6556 client_a
6557 .user_store
6558 .update(cx_a, |store, cx| {
6559 store.request_contact(client_b.user_id().unwrap(), cx)
6560 })
6561 .await
6562 .unwrap();
6563 cx_a.foreground().run_until_parked();
6564 client_b
6565 .user_store
6566 .update(*cx_b, |store, cx| {
6567 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6568 })
6569 .await
6570 .unwrap();
6571 }
6572 }
6573 }
6574
6575 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6576 Arc::new(AppState {
6577 db: test_db.db().clone(),
6578 api_token: Default::default(),
6579 })
6580 }
6581
6582 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6583 self.server.store.read().await
6584 }
6585
6586 async fn condition<F>(&mut self, mut predicate: F)
6587 where
6588 F: FnMut(&Store) -> bool,
6589 {
6590 assert!(
6591 self.foreground.parking_forbidden(),
6592 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6593 );
6594 while !(predicate)(&*self.server.store.read().await) {
6595 self.foreground.start_waiting();
6596 self.notifications.next().await;
6597 self.foreground.finish_waiting();
6598 }
6599 }
6600 }
6601
6602 impl Deref for TestServer {
6603 type Target = Server;
6604
6605 fn deref(&self) -> &Self::Target {
6606 &self.server
6607 }
6608 }
6609
6610 impl Drop for TestServer {
6611 fn drop(&mut self) {
6612 self.peer.reset();
6613 }
6614 }
6615
6616 struct TestClient {
6617 client: Arc<Client>,
6618 username: String,
6619 pub peer_id: PeerId,
6620 pub user_store: ModelHandle<UserStore>,
6621 language_registry: Arc<LanguageRegistry>,
6622 project: Option<ModelHandle<Project>>,
6623 buffers: HashSet<ModelHandle<language::Buffer>>,
6624 }
6625
6626 impl Deref for TestClient {
6627 type Target = Arc<Client>;
6628
6629 fn deref(&self) -> &Self::Target {
6630 &self.client
6631 }
6632 }
6633
6634 struct ContactsSummary {
6635 pub current: Vec<String>,
6636 pub outgoing_requests: Vec<String>,
6637 pub incoming_requests: Vec<String>,
6638 }
6639
6640 impl TestClient {
6641 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6642 UserId::from_proto(
6643 self.user_store
6644 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6645 )
6646 }
6647
6648 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6649 let mut authed_user = self
6650 .user_store
6651 .read_with(cx, |user_store, _| user_store.watch_current_user());
6652 while authed_user.next().await.unwrap().is_none() {}
6653 }
6654
6655 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6656 self.user_store
6657 .update(cx, |store, _| store.clear_contacts())
6658 .await;
6659 }
6660
6661 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6662 self.user_store.read_with(cx, |store, _| ContactsSummary {
6663 current: store
6664 .contacts()
6665 .iter()
6666 .map(|contact| contact.user.github_login.clone())
6667 .collect(),
6668 outgoing_requests: store
6669 .outgoing_contact_requests()
6670 .iter()
6671 .map(|user| user.github_login.clone())
6672 .collect(),
6673 incoming_requests: store
6674 .incoming_contact_requests()
6675 .iter()
6676 .map(|user| user.github_login.clone())
6677 .collect(),
6678 })
6679 }
6680
6681 async fn build_local_project(
6682 &mut self,
6683 fs: Arc<FakeFs>,
6684 root_path: impl AsRef<Path>,
6685 cx: &mut TestAppContext,
6686 ) -> (ModelHandle<Project>, WorktreeId) {
6687 let project = cx.update(|cx| {
6688 Project::local(
6689 self.client.clone(),
6690 self.user_store.clone(),
6691 self.language_registry.clone(),
6692 fs,
6693 cx,
6694 )
6695 });
6696 self.project = Some(project.clone());
6697 let (worktree, _) = project
6698 .update(cx, |p, cx| {
6699 p.find_or_create_local_worktree(root_path, true, cx)
6700 })
6701 .await
6702 .unwrap();
6703 worktree
6704 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6705 .await;
6706 project
6707 .update(cx, |project, _| project.next_remote_id())
6708 .await;
6709 (project, worktree.read_with(cx, |tree, _| tree.id()))
6710 }
6711
6712 async fn build_remote_project(
6713 &mut self,
6714 project_id: u64,
6715 cx: &mut TestAppContext,
6716 ) -> ModelHandle<Project> {
6717 let project = Project::remote(
6718 project_id,
6719 self.client.clone(),
6720 self.user_store.clone(),
6721 self.language_registry.clone(),
6722 FakeFs::new(cx.background()),
6723 &mut cx.to_async(),
6724 )
6725 .await
6726 .unwrap();
6727 self.project = Some(project.clone());
6728 project
6729 }
6730
6731 fn build_workspace(
6732 &self,
6733 project: &ModelHandle<Project>,
6734 cx: &mut TestAppContext,
6735 ) -> ViewHandle<Workspace> {
6736 let (window_id, _) = cx.add_window(|_| EmptyView);
6737 cx.add_view(window_id, |cx| {
6738 let fs = project.read(cx).fs().clone();
6739 Workspace::new(
6740 &WorkspaceParams {
6741 fs,
6742 project: project.clone(),
6743 user_store: self.user_store.clone(),
6744 languages: self.language_registry.clone(),
6745 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6746 channel_list: cx.add_model(|cx| {
6747 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6748 }),
6749 client: self.client.clone(),
6750 },
6751 cx,
6752 )
6753 })
6754 }
6755
6756 async fn simulate_host(
6757 mut self,
6758 project: ModelHandle<Project>,
6759 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6760 rng: Arc<Mutex<StdRng>>,
6761 mut cx: TestAppContext,
6762 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6763 async fn simulate_host_internal(
6764 client: &mut TestClient,
6765 project: ModelHandle<Project>,
6766 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6767 rng: Arc<Mutex<StdRng>>,
6768 cx: &mut TestAppContext,
6769 ) -> anyhow::Result<()> {
6770 let fs = project.read_with(cx, |project, _| project.fs().clone());
6771
6772 while op_start_signal.next().await.is_some() {
6773 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6774 let files = fs.as_fake().files().await;
6775 match distribution {
6776 0..=20 if !files.is_empty() => {
6777 let path = files.choose(&mut *rng.lock()).unwrap();
6778 let mut path = path.as_path();
6779 while let Some(parent_path) = path.parent() {
6780 path = parent_path;
6781 if rng.lock().gen() {
6782 break;
6783 }
6784 }
6785
6786 log::info!("Host: find/create local worktree {:?}", path);
6787 let find_or_create_worktree = project.update(cx, |project, cx| {
6788 project.find_or_create_local_worktree(path, true, cx)
6789 });
6790 if rng.lock().gen() {
6791 cx.background().spawn(find_or_create_worktree).detach();
6792 } else {
6793 find_or_create_worktree.await?;
6794 }
6795 }
6796 10..=80 if !files.is_empty() => {
6797 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6798 let file = files.choose(&mut *rng.lock()).unwrap();
6799 let (worktree, path) = project
6800 .update(cx, |project, cx| {
6801 project.find_or_create_local_worktree(
6802 file.clone(),
6803 true,
6804 cx,
6805 )
6806 })
6807 .await?;
6808 let project_path =
6809 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6810 log::info!(
6811 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6812 file,
6813 project_path.0,
6814 project_path.1
6815 );
6816 let buffer = project
6817 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6818 .await
6819 .unwrap();
6820 client.buffers.insert(buffer.clone());
6821 buffer
6822 } else {
6823 client
6824 .buffers
6825 .iter()
6826 .choose(&mut *rng.lock())
6827 .unwrap()
6828 .clone()
6829 };
6830
6831 if rng.lock().gen_bool(0.1) {
6832 cx.update(|cx| {
6833 log::info!(
6834 "Host: dropping buffer {:?}",
6835 buffer.read(cx).file().unwrap().full_path(cx)
6836 );
6837 client.buffers.remove(&buffer);
6838 drop(buffer);
6839 });
6840 } else {
6841 buffer.update(cx, |buffer, cx| {
6842 log::info!(
6843 "Host: updating buffer {:?} ({})",
6844 buffer.file().unwrap().full_path(cx),
6845 buffer.remote_id()
6846 );
6847
6848 if rng.lock().gen_bool(0.7) {
6849 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6850 } else {
6851 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6852 }
6853 });
6854 }
6855 }
6856 _ => loop {
6857 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6858 let mut path = PathBuf::new();
6859 path.push("/");
6860 for _ in 0..path_component_count {
6861 let letter = rng.lock().gen_range(b'a'..=b'z');
6862 path.push(std::str::from_utf8(&[letter]).unwrap());
6863 }
6864 path.set_extension("rs");
6865 let parent_path = path.parent().unwrap();
6866
6867 log::info!("Host: creating file {:?}", path,);
6868
6869 if fs.create_dir(&parent_path).await.is_ok()
6870 && fs.create_file(&path, Default::default()).await.is_ok()
6871 {
6872 break;
6873 } else {
6874 log::info!("Host: cannot create file");
6875 }
6876 },
6877 }
6878
6879 cx.background().simulate_random_delay().await;
6880 }
6881
6882 Ok(())
6883 }
6884
6885 let result =
6886 simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6887 .await;
6888 log::info!("Host done");
6889 self.project = Some(project);
6890 (self, cx, result.err())
6891 }
6892
6893 pub async fn simulate_guest(
6894 mut self,
6895 guest_username: String,
6896 project: ModelHandle<Project>,
6897 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6898 rng: Arc<Mutex<StdRng>>,
6899 mut cx: TestAppContext,
6900 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6901 async fn simulate_guest_internal(
6902 client: &mut TestClient,
6903 guest_username: &str,
6904 project: ModelHandle<Project>,
6905 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6906 rng: Arc<Mutex<StdRng>>,
6907 cx: &mut TestAppContext,
6908 ) -> anyhow::Result<()> {
6909 while op_start_signal.next().await.is_some() {
6910 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6911 let worktree = if let Some(worktree) =
6912 project.read_with(cx, |project, cx| {
6913 project
6914 .worktrees(&cx)
6915 .filter(|worktree| {
6916 let worktree = worktree.read(cx);
6917 worktree.is_visible()
6918 && worktree.entries(false).any(|e| e.is_file())
6919 })
6920 .choose(&mut *rng.lock())
6921 }) {
6922 worktree
6923 } else {
6924 cx.background().simulate_random_delay().await;
6925 continue;
6926 };
6927
6928 let (worktree_root_name, project_path) =
6929 worktree.read_with(cx, |worktree, _| {
6930 let entry = worktree
6931 .entries(false)
6932 .filter(|e| e.is_file())
6933 .choose(&mut *rng.lock())
6934 .unwrap();
6935 (
6936 worktree.root_name().to_string(),
6937 (worktree.id(), entry.path.clone()),
6938 )
6939 });
6940 log::info!(
6941 "{}: opening path {:?} in worktree {} ({})",
6942 guest_username,
6943 project_path.1,
6944 project_path.0,
6945 worktree_root_name,
6946 );
6947 let buffer = project
6948 .update(cx, |project, cx| {
6949 project.open_buffer(project_path.clone(), cx)
6950 })
6951 .await?;
6952 log::info!(
6953 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6954 guest_username,
6955 project_path.1,
6956 project_path.0,
6957 worktree_root_name,
6958 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6959 );
6960 client.buffers.insert(buffer.clone());
6961 buffer
6962 } else {
6963 client
6964 .buffers
6965 .iter()
6966 .choose(&mut *rng.lock())
6967 .unwrap()
6968 .clone()
6969 };
6970
6971 let choice = rng.lock().gen_range(0..100);
6972 match choice {
6973 0..=9 => {
6974 cx.update(|cx| {
6975 log::info!(
6976 "{}: dropping buffer {:?}",
6977 guest_username,
6978 buffer.read(cx).file().unwrap().full_path(cx)
6979 );
6980 client.buffers.remove(&buffer);
6981 drop(buffer);
6982 });
6983 }
6984 10..=19 => {
6985 let completions = project.update(cx, |project, cx| {
6986 log::info!(
6987 "{}: requesting completions for buffer {} ({:?})",
6988 guest_username,
6989 buffer.read(cx).remote_id(),
6990 buffer.read(cx).file().unwrap().full_path(cx)
6991 );
6992 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6993 project.completions(&buffer, offset, cx)
6994 });
6995 let completions = cx.background().spawn(async move {
6996 completions
6997 .await
6998 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6999 });
7000 if rng.lock().gen_bool(0.3) {
7001 log::info!("{}: detaching completions request", guest_username);
7002 cx.update(|cx| completions.detach_and_log_err(cx));
7003 } else {
7004 completions.await?;
7005 }
7006 }
7007 20..=29 => {
7008 let code_actions = project.update(cx, |project, cx| {
7009 log::info!(
7010 "{}: requesting code actions for buffer {} ({:?})",
7011 guest_username,
7012 buffer.read(cx).remote_id(),
7013 buffer.read(cx).file().unwrap().full_path(cx)
7014 );
7015 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7016 project.code_actions(&buffer, range, cx)
7017 });
7018 let code_actions = cx.background().spawn(async move {
7019 code_actions.await.map_err(|err| {
7020 anyhow!("code actions request failed: {:?}", err)
7021 })
7022 });
7023 if rng.lock().gen_bool(0.3) {
7024 log::info!("{}: detaching code actions request", guest_username);
7025 cx.update(|cx| code_actions.detach_and_log_err(cx));
7026 } else {
7027 code_actions.await?;
7028 }
7029 }
7030 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7031 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7032 log::info!(
7033 "{}: saving buffer {} ({:?})",
7034 guest_username,
7035 buffer.remote_id(),
7036 buffer.file().unwrap().full_path(cx)
7037 );
7038 (buffer.version(), buffer.save(cx))
7039 });
7040 let save = cx.background().spawn(async move {
7041 let (saved_version, _) = save
7042 .await
7043 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7044 assert!(saved_version.observed_all(&requested_version));
7045 Ok::<_, anyhow::Error>(())
7046 });
7047 if rng.lock().gen_bool(0.3) {
7048 log::info!("{}: detaching save request", guest_username);
7049 cx.update(|cx| save.detach_and_log_err(cx));
7050 } else {
7051 save.await?;
7052 }
7053 }
7054 40..=44 => {
7055 let prepare_rename = project.update(cx, |project, cx| {
7056 log::info!(
7057 "{}: preparing rename for buffer {} ({:?})",
7058 guest_username,
7059 buffer.read(cx).remote_id(),
7060 buffer.read(cx).file().unwrap().full_path(cx)
7061 );
7062 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7063 project.prepare_rename(buffer, offset, cx)
7064 });
7065 let prepare_rename = cx.background().spawn(async move {
7066 prepare_rename.await.map_err(|err| {
7067 anyhow!("prepare rename request failed: {:?}", err)
7068 })
7069 });
7070 if rng.lock().gen_bool(0.3) {
7071 log::info!("{}: detaching prepare rename request", guest_username);
7072 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7073 } else {
7074 prepare_rename.await?;
7075 }
7076 }
7077 45..=49 => {
7078 let definitions = project.update(cx, |project, cx| {
7079 log::info!(
7080 "{}: requesting definitions for buffer {} ({:?})",
7081 guest_username,
7082 buffer.read(cx).remote_id(),
7083 buffer.read(cx).file().unwrap().full_path(cx)
7084 );
7085 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7086 project.definition(&buffer, offset, cx)
7087 });
7088 let definitions = cx.background().spawn(async move {
7089 definitions
7090 .await
7091 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7092 });
7093 if rng.lock().gen_bool(0.3) {
7094 log::info!("{}: detaching definitions request", guest_username);
7095 cx.update(|cx| definitions.detach_and_log_err(cx));
7096 } else {
7097 client
7098 .buffers
7099 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7100 }
7101 }
7102 50..=54 => {
7103 let highlights = project.update(cx, |project, cx| {
7104 log::info!(
7105 "{}: requesting highlights for buffer {} ({:?})",
7106 guest_username,
7107 buffer.read(cx).remote_id(),
7108 buffer.read(cx).file().unwrap().full_path(cx)
7109 );
7110 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7111 project.document_highlights(&buffer, offset, cx)
7112 });
7113 let highlights = cx.background().spawn(async move {
7114 highlights
7115 .await
7116 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7117 });
7118 if rng.lock().gen_bool(0.3) {
7119 log::info!("{}: detaching highlights request", guest_username);
7120 cx.update(|cx| highlights.detach_and_log_err(cx));
7121 } else {
7122 highlights.await?;
7123 }
7124 }
7125 55..=59 => {
7126 let search = project.update(cx, |project, cx| {
7127 let query = rng.lock().gen_range('a'..='z');
7128 log::info!("{}: project-wide search {:?}", guest_username, query);
7129 project.search(SearchQuery::text(query, false, false), cx)
7130 });
7131 let search = cx.background().spawn(async move {
7132 search
7133 .await
7134 .map_err(|err| anyhow!("search request failed: {:?}", err))
7135 });
7136 if rng.lock().gen_bool(0.3) {
7137 log::info!("{}: detaching search request", guest_username);
7138 cx.update(|cx| search.detach_and_log_err(cx));
7139 } else {
7140 client.buffers.extend(search.await?.into_keys());
7141 }
7142 }
7143 60..=69 => {
7144 let worktree = project
7145 .read_with(cx, |project, cx| {
7146 project
7147 .worktrees(&cx)
7148 .filter(|worktree| {
7149 let worktree = worktree.read(cx);
7150 worktree.is_visible()
7151 && worktree.entries(false).any(|e| e.is_file())
7152 && worktree
7153 .root_entry()
7154 .map_or(false, |e| e.is_dir())
7155 })
7156 .choose(&mut *rng.lock())
7157 })
7158 .unwrap();
7159 let (worktree_id, worktree_root_name) = worktree
7160 .read_with(cx, |worktree, _| {
7161 (worktree.id(), worktree.root_name().to_string())
7162 });
7163
7164 let mut new_name = String::new();
7165 for _ in 0..10 {
7166 let letter = rng.lock().gen_range('a'..='z');
7167 new_name.push(letter);
7168 }
7169 let mut new_path = PathBuf::new();
7170 new_path.push(new_name);
7171 new_path.set_extension("rs");
7172 log::info!(
7173 "{}: creating {:?} in worktree {} ({})",
7174 guest_username,
7175 new_path,
7176 worktree_id,
7177 worktree_root_name,
7178 );
7179 project
7180 .update(cx, |project, cx| {
7181 project.create_entry((worktree_id, new_path), false, cx)
7182 })
7183 .unwrap()
7184 .await?;
7185 }
7186 _ => {
7187 buffer.update(cx, |buffer, cx| {
7188 log::info!(
7189 "{}: updating buffer {} ({:?})",
7190 guest_username,
7191 buffer.remote_id(),
7192 buffer.file().unwrap().full_path(cx)
7193 );
7194 if rng.lock().gen_bool(0.7) {
7195 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7196 } else {
7197 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7198 }
7199 });
7200 }
7201 }
7202 cx.background().simulate_random_delay().await;
7203 }
7204 Ok(())
7205 }
7206
7207 let result = simulate_guest_internal(
7208 &mut self,
7209 &guest_username,
7210 project.clone(),
7211 op_start_signal,
7212 rng,
7213 &mut cx,
7214 )
7215 .await;
7216 log::info!("{}: done", guest_username);
7217
7218 self.project = Some(project);
7219 (self, cx, result.err())
7220 }
7221 }
7222
7223 impl Drop for TestClient {
7224 fn drop(&mut self) {
7225 self.client.tear_down();
7226 }
7227 }
7228
7229 impl Executor for Arc<gpui::executor::Background> {
7230 type Sleep = gpui::executor::Timer;
7231
7232 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7233 self.spawn(future).detach();
7234 }
7235
7236 fn sleep(&self, duration: Duration) -> Self::Sleep {
7237 self.as_ref().timer(duration)
7238 }
7239 }
7240
7241 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7242 channel
7243 .messages()
7244 .cursor::<()>()
7245 .map(|m| {
7246 (
7247 m.sender.github_login.clone(),
7248 m.body.clone(),
7249 m.is_pending(),
7250 )
7251 })
7252 .collect()
7253 }
7254
7255 struct EmptyView;
7256
7257 impl gpui::Entity for EmptyView {
7258 type Event = ();
7259 }
7260
7261 impl gpui::View for EmptyView {
7262 fn ui_name() -> &'static str {
7263 "empty view"
7264 }
7265
7266 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7267 gpui::Element::boxed(gpui::elements::Empty)
7268 }
7269 }
7270}