1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
54type MessageHandler =
55 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
57struct Response<R> {
58 server: Arc<Server>,
59 receipt: Receipt<R>,
60 responded: Arc<AtomicBool>,
61}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69}
70
71pub struct Server {
72 peer: Arc<Peer>,
73 store: RwLock<Store>,
74 app_state: Arc<AppState>,
75 handlers: HashMap<TypeId, MessageHandler>,
76 notifications: Option<mpsc::UnboundedSender<()>>,
77}
78
79pub trait Executor: Send + Clone {
80 type Sleep: Send + Future;
81 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
82 fn sleep(&self, duration: Duration) -> Self::Sleep;
83}
84
85#[derive(Clone)]
86pub struct RealExecutor;
87
88const MESSAGE_COUNT_PER_PAGE: usize = 100;
89const MAX_MESSAGE_LEN: usize = 1024;
90
91struct StoreReadGuard<'a> {
92 guard: RwLockReadGuard<'a, Store>,
93 _not_send: PhantomData<Rc<()>>,
94}
95
96struct StoreWriteGuard<'a> {
97 guard: RwLockWriteGuard<'a, Store>,
98 _not_send: PhantomData<Rc<()>>,
99}
100
101impl Server {
102 pub fn new(
103 app_state: Arc<AppState>,
104 notifications: Option<mpsc::UnboundedSender<()>>,
105 ) -> Arc<Self> {
106 let mut server = Self {
107 peer: Peer::new(),
108 app_state,
109 store: Default::default(),
110 handlers: Default::default(),
111 notifications,
112 };
113
114 server
115 .add_request_handler(Server::ping)
116 .add_request_handler(Server::register_project)
117 .add_message_handler(Server::unregister_project)
118 .add_request_handler(Server::share_project)
119 .add_message_handler(Server::unshare_project)
120 .add_request_handler(Server::join_project)
121 .add_message_handler(Server::leave_project)
122 .add_request_handler(Server::register_worktree)
123 .add_message_handler(Server::unregister_worktree)
124 .add_request_handler(Server::update_worktree)
125 .add_message_handler(Server::start_language_server)
126 .add_message_handler(Server::update_language_server)
127 .add_message_handler(Server::update_diagnostic_summary)
128 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
129 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
130 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
131 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
132 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
133 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
134 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
135 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
136 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
137 .add_request_handler(
138 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
139 )
140 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
141 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
142 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
143 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
144 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
145 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
146 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
147 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
148 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
149 .add_request_handler(Server::update_buffer)
150 .add_message_handler(Server::update_buffer_file)
151 .add_message_handler(Server::buffer_reloaded)
152 .add_message_handler(Server::buffer_saved)
153 .add_request_handler(Server::save_buffer)
154 .add_request_handler(Server::get_channels)
155 .add_request_handler(Server::get_users)
156 .add_request_handler(Server::fuzzy_search_users)
157 .add_request_handler(Server::request_contact)
158 .add_request_handler(Server::remove_contact)
159 .add_request_handler(Server::respond_to_contact_request)
160 .add_request_handler(Server::join_channel)
161 .add_message_handler(Server::leave_channel)
162 .add_request_handler(Server::send_channel_message)
163 .add_request_handler(Server::follow)
164 .add_message_handler(Server::unfollow)
165 .add_message_handler(Server::update_followers)
166 .add_request_handler(Server::get_channel_messages);
167
168 Arc::new(server)
169 }
170
171 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
172 where
173 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
174 Fut: 'static + Send + Future<Output = Result<()>>,
175 M: EnvelopedMessage,
176 {
177 let prev_handler = self.handlers.insert(
178 TypeId::of::<M>(),
179 Box::new(move |server, envelope| {
180 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
181 let span = info_span!(
182 "handle message",
183 payload_type = envelope.payload_type_name(),
184 payload = format!("{:?}", envelope.payload).as_str(),
185 );
186 let future = (handler)(server, *envelope);
187 async move {
188 if let Err(error) = future.await {
189 tracing::error!(%error, "error handling message");
190 }
191 }
192 .instrument(span)
193 .boxed()
194 }),
195 );
196 if prev_handler.is_some() {
197 panic!("registered a handler for the same message twice");
198 }
199 self
200 }
201
202 /// Handle a request while holding a lock to the store. This is useful when we're registering
203 /// a connection but we want to respond on the connection before anybody else can send on it.
204 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
205 where
206 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
207 Fut: Send + Future<Output = Result<()>>,
208 M: RequestMessage,
209 {
210 let handler = Arc::new(handler);
211 self.add_message_handler(move |server, envelope| {
212 let receipt = envelope.receipt();
213 let handler = handler.clone();
214 async move {
215 let responded = Arc::new(AtomicBool::default());
216 let response = Response {
217 server: server.clone(),
218 responded: responded.clone(),
219 receipt: envelope.receipt(),
220 };
221 match (handler)(server.clone(), envelope, response).await {
222 Ok(()) => {
223 if responded.load(std::sync::atomic::Ordering::SeqCst) {
224 Ok(())
225 } else {
226 Err(anyhow!("handler did not send a response"))?
227 }
228 }
229 Err(error) => {
230 server.peer.respond_with_error(
231 receipt,
232 proto::Error {
233 message: error.to_string(),
234 },
235 )?;
236 Err(error)
237 }
238 }
239 }
240 })
241 }
242
243 pub fn handle_connection<E: Executor>(
244 self: &Arc<Self>,
245 connection: Connection,
246 address: String,
247 user_id: UserId,
248 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
249 executor: E,
250 ) -> impl Future<Output = Result<()>> {
251 let mut this = self.clone();
252 let span = info_span!("handle connection", %user_id, %address);
253 async move {
254 let (connection_id, handle_io, mut incoming_rx) = this
255 .peer
256 .add_connection(connection, {
257 let executor = executor.clone();
258 move |duration| {
259 let timer = executor.sleep(duration);
260 async move {
261 timer.await;
262 }
263 }
264 })
265 .await;
266
267 tracing::info!(%user_id, %connection_id, %address, "connection opened");
268
269 if let Some(send_connection_id) = send_connection_id.as_mut() {
270 let _ = send_connection_id.send(connection_id).await;
271 }
272
273 let contacts = this.app_state.db.get_contacts(user_id).await?;
274
275 {
276 let mut store = this.store_mut().await;
277 store.add_connection(connection_id, user_id);
278 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
279 }
280 this.update_user_contacts(user_id).await?;
281
282 let handle_io = handle_io.fuse();
283 futures::pin_mut!(handle_io);
284 loop {
285 let next_message = incoming_rx.next().fuse();
286 futures::pin_mut!(next_message);
287 futures::select_biased! {
288 result = handle_io => {
289 if let Err(error) = result {
290 tracing::error!(%error, "error handling I/O");
291 }
292 break;
293 }
294 message = next_message => {
295 if let Some(message) = message {
296 let type_name = message.payload_type_name();
297 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
298 async {
299 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
300 let notifications = this.notifications.clone();
301 let is_background = message.is_background();
302 let handle_message = (handler)(this.clone(), message);
303 let handle_message = async move {
304 handle_message.await;
305 if let Some(mut notifications) = notifications {
306 let _ = notifications.send(()).await;
307 }
308 };
309 if is_background {
310 executor.spawn_detached(handle_message);
311 } else {
312 handle_message.await;
313 }
314 } else {
315 tracing::error!("no message handler");
316 }
317 }.instrument(span).await;
318 } else {
319 tracing::info!(%user_id, %connection_id, %address, "connection closed");
320 break;
321 }
322 }
323 }
324 }
325
326 if let Err(error) = this.sign_out(connection_id).await {
327 tracing::error!(%error, "error signing out");
328 }
329
330 Ok(())
331 }.instrument(span)
332 }
333
334 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
335 self.peer.disconnect(connection_id);
336 let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
337
338 for (project_id, project) in removed_connection.hosted_projects {
339 if let Some(share) = project.share {
340 broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
341 self.peer
342 .send(conn_id, proto::UnshareProject { project_id })
343 });
344 }
345 }
346
347 for (project_id, peer_ids) in removed_connection.guest_project_ids {
348 broadcast(connection_id, peer_ids, |conn_id| {
349 self.peer.send(
350 conn_id,
351 proto::RemoveProjectCollaborator {
352 project_id,
353 peer_id: connection_id.0,
354 },
355 )
356 });
357 }
358
359 self.update_user_contacts(removed_connection.user_id)
360 .await?;
361
362 Ok(())
363 }
364
365 async fn ping(
366 self: Arc<Server>,
367 _: TypedEnvelope<proto::Ping>,
368 response: Response<proto::Ping>,
369 ) -> Result<()> {
370 response.send(proto::Ack {})?;
371 Ok(())
372 }
373
374 async fn register_project(
375 self: Arc<Server>,
376 request: TypedEnvelope<proto::RegisterProject>,
377 response: Response<proto::RegisterProject>,
378 ) -> Result<()> {
379 let project_id = {
380 let mut state = self.store_mut().await;
381 let user_id = state.user_id_for_connection(request.sender_id)?;
382 state.register_project(request.sender_id, user_id)
383 };
384 response.send(proto::RegisterProjectResponse { project_id })?;
385 Ok(())
386 }
387
388 async fn unregister_project(
389 self: Arc<Server>,
390 request: TypedEnvelope<proto::UnregisterProject>,
391 ) -> Result<()> {
392 let user_id = {
393 let mut state = self.store_mut().await;
394 state.unregister_project(request.payload.project_id, request.sender_id)?;
395 state.user_id_for_connection(request.sender_id)?
396 };
397
398 self.update_user_contacts(user_id).await?;
399 Ok(())
400 }
401
402 async fn share_project(
403 self: Arc<Server>,
404 request: TypedEnvelope<proto::ShareProject>,
405 response: Response<proto::ShareProject>,
406 ) -> Result<()> {
407 let user_id = {
408 let mut state = self.store_mut().await;
409 state.share_project(request.payload.project_id, request.sender_id)?;
410 state.user_id_for_connection(request.sender_id)?
411 };
412 self.update_user_contacts(user_id).await?;
413 response.send(proto::Ack {})?;
414 Ok(())
415 }
416
417 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
418 let contacts = self.app_state.db.get_contacts(user_id).await?;
419 let store = self.store().await;
420 let updated_contact = store.contact_for_user(user_id);
421 for contact_user_id in contacts.current {
422 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
423 self.peer
424 .send(
425 contact_conn_id,
426 proto::UpdateContacts {
427 contacts: vec![updated_contact.clone()],
428 remove_contacts: Default::default(),
429 incoming_requests: Default::default(),
430 remove_incoming_requests: Default::default(),
431 outgoing_requests: Default::default(),
432 remove_outgoing_requests: Default::default(),
433 },
434 )
435 .trace_err();
436 }
437 }
438 Ok(())
439 }
440
441 async fn unshare_project(
442 self: Arc<Server>,
443 request: TypedEnvelope<proto::UnshareProject>,
444 ) -> Result<()> {
445 let project_id = request.payload.project_id;
446 let project;
447 {
448 let mut state = self.store_mut().await;
449 project = state.unshare_project(project_id, request.sender_id)?;
450 broadcast(request.sender_id, project.connection_ids, |conn_id| {
451 self.peer
452 .send(conn_id, proto::UnshareProject { project_id })
453 });
454 }
455 self.update_user_contacts(project.host_user_id).await?;
456 Ok(())
457 }
458
459 async fn join_project(
460 self: Arc<Server>,
461 request: TypedEnvelope<proto::JoinProject>,
462 response: Response<proto::JoinProject>,
463 ) -> Result<()> {
464 let project_id = request.payload.project_id;
465 let host_user_id;
466 let guest_user_id;
467 {
468 let state = self.store().await;
469 host_user_id = state.project(project_id)?.host_user_id;
470 guest_user_id = state.user_id_for_connection(request.sender_id)?;
471 };
472
473 let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
474 if !guest_contacts.current.contains(&host_user_id) {
475 return Err(anyhow!("no such project"))?;
476 }
477
478 {
479 let state = &mut *self.store_mut().await;
480 let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
481 let share = joined.project.share()?;
482 let peer_count = share.guests.len();
483 let mut collaborators = Vec::with_capacity(peer_count);
484 collaborators.push(proto::Collaborator {
485 peer_id: joined.project.host_connection_id.0,
486 replica_id: 0,
487 user_id: joined.project.host_user_id.to_proto(),
488 });
489 let worktrees = share
490 .worktrees
491 .iter()
492 .filter_map(|(id, shared_worktree)| {
493 let worktree = joined.project.worktrees.get(&id)?;
494 Some(proto::Worktree {
495 id: *id,
496 root_name: worktree.root_name.clone(),
497 entries: shared_worktree.entries.values().cloned().collect(),
498 diagnostic_summaries: shared_worktree
499 .diagnostic_summaries
500 .values()
501 .cloned()
502 .collect(),
503 visible: worktree.visible,
504 scan_id: shared_worktree.scan_id,
505 })
506 })
507 .collect();
508 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
509 if *peer_conn_id != request.sender_id {
510 collaborators.push(proto::Collaborator {
511 peer_id: peer_conn_id.0,
512 replica_id: *peer_replica_id as u32,
513 user_id: peer_user_id.to_proto(),
514 });
515 }
516 }
517 broadcast(
518 request.sender_id,
519 joined.project.connection_ids(),
520 |conn_id| {
521 self.peer.send(
522 conn_id,
523 proto::AddProjectCollaborator {
524 project_id,
525 collaborator: Some(proto::Collaborator {
526 peer_id: request.sender_id.0,
527 replica_id: joined.replica_id as u32,
528 user_id: guest_user_id.to_proto(),
529 }),
530 },
531 )
532 },
533 );
534 response.send(proto::JoinProjectResponse {
535 worktrees,
536 replica_id: joined.replica_id as u32,
537 collaborators,
538 language_servers: joined.project.language_servers.clone(),
539 })?;
540 }
541 self.update_user_contacts(host_user_id).await?;
542 Ok(())
543 }
544
545 async fn leave_project(
546 self: Arc<Server>,
547 request: TypedEnvelope<proto::LeaveProject>,
548 ) -> Result<()> {
549 let sender_id = request.sender_id;
550 let project_id = request.payload.project_id;
551 let project;
552 {
553 let mut state = self.store_mut().await;
554 project = state.leave_project(sender_id, project_id)?;
555 broadcast(sender_id, project.connection_ids, |conn_id| {
556 self.peer.send(
557 conn_id,
558 proto::RemoveProjectCollaborator {
559 project_id,
560 peer_id: sender_id.0,
561 },
562 )
563 });
564 }
565 self.update_user_contacts(project.host_user_id).await?;
566 Ok(())
567 }
568
569 async fn register_worktree(
570 self: Arc<Server>,
571 request: TypedEnvelope<proto::RegisterWorktree>,
572 response: Response<proto::RegisterWorktree>,
573 ) -> Result<()> {
574 let host_user_id;
575 {
576 let mut state = self.store_mut().await;
577 host_user_id = state.user_id_for_connection(request.sender_id)?;
578
579 let guest_connection_ids = state
580 .read_project(request.payload.project_id, request.sender_id)?
581 .guest_connection_ids();
582 state.register_worktree(
583 request.payload.project_id,
584 request.payload.worktree_id,
585 request.sender_id,
586 Worktree {
587 root_name: request.payload.root_name.clone(),
588 visible: request.payload.visible,
589 },
590 )?;
591
592 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
593 self.peer
594 .forward_send(request.sender_id, connection_id, request.payload.clone())
595 });
596 }
597 self.update_user_contacts(host_user_id).await?;
598 response.send(proto::Ack {})?;
599 Ok(())
600 }
601
602 async fn unregister_worktree(
603 self: Arc<Server>,
604 request: TypedEnvelope<proto::UnregisterWorktree>,
605 ) -> Result<()> {
606 let host_user_id;
607 let project_id = request.payload.project_id;
608 let worktree_id = request.payload.worktree_id;
609 {
610 let mut state = self.store_mut().await;
611 let (_, guest_connection_ids) =
612 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
613 host_user_id = state.user_id_for_connection(request.sender_id)?;
614 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
615 self.peer.send(
616 conn_id,
617 proto::UnregisterWorktree {
618 project_id,
619 worktree_id,
620 },
621 )
622 });
623 }
624 self.update_user_contacts(host_user_id).await?;
625 Ok(())
626 }
627
628 async fn update_worktree(
629 self: Arc<Server>,
630 request: TypedEnvelope<proto::UpdateWorktree>,
631 response: Response<proto::UpdateWorktree>,
632 ) -> Result<()> {
633 let connection_ids = self.store_mut().await.update_worktree(
634 request.sender_id,
635 request.payload.project_id,
636 request.payload.worktree_id,
637 &request.payload.removed_entries,
638 &request.payload.updated_entries,
639 request.payload.scan_id,
640 )?;
641
642 broadcast(request.sender_id, connection_ids, |connection_id| {
643 self.peer
644 .forward_send(request.sender_id, connection_id, request.payload.clone())
645 });
646 response.send(proto::Ack {})?;
647 Ok(())
648 }
649
650 async fn update_diagnostic_summary(
651 self: Arc<Server>,
652 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
653 ) -> Result<()> {
654 let summary = request
655 .payload
656 .summary
657 .clone()
658 .ok_or_else(|| anyhow!("invalid summary"))?;
659 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
660 request.payload.project_id,
661 request.payload.worktree_id,
662 request.sender_id,
663 summary,
664 )?;
665
666 broadcast(request.sender_id, receiver_ids, |connection_id| {
667 self.peer
668 .forward_send(request.sender_id, connection_id, request.payload.clone())
669 });
670 Ok(())
671 }
672
673 async fn start_language_server(
674 self: Arc<Server>,
675 request: TypedEnvelope<proto::StartLanguageServer>,
676 ) -> Result<()> {
677 let receiver_ids = self.store_mut().await.start_language_server(
678 request.payload.project_id,
679 request.sender_id,
680 request
681 .payload
682 .server
683 .clone()
684 .ok_or_else(|| anyhow!("invalid language server"))?,
685 )?;
686 broadcast(request.sender_id, receiver_ids, |connection_id| {
687 self.peer
688 .forward_send(request.sender_id, connection_id, request.payload.clone())
689 });
690 Ok(())
691 }
692
693 async fn update_language_server(
694 self: Arc<Server>,
695 request: TypedEnvelope<proto::UpdateLanguageServer>,
696 ) -> Result<()> {
697 let receiver_ids = self
698 .store()
699 .await
700 .project_connection_ids(request.payload.project_id, request.sender_id)?;
701 broadcast(request.sender_id, receiver_ids, |connection_id| {
702 self.peer
703 .forward_send(request.sender_id, connection_id, request.payload.clone())
704 });
705 Ok(())
706 }
707
708 async fn forward_project_request<T>(
709 self: Arc<Server>,
710 request: TypedEnvelope<T>,
711 response: Response<T>,
712 ) -> Result<()>
713 where
714 T: EntityMessage + RequestMessage,
715 {
716 let host_connection_id = self
717 .store()
718 .await
719 .read_project(request.payload.remote_entity_id(), request.sender_id)?
720 .host_connection_id;
721
722 response.send(
723 self.peer
724 .forward_request(request.sender_id, host_connection_id, request.payload)
725 .await?,
726 )?;
727 Ok(())
728 }
729
730 async fn save_buffer(
731 self: Arc<Server>,
732 request: TypedEnvelope<proto::SaveBuffer>,
733 response: Response<proto::SaveBuffer>,
734 ) -> Result<()> {
735 let host = self
736 .store()
737 .await
738 .read_project(request.payload.project_id, request.sender_id)?
739 .host_connection_id;
740 let response_payload = self
741 .peer
742 .forward_request(request.sender_id, host, request.payload.clone())
743 .await?;
744
745 let mut guests = self
746 .store()
747 .await
748 .read_project(request.payload.project_id, request.sender_id)?
749 .connection_ids();
750 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
751 broadcast(host, guests, |conn_id| {
752 self.peer
753 .forward_send(host, conn_id, response_payload.clone())
754 });
755 response.send(response_payload)?;
756 Ok(())
757 }
758
759 async fn update_buffer(
760 self: Arc<Server>,
761 request: TypedEnvelope<proto::UpdateBuffer>,
762 response: Response<proto::UpdateBuffer>,
763 ) -> Result<()> {
764 let receiver_ids = self
765 .store()
766 .await
767 .project_connection_ids(request.payload.project_id, request.sender_id)?;
768 broadcast(request.sender_id, receiver_ids, |connection_id| {
769 self.peer
770 .forward_send(request.sender_id, connection_id, request.payload.clone())
771 });
772 response.send(proto::Ack {})?;
773 Ok(())
774 }
775
776 async fn update_buffer_file(
777 self: Arc<Server>,
778 request: TypedEnvelope<proto::UpdateBufferFile>,
779 ) -> Result<()> {
780 let receiver_ids = self
781 .store()
782 .await
783 .project_connection_ids(request.payload.project_id, request.sender_id)?;
784 broadcast(request.sender_id, receiver_ids, |connection_id| {
785 self.peer
786 .forward_send(request.sender_id, connection_id, request.payload.clone())
787 });
788 Ok(())
789 }
790
791 async fn buffer_reloaded(
792 self: Arc<Server>,
793 request: TypedEnvelope<proto::BufferReloaded>,
794 ) -> Result<()> {
795 let receiver_ids = self
796 .store()
797 .await
798 .project_connection_ids(request.payload.project_id, request.sender_id)?;
799 broadcast(request.sender_id, receiver_ids, |connection_id| {
800 self.peer
801 .forward_send(request.sender_id, connection_id, request.payload.clone())
802 });
803 Ok(())
804 }
805
806 async fn buffer_saved(
807 self: Arc<Server>,
808 request: TypedEnvelope<proto::BufferSaved>,
809 ) -> Result<()> {
810 let receiver_ids = self
811 .store()
812 .await
813 .project_connection_ids(request.payload.project_id, request.sender_id)?;
814 broadcast(request.sender_id, receiver_ids, |connection_id| {
815 self.peer
816 .forward_send(request.sender_id, connection_id, request.payload.clone())
817 });
818 Ok(())
819 }
820
821 async fn follow(
822 self: Arc<Self>,
823 request: TypedEnvelope<proto::Follow>,
824 response: Response<proto::Follow>,
825 ) -> Result<()> {
826 let leader_id = ConnectionId(request.payload.leader_id);
827 let follower_id = request.sender_id;
828 if !self
829 .store()
830 .await
831 .project_connection_ids(request.payload.project_id, follower_id)?
832 .contains(&leader_id)
833 {
834 Err(anyhow!("no such peer"))?;
835 }
836 let mut response_payload = self
837 .peer
838 .forward_request(request.sender_id, leader_id, request.payload)
839 .await?;
840 response_payload
841 .views
842 .retain(|view| view.leader_id != Some(follower_id.0));
843 response.send(response_payload)?;
844 Ok(())
845 }
846
847 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
848 let leader_id = ConnectionId(request.payload.leader_id);
849 if !self
850 .store()
851 .await
852 .project_connection_ids(request.payload.project_id, request.sender_id)?
853 .contains(&leader_id)
854 {
855 Err(anyhow!("no such peer"))?;
856 }
857 self.peer
858 .forward_send(request.sender_id, leader_id, request.payload)?;
859 Ok(())
860 }
861
862 async fn update_followers(
863 self: Arc<Self>,
864 request: TypedEnvelope<proto::UpdateFollowers>,
865 ) -> Result<()> {
866 let connection_ids = self
867 .store()
868 .await
869 .project_connection_ids(request.payload.project_id, request.sender_id)?;
870 let leader_id = request
871 .payload
872 .variant
873 .as_ref()
874 .and_then(|variant| match variant {
875 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
876 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
877 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
878 });
879 for follower_id in &request.payload.follower_ids {
880 let follower_id = ConnectionId(*follower_id);
881 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
882 self.peer
883 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
884 }
885 }
886 Ok(())
887 }
888
889 async fn get_channels(
890 self: Arc<Server>,
891 request: TypedEnvelope<proto::GetChannels>,
892 response: Response<proto::GetChannels>,
893 ) -> Result<()> {
894 let user_id = self
895 .store()
896 .await
897 .user_id_for_connection(request.sender_id)?;
898 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
899 response.send(proto::GetChannelsResponse {
900 channels: channels
901 .into_iter()
902 .map(|chan| proto::Channel {
903 id: chan.id.to_proto(),
904 name: chan.name,
905 })
906 .collect(),
907 })?;
908 Ok(())
909 }
910
911 async fn get_users(
912 self: Arc<Server>,
913 request: TypedEnvelope<proto::GetUsers>,
914 response: Response<proto::GetUsers>,
915 ) -> Result<()> {
916 let user_ids = request
917 .payload
918 .user_ids
919 .into_iter()
920 .map(UserId::from_proto)
921 .collect();
922 let users = self
923 .app_state
924 .db
925 .get_users_by_ids(user_ids)
926 .await?
927 .into_iter()
928 .map(|user| proto::User {
929 id: user.id.to_proto(),
930 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
931 github_login: user.github_login,
932 })
933 .collect();
934 response.send(proto::UsersResponse { users })?;
935 Ok(())
936 }
937
938 async fn fuzzy_search_users(
939 self: Arc<Server>,
940 request: TypedEnvelope<proto::FuzzySearchUsers>,
941 response: Response<proto::FuzzySearchUsers>,
942 ) -> Result<()> {
943 let user_id = self
944 .store()
945 .await
946 .user_id_for_connection(request.sender_id)?;
947 let query = request.payload.query;
948 let db = &self.app_state.db;
949 let users = match query.len() {
950 0 => vec![],
951 1 | 2 => db
952 .get_user_by_github_login(&query)
953 .await?
954 .into_iter()
955 .collect(),
956 _ => db.fuzzy_search_users(&query, 10).await?,
957 };
958 let users = users
959 .into_iter()
960 .filter(|user| user.id != user_id)
961 .map(|user| proto::User {
962 id: user.id.to_proto(),
963 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
964 github_login: user.github_login,
965 })
966 .collect();
967 response.send(proto::UsersResponse { users })?;
968 Ok(())
969 }
970
971 async fn request_contact(
972 self: Arc<Server>,
973 request: TypedEnvelope<proto::RequestContact>,
974 response: Response<proto::RequestContact>,
975 ) -> Result<()> {
976 let requester_id = self
977 .store()
978 .await
979 .user_id_for_connection(request.sender_id)?;
980 let responder_id = UserId::from_proto(request.payload.responder_id);
981 if requester_id == responder_id {
982 return Err(anyhow!("cannot add yourself as a contact"))?;
983 }
984
985 self.app_state
986 .db
987 .send_contact_request(requester_id, responder_id)
988 .await?;
989
990 // Update outgoing contact requests of requester
991 let mut update = proto::UpdateContacts::default();
992 update.outgoing_requests.push(responder_id.to_proto());
993 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
994 self.peer.send(connection_id, update.clone())?;
995 }
996
997 // Update incoming contact requests of responder
998 let mut update = proto::UpdateContacts::default();
999 update
1000 .incoming_requests
1001 .push(proto::IncomingContactRequest {
1002 requester_id: requester_id.to_proto(),
1003 should_notify: true,
1004 });
1005 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1006 self.peer.send(connection_id, update.clone())?;
1007 }
1008
1009 response.send(proto::Ack {})?;
1010 Ok(())
1011 }
1012
1013 async fn respond_to_contact_request(
1014 self: Arc<Server>,
1015 request: TypedEnvelope<proto::RespondToContactRequest>,
1016 response: Response<proto::RespondToContactRequest>,
1017 ) -> Result<()> {
1018 let responder_id = self
1019 .store()
1020 .await
1021 .user_id_for_connection(request.sender_id)?;
1022 let requester_id = UserId::from_proto(request.payload.requester_id);
1023 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1024 self.app_state
1025 .db
1026 .respond_to_contact_request(responder_id, requester_id, accept)
1027 .await?;
1028
1029 let store = self.store().await;
1030 // Update responder with new contact
1031 let mut update = proto::UpdateContacts::default();
1032 if accept {
1033 update.contacts.push(store.contact_for_user(requester_id));
1034 }
1035 update
1036 .remove_incoming_requests
1037 .push(requester_id.to_proto());
1038 for connection_id in store.connection_ids_for_user(responder_id) {
1039 self.peer.send(connection_id, update.clone())?;
1040 }
1041
1042 // Update requester with new contact
1043 let mut update = proto::UpdateContacts::default();
1044 if accept {
1045 update.contacts.push(store.contact_for_user(responder_id));
1046 }
1047 update
1048 .remove_outgoing_requests
1049 .push(responder_id.to_proto());
1050 for connection_id in store.connection_ids_for_user(requester_id) {
1051 self.peer.send(connection_id, update.clone())?;
1052 }
1053
1054 response.send(proto::Ack {})?;
1055 Ok(())
1056 }
1057
1058 async fn remove_contact(
1059 self: Arc<Server>,
1060 request: TypedEnvelope<proto::RemoveContact>,
1061 response: Response<proto::RemoveContact>,
1062 ) -> Result<()> {
1063 let requester_id = self
1064 .store()
1065 .await
1066 .user_id_for_connection(request.sender_id)?;
1067 let responder_id = UserId::from_proto(request.payload.user_id);
1068 self.app_state
1069 .db
1070 .remove_contact(requester_id, responder_id)
1071 .await?;
1072
1073 // Update outgoing contact requests of requester
1074 let mut update = proto::UpdateContacts::default();
1075 update
1076 .remove_outgoing_requests
1077 .push(responder_id.to_proto());
1078 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1079 self.peer.send(connection_id, update.clone())?;
1080 }
1081
1082 // Update incoming contact requests of responder
1083 let mut update = proto::UpdateContacts::default();
1084 update
1085 .remove_incoming_requests
1086 .push(requester_id.to_proto());
1087 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1088 self.peer.send(connection_id, update.clone())?;
1089 }
1090
1091 response.send(proto::Ack {})?;
1092 Ok(())
1093 }
1094
1095 // #[instrument(skip(self, state, user_ids))]
1096 // fn update_contacts_for_users<'a>(
1097 // self: &Arc<Self>,
1098 // state: &Store,
1099 // user_ids: impl IntoIterator<Item = &'a UserId>,
1100 // ) {
1101 // for user_id in user_ids {
1102 // let contacts = state.contacts_for_user(*user_id);
1103 // for connection_id in state.connection_ids_for_user(*user_id) {
1104 // self.peer
1105 // .send(
1106 // connection_id,
1107 // proto::UpdateContacts {
1108 // contacts: contacts.clone(),
1109 // pending_requests_from_user_ids: Default::default(),
1110 // pending_requests_to_user_ids: Default::default(),
1111 // },
1112 // )
1113 // .trace_err();
1114 // }
1115 // }
1116 // }
1117
1118 async fn join_channel(
1119 self: Arc<Self>,
1120 request: TypedEnvelope<proto::JoinChannel>,
1121 response: Response<proto::JoinChannel>,
1122 ) -> Result<()> {
1123 let user_id = self
1124 .store()
1125 .await
1126 .user_id_for_connection(request.sender_id)?;
1127 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1128 if !self
1129 .app_state
1130 .db
1131 .can_user_access_channel(user_id, channel_id)
1132 .await?
1133 {
1134 Err(anyhow!("access denied"))?;
1135 }
1136
1137 self.store_mut()
1138 .await
1139 .join_channel(request.sender_id, channel_id);
1140 let messages = self
1141 .app_state
1142 .db
1143 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1144 .await?
1145 .into_iter()
1146 .map(|msg| proto::ChannelMessage {
1147 id: msg.id.to_proto(),
1148 body: msg.body,
1149 timestamp: msg.sent_at.unix_timestamp() as u64,
1150 sender_id: msg.sender_id.to_proto(),
1151 nonce: Some(msg.nonce.as_u128().into()),
1152 })
1153 .collect::<Vec<_>>();
1154 response.send(proto::JoinChannelResponse {
1155 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1156 messages,
1157 })?;
1158 Ok(())
1159 }
1160
1161 async fn leave_channel(
1162 self: Arc<Self>,
1163 request: TypedEnvelope<proto::LeaveChannel>,
1164 ) -> Result<()> {
1165 let user_id = self
1166 .store()
1167 .await
1168 .user_id_for_connection(request.sender_id)?;
1169 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1170 if !self
1171 .app_state
1172 .db
1173 .can_user_access_channel(user_id, channel_id)
1174 .await?
1175 {
1176 Err(anyhow!("access denied"))?;
1177 }
1178
1179 self.store_mut()
1180 .await
1181 .leave_channel(request.sender_id, channel_id);
1182
1183 Ok(())
1184 }
1185
1186 async fn send_channel_message(
1187 self: Arc<Self>,
1188 request: TypedEnvelope<proto::SendChannelMessage>,
1189 response: Response<proto::SendChannelMessage>,
1190 ) -> Result<()> {
1191 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1192 let user_id;
1193 let connection_ids;
1194 {
1195 let state = self.store().await;
1196 user_id = state.user_id_for_connection(request.sender_id)?;
1197 connection_ids = state.channel_connection_ids(channel_id)?;
1198 }
1199
1200 // Validate the message body.
1201 let body = request.payload.body.trim().to_string();
1202 if body.len() > MAX_MESSAGE_LEN {
1203 return Err(anyhow!("message is too long"))?;
1204 }
1205 if body.is_empty() {
1206 return Err(anyhow!("message can't be blank"))?;
1207 }
1208
1209 let timestamp = OffsetDateTime::now_utc();
1210 let nonce = request
1211 .payload
1212 .nonce
1213 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1214
1215 let message_id = self
1216 .app_state
1217 .db
1218 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1219 .await?
1220 .to_proto();
1221 let message = proto::ChannelMessage {
1222 sender_id: user_id.to_proto(),
1223 id: message_id,
1224 body,
1225 timestamp: timestamp.unix_timestamp() as u64,
1226 nonce: Some(nonce),
1227 };
1228 broadcast(request.sender_id, connection_ids, |conn_id| {
1229 self.peer.send(
1230 conn_id,
1231 proto::ChannelMessageSent {
1232 channel_id: channel_id.to_proto(),
1233 message: Some(message.clone()),
1234 },
1235 )
1236 });
1237 response.send(proto::SendChannelMessageResponse {
1238 message: Some(message),
1239 })?;
1240 Ok(())
1241 }
1242
1243 async fn get_channel_messages(
1244 self: Arc<Self>,
1245 request: TypedEnvelope<proto::GetChannelMessages>,
1246 response: Response<proto::GetChannelMessages>,
1247 ) -> Result<()> {
1248 let user_id = self
1249 .store()
1250 .await
1251 .user_id_for_connection(request.sender_id)?;
1252 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1253 if !self
1254 .app_state
1255 .db
1256 .can_user_access_channel(user_id, channel_id)
1257 .await?
1258 {
1259 Err(anyhow!("access denied"))?;
1260 }
1261
1262 let messages = self
1263 .app_state
1264 .db
1265 .get_channel_messages(
1266 channel_id,
1267 MESSAGE_COUNT_PER_PAGE,
1268 Some(MessageId::from_proto(request.payload.before_message_id)),
1269 )
1270 .await?
1271 .into_iter()
1272 .map(|msg| proto::ChannelMessage {
1273 id: msg.id.to_proto(),
1274 body: msg.body,
1275 timestamp: msg.sent_at.unix_timestamp() as u64,
1276 sender_id: msg.sender_id.to_proto(),
1277 nonce: Some(msg.nonce.as_u128().into()),
1278 })
1279 .collect::<Vec<_>>();
1280 response.send(proto::GetChannelMessagesResponse {
1281 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1282 messages,
1283 })?;
1284 Ok(())
1285 }
1286
1287 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1288 #[cfg(test)]
1289 tokio::task::yield_now().await;
1290 let guard = self.store.read().await;
1291 #[cfg(test)]
1292 tokio::task::yield_now().await;
1293 StoreReadGuard {
1294 guard,
1295 _not_send: PhantomData,
1296 }
1297 }
1298
1299 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1300 #[cfg(test)]
1301 tokio::task::yield_now().await;
1302 let guard = self.store.write().await;
1303 #[cfg(test)]
1304 tokio::task::yield_now().await;
1305 StoreWriteGuard {
1306 guard,
1307 _not_send: PhantomData,
1308 }
1309 }
1310}
1311
1312impl<'a> Deref for StoreReadGuard<'a> {
1313 type Target = Store;
1314
1315 fn deref(&self) -> &Self::Target {
1316 &*self.guard
1317 }
1318}
1319
1320impl<'a> Deref for StoreWriteGuard<'a> {
1321 type Target = Store;
1322
1323 fn deref(&self) -> &Self::Target {
1324 &*self.guard
1325 }
1326}
1327
1328impl<'a> DerefMut for StoreWriteGuard<'a> {
1329 fn deref_mut(&mut self) -> &mut Self::Target {
1330 &mut *self.guard
1331 }
1332}
1333
1334impl<'a> Drop for StoreWriteGuard<'a> {
1335 fn drop(&mut self) {
1336 #[cfg(test)]
1337 self.check_invariants();
1338
1339 let metrics = self.metrics();
1340 tracing::info!(
1341 connections = metrics.connections,
1342 registered_projects = metrics.registered_projects,
1343 shared_projects = metrics.shared_projects,
1344 "metrics"
1345 );
1346 }
1347}
1348
1349impl Executor for RealExecutor {
1350 type Sleep = Sleep;
1351
1352 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1353 tokio::task::spawn(future);
1354 }
1355
1356 fn sleep(&self, duration: Duration) -> Self::Sleep {
1357 tokio::time::sleep(duration)
1358 }
1359}
1360
1361fn broadcast<F>(
1362 sender_id: ConnectionId,
1363 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1364 mut f: F,
1365) where
1366 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1367{
1368 for receiver_id in receiver_ids {
1369 if receiver_id != sender_id {
1370 f(receiver_id).trace_err();
1371 }
1372 }
1373}
1374
1375lazy_static! {
1376 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1377}
1378
1379pub struct ProtocolVersion(u32);
1380
1381impl Header for ProtocolVersion {
1382 fn name() -> &'static HeaderName {
1383 &ZED_PROTOCOL_VERSION
1384 }
1385
1386 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1387 where
1388 Self: Sized,
1389 I: Iterator<Item = &'i axum::http::HeaderValue>,
1390 {
1391 let version = values
1392 .next()
1393 .ok_or_else(|| axum::headers::Error::invalid())?
1394 .to_str()
1395 .map_err(|_| axum::headers::Error::invalid())?
1396 .parse()
1397 .map_err(|_| axum::headers::Error::invalid())?;
1398 Ok(Self(version))
1399 }
1400
1401 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1402 values.extend([self.0.to_string().parse().unwrap()]);
1403 }
1404}
1405
1406pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1407 let server = Server::new(app_state.clone(), None);
1408 Router::new()
1409 .route("/rpc", get(handle_websocket_request))
1410 .layer(
1411 ServiceBuilder::new()
1412 .layer(Extension(app_state))
1413 .layer(middleware::from_fn(auth::validate_header))
1414 .layer(Extension(server)),
1415 )
1416}
1417
1418pub async fn handle_websocket_request(
1419 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1420 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1421 Extension(server): Extension<Arc<Server>>,
1422 Extension(user_id): Extension<UserId>,
1423 ws: WebSocketUpgrade,
1424) -> axum::response::Response {
1425 if protocol_version != rpc::PROTOCOL_VERSION {
1426 return (
1427 StatusCode::UPGRADE_REQUIRED,
1428 "client must be upgraded".to_string(),
1429 )
1430 .into_response();
1431 }
1432 let socket_address = socket_address.to_string();
1433 ws.on_upgrade(move |socket| {
1434 use util::ResultExt;
1435 let socket = socket
1436 .map_ok(to_tungstenite_message)
1437 .err_into()
1438 .with(|message| async move { Ok(to_axum_message(message)) });
1439 let connection = Connection::new(Box::pin(socket));
1440 async move {
1441 server
1442 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1443 .await
1444 .log_err();
1445 }
1446 })
1447}
1448
1449fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1450 match message {
1451 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1452 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1453 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1454 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1455 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1456 code: frame.code.into(),
1457 reason: frame.reason,
1458 })),
1459 }
1460}
1461
1462fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1463 match message {
1464 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1465 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1466 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1467 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1468 AxumMessage::Close(frame) => {
1469 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1470 code: frame.code.into(),
1471 reason: frame.reason,
1472 }))
1473 }
1474 }
1475}
1476
1477pub trait ResultExt {
1478 type Ok;
1479
1480 fn trace_err(self) -> Option<Self::Ok>;
1481}
1482
1483impl<T, E> ResultExt for Result<T, E>
1484where
1485 E: std::fmt::Debug,
1486{
1487 type Ok = T;
1488
1489 fn trace_err(self) -> Option<T> {
1490 match self {
1491 Ok(value) => Some(value),
1492 Err(error) => {
1493 tracing::error!("{:?}", error);
1494 None
1495 }
1496 }
1497 }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502 use super::*;
1503 use crate::{
1504 db::{tests::TestDb, UserId},
1505 AppState,
1506 };
1507 use ::rpc::Peer;
1508 use client::{
1509 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1510 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1511 };
1512 use collections::{BTreeMap, HashSet};
1513 use editor::{
1514 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1515 ToOffset, ToggleCodeActions, Undo,
1516 };
1517 use gpui::{
1518 executor::{self, Deterministic},
1519 geometry::vector::vec2f,
1520 ModelHandle, TestAppContext, ViewHandle,
1521 };
1522 use language::{
1523 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1524 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1525 };
1526 use lsp::{self, FakeLanguageServer};
1527 use parking_lot::Mutex;
1528 use project::{
1529 fs::{FakeFs, Fs as _},
1530 search::SearchQuery,
1531 worktree::WorktreeHandle,
1532 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1533 };
1534 use rand::prelude::*;
1535 use rpc::PeerId;
1536 use serde_json::json;
1537 use settings::Settings;
1538 use sqlx::types::time::OffsetDateTime;
1539 use std::{
1540 env,
1541 ops::Deref,
1542 path::{Path, PathBuf},
1543 rc::Rc,
1544 sync::{
1545 atomic::{AtomicBool, Ordering::SeqCst},
1546 Arc,
1547 },
1548 time::Duration,
1549 };
1550 use theme::ThemeRegistry;
1551 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1552
1553 #[cfg(test)]
1554 #[ctor::ctor]
1555 fn init_logger() {
1556 if std::env::var("RUST_LOG").is_ok() {
1557 env_logger::init();
1558 }
1559 }
1560
1561 #[gpui::test(iterations = 10)]
1562 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1563 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1564 let lang_registry = Arc::new(LanguageRegistry::test());
1565 let fs = FakeFs::new(cx_a.background());
1566 cx_a.foreground().forbid_parking();
1567
1568 // Connect to a server as 2 clients.
1569 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1570 let client_a = server.create_client(cx_a, "user_a").await;
1571 let client_b = server.create_client(cx_b, "user_b").await;
1572 server
1573 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1574 .await;
1575
1576 // Share a project as client A
1577 fs.insert_tree(
1578 "/a",
1579 json!({
1580 "a.txt": "a-contents",
1581 "b.txt": "b-contents",
1582 }),
1583 )
1584 .await;
1585 let project_a = cx_a.update(|cx| {
1586 Project::local(
1587 client_a.clone(),
1588 client_a.user_store.clone(),
1589 lang_registry.clone(),
1590 fs.clone(),
1591 cx,
1592 )
1593 });
1594 let (worktree_a, _) = project_a
1595 .update(cx_a, |p, cx| {
1596 p.find_or_create_local_worktree("/a", true, cx)
1597 })
1598 .await
1599 .unwrap();
1600 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1601 worktree_a
1602 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1603 .await;
1604 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1605 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1606
1607 // Join that project as client B
1608 let project_b = Project::remote(
1609 project_id,
1610 client_b.clone(),
1611 client_b.user_store.clone(),
1612 lang_registry.clone(),
1613 fs.clone(),
1614 &mut cx_b.to_async(),
1615 )
1616 .await
1617 .unwrap();
1618
1619 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1620 assert_eq!(
1621 project
1622 .collaborators()
1623 .get(&client_a.peer_id)
1624 .unwrap()
1625 .user
1626 .github_login,
1627 "user_a"
1628 );
1629 project.replica_id()
1630 });
1631 project_a
1632 .condition(&cx_a, |tree, _| {
1633 tree.collaborators()
1634 .get(&client_b.peer_id)
1635 .map_or(false, |collaborator| {
1636 collaborator.replica_id == replica_id_b
1637 && collaborator.user.github_login == "user_b"
1638 })
1639 })
1640 .await;
1641
1642 // Open the same file as client B and client A.
1643 let buffer_b = project_b
1644 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1645 .await
1646 .unwrap();
1647 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1648 project_a.read_with(cx_a, |project, cx| {
1649 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1650 });
1651 let buffer_a = project_a
1652 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1653 .await
1654 .unwrap();
1655
1656 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1657
1658 // TODO
1659 // // Create a selection set as client B and see that selection set as client A.
1660 // buffer_a
1661 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1662 // .await;
1663
1664 // Edit the buffer as client B and see that edit as client A.
1665 editor_b.update(cx_b, |editor, cx| {
1666 editor.handle_input(&Input("ok, ".into()), cx)
1667 });
1668 buffer_a
1669 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1670 .await;
1671
1672 // TODO
1673 // // Remove the selection set as client B, see those selections disappear as client A.
1674 cx_b.update(move |_| drop(editor_b));
1675 // buffer_a
1676 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1677 // .await;
1678
1679 // Dropping the client B's project removes client B from client A's collaborators.
1680 cx_b.update(move |_| drop(project_b));
1681 project_a
1682 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1683 .await;
1684 }
1685
1686 #[gpui::test(iterations = 10)]
1687 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1688 let lang_registry = Arc::new(LanguageRegistry::test());
1689 let fs = FakeFs::new(cx_a.background());
1690 cx_a.foreground().forbid_parking();
1691
1692 // Connect to a server as 2 clients.
1693 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1694 let client_a = server.create_client(cx_a, "user_a").await;
1695 let client_b = server.create_client(cx_b, "user_b").await;
1696 server
1697 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1698 .await;
1699
1700 // Share a project as client A
1701 fs.insert_tree(
1702 "/a",
1703 json!({
1704 "a.txt": "a-contents",
1705 "b.txt": "b-contents",
1706 }),
1707 )
1708 .await;
1709 let project_a = cx_a.update(|cx| {
1710 Project::local(
1711 client_a.clone(),
1712 client_a.user_store.clone(),
1713 lang_registry.clone(),
1714 fs.clone(),
1715 cx,
1716 )
1717 });
1718 let (worktree_a, _) = project_a
1719 .update(cx_a, |p, cx| {
1720 p.find_or_create_local_worktree("/a", true, cx)
1721 })
1722 .await
1723 .unwrap();
1724 worktree_a
1725 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1726 .await;
1727 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1728 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1729 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1730 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1731
1732 // Join that project as client B
1733 let project_b = Project::remote(
1734 project_id,
1735 client_b.clone(),
1736 client_b.user_store.clone(),
1737 lang_registry.clone(),
1738 fs.clone(),
1739 &mut cx_b.to_async(),
1740 )
1741 .await
1742 .unwrap();
1743 project_b
1744 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1745 .await
1746 .unwrap();
1747
1748 // Unshare the project as client A
1749 project_a.update(cx_a, |project, cx| project.unshare(cx));
1750 project_b
1751 .condition(cx_b, |project, _| project.is_read_only())
1752 .await;
1753 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1754 cx_b.update(|_| {
1755 drop(project_b);
1756 });
1757
1758 // Share the project again and ensure guests can still join.
1759 project_a
1760 .update(cx_a, |project, cx| project.share(cx))
1761 .await
1762 .unwrap();
1763 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1764
1765 let project_b2 = Project::remote(
1766 project_id,
1767 client_b.clone(),
1768 client_b.user_store.clone(),
1769 lang_registry.clone(),
1770 fs.clone(),
1771 &mut cx_b.to_async(),
1772 )
1773 .await
1774 .unwrap();
1775 project_b2
1776 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1777 .await
1778 .unwrap();
1779 }
1780
1781 #[gpui::test(iterations = 10)]
1782 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1783 let lang_registry = Arc::new(LanguageRegistry::test());
1784 let fs = FakeFs::new(cx_a.background());
1785 cx_a.foreground().forbid_parking();
1786
1787 // Connect to a server as 2 clients.
1788 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1789 let client_a = server.create_client(cx_a, "user_a").await;
1790 let client_b = server.create_client(cx_b, "user_b").await;
1791 server
1792 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1793 .await;
1794
1795 // Share a project as client A
1796 fs.insert_tree(
1797 "/a",
1798 json!({
1799 "a.txt": "a-contents",
1800 "b.txt": "b-contents",
1801 }),
1802 )
1803 .await;
1804 let project_a = cx_a.update(|cx| {
1805 Project::local(
1806 client_a.clone(),
1807 client_a.user_store.clone(),
1808 lang_registry.clone(),
1809 fs.clone(),
1810 cx,
1811 )
1812 });
1813 let (worktree_a, _) = project_a
1814 .update(cx_a, |p, cx| {
1815 p.find_or_create_local_worktree("/a", true, cx)
1816 })
1817 .await
1818 .unwrap();
1819 worktree_a
1820 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1821 .await;
1822 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1823 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1824 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1825 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1826
1827 // Join that project as client B
1828 let project_b = Project::remote(
1829 project_id,
1830 client_b.clone(),
1831 client_b.user_store.clone(),
1832 lang_registry.clone(),
1833 fs.clone(),
1834 &mut cx_b.to_async(),
1835 )
1836 .await
1837 .unwrap();
1838 project_b
1839 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1840 .await
1841 .unwrap();
1842
1843 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1844 server.disconnect_client(client_a.current_user_id(cx_a));
1845 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1846 project_a
1847 .condition(cx_a, |project, _| project.collaborators().is_empty())
1848 .await;
1849 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1850 project_b
1851 .condition(cx_b, |project, _| project.is_read_only())
1852 .await;
1853 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1854 cx_b.update(|_| {
1855 drop(project_b);
1856 });
1857
1858 // Await reconnection
1859 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1860
1861 // Share the project again and ensure guests can still join.
1862 project_a
1863 .update(cx_a, |project, cx| project.share(cx))
1864 .await
1865 .unwrap();
1866 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1867
1868 let project_b2 = Project::remote(
1869 project_id,
1870 client_b.clone(),
1871 client_b.user_store.clone(),
1872 lang_registry.clone(),
1873 fs.clone(),
1874 &mut cx_b.to_async(),
1875 )
1876 .await
1877 .unwrap();
1878 project_b2
1879 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1880 .await
1881 .unwrap();
1882 }
1883
1884 #[gpui::test(iterations = 10)]
1885 async fn test_propagate_saves_and_fs_changes(
1886 cx_a: &mut TestAppContext,
1887 cx_b: &mut TestAppContext,
1888 cx_c: &mut TestAppContext,
1889 ) {
1890 let lang_registry = Arc::new(LanguageRegistry::test());
1891 let fs = FakeFs::new(cx_a.background());
1892 cx_a.foreground().forbid_parking();
1893
1894 // Connect to a server as 3 clients.
1895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896 let client_a = server.create_client(cx_a, "user_a").await;
1897 let client_b = server.create_client(cx_b, "user_b").await;
1898 let client_c = server.create_client(cx_c, "user_c").await;
1899 server
1900 .make_contacts(vec![
1901 (&client_a, cx_a),
1902 (&client_b, cx_b),
1903 (&client_c, cx_c),
1904 ])
1905 .await;
1906
1907 // Share a worktree as client A.
1908 fs.insert_tree(
1909 "/a",
1910 json!({
1911 "file1": "",
1912 "file2": ""
1913 }),
1914 )
1915 .await;
1916 let project_a = cx_a.update(|cx| {
1917 Project::local(
1918 client_a.clone(),
1919 client_a.user_store.clone(),
1920 lang_registry.clone(),
1921 fs.clone(),
1922 cx,
1923 )
1924 });
1925 let (worktree_a, _) = project_a
1926 .update(cx_a, |p, cx| {
1927 p.find_or_create_local_worktree("/a", true, cx)
1928 })
1929 .await
1930 .unwrap();
1931 worktree_a
1932 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1933 .await;
1934 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1935 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1936 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1937
1938 // Join that worktree as clients B and C.
1939 let project_b = Project::remote(
1940 project_id,
1941 client_b.clone(),
1942 client_b.user_store.clone(),
1943 lang_registry.clone(),
1944 fs.clone(),
1945 &mut cx_b.to_async(),
1946 )
1947 .await
1948 .unwrap();
1949 let project_c = Project::remote(
1950 project_id,
1951 client_c.clone(),
1952 client_c.user_store.clone(),
1953 lang_registry.clone(),
1954 fs.clone(),
1955 &mut cx_c.to_async(),
1956 )
1957 .await
1958 .unwrap();
1959 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1960 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1961
1962 // Open and edit a buffer as both guests B and C.
1963 let buffer_b = project_b
1964 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1965 .await
1966 .unwrap();
1967 let buffer_c = project_c
1968 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1969 .await
1970 .unwrap();
1971 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1972 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1973
1974 // Open and edit that buffer as the host.
1975 let buffer_a = project_a
1976 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1977 .await
1978 .unwrap();
1979
1980 buffer_a
1981 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1982 .await;
1983 buffer_a.update(cx_a, |buf, cx| {
1984 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1985 });
1986
1987 // Wait for edits to propagate
1988 buffer_a
1989 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1990 .await;
1991 buffer_b
1992 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1993 .await;
1994 buffer_c
1995 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1996 .await;
1997
1998 // Edit the buffer as the host and concurrently save as guest B.
1999 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2000 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2001 save_b.await.unwrap();
2002 assert_eq!(
2003 fs.load("/a/file1".as_ref()).await.unwrap(),
2004 "hi-a, i-am-c, i-am-b, i-am-a"
2005 );
2006 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2007 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2008 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2009
2010 worktree_a.flush_fs_events(cx_a).await;
2011
2012 // Make changes on host's file system, see those changes on guest worktrees.
2013 fs.rename(
2014 "/a/file1".as_ref(),
2015 "/a/file1-renamed".as_ref(),
2016 Default::default(),
2017 )
2018 .await
2019 .unwrap();
2020
2021 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2022 .await
2023 .unwrap();
2024 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2025
2026 worktree_a
2027 .condition(&cx_a, |tree, _| {
2028 tree.paths()
2029 .map(|p| p.to_string_lossy())
2030 .collect::<Vec<_>>()
2031 == ["file1-renamed", "file3", "file4"]
2032 })
2033 .await;
2034 worktree_b
2035 .condition(&cx_b, |tree, _| {
2036 tree.paths()
2037 .map(|p| p.to_string_lossy())
2038 .collect::<Vec<_>>()
2039 == ["file1-renamed", "file3", "file4"]
2040 })
2041 .await;
2042 worktree_c
2043 .condition(&cx_c, |tree, _| {
2044 tree.paths()
2045 .map(|p| p.to_string_lossy())
2046 .collect::<Vec<_>>()
2047 == ["file1-renamed", "file3", "file4"]
2048 })
2049 .await;
2050
2051 // Ensure buffer files are updated as well.
2052 buffer_a
2053 .condition(&cx_a, |buf, _| {
2054 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2055 })
2056 .await;
2057 buffer_b
2058 .condition(&cx_b, |buf, _| {
2059 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2060 })
2061 .await;
2062 buffer_c
2063 .condition(&cx_c, |buf, _| {
2064 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065 })
2066 .await;
2067 }
2068
2069 #[gpui::test(iterations = 10)]
2070 async fn test_fs_operations(
2071 executor: Arc<Deterministic>,
2072 cx_a: &mut TestAppContext,
2073 cx_b: &mut TestAppContext,
2074 ) {
2075 executor.forbid_parking();
2076 let fs = FakeFs::new(cx_a.background());
2077
2078 // Connect to a server as 2 clients.
2079 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2080 let mut client_a = server.create_client(cx_a, "user_a").await;
2081 let mut client_b = server.create_client(cx_b, "user_b").await;
2082 server
2083 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2084 .await;
2085
2086 // Share a project as client A
2087 fs.insert_tree(
2088 "/dir",
2089 json!({
2090 "a.txt": "a-contents",
2091 "b.txt": "b-contents",
2092 }),
2093 )
2094 .await;
2095
2096 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2097 let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2098 project_a
2099 .update(cx_a, |project, cx| project.share(cx))
2100 .await
2101 .unwrap();
2102
2103 let project_b = client_b.build_remote_project(project_id, cx_b).await;
2104
2105 let worktree_a =
2106 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2107 let worktree_b =
2108 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2109
2110 let entry = project_b
2111 .update(cx_b, |project, cx| {
2112 project
2113 .create_entry((worktree_id, "c.txt"), false, cx)
2114 .unwrap()
2115 })
2116 .await
2117 .unwrap();
2118 worktree_a.read_with(cx_a, |worktree, _| {
2119 assert_eq!(
2120 worktree
2121 .paths()
2122 .map(|p| p.to_string_lossy())
2123 .collect::<Vec<_>>(),
2124 ["a.txt", "b.txt", "c.txt"]
2125 );
2126 });
2127 worktree_b.read_with(cx_b, |worktree, _| {
2128 assert_eq!(
2129 worktree
2130 .paths()
2131 .map(|p| p.to_string_lossy())
2132 .collect::<Vec<_>>(),
2133 ["a.txt", "b.txt", "c.txt"]
2134 );
2135 });
2136
2137 project_b
2138 .update(cx_b, |project, cx| {
2139 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2140 })
2141 .unwrap()
2142 .await
2143 .unwrap();
2144 worktree_a.read_with(cx_a, |worktree, _| {
2145 assert_eq!(
2146 worktree
2147 .paths()
2148 .map(|p| p.to_string_lossy())
2149 .collect::<Vec<_>>(),
2150 ["a.txt", "b.txt", "d.txt"]
2151 );
2152 });
2153 worktree_b.read_with(cx_b, |worktree, _| {
2154 assert_eq!(
2155 worktree
2156 .paths()
2157 .map(|p| p.to_string_lossy())
2158 .collect::<Vec<_>>(),
2159 ["a.txt", "b.txt", "d.txt"]
2160 );
2161 });
2162
2163 let dir_entry = project_b
2164 .update(cx_b, |project, cx| {
2165 project
2166 .create_entry((worktree_id, "DIR"), true, cx)
2167 .unwrap()
2168 })
2169 .await
2170 .unwrap();
2171 worktree_a.read_with(cx_a, |worktree, _| {
2172 assert_eq!(
2173 worktree
2174 .paths()
2175 .map(|p| p.to_string_lossy())
2176 .collect::<Vec<_>>(),
2177 ["DIR", "a.txt", "b.txt", "d.txt"]
2178 );
2179 });
2180 worktree_b.read_with(cx_b, |worktree, _| {
2181 assert_eq!(
2182 worktree
2183 .paths()
2184 .map(|p| p.to_string_lossy())
2185 .collect::<Vec<_>>(),
2186 ["DIR", "a.txt", "b.txt", "d.txt"]
2187 );
2188 });
2189
2190 project_b
2191 .update(cx_b, |project, cx| {
2192 project.delete_entry(dir_entry.id, cx).unwrap()
2193 })
2194 .await
2195 .unwrap();
2196 worktree_a.read_with(cx_a, |worktree, _| {
2197 assert_eq!(
2198 worktree
2199 .paths()
2200 .map(|p| p.to_string_lossy())
2201 .collect::<Vec<_>>(),
2202 ["a.txt", "b.txt", "d.txt"]
2203 );
2204 });
2205 worktree_b.read_with(cx_b, |worktree, _| {
2206 assert_eq!(
2207 worktree
2208 .paths()
2209 .map(|p| p.to_string_lossy())
2210 .collect::<Vec<_>>(),
2211 ["a.txt", "b.txt", "d.txt"]
2212 );
2213 });
2214
2215 project_b
2216 .update(cx_b, |project, cx| {
2217 project.delete_entry(entry.id, cx).unwrap()
2218 })
2219 .await
2220 .unwrap();
2221 worktree_a.read_with(cx_a, |worktree, _| {
2222 assert_eq!(
2223 worktree
2224 .paths()
2225 .map(|p| p.to_string_lossy())
2226 .collect::<Vec<_>>(),
2227 ["a.txt", "b.txt"]
2228 );
2229 });
2230 worktree_b.read_with(cx_b, |worktree, _| {
2231 assert_eq!(
2232 worktree
2233 .paths()
2234 .map(|p| p.to_string_lossy())
2235 .collect::<Vec<_>>(),
2236 ["a.txt", "b.txt"]
2237 );
2238 });
2239 }
2240
2241 #[gpui::test(iterations = 10)]
2242 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2243 cx_a.foreground().forbid_parking();
2244 let lang_registry = Arc::new(LanguageRegistry::test());
2245 let fs = FakeFs::new(cx_a.background());
2246
2247 // Connect to a server as 2 clients.
2248 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2249 let client_a = server.create_client(cx_a, "user_a").await;
2250 let client_b = server.create_client(cx_b, "user_b").await;
2251 server
2252 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2253 .await;
2254
2255 // Share a project as client A
2256 fs.insert_tree(
2257 "/dir",
2258 json!({
2259 "a.txt": "a-contents",
2260 }),
2261 )
2262 .await;
2263
2264 let project_a = cx_a.update(|cx| {
2265 Project::local(
2266 client_a.clone(),
2267 client_a.user_store.clone(),
2268 lang_registry.clone(),
2269 fs.clone(),
2270 cx,
2271 )
2272 });
2273 let (worktree_a, _) = project_a
2274 .update(cx_a, |p, cx| {
2275 p.find_or_create_local_worktree("/dir", true, cx)
2276 })
2277 .await
2278 .unwrap();
2279 worktree_a
2280 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2281 .await;
2282 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2283 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2284 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2285
2286 // Join that project as client B
2287 let project_b = Project::remote(
2288 project_id,
2289 client_b.clone(),
2290 client_b.user_store.clone(),
2291 lang_registry.clone(),
2292 fs.clone(),
2293 &mut cx_b.to_async(),
2294 )
2295 .await
2296 .unwrap();
2297
2298 // Open a buffer as client B
2299 let buffer_b = project_b
2300 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2301 .await
2302 .unwrap();
2303
2304 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2305 buffer_b.read_with(cx_b, |buf, _| {
2306 assert!(buf.is_dirty());
2307 assert!(!buf.has_conflict());
2308 });
2309
2310 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2311 buffer_b
2312 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2313 .await;
2314 buffer_b.read_with(cx_b, |buf, _| {
2315 assert!(!buf.has_conflict());
2316 });
2317
2318 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2319 buffer_b.read_with(cx_b, |buf, _| {
2320 assert!(buf.is_dirty());
2321 assert!(!buf.has_conflict());
2322 });
2323 }
2324
2325 #[gpui::test(iterations = 10)]
2326 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2327 cx_a.foreground().forbid_parking();
2328 let lang_registry = Arc::new(LanguageRegistry::test());
2329 let fs = FakeFs::new(cx_a.background());
2330
2331 // Connect to a server as 2 clients.
2332 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2333 let client_a = server.create_client(cx_a, "user_a").await;
2334 let client_b = server.create_client(cx_b, "user_b").await;
2335 server
2336 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2337 .await;
2338
2339 // Share a project as client A
2340 fs.insert_tree(
2341 "/dir",
2342 json!({
2343 "a.txt": "a-contents",
2344 }),
2345 )
2346 .await;
2347
2348 let project_a = cx_a.update(|cx| {
2349 Project::local(
2350 client_a.clone(),
2351 client_a.user_store.clone(),
2352 lang_registry.clone(),
2353 fs.clone(),
2354 cx,
2355 )
2356 });
2357 let (worktree_a, _) = project_a
2358 .update(cx_a, |p, cx| {
2359 p.find_or_create_local_worktree("/dir", true, cx)
2360 })
2361 .await
2362 .unwrap();
2363 worktree_a
2364 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2365 .await;
2366 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2367 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2368 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2369
2370 // Join that project as client B
2371 let project_b = Project::remote(
2372 project_id,
2373 client_b.clone(),
2374 client_b.user_store.clone(),
2375 lang_registry.clone(),
2376 fs.clone(),
2377 &mut cx_b.to_async(),
2378 )
2379 .await
2380 .unwrap();
2381 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2382
2383 // Open a buffer as client B
2384 let buffer_b = project_b
2385 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2386 .await
2387 .unwrap();
2388 buffer_b.read_with(cx_b, |buf, _| {
2389 assert!(!buf.is_dirty());
2390 assert!(!buf.has_conflict());
2391 });
2392
2393 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2394 .await
2395 .unwrap();
2396 buffer_b
2397 .condition(&cx_b, |buf, _| {
2398 buf.text() == "new contents" && !buf.is_dirty()
2399 })
2400 .await;
2401 buffer_b.read_with(cx_b, |buf, _| {
2402 assert!(!buf.has_conflict());
2403 });
2404 }
2405
2406 #[gpui::test(iterations = 10)]
2407 async fn test_editing_while_guest_opens_buffer(
2408 cx_a: &mut TestAppContext,
2409 cx_b: &mut TestAppContext,
2410 ) {
2411 cx_a.foreground().forbid_parking();
2412 let lang_registry = Arc::new(LanguageRegistry::test());
2413 let fs = FakeFs::new(cx_a.background());
2414
2415 // Connect to a server as 2 clients.
2416 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2417 let client_a = server.create_client(cx_a, "user_a").await;
2418 let client_b = server.create_client(cx_b, "user_b").await;
2419 server
2420 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2421 .await;
2422
2423 // Share a project as client A
2424 fs.insert_tree(
2425 "/dir",
2426 json!({
2427 "a.txt": "a-contents",
2428 }),
2429 )
2430 .await;
2431 let project_a = cx_a.update(|cx| {
2432 Project::local(
2433 client_a.clone(),
2434 client_a.user_store.clone(),
2435 lang_registry.clone(),
2436 fs.clone(),
2437 cx,
2438 )
2439 });
2440 let (worktree_a, _) = project_a
2441 .update(cx_a, |p, cx| {
2442 p.find_or_create_local_worktree("/dir", true, cx)
2443 })
2444 .await
2445 .unwrap();
2446 worktree_a
2447 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2448 .await;
2449 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2450 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2451 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2452
2453 // Join that project as client B
2454 let project_b = Project::remote(
2455 project_id,
2456 client_b.clone(),
2457 client_b.user_store.clone(),
2458 lang_registry.clone(),
2459 fs.clone(),
2460 &mut cx_b.to_async(),
2461 )
2462 .await
2463 .unwrap();
2464
2465 // Open a buffer as client A
2466 let buffer_a = project_a
2467 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2468 .await
2469 .unwrap();
2470
2471 // Start opening the same buffer as client B
2472 let buffer_b = cx_b
2473 .background()
2474 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2475
2476 // Edit the buffer as client A while client B is still opening it.
2477 cx_b.background().simulate_random_delay().await;
2478 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2479 cx_b.background().simulate_random_delay().await;
2480 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2481
2482 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2483 let buffer_b = buffer_b.await.unwrap();
2484 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2485 }
2486
2487 #[gpui::test(iterations = 10)]
2488 async fn test_leaving_worktree_while_opening_buffer(
2489 cx_a: &mut TestAppContext,
2490 cx_b: &mut TestAppContext,
2491 ) {
2492 cx_a.foreground().forbid_parking();
2493 let lang_registry = Arc::new(LanguageRegistry::test());
2494 let fs = FakeFs::new(cx_a.background());
2495
2496 // Connect to a server as 2 clients.
2497 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2498 let client_a = server.create_client(cx_a, "user_a").await;
2499 let client_b = server.create_client(cx_b, "user_b").await;
2500 server
2501 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2502 .await;
2503
2504 // Share a project as client A
2505 fs.insert_tree(
2506 "/dir",
2507 json!({
2508 "a.txt": "a-contents",
2509 }),
2510 )
2511 .await;
2512 let project_a = cx_a.update(|cx| {
2513 Project::local(
2514 client_a.clone(),
2515 client_a.user_store.clone(),
2516 lang_registry.clone(),
2517 fs.clone(),
2518 cx,
2519 )
2520 });
2521 let (worktree_a, _) = project_a
2522 .update(cx_a, |p, cx| {
2523 p.find_or_create_local_worktree("/dir", true, cx)
2524 })
2525 .await
2526 .unwrap();
2527 worktree_a
2528 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2529 .await;
2530 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2531 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2532 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2533
2534 // Join that project as client B
2535 let project_b = Project::remote(
2536 project_id,
2537 client_b.clone(),
2538 client_b.user_store.clone(),
2539 lang_registry.clone(),
2540 fs.clone(),
2541 &mut cx_b.to_async(),
2542 )
2543 .await
2544 .unwrap();
2545
2546 // See that a guest has joined as client A.
2547 project_a
2548 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2549 .await;
2550
2551 // Begin opening a buffer as client B, but leave the project before the open completes.
2552 let buffer_b = cx_b
2553 .background()
2554 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2555 cx_b.update(|_| drop(project_b));
2556 drop(buffer_b);
2557
2558 // See that the guest has left.
2559 project_a
2560 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2561 .await;
2562 }
2563
2564 #[gpui::test(iterations = 10)]
2565 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2566 cx_a.foreground().forbid_parking();
2567 let lang_registry = Arc::new(LanguageRegistry::test());
2568 let fs = FakeFs::new(cx_a.background());
2569
2570 // Connect to a server as 2 clients.
2571 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2572 let client_a = server.create_client(cx_a, "user_a").await;
2573 let client_b = server.create_client(cx_b, "user_b").await;
2574 server
2575 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2576 .await;
2577
2578 // Share a project as client A
2579 fs.insert_tree(
2580 "/a",
2581 json!({
2582 "a.txt": "a-contents",
2583 "b.txt": "b-contents",
2584 }),
2585 )
2586 .await;
2587 let project_a = cx_a.update(|cx| {
2588 Project::local(
2589 client_a.clone(),
2590 client_a.user_store.clone(),
2591 lang_registry.clone(),
2592 fs.clone(),
2593 cx,
2594 )
2595 });
2596 let (worktree_a, _) = project_a
2597 .update(cx_a, |p, cx| {
2598 p.find_or_create_local_worktree("/a", true, cx)
2599 })
2600 .await
2601 .unwrap();
2602 worktree_a
2603 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2604 .await;
2605 let project_id = project_a
2606 .update(cx_a, |project, _| project.next_remote_id())
2607 .await;
2608 project_a
2609 .update(cx_a, |project, cx| project.share(cx))
2610 .await
2611 .unwrap();
2612
2613 // Join that project as client B
2614 let _project_b = Project::remote(
2615 project_id,
2616 client_b.clone(),
2617 client_b.user_store.clone(),
2618 lang_registry.clone(),
2619 fs.clone(),
2620 &mut cx_b.to_async(),
2621 )
2622 .await
2623 .unwrap();
2624
2625 // Client A sees that a guest has joined.
2626 project_a
2627 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2628 .await;
2629
2630 // Drop client B's connection and ensure client A observes client B leaving the project.
2631 client_b.disconnect(&cx_b.to_async()).unwrap();
2632 project_a
2633 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2634 .await;
2635
2636 // Rejoin the project as client B
2637 let _project_b = Project::remote(
2638 project_id,
2639 client_b.clone(),
2640 client_b.user_store.clone(),
2641 lang_registry.clone(),
2642 fs.clone(),
2643 &mut cx_b.to_async(),
2644 )
2645 .await
2646 .unwrap();
2647
2648 // Client A sees that a guest has re-joined.
2649 project_a
2650 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2651 .await;
2652
2653 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2654 client_b.wait_for_current_user(cx_b).await;
2655 server.disconnect_client(client_b.current_user_id(cx_b));
2656 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2657 project_a
2658 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2659 .await;
2660 }
2661
2662 #[gpui::test(iterations = 10)]
2663 async fn test_collaborating_with_diagnostics(
2664 cx_a: &mut TestAppContext,
2665 cx_b: &mut TestAppContext,
2666 ) {
2667 cx_a.foreground().forbid_parking();
2668 let lang_registry = Arc::new(LanguageRegistry::test());
2669 let fs = FakeFs::new(cx_a.background());
2670
2671 // Set up a fake language server.
2672 let mut language = Language::new(
2673 LanguageConfig {
2674 name: "Rust".into(),
2675 path_suffixes: vec!["rs".to_string()],
2676 ..Default::default()
2677 },
2678 Some(tree_sitter_rust::language()),
2679 );
2680 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2681 lang_registry.add(Arc::new(language));
2682
2683 // Connect to a server as 2 clients.
2684 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2685 let client_a = server.create_client(cx_a, "user_a").await;
2686 let client_b = server.create_client(cx_b, "user_b").await;
2687 server
2688 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2689 .await;
2690
2691 // Share a project as client A
2692 fs.insert_tree(
2693 "/a",
2694 json!({
2695 "a.rs": "let one = two",
2696 "other.rs": "",
2697 }),
2698 )
2699 .await;
2700 let project_a = cx_a.update(|cx| {
2701 Project::local(
2702 client_a.clone(),
2703 client_a.user_store.clone(),
2704 lang_registry.clone(),
2705 fs.clone(),
2706 cx,
2707 )
2708 });
2709 let (worktree_a, _) = project_a
2710 .update(cx_a, |p, cx| {
2711 p.find_or_create_local_worktree("/a", true, cx)
2712 })
2713 .await
2714 .unwrap();
2715 worktree_a
2716 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2717 .await;
2718 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2719 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2720 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2721
2722 // Cause the language server to start.
2723 let _ = cx_a
2724 .background()
2725 .spawn(project_a.update(cx_a, |project, cx| {
2726 project.open_buffer(
2727 ProjectPath {
2728 worktree_id,
2729 path: Path::new("other.rs").into(),
2730 },
2731 cx,
2732 )
2733 }))
2734 .await
2735 .unwrap();
2736
2737 // Simulate a language server reporting errors for a file.
2738 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2739 fake_language_server
2740 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2741 .await;
2742 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2743 lsp::PublishDiagnosticsParams {
2744 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2745 version: None,
2746 diagnostics: vec![lsp::Diagnostic {
2747 severity: Some(lsp::DiagnosticSeverity::ERROR),
2748 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2749 message: "message 1".to_string(),
2750 ..Default::default()
2751 }],
2752 },
2753 );
2754
2755 // Wait for server to see the diagnostics update.
2756 server
2757 .condition(|store| {
2758 let worktree = store
2759 .project(project_id)
2760 .unwrap()
2761 .share
2762 .as_ref()
2763 .unwrap()
2764 .worktrees
2765 .get(&worktree_id.to_proto())
2766 .unwrap();
2767
2768 !worktree.diagnostic_summaries.is_empty()
2769 })
2770 .await;
2771
2772 // Join the worktree as client B.
2773 let project_b = Project::remote(
2774 project_id,
2775 client_b.clone(),
2776 client_b.user_store.clone(),
2777 lang_registry.clone(),
2778 fs.clone(),
2779 &mut cx_b.to_async(),
2780 )
2781 .await
2782 .unwrap();
2783
2784 project_b.read_with(cx_b, |project, cx| {
2785 assert_eq!(
2786 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2787 &[(
2788 ProjectPath {
2789 worktree_id,
2790 path: Arc::from(Path::new("a.rs")),
2791 },
2792 DiagnosticSummary {
2793 error_count: 1,
2794 warning_count: 0,
2795 ..Default::default()
2796 },
2797 )]
2798 )
2799 });
2800
2801 // Simulate a language server reporting more errors for a file.
2802 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2803 lsp::PublishDiagnosticsParams {
2804 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2805 version: None,
2806 diagnostics: vec![
2807 lsp::Diagnostic {
2808 severity: Some(lsp::DiagnosticSeverity::ERROR),
2809 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2810 message: "message 1".to_string(),
2811 ..Default::default()
2812 },
2813 lsp::Diagnostic {
2814 severity: Some(lsp::DiagnosticSeverity::WARNING),
2815 range: lsp::Range::new(
2816 lsp::Position::new(0, 10),
2817 lsp::Position::new(0, 13),
2818 ),
2819 message: "message 2".to_string(),
2820 ..Default::default()
2821 },
2822 ],
2823 },
2824 );
2825
2826 // Client b gets the updated summaries
2827 project_b
2828 .condition(&cx_b, |project, cx| {
2829 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2830 == &[(
2831 ProjectPath {
2832 worktree_id,
2833 path: Arc::from(Path::new("a.rs")),
2834 },
2835 DiagnosticSummary {
2836 error_count: 1,
2837 warning_count: 1,
2838 ..Default::default()
2839 },
2840 )]
2841 })
2842 .await;
2843
2844 // Open the file with the errors on client B. They should be present.
2845 let buffer_b = cx_b
2846 .background()
2847 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2848 .await
2849 .unwrap();
2850
2851 buffer_b.read_with(cx_b, |buffer, _| {
2852 assert_eq!(
2853 buffer
2854 .snapshot()
2855 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2856 .map(|entry| entry)
2857 .collect::<Vec<_>>(),
2858 &[
2859 DiagnosticEntry {
2860 range: Point::new(0, 4)..Point::new(0, 7),
2861 diagnostic: Diagnostic {
2862 group_id: 0,
2863 message: "message 1".to_string(),
2864 severity: lsp::DiagnosticSeverity::ERROR,
2865 is_primary: true,
2866 ..Default::default()
2867 }
2868 },
2869 DiagnosticEntry {
2870 range: Point::new(0, 10)..Point::new(0, 13),
2871 diagnostic: Diagnostic {
2872 group_id: 1,
2873 severity: lsp::DiagnosticSeverity::WARNING,
2874 message: "message 2".to_string(),
2875 is_primary: true,
2876 ..Default::default()
2877 }
2878 }
2879 ]
2880 );
2881 });
2882
2883 // Simulate a language server reporting no errors for a file.
2884 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2885 lsp::PublishDiagnosticsParams {
2886 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2887 version: None,
2888 diagnostics: vec![],
2889 },
2890 );
2891 project_a
2892 .condition(cx_a, |project, cx| {
2893 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2894 })
2895 .await;
2896 project_b
2897 .condition(cx_b, |project, cx| {
2898 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2899 })
2900 .await;
2901 }
2902
2903 #[gpui::test(iterations = 10)]
2904 async fn test_collaborating_with_completion(
2905 cx_a: &mut TestAppContext,
2906 cx_b: &mut TestAppContext,
2907 ) {
2908 cx_a.foreground().forbid_parking();
2909 let lang_registry = Arc::new(LanguageRegistry::test());
2910 let fs = FakeFs::new(cx_a.background());
2911
2912 // Set up a fake language server.
2913 let mut language = Language::new(
2914 LanguageConfig {
2915 name: "Rust".into(),
2916 path_suffixes: vec!["rs".to_string()],
2917 ..Default::default()
2918 },
2919 Some(tree_sitter_rust::language()),
2920 );
2921 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2922 capabilities: lsp::ServerCapabilities {
2923 completion_provider: Some(lsp::CompletionOptions {
2924 trigger_characters: Some(vec![".".to_string()]),
2925 ..Default::default()
2926 }),
2927 ..Default::default()
2928 },
2929 ..Default::default()
2930 });
2931 lang_registry.add(Arc::new(language));
2932
2933 // Connect to a server as 2 clients.
2934 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2935 let client_a = server.create_client(cx_a, "user_a").await;
2936 let client_b = server.create_client(cx_b, "user_b").await;
2937 server
2938 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2939 .await;
2940
2941 // Share a project as client A
2942 fs.insert_tree(
2943 "/a",
2944 json!({
2945 "main.rs": "fn main() { a }",
2946 "other.rs": "",
2947 }),
2948 )
2949 .await;
2950 let project_a = cx_a.update(|cx| {
2951 Project::local(
2952 client_a.clone(),
2953 client_a.user_store.clone(),
2954 lang_registry.clone(),
2955 fs.clone(),
2956 cx,
2957 )
2958 });
2959 let (worktree_a, _) = project_a
2960 .update(cx_a, |p, cx| {
2961 p.find_or_create_local_worktree("/a", true, cx)
2962 })
2963 .await
2964 .unwrap();
2965 worktree_a
2966 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2967 .await;
2968 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2969 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2970 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2971
2972 // Join the worktree as client B.
2973 let project_b = Project::remote(
2974 project_id,
2975 client_b.clone(),
2976 client_b.user_store.clone(),
2977 lang_registry.clone(),
2978 fs.clone(),
2979 &mut cx_b.to_async(),
2980 )
2981 .await
2982 .unwrap();
2983
2984 // Open a file in an editor as the guest.
2985 let buffer_b = project_b
2986 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2987 .await
2988 .unwrap();
2989 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2990 let editor_b = cx_b.add_view(window_b, |cx| {
2991 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2992 });
2993
2994 let fake_language_server = fake_language_servers.next().await.unwrap();
2995 buffer_b
2996 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2997 .await;
2998
2999 // Type a completion trigger character as the guest.
3000 editor_b.update(cx_b, |editor, cx| {
3001 editor.select_ranges([13..13], None, cx);
3002 editor.handle_input(&Input(".".into()), cx);
3003 cx.focus(&editor_b);
3004 });
3005
3006 // Receive a completion request as the host's language server.
3007 // Return some completions from the host's language server.
3008 cx_a.foreground().start_waiting();
3009 fake_language_server
3010 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3011 assert_eq!(
3012 params.text_document_position.text_document.uri,
3013 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3014 );
3015 assert_eq!(
3016 params.text_document_position.position,
3017 lsp::Position::new(0, 14),
3018 );
3019
3020 Ok(Some(lsp::CompletionResponse::Array(vec![
3021 lsp::CompletionItem {
3022 label: "first_method(…)".into(),
3023 detail: Some("fn(&mut self, B) -> C".into()),
3024 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3025 new_text: "first_method($1)".to_string(),
3026 range: lsp::Range::new(
3027 lsp::Position::new(0, 14),
3028 lsp::Position::new(0, 14),
3029 ),
3030 })),
3031 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3032 ..Default::default()
3033 },
3034 lsp::CompletionItem {
3035 label: "second_method(…)".into(),
3036 detail: Some("fn(&mut self, C) -> D<E>".into()),
3037 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3038 new_text: "second_method()".to_string(),
3039 range: lsp::Range::new(
3040 lsp::Position::new(0, 14),
3041 lsp::Position::new(0, 14),
3042 ),
3043 })),
3044 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3045 ..Default::default()
3046 },
3047 ])))
3048 })
3049 .next()
3050 .await
3051 .unwrap();
3052 cx_a.foreground().finish_waiting();
3053
3054 // Open the buffer on the host.
3055 let buffer_a = project_a
3056 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3057 .await
3058 .unwrap();
3059 buffer_a
3060 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3061 .await;
3062
3063 // Confirm a completion on the guest.
3064 editor_b
3065 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3066 .await;
3067 editor_b.update(cx_b, |editor, cx| {
3068 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3069 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3070 });
3071
3072 // Return a resolved completion from the host's language server.
3073 // The resolved completion has an additional text edit.
3074 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3075 |params, _| async move {
3076 assert_eq!(params.label, "first_method(…)");
3077 Ok(lsp::CompletionItem {
3078 label: "first_method(…)".into(),
3079 detail: Some("fn(&mut self, B) -> C".into()),
3080 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3081 new_text: "first_method($1)".to_string(),
3082 range: lsp::Range::new(
3083 lsp::Position::new(0, 14),
3084 lsp::Position::new(0, 14),
3085 ),
3086 })),
3087 additional_text_edits: Some(vec![lsp::TextEdit {
3088 new_text: "use d::SomeTrait;\n".to_string(),
3089 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3090 }]),
3091 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3092 ..Default::default()
3093 })
3094 },
3095 );
3096
3097 // The additional edit is applied.
3098 buffer_a
3099 .condition(&cx_a, |buffer, _| {
3100 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3101 })
3102 .await;
3103 buffer_b
3104 .condition(&cx_b, |buffer, _| {
3105 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3106 })
3107 .await;
3108 }
3109
3110 #[gpui::test(iterations = 10)]
3111 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3112 cx_a.foreground().forbid_parking();
3113 let lang_registry = Arc::new(LanguageRegistry::test());
3114 let fs = FakeFs::new(cx_a.background());
3115
3116 // Connect to a server as 2 clients.
3117 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3118 let client_a = server.create_client(cx_a, "user_a").await;
3119 let client_b = server.create_client(cx_b, "user_b").await;
3120 server
3121 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3122 .await;
3123
3124 // Share a project as client A
3125 fs.insert_tree(
3126 "/a",
3127 json!({
3128 "a.rs": "let one = 1;",
3129 }),
3130 )
3131 .await;
3132 let project_a = cx_a.update(|cx| {
3133 Project::local(
3134 client_a.clone(),
3135 client_a.user_store.clone(),
3136 lang_registry.clone(),
3137 fs.clone(),
3138 cx,
3139 )
3140 });
3141 let (worktree_a, _) = project_a
3142 .update(cx_a, |p, cx| {
3143 p.find_or_create_local_worktree("/a", true, cx)
3144 })
3145 .await
3146 .unwrap();
3147 worktree_a
3148 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3149 .await;
3150 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3151 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3152 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3153 let buffer_a = project_a
3154 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3155 .await
3156 .unwrap();
3157
3158 // Join the worktree as client B.
3159 let project_b = Project::remote(
3160 project_id,
3161 client_b.clone(),
3162 client_b.user_store.clone(),
3163 lang_registry.clone(),
3164 fs.clone(),
3165 &mut cx_b.to_async(),
3166 )
3167 .await
3168 .unwrap();
3169
3170 let buffer_b = cx_b
3171 .background()
3172 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3173 .await
3174 .unwrap();
3175 buffer_b.update(cx_b, |buffer, cx| {
3176 buffer.edit([(4..7, "six")], cx);
3177 buffer.edit([(10..11, "6")], cx);
3178 assert_eq!(buffer.text(), "let six = 6;");
3179 assert!(buffer.is_dirty());
3180 assert!(!buffer.has_conflict());
3181 });
3182 buffer_a
3183 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3184 .await;
3185
3186 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3187 .await
3188 .unwrap();
3189 buffer_a
3190 .condition(cx_a, |buffer, _| buffer.has_conflict())
3191 .await;
3192 buffer_b
3193 .condition(cx_b, |buffer, _| buffer.has_conflict())
3194 .await;
3195
3196 project_b
3197 .update(cx_b, |project, cx| {
3198 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3199 })
3200 .await
3201 .unwrap();
3202 buffer_a.read_with(cx_a, |buffer, _| {
3203 assert_eq!(buffer.text(), "let seven = 7;");
3204 assert!(!buffer.is_dirty());
3205 assert!(!buffer.has_conflict());
3206 });
3207 buffer_b.read_with(cx_b, |buffer, _| {
3208 assert_eq!(buffer.text(), "let seven = 7;");
3209 assert!(!buffer.is_dirty());
3210 assert!(!buffer.has_conflict());
3211 });
3212
3213 buffer_a.update(cx_a, |buffer, cx| {
3214 // Undoing on the host is a no-op when the reload was initiated by the guest.
3215 buffer.undo(cx);
3216 assert_eq!(buffer.text(), "let seven = 7;");
3217 assert!(!buffer.is_dirty());
3218 assert!(!buffer.has_conflict());
3219 });
3220 buffer_b.update(cx_b, |buffer, cx| {
3221 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3222 buffer.undo(cx);
3223 assert_eq!(buffer.text(), "let six = 6;");
3224 assert!(buffer.is_dirty());
3225 assert!(!buffer.has_conflict());
3226 });
3227 }
3228
3229 #[gpui::test(iterations = 10)]
3230 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3231 cx_a.foreground().forbid_parking();
3232 let lang_registry = Arc::new(LanguageRegistry::test());
3233 let fs = FakeFs::new(cx_a.background());
3234
3235 // Set up a fake language server.
3236 let mut language = Language::new(
3237 LanguageConfig {
3238 name: "Rust".into(),
3239 path_suffixes: vec!["rs".to_string()],
3240 ..Default::default()
3241 },
3242 Some(tree_sitter_rust::language()),
3243 );
3244 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3245 lang_registry.add(Arc::new(language));
3246
3247 // Connect to a server as 2 clients.
3248 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3249 let client_a = server.create_client(cx_a, "user_a").await;
3250 let client_b = server.create_client(cx_b, "user_b").await;
3251 server
3252 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3253 .await;
3254
3255 // Share a project as client A
3256 fs.insert_tree(
3257 "/a",
3258 json!({
3259 "a.rs": "let one = two",
3260 }),
3261 )
3262 .await;
3263 let project_a = cx_a.update(|cx| {
3264 Project::local(
3265 client_a.clone(),
3266 client_a.user_store.clone(),
3267 lang_registry.clone(),
3268 fs.clone(),
3269 cx,
3270 )
3271 });
3272 let (worktree_a, _) = project_a
3273 .update(cx_a, |p, cx| {
3274 p.find_or_create_local_worktree("/a", true, cx)
3275 })
3276 .await
3277 .unwrap();
3278 worktree_a
3279 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3280 .await;
3281 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3282 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3283 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3284
3285 // Join the worktree as client B.
3286 let project_b = Project::remote(
3287 project_id,
3288 client_b.clone(),
3289 client_b.user_store.clone(),
3290 lang_registry.clone(),
3291 fs.clone(),
3292 &mut cx_b.to_async(),
3293 )
3294 .await
3295 .unwrap();
3296
3297 let buffer_b = cx_b
3298 .background()
3299 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3300 .await
3301 .unwrap();
3302
3303 let fake_language_server = fake_language_servers.next().await.unwrap();
3304 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3305 Ok(Some(vec![
3306 lsp::TextEdit {
3307 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3308 new_text: "h".to_string(),
3309 },
3310 lsp::TextEdit {
3311 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3312 new_text: "y".to_string(),
3313 },
3314 ]))
3315 });
3316
3317 project_b
3318 .update(cx_b, |project, cx| {
3319 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3320 })
3321 .await
3322 .unwrap();
3323 assert_eq!(
3324 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3325 "let honey = two"
3326 );
3327 }
3328
3329 #[gpui::test(iterations = 10)]
3330 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3331 cx_a.foreground().forbid_parking();
3332 let lang_registry = Arc::new(LanguageRegistry::test());
3333 let fs = FakeFs::new(cx_a.background());
3334 fs.insert_tree(
3335 "/root-1",
3336 json!({
3337 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3338 }),
3339 )
3340 .await;
3341 fs.insert_tree(
3342 "/root-2",
3343 json!({
3344 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3345 }),
3346 )
3347 .await;
3348
3349 // Set up a fake language server.
3350 let mut language = Language::new(
3351 LanguageConfig {
3352 name: "Rust".into(),
3353 path_suffixes: vec!["rs".to_string()],
3354 ..Default::default()
3355 },
3356 Some(tree_sitter_rust::language()),
3357 );
3358 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3359 lang_registry.add(Arc::new(language));
3360
3361 // Connect to a server as 2 clients.
3362 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3363 let client_a = server.create_client(cx_a, "user_a").await;
3364 let client_b = server.create_client(cx_b, "user_b").await;
3365 server
3366 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3367 .await;
3368
3369 // Share a project as client A
3370 let project_a = cx_a.update(|cx| {
3371 Project::local(
3372 client_a.clone(),
3373 client_a.user_store.clone(),
3374 lang_registry.clone(),
3375 fs.clone(),
3376 cx,
3377 )
3378 });
3379 let (worktree_a, _) = project_a
3380 .update(cx_a, |p, cx| {
3381 p.find_or_create_local_worktree("/root-1", true, cx)
3382 })
3383 .await
3384 .unwrap();
3385 worktree_a
3386 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3387 .await;
3388 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3389 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3390 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3391
3392 // Join the worktree as client B.
3393 let project_b = Project::remote(
3394 project_id,
3395 client_b.clone(),
3396 client_b.user_store.clone(),
3397 lang_registry.clone(),
3398 fs.clone(),
3399 &mut cx_b.to_async(),
3400 )
3401 .await
3402 .unwrap();
3403
3404 // Open the file on client B.
3405 let buffer_b = cx_b
3406 .background()
3407 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3408 .await
3409 .unwrap();
3410
3411 // Request the definition of a symbol as the guest.
3412 let fake_language_server = fake_language_servers.next().await.unwrap();
3413 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3414 |_, _| async move {
3415 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3416 lsp::Location::new(
3417 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3418 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3419 ),
3420 )))
3421 },
3422 );
3423
3424 let definitions_1 = project_b
3425 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3426 .await
3427 .unwrap();
3428 cx_b.read(|cx| {
3429 assert_eq!(definitions_1.len(), 1);
3430 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3431 let target_buffer = definitions_1[0].buffer.read(cx);
3432 assert_eq!(
3433 target_buffer.text(),
3434 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3435 );
3436 assert_eq!(
3437 definitions_1[0].range.to_point(target_buffer),
3438 Point::new(0, 6)..Point::new(0, 9)
3439 );
3440 });
3441
3442 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3443 // the previous call to `definition`.
3444 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3445 |_, _| async move {
3446 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3447 lsp::Location::new(
3448 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3449 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3450 ),
3451 )))
3452 },
3453 );
3454
3455 let definitions_2 = project_b
3456 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3457 .await
3458 .unwrap();
3459 cx_b.read(|cx| {
3460 assert_eq!(definitions_2.len(), 1);
3461 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3462 let target_buffer = definitions_2[0].buffer.read(cx);
3463 assert_eq!(
3464 target_buffer.text(),
3465 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3466 );
3467 assert_eq!(
3468 definitions_2[0].range.to_point(target_buffer),
3469 Point::new(1, 6)..Point::new(1, 11)
3470 );
3471 });
3472 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3473 }
3474
3475 #[gpui::test(iterations = 10)]
3476 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3477 cx_a.foreground().forbid_parking();
3478 let lang_registry = Arc::new(LanguageRegistry::test());
3479 let fs = FakeFs::new(cx_a.background());
3480 fs.insert_tree(
3481 "/root-1",
3482 json!({
3483 "one.rs": "const ONE: usize = 1;",
3484 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3485 }),
3486 )
3487 .await;
3488 fs.insert_tree(
3489 "/root-2",
3490 json!({
3491 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3492 }),
3493 )
3494 .await;
3495
3496 // Set up a fake language server.
3497 let mut language = Language::new(
3498 LanguageConfig {
3499 name: "Rust".into(),
3500 path_suffixes: vec!["rs".to_string()],
3501 ..Default::default()
3502 },
3503 Some(tree_sitter_rust::language()),
3504 );
3505 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3506 lang_registry.add(Arc::new(language));
3507
3508 // Connect to a server as 2 clients.
3509 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3510 let client_a = server.create_client(cx_a, "user_a").await;
3511 let client_b = server.create_client(cx_b, "user_b").await;
3512 server
3513 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3514 .await;
3515
3516 // Share a project as client A
3517 let project_a = cx_a.update(|cx| {
3518 Project::local(
3519 client_a.clone(),
3520 client_a.user_store.clone(),
3521 lang_registry.clone(),
3522 fs.clone(),
3523 cx,
3524 )
3525 });
3526 let (worktree_a, _) = project_a
3527 .update(cx_a, |p, cx| {
3528 p.find_or_create_local_worktree("/root-1", true, cx)
3529 })
3530 .await
3531 .unwrap();
3532 worktree_a
3533 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3534 .await;
3535 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3536 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3537 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3538
3539 // Join the worktree as client B.
3540 let project_b = Project::remote(
3541 project_id,
3542 client_b.clone(),
3543 client_b.user_store.clone(),
3544 lang_registry.clone(),
3545 fs.clone(),
3546 &mut cx_b.to_async(),
3547 )
3548 .await
3549 .unwrap();
3550
3551 // Open the file on client B.
3552 let buffer_b = cx_b
3553 .background()
3554 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3555 .await
3556 .unwrap();
3557
3558 // Request references to a symbol as the guest.
3559 let fake_language_server = fake_language_servers.next().await.unwrap();
3560 fake_language_server.handle_request::<lsp::request::References, _, _>(
3561 |params, _| async move {
3562 assert_eq!(
3563 params.text_document_position.text_document.uri.as_str(),
3564 "file:///root-1/one.rs"
3565 );
3566 Ok(Some(vec![
3567 lsp::Location {
3568 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3569 range: lsp::Range::new(
3570 lsp::Position::new(0, 24),
3571 lsp::Position::new(0, 27),
3572 ),
3573 },
3574 lsp::Location {
3575 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3576 range: lsp::Range::new(
3577 lsp::Position::new(0, 35),
3578 lsp::Position::new(0, 38),
3579 ),
3580 },
3581 lsp::Location {
3582 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3583 range: lsp::Range::new(
3584 lsp::Position::new(0, 37),
3585 lsp::Position::new(0, 40),
3586 ),
3587 },
3588 ]))
3589 },
3590 );
3591
3592 let references = project_b
3593 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3594 .await
3595 .unwrap();
3596 cx_b.read(|cx| {
3597 assert_eq!(references.len(), 3);
3598 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3599
3600 let two_buffer = references[0].buffer.read(cx);
3601 let three_buffer = references[2].buffer.read(cx);
3602 assert_eq!(
3603 two_buffer.file().unwrap().path().as_ref(),
3604 Path::new("two.rs")
3605 );
3606 assert_eq!(references[1].buffer, references[0].buffer);
3607 assert_eq!(
3608 three_buffer.file().unwrap().full_path(cx),
3609 Path::new("three.rs")
3610 );
3611
3612 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3613 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3614 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3615 });
3616 }
3617
3618 #[gpui::test(iterations = 10)]
3619 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3620 cx_a.foreground().forbid_parking();
3621 let lang_registry = Arc::new(LanguageRegistry::test());
3622 let fs = FakeFs::new(cx_a.background());
3623 fs.insert_tree(
3624 "/root-1",
3625 json!({
3626 "a": "hello world",
3627 "b": "goodnight moon",
3628 "c": "a world of goo",
3629 "d": "world champion of clown world",
3630 }),
3631 )
3632 .await;
3633 fs.insert_tree(
3634 "/root-2",
3635 json!({
3636 "e": "disney world is fun",
3637 }),
3638 )
3639 .await;
3640
3641 // Connect to a server as 2 clients.
3642 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3643 let client_a = server.create_client(cx_a, "user_a").await;
3644 let client_b = server.create_client(cx_b, "user_b").await;
3645 server
3646 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3647 .await;
3648
3649 // Share a project as client A
3650 let project_a = cx_a.update(|cx| {
3651 Project::local(
3652 client_a.clone(),
3653 client_a.user_store.clone(),
3654 lang_registry.clone(),
3655 fs.clone(),
3656 cx,
3657 )
3658 });
3659 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3660
3661 let (worktree_1, _) = project_a
3662 .update(cx_a, |p, cx| {
3663 p.find_or_create_local_worktree("/root-1", true, cx)
3664 })
3665 .await
3666 .unwrap();
3667 worktree_1
3668 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3669 .await;
3670 let (worktree_2, _) = project_a
3671 .update(cx_a, |p, cx| {
3672 p.find_or_create_local_worktree("/root-2", true, cx)
3673 })
3674 .await
3675 .unwrap();
3676 worktree_2
3677 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3678 .await;
3679
3680 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3681
3682 // Join the worktree as client B.
3683 let project_b = Project::remote(
3684 project_id,
3685 client_b.clone(),
3686 client_b.user_store.clone(),
3687 lang_registry.clone(),
3688 fs.clone(),
3689 &mut cx_b.to_async(),
3690 )
3691 .await
3692 .unwrap();
3693
3694 let results = project_b
3695 .update(cx_b, |project, cx| {
3696 project.search(SearchQuery::text("world", false, false), cx)
3697 })
3698 .await
3699 .unwrap();
3700
3701 let mut ranges_by_path = results
3702 .into_iter()
3703 .map(|(buffer, ranges)| {
3704 buffer.read_with(cx_b, |buffer, cx| {
3705 let path = buffer.file().unwrap().full_path(cx);
3706 let offset_ranges = ranges
3707 .into_iter()
3708 .map(|range| range.to_offset(buffer))
3709 .collect::<Vec<_>>();
3710 (path, offset_ranges)
3711 })
3712 })
3713 .collect::<Vec<_>>();
3714 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3715
3716 assert_eq!(
3717 ranges_by_path,
3718 &[
3719 (PathBuf::from("root-1/a"), vec![6..11]),
3720 (PathBuf::from("root-1/c"), vec![2..7]),
3721 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3722 (PathBuf::from("root-2/e"), vec![7..12]),
3723 ]
3724 );
3725 }
3726
3727 #[gpui::test(iterations = 10)]
3728 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3729 cx_a.foreground().forbid_parking();
3730 let lang_registry = Arc::new(LanguageRegistry::test());
3731 let fs = FakeFs::new(cx_a.background());
3732 fs.insert_tree(
3733 "/root-1",
3734 json!({
3735 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3736 }),
3737 )
3738 .await;
3739
3740 // Set up a fake language server.
3741 let mut language = Language::new(
3742 LanguageConfig {
3743 name: "Rust".into(),
3744 path_suffixes: vec!["rs".to_string()],
3745 ..Default::default()
3746 },
3747 Some(tree_sitter_rust::language()),
3748 );
3749 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3750 lang_registry.add(Arc::new(language));
3751
3752 // Connect to a server as 2 clients.
3753 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3754 let client_a = server.create_client(cx_a, "user_a").await;
3755 let client_b = server.create_client(cx_b, "user_b").await;
3756 server
3757 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3758 .await;
3759
3760 // Share a project as client A
3761 let project_a = cx_a.update(|cx| {
3762 Project::local(
3763 client_a.clone(),
3764 client_a.user_store.clone(),
3765 lang_registry.clone(),
3766 fs.clone(),
3767 cx,
3768 )
3769 });
3770 let (worktree_a, _) = project_a
3771 .update(cx_a, |p, cx| {
3772 p.find_or_create_local_worktree("/root-1", true, cx)
3773 })
3774 .await
3775 .unwrap();
3776 worktree_a
3777 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3778 .await;
3779 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3780 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3781 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3782
3783 // Join the worktree as client B.
3784 let project_b = Project::remote(
3785 project_id,
3786 client_b.clone(),
3787 client_b.user_store.clone(),
3788 lang_registry.clone(),
3789 fs.clone(),
3790 &mut cx_b.to_async(),
3791 )
3792 .await
3793 .unwrap();
3794
3795 // Open the file on client B.
3796 let buffer_b = cx_b
3797 .background()
3798 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3799 .await
3800 .unwrap();
3801
3802 // Request document highlights as the guest.
3803 let fake_language_server = fake_language_servers.next().await.unwrap();
3804 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3805 |params, _| async move {
3806 assert_eq!(
3807 params
3808 .text_document_position_params
3809 .text_document
3810 .uri
3811 .as_str(),
3812 "file:///root-1/main.rs"
3813 );
3814 assert_eq!(
3815 params.text_document_position_params.position,
3816 lsp::Position::new(0, 34)
3817 );
3818 Ok(Some(vec![
3819 lsp::DocumentHighlight {
3820 kind: Some(lsp::DocumentHighlightKind::WRITE),
3821 range: lsp::Range::new(
3822 lsp::Position::new(0, 10),
3823 lsp::Position::new(0, 16),
3824 ),
3825 },
3826 lsp::DocumentHighlight {
3827 kind: Some(lsp::DocumentHighlightKind::READ),
3828 range: lsp::Range::new(
3829 lsp::Position::new(0, 32),
3830 lsp::Position::new(0, 38),
3831 ),
3832 },
3833 lsp::DocumentHighlight {
3834 kind: Some(lsp::DocumentHighlightKind::READ),
3835 range: lsp::Range::new(
3836 lsp::Position::new(0, 41),
3837 lsp::Position::new(0, 47),
3838 ),
3839 },
3840 ]))
3841 },
3842 );
3843
3844 let highlights = project_b
3845 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3846 .await
3847 .unwrap();
3848 buffer_b.read_with(cx_b, |buffer, _| {
3849 let snapshot = buffer.snapshot();
3850
3851 let highlights = highlights
3852 .into_iter()
3853 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3854 .collect::<Vec<_>>();
3855 assert_eq!(
3856 highlights,
3857 &[
3858 (lsp::DocumentHighlightKind::WRITE, 10..16),
3859 (lsp::DocumentHighlightKind::READ, 32..38),
3860 (lsp::DocumentHighlightKind::READ, 41..47)
3861 ]
3862 )
3863 });
3864 }
3865
3866 #[gpui::test(iterations = 10)]
3867 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3868 cx_a.foreground().forbid_parking();
3869 let lang_registry = Arc::new(LanguageRegistry::test());
3870 let fs = FakeFs::new(cx_a.background());
3871 fs.insert_tree(
3872 "/code",
3873 json!({
3874 "crate-1": {
3875 "one.rs": "const ONE: usize = 1;",
3876 },
3877 "crate-2": {
3878 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3879 },
3880 "private": {
3881 "passwords.txt": "the-password",
3882 }
3883 }),
3884 )
3885 .await;
3886
3887 // Set up a fake language server.
3888 let mut language = Language::new(
3889 LanguageConfig {
3890 name: "Rust".into(),
3891 path_suffixes: vec!["rs".to_string()],
3892 ..Default::default()
3893 },
3894 Some(tree_sitter_rust::language()),
3895 );
3896 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3897 lang_registry.add(Arc::new(language));
3898
3899 // Connect to a server as 2 clients.
3900 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3901 let client_a = server.create_client(cx_a, "user_a").await;
3902 let client_b = server.create_client(cx_b, "user_b").await;
3903 server
3904 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3905 .await;
3906
3907 // Share a project as client A
3908 let project_a = cx_a.update(|cx| {
3909 Project::local(
3910 client_a.clone(),
3911 client_a.user_store.clone(),
3912 lang_registry.clone(),
3913 fs.clone(),
3914 cx,
3915 )
3916 });
3917 let (worktree_a, _) = project_a
3918 .update(cx_a, |p, cx| {
3919 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3920 })
3921 .await
3922 .unwrap();
3923 worktree_a
3924 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3925 .await;
3926 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3927 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3928 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3929
3930 // Join the worktree as client B.
3931 let project_b = Project::remote(
3932 project_id,
3933 client_b.clone(),
3934 client_b.user_store.clone(),
3935 lang_registry.clone(),
3936 fs.clone(),
3937 &mut cx_b.to_async(),
3938 )
3939 .await
3940 .unwrap();
3941
3942 // Cause the language server to start.
3943 let _buffer = cx_b
3944 .background()
3945 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3946 .await
3947 .unwrap();
3948
3949 let fake_language_server = fake_language_servers.next().await.unwrap();
3950 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3951 |_, _| async move {
3952 #[allow(deprecated)]
3953 Ok(Some(vec![lsp::SymbolInformation {
3954 name: "TWO".into(),
3955 location: lsp::Location {
3956 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3957 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3958 },
3959 kind: lsp::SymbolKind::CONSTANT,
3960 tags: None,
3961 container_name: None,
3962 deprecated: None,
3963 }]))
3964 },
3965 );
3966
3967 // Request the definition of a symbol as the guest.
3968 let symbols = project_b
3969 .update(cx_b, |p, cx| p.symbols("two", cx))
3970 .await
3971 .unwrap();
3972 assert_eq!(symbols.len(), 1);
3973 assert_eq!(symbols[0].name, "TWO");
3974
3975 // Open one of the returned symbols.
3976 let buffer_b_2 = project_b
3977 .update(cx_b, |project, cx| {
3978 project.open_buffer_for_symbol(&symbols[0], cx)
3979 })
3980 .await
3981 .unwrap();
3982 buffer_b_2.read_with(cx_b, |buffer, _| {
3983 assert_eq!(
3984 buffer.file().unwrap().path().as_ref(),
3985 Path::new("../crate-2/two.rs")
3986 );
3987 });
3988
3989 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3990 let mut fake_symbol = symbols[0].clone();
3991 fake_symbol.path = Path::new("/code/secrets").into();
3992 let error = project_b
3993 .update(cx_b, |project, cx| {
3994 project.open_buffer_for_symbol(&fake_symbol, cx)
3995 })
3996 .await
3997 .unwrap_err();
3998 assert!(error.to_string().contains("invalid symbol signature"));
3999 }
4000
4001 #[gpui::test(iterations = 10)]
4002 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4003 cx_a: &mut TestAppContext,
4004 cx_b: &mut TestAppContext,
4005 mut rng: StdRng,
4006 ) {
4007 cx_a.foreground().forbid_parking();
4008 let lang_registry = Arc::new(LanguageRegistry::test());
4009 let fs = FakeFs::new(cx_a.background());
4010 fs.insert_tree(
4011 "/root",
4012 json!({
4013 "a.rs": "const ONE: usize = b::TWO;",
4014 "b.rs": "const TWO: usize = 2",
4015 }),
4016 )
4017 .await;
4018
4019 // Set up a fake language server.
4020 let mut language = Language::new(
4021 LanguageConfig {
4022 name: "Rust".into(),
4023 path_suffixes: vec!["rs".to_string()],
4024 ..Default::default()
4025 },
4026 Some(tree_sitter_rust::language()),
4027 );
4028 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4029 lang_registry.add(Arc::new(language));
4030
4031 // Connect to a server as 2 clients.
4032 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4033 let client_a = server.create_client(cx_a, "user_a").await;
4034 let client_b = server.create_client(cx_b, "user_b").await;
4035 server
4036 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4037 .await;
4038
4039 // Share a project as client A
4040 let project_a = cx_a.update(|cx| {
4041 Project::local(
4042 client_a.clone(),
4043 client_a.user_store.clone(),
4044 lang_registry.clone(),
4045 fs.clone(),
4046 cx,
4047 )
4048 });
4049
4050 let (worktree_a, _) = project_a
4051 .update(cx_a, |p, cx| {
4052 p.find_or_create_local_worktree("/root", true, cx)
4053 })
4054 .await
4055 .unwrap();
4056 worktree_a
4057 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4058 .await;
4059 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4060 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4061 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4062
4063 // Join the worktree as client B.
4064 let project_b = Project::remote(
4065 project_id,
4066 client_b.clone(),
4067 client_b.user_store.clone(),
4068 lang_registry.clone(),
4069 fs.clone(),
4070 &mut cx_b.to_async(),
4071 )
4072 .await
4073 .unwrap();
4074
4075 let buffer_b1 = cx_b
4076 .background()
4077 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4078 .await
4079 .unwrap();
4080
4081 let fake_language_server = fake_language_servers.next().await.unwrap();
4082 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4083 |_, _| async move {
4084 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4085 lsp::Location::new(
4086 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4087 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4088 ),
4089 )))
4090 },
4091 );
4092
4093 let definitions;
4094 let buffer_b2;
4095 if rng.gen() {
4096 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4097 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4098 } else {
4099 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4100 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4101 }
4102
4103 let buffer_b2 = buffer_b2.await.unwrap();
4104 let definitions = definitions.await.unwrap();
4105 assert_eq!(definitions.len(), 1);
4106 assert_eq!(definitions[0].buffer, buffer_b2);
4107 }
4108
4109 #[gpui::test(iterations = 10)]
4110 async fn test_collaborating_with_code_actions(
4111 cx_a: &mut TestAppContext,
4112 cx_b: &mut TestAppContext,
4113 ) {
4114 cx_a.foreground().forbid_parking();
4115 let lang_registry = Arc::new(LanguageRegistry::test());
4116 let fs = FakeFs::new(cx_a.background());
4117 cx_b.update(|cx| editor::init(cx));
4118
4119 // Set up a fake language server.
4120 let mut language = Language::new(
4121 LanguageConfig {
4122 name: "Rust".into(),
4123 path_suffixes: vec!["rs".to_string()],
4124 ..Default::default()
4125 },
4126 Some(tree_sitter_rust::language()),
4127 );
4128 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4129 lang_registry.add(Arc::new(language));
4130
4131 // Connect to a server as 2 clients.
4132 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4133 let client_a = server.create_client(cx_a, "user_a").await;
4134 let client_b = server.create_client(cx_b, "user_b").await;
4135 server
4136 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4137 .await;
4138
4139 // Share a project as client A
4140 fs.insert_tree(
4141 "/a",
4142 json!({
4143 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4144 "other.rs": "pub fn foo() -> usize { 4 }",
4145 }),
4146 )
4147 .await;
4148 let project_a = cx_a.update(|cx| {
4149 Project::local(
4150 client_a.clone(),
4151 client_a.user_store.clone(),
4152 lang_registry.clone(),
4153 fs.clone(),
4154 cx,
4155 )
4156 });
4157 let (worktree_a, _) = project_a
4158 .update(cx_a, |p, cx| {
4159 p.find_or_create_local_worktree("/a", true, cx)
4160 })
4161 .await
4162 .unwrap();
4163 worktree_a
4164 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4165 .await;
4166 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4167 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4168 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4169
4170 // Join the worktree as client B.
4171 let project_b = Project::remote(
4172 project_id,
4173 client_b.clone(),
4174 client_b.user_store.clone(),
4175 lang_registry.clone(),
4176 fs.clone(),
4177 &mut cx_b.to_async(),
4178 )
4179 .await
4180 .unwrap();
4181 let mut params = cx_b.update(WorkspaceParams::test);
4182 params.languages = lang_registry.clone();
4183 params.client = client_b.client.clone();
4184 params.user_store = client_b.user_store.clone();
4185 params.project = project_b;
4186
4187 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4188 let editor_b = workspace_b
4189 .update(cx_b, |workspace, cx| {
4190 workspace.open_path((worktree_id, "main.rs"), true, cx)
4191 })
4192 .await
4193 .unwrap()
4194 .downcast::<Editor>()
4195 .unwrap();
4196
4197 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4198 fake_language_server
4199 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4200 assert_eq!(
4201 params.text_document.uri,
4202 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4203 );
4204 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4205 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4206 Ok(None)
4207 })
4208 .next()
4209 .await;
4210
4211 // Move cursor to a location that contains code actions.
4212 editor_b.update(cx_b, |editor, cx| {
4213 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4214 cx.focus(&editor_b);
4215 });
4216
4217 fake_language_server
4218 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4219 assert_eq!(
4220 params.text_document.uri,
4221 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4222 );
4223 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4224 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4225
4226 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4227 lsp::CodeAction {
4228 title: "Inline into all callers".to_string(),
4229 edit: Some(lsp::WorkspaceEdit {
4230 changes: Some(
4231 [
4232 (
4233 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4234 vec![lsp::TextEdit::new(
4235 lsp::Range::new(
4236 lsp::Position::new(1, 22),
4237 lsp::Position::new(1, 34),
4238 ),
4239 "4".to_string(),
4240 )],
4241 ),
4242 (
4243 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4244 vec![lsp::TextEdit::new(
4245 lsp::Range::new(
4246 lsp::Position::new(0, 0),
4247 lsp::Position::new(0, 27),
4248 ),
4249 "".to_string(),
4250 )],
4251 ),
4252 ]
4253 .into_iter()
4254 .collect(),
4255 ),
4256 ..Default::default()
4257 }),
4258 data: Some(json!({
4259 "codeActionParams": {
4260 "range": {
4261 "start": {"line": 1, "column": 31},
4262 "end": {"line": 1, "column": 31},
4263 }
4264 }
4265 })),
4266 ..Default::default()
4267 },
4268 )]))
4269 })
4270 .next()
4271 .await;
4272
4273 // Toggle code actions and wait for them to display.
4274 editor_b.update(cx_b, |editor, cx| {
4275 editor.toggle_code_actions(
4276 &ToggleCodeActions {
4277 deployed_from_indicator: false,
4278 },
4279 cx,
4280 );
4281 });
4282 editor_b
4283 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4284 .await;
4285
4286 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4287
4288 // Confirming the code action will trigger a resolve request.
4289 let confirm_action = workspace_b
4290 .update(cx_b, |workspace, cx| {
4291 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4292 })
4293 .unwrap();
4294 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4295 |_, _| async move {
4296 Ok(lsp::CodeAction {
4297 title: "Inline into all callers".to_string(),
4298 edit: Some(lsp::WorkspaceEdit {
4299 changes: Some(
4300 [
4301 (
4302 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4303 vec![lsp::TextEdit::new(
4304 lsp::Range::new(
4305 lsp::Position::new(1, 22),
4306 lsp::Position::new(1, 34),
4307 ),
4308 "4".to_string(),
4309 )],
4310 ),
4311 (
4312 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4313 vec![lsp::TextEdit::new(
4314 lsp::Range::new(
4315 lsp::Position::new(0, 0),
4316 lsp::Position::new(0, 27),
4317 ),
4318 "".to_string(),
4319 )],
4320 ),
4321 ]
4322 .into_iter()
4323 .collect(),
4324 ),
4325 ..Default::default()
4326 }),
4327 ..Default::default()
4328 })
4329 },
4330 );
4331
4332 // After the action is confirmed, an editor containing both modified files is opened.
4333 confirm_action.await.unwrap();
4334 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4335 workspace
4336 .active_item(cx)
4337 .unwrap()
4338 .downcast::<Editor>()
4339 .unwrap()
4340 });
4341 code_action_editor.update(cx_b, |editor, cx| {
4342 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4343 editor.undo(&Undo, cx);
4344 assert_eq!(
4345 editor.text(cx),
4346 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4347 );
4348 editor.redo(&Redo, cx);
4349 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4350 });
4351 }
4352
4353 #[gpui::test(iterations = 10)]
4354 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4355 cx_a.foreground().forbid_parking();
4356 let lang_registry = Arc::new(LanguageRegistry::test());
4357 let fs = FakeFs::new(cx_a.background());
4358 cx_b.update(|cx| editor::init(cx));
4359
4360 // Set up a fake language server.
4361 let mut language = Language::new(
4362 LanguageConfig {
4363 name: "Rust".into(),
4364 path_suffixes: vec!["rs".to_string()],
4365 ..Default::default()
4366 },
4367 Some(tree_sitter_rust::language()),
4368 );
4369 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4370 capabilities: lsp::ServerCapabilities {
4371 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4372 prepare_provider: Some(true),
4373 work_done_progress_options: Default::default(),
4374 })),
4375 ..Default::default()
4376 },
4377 ..Default::default()
4378 });
4379 lang_registry.add(Arc::new(language));
4380
4381 // Connect to a server as 2 clients.
4382 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4383 let client_a = server.create_client(cx_a, "user_a").await;
4384 let client_b = server.create_client(cx_b, "user_b").await;
4385 server
4386 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4387 .await;
4388
4389 // Share a project as client A
4390 fs.insert_tree(
4391 "/dir",
4392 json!({
4393 "one.rs": "const ONE: usize = 1;",
4394 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4395 }),
4396 )
4397 .await;
4398 let project_a = cx_a.update(|cx| {
4399 Project::local(
4400 client_a.clone(),
4401 client_a.user_store.clone(),
4402 lang_registry.clone(),
4403 fs.clone(),
4404 cx,
4405 )
4406 });
4407 let (worktree_a, _) = project_a
4408 .update(cx_a, |p, cx| {
4409 p.find_or_create_local_worktree("/dir", true, cx)
4410 })
4411 .await
4412 .unwrap();
4413 worktree_a
4414 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4415 .await;
4416 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4417 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4418 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4419
4420 // Join the worktree as client B.
4421 let project_b = Project::remote(
4422 project_id,
4423 client_b.clone(),
4424 client_b.user_store.clone(),
4425 lang_registry.clone(),
4426 fs.clone(),
4427 &mut cx_b.to_async(),
4428 )
4429 .await
4430 .unwrap();
4431 let mut params = cx_b.update(WorkspaceParams::test);
4432 params.languages = lang_registry.clone();
4433 params.client = client_b.client.clone();
4434 params.user_store = client_b.user_store.clone();
4435 params.project = project_b;
4436
4437 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4438 let editor_b = workspace_b
4439 .update(cx_b, |workspace, cx| {
4440 workspace.open_path((worktree_id, "one.rs"), true, cx)
4441 })
4442 .await
4443 .unwrap()
4444 .downcast::<Editor>()
4445 .unwrap();
4446 let fake_language_server = fake_language_servers.next().await.unwrap();
4447
4448 // Move cursor to a location that can be renamed.
4449 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4450 editor.select_ranges([7..7], None, cx);
4451 editor.rename(&Rename, cx).unwrap()
4452 });
4453
4454 fake_language_server
4455 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4456 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4457 assert_eq!(params.position, lsp::Position::new(0, 7));
4458 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4459 lsp::Position::new(0, 6),
4460 lsp::Position::new(0, 9),
4461 ))))
4462 })
4463 .next()
4464 .await
4465 .unwrap();
4466 prepare_rename.await.unwrap();
4467 editor_b.update(cx_b, |editor, cx| {
4468 let rename = editor.pending_rename().unwrap();
4469 let buffer = editor.buffer().read(cx).snapshot(cx);
4470 assert_eq!(
4471 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4472 6..9
4473 );
4474 rename.editor.update(cx, |rename_editor, cx| {
4475 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4476 rename_buffer.edit([(0..3, "THREE")], cx);
4477 });
4478 });
4479 });
4480
4481 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4482 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4483 });
4484 fake_language_server
4485 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4486 assert_eq!(
4487 params.text_document_position.text_document.uri.as_str(),
4488 "file:///dir/one.rs"
4489 );
4490 assert_eq!(
4491 params.text_document_position.position,
4492 lsp::Position::new(0, 6)
4493 );
4494 assert_eq!(params.new_name, "THREE");
4495 Ok(Some(lsp::WorkspaceEdit {
4496 changes: Some(
4497 [
4498 (
4499 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4500 vec![lsp::TextEdit::new(
4501 lsp::Range::new(
4502 lsp::Position::new(0, 6),
4503 lsp::Position::new(0, 9),
4504 ),
4505 "THREE".to_string(),
4506 )],
4507 ),
4508 (
4509 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4510 vec![
4511 lsp::TextEdit::new(
4512 lsp::Range::new(
4513 lsp::Position::new(0, 24),
4514 lsp::Position::new(0, 27),
4515 ),
4516 "THREE".to_string(),
4517 ),
4518 lsp::TextEdit::new(
4519 lsp::Range::new(
4520 lsp::Position::new(0, 35),
4521 lsp::Position::new(0, 38),
4522 ),
4523 "THREE".to_string(),
4524 ),
4525 ],
4526 ),
4527 ]
4528 .into_iter()
4529 .collect(),
4530 ),
4531 ..Default::default()
4532 }))
4533 })
4534 .next()
4535 .await
4536 .unwrap();
4537 confirm_rename.await.unwrap();
4538
4539 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4540 workspace
4541 .active_item(cx)
4542 .unwrap()
4543 .downcast::<Editor>()
4544 .unwrap()
4545 });
4546 rename_editor.update(cx_b, |editor, cx| {
4547 assert_eq!(
4548 editor.text(cx),
4549 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4550 );
4551 editor.undo(&Undo, cx);
4552 assert_eq!(
4553 editor.text(cx),
4554 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4555 );
4556 editor.redo(&Redo, cx);
4557 assert_eq!(
4558 editor.text(cx),
4559 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4560 );
4561 });
4562
4563 // Ensure temporary rename edits cannot be undone/redone.
4564 editor_b.update(cx_b, |editor, cx| {
4565 editor.undo(&Undo, cx);
4566 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4567 editor.undo(&Undo, cx);
4568 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4569 editor.redo(&Redo, cx);
4570 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4571 })
4572 }
4573
4574 #[gpui::test(iterations = 10)]
4575 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4576 cx_a.foreground().forbid_parking();
4577
4578 // Connect to a server as 2 clients.
4579 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4580 let client_a = server.create_client(cx_a, "user_a").await;
4581 let client_b = server.create_client(cx_b, "user_b").await;
4582
4583 // Create an org that includes these 2 users.
4584 let db = &server.app_state.db;
4585 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4586 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4587 .await
4588 .unwrap();
4589 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4590 .await
4591 .unwrap();
4592
4593 // Create a channel that includes all the users.
4594 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4595 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4596 .await
4597 .unwrap();
4598 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4599 .await
4600 .unwrap();
4601 db.create_channel_message(
4602 channel_id,
4603 client_b.current_user_id(&cx_b),
4604 "hello A, it's B.",
4605 OffsetDateTime::now_utc(),
4606 1,
4607 )
4608 .await
4609 .unwrap();
4610
4611 let channels_a = cx_a
4612 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4613 channels_a
4614 .condition(cx_a, |list, _| list.available_channels().is_some())
4615 .await;
4616 channels_a.read_with(cx_a, |list, _| {
4617 assert_eq!(
4618 list.available_channels().unwrap(),
4619 &[ChannelDetails {
4620 id: channel_id.to_proto(),
4621 name: "test-channel".to_string()
4622 }]
4623 )
4624 });
4625 let channel_a = channels_a.update(cx_a, |this, cx| {
4626 this.get_channel(channel_id.to_proto(), cx).unwrap()
4627 });
4628 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4629 channel_a
4630 .condition(&cx_a, |channel, _| {
4631 channel_messages(channel)
4632 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4633 })
4634 .await;
4635
4636 let channels_b = cx_b
4637 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4638 channels_b
4639 .condition(cx_b, |list, _| list.available_channels().is_some())
4640 .await;
4641 channels_b.read_with(cx_b, |list, _| {
4642 assert_eq!(
4643 list.available_channels().unwrap(),
4644 &[ChannelDetails {
4645 id: channel_id.to_proto(),
4646 name: "test-channel".to_string()
4647 }]
4648 )
4649 });
4650
4651 let channel_b = channels_b.update(cx_b, |this, cx| {
4652 this.get_channel(channel_id.to_proto(), cx).unwrap()
4653 });
4654 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4655 channel_b
4656 .condition(&cx_b, |channel, _| {
4657 channel_messages(channel)
4658 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4659 })
4660 .await;
4661
4662 channel_a
4663 .update(cx_a, |channel, cx| {
4664 channel
4665 .send_message("oh, hi B.".to_string(), cx)
4666 .unwrap()
4667 .detach();
4668 let task = channel.send_message("sup".to_string(), cx).unwrap();
4669 assert_eq!(
4670 channel_messages(channel),
4671 &[
4672 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4673 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4674 ("user_a".to_string(), "sup".to_string(), true)
4675 ]
4676 );
4677 task
4678 })
4679 .await
4680 .unwrap();
4681
4682 channel_b
4683 .condition(&cx_b, |channel, _| {
4684 channel_messages(channel)
4685 == [
4686 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4687 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4688 ("user_a".to_string(), "sup".to_string(), false),
4689 ]
4690 })
4691 .await;
4692
4693 assert_eq!(
4694 server
4695 .state()
4696 .await
4697 .channel(channel_id)
4698 .unwrap()
4699 .connection_ids
4700 .len(),
4701 2
4702 );
4703 cx_b.update(|_| drop(channel_b));
4704 server
4705 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4706 .await;
4707
4708 cx_a.update(|_| drop(channel_a));
4709 server
4710 .condition(|state| state.channel(channel_id).is_none())
4711 .await;
4712 }
4713
4714 #[gpui::test(iterations = 10)]
4715 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4716 cx_a.foreground().forbid_parking();
4717
4718 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4719 let client_a = server.create_client(cx_a, "user_a").await;
4720
4721 let db = &server.app_state.db;
4722 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4723 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4724 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4725 .await
4726 .unwrap();
4727 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4728 .await
4729 .unwrap();
4730
4731 let channels_a = cx_a
4732 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4733 channels_a
4734 .condition(cx_a, |list, _| list.available_channels().is_some())
4735 .await;
4736 let channel_a = channels_a.update(cx_a, |this, cx| {
4737 this.get_channel(channel_id.to_proto(), cx).unwrap()
4738 });
4739
4740 // Messages aren't allowed to be too long.
4741 channel_a
4742 .update(cx_a, |channel, cx| {
4743 let long_body = "this is long.\n".repeat(1024);
4744 channel.send_message(long_body, cx).unwrap()
4745 })
4746 .await
4747 .unwrap_err();
4748
4749 // Messages aren't allowed to be blank.
4750 channel_a.update(cx_a, |channel, cx| {
4751 channel.send_message(String::new(), cx).unwrap_err()
4752 });
4753
4754 // Leading and trailing whitespace are trimmed.
4755 channel_a
4756 .update(cx_a, |channel, cx| {
4757 channel
4758 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4759 .unwrap()
4760 })
4761 .await
4762 .unwrap();
4763 assert_eq!(
4764 db.get_channel_messages(channel_id, 10, None)
4765 .await
4766 .unwrap()
4767 .iter()
4768 .map(|m| &m.body)
4769 .collect::<Vec<_>>(),
4770 &["surrounded by whitespace"]
4771 );
4772 }
4773
4774 #[gpui::test(iterations = 10)]
4775 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4776 cx_a.foreground().forbid_parking();
4777
4778 // Connect to a server as 2 clients.
4779 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4780 let client_a = server.create_client(cx_a, "user_a").await;
4781 let client_b = server.create_client(cx_b, "user_b").await;
4782 let mut status_b = client_b.status();
4783
4784 // Create an org that includes these 2 users.
4785 let db = &server.app_state.db;
4786 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4787 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4788 .await
4789 .unwrap();
4790 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4791 .await
4792 .unwrap();
4793
4794 // Create a channel that includes all the users.
4795 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4796 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4797 .await
4798 .unwrap();
4799 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4800 .await
4801 .unwrap();
4802 db.create_channel_message(
4803 channel_id,
4804 client_b.current_user_id(&cx_b),
4805 "hello A, it's B.",
4806 OffsetDateTime::now_utc(),
4807 2,
4808 )
4809 .await
4810 .unwrap();
4811
4812 let channels_a = cx_a
4813 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4814 channels_a
4815 .condition(cx_a, |list, _| list.available_channels().is_some())
4816 .await;
4817
4818 channels_a.read_with(cx_a, |list, _| {
4819 assert_eq!(
4820 list.available_channels().unwrap(),
4821 &[ChannelDetails {
4822 id: channel_id.to_proto(),
4823 name: "test-channel".to_string()
4824 }]
4825 )
4826 });
4827 let channel_a = channels_a.update(cx_a, |this, cx| {
4828 this.get_channel(channel_id.to_proto(), cx).unwrap()
4829 });
4830 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4831 channel_a
4832 .condition(&cx_a, |channel, _| {
4833 channel_messages(channel)
4834 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4835 })
4836 .await;
4837
4838 let channels_b = cx_b
4839 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4840 channels_b
4841 .condition(cx_b, |list, _| list.available_channels().is_some())
4842 .await;
4843 channels_b.read_with(cx_b, |list, _| {
4844 assert_eq!(
4845 list.available_channels().unwrap(),
4846 &[ChannelDetails {
4847 id: channel_id.to_proto(),
4848 name: "test-channel".to_string()
4849 }]
4850 )
4851 });
4852
4853 let channel_b = channels_b.update(cx_b, |this, cx| {
4854 this.get_channel(channel_id.to_proto(), cx).unwrap()
4855 });
4856 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4857 channel_b
4858 .condition(&cx_b, |channel, _| {
4859 channel_messages(channel)
4860 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4861 })
4862 .await;
4863
4864 // Disconnect client B, ensuring we can still access its cached channel data.
4865 server.forbid_connections();
4866 server.disconnect_client(client_b.current_user_id(&cx_b));
4867 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4868 while !matches!(
4869 status_b.next().await,
4870 Some(client::Status::ReconnectionError { .. })
4871 ) {}
4872
4873 channels_b.read_with(cx_b, |channels, _| {
4874 assert_eq!(
4875 channels.available_channels().unwrap(),
4876 [ChannelDetails {
4877 id: channel_id.to_proto(),
4878 name: "test-channel".to_string()
4879 }]
4880 )
4881 });
4882 channel_b.read_with(cx_b, |channel, _| {
4883 assert_eq!(
4884 channel_messages(channel),
4885 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4886 )
4887 });
4888
4889 // Send a message from client B while it is disconnected.
4890 channel_b
4891 .update(cx_b, |channel, cx| {
4892 let task = channel
4893 .send_message("can you see this?".to_string(), cx)
4894 .unwrap();
4895 assert_eq!(
4896 channel_messages(channel),
4897 &[
4898 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4899 ("user_b".to_string(), "can you see this?".to_string(), true)
4900 ]
4901 );
4902 task
4903 })
4904 .await
4905 .unwrap_err();
4906
4907 // Send a message from client A while B is disconnected.
4908 channel_a
4909 .update(cx_a, |channel, cx| {
4910 channel
4911 .send_message("oh, hi B.".to_string(), cx)
4912 .unwrap()
4913 .detach();
4914 let task = channel.send_message("sup".to_string(), cx).unwrap();
4915 assert_eq!(
4916 channel_messages(channel),
4917 &[
4918 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4919 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4920 ("user_a".to_string(), "sup".to_string(), true)
4921 ]
4922 );
4923 task
4924 })
4925 .await
4926 .unwrap();
4927
4928 // Give client B a chance to reconnect.
4929 server.allow_connections();
4930 cx_b.foreground().advance_clock(Duration::from_secs(10));
4931
4932 // Verify that B sees the new messages upon reconnection, as well as the message client B
4933 // sent while offline.
4934 channel_b
4935 .condition(&cx_b, |channel, _| {
4936 channel_messages(channel)
4937 == [
4938 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4939 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4940 ("user_a".to_string(), "sup".to_string(), false),
4941 ("user_b".to_string(), "can you see this?".to_string(), false),
4942 ]
4943 })
4944 .await;
4945
4946 // Ensure client A and B can communicate normally after reconnection.
4947 channel_a
4948 .update(cx_a, |channel, cx| {
4949 channel.send_message("you online?".to_string(), cx).unwrap()
4950 })
4951 .await
4952 .unwrap();
4953 channel_b
4954 .condition(&cx_b, |channel, _| {
4955 channel_messages(channel)
4956 == [
4957 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4958 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4959 ("user_a".to_string(), "sup".to_string(), false),
4960 ("user_b".to_string(), "can you see this?".to_string(), false),
4961 ("user_a".to_string(), "you online?".to_string(), false),
4962 ]
4963 })
4964 .await;
4965
4966 channel_b
4967 .update(cx_b, |channel, cx| {
4968 channel.send_message("yep".to_string(), cx).unwrap()
4969 })
4970 .await
4971 .unwrap();
4972 channel_a
4973 .condition(&cx_a, |channel, _| {
4974 channel_messages(channel)
4975 == [
4976 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4977 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4978 ("user_a".to_string(), "sup".to_string(), false),
4979 ("user_b".to_string(), "can you see this?".to_string(), false),
4980 ("user_a".to_string(), "you online?".to_string(), false),
4981 ("user_b".to_string(), "yep".to_string(), false),
4982 ]
4983 })
4984 .await;
4985 }
4986
4987 #[gpui::test(iterations = 10)]
4988 async fn test_contacts(
4989 deterministic: Arc<Deterministic>,
4990 cx_a: &mut TestAppContext,
4991 cx_b: &mut TestAppContext,
4992 cx_c: &mut TestAppContext,
4993 ) {
4994 cx_a.foreground().forbid_parking();
4995 let lang_registry = Arc::new(LanguageRegistry::test());
4996 let fs = FakeFs::new(cx_a.background());
4997
4998 // Connect to a server as 3 clients.
4999 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5000 let client_a = server.create_client(cx_a, "user_a").await;
5001 let client_b = server.create_client(cx_b, "user_b").await;
5002 let client_c = server.create_client(cx_c, "user_c").await;
5003 server
5004 .make_contacts(vec![
5005 (&client_a, cx_a),
5006 (&client_b, cx_b),
5007 (&client_c, cx_c),
5008 ])
5009 .await;
5010 deterministic.run_until_parked();
5011 client_a.user_store.read_with(cx_a, |store, _| {
5012 assert_eq!(
5013 contacts(store),
5014 [("user_b", true, vec![]), ("user_c", true, vec![])]
5015 )
5016 });
5017 client_b.user_store.read_with(cx_b, |store, _| {
5018 assert_eq!(
5019 contacts(store),
5020 [("user_a", true, vec![]), ("user_c", true, vec![])]
5021 )
5022 });
5023 client_c.user_store.read_with(cx_c, |store, _| {
5024 assert_eq!(
5025 contacts(store),
5026 [("user_a", true, vec![]), ("user_b", true, vec![])]
5027 )
5028 });
5029
5030 // Share a worktree as client A.
5031 fs.create_dir(Path::new("/a")).await.unwrap();
5032
5033 let project_a = cx_a.update(|cx| {
5034 Project::local(
5035 client_a.clone(),
5036 client_a.user_store.clone(),
5037 lang_registry.clone(),
5038 fs.clone(),
5039 cx,
5040 )
5041 });
5042 let (worktree_a, _) = project_a
5043 .update(cx_a, |p, cx| {
5044 p.find_or_create_local_worktree("/a", true, cx)
5045 })
5046 .await
5047 .unwrap();
5048 worktree_a
5049 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5050 .await;
5051
5052 deterministic.run_until_parked();
5053 client_a.user_store.read_with(cx_a, |store, _| {
5054 assert_eq!(
5055 contacts(store),
5056 [("user_b", true, vec![]), ("user_c", true, vec![])]
5057 )
5058 });
5059 client_b.user_store.read_with(cx_b, |store, _| {
5060 assert_eq!(
5061 contacts(store),
5062 [
5063 ("user_a", true, vec![("a", false, vec![])]),
5064 ("user_c", true, vec![])
5065 ]
5066 )
5067 });
5068 client_c.user_store.read_with(cx_c, |store, _| {
5069 assert_eq!(
5070 contacts(store),
5071 [
5072 ("user_a", true, vec![("a", false, vec![])]),
5073 ("user_b", true, vec![])
5074 ]
5075 )
5076 });
5077
5078 let project_id = project_a
5079 .update(cx_a, |project, _| project.next_remote_id())
5080 .await;
5081 project_a
5082 .update(cx_a, |project, cx| project.share(cx))
5083 .await
5084 .unwrap();
5085 deterministic.run_until_parked();
5086 client_a.user_store.read_with(cx_a, |store, _| {
5087 assert_eq!(
5088 contacts(store),
5089 [("user_b", true, vec![]), ("user_c", true, vec![])]
5090 )
5091 });
5092 client_b.user_store.read_with(cx_b, |store, _| {
5093 assert_eq!(
5094 contacts(store),
5095 [
5096 ("user_a", true, vec![("a", true, vec![])]),
5097 ("user_c", true, vec![])
5098 ]
5099 )
5100 });
5101 client_c.user_store.read_with(cx_c, |store, _| {
5102 assert_eq!(
5103 contacts(store),
5104 [
5105 ("user_a", true, vec![("a", true, vec![])]),
5106 ("user_b", true, vec![])
5107 ]
5108 )
5109 });
5110
5111 let _project_b = Project::remote(
5112 project_id,
5113 client_b.clone(),
5114 client_b.user_store.clone(),
5115 lang_registry.clone(),
5116 fs.clone(),
5117 &mut cx_b.to_async(),
5118 )
5119 .await
5120 .unwrap();
5121 deterministic.run_until_parked();
5122 client_a.user_store.read_with(cx_a, |store, _| {
5123 assert_eq!(
5124 contacts(store),
5125 [("user_b", true, vec![]), ("user_c", true, vec![])]
5126 )
5127 });
5128 client_b.user_store.read_with(cx_b, |store, _| {
5129 assert_eq!(
5130 contacts(store),
5131 [
5132 ("user_a", true, vec![("a", true, vec!["user_b"])]),
5133 ("user_c", true, vec![])
5134 ]
5135 )
5136 });
5137 client_c.user_store.read_with(cx_c, |store, _| {
5138 assert_eq!(
5139 contacts(store),
5140 [
5141 ("user_a", true, vec![("a", true, vec!["user_b"])]),
5142 ("user_b", true, vec![])
5143 ]
5144 )
5145 });
5146
5147 project_a
5148 .condition(&cx_a, |project, _| {
5149 project.collaborators().contains_key(&client_b.peer_id)
5150 })
5151 .await;
5152
5153 cx_a.update(move |_| drop(project_a));
5154 deterministic.run_until_parked();
5155 client_a.user_store.read_with(cx_a, |store, _| {
5156 assert_eq!(
5157 contacts(store),
5158 [("user_b", true, vec![]), ("user_c", true, vec![])]
5159 )
5160 });
5161 client_b.user_store.read_with(cx_b, |store, _| {
5162 assert_eq!(
5163 contacts(store),
5164 [("user_a", true, vec![]), ("user_c", true, vec![])]
5165 )
5166 });
5167 client_c.user_store.read_with(cx_c, |store, _| {
5168 assert_eq!(
5169 contacts(store),
5170 [("user_a", true, vec![]), ("user_b", true, vec![])]
5171 )
5172 });
5173
5174 server.disconnect_client(client_c.current_user_id(cx_c));
5175 server.forbid_connections();
5176 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5177
5178 client_a.user_store.read_with(cx_a, |store, _| {
5179 assert_eq!(
5180 contacts(store),
5181 [("user_b", true, vec![]), ("user_c", false, vec![])]
5182 )
5183 });
5184 client_b.user_store.read_with(cx_b, |store, _| {
5185 assert_eq!(
5186 contacts(store),
5187 [("user_a", true, vec![]), ("user_c", false, vec![])]
5188 )
5189 });
5190 client_c
5191 .user_store
5192 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5193
5194 server.allow_connections();
5195 client_c
5196 .authenticate_and_connect(false, &cx_c.to_async())
5197 .await
5198 .unwrap();
5199 deterministic.run_until_parked();
5200 client_a.user_store.read_with(cx_a, |store, _| {
5201 assert_eq!(
5202 contacts(store),
5203 [("user_b", true, vec![]), ("user_c", true, vec![])]
5204 )
5205 });
5206 client_b.user_store.read_with(cx_b, |store, _| {
5207 assert_eq!(
5208 contacts(store),
5209 [("user_a", true, vec![]), ("user_c", true, vec![])]
5210 )
5211 });
5212 client_c.user_store.read_with(cx_c, |store, _| {
5213 assert_eq!(
5214 contacts(store),
5215 [("user_a", true, vec![]), ("user_b", true, vec![])]
5216 )
5217 });
5218
5219 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5220 user_store
5221 .contacts()
5222 .iter()
5223 .map(|contact| {
5224 let worktrees = contact
5225 .projects
5226 .iter()
5227 .map(|p| {
5228 (
5229 p.worktree_root_names[0].as_str(),
5230 p.is_shared,
5231 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5232 )
5233 })
5234 .collect();
5235 (
5236 contact.user.github_login.as_str(),
5237 contact.online,
5238 worktrees,
5239 )
5240 })
5241 .collect()
5242 }
5243 }
5244
5245 #[gpui::test(iterations = 10)]
5246 async fn test_contact_requests(
5247 executor: Arc<Deterministic>,
5248 cx_a: &mut TestAppContext,
5249 cx_a2: &mut TestAppContext,
5250 cx_b: &mut TestAppContext,
5251 cx_b2: &mut TestAppContext,
5252 cx_c: &mut TestAppContext,
5253 cx_c2: &mut TestAppContext,
5254 ) {
5255 cx_a.foreground().forbid_parking();
5256
5257 // Connect to a server as 3 clients.
5258 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5259 let client_a = server.create_client(cx_a, "user_a").await;
5260 let client_a2 = server.create_client(cx_a2, "user_a").await;
5261 let client_b = server.create_client(cx_b, "user_b").await;
5262 let client_b2 = server.create_client(cx_b2, "user_b").await;
5263 let client_c = server.create_client(cx_c, "user_c").await;
5264 let client_c2 = server.create_client(cx_c2, "user_c").await;
5265
5266 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5267 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5268 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5269
5270 // User A and User C request that user B become their contact.
5271 client_a
5272 .user_store
5273 .update(cx_a, |store, cx| {
5274 store.request_contact(client_b.user_id().unwrap(), cx)
5275 })
5276 .await
5277 .unwrap();
5278 client_c
5279 .user_store
5280 .update(cx_c, |store, cx| {
5281 store.request_contact(client_b.user_id().unwrap(), cx)
5282 })
5283 .await
5284 .unwrap();
5285 executor.run_until_parked();
5286
5287 // All users see the pending request appear in all their clients.
5288 assert_eq!(
5289 client_a.summarize_contacts(&cx_a).outgoing_requests,
5290 &["user_b"]
5291 );
5292 assert_eq!(
5293 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5294 &["user_b"]
5295 );
5296 assert_eq!(
5297 client_b.summarize_contacts(&cx_b).incoming_requests,
5298 &["user_a", "user_c"]
5299 );
5300 assert_eq!(
5301 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5302 &["user_a", "user_c"]
5303 );
5304 assert_eq!(
5305 client_c.summarize_contacts(&cx_c).outgoing_requests,
5306 &["user_b"]
5307 );
5308 assert_eq!(
5309 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5310 &["user_b"]
5311 );
5312
5313 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5314 disconnect_and_reconnect(&client_a, cx_a).await;
5315 disconnect_and_reconnect(&client_b, cx_b).await;
5316 disconnect_and_reconnect(&client_c, cx_c).await;
5317 executor.run_until_parked();
5318 assert_eq!(
5319 client_a.summarize_contacts(&cx_a).outgoing_requests,
5320 &["user_b"]
5321 );
5322 assert_eq!(
5323 client_b.summarize_contacts(&cx_b).incoming_requests,
5324 &["user_a", "user_c"]
5325 );
5326 assert_eq!(
5327 client_c.summarize_contacts(&cx_c).outgoing_requests,
5328 &["user_b"]
5329 );
5330
5331 // User B accepts the request from user A.
5332 client_b
5333 .user_store
5334 .update(cx_b, |store, cx| {
5335 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5336 })
5337 .await
5338 .unwrap();
5339
5340 executor.run_until_parked();
5341
5342 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5343 let contacts_b = client_b.summarize_contacts(&cx_b);
5344 assert_eq!(contacts_b.current, &["user_a"]);
5345 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5346 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5347 assert_eq!(contacts_b2.current, &["user_a"]);
5348 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5349
5350 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5351 let contacts_a = client_a.summarize_contacts(&cx_a);
5352 assert_eq!(contacts_a.current, &["user_b"]);
5353 assert!(contacts_a.outgoing_requests.is_empty());
5354 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5355 assert_eq!(contacts_a2.current, &["user_b"]);
5356 assert!(contacts_a2.outgoing_requests.is_empty());
5357
5358 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5359 disconnect_and_reconnect(&client_a, cx_a).await;
5360 disconnect_and_reconnect(&client_b, cx_b).await;
5361 disconnect_and_reconnect(&client_c, cx_c).await;
5362 executor.run_until_parked();
5363 assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5364 assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5365 assert_eq!(
5366 client_b.summarize_contacts(&cx_b).incoming_requests,
5367 &["user_c"]
5368 );
5369 assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5370 assert_eq!(
5371 client_c.summarize_contacts(&cx_c).outgoing_requests,
5372 &["user_b"]
5373 );
5374
5375 // User B rejects the request from user C.
5376 client_b
5377 .user_store
5378 .update(cx_b, |store, cx| {
5379 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5380 })
5381 .await
5382 .unwrap();
5383
5384 executor.run_until_parked();
5385
5386 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5387 let contacts_b = client_b.summarize_contacts(&cx_b);
5388 assert_eq!(contacts_b.current, &["user_a"]);
5389 assert!(contacts_b.incoming_requests.is_empty());
5390 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5391 assert_eq!(contacts_b2.current, &["user_a"]);
5392 assert!(contacts_b2.incoming_requests.is_empty());
5393
5394 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5395 let contacts_c = client_c.summarize_contacts(&cx_c);
5396 assert!(contacts_c.current.is_empty());
5397 assert!(contacts_c.outgoing_requests.is_empty());
5398 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5399 assert!(contacts_c2.current.is_empty());
5400 assert!(contacts_c2.outgoing_requests.is_empty());
5401
5402 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5403 disconnect_and_reconnect(&client_a, cx_a).await;
5404 disconnect_and_reconnect(&client_b, cx_b).await;
5405 disconnect_and_reconnect(&client_c, cx_c).await;
5406 executor.run_until_parked();
5407 assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5408 assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5409 assert!(client_b
5410 .summarize_contacts(&cx_b)
5411 .incoming_requests
5412 .is_empty());
5413 assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5414 assert!(client_c
5415 .summarize_contacts(&cx_c)
5416 .outgoing_requests
5417 .is_empty());
5418
5419 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5420 client.disconnect(&cx.to_async()).unwrap();
5421 client.clear_contacts(cx).await;
5422 client
5423 .authenticate_and_connect(false, &cx.to_async())
5424 .await
5425 .unwrap();
5426 }
5427 }
5428
5429 #[gpui::test(iterations = 10)]
5430 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5431 cx_a.foreground().forbid_parking();
5432 let fs = FakeFs::new(cx_a.background());
5433
5434 // 2 clients connect to a server.
5435 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5436 let mut client_a = server.create_client(cx_a, "user_a").await;
5437 let mut client_b = server.create_client(cx_b, "user_b").await;
5438 server
5439 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5440 .await;
5441 cx_a.update(editor::init);
5442 cx_b.update(editor::init);
5443
5444 // Client A shares a project.
5445 fs.insert_tree(
5446 "/a",
5447 json!({
5448 "1.txt": "one",
5449 "2.txt": "two",
5450 "3.txt": "three",
5451 }),
5452 )
5453 .await;
5454 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5455 project_a
5456 .update(cx_a, |project, cx| project.share(cx))
5457 .await
5458 .unwrap();
5459
5460 // Client B joins the project.
5461 let project_b = client_b
5462 .build_remote_project(
5463 project_a
5464 .read_with(cx_a, |project, _| project.remote_id())
5465 .unwrap(),
5466 cx_b,
5467 )
5468 .await;
5469
5470 // Client A opens some editors.
5471 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5472 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5473 let editor_a1 = workspace_a
5474 .update(cx_a, |workspace, cx| {
5475 workspace.open_path((worktree_id, "1.txt"), true, cx)
5476 })
5477 .await
5478 .unwrap()
5479 .downcast::<Editor>()
5480 .unwrap();
5481 let editor_a2 = workspace_a
5482 .update(cx_a, |workspace, cx| {
5483 workspace.open_path((worktree_id, "2.txt"), true, cx)
5484 })
5485 .await
5486 .unwrap()
5487 .downcast::<Editor>()
5488 .unwrap();
5489
5490 // Client B opens an editor.
5491 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5492 let editor_b1 = workspace_b
5493 .update(cx_b, |workspace, cx| {
5494 workspace.open_path((worktree_id, "1.txt"), true, cx)
5495 })
5496 .await
5497 .unwrap()
5498 .downcast::<Editor>()
5499 .unwrap();
5500
5501 let client_a_id = project_b.read_with(cx_b, |project, _| {
5502 project.collaborators().values().next().unwrap().peer_id
5503 });
5504 let client_b_id = project_a.read_with(cx_a, |project, _| {
5505 project.collaborators().values().next().unwrap().peer_id
5506 });
5507
5508 // When client B starts following client A, all visible view states are replicated to client B.
5509 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5510 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5511 workspace_b
5512 .update(cx_b, |workspace, cx| {
5513 workspace
5514 .toggle_follow(&ToggleFollow(client_a_id), cx)
5515 .unwrap()
5516 })
5517 .await
5518 .unwrap();
5519 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5520 workspace
5521 .active_item(cx)
5522 .unwrap()
5523 .downcast::<Editor>()
5524 .unwrap()
5525 });
5526 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5527 assert_eq!(
5528 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5529 Some((worktree_id, "2.txt").into())
5530 );
5531 assert_eq!(
5532 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5533 vec![2..3]
5534 );
5535 assert_eq!(
5536 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5537 vec![0..1]
5538 );
5539
5540 // When client A activates a different editor, client B does so as well.
5541 workspace_a.update(cx_a, |workspace, cx| {
5542 workspace.activate_item(&editor_a1, cx)
5543 });
5544 workspace_b
5545 .condition(cx_b, |workspace, cx| {
5546 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5547 })
5548 .await;
5549
5550 // When client A navigates back and forth, client B does so as well.
5551 workspace_a
5552 .update(cx_a, |workspace, cx| {
5553 workspace::Pane::go_back(workspace, None, cx)
5554 })
5555 .await;
5556 workspace_b
5557 .condition(cx_b, |workspace, cx| {
5558 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5559 })
5560 .await;
5561
5562 workspace_a
5563 .update(cx_a, |workspace, cx| {
5564 workspace::Pane::go_forward(workspace, None, cx)
5565 })
5566 .await;
5567 workspace_b
5568 .condition(cx_b, |workspace, cx| {
5569 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5570 })
5571 .await;
5572
5573 // Changes to client A's editor are reflected on client B.
5574 editor_a1.update(cx_a, |editor, cx| {
5575 editor.select_ranges([1..1, 2..2], None, cx);
5576 });
5577 editor_b1
5578 .condition(cx_b, |editor, cx| {
5579 editor.selected_ranges(cx) == vec![1..1, 2..2]
5580 })
5581 .await;
5582
5583 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5584 editor_b1
5585 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5586 .await;
5587
5588 editor_a1.update(cx_a, |editor, cx| {
5589 editor.select_ranges([3..3], None, cx);
5590 editor.set_scroll_position(vec2f(0., 100.), cx);
5591 });
5592 editor_b1
5593 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5594 .await;
5595
5596 // After unfollowing, client B stops receiving updates from client A.
5597 workspace_b.update(cx_b, |workspace, cx| {
5598 workspace.unfollow(&workspace.active_pane().clone(), cx)
5599 });
5600 workspace_a.update(cx_a, |workspace, cx| {
5601 workspace.activate_item(&editor_a2, cx)
5602 });
5603 cx_a.foreground().run_until_parked();
5604 assert_eq!(
5605 workspace_b.read_with(cx_b, |workspace, cx| workspace
5606 .active_item(cx)
5607 .unwrap()
5608 .id()),
5609 editor_b1.id()
5610 );
5611
5612 // Client A starts following client B.
5613 workspace_a
5614 .update(cx_a, |workspace, cx| {
5615 workspace
5616 .toggle_follow(&ToggleFollow(client_b_id), cx)
5617 .unwrap()
5618 })
5619 .await
5620 .unwrap();
5621 assert_eq!(
5622 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5623 Some(client_b_id)
5624 );
5625 assert_eq!(
5626 workspace_a.read_with(cx_a, |workspace, cx| workspace
5627 .active_item(cx)
5628 .unwrap()
5629 .id()),
5630 editor_a1.id()
5631 );
5632
5633 // Following interrupts when client B disconnects.
5634 client_b.disconnect(&cx_b.to_async()).unwrap();
5635 cx_a.foreground().run_until_parked();
5636 assert_eq!(
5637 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5638 None
5639 );
5640 }
5641
5642 #[gpui::test(iterations = 10)]
5643 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5644 cx_a.foreground().forbid_parking();
5645 let fs = FakeFs::new(cx_a.background());
5646
5647 // 2 clients connect to a server.
5648 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5649 let mut client_a = server.create_client(cx_a, "user_a").await;
5650 let mut client_b = server.create_client(cx_b, "user_b").await;
5651 server
5652 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5653 .await;
5654 cx_a.update(editor::init);
5655 cx_b.update(editor::init);
5656
5657 // Client A shares a project.
5658 fs.insert_tree(
5659 "/a",
5660 json!({
5661 "1.txt": "one",
5662 "2.txt": "two",
5663 "3.txt": "three",
5664 "4.txt": "four",
5665 }),
5666 )
5667 .await;
5668 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5669 project_a
5670 .update(cx_a, |project, cx| project.share(cx))
5671 .await
5672 .unwrap();
5673
5674 // Client B joins the project.
5675 let project_b = client_b
5676 .build_remote_project(
5677 project_a
5678 .read_with(cx_a, |project, _| project.remote_id())
5679 .unwrap(),
5680 cx_b,
5681 )
5682 .await;
5683
5684 // Client A opens some editors.
5685 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5686 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5687 let _editor_a1 = workspace_a
5688 .update(cx_a, |workspace, cx| {
5689 workspace.open_path((worktree_id, "1.txt"), true, cx)
5690 })
5691 .await
5692 .unwrap()
5693 .downcast::<Editor>()
5694 .unwrap();
5695
5696 // Client B opens an editor.
5697 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5698 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5699 let _editor_b1 = workspace_b
5700 .update(cx_b, |workspace, cx| {
5701 workspace.open_path((worktree_id, "2.txt"), true, cx)
5702 })
5703 .await
5704 .unwrap()
5705 .downcast::<Editor>()
5706 .unwrap();
5707
5708 // Clients A and B follow each other in split panes
5709 workspace_a
5710 .update(cx_a, |workspace, cx| {
5711 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5712 assert_ne!(*workspace.active_pane(), pane_a1);
5713 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5714 workspace
5715 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5716 .unwrap()
5717 })
5718 .await
5719 .unwrap();
5720 workspace_b
5721 .update(cx_b, |workspace, cx| {
5722 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5723 assert_ne!(*workspace.active_pane(), pane_b1);
5724 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5725 workspace
5726 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5727 .unwrap()
5728 })
5729 .await
5730 .unwrap();
5731
5732 workspace_a
5733 .update(cx_a, |workspace, cx| {
5734 workspace.activate_next_pane(cx);
5735 assert_eq!(*workspace.active_pane(), pane_a1);
5736 workspace.open_path((worktree_id, "3.txt"), true, cx)
5737 })
5738 .await
5739 .unwrap();
5740 workspace_b
5741 .update(cx_b, |workspace, cx| {
5742 workspace.activate_next_pane(cx);
5743 assert_eq!(*workspace.active_pane(), pane_b1);
5744 workspace.open_path((worktree_id, "4.txt"), true, cx)
5745 })
5746 .await
5747 .unwrap();
5748 cx_a.foreground().run_until_parked();
5749
5750 // Ensure leader updates don't change the active pane of followers
5751 workspace_a.read_with(cx_a, |workspace, _| {
5752 assert_eq!(*workspace.active_pane(), pane_a1);
5753 });
5754 workspace_b.read_with(cx_b, |workspace, _| {
5755 assert_eq!(*workspace.active_pane(), pane_b1);
5756 });
5757
5758 // Ensure peers following each other doesn't cause an infinite loop.
5759 assert_eq!(
5760 workspace_a.read_with(cx_a, |workspace, cx| workspace
5761 .active_item(cx)
5762 .unwrap()
5763 .project_path(cx)),
5764 Some((worktree_id, "3.txt").into())
5765 );
5766 workspace_a.update(cx_a, |workspace, cx| {
5767 assert_eq!(
5768 workspace.active_item(cx).unwrap().project_path(cx),
5769 Some((worktree_id, "3.txt").into())
5770 );
5771 workspace.activate_next_pane(cx);
5772 assert_eq!(
5773 workspace.active_item(cx).unwrap().project_path(cx),
5774 Some((worktree_id, "4.txt").into())
5775 );
5776 });
5777 workspace_b.update(cx_b, |workspace, cx| {
5778 assert_eq!(
5779 workspace.active_item(cx).unwrap().project_path(cx),
5780 Some((worktree_id, "4.txt").into())
5781 );
5782 workspace.activate_next_pane(cx);
5783 assert_eq!(
5784 workspace.active_item(cx).unwrap().project_path(cx),
5785 Some((worktree_id, "3.txt").into())
5786 );
5787 });
5788 }
5789
5790 #[gpui::test(iterations = 10)]
5791 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5792 cx_a.foreground().forbid_parking();
5793 let fs = FakeFs::new(cx_a.background());
5794
5795 // 2 clients connect to a server.
5796 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5797 let mut client_a = server.create_client(cx_a, "user_a").await;
5798 let mut client_b = server.create_client(cx_b, "user_b").await;
5799 server
5800 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5801 .await;
5802 cx_a.update(editor::init);
5803 cx_b.update(editor::init);
5804
5805 // Client A shares a project.
5806 fs.insert_tree(
5807 "/a",
5808 json!({
5809 "1.txt": "one",
5810 "2.txt": "two",
5811 "3.txt": "three",
5812 }),
5813 )
5814 .await;
5815 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5816 project_a
5817 .update(cx_a, |project, cx| project.share(cx))
5818 .await
5819 .unwrap();
5820
5821 // Client B joins the project.
5822 let project_b = client_b
5823 .build_remote_project(
5824 project_a
5825 .read_with(cx_a, |project, _| project.remote_id())
5826 .unwrap(),
5827 cx_b,
5828 )
5829 .await;
5830
5831 // Client A opens some editors.
5832 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5833 let _editor_a1 = workspace_a
5834 .update(cx_a, |workspace, cx| {
5835 workspace.open_path((worktree_id, "1.txt"), true, cx)
5836 })
5837 .await
5838 .unwrap()
5839 .downcast::<Editor>()
5840 .unwrap();
5841
5842 // Client B starts following client A.
5843 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5844 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5845 let leader_id = project_b.read_with(cx_b, |project, _| {
5846 project.collaborators().values().next().unwrap().peer_id
5847 });
5848 workspace_b
5849 .update(cx_b, |workspace, cx| {
5850 workspace
5851 .toggle_follow(&ToggleFollow(leader_id), cx)
5852 .unwrap()
5853 })
5854 .await
5855 .unwrap();
5856 assert_eq!(
5857 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5858 Some(leader_id)
5859 );
5860 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5861 workspace
5862 .active_item(cx)
5863 .unwrap()
5864 .downcast::<Editor>()
5865 .unwrap()
5866 });
5867
5868 // When client B moves, it automatically stops following client A.
5869 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5870 assert_eq!(
5871 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5872 None
5873 );
5874
5875 workspace_b
5876 .update(cx_b, |workspace, cx| {
5877 workspace
5878 .toggle_follow(&ToggleFollow(leader_id), cx)
5879 .unwrap()
5880 })
5881 .await
5882 .unwrap();
5883 assert_eq!(
5884 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5885 Some(leader_id)
5886 );
5887
5888 // When client B edits, it automatically stops following client A.
5889 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5890 assert_eq!(
5891 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5892 None
5893 );
5894
5895 workspace_b
5896 .update(cx_b, |workspace, cx| {
5897 workspace
5898 .toggle_follow(&ToggleFollow(leader_id), cx)
5899 .unwrap()
5900 })
5901 .await
5902 .unwrap();
5903 assert_eq!(
5904 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5905 Some(leader_id)
5906 );
5907
5908 // When client B scrolls, it automatically stops following client A.
5909 editor_b2.update(cx_b, |editor, cx| {
5910 editor.set_scroll_position(vec2f(0., 3.), cx)
5911 });
5912 assert_eq!(
5913 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5914 None
5915 );
5916
5917 workspace_b
5918 .update(cx_b, |workspace, cx| {
5919 workspace
5920 .toggle_follow(&ToggleFollow(leader_id), cx)
5921 .unwrap()
5922 })
5923 .await
5924 .unwrap();
5925 assert_eq!(
5926 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5927 Some(leader_id)
5928 );
5929
5930 // When client B activates a different pane, it continues following client A in the original pane.
5931 workspace_b.update(cx_b, |workspace, cx| {
5932 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5933 });
5934 assert_eq!(
5935 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5936 Some(leader_id)
5937 );
5938
5939 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5940 assert_eq!(
5941 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5942 Some(leader_id)
5943 );
5944
5945 // When client B activates a different item in the original pane, it automatically stops following client A.
5946 workspace_b
5947 .update(cx_b, |workspace, cx| {
5948 workspace.open_path((worktree_id, "2.txt"), true, cx)
5949 })
5950 .await
5951 .unwrap();
5952 assert_eq!(
5953 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5954 None
5955 );
5956 }
5957
5958 #[gpui::test(iterations = 100)]
5959 async fn test_random_collaboration(
5960 cx: &mut TestAppContext,
5961 deterministic: Arc<Deterministic>,
5962 rng: StdRng,
5963 ) {
5964 cx.foreground().forbid_parking();
5965 let max_peers = env::var("MAX_PEERS")
5966 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5967 .unwrap_or(5);
5968 assert!(max_peers <= 5);
5969
5970 let max_operations = env::var("OPERATIONS")
5971 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5972 .unwrap_or(10);
5973
5974 let rng = Arc::new(Mutex::new(rng));
5975
5976 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5977 let host_language_registry = Arc::new(LanguageRegistry::test());
5978
5979 let fs = FakeFs::new(cx.background());
5980 fs.insert_tree("/_collab", json!({"init": ""})).await;
5981
5982 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5983 let db = server.app_state.db.clone();
5984 let host_user_id = db.create_user("host", false).await.unwrap();
5985 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5986 let guest_user_id = db.create_user(username, false).await.unwrap();
5987 server
5988 .app_state
5989 .db
5990 .send_contact_request(guest_user_id, host_user_id)
5991 .await
5992 .unwrap();
5993 server
5994 .app_state
5995 .db
5996 .respond_to_contact_request(host_user_id, guest_user_id, true)
5997 .await
5998 .unwrap();
5999 }
6000
6001 let mut clients = Vec::new();
6002 let mut user_ids = Vec::new();
6003 let mut op_start_signals = Vec::new();
6004
6005 let mut next_entity_id = 100000;
6006 let mut host_cx = TestAppContext::new(
6007 cx.foreground_platform(),
6008 cx.platform(),
6009 deterministic.build_foreground(next_entity_id),
6010 deterministic.build_background(),
6011 cx.font_cache(),
6012 cx.leak_detector(),
6013 next_entity_id,
6014 );
6015 let host = server.create_client(&mut host_cx, "host").await;
6016 let host_project = host_cx.update(|cx| {
6017 Project::local(
6018 host.client.clone(),
6019 host.user_store.clone(),
6020 host_language_registry.clone(),
6021 fs.clone(),
6022 cx,
6023 )
6024 });
6025 let host_project_id = host_project
6026 .update(&mut host_cx, |p, _| p.next_remote_id())
6027 .await;
6028
6029 let (collab_worktree, _) = host_project
6030 .update(&mut host_cx, |project, cx| {
6031 project.find_or_create_local_worktree("/_collab", true, cx)
6032 })
6033 .await
6034 .unwrap();
6035 collab_worktree
6036 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6037 .await;
6038 host_project
6039 .update(&mut host_cx, |project, cx| project.share(cx))
6040 .await
6041 .unwrap();
6042
6043 // Set up fake language servers.
6044 let mut language = Language::new(
6045 LanguageConfig {
6046 name: "Rust".into(),
6047 path_suffixes: vec!["rs".to_string()],
6048 ..Default::default()
6049 },
6050 None,
6051 );
6052 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6053 name: "the-fake-language-server",
6054 capabilities: lsp::LanguageServer::full_capabilities(),
6055 initializer: Some(Box::new({
6056 let rng = rng.clone();
6057 let fs = fs.clone();
6058 let project = host_project.downgrade();
6059 move |fake_server: &mut FakeLanguageServer| {
6060 fake_server.handle_request::<lsp::request::Completion, _, _>(
6061 |_, _| async move {
6062 Ok(Some(lsp::CompletionResponse::Array(vec![
6063 lsp::CompletionItem {
6064 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6065 range: lsp::Range::new(
6066 lsp::Position::new(0, 0),
6067 lsp::Position::new(0, 0),
6068 ),
6069 new_text: "the-new-text".to_string(),
6070 })),
6071 ..Default::default()
6072 },
6073 ])))
6074 },
6075 );
6076
6077 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6078 |_, _| async move {
6079 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6080 lsp::CodeAction {
6081 title: "the-code-action".to_string(),
6082 ..Default::default()
6083 },
6084 )]))
6085 },
6086 );
6087
6088 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6089 |params, _| async move {
6090 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6091 params.position,
6092 params.position,
6093 ))))
6094 },
6095 );
6096
6097 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6098 let fs = fs.clone();
6099 let rng = rng.clone();
6100 move |_, _| {
6101 let fs = fs.clone();
6102 let rng = rng.clone();
6103 async move {
6104 let files = fs.files().await;
6105 let mut rng = rng.lock();
6106 let count = rng.gen_range::<usize, _>(1..3);
6107 let files = (0..count)
6108 .map(|_| files.choose(&mut *rng).unwrap())
6109 .collect::<Vec<_>>();
6110 log::info!("LSP: Returning definitions in files {:?}", &files);
6111 Ok(Some(lsp::GotoDefinitionResponse::Array(
6112 files
6113 .into_iter()
6114 .map(|file| lsp::Location {
6115 uri: lsp::Url::from_file_path(file).unwrap(),
6116 range: Default::default(),
6117 })
6118 .collect(),
6119 )))
6120 }
6121 }
6122 });
6123
6124 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6125 let rng = rng.clone();
6126 let project = project.clone();
6127 move |params, mut cx| {
6128 let highlights = if let Some(project) = project.upgrade(&cx) {
6129 project.update(&mut cx, |project, cx| {
6130 let path = params
6131 .text_document_position_params
6132 .text_document
6133 .uri
6134 .to_file_path()
6135 .unwrap();
6136 let (worktree, relative_path) =
6137 project.find_local_worktree(&path, cx)?;
6138 let project_path =
6139 ProjectPath::from((worktree.read(cx).id(), relative_path));
6140 let buffer =
6141 project.get_open_buffer(&project_path, cx)?.read(cx);
6142
6143 let mut highlights = Vec::new();
6144 let highlight_count = rng.lock().gen_range(1..=5);
6145 let mut prev_end = 0;
6146 for _ in 0..highlight_count {
6147 let range =
6148 buffer.random_byte_range(prev_end, &mut *rng.lock());
6149
6150 highlights.push(lsp::DocumentHighlight {
6151 range: range_to_lsp(range.to_point_utf16(buffer)),
6152 kind: Some(lsp::DocumentHighlightKind::READ),
6153 });
6154 prev_end = range.end;
6155 }
6156 Some(highlights)
6157 })
6158 } else {
6159 None
6160 };
6161 async move { Ok(highlights) }
6162 }
6163 });
6164 }
6165 })),
6166 ..Default::default()
6167 });
6168 host_language_registry.add(Arc::new(language));
6169
6170 let op_start_signal = futures::channel::mpsc::unbounded();
6171 user_ids.push(host.current_user_id(&host_cx));
6172 op_start_signals.push(op_start_signal.0);
6173 clients.push(host_cx.foreground().spawn(host.simulate_host(
6174 host_project,
6175 op_start_signal.1,
6176 rng.clone(),
6177 host_cx,
6178 )));
6179
6180 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6181 rng.lock().gen_range(0..max_operations)
6182 } else {
6183 max_operations
6184 };
6185 let mut available_guests = vec![
6186 "guest-1".to_string(),
6187 "guest-2".to_string(),
6188 "guest-3".to_string(),
6189 "guest-4".to_string(),
6190 ];
6191 let mut operations = 0;
6192 while operations < max_operations {
6193 if operations == disconnect_host_at {
6194 server.disconnect_client(user_ids[0]);
6195 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6196 drop(op_start_signals);
6197 let mut clients = futures::future::join_all(clients).await;
6198 cx.foreground().run_until_parked();
6199
6200 let (host, mut host_cx, host_err) = clients.remove(0);
6201 if let Some(host_err) = host_err {
6202 log::error!("host error - {}", host_err);
6203 }
6204 host.project
6205 .as_ref()
6206 .unwrap()
6207 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6208 for (guest, mut guest_cx, guest_err) in clients {
6209 if let Some(guest_err) = guest_err {
6210 log::error!("{} error - {}", guest.username, guest_err);
6211 }
6212 // TODO
6213 // let contacts = server
6214 // .store
6215 // .read()
6216 // .await
6217 // .contacts_for_user(guest.current_user_id(&guest_cx));
6218 // assert!(!contacts
6219 // .iter()
6220 // .flat_map(|contact| &contact.projects)
6221 // .any(|project| project.id == host_project_id));
6222 guest
6223 .project
6224 .as_ref()
6225 .unwrap()
6226 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6227 guest_cx.update(|_| drop(guest));
6228 }
6229 host_cx.update(|_| drop(host));
6230
6231 return;
6232 }
6233
6234 let distribution = rng.lock().gen_range(0..100);
6235 match distribution {
6236 0..=19 if !available_guests.is_empty() => {
6237 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6238 let guest_username = available_guests.remove(guest_ix);
6239 log::info!("Adding new connection for {}", guest_username);
6240 next_entity_id += 100000;
6241 let mut guest_cx = TestAppContext::new(
6242 cx.foreground_platform(),
6243 cx.platform(),
6244 deterministic.build_foreground(next_entity_id),
6245 deterministic.build_background(),
6246 cx.font_cache(),
6247 cx.leak_detector(),
6248 next_entity_id,
6249 );
6250 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6251 let guest_project = Project::remote(
6252 host_project_id,
6253 guest.client.clone(),
6254 guest.user_store.clone(),
6255 guest_lang_registry.clone(),
6256 FakeFs::new(cx.background()),
6257 &mut guest_cx.to_async(),
6258 )
6259 .await
6260 .unwrap();
6261 let op_start_signal = futures::channel::mpsc::unbounded();
6262 user_ids.push(guest.current_user_id(&guest_cx));
6263 op_start_signals.push(op_start_signal.0);
6264 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6265 guest_username.clone(),
6266 guest_project,
6267 op_start_signal.1,
6268 rng.clone(),
6269 guest_cx,
6270 )));
6271
6272 log::info!("Added connection for {}", guest_username);
6273 operations += 1;
6274 }
6275 20..=29 if clients.len() > 1 => {
6276 let guest_ix = rng.lock().gen_range(1..clients.len());
6277 log::info!("Removing guest {}", user_ids[guest_ix]);
6278 let removed_guest_id = user_ids.remove(guest_ix);
6279 let guest = clients.remove(guest_ix);
6280 op_start_signals.remove(guest_ix);
6281 server.disconnect_client(removed_guest_id);
6282 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6283 let (guest, mut guest_cx, guest_err) = guest.await;
6284 if let Some(guest_err) = guest_err {
6285 log::error!("{} error - {}", guest.username, guest_err);
6286 }
6287 guest
6288 .project
6289 .as_ref()
6290 .unwrap()
6291 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6292 // TODO
6293 // for user_id in &user_ids {
6294 // for contact in server.store.read().await.contacts_for_user(*user_id) {
6295 // assert_ne!(
6296 // contact.user_id, removed_guest_id.0 as u64,
6297 // "removed guest is still a contact of another peer"
6298 // );
6299 // for project in contact.projects {
6300 // for project_guest_id in project.guests {
6301 // assert_ne!(
6302 // project_guest_id, removed_guest_id.0 as u64,
6303 // "removed guest appears as still participating on a project"
6304 // );
6305 // }
6306 // }
6307 // }
6308 // }
6309
6310 log::info!("{} removed", guest.username);
6311 available_guests.push(guest.username.clone());
6312 guest_cx.update(|_| drop(guest));
6313
6314 operations += 1;
6315 }
6316 _ => {
6317 while operations < max_operations && rng.lock().gen_bool(0.7) {
6318 op_start_signals
6319 .choose(&mut *rng.lock())
6320 .unwrap()
6321 .unbounded_send(())
6322 .unwrap();
6323 operations += 1;
6324 }
6325
6326 if rng.lock().gen_bool(0.8) {
6327 cx.foreground().run_until_parked();
6328 }
6329 }
6330 }
6331 }
6332
6333 drop(op_start_signals);
6334 let mut clients = futures::future::join_all(clients).await;
6335 cx.foreground().run_until_parked();
6336
6337 let (host_client, mut host_cx, host_err) = clients.remove(0);
6338 if let Some(host_err) = host_err {
6339 panic!("host error - {}", host_err);
6340 }
6341 let host_project = host_client.project.as_ref().unwrap();
6342 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6343 project
6344 .worktrees(cx)
6345 .map(|worktree| {
6346 let snapshot = worktree.read(cx).snapshot();
6347 (snapshot.id(), snapshot)
6348 })
6349 .collect::<BTreeMap<_, _>>()
6350 });
6351
6352 host_client
6353 .project
6354 .as_ref()
6355 .unwrap()
6356 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6357
6358 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6359 if let Some(guest_err) = guest_err {
6360 panic!("{} error - {}", guest_client.username, guest_err);
6361 }
6362 let worktree_snapshots =
6363 guest_client
6364 .project
6365 .as_ref()
6366 .unwrap()
6367 .read_with(&guest_cx, |project, cx| {
6368 project
6369 .worktrees(cx)
6370 .map(|worktree| {
6371 let worktree = worktree.read(cx);
6372 (worktree.id(), worktree.snapshot())
6373 })
6374 .collect::<BTreeMap<_, _>>()
6375 });
6376
6377 assert_eq!(
6378 worktree_snapshots.keys().collect::<Vec<_>>(),
6379 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6380 "{} has different worktrees than the host",
6381 guest_client.username
6382 );
6383 for (id, host_snapshot) in &host_worktree_snapshots {
6384 let guest_snapshot = &worktree_snapshots[id];
6385 assert_eq!(
6386 guest_snapshot.root_name(),
6387 host_snapshot.root_name(),
6388 "{} has different root name than the host for worktree {}",
6389 guest_client.username,
6390 id
6391 );
6392 assert_eq!(
6393 guest_snapshot.entries(false).collect::<Vec<_>>(),
6394 host_snapshot.entries(false).collect::<Vec<_>>(),
6395 "{} has different snapshot than the host for worktree {}",
6396 guest_client.username,
6397 id
6398 );
6399 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6400 }
6401
6402 guest_client
6403 .project
6404 .as_ref()
6405 .unwrap()
6406 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6407
6408 for guest_buffer in &guest_client.buffers {
6409 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6410 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6411 project.buffer_for_id(buffer_id, cx).expect(&format!(
6412 "host does not have buffer for guest:{}, peer:{}, id:{}",
6413 guest_client.username, guest_client.peer_id, buffer_id
6414 ))
6415 });
6416 let path = host_buffer
6417 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6418
6419 assert_eq!(
6420 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6421 0,
6422 "{}, buffer {}, path {:?} has deferred operations",
6423 guest_client.username,
6424 buffer_id,
6425 path,
6426 );
6427 assert_eq!(
6428 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6429 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6430 "{}, buffer {}, path {:?}, differs from the host's buffer",
6431 guest_client.username,
6432 buffer_id,
6433 path
6434 );
6435 }
6436
6437 guest_cx.update(|_| drop(guest_client));
6438 }
6439
6440 host_cx.update(|_| drop(host_client));
6441 }
6442
6443 struct TestServer {
6444 peer: Arc<Peer>,
6445 app_state: Arc<AppState>,
6446 server: Arc<Server>,
6447 foreground: Rc<executor::Foreground>,
6448 notifications: mpsc::UnboundedReceiver<()>,
6449 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6450 forbid_connections: Arc<AtomicBool>,
6451 _test_db: TestDb,
6452 }
6453
6454 impl TestServer {
6455 async fn start(
6456 foreground: Rc<executor::Foreground>,
6457 background: Arc<executor::Background>,
6458 ) -> Self {
6459 let test_db = TestDb::fake(background);
6460 let app_state = Self::build_app_state(&test_db).await;
6461 let peer = Peer::new();
6462 let notifications = mpsc::unbounded();
6463 let server = Server::new(app_state.clone(), Some(notifications.0));
6464 Self {
6465 peer,
6466 app_state,
6467 server,
6468 foreground,
6469 notifications: notifications.1,
6470 connection_killers: Default::default(),
6471 forbid_connections: Default::default(),
6472 _test_db: test_db,
6473 }
6474 }
6475
6476 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6477 cx.update(|cx| {
6478 let settings = Settings::test(cx);
6479 cx.set_global(settings);
6480 });
6481
6482 let http = FakeHttpClient::with_404_response();
6483 let user_id =
6484 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6485 user.id
6486 } else {
6487 self.app_state.db.create_user(name, false).await.unwrap()
6488 };
6489 let client_name = name.to_string();
6490 let mut client = Client::new(http.clone());
6491 let server = self.server.clone();
6492 let connection_killers = self.connection_killers.clone();
6493 let forbid_connections = self.forbid_connections.clone();
6494 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6495
6496 Arc::get_mut(&mut client)
6497 .unwrap()
6498 .override_authenticate(move |cx| {
6499 cx.spawn(|_| async move {
6500 let access_token = "the-token".to_string();
6501 Ok(Credentials {
6502 user_id: user_id.0 as u64,
6503 access_token,
6504 })
6505 })
6506 })
6507 .override_establish_connection(move |credentials, cx| {
6508 assert_eq!(credentials.user_id, user_id.0 as u64);
6509 assert_eq!(credentials.access_token, "the-token");
6510
6511 let server = server.clone();
6512 let connection_killers = connection_killers.clone();
6513 let forbid_connections = forbid_connections.clone();
6514 let client_name = client_name.clone();
6515 let connection_id_tx = connection_id_tx.clone();
6516 cx.spawn(move |cx| async move {
6517 if forbid_connections.load(SeqCst) {
6518 Err(EstablishConnectionError::other(anyhow!(
6519 "server is forbidding connections"
6520 )))
6521 } else {
6522 let (client_conn, server_conn, killed) =
6523 Connection::in_memory(cx.background());
6524 connection_killers.lock().insert(user_id, killed);
6525 cx.background()
6526 .spawn(server.handle_connection(
6527 server_conn,
6528 client_name,
6529 user_id,
6530 Some(connection_id_tx),
6531 cx.background(),
6532 ))
6533 .detach();
6534 Ok(client_conn)
6535 }
6536 })
6537 });
6538
6539 client
6540 .authenticate_and_connect(false, &cx.to_async())
6541 .await
6542 .unwrap();
6543
6544 Channel::init(&client);
6545 Project::init(&client);
6546 cx.update(|cx| {
6547 workspace::init(&client, cx);
6548 });
6549
6550 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6551 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6552
6553 let client = TestClient {
6554 client,
6555 peer_id,
6556 username: name.to_string(),
6557 user_store,
6558 language_registry: Arc::new(LanguageRegistry::test()),
6559 project: Default::default(),
6560 buffers: Default::default(),
6561 };
6562 client.wait_for_current_user(cx).await;
6563 client
6564 }
6565
6566 fn disconnect_client(&self, user_id: UserId) {
6567 self.connection_killers
6568 .lock()
6569 .remove(&user_id)
6570 .unwrap()
6571 .store(true, SeqCst);
6572 }
6573
6574 fn forbid_connections(&self) {
6575 self.forbid_connections.store(true, SeqCst);
6576 }
6577
6578 fn allow_connections(&self) {
6579 self.forbid_connections.store(false, SeqCst);
6580 }
6581
6582 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6583 while let Some((client_a, cx_a)) = clients.pop() {
6584 for (client_b, cx_b) in &mut clients {
6585 client_a
6586 .user_store
6587 .update(cx_a, |store, cx| {
6588 store.request_contact(client_b.user_id().unwrap(), cx)
6589 })
6590 .await
6591 .unwrap();
6592 cx_a.foreground().run_until_parked();
6593 client_b
6594 .user_store
6595 .update(*cx_b, |store, cx| {
6596 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6597 })
6598 .await
6599 .unwrap();
6600 }
6601 }
6602 }
6603
6604 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6605 Arc::new(AppState {
6606 db: test_db.db().clone(),
6607 api_token: Default::default(),
6608 })
6609 }
6610
6611 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6612 self.server.store.read().await
6613 }
6614
6615 async fn condition<F>(&mut self, mut predicate: F)
6616 where
6617 F: FnMut(&Store) -> bool,
6618 {
6619 assert!(
6620 self.foreground.parking_forbidden(),
6621 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6622 );
6623 while !(predicate)(&*self.server.store.read().await) {
6624 self.foreground.start_waiting();
6625 self.notifications.next().await;
6626 self.foreground.finish_waiting();
6627 }
6628 }
6629 }
6630
6631 impl Deref for TestServer {
6632 type Target = Server;
6633
6634 fn deref(&self) -> &Self::Target {
6635 &self.server
6636 }
6637 }
6638
6639 impl Drop for TestServer {
6640 fn drop(&mut self) {
6641 self.peer.reset();
6642 }
6643 }
6644
6645 struct TestClient {
6646 client: Arc<Client>,
6647 username: String,
6648 pub peer_id: PeerId,
6649 pub user_store: ModelHandle<UserStore>,
6650 language_registry: Arc<LanguageRegistry>,
6651 project: Option<ModelHandle<Project>>,
6652 buffers: HashSet<ModelHandle<language::Buffer>>,
6653 }
6654
6655 impl Deref for TestClient {
6656 type Target = Arc<Client>;
6657
6658 fn deref(&self) -> &Self::Target {
6659 &self.client
6660 }
6661 }
6662
6663 struct ContactsSummary {
6664 pub current: Vec<String>,
6665 pub outgoing_requests: Vec<String>,
6666 pub incoming_requests: Vec<String>,
6667 }
6668
6669 impl TestClient {
6670 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6671 UserId::from_proto(
6672 self.user_store
6673 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6674 )
6675 }
6676
6677 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6678 let mut authed_user = self
6679 .user_store
6680 .read_with(cx, |user_store, _| user_store.watch_current_user());
6681 while authed_user.next().await.unwrap().is_none() {}
6682 }
6683
6684 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6685 self.user_store
6686 .update(cx, |store, _| store.clear_contacts())
6687 .await;
6688 }
6689
6690 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6691 self.user_store.read_with(cx, |store, _| ContactsSummary {
6692 current: store
6693 .contacts()
6694 .iter()
6695 .map(|contact| contact.user.github_login.clone())
6696 .collect(),
6697 outgoing_requests: store
6698 .outgoing_contact_requests()
6699 .iter()
6700 .map(|user| user.github_login.clone())
6701 .collect(),
6702 incoming_requests: store
6703 .incoming_contact_requests()
6704 .iter()
6705 .map(|user| user.github_login.clone())
6706 .collect(),
6707 })
6708 }
6709
6710 async fn build_local_project(
6711 &mut self,
6712 fs: Arc<FakeFs>,
6713 root_path: impl AsRef<Path>,
6714 cx: &mut TestAppContext,
6715 ) -> (ModelHandle<Project>, WorktreeId) {
6716 let project = cx.update(|cx| {
6717 Project::local(
6718 self.client.clone(),
6719 self.user_store.clone(),
6720 self.language_registry.clone(),
6721 fs,
6722 cx,
6723 )
6724 });
6725 self.project = Some(project.clone());
6726 let (worktree, _) = project
6727 .update(cx, |p, cx| {
6728 p.find_or_create_local_worktree(root_path, true, cx)
6729 })
6730 .await
6731 .unwrap();
6732 worktree
6733 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6734 .await;
6735 project
6736 .update(cx, |project, _| project.next_remote_id())
6737 .await;
6738 (project, worktree.read_with(cx, |tree, _| tree.id()))
6739 }
6740
6741 async fn build_remote_project(
6742 &mut self,
6743 project_id: u64,
6744 cx: &mut TestAppContext,
6745 ) -> ModelHandle<Project> {
6746 let project = Project::remote(
6747 project_id,
6748 self.client.clone(),
6749 self.user_store.clone(),
6750 self.language_registry.clone(),
6751 FakeFs::new(cx.background()),
6752 &mut cx.to_async(),
6753 )
6754 .await
6755 .unwrap();
6756 self.project = Some(project.clone());
6757 project
6758 }
6759
6760 fn build_workspace(
6761 &self,
6762 project: &ModelHandle<Project>,
6763 cx: &mut TestAppContext,
6764 ) -> ViewHandle<Workspace> {
6765 let (window_id, _) = cx.add_window(|_| EmptyView);
6766 cx.add_view(window_id, |cx| {
6767 let fs = project.read(cx).fs().clone();
6768 Workspace::new(
6769 &WorkspaceParams {
6770 fs,
6771 project: project.clone(),
6772 user_store: self.user_store.clone(),
6773 languages: self.language_registry.clone(),
6774 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6775 channel_list: cx.add_model(|cx| {
6776 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6777 }),
6778 client: self.client.clone(),
6779 },
6780 cx,
6781 )
6782 })
6783 }
6784
6785 async fn simulate_host(
6786 mut self,
6787 project: ModelHandle<Project>,
6788 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6789 rng: Arc<Mutex<StdRng>>,
6790 mut cx: TestAppContext,
6791 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6792 async fn simulate_host_internal(
6793 client: &mut TestClient,
6794 project: ModelHandle<Project>,
6795 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6796 rng: Arc<Mutex<StdRng>>,
6797 cx: &mut TestAppContext,
6798 ) -> anyhow::Result<()> {
6799 let fs = project.read_with(cx, |project, _| project.fs().clone());
6800
6801 while op_start_signal.next().await.is_some() {
6802 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6803 let files = fs.as_fake().files().await;
6804 match distribution {
6805 0..=20 if !files.is_empty() => {
6806 let path = files.choose(&mut *rng.lock()).unwrap();
6807 let mut path = path.as_path();
6808 while let Some(parent_path) = path.parent() {
6809 path = parent_path;
6810 if rng.lock().gen() {
6811 break;
6812 }
6813 }
6814
6815 log::info!("Host: find/create local worktree {:?}", path);
6816 let find_or_create_worktree = project.update(cx, |project, cx| {
6817 project.find_or_create_local_worktree(path, true, cx)
6818 });
6819 if rng.lock().gen() {
6820 cx.background().spawn(find_or_create_worktree).detach();
6821 } else {
6822 find_or_create_worktree.await?;
6823 }
6824 }
6825 10..=80 if !files.is_empty() => {
6826 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6827 let file = files.choose(&mut *rng.lock()).unwrap();
6828 let (worktree, path) = project
6829 .update(cx, |project, cx| {
6830 project.find_or_create_local_worktree(
6831 file.clone(),
6832 true,
6833 cx,
6834 )
6835 })
6836 .await?;
6837 let project_path =
6838 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6839 log::info!(
6840 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6841 file,
6842 project_path.0,
6843 project_path.1
6844 );
6845 let buffer = project
6846 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6847 .await
6848 .unwrap();
6849 client.buffers.insert(buffer.clone());
6850 buffer
6851 } else {
6852 client
6853 .buffers
6854 .iter()
6855 .choose(&mut *rng.lock())
6856 .unwrap()
6857 .clone()
6858 };
6859
6860 if rng.lock().gen_bool(0.1) {
6861 cx.update(|cx| {
6862 log::info!(
6863 "Host: dropping buffer {:?}",
6864 buffer.read(cx).file().unwrap().full_path(cx)
6865 );
6866 client.buffers.remove(&buffer);
6867 drop(buffer);
6868 });
6869 } else {
6870 buffer.update(cx, |buffer, cx| {
6871 log::info!(
6872 "Host: updating buffer {:?} ({})",
6873 buffer.file().unwrap().full_path(cx),
6874 buffer.remote_id()
6875 );
6876
6877 if rng.lock().gen_bool(0.7) {
6878 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6879 } else {
6880 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6881 }
6882 });
6883 }
6884 }
6885 _ => loop {
6886 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6887 let mut path = PathBuf::new();
6888 path.push("/");
6889 for _ in 0..path_component_count {
6890 let letter = rng.lock().gen_range(b'a'..=b'z');
6891 path.push(std::str::from_utf8(&[letter]).unwrap());
6892 }
6893 path.set_extension("rs");
6894 let parent_path = path.parent().unwrap();
6895
6896 log::info!("Host: creating file {:?}", path,);
6897
6898 if fs.create_dir(&parent_path).await.is_ok()
6899 && fs.create_file(&path, Default::default()).await.is_ok()
6900 {
6901 break;
6902 } else {
6903 log::info!("Host: cannot create file");
6904 }
6905 },
6906 }
6907
6908 cx.background().simulate_random_delay().await;
6909 }
6910
6911 Ok(())
6912 }
6913
6914 let result =
6915 simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6916 .await;
6917 log::info!("Host done");
6918 self.project = Some(project);
6919 (self, cx, result.err())
6920 }
6921
6922 pub async fn simulate_guest(
6923 mut self,
6924 guest_username: String,
6925 project: ModelHandle<Project>,
6926 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6927 rng: Arc<Mutex<StdRng>>,
6928 mut cx: TestAppContext,
6929 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6930 async fn simulate_guest_internal(
6931 client: &mut TestClient,
6932 guest_username: &str,
6933 project: ModelHandle<Project>,
6934 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6935 rng: Arc<Mutex<StdRng>>,
6936 cx: &mut TestAppContext,
6937 ) -> anyhow::Result<()> {
6938 while op_start_signal.next().await.is_some() {
6939 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6940 let worktree = if let Some(worktree) =
6941 project.read_with(cx, |project, cx| {
6942 project
6943 .worktrees(&cx)
6944 .filter(|worktree| {
6945 let worktree = worktree.read(cx);
6946 worktree.is_visible()
6947 && worktree.entries(false).any(|e| e.is_file())
6948 })
6949 .choose(&mut *rng.lock())
6950 }) {
6951 worktree
6952 } else {
6953 cx.background().simulate_random_delay().await;
6954 continue;
6955 };
6956
6957 let (worktree_root_name, project_path) =
6958 worktree.read_with(cx, |worktree, _| {
6959 let entry = worktree
6960 .entries(false)
6961 .filter(|e| e.is_file())
6962 .choose(&mut *rng.lock())
6963 .unwrap();
6964 (
6965 worktree.root_name().to_string(),
6966 (worktree.id(), entry.path.clone()),
6967 )
6968 });
6969 log::info!(
6970 "{}: opening path {:?} in worktree {} ({})",
6971 guest_username,
6972 project_path.1,
6973 project_path.0,
6974 worktree_root_name,
6975 );
6976 let buffer = project
6977 .update(cx, |project, cx| {
6978 project.open_buffer(project_path.clone(), cx)
6979 })
6980 .await?;
6981 log::info!(
6982 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6983 guest_username,
6984 project_path.1,
6985 project_path.0,
6986 worktree_root_name,
6987 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6988 );
6989 client.buffers.insert(buffer.clone());
6990 buffer
6991 } else {
6992 client
6993 .buffers
6994 .iter()
6995 .choose(&mut *rng.lock())
6996 .unwrap()
6997 .clone()
6998 };
6999
7000 let choice = rng.lock().gen_range(0..100);
7001 match choice {
7002 0..=9 => {
7003 cx.update(|cx| {
7004 log::info!(
7005 "{}: dropping buffer {:?}",
7006 guest_username,
7007 buffer.read(cx).file().unwrap().full_path(cx)
7008 );
7009 client.buffers.remove(&buffer);
7010 drop(buffer);
7011 });
7012 }
7013 10..=19 => {
7014 let completions = project.update(cx, |project, cx| {
7015 log::info!(
7016 "{}: requesting completions for buffer {} ({:?})",
7017 guest_username,
7018 buffer.read(cx).remote_id(),
7019 buffer.read(cx).file().unwrap().full_path(cx)
7020 );
7021 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7022 project.completions(&buffer, offset, cx)
7023 });
7024 let completions = cx.background().spawn(async move {
7025 completions
7026 .await
7027 .map_err(|err| anyhow!("completions request failed: {:?}", err))
7028 });
7029 if rng.lock().gen_bool(0.3) {
7030 log::info!("{}: detaching completions request", guest_username);
7031 cx.update(|cx| completions.detach_and_log_err(cx));
7032 } else {
7033 completions.await?;
7034 }
7035 }
7036 20..=29 => {
7037 let code_actions = project.update(cx, |project, cx| {
7038 log::info!(
7039 "{}: requesting code actions for buffer {} ({:?})",
7040 guest_username,
7041 buffer.read(cx).remote_id(),
7042 buffer.read(cx).file().unwrap().full_path(cx)
7043 );
7044 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7045 project.code_actions(&buffer, range, cx)
7046 });
7047 let code_actions = cx.background().spawn(async move {
7048 code_actions.await.map_err(|err| {
7049 anyhow!("code actions request failed: {:?}", err)
7050 })
7051 });
7052 if rng.lock().gen_bool(0.3) {
7053 log::info!("{}: detaching code actions request", guest_username);
7054 cx.update(|cx| code_actions.detach_and_log_err(cx));
7055 } else {
7056 code_actions.await?;
7057 }
7058 }
7059 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7060 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7061 log::info!(
7062 "{}: saving buffer {} ({:?})",
7063 guest_username,
7064 buffer.remote_id(),
7065 buffer.file().unwrap().full_path(cx)
7066 );
7067 (buffer.version(), buffer.save(cx))
7068 });
7069 let save = cx.background().spawn(async move {
7070 let (saved_version, _) = save
7071 .await
7072 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7073 assert!(saved_version.observed_all(&requested_version));
7074 Ok::<_, anyhow::Error>(())
7075 });
7076 if rng.lock().gen_bool(0.3) {
7077 log::info!("{}: detaching save request", guest_username);
7078 cx.update(|cx| save.detach_and_log_err(cx));
7079 } else {
7080 save.await?;
7081 }
7082 }
7083 40..=44 => {
7084 let prepare_rename = project.update(cx, |project, cx| {
7085 log::info!(
7086 "{}: preparing rename for buffer {} ({:?})",
7087 guest_username,
7088 buffer.read(cx).remote_id(),
7089 buffer.read(cx).file().unwrap().full_path(cx)
7090 );
7091 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7092 project.prepare_rename(buffer, offset, cx)
7093 });
7094 let prepare_rename = cx.background().spawn(async move {
7095 prepare_rename.await.map_err(|err| {
7096 anyhow!("prepare rename request failed: {:?}", err)
7097 })
7098 });
7099 if rng.lock().gen_bool(0.3) {
7100 log::info!("{}: detaching prepare rename request", guest_username);
7101 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7102 } else {
7103 prepare_rename.await?;
7104 }
7105 }
7106 45..=49 => {
7107 let definitions = project.update(cx, |project, cx| {
7108 log::info!(
7109 "{}: requesting definitions for buffer {} ({:?})",
7110 guest_username,
7111 buffer.read(cx).remote_id(),
7112 buffer.read(cx).file().unwrap().full_path(cx)
7113 );
7114 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7115 project.definition(&buffer, offset, cx)
7116 });
7117 let definitions = cx.background().spawn(async move {
7118 definitions
7119 .await
7120 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7121 });
7122 if rng.lock().gen_bool(0.3) {
7123 log::info!("{}: detaching definitions request", guest_username);
7124 cx.update(|cx| definitions.detach_and_log_err(cx));
7125 } else {
7126 client
7127 .buffers
7128 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7129 }
7130 }
7131 50..=54 => {
7132 let highlights = project.update(cx, |project, cx| {
7133 log::info!(
7134 "{}: requesting highlights for buffer {} ({:?})",
7135 guest_username,
7136 buffer.read(cx).remote_id(),
7137 buffer.read(cx).file().unwrap().full_path(cx)
7138 );
7139 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7140 project.document_highlights(&buffer, offset, cx)
7141 });
7142 let highlights = cx.background().spawn(async move {
7143 highlights
7144 .await
7145 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7146 });
7147 if rng.lock().gen_bool(0.3) {
7148 log::info!("{}: detaching highlights request", guest_username);
7149 cx.update(|cx| highlights.detach_and_log_err(cx));
7150 } else {
7151 highlights.await?;
7152 }
7153 }
7154 55..=59 => {
7155 let search = project.update(cx, |project, cx| {
7156 let query = rng.lock().gen_range('a'..='z');
7157 log::info!("{}: project-wide search {:?}", guest_username, query);
7158 project.search(SearchQuery::text(query, false, false), cx)
7159 });
7160 let search = cx.background().spawn(async move {
7161 search
7162 .await
7163 .map_err(|err| anyhow!("search request failed: {:?}", err))
7164 });
7165 if rng.lock().gen_bool(0.3) {
7166 log::info!("{}: detaching search request", guest_username);
7167 cx.update(|cx| search.detach_and_log_err(cx));
7168 } else {
7169 client.buffers.extend(search.await?.into_keys());
7170 }
7171 }
7172 60..=69 => {
7173 let worktree = project
7174 .read_with(cx, |project, cx| {
7175 project
7176 .worktrees(&cx)
7177 .filter(|worktree| {
7178 let worktree = worktree.read(cx);
7179 worktree.is_visible()
7180 && worktree.entries(false).any(|e| e.is_file())
7181 && worktree
7182 .root_entry()
7183 .map_or(false, |e| e.is_dir())
7184 })
7185 .choose(&mut *rng.lock())
7186 })
7187 .unwrap();
7188 let (worktree_id, worktree_root_name) = worktree
7189 .read_with(cx, |worktree, _| {
7190 (worktree.id(), worktree.root_name().to_string())
7191 });
7192
7193 let mut new_name = String::new();
7194 for _ in 0..10 {
7195 let letter = rng.lock().gen_range('a'..='z');
7196 new_name.push(letter);
7197 }
7198 let mut new_path = PathBuf::new();
7199 new_path.push(new_name);
7200 new_path.set_extension("rs");
7201 log::info!(
7202 "{}: creating {:?} in worktree {} ({})",
7203 guest_username,
7204 new_path,
7205 worktree_id,
7206 worktree_root_name,
7207 );
7208 project
7209 .update(cx, |project, cx| {
7210 project.create_entry((worktree_id, new_path), false, cx)
7211 })
7212 .unwrap()
7213 .await?;
7214 }
7215 _ => {
7216 buffer.update(cx, |buffer, cx| {
7217 log::info!(
7218 "{}: updating buffer {} ({:?})",
7219 guest_username,
7220 buffer.remote_id(),
7221 buffer.file().unwrap().full_path(cx)
7222 );
7223 if rng.lock().gen_bool(0.7) {
7224 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7225 } else {
7226 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7227 }
7228 });
7229 }
7230 }
7231 cx.background().simulate_random_delay().await;
7232 }
7233 Ok(())
7234 }
7235
7236 let result = simulate_guest_internal(
7237 &mut self,
7238 &guest_username,
7239 project.clone(),
7240 op_start_signal,
7241 rng,
7242 &mut cx,
7243 )
7244 .await;
7245 log::info!("{}: done", guest_username);
7246
7247 self.project = Some(project);
7248 (self, cx, result.err())
7249 }
7250 }
7251
7252 impl Drop for TestClient {
7253 fn drop(&mut self) {
7254 self.client.tear_down();
7255 }
7256 }
7257
7258 impl Executor for Arc<gpui::executor::Background> {
7259 type Sleep = gpui::executor::Timer;
7260
7261 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7262 self.spawn(future).detach();
7263 }
7264
7265 fn sleep(&self, duration: Duration) -> Self::Sleep {
7266 self.as_ref().timer(duration)
7267 }
7268 }
7269
7270 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7271 channel
7272 .messages()
7273 .cursor::<()>()
7274 .map(|m| {
7275 (
7276 m.sender.github_login.clone(),
7277 m.body.clone(),
7278 m.is_pending(),
7279 )
7280 })
7281 .collect()
7282 }
7283
7284 struct EmptyView;
7285
7286 impl gpui::Entity for EmptyView {
7287 type Event = ();
7288 }
7289
7290 impl gpui::View for EmptyView {
7291 fn ui_name() -> &'static str {
7292 "empty view"
7293 }
7294
7295 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7296 gpui::Element::boxed(gpui::elements::Empty)
7297 }
7298 }
7299}