1mod store;
2
3use crate::{
4 auth,
5 db::{self, ChannelId, MessageId, User, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{
27 channel::mpsc,
28 future::{self, BoxFuture},
29 FutureExt, SinkExt, StreamExt, TryStreamExt,
30};
31use lazy_static::lazy_static;
32use rpc::{
33 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
34 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
35};
36use serde::{Serialize, Serializer};
37use std::{
38 any::TypeId,
39 future::Future,
40 marker::PhantomData,
41 net::SocketAddr,
42 ops::{Deref, DerefMut},
43 rc::Rc,
44 sync::{
45 atomic::{AtomicBool, Ordering::SeqCst},
46 Arc,
47 },
48 time::Duration,
49};
50use store::{Store, Worktree};
51use time::OffsetDateTime;
52use tokio::{
53 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
54 time::Sleep,
55};
56use tower::ServiceBuilder;
57use tracing::{info_span, instrument, Instrument};
58
/// Type-erased handler for one kind of incoming message.
///
/// The closure receives the server and a boxed, dynamically-typed envelope and returns a boxed
/// future that resolves to `()` once the message has been handled.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
61
/// Handle given to a request handler so it can answer the request it is processing.
struct Response<R> {
    /// Server whose peer is used to deliver the response.
    server: Arc<Server>,
    /// Receipt identifying the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` once the handler has sent a response or taken over the receipt.
    responded: Arc<AtomicBool>,
}
67
68impl<R: RequestMessage> Response<R> {
69 fn send(self, payload: R::Response) -> Result<()> {
70 self.responded.store(true, SeqCst);
71 self.server.peer.respond(self.receipt, payload)?;
72 Ok(())
73 }
74
75 fn into_receipt(self) -> Receipt<R> {
76 self.responded.store(true, SeqCst);
77 self.receipt
78 }
79}
80
/// The RPC server: owns the peer, the shared store and the table of message handlers.
pub struct Server {
    /// Peer used to send, forward and respond to messages on connections.
    peer: Arc<Peer>,
    /// Shared state guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Application-wide state; the handlers in this file use its `db` and `invite_link_prefix`.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message payload they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
88
89
/// Abstraction over the async runtime facilities the server needs: spawning detached
/// futures and creating sleep timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` without returning a handle to await it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future for a timer of the given `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
95
/// Unit-struct executor.
// NOTE(review): presumably the production `Executor` implementation; the impl is not in this
// part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
98
// NOTE(review): presumably the page size used when listing channel messages; usage is not
// visible in this part of the file — confirm.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; usage is not
// visible in this part of the file — confirm.
const MAX_MESSAGE_LEN: usize = 1024;
101
/// Read guard over the [`Store`] that is deliberately not `Send`.
struct StoreReadGuard<'a> {
    /// The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    // NOTE(review): presumably this stops the guard from being held across an `.await` in a
    // future that must be `Send` — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
106
/// Write guard over the [`Store`] that is deliberately not `Send`.
struct StoreWriteGuard<'a> {
    /// The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    // NOTE(review): presumably this stops the guard from being held across an `.await` in a
    // future that must be `Send` — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
111
/// Serializable view of the server: a borrowed peer plus a read-locked store.
#[derive(Serialize)]
pub struct ServerSnapshot<'a> {
    /// The server's peer, serialized directly.
    peer: &'a Peer,
    /// Read guard over the store; serialized through `serialize_deref`, i.e. as the `Store`
    /// it dereferences to rather than as the guard itself.
    #[serde(serialize_with = "serialize_deref")]
    store: RwLockReadGuard<'a, Store>,
}
118
119pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
120where
121 S: Serializer,
122 T: Deref<Target = U>,
123 U: Serialize
124{
125 Serialize::serialize(value.deref(), serializer)
126}
127
128
129impl Server {
130 pub fn new(
131 app_state: Arc<AppState>,
132 notifications: Option<mpsc::UnboundedSender<()>>,
133 ) -> Arc<Self> {
134 let mut server = Self {
135 peer: Peer::new(),
136 app_state,
137 store: Default::default(),
138 handlers: Default::default(),
139 notifications,
140 };
141
142 server
143 .add_request_handler(Server::ping)
144 .add_request_handler(Server::register_project)
145 .add_message_handler(Server::unregister_project)
146 .add_request_handler(Server::join_project)
147 .add_message_handler(Server::leave_project)
148 .add_message_handler(Server::respond_to_join_project_request)
149 .add_request_handler(Server::register_worktree)
150 .add_message_handler(Server::unregister_worktree)
151 .add_request_handler(Server::update_worktree)
152 .add_message_handler(Server::start_language_server)
153 .add_message_handler(Server::update_language_server)
154 .add_message_handler(Server::update_diagnostic_summary)
155 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
156 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
157 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
158 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
159 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
160 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
161 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
162 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
163 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
164 .add_request_handler(
165 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
166 )
167 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
168 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
169 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
170 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
171 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
172 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
173 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
174 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
175 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
176 .add_request_handler(Server::update_buffer)
177 .add_message_handler(Server::update_buffer_file)
178 .add_message_handler(Server::buffer_reloaded)
179 .add_message_handler(Server::buffer_saved)
180 .add_request_handler(Server::save_buffer)
181 .add_request_handler(Server::get_channels)
182 .add_request_handler(Server::get_users)
183 .add_request_handler(Server::fuzzy_search_users)
184 .add_request_handler(Server::request_contact)
185 .add_request_handler(Server::remove_contact)
186 .add_request_handler(Server::respond_to_contact_request)
187 .add_request_handler(Server::join_channel)
188 .add_message_handler(Server::leave_channel)
189 .add_request_handler(Server::send_channel_message)
190 .add_request_handler(Server::follow)
191 .add_message_handler(Server::unfollow)
192 .add_message_handler(Server::update_followers)
193 .add_request_handler(Server::get_channel_messages);
194
195 Arc::new(server)
196 }
197
198 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
199 where
200 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
201 Fut: 'static + Send + Future<Output = Result<()>>,
202 M: EnvelopedMessage,
203 {
204 let prev_handler = self.handlers.insert(
205 TypeId::of::<M>(),
206 Box::new(move |server, envelope| {
207 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
208 let span = info_span!(
209 "handle message",
210 payload_type = envelope.payload_type_name(),
211 payload = format!("{:?}", envelope.payload).as_str(),
212 );
213 let future = (handler)(server, *envelope);
214 async move {
215 if let Err(error) = future.await {
216 tracing::error!(%error, "error handling message");
217 }
218 }
219 .instrument(span)
220 .boxed()
221 }),
222 );
223 if prev_handler.is_some() {
224 panic!("registered a handler for the same message twice");
225 }
226 self
227 }
228
229 /// Handle a request while holding a lock to the store. This is useful when we're registering
230 /// a connection but we want to respond on the connection before anybody else can send on it.
231 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
232 where
233 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
234 Fut: Send + Future<Output = Result<()>>,
235 M: RequestMessage,
236 {
237 let handler = Arc::new(handler);
238 self.add_message_handler(move |server, envelope| {
239 let receipt = envelope.receipt();
240 let handler = handler.clone();
241 async move {
242 let responded = Arc::new(AtomicBool::default());
243 let response = Response {
244 server: server.clone(),
245 responded: responded.clone(),
246 receipt: envelope.receipt(),
247 };
248 match (handler)(server.clone(), envelope, response).await {
249 Ok(()) => {
250 if responded.load(std::sync::atomic::Ordering::SeqCst) {
251 Ok(())
252 } else {
253 Err(anyhow!("handler did not send a response"))?
254 }
255 }
256 Err(error) => {
257 server.peer.respond_with_error(
258 receipt,
259 proto::Error {
260 message: error.to_string(),
261 },
262 )?;
263 Err(error)
264 }
265 }
266 }
267 })
268 }
269
    /// Drives a single client connection from registration until it closes.
    ///
    /// The returned future:
    /// 1. adds the connection to the peer, using `executor.sleep` to build the peer's timers;
    /// 2. optionally reports the new connection id through `send_connection_id`;
    /// 3. sends the initial contacts update (and invite info, if any) to the client;
    /// 4. loops, polling connection I/O and dispatching incoming messages to the registered
    ///    handlers, until I/O finishes or the incoming stream ends;
    /// 5. signs the connection out.
    ///
    /// The whole future runs inside a "handle connection" tracing span.
    // NOTE(review): every `?` before the loop returns without calling `sign_out`, including
    // the ones after `store.add_connection` — confirm the connection is cleaned up elsewhere
    // in that case.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = Result<()>> {
        let mut this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");

            // Failure to report the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            // On a user's first connection, tell the client to show contacts and record that
            // the user has connected once.
            if !user.connected_once {
                this.peer.send(connection_id, proto::ShowContacts {})?;
                this.app_state.db.set_user_connected_once(user_id, true).await?;
            }

            let (contacts, invite_code) = future::try_join(
                this.app_state.db.get_contacts(user_id),
                this.app_state.db.get_invite_code_for_user(user_id)
            ).await?;

            // The store write guard is confined to this block so it is released before the
            // next `.await`.
            {
                let mut store = this.store_mut().await;
                store.add_connection(connection_id, user_id);
                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;

                if let Some((code, count)) = invite_code {
                    this.peer.send(connection_id, proto::UpdateInviteInfo {
                        url: format!("{}{}", this.app_state.invite_link_prefix, code),
                        count,
                    })?;
                }
            }
            this.update_user_contacts(user_id).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so I/O completion is checked
                // before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Emit a notification once the handler has finished.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages are spawned detached; all others are
                                    // awaited here before the next message is read.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            // A sign-out failure is logged but does not fail the connection future.
            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
378
379 #[instrument(skip(self), err)]
380 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
381 self.peer.disconnect(connection_id);
382
383 let removed_user_id = {
384 let mut store = self.store_mut().await;
385 let removed_connection = store.remove_connection(connection_id)?;
386
387 for (project_id, project) in removed_connection.hosted_projects {
388 broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
389 self.peer
390 .send(conn_id, proto::UnregisterProject { project_id })
391 });
392
393 for (_, receipts) in project.join_requests {
394 for receipt in receipts {
395 self.peer.respond(
396 receipt,
397 proto::JoinProjectResponse {
398 variant: Some(proto::join_project_response::Variant::Decline(
399 proto::join_project_response::Decline {
400 reason: proto::join_project_response::decline::Reason::WentOffline as i32
401 },
402 )),
403 },
404 )?;
405 }
406 }
407 }
408
409 for project_id in removed_connection.guest_project_ids {
410 if let Some(project) = store.project(project_id).trace_err() {
411 broadcast(connection_id, project.connection_ids(), |conn_id| {
412 self.peer.send(
413 conn_id,
414 proto::RemoveProjectCollaborator {
415 project_id,
416 peer_id: connection_id.0,
417 },
418 )
419 });
420 if project.guests.is_empty() {
421 self.peer
422 .send(
423 project.host_connection_id,
424 proto::ProjectUnshared { project_id },
425 )
426 .trace_err();
427 }
428 }
429 }
430
431 removed_connection.user_id
432 };
433
434 self.update_user_contacts(removed_user_id).await?;
435
436 Ok(())
437 }
438
439 pub async fn invite_code_redeemed(self: &Arc<Self>, code: &str, invitee_id: UserId) -> Result<()> {
440 let user = self.app_state.db.get_user_for_invite_code(code).await?;
441 let store = self.store().await;
442 let invitee_contact = store.contact_for_user(invitee_id, true);
443 for connection_id in store.connection_ids_for_user(user.id) {
444 self.peer.send(connection_id, proto::UpdateContacts {
445 contacts: vec![invitee_contact.clone()],
446 ..Default::default()
447 })?;
448 self.peer.send(connection_id, proto::UpdateInviteInfo {
449 url: format!("{}{}", self.app_state.invite_link_prefix, code),
450 count: user.invite_count as u32,
451 })?;
452 }
453 Ok(())
454 }
455
456 async fn ping(
457 self: Arc<Server>,
458 _: TypedEnvelope<proto::Ping>,
459 response: Response<proto::Ping>,
460 ) -> Result<()> {
461 response.send(proto::Ack {})?;
462 Ok(())
463 }
464
465 async fn register_project(
466 self: Arc<Server>,
467 request: TypedEnvelope<proto::RegisterProject>,
468 response: Response<proto::RegisterProject>,
469 ) -> Result<()> {
470 let user_id;
471 let project_id;
472 {
473 let mut state = self.store_mut().await;
474 user_id = state.user_id_for_connection(request.sender_id)?;
475 project_id = state.register_project(request.sender_id, user_id);
476 };
477 self.update_user_contacts(user_id).await?;
478 response.send(proto::RegisterProjectResponse { project_id })?;
479 Ok(())
480 }
481
482 async fn unregister_project(
483 self: Arc<Server>,
484 request: TypedEnvelope<proto::UnregisterProject>,
485 ) -> Result<()> {
486 let (user_id, project) = {
487 let mut state = self.store_mut().await;
488 let project =
489 state.unregister_project(request.payload.project_id, request.sender_id)?;
490 (state.user_id_for_connection(request.sender_id)?, project)
491 };
492
493 broadcast(
494 request.sender_id,
495 project.guests.keys().copied(),
496 |conn_id| {
497 self.peer.send(
498 conn_id,
499 proto::UnregisterProject {
500 project_id: request.payload.project_id,
501 },
502 )
503 },
504 );
505 for (_, receipts) in project.join_requests {
506 for receipt in receipts {
507 self.peer.respond(
508 receipt,
509 proto::JoinProjectResponse {
510 variant: Some(proto::join_project_response::Variant::Decline(
511 proto::join_project_response::Decline {
512 reason: proto::join_project_response::decline::Reason::Closed
513 as i32,
514 },
515 )),
516 },
517 )?;
518 }
519 }
520
521 self.update_user_contacts(user_id).await?;
522 Ok(())
523 }
524
525 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
526 let contacts = self.app_state.db.get_contacts(user_id).await?;
527 let store = self.store().await;
528 let updated_contact = store.contact_for_user(user_id, false);
529 for contact in contacts {
530 if let db::Contact::Accepted {
531 user_id: contact_user_id,
532 ..
533 } = contact
534 {
535 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
536 self.peer
537 .send(
538 contact_conn_id,
539 proto::UpdateContacts {
540 contacts: vec![updated_contact.clone()],
541 remove_contacts: Default::default(),
542 incoming_requests: Default::default(),
543 remove_incoming_requests: Default::default(),
544 outgoing_requests: Default::default(),
545 remove_outgoing_requests: Default::default(),
546 },
547 )
548 .trace_err();
549 }
550 }
551 }
552 Ok(())
553 }
554
555 async fn join_project(
556 self: Arc<Server>,
557 request: TypedEnvelope<proto::JoinProject>,
558 response: Response<proto::JoinProject>,
559 ) -> Result<()> {
560 let project_id = request.payload.project_id;
561 let host_user_id;
562 let guest_user_id;
563 let host_connection_id;
564 {
565 let state = self.store().await;
566 let project = state.project(project_id)?;
567 host_user_id = project.host_user_id;
568 host_connection_id = project.host_connection_id;
569 guest_user_id = state.user_id_for_connection(request.sender_id)?;
570 };
571
572 let has_contact = self
573 .app_state
574 .db
575 .has_contact(guest_user_id, host_user_id)
576 .await?;
577 if !has_contact {
578 return Err(anyhow!("no such project"))?;
579 }
580
581 self.store_mut().await.request_join_project(
582 guest_user_id,
583 project_id,
584 response.into_receipt(),
585 )?;
586 self.peer.send(
587 host_connection_id,
588 proto::RequestJoinProject {
589 project_id,
590 requester_id: guest_user_id.to_proto(),
591 },
592 )?;
593 Ok(())
594 }
595
596 async fn respond_to_join_project_request(
597 self: Arc<Server>,
598 request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
599 ) -> Result<()> {
600 let host_user_id;
601
602 {
603 let mut state = self.store_mut().await;
604 let project_id = request.payload.project_id;
605 let project = state.project(project_id)?;
606 if project.host_connection_id != request.sender_id {
607 Err(anyhow!("no such connection"))?;
608 }
609
610 host_user_id = project.host_user_id;
611 let guest_user_id = UserId::from_proto(request.payload.requester_id);
612
613 if !request.payload.allow {
614 let receipts = state
615 .deny_join_project_request(request.sender_id, guest_user_id, project_id)
616 .ok_or_else(|| anyhow!("no such request"))?;
617 for receipt in receipts {
618 self.peer.respond(
619 receipt,
620 proto::JoinProjectResponse {
621 variant: Some(proto::join_project_response::Variant::Decline(
622 proto::join_project_response::Decline {
623 reason: proto::join_project_response::decline::Reason::Declined
624 as i32,
625 },
626 )),
627 },
628 )?;
629 }
630 return Ok(());
631 }
632
633 let (receipts_with_replica_ids, project) = state
634 .accept_join_project_request(request.sender_id, guest_user_id, project_id)
635 .ok_or_else(|| anyhow!("no such request"))?;
636
637 let peer_count = project.guests.len();
638 let mut collaborators = Vec::with_capacity(peer_count);
639 collaborators.push(proto::Collaborator {
640 peer_id: project.host_connection_id.0,
641 replica_id: 0,
642 user_id: project.host_user_id.to_proto(),
643 });
644 let worktrees = project
645 .worktrees
646 .iter()
647 .filter_map(|(id, shared_worktree)| {
648 let worktree = project.worktrees.get(&id)?;
649 Some(proto::Worktree {
650 id: *id,
651 root_name: worktree.root_name.clone(),
652 entries: shared_worktree.entries.values().cloned().collect(),
653 diagnostic_summaries: shared_worktree
654 .diagnostic_summaries
655 .values()
656 .cloned()
657 .collect(),
658 visible: worktree.visible,
659 scan_id: shared_worktree.scan_id,
660 })
661 })
662 .collect::<Vec<_>>();
663
664 // Add all guests other than the requesting user's own connections as collaborators
665 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
666 if receipts_with_replica_ids
667 .iter()
668 .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
669 {
670 collaborators.push(proto::Collaborator {
671 peer_id: peer_conn_id.0,
672 replica_id: *peer_replica_id as u32,
673 user_id: peer_user_id.to_proto(),
674 });
675 }
676 }
677
678 for conn_id in project.connection_ids() {
679 for (receipt, replica_id) in &receipts_with_replica_ids {
680 if conn_id != receipt.sender_id {
681 self.peer.send(
682 conn_id,
683 proto::AddProjectCollaborator {
684 project_id,
685 collaborator: Some(proto::Collaborator {
686 peer_id: receipt.sender_id.0,
687 replica_id: *replica_id as u32,
688 user_id: guest_user_id.to_proto(),
689 }),
690 },
691 )?;
692 }
693 }
694 }
695
696 for (receipt, replica_id) in receipts_with_replica_ids {
697 self.peer.respond(
698 receipt,
699 proto::JoinProjectResponse {
700 variant: Some(proto::join_project_response::Variant::Accept(
701 proto::join_project_response::Accept {
702 worktrees: worktrees.clone(),
703 replica_id: replica_id as u32,
704 collaborators: collaborators.clone(),
705 language_servers: project.language_servers.clone(),
706 },
707 )),
708 },
709 )?;
710 }
711 }
712
713 self.update_user_contacts(host_user_id).await?;
714 Ok(())
715 }
716
717 async fn leave_project(
718 self: Arc<Server>,
719 request: TypedEnvelope<proto::LeaveProject>,
720 ) -> Result<()> {
721 let sender_id = request.sender_id;
722 let project_id = request.payload.project_id;
723 let project;
724 {
725 let mut store = self.store_mut().await;
726 project = store.leave_project(sender_id, project_id)?;
727
728 if project.remove_collaborator {
729 broadcast(sender_id, project.connection_ids, |conn_id| {
730 self.peer.send(
731 conn_id,
732 proto::RemoveProjectCollaborator {
733 project_id,
734 peer_id: sender_id.0,
735 },
736 )
737 });
738 }
739
740 if let Some(requester_id) = project.cancel_request {
741 self.peer.send(
742 project.host_connection_id,
743 proto::JoinProjectRequestCancelled {
744 project_id,
745 requester_id: requester_id.to_proto(),
746 },
747 )?;
748 }
749
750 if project.unshare {
751 self.peer.send(
752 project.host_connection_id,
753 proto::ProjectUnshared { project_id },
754 )?;
755 }
756 }
757 self.update_user_contacts(project.host_user_id).await?;
758 Ok(())
759 }
760
761 async fn register_worktree(
762 self: Arc<Server>,
763 request: TypedEnvelope<proto::RegisterWorktree>,
764 response: Response<proto::RegisterWorktree>,
765 ) -> Result<()> {
766 let host_user_id;
767 {
768 let mut state = self.store_mut().await;
769 host_user_id = state.user_id_for_connection(request.sender_id)?;
770
771 let guest_connection_ids = state
772 .read_project(request.payload.project_id, request.sender_id)?
773 .guest_connection_ids();
774 state.register_worktree(
775 request.payload.project_id,
776 request.payload.worktree_id,
777 request.sender_id,
778 Worktree {
779 root_name: request.payload.root_name.clone(),
780 visible: request.payload.visible,
781 ..Default::default()
782 },
783 )?;
784
785 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
786 self.peer
787 .forward_send(request.sender_id, connection_id, request.payload.clone())
788 });
789 }
790 self.update_user_contacts(host_user_id).await?;
791 response.send(proto::Ack {})?;
792 Ok(())
793 }
794
795 async fn unregister_worktree(
796 self: Arc<Server>,
797 request: TypedEnvelope<proto::UnregisterWorktree>,
798 ) -> Result<()> {
799 let host_user_id;
800 let project_id = request.payload.project_id;
801 let worktree_id = request.payload.worktree_id;
802 {
803 let mut state = self.store_mut().await;
804 let (_, guest_connection_ids) =
805 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
806 host_user_id = state.user_id_for_connection(request.sender_id)?;
807 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
808 self.peer.send(
809 conn_id,
810 proto::UnregisterWorktree {
811 project_id,
812 worktree_id,
813 },
814 )
815 });
816 }
817 self.update_user_contacts(host_user_id).await?;
818 Ok(())
819 }
820
821 async fn update_worktree(
822 self: Arc<Server>,
823 request: TypedEnvelope<proto::UpdateWorktree>,
824 response: Response<proto::UpdateWorktree>,
825 ) -> Result<()> {
826 let connection_ids = self.store_mut().await.update_worktree(
827 request.sender_id,
828 request.payload.project_id,
829 request.payload.worktree_id,
830 &request.payload.removed_entries,
831 &request.payload.updated_entries,
832 request.payload.scan_id,
833 )?;
834
835 broadcast(request.sender_id, connection_ids, |connection_id| {
836 self.peer
837 .forward_send(request.sender_id, connection_id, request.payload.clone())
838 });
839 response.send(proto::Ack {})?;
840 Ok(())
841 }
842
843 async fn update_diagnostic_summary(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
846 ) -> Result<()> {
847 let summary = request
848 .payload
849 .summary
850 .clone()
851 .ok_or_else(|| anyhow!("invalid summary"))?;
852 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
853 request.payload.project_id,
854 request.payload.worktree_id,
855 request.sender_id,
856 summary,
857 )?;
858
859 broadcast(request.sender_id, receiver_ids, |connection_id| {
860 self.peer
861 .forward_send(request.sender_id, connection_id, request.payload.clone())
862 });
863 Ok(())
864 }
865
866 async fn start_language_server(
867 self: Arc<Server>,
868 request: TypedEnvelope<proto::StartLanguageServer>,
869 ) -> Result<()> {
870 let receiver_ids = self.store_mut().await.start_language_server(
871 request.payload.project_id,
872 request.sender_id,
873 request
874 .payload
875 .server
876 .clone()
877 .ok_or_else(|| anyhow!("invalid language server"))?,
878 )?;
879 broadcast(request.sender_id, receiver_ids, |connection_id| {
880 self.peer
881 .forward_send(request.sender_id, connection_id, request.payload.clone())
882 });
883 Ok(())
884 }
885
886 async fn update_language_server(
887 self: Arc<Server>,
888 request: TypedEnvelope<proto::UpdateLanguageServer>,
889 ) -> Result<()> {
890 let receiver_ids = self
891 .store()
892 .await
893 .project_connection_ids(request.payload.project_id, request.sender_id)?;
894 broadcast(request.sender_id, receiver_ids, |connection_id| {
895 self.peer
896 .forward_send(request.sender_id, connection_id, request.payload.clone())
897 });
898 Ok(())
899 }
900
901 async fn forward_project_request<T>(
902 self: Arc<Server>,
903 request: TypedEnvelope<T>,
904 response: Response<T>,
905 ) -> Result<()>
906 where
907 T: EntityMessage + RequestMessage,
908 {
909 let host_connection_id = self
910 .store()
911 .await
912 .read_project(request.payload.remote_entity_id(), request.sender_id)?
913 .host_connection_id;
914
915 response.send(
916 self.peer
917 .forward_request(request.sender_id, host_connection_id, request.payload)
918 .await?,
919 )?;
920 Ok(())
921 }
922
923 async fn save_buffer(
924 self: Arc<Server>,
925 request: TypedEnvelope<proto::SaveBuffer>,
926 response: Response<proto::SaveBuffer>,
927 ) -> Result<()> {
928 let host = self
929 .store()
930 .await
931 .read_project(request.payload.project_id, request.sender_id)?
932 .host_connection_id;
933 let response_payload = self
934 .peer
935 .forward_request(request.sender_id, host, request.payload.clone())
936 .await?;
937
938 let mut guests = self
939 .store()
940 .await
941 .read_project(request.payload.project_id, request.sender_id)?
942 .connection_ids();
943 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
944 broadcast(host, guests, |conn_id| {
945 self.peer
946 .forward_send(host, conn_id, response_payload.clone())
947 });
948 response.send(response_payload)?;
949 Ok(())
950 }
951
952 async fn update_buffer(
953 self: Arc<Server>,
954 request: TypedEnvelope<proto::UpdateBuffer>,
955 response: Response<proto::UpdateBuffer>,
956 ) -> Result<()> {
957 let receiver_ids = self
958 .store()
959 .await
960 .project_connection_ids(request.payload.project_id, request.sender_id)?;
961 broadcast(request.sender_id, receiver_ids, |connection_id| {
962 self.peer
963 .forward_send(request.sender_id, connection_id, request.payload.clone())
964 });
965 response.send(proto::Ack {})?;
966 Ok(())
967 }
968
969 async fn update_buffer_file(
970 self: Arc<Server>,
971 request: TypedEnvelope<proto::UpdateBufferFile>,
972 ) -> Result<()> {
973 let receiver_ids = self
974 .store()
975 .await
976 .project_connection_ids(request.payload.project_id, request.sender_id)?;
977 broadcast(request.sender_id, receiver_ids, |connection_id| {
978 self.peer
979 .forward_send(request.sender_id, connection_id, request.payload.clone())
980 });
981 Ok(())
982 }
983
984 async fn buffer_reloaded(
985 self: Arc<Server>,
986 request: TypedEnvelope<proto::BufferReloaded>,
987 ) -> Result<()> {
988 let receiver_ids = self
989 .store()
990 .await
991 .project_connection_ids(request.payload.project_id, request.sender_id)?;
992 broadcast(request.sender_id, receiver_ids, |connection_id| {
993 self.peer
994 .forward_send(request.sender_id, connection_id, request.payload.clone())
995 });
996 Ok(())
997 }
998
999 async fn buffer_saved(
1000 self: Arc<Server>,
1001 request: TypedEnvelope<proto::BufferSaved>,
1002 ) -> Result<()> {
1003 let receiver_ids = self
1004 .store()
1005 .await
1006 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1007 broadcast(request.sender_id, receiver_ids, |connection_id| {
1008 self.peer
1009 .forward_send(request.sender_id, connection_id, request.payload.clone())
1010 });
1011 Ok(())
1012 }
1013
1014 async fn follow(
1015 self: Arc<Self>,
1016 request: TypedEnvelope<proto::Follow>,
1017 response: Response<proto::Follow>,
1018 ) -> Result<()> {
1019 let leader_id = ConnectionId(request.payload.leader_id);
1020 let follower_id = request.sender_id;
1021 if !self
1022 .store()
1023 .await
1024 .project_connection_ids(request.payload.project_id, follower_id)?
1025 .contains(&leader_id)
1026 {
1027 Err(anyhow!("no such peer"))?;
1028 }
1029 let mut response_payload = self
1030 .peer
1031 .forward_request(request.sender_id, leader_id, request.payload)
1032 .await?;
1033 response_payload
1034 .views
1035 .retain(|view| view.leader_id != Some(follower_id.0));
1036 response.send(response_payload)?;
1037 Ok(())
1038 }
1039
1040 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
1041 let leader_id = ConnectionId(request.payload.leader_id);
1042 if !self
1043 .store()
1044 .await
1045 .project_connection_ids(request.payload.project_id, request.sender_id)?
1046 .contains(&leader_id)
1047 {
1048 Err(anyhow!("no such peer"))?;
1049 }
1050 self.peer
1051 .forward_send(request.sender_id, leader_id, request.payload)?;
1052 Ok(())
1053 }
1054
1055 async fn update_followers(
1056 self: Arc<Self>,
1057 request: TypedEnvelope<proto::UpdateFollowers>,
1058 ) -> Result<()> {
1059 let connection_ids = self
1060 .store()
1061 .await
1062 .project_connection_ids(request.payload.project_id, request.sender_id)?;
1063 let leader_id = request
1064 .payload
1065 .variant
1066 .as_ref()
1067 .and_then(|variant| match variant {
1068 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1069 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1070 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1071 });
1072 for follower_id in &request.payload.follower_ids {
1073 let follower_id = ConnectionId(*follower_id);
1074 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1075 self.peer
1076 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 async fn get_channels(
1083 self: Arc<Server>,
1084 request: TypedEnvelope<proto::GetChannels>,
1085 response: Response<proto::GetChannels>,
1086 ) -> Result<()> {
1087 let user_id = self
1088 .store()
1089 .await
1090 .user_id_for_connection(request.sender_id)?;
1091 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1092 response.send(proto::GetChannelsResponse {
1093 channels: channels
1094 .into_iter()
1095 .map(|chan| proto::Channel {
1096 id: chan.id.to_proto(),
1097 name: chan.name,
1098 })
1099 .collect(),
1100 })?;
1101 Ok(())
1102 }
1103
1104 async fn get_users(
1105 self: Arc<Server>,
1106 request: TypedEnvelope<proto::GetUsers>,
1107 response: Response<proto::GetUsers>,
1108 ) -> Result<()> {
1109 let user_ids = request
1110 .payload
1111 .user_ids
1112 .into_iter()
1113 .map(UserId::from_proto)
1114 .collect();
1115 let users = self
1116 .app_state
1117 .db
1118 .get_users_by_ids(user_ids)
1119 .await?
1120 .into_iter()
1121 .map(|user| proto::User {
1122 id: user.id.to_proto(),
1123 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1124 github_login: user.github_login,
1125 })
1126 .collect();
1127 response.send(proto::UsersResponse { users })?;
1128 Ok(())
1129 }
1130
1131 async fn fuzzy_search_users(
1132 self: Arc<Server>,
1133 request: TypedEnvelope<proto::FuzzySearchUsers>,
1134 response: Response<proto::FuzzySearchUsers>,
1135 ) -> Result<()> {
1136 let user_id = self
1137 .store()
1138 .await
1139 .user_id_for_connection(request.sender_id)?;
1140 let query = request.payload.query;
1141 let db = &self.app_state.db;
1142 let users = match query.len() {
1143 0 => vec![],
1144 1 | 2 => db
1145 .get_user_by_github_login(&query)
1146 .await?
1147 .into_iter()
1148 .collect(),
1149 _ => db.fuzzy_search_users(&query, 10).await?,
1150 };
1151 let users = users
1152 .into_iter()
1153 .filter(|user| user.id != user_id)
1154 .map(|user| proto::User {
1155 id: user.id.to_proto(),
1156 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1157 github_login: user.github_login,
1158 })
1159 .collect();
1160 response.send(proto::UsersResponse { users })?;
1161 Ok(())
1162 }
1163
1164 async fn request_contact(
1165 self: Arc<Server>,
1166 request: TypedEnvelope<proto::RequestContact>,
1167 response: Response<proto::RequestContact>,
1168 ) -> Result<()> {
1169 let requester_id = self
1170 .store()
1171 .await
1172 .user_id_for_connection(request.sender_id)?;
1173 let responder_id = UserId::from_proto(request.payload.responder_id);
1174 if requester_id == responder_id {
1175 return Err(anyhow!("cannot add yourself as a contact"))?;
1176 }
1177
1178 self.app_state
1179 .db
1180 .send_contact_request(requester_id, responder_id)
1181 .await?;
1182
1183 // Update outgoing contact requests of requester
1184 let mut update = proto::UpdateContacts::default();
1185 update.outgoing_requests.push(responder_id.to_proto());
1186 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1187 self.peer.send(connection_id, update.clone())?;
1188 }
1189
1190 // Update incoming contact requests of responder
1191 let mut update = proto::UpdateContacts::default();
1192 update
1193 .incoming_requests
1194 .push(proto::IncomingContactRequest {
1195 requester_id: requester_id.to_proto(),
1196 should_notify: true,
1197 });
1198 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1199 self.peer.send(connection_id, update.clone())?;
1200 }
1201
1202 response.send(proto::Ack {})?;
1203 Ok(())
1204 }
1205
1206 async fn respond_to_contact_request(
1207 self: Arc<Server>,
1208 request: TypedEnvelope<proto::RespondToContactRequest>,
1209 response: Response<proto::RespondToContactRequest>,
1210 ) -> Result<()> {
1211 let responder_id = self
1212 .store()
1213 .await
1214 .user_id_for_connection(request.sender_id)?;
1215 let requester_id = UserId::from_proto(request.payload.requester_id);
1216 if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1217 self.app_state
1218 .db
1219 .dismiss_contact_notification(responder_id, requester_id)
1220 .await?;
1221 } else {
1222 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1223 self.app_state
1224 .db
1225 .respond_to_contact_request(responder_id, requester_id, accept)
1226 .await?;
1227
1228 let store = self.store().await;
1229 // Update responder with new contact
1230 let mut update = proto::UpdateContacts::default();
1231 if accept {
1232 update
1233 .contacts
1234 .push(store.contact_for_user(requester_id, false));
1235 }
1236 update
1237 .remove_incoming_requests
1238 .push(requester_id.to_proto());
1239 for connection_id in store.connection_ids_for_user(responder_id) {
1240 self.peer.send(connection_id, update.clone())?;
1241 }
1242
1243 // Update requester with new contact
1244 let mut update = proto::UpdateContacts::default();
1245 if accept {
1246 update
1247 .contacts
1248 .push(store.contact_for_user(responder_id, true));
1249 }
1250 update
1251 .remove_outgoing_requests
1252 .push(responder_id.to_proto());
1253 for connection_id in store.connection_ids_for_user(requester_id) {
1254 self.peer.send(connection_id, update.clone())?;
1255 }
1256 }
1257
1258 response.send(proto::Ack {})?;
1259 Ok(())
1260 }
1261
1262 async fn remove_contact(
1263 self: Arc<Server>,
1264 request: TypedEnvelope<proto::RemoveContact>,
1265 response: Response<proto::RemoveContact>,
1266 ) -> Result<()> {
1267 let requester_id = self
1268 .store()
1269 .await
1270 .user_id_for_connection(request.sender_id)?;
1271 let responder_id = UserId::from_proto(request.payload.user_id);
1272 self.app_state
1273 .db
1274 .remove_contact(requester_id, responder_id)
1275 .await?;
1276
1277 // Update outgoing contact requests of requester
1278 let mut update = proto::UpdateContacts::default();
1279 update
1280 .remove_outgoing_requests
1281 .push(responder_id.to_proto());
1282 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1283 self.peer.send(connection_id, update.clone())?;
1284 }
1285
1286 // Update incoming contact requests of responder
1287 let mut update = proto::UpdateContacts::default();
1288 update
1289 .remove_incoming_requests
1290 .push(requester_id.to_proto());
1291 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1292 self.peer.send(connection_id, update.clone())?;
1293 }
1294
1295 response.send(proto::Ack {})?;
1296 Ok(())
1297 }
1298
1299 async fn join_channel(
1300 self: Arc<Self>,
1301 request: TypedEnvelope<proto::JoinChannel>,
1302 response: Response<proto::JoinChannel>,
1303 ) -> Result<()> {
1304 let user_id = self
1305 .store()
1306 .await
1307 .user_id_for_connection(request.sender_id)?;
1308 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1309 if !self
1310 .app_state
1311 .db
1312 .can_user_access_channel(user_id, channel_id)
1313 .await?
1314 {
1315 Err(anyhow!("access denied"))?;
1316 }
1317
1318 self.store_mut()
1319 .await
1320 .join_channel(request.sender_id, channel_id);
1321 let messages = self
1322 .app_state
1323 .db
1324 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1325 .await?
1326 .into_iter()
1327 .map(|msg| proto::ChannelMessage {
1328 id: msg.id.to_proto(),
1329 body: msg.body,
1330 timestamp: msg.sent_at.unix_timestamp() as u64,
1331 sender_id: msg.sender_id.to_proto(),
1332 nonce: Some(msg.nonce.as_u128().into()),
1333 })
1334 .collect::<Vec<_>>();
1335 response.send(proto::JoinChannelResponse {
1336 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1337 messages,
1338 })?;
1339 Ok(())
1340 }
1341
1342 async fn leave_channel(
1343 self: Arc<Self>,
1344 request: TypedEnvelope<proto::LeaveChannel>,
1345 ) -> Result<()> {
1346 let user_id = self
1347 .store()
1348 .await
1349 .user_id_for_connection(request.sender_id)?;
1350 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1351 if !self
1352 .app_state
1353 .db
1354 .can_user_access_channel(user_id, channel_id)
1355 .await?
1356 {
1357 Err(anyhow!("access denied"))?;
1358 }
1359
1360 self.store_mut()
1361 .await
1362 .leave_channel(request.sender_id, channel_id);
1363
1364 Ok(())
1365 }
1366
1367 async fn send_channel_message(
1368 self: Arc<Self>,
1369 request: TypedEnvelope<proto::SendChannelMessage>,
1370 response: Response<proto::SendChannelMessage>,
1371 ) -> Result<()> {
1372 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1373 let user_id;
1374 let connection_ids;
1375 {
1376 let state = self.store().await;
1377 user_id = state.user_id_for_connection(request.sender_id)?;
1378 connection_ids = state.channel_connection_ids(channel_id)?;
1379 }
1380
1381 // Validate the message body.
1382 let body = request.payload.body.trim().to_string();
1383 if body.len() > MAX_MESSAGE_LEN {
1384 return Err(anyhow!("message is too long"))?;
1385 }
1386 if body.is_empty() {
1387 return Err(anyhow!("message can't be blank"))?;
1388 }
1389
1390 let timestamp = OffsetDateTime::now_utc();
1391 let nonce = request
1392 .payload
1393 .nonce
1394 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1395
1396 let message_id = self
1397 .app_state
1398 .db
1399 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1400 .await?
1401 .to_proto();
1402 let message = proto::ChannelMessage {
1403 sender_id: user_id.to_proto(),
1404 id: message_id,
1405 body,
1406 timestamp: timestamp.unix_timestamp() as u64,
1407 nonce: Some(nonce),
1408 };
1409 broadcast(request.sender_id, connection_ids, |conn_id| {
1410 self.peer.send(
1411 conn_id,
1412 proto::ChannelMessageSent {
1413 channel_id: channel_id.to_proto(),
1414 message: Some(message.clone()),
1415 },
1416 )
1417 });
1418 response.send(proto::SendChannelMessageResponse {
1419 message: Some(message),
1420 })?;
1421 Ok(())
1422 }
1423
1424 async fn get_channel_messages(
1425 self: Arc<Self>,
1426 request: TypedEnvelope<proto::GetChannelMessages>,
1427 response: Response<proto::GetChannelMessages>,
1428 ) -> Result<()> {
1429 let user_id = self
1430 .store()
1431 .await
1432 .user_id_for_connection(request.sender_id)?;
1433 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1434 if !self
1435 .app_state
1436 .db
1437 .can_user_access_channel(user_id, channel_id)
1438 .await?
1439 {
1440 Err(anyhow!("access denied"))?;
1441 }
1442
1443 let messages = self
1444 .app_state
1445 .db
1446 .get_channel_messages(
1447 channel_id,
1448 MESSAGE_COUNT_PER_PAGE,
1449 Some(MessageId::from_proto(request.payload.before_message_id)),
1450 )
1451 .await?
1452 .into_iter()
1453 .map(|msg| proto::ChannelMessage {
1454 id: msg.id.to_proto(),
1455 body: msg.body,
1456 timestamp: msg.sent_at.unix_timestamp() as u64,
1457 sender_id: msg.sender_id.to_proto(),
1458 nonce: Some(msg.nonce.as_u128().into()),
1459 })
1460 .collect::<Vec<_>>();
1461 response.send(proto::GetChannelMessagesResponse {
1462 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1463 messages,
1464 })?;
1465 Ok(())
1466 }
1467
1468 async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1469 #[cfg(test)]
1470 tokio::task::yield_now().await;
1471 let guard = self.store.read().await;
1472 #[cfg(test)]
1473 tokio::task::yield_now().await;
1474 StoreReadGuard {
1475 guard,
1476 _not_send: PhantomData,
1477 }
1478 }
1479
1480 async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1481 #[cfg(test)]
1482 tokio::task::yield_now().await;
1483 let guard = self.store.write().await;
1484 #[cfg(test)]
1485 tokio::task::yield_now().await;
1486 StoreWriteGuard {
1487 guard,
1488 _not_send: PhantomData,
1489 }
1490 }
1491
1492 pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
1493 ServerSnapshot {
1494 store: self.store.read().await,
1495 peer: &self.peer
1496 }
1497 }
1498}
1499
1500impl<'a> Deref for StoreReadGuard<'a> {
1501 type Target = Store;
1502
1503 fn deref(&self) -> &Self::Target {
1504 &*self.guard
1505 }
1506}
1507
1508impl<'a> Deref for StoreWriteGuard<'a> {
1509 type Target = Store;
1510
1511 fn deref(&self) -> &Self::Target {
1512 &*self.guard
1513 }
1514}
1515
1516impl<'a> DerefMut for StoreWriteGuard<'a> {
1517 fn deref_mut(&mut self) -> &mut Self::Target {
1518 &mut *self.guard
1519 }
1520}
1521
1522impl<'a> Drop for StoreWriteGuard<'a> {
1523 fn drop(&mut self) {
1524 #[cfg(test)]
1525 self.check_invariants();
1526
1527 let metrics = self.metrics();
1528 tracing::info!(
1529 connections = metrics.connections,
1530 registered_projects = metrics.registered_projects,
1531 shared_projects = metrics.shared_projects,
1532 "metrics"
1533 );
1534 }
1535}
1536
1537impl Executor for RealExecutor {
1538 type Sleep = Sleep;
1539
1540 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1541 tokio::task::spawn(future);
1542 }
1543
1544 fn sleep(&self, duration: Duration) -> Self::Sleep {
1545 tokio::time::sleep(duration)
1546 }
1547}
1548
1549fn broadcast<F>(
1550 sender_id: ConnectionId,
1551 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1552 mut f: F,
1553) where
1554 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1555{
1556 for receiver_id in receiver_ids {
1557 if receiver_id != sender_id {
1558 f(receiver_id).trace_err();
1559 }
1560 }
1561}
1562
// Name of the HTTP header that carries the client's RPC protocol version.
// It is returned by `ProtocolVersion::name` in the `Header` impl below.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1566
/// Typed header wrapping the numeric protocol version sent in `x-zed-protocol-version`.
pub struct ProtocolVersion(u32);
1568
1569impl Header for ProtocolVersion {
1570 fn name() -> &'static HeaderName {
1571 &ZED_PROTOCOL_VERSION
1572 }
1573
1574 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1575 where
1576 Self: Sized,
1577 I: Iterator<Item = &'i axum::http::HeaderValue>,
1578 {
1579 let version = values
1580 .next()
1581 .ok_or_else(|| axum::headers::Error::invalid())?
1582 .to_str()
1583 .map_err(|_| axum::headers::Error::invalid())?
1584 .parse()
1585 .map_err(|_| axum::headers::Error::invalid())?;
1586 Ok(Self(version))
1587 }
1588
1589 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1590 values.extend([self.0.to_string().parse().unwrap()]);
1591 }
1592}
1593
1594pub fn routes(server: Arc<Server>) -> Router<Body> {
1595 Router::new()
1596 .route("/rpc", get(handle_websocket_request))
1597 .layer(
1598 ServiceBuilder::new()
1599 .layer(Extension(server.app_state.clone()))
1600 .layer(middleware::from_fn(auth::validate_header))
1601 .layer(Extension(server)),
1602 )
1603}
1604
1605pub async fn handle_websocket_request(
1606 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1607 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1608 Extension(server): Extension<Arc<Server>>,
1609 Extension(user): Extension<User>,
1610 ws: WebSocketUpgrade,
1611) -> axum::response::Response {
1612 if protocol_version != rpc::PROTOCOL_VERSION {
1613 return (
1614 StatusCode::UPGRADE_REQUIRED,
1615 "client must be upgraded".to_string(),
1616 )
1617 .into_response();
1618 }
1619 let socket_address = socket_address.to_string();
1620 ws.on_upgrade(move |socket| {
1621 use util::ResultExt;
1622 let socket = socket
1623 .map_ok(to_tungstenite_message)
1624 .err_into()
1625 .with(|message| async move { Ok(to_axum_message(message)) });
1626 let connection = Connection::new(Box::pin(socket));
1627 async move {
1628 server
1629 .handle_connection(connection, socket_address, user, None, RealExecutor)
1630 .await
1631 .log_err();
1632 }
1633 })
1634}
1635
1636fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1637 match message {
1638 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1639 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1640 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1641 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1642 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1643 code: frame.code.into(),
1644 reason: frame.reason,
1645 })),
1646 }
1647}
1648
1649fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1650 match message {
1651 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1652 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1653 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1654 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1655 AxumMessage::Close(frame) => {
1656 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1657 code: frame.code.into(),
1658 reason: frame.reason,
1659 }))
1660 }
1661 }
1662}
1663
/// Extension trait for turning a fallible value into an `Option` while reporting
/// the failure through `tracing`.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Returns the success value as `Some`, or logs the error and returns `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1669
1670impl<T, E> ResultExt for Result<T, E>
1671where
1672 E: std::fmt::Debug,
1673{
1674 type Ok = T;
1675
1676 fn trace_err(self) -> Option<T> {
1677 match self {
1678 Ok(value) => Some(value),
1679 Err(error) => {
1680 tracing::error!("{:?}", error);
1681 None
1682 }
1683 }
1684 }
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689 use super::*;
1690 use crate::{
1691 db::{tests::TestDb, UserId},
1692 AppState,
1693 };
1694 use ::rpc::Peer;
1695 use client::{
1696 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1697 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1698 };
1699 use collections::{BTreeMap, HashSet};
1700 use editor::{
1701 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1702 ToOffset, ToggleCodeActions, Undo,
1703 };
1704 use gpui::{
1705 executor::{self, Deterministic},
1706 geometry::vector::vec2f,
1707 ModelHandle, Task, TestAppContext, ViewHandle,
1708 };
1709 use language::{
1710 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1711 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1712 };
1713 use lsp::{self, FakeLanguageServer};
1714 use parking_lot::Mutex;
1715 use project::{
1716 fs::{FakeFs, Fs as _},
1717 search::SearchQuery,
1718 worktree::WorktreeHandle,
1719 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1720 };
1721 use rand::prelude::*;
1722 use rpc::PeerId;
1723 use serde_json::json;
1724 use settings::Settings;
1725 use sqlx::types::time::OffsetDateTime;
1726 use std::{
1727 cell::RefCell,
1728 env,
1729 ops::Deref,
1730 path::{Path, PathBuf},
1731 rc::Rc,
1732 sync::{
1733 atomic::{AtomicBool, Ordering::SeqCst},
1734 Arc,
1735 },
1736 time::Duration,
1737 };
1738 use theme::ThemeRegistry;
1739 use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
1740
1741 #[cfg(test)]
1742 #[ctor::ctor]
1743 fn init_logger() {
1744 if std::env::var("RUST_LOG").is_ok() {
1745 env_logger::init();
1746 }
1747 }
1748
    // End-to-end check of project sharing: client A hosts a project, client B joins it,
    // both see each other as collaborators, B sees A's worktree entries, an edit made by B
    // reaches A's buffer, and a second session of user B can join and leave independently.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_b2: &mut TestAppContext,
    ) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".gitignore": "ignored-dir",
                "a.txt": "a-contents",
                "b.txt": "b-contents",
                "ignored-dir": {
                    "c.txt": "",
                    "d.txt": "",
                }
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Join that project as client B
        let client_b_peer_id = client_b.peer_id;
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        // From B's side, the host must be listed as a collaborator under A's login.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });

        deterministic.run_until_parked();
        // From A's side, B must be listed with the replica id that B reports for itself.
        project_a.read_with(cx_a, |project, _| {
            let client_b_collaborator = project.collaborators().get(&client_b_peer_id).unwrap();
            assert_eq!(client_b_collaborator.replica_id, replica_id_b);
            assert_eq!(client_b_collaborator.user.github_login, "user_b");
        });
        // The guest's worktree lists every path, including those under the ignored directory.
        project_b.read_with(cx_b, |project, cx| {
            let worktree = project.worktrees(cx).next().unwrap().read(cx);
            assert_eq!(
                worktree.paths().map(AsRef::as_ref).collect::<Vec<_>>(),
                [
                    Path::new(".gitignore"),
                    Path::new("a.txt"),
                    Path::new("b.txt"),
                    Path::new("ignored-dir"),
                    Path::new("ignored-dir/c.txt"),
                    Path::new("ignored-dir/d.txt"),
                ]
            );
        });

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // B opening the buffer must have caused the host to open it as well.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Client B can join again on a different window because they are already a participant.
        let client_b2 = server.create_client(cx_b2, "user_b").await;
        let project_b2 = Project::remote(
            project_id,
            client_b2.client.clone(),
            client_b2.user_store.clone(),
            lang_registry.clone(),
            FakeFs::new(cx_b2.background()),
            &mut cx_b2.to_async(),
        )
        .await
        .unwrap();
        deterministic.run_until_parked();
        // With three participants, each project sees the two others as collaborators.
        project_a.read_with(cx_a, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });
        project_b.read_with(cx_b, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });
        project_b2.read_with(cx_b2, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });

        // Dropping client B's first project removes only that from client A's collaborators.
        cx_b.update(move |_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        deterministic.run_until_parked();
        project_a.read_with(cx_a, |project, _| {
            assert_eq!(project.collaborators().len(), 1);
        });
        project_b2.read_with(cx_b2, |project, _| {
            assert_eq!(project.collaborators().len(), 1);
        });
    }
1916
    // Checks that a project's shared state follows its guests: it becomes unshared when
    // the only guest leaves, shared again when a guest re-joins, and guests are left with
    // a read-only project once the host drops it.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        // Opening a buffer confirms the guest can actually use the shared project.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // When client B leaves the project, it gets automatically unshared.
        cx_b.update(|_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        deterministic.run_until_parked();
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));

        // When client B joins again, the project gets re-shared.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // When client A (the host) leaves, the project gets unshared and guests are notified.
        cx_a.update(|_| drop(project_a));
        deterministic.run_until_parked();
        project_b2.read_with(cx_b, |project, _| {
            assert!(project.is_read_only());
            assert!(project.collaborators().is_empty());
        });
    }
1996
    // Checks what happens when the host's connection drops: the host's project loses its
    // collaborators and shared state, an existing guest becomes read-only, a pending join
    // request fails with `HostWentOffline`, and a guest can join again afterwards.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Request to join that project as client C
        // The join future is spawned but not awaited yet, so the request is still
        // outstanding when the host disconnects below.
        let project_c = cx_c.spawn(|mut cx| {
            let client = client_c.client.clone();
            let user_store = client_c.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        deterministic.run_until_parked();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });
        // Client C's pending join must resolve with the "host went offline" error.
        assert!(matches!(
            project_c.await.unwrap_err(),
            project::JoinProjectError::HostWentOffline
        ));

        // Ensure guests can still join.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
2107
    // Checks the two ways a join request can be refused: the host explicitly declines it
    // (`HostDeclined`), or the host closes the project while the request is still pending
    // (`HostClosedProject`).
    #[gpui::test(iterations = 10)]
    async fn test_decline_join_request(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree("/a", json!({})).await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Request to join that project as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        deterministic.run_until_parked();
        // The host answers the pending request with `false`, i.e. declines it.
        project_a.update(cx_a, |project, cx| {
            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
        });
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostDeclined
        ));

        // Request to join the project again as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });

        // Close the project on the host
        deterministic.run_until_parked();
        cx_a.update(|_| drop(project_a));
        deterministic.run_until_parked();
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostClosedProject
        ));
    }
2203
2204 #[gpui::test(iterations = 10)]
2205 async fn test_cancel_join_request(
2206 deterministic: Arc<Deterministic>,
2207 cx_a: &mut TestAppContext,
2208 cx_b: &mut TestAppContext,
2209 ) {
2210 let lang_registry = Arc::new(LanguageRegistry::test());
2211 let fs = FakeFs::new(cx_a.background());
2212 cx_a.foreground().forbid_parking();
2213
2214 // Connect to a server as 2 clients.
2215 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2216 let client_a = server.create_client(cx_a, "user_a").await;
2217 let client_b = server.create_client(cx_b, "user_b").await;
2218 server
2219 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2220 .await;
2221
2222 // Share a project as client A
2223 fs.insert_tree("/a", json!({})).await;
2224 let project_a = cx_a.update(|cx| {
2225 Project::local(
2226 client_a.clone(),
2227 client_a.user_store.clone(),
2228 lang_registry.clone(),
2229 fs.clone(),
2230 cx,
2231 )
2232 });
2233 let project_id = project_a
2234 .read_with(cx_a, |project, _| project.next_remote_id())
2235 .await;
2236
2237 let user_b = client_a
2238 .user_store
2239 .update(cx_a, |store, cx| {
2240 store.fetch_user(client_b.user_id().unwrap(), cx)
2241 })
2242 .await
2243 .unwrap();
2244
2245 let (worktree_a, _) = project_a
2246 .update(cx_a, |p, cx| {
2247 p.find_or_create_local_worktree("/a", true, cx)
2248 })
2249 .await
2250 .unwrap();
2251 worktree_a
2252 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2253 .await;
2254
2255 let project_a_events = Rc::new(RefCell::new(Vec::new()));
2256 project_a.update(cx_a, {
2257 let project_a_events = project_a_events.clone();
2258 move |_, cx| {
2259 cx.subscribe(&cx.handle(), move |_, _, event, _| {
2260 project_a_events.borrow_mut().push(event.clone());
2261 })
2262 .detach();
2263 }
2264 });
2265
2266 // Request to join that project as client B
2267 let project_b = cx_b.spawn(|mut cx| {
2268 let client = client_b.client.clone();
2269 let user_store = client_b.user_store.clone();
2270 let lang_registry = lang_registry.clone();
2271 async move {
2272 Project::remote(
2273 project_id,
2274 client,
2275 user_store,
2276 lang_registry.clone(),
2277 FakeFs::new(cx.background()),
2278 &mut cx,
2279 )
2280 .await
2281 }
2282 });
2283 deterministic.run_until_parked();
2284 assert_eq!(
2285 &*project_a_events.borrow(),
2286 &[project::Event::ContactRequestedJoin(user_b.clone())]
2287 );
2288 project_a_events.borrow_mut().clear();
2289
2290 // Cancel the join request by leaving the project
2291 client_b
2292 .client
2293 .send(proto::LeaveProject { project_id })
2294 .unwrap();
2295 drop(project_b);
2296
2297 deterministic.run_until_parked();
2298 assert_eq!(
2299 &*project_a_events.borrow(),
2300 &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2301 );
2302 }
2303
2304 #[gpui::test(iterations = 10)]
2305 async fn test_propagate_saves_and_fs_changes(
2306 cx_a: &mut TestAppContext,
2307 cx_b: &mut TestAppContext,
2308 cx_c: &mut TestAppContext,
2309 ) {
2310 let lang_registry = Arc::new(LanguageRegistry::test());
2311 let fs = FakeFs::new(cx_a.background());
2312 cx_a.foreground().forbid_parking();
2313
2314 // Connect to a server as 3 clients.
2315 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2316 let client_a = server.create_client(cx_a, "user_a").await;
2317 let mut client_b = server.create_client(cx_b, "user_b").await;
2318 let mut client_c = server.create_client(cx_c, "user_c").await;
2319 server
2320 .make_contacts(vec![
2321 (&client_a, cx_a),
2322 (&client_b, cx_b),
2323 (&client_c, cx_c),
2324 ])
2325 .await;
2326
2327 // Share a worktree as client A.
2328 fs.insert_tree(
2329 "/a",
2330 json!({
2331 "file1": "",
2332 "file2": ""
2333 }),
2334 )
2335 .await;
2336 let project_a = cx_a.update(|cx| {
2337 Project::local(
2338 client_a.clone(),
2339 client_a.user_store.clone(),
2340 lang_registry.clone(),
2341 fs.clone(),
2342 cx,
2343 )
2344 });
2345 let (worktree_a, _) = project_a
2346 .update(cx_a, |p, cx| {
2347 p.find_or_create_local_worktree("/a", true, cx)
2348 })
2349 .await
2350 .unwrap();
2351 worktree_a
2352 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2353 .await;
2354 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2355
2356 // Join that worktree as clients B and C.
2357 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2358 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2359 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2360 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2361
2362 // Open and edit a buffer as both guests B and C.
2363 let buffer_b = project_b
2364 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2365 .await
2366 .unwrap();
2367 let buffer_c = project_c
2368 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2369 .await
2370 .unwrap();
2371 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2372 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2373
2374 // Open and edit that buffer as the host.
2375 let buffer_a = project_a
2376 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2377 .await
2378 .unwrap();
2379
2380 buffer_a
2381 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2382 .await;
2383 buffer_a.update(cx_a, |buf, cx| {
2384 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2385 });
2386
2387 // Wait for edits to propagate
2388 buffer_a
2389 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2390 .await;
2391 buffer_b
2392 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2393 .await;
2394 buffer_c
2395 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2396 .await;
2397
2398 // Edit the buffer as the host and concurrently save as guest B.
2399 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2400 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2401 save_b.await.unwrap();
2402 assert_eq!(
2403 fs.load("/a/file1".as_ref()).await.unwrap(),
2404 "hi-a, i-am-c, i-am-b, i-am-a"
2405 );
2406 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2407 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2408 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2409
2410 worktree_a.flush_fs_events(cx_a).await;
2411
2412 // Make changes on host's file system, see those changes on guest worktrees.
2413 fs.rename(
2414 "/a/file1".as_ref(),
2415 "/a/file1-renamed".as_ref(),
2416 Default::default(),
2417 )
2418 .await
2419 .unwrap();
2420
2421 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2422 .await
2423 .unwrap();
2424 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2425
2426 worktree_a
2427 .condition(&cx_a, |tree, _| {
2428 tree.paths()
2429 .map(|p| p.to_string_lossy())
2430 .collect::<Vec<_>>()
2431 == ["file1-renamed", "file3", "file4"]
2432 })
2433 .await;
2434 worktree_b
2435 .condition(&cx_b, |tree, _| {
2436 tree.paths()
2437 .map(|p| p.to_string_lossy())
2438 .collect::<Vec<_>>()
2439 == ["file1-renamed", "file3", "file4"]
2440 })
2441 .await;
2442 worktree_c
2443 .condition(&cx_c, |tree, _| {
2444 tree.paths()
2445 .map(|p| p.to_string_lossy())
2446 .collect::<Vec<_>>()
2447 == ["file1-renamed", "file3", "file4"]
2448 })
2449 .await;
2450
2451 // Ensure buffer files are updated as well.
2452 buffer_a
2453 .condition(&cx_a, |buf, _| {
2454 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2455 })
2456 .await;
2457 buffer_b
2458 .condition(&cx_b, |buf, _| {
2459 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2460 })
2461 .await;
2462 buffer_c
2463 .condition(&cx_c, |buf, _| {
2464 buf.file().unwrap().path().to_str() == Some("file1-renamed")
2465 })
2466 .await;
2467 }
2468
2469 #[gpui::test(iterations = 10)]
2470 async fn test_fs_operations(
2471 executor: Arc<Deterministic>,
2472 cx_a: &mut TestAppContext,
2473 cx_b: &mut TestAppContext,
2474 ) {
2475 executor.forbid_parking();
2476 let fs = FakeFs::new(cx_a.background());
2477
2478 // Connect to a server as 2 clients.
2479 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2480 let mut client_a = server.create_client(cx_a, "user_a").await;
2481 let mut client_b = server.create_client(cx_b, "user_b").await;
2482 server
2483 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2484 .await;
2485
2486 // Share a project as client A
2487 fs.insert_tree(
2488 "/dir",
2489 json!({
2490 "a.txt": "a-contents",
2491 "b.txt": "b-contents",
2492 }),
2493 )
2494 .await;
2495
2496 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2497 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2498
2499 let worktree_a =
2500 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2501 let worktree_b =
2502 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2503
2504 let entry = project_b
2505 .update(cx_b, |project, cx| {
2506 project
2507 .create_entry((worktree_id, "c.txt"), false, cx)
2508 .unwrap()
2509 })
2510 .await
2511 .unwrap();
2512 worktree_a.read_with(cx_a, |worktree, _| {
2513 assert_eq!(
2514 worktree
2515 .paths()
2516 .map(|p| p.to_string_lossy())
2517 .collect::<Vec<_>>(),
2518 ["a.txt", "b.txt", "c.txt"]
2519 );
2520 });
2521 worktree_b.read_with(cx_b, |worktree, _| {
2522 assert_eq!(
2523 worktree
2524 .paths()
2525 .map(|p| p.to_string_lossy())
2526 .collect::<Vec<_>>(),
2527 ["a.txt", "b.txt", "c.txt"]
2528 );
2529 });
2530
2531 project_b
2532 .update(cx_b, |project, cx| {
2533 project.rename_entry(entry.id, Path::new("d.txt"), cx)
2534 })
2535 .unwrap()
2536 .await
2537 .unwrap();
2538 worktree_a.read_with(cx_a, |worktree, _| {
2539 assert_eq!(
2540 worktree
2541 .paths()
2542 .map(|p| p.to_string_lossy())
2543 .collect::<Vec<_>>(),
2544 ["a.txt", "b.txt", "d.txt"]
2545 );
2546 });
2547 worktree_b.read_with(cx_b, |worktree, _| {
2548 assert_eq!(
2549 worktree
2550 .paths()
2551 .map(|p| p.to_string_lossy())
2552 .collect::<Vec<_>>(),
2553 ["a.txt", "b.txt", "d.txt"]
2554 );
2555 });
2556
2557 let dir_entry = project_b
2558 .update(cx_b, |project, cx| {
2559 project
2560 .create_entry((worktree_id, "DIR"), true, cx)
2561 .unwrap()
2562 })
2563 .await
2564 .unwrap();
2565 worktree_a.read_with(cx_a, |worktree, _| {
2566 assert_eq!(
2567 worktree
2568 .paths()
2569 .map(|p| p.to_string_lossy())
2570 .collect::<Vec<_>>(),
2571 ["DIR", "a.txt", "b.txt", "d.txt"]
2572 );
2573 });
2574 worktree_b.read_with(cx_b, |worktree, _| {
2575 assert_eq!(
2576 worktree
2577 .paths()
2578 .map(|p| p.to_string_lossy())
2579 .collect::<Vec<_>>(),
2580 ["DIR", "a.txt", "b.txt", "d.txt"]
2581 );
2582 });
2583
2584 project_b
2585 .update(cx_b, |project, cx| {
2586 project.delete_entry(dir_entry.id, cx).unwrap()
2587 })
2588 .await
2589 .unwrap();
2590 worktree_a.read_with(cx_a, |worktree, _| {
2591 assert_eq!(
2592 worktree
2593 .paths()
2594 .map(|p| p.to_string_lossy())
2595 .collect::<Vec<_>>(),
2596 ["a.txt", "b.txt", "d.txt"]
2597 );
2598 });
2599 worktree_b.read_with(cx_b, |worktree, _| {
2600 assert_eq!(
2601 worktree
2602 .paths()
2603 .map(|p| p.to_string_lossy())
2604 .collect::<Vec<_>>(),
2605 ["a.txt", "b.txt", "d.txt"]
2606 );
2607 });
2608
2609 project_b
2610 .update(cx_b, |project, cx| {
2611 project.delete_entry(entry.id, cx).unwrap()
2612 })
2613 .await
2614 .unwrap();
2615 worktree_a.read_with(cx_a, |worktree, _| {
2616 assert_eq!(
2617 worktree
2618 .paths()
2619 .map(|p| p.to_string_lossy())
2620 .collect::<Vec<_>>(),
2621 ["a.txt", "b.txt"]
2622 );
2623 });
2624 worktree_b.read_with(cx_b, |worktree, _| {
2625 assert_eq!(
2626 worktree
2627 .paths()
2628 .map(|p| p.to_string_lossy())
2629 .collect::<Vec<_>>(),
2630 ["a.txt", "b.txt"]
2631 );
2632 });
2633 }
2634
2635 #[gpui::test(iterations = 10)]
2636 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2637 cx_a.foreground().forbid_parking();
2638 let lang_registry = Arc::new(LanguageRegistry::test());
2639 let fs = FakeFs::new(cx_a.background());
2640
2641 // Connect to a server as 2 clients.
2642 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2643 let client_a = server.create_client(cx_a, "user_a").await;
2644 let mut client_b = server.create_client(cx_b, "user_b").await;
2645 server
2646 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2647 .await;
2648
2649 // Share a project as client A
2650 fs.insert_tree(
2651 "/dir",
2652 json!({
2653 "a.txt": "a-contents",
2654 }),
2655 )
2656 .await;
2657
2658 let project_a = cx_a.update(|cx| {
2659 Project::local(
2660 client_a.clone(),
2661 client_a.user_store.clone(),
2662 lang_registry.clone(),
2663 fs.clone(),
2664 cx,
2665 )
2666 });
2667 let (worktree_a, _) = project_a
2668 .update(cx_a, |p, cx| {
2669 p.find_or_create_local_worktree("/dir", true, cx)
2670 })
2671 .await
2672 .unwrap();
2673 worktree_a
2674 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2675 .await;
2676 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2677
2678 // Join that project as client B
2679 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2680
2681 // Open a buffer as client B
2682 let buffer_b = project_b
2683 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2684 .await
2685 .unwrap();
2686
2687 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2688 buffer_b.read_with(cx_b, |buf, _| {
2689 assert!(buf.is_dirty());
2690 assert!(!buf.has_conflict());
2691 });
2692
2693 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2694 buffer_b
2695 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2696 .await;
2697 buffer_b.read_with(cx_b, |buf, _| {
2698 assert!(!buf.has_conflict());
2699 });
2700
2701 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2702 buffer_b.read_with(cx_b, |buf, _| {
2703 assert!(buf.is_dirty());
2704 assert!(!buf.has_conflict());
2705 });
2706 }
2707
2708 #[gpui::test(iterations = 10)]
2709 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2710 cx_a.foreground().forbid_parking();
2711 let lang_registry = Arc::new(LanguageRegistry::test());
2712 let fs = FakeFs::new(cx_a.background());
2713
2714 // Connect to a server as 2 clients.
2715 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2716 let client_a = server.create_client(cx_a, "user_a").await;
2717 let mut client_b = server.create_client(cx_b, "user_b").await;
2718 server
2719 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2720 .await;
2721
2722 // Share a project as client A
2723 fs.insert_tree(
2724 "/dir",
2725 json!({
2726 "a.txt": "a-contents",
2727 }),
2728 )
2729 .await;
2730
2731 let project_a = cx_a.update(|cx| {
2732 Project::local(
2733 client_a.clone(),
2734 client_a.user_store.clone(),
2735 lang_registry.clone(),
2736 fs.clone(),
2737 cx,
2738 )
2739 });
2740 let (worktree_a, _) = project_a
2741 .update(cx_a, |p, cx| {
2742 p.find_or_create_local_worktree("/dir", true, cx)
2743 })
2744 .await
2745 .unwrap();
2746 worktree_a
2747 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2748 .await;
2749 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2750
2751 // Join that project as client B
2752 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2753 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2754
2755 // Open a buffer as client B
2756 let buffer_b = project_b
2757 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2758 .await
2759 .unwrap();
2760 buffer_b.read_with(cx_b, |buf, _| {
2761 assert!(!buf.is_dirty());
2762 assert!(!buf.has_conflict());
2763 });
2764
2765 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2766 .await
2767 .unwrap();
2768 buffer_b
2769 .condition(&cx_b, |buf, _| {
2770 buf.text() == "new contents" && !buf.is_dirty()
2771 })
2772 .await;
2773 buffer_b.read_with(cx_b, |buf, _| {
2774 assert!(!buf.has_conflict());
2775 });
2776 }
2777
2778 #[gpui::test(iterations = 10)]
2779 async fn test_editing_while_guest_opens_buffer(
2780 cx_a: &mut TestAppContext,
2781 cx_b: &mut TestAppContext,
2782 ) {
2783 cx_a.foreground().forbid_parking();
2784 let lang_registry = Arc::new(LanguageRegistry::test());
2785 let fs = FakeFs::new(cx_a.background());
2786
2787 // Connect to a server as 2 clients.
2788 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2789 let client_a = server.create_client(cx_a, "user_a").await;
2790 let mut client_b = server.create_client(cx_b, "user_b").await;
2791 server
2792 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2793 .await;
2794
2795 // Share a project as client A
2796 fs.insert_tree(
2797 "/dir",
2798 json!({
2799 "a.txt": "a-contents",
2800 }),
2801 )
2802 .await;
2803 let project_a = cx_a.update(|cx| {
2804 Project::local(
2805 client_a.clone(),
2806 client_a.user_store.clone(),
2807 lang_registry.clone(),
2808 fs.clone(),
2809 cx,
2810 )
2811 });
2812 let (worktree_a, _) = project_a
2813 .update(cx_a, |p, cx| {
2814 p.find_or_create_local_worktree("/dir", true, cx)
2815 })
2816 .await
2817 .unwrap();
2818 worktree_a
2819 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2820 .await;
2821 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2822
2823 // Join that project as client B
2824 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2825
2826 // Open a buffer as client A
2827 let buffer_a = project_a
2828 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2829 .await
2830 .unwrap();
2831
2832 // Start opening the same buffer as client B
2833 let buffer_b = cx_b
2834 .background()
2835 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2836
2837 // Edit the buffer as client A while client B is still opening it.
2838 cx_b.background().simulate_random_delay().await;
2839 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2840 cx_b.background().simulate_random_delay().await;
2841 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2842
2843 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2844 let buffer_b = buffer_b.await.unwrap();
2845 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2846 }
2847
2848 #[gpui::test(iterations = 10)]
2849 async fn test_leaving_worktree_while_opening_buffer(
2850 cx_a: &mut TestAppContext,
2851 cx_b: &mut TestAppContext,
2852 ) {
2853 cx_a.foreground().forbid_parking();
2854 let lang_registry = Arc::new(LanguageRegistry::test());
2855 let fs = FakeFs::new(cx_a.background());
2856
2857 // Connect to a server as 2 clients.
2858 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2859 let client_a = server.create_client(cx_a, "user_a").await;
2860 let mut client_b = server.create_client(cx_b, "user_b").await;
2861 server
2862 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2863 .await;
2864
2865 // Share a project as client A
2866 fs.insert_tree(
2867 "/dir",
2868 json!({
2869 "a.txt": "a-contents",
2870 }),
2871 )
2872 .await;
2873 let project_a = cx_a.update(|cx| {
2874 Project::local(
2875 client_a.clone(),
2876 client_a.user_store.clone(),
2877 lang_registry.clone(),
2878 fs.clone(),
2879 cx,
2880 )
2881 });
2882 let (worktree_a, _) = project_a
2883 .update(cx_a, |p, cx| {
2884 p.find_or_create_local_worktree("/dir", true, cx)
2885 })
2886 .await
2887 .unwrap();
2888 worktree_a
2889 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2890 .await;
2891 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2892
2893 // Join that project as client B
2894 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2895
2896 // See that a guest has joined as client A.
2897 project_a
2898 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2899 .await;
2900
2901 // Begin opening a buffer as client B, but leave the project before the open completes.
2902 let buffer_b = cx_b
2903 .background()
2904 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2905 cx_b.update(|_| {
2906 drop(client_b.project.take());
2907 drop(project_b);
2908 });
2909 drop(buffer_b);
2910
2911 // See that the guest has left.
2912 project_a
2913 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2914 .await;
2915 }
2916
2917 #[gpui::test(iterations = 10)]
2918 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2919 cx_a.foreground().forbid_parking();
2920 let lang_registry = Arc::new(LanguageRegistry::test());
2921 let fs = FakeFs::new(cx_a.background());
2922
2923 // Connect to a server as 2 clients.
2924 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2925 let client_a = server.create_client(cx_a, "user_a").await;
2926 let mut client_b = server.create_client(cx_b, "user_b").await;
2927 server
2928 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2929 .await;
2930
2931 // Share a project as client A
2932 fs.insert_tree(
2933 "/a",
2934 json!({
2935 "a.txt": "a-contents",
2936 "b.txt": "b-contents",
2937 }),
2938 )
2939 .await;
2940 let project_a = cx_a.update(|cx| {
2941 Project::local(
2942 client_a.clone(),
2943 client_a.user_store.clone(),
2944 lang_registry.clone(),
2945 fs.clone(),
2946 cx,
2947 )
2948 });
2949 let (worktree_a, _) = project_a
2950 .update(cx_a, |p, cx| {
2951 p.find_or_create_local_worktree("/a", true, cx)
2952 })
2953 .await
2954 .unwrap();
2955 worktree_a
2956 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2957 .await;
2958
2959 // Join that project as client B
2960 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2961
2962 // Client A sees that a guest has joined.
2963 project_a
2964 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2965 .await;
2966
2967 // Drop client B's connection and ensure client A observes client B leaving the project.
2968 client_b.disconnect(&cx_b.to_async()).unwrap();
2969 project_a
2970 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2971 .await;
2972
2973 // Rejoin the project as client B
2974 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2975
2976 // Client A sees that a guest has re-joined.
2977 project_a
2978 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2979 .await;
2980
2981 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2982 client_b.wait_for_current_user(cx_b).await;
2983 server.disconnect_client(client_b.current_user_id(cx_b));
2984 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2985 project_a
2986 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2987 .await;
2988 }
2989
2990 #[gpui::test(iterations = 10)]
2991 async fn test_collaborating_with_diagnostics(
2992 deterministic: Arc<Deterministic>,
2993 cx_a: &mut TestAppContext,
2994 cx_b: &mut TestAppContext,
2995 cx_c: &mut TestAppContext,
2996 ) {
2997 deterministic.forbid_parking();
2998 let lang_registry = Arc::new(LanguageRegistry::test());
2999 let fs = FakeFs::new(cx_a.background());
3000
3001 // Set up a fake language server.
3002 let mut language = Language::new(
3003 LanguageConfig {
3004 name: "Rust".into(),
3005 path_suffixes: vec!["rs".to_string()],
3006 ..Default::default()
3007 },
3008 Some(tree_sitter_rust::language()),
3009 );
3010 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3011 lang_registry.add(Arc::new(language));
3012
3013 // Connect to a server as 2 clients.
3014 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3015 let client_a = server.create_client(cx_a, "user_a").await;
3016 let mut client_b = server.create_client(cx_b, "user_b").await;
3017 let mut client_c = server.create_client(cx_c, "user_c").await;
3018 server
3019 .make_contacts(vec![
3020 (&client_a, cx_a),
3021 (&client_b, cx_b),
3022 (&client_c, cx_c),
3023 ])
3024 .await;
3025
3026 // Share a project as client A
3027 fs.insert_tree(
3028 "/a",
3029 json!({
3030 "a.rs": "let one = two",
3031 "other.rs": "",
3032 }),
3033 )
3034 .await;
3035 let project_a = cx_a.update(|cx| {
3036 Project::local(
3037 client_a.clone(),
3038 client_a.user_store.clone(),
3039 lang_registry.clone(),
3040 fs.clone(),
3041 cx,
3042 )
3043 });
3044 let (worktree_a, _) = project_a
3045 .update(cx_a, |p, cx| {
3046 p.find_or_create_local_worktree("/a", true, cx)
3047 })
3048 .await
3049 .unwrap();
3050 worktree_a
3051 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3052 .await;
3053 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3054 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3055
3056 // Cause the language server to start.
3057 let _buffer = cx_a
3058 .background()
3059 .spawn(project_a.update(cx_a, |project, cx| {
3060 project.open_buffer(
3061 ProjectPath {
3062 worktree_id,
3063 path: Path::new("other.rs").into(),
3064 },
3065 cx,
3066 )
3067 }))
3068 .await
3069 .unwrap();
3070
3071 // Join the worktree as client B.
3072 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3073
3074 // Simulate a language server reporting errors for a file.
3075 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3076 fake_language_server
3077 .receive_notification::<lsp::notification::DidOpenTextDocument>()
3078 .await;
3079 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3080 lsp::PublishDiagnosticsParams {
3081 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3082 version: None,
3083 diagnostics: vec![lsp::Diagnostic {
3084 severity: Some(lsp::DiagnosticSeverity::ERROR),
3085 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3086 message: "message 1".to_string(),
3087 ..Default::default()
3088 }],
3089 },
3090 );
3091
3092 // Wait for server to see the diagnostics update.
3093 deterministic.run_until_parked();
3094 {
3095 let store = server.store.read().await;
3096 let project = store.project(project_id).unwrap();
3097 let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3098 assert!(!worktree.diagnostic_summaries.is_empty());
3099 }
3100
3101 // Ensure client B observes the new diagnostics.
3102 project_b.read_with(cx_b, |project, cx| {
3103 assert_eq!(
3104 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3105 &[(
3106 ProjectPath {
3107 worktree_id,
3108 path: Arc::from(Path::new("a.rs")),
3109 },
3110 DiagnosticSummary {
3111 error_count: 1,
3112 warning_count: 0,
3113 ..Default::default()
3114 },
3115 )]
3116 )
3117 });
3118
3119 // Join project as client C and observe the diagnostics.
3120 let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3121 project_c.read_with(cx_c, |project, cx| {
3122 assert_eq!(
3123 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3124 &[(
3125 ProjectPath {
3126 worktree_id,
3127 path: Arc::from(Path::new("a.rs")),
3128 },
3129 DiagnosticSummary {
3130 error_count: 1,
3131 warning_count: 0,
3132 ..Default::default()
3133 },
3134 )]
3135 )
3136 });
3137
3138 // Simulate a language server reporting more errors for a file.
3139 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3140 lsp::PublishDiagnosticsParams {
3141 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3142 version: None,
3143 diagnostics: vec![
3144 lsp::Diagnostic {
3145 severity: Some(lsp::DiagnosticSeverity::ERROR),
3146 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3147 message: "message 1".to_string(),
3148 ..Default::default()
3149 },
3150 lsp::Diagnostic {
3151 severity: Some(lsp::DiagnosticSeverity::WARNING),
3152 range: lsp::Range::new(
3153 lsp::Position::new(0, 10),
3154 lsp::Position::new(0, 13),
3155 ),
3156 message: "message 2".to_string(),
3157 ..Default::default()
3158 },
3159 ],
3160 },
3161 );
3162
3163 // Clients B and C get the updated summaries
3164 deterministic.run_until_parked();
3165 project_b.read_with(cx_b, |project, cx| {
3166 assert_eq!(
3167 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3168 [(
3169 ProjectPath {
3170 worktree_id,
3171 path: Arc::from(Path::new("a.rs")),
3172 },
3173 DiagnosticSummary {
3174 error_count: 1,
3175 warning_count: 1,
3176 ..Default::default()
3177 },
3178 )]
3179 );
3180 });
3181 project_c.read_with(cx_c, |project, cx| {
3182 assert_eq!(
3183 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3184 [(
3185 ProjectPath {
3186 worktree_id,
3187 path: Arc::from(Path::new("a.rs")),
3188 },
3189 DiagnosticSummary {
3190 error_count: 1,
3191 warning_count: 1,
3192 ..Default::default()
3193 },
3194 )]
3195 );
3196 });
3197
3198 // Open the file with the errors on client B. They should be present.
3199 let buffer_b = cx_b
3200 .background()
3201 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3202 .await
3203 .unwrap();
3204
3205 buffer_b.read_with(cx_b, |buffer, _| {
3206 assert_eq!(
3207 buffer
3208 .snapshot()
3209 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3210 .map(|entry| entry)
3211 .collect::<Vec<_>>(),
3212 &[
3213 DiagnosticEntry {
3214 range: Point::new(0, 4)..Point::new(0, 7),
3215 diagnostic: Diagnostic {
3216 group_id: 0,
3217 message: "message 1".to_string(),
3218 severity: lsp::DiagnosticSeverity::ERROR,
3219 is_primary: true,
3220 ..Default::default()
3221 }
3222 },
3223 DiagnosticEntry {
3224 range: Point::new(0, 10)..Point::new(0, 13),
3225 diagnostic: Diagnostic {
3226 group_id: 1,
3227 severity: lsp::DiagnosticSeverity::WARNING,
3228 message: "message 2".to_string(),
3229 is_primary: true,
3230 ..Default::default()
3231 }
3232 }
3233 ]
3234 );
3235 });
3236
3237 // Simulate a language server reporting no errors for a file.
3238 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3239 lsp::PublishDiagnosticsParams {
3240 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3241 version: None,
3242 diagnostics: vec![],
3243 },
3244 );
3245 deterministic.run_until_parked();
3246 project_a.read_with(cx_a, |project, cx| {
3247 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3248 });
3249 project_b.read_with(cx_b, |project, cx| {
3250 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3251 });
3252 project_c.read_with(cx_c, |project, cx| {
3253 assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3254 });
3255 }
3256
3257 #[gpui::test(iterations = 10)]
3258 async fn test_collaborating_with_completion(
3259 cx_a: &mut TestAppContext,
3260 cx_b: &mut TestAppContext,
3261 ) {
3262 cx_a.foreground().forbid_parking();
3263 let lang_registry = Arc::new(LanguageRegistry::test());
3264 let fs = FakeFs::new(cx_a.background());
3265
3266 // Set up a fake language server.
3267 let mut language = Language::new(
3268 LanguageConfig {
3269 name: "Rust".into(),
3270 path_suffixes: vec!["rs".to_string()],
3271 ..Default::default()
3272 },
3273 Some(tree_sitter_rust::language()),
3274 );
3275 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3276 capabilities: lsp::ServerCapabilities {
3277 completion_provider: Some(lsp::CompletionOptions {
3278 trigger_characters: Some(vec![".".to_string()]),
3279 ..Default::default()
3280 }),
3281 ..Default::default()
3282 },
3283 ..Default::default()
3284 });
3285 lang_registry.add(Arc::new(language));
3286
3287 // Connect to a server as 2 clients.
3288 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3289 let client_a = server.create_client(cx_a, "user_a").await;
3290 let mut client_b = server.create_client(cx_b, "user_b").await;
3291 server
3292 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3293 .await;
3294
3295 // Share a project as client A
3296 fs.insert_tree(
3297 "/a",
3298 json!({
3299 "main.rs": "fn main() { a }",
3300 "other.rs": "",
3301 }),
3302 )
3303 .await;
3304 let project_a = cx_a.update(|cx| {
3305 Project::local(
3306 client_a.clone(),
3307 client_a.user_store.clone(),
3308 lang_registry.clone(),
3309 fs.clone(),
3310 cx,
3311 )
3312 });
3313 let (worktree_a, _) = project_a
3314 .update(cx_a, |p, cx| {
3315 p.find_or_create_local_worktree("/a", true, cx)
3316 })
3317 .await
3318 .unwrap();
3319 worktree_a
3320 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3321 .await;
3322 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3323
3324 // Join the worktree as client B.
3325 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3326
3327 // Open a file in an editor as the guest.
3328 let buffer_b = project_b
3329 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3330 .await
3331 .unwrap();
3332 let (window_b, _) = cx_b.add_window(|_| EmptyView);
3333 let editor_b = cx_b.add_view(window_b, |cx| {
3334 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3335 });
3336
3337 let fake_language_server = fake_language_servers.next().await.unwrap();
3338 buffer_b
3339 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3340 .await;
3341
3342 // Type a completion trigger character as the guest.
3343 editor_b.update(cx_b, |editor, cx| {
3344 editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3345 editor.handle_input(&Input(".".into()), cx);
3346 cx.focus(&editor_b);
3347 });
3348
3349 // Receive a completion request as the host's language server.
3350 // Return some completions from the host's language server.
3351 cx_a.foreground().start_waiting();
3352 fake_language_server
3353 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3354 assert_eq!(
3355 params.text_document_position.text_document.uri,
3356 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3357 );
3358 assert_eq!(
3359 params.text_document_position.position,
3360 lsp::Position::new(0, 14),
3361 );
3362
3363 Ok(Some(lsp::CompletionResponse::Array(vec![
3364 lsp::CompletionItem {
3365 label: "first_method(…)".into(),
3366 detail: Some("fn(&mut self, B) -> C".into()),
3367 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3368 new_text: "first_method($1)".to_string(),
3369 range: lsp::Range::new(
3370 lsp::Position::new(0, 14),
3371 lsp::Position::new(0, 14),
3372 ),
3373 })),
3374 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3375 ..Default::default()
3376 },
3377 lsp::CompletionItem {
3378 label: "second_method(…)".into(),
3379 detail: Some("fn(&mut self, C) -> D<E>".into()),
3380 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3381 new_text: "second_method()".to_string(),
3382 range: lsp::Range::new(
3383 lsp::Position::new(0, 14),
3384 lsp::Position::new(0, 14),
3385 ),
3386 })),
3387 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3388 ..Default::default()
3389 },
3390 ])))
3391 })
3392 .next()
3393 .await
3394 .unwrap();
3395 cx_a.foreground().finish_waiting();
3396
3397 // Open the buffer on the host.
3398 let buffer_a = project_a
3399 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3400 .await
3401 .unwrap();
3402 buffer_a
3403 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3404 .await;
3405
3406 // Confirm a completion on the guest.
3407 editor_b
3408 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3409 .await;
3410 editor_b.update(cx_b, |editor, cx| {
3411 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3412 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3413 });
3414
3415 // Return a resolved completion from the host's language server.
3416 // The resolved completion has an additional text edit.
3417 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3418 |params, _| async move {
3419 assert_eq!(params.label, "first_method(…)");
3420 Ok(lsp::CompletionItem {
3421 label: "first_method(…)".into(),
3422 detail: Some("fn(&mut self, B) -> C".into()),
3423 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3424 new_text: "first_method($1)".to_string(),
3425 range: lsp::Range::new(
3426 lsp::Position::new(0, 14),
3427 lsp::Position::new(0, 14),
3428 ),
3429 })),
3430 additional_text_edits: Some(vec![lsp::TextEdit {
3431 new_text: "use d::SomeTrait;\n".to_string(),
3432 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3433 }]),
3434 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3435 ..Default::default()
3436 })
3437 },
3438 );
3439
3440 // The additional edit is applied.
3441 buffer_a
3442 .condition(&cx_a, |buffer, _| {
3443 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3444 })
3445 .await;
3446 buffer_b
3447 .condition(&cx_b, |buffer, _| {
3448 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3449 })
3450 .await;
3451 }
3452
3453 #[gpui::test(iterations = 10)]
3454 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3455 cx_a.foreground().forbid_parking();
3456 let lang_registry = Arc::new(LanguageRegistry::test());
3457 let fs = FakeFs::new(cx_a.background());
3458
3459 // Connect to a server as 2 clients.
3460 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3461 let client_a = server.create_client(cx_a, "user_a").await;
3462 let mut client_b = server.create_client(cx_b, "user_b").await;
3463 server
3464 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3465 .await;
3466
3467 // Share a project as client A
3468 fs.insert_tree(
3469 "/a",
3470 json!({
3471 "a.rs": "let one = 1;",
3472 }),
3473 )
3474 .await;
3475 let project_a = cx_a.update(|cx| {
3476 Project::local(
3477 client_a.clone(),
3478 client_a.user_store.clone(),
3479 lang_registry.clone(),
3480 fs.clone(),
3481 cx,
3482 )
3483 });
3484 let (worktree_a, _) = project_a
3485 .update(cx_a, |p, cx| {
3486 p.find_or_create_local_worktree("/a", true, cx)
3487 })
3488 .await
3489 .unwrap();
3490 worktree_a
3491 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3492 .await;
3493 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3494 let buffer_a = project_a
3495 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3496 .await
3497 .unwrap();
3498
3499 // Join the worktree as client B.
3500 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3501
3502 let buffer_b = cx_b
3503 .background()
3504 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3505 .await
3506 .unwrap();
3507 buffer_b.update(cx_b, |buffer, cx| {
3508 buffer.edit([(4..7, "six")], cx);
3509 buffer.edit([(10..11, "6")], cx);
3510 assert_eq!(buffer.text(), "let six = 6;");
3511 assert!(buffer.is_dirty());
3512 assert!(!buffer.has_conflict());
3513 });
3514 buffer_a
3515 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3516 .await;
3517
3518 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3519 .await
3520 .unwrap();
3521 buffer_a
3522 .condition(cx_a, |buffer, _| buffer.has_conflict())
3523 .await;
3524 buffer_b
3525 .condition(cx_b, |buffer, _| buffer.has_conflict())
3526 .await;
3527
3528 project_b
3529 .update(cx_b, |project, cx| {
3530 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3531 })
3532 .await
3533 .unwrap();
3534 buffer_a.read_with(cx_a, |buffer, _| {
3535 assert_eq!(buffer.text(), "let seven = 7;");
3536 assert!(!buffer.is_dirty());
3537 assert!(!buffer.has_conflict());
3538 });
3539 buffer_b.read_with(cx_b, |buffer, _| {
3540 assert_eq!(buffer.text(), "let seven = 7;");
3541 assert!(!buffer.is_dirty());
3542 assert!(!buffer.has_conflict());
3543 });
3544
3545 buffer_a.update(cx_a, |buffer, cx| {
3546 // Undoing on the host is a no-op when the reload was initiated by the guest.
3547 buffer.undo(cx);
3548 assert_eq!(buffer.text(), "let seven = 7;");
3549 assert!(!buffer.is_dirty());
3550 assert!(!buffer.has_conflict());
3551 });
3552 buffer_b.update(cx_b, |buffer, cx| {
3553 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3554 buffer.undo(cx);
3555 assert_eq!(buffer.text(), "let six = 6;");
3556 assert!(buffer.is_dirty());
3557 assert!(!buffer.has_conflict());
3558 });
3559 }
3560
3561 #[gpui::test(iterations = 10)]
3562 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3563 cx_a.foreground().forbid_parking();
3564 let lang_registry = Arc::new(LanguageRegistry::test());
3565 let fs = FakeFs::new(cx_a.background());
3566
3567 // Set up a fake language server.
3568 let mut language = Language::new(
3569 LanguageConfig {
3570 name: "Rust".into(),
3571 path_suffixes: vec!["rs".to_string()],
3572 ..Default::default()
3573 },
3574 Some(tree_sitter_rust::language()),
3575 );
3576 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3577 lang_registry.add(Arc::new(language));
3578
3579 // Connect to a server as 2 clients.
3580 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3581 let client_a = server.create_client(cx_a, "user_a").await;
3582 let mut client_b = server.create_client(cx_b, "user_b").await;
3583 server
3584 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3585 .await;
3586
3587 // Share a project as client A
3588 fs.insert_tree(
3589 "/a",
3590 json!({
3591 "a.rs": "let one = two",
3592 }),
3593 )
3594 .await;
3595 let project_a = cx_a.update(|cx| {
3596 Project::local(
3597 client_a.clone(),
3598 client_a.user_store.clone(),
3599 lang_registry.clone(),
3600 fs.clone(),
3601 cx,
3602 )
3603 });
3604 let (worktree_a, _) = project_a
3605 .update(cx_a, |p, cx| {
3606 p.find_or_create_local_worktree("/a", true, cx)
3607 })
3608 .await
3609 .unwrap();
3610 worktree_a
3611 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3612 .await;
3613 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3614
3615 // Join the project as client B.
3616 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3617
3618 let buffer_b = cx_b
3619 .background()
3620 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3621 .await
3622 .unwrap();
3623
3624 let fake_language_server = fake_language_servers.next().await.unwrap();
3625 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3626 Ok(Some(vec![
3627 lsp::TextEdit {
3628 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3629 new_text: "h".to_string(),
3630 },
3631 lsp::TextEdit {
3632 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3633 new_text: "y".to_string(),
3634 },
3635 ]))
3636 });
3637
3638 project_b
3639 .update(cx_b, |project, cx| {
3640 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3641 })
3642 .await
3643 .unwrap();
3644 assert_eq!(
3645 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3646 "let honey = two"
3647 );
3648 }
3649
3650 #[gpui::test(iterations = 10)]
3651 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3652 cx_a.foreground().forbid_parking();
3653 let lang_registry = Arc::new(LanguageRegistry::test());
3654 let fs = FakeFs::new(cx_a.background());
3655 fs.insert_tree(
3656 "/root-1",
3657 json!({
3658 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3659 }),
3660 )
3661 .await;
3662 fs.insert_tree(
3663 "/root-2",
3664 json!({
3665 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3666 }),
3667 )
3668 .await;
3669
3670 // Set up a fake language server.
3671 let mut language = Language::new(
3672 LanguageConfig {
3673 name: "Rust".into(),
3674 path_suffixes: vec!["rs".to_string()],
3675 ..Default::default()
3676 },
3677 Some(tree_sitter_rust::language()),
3678 );
3679 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3680 lang_registry.add(Arc::new(language));
3681
3682 // Connect to a server as 2 clients.
3683 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3684 let client_a = server.create_client(cx_a, "user_a").await;
3685 let mut client_b = server.create_client(cx_b, "user_b").await;
3686 server
3687 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3688 .await;
3689
3690 // Share a project as client A
3691 let project_a = cx_a.update(|cx| {
3692 Project::local(
3693 client_a.clone(),
3694 client_a.user_store.clone(),
3695 lang_registry.clone(),
3696 fs.clone(),
3697 cx,
3698 )
3699 });
3700 let (worktree_a, _) = project_a
3701 .update(cx_a, |p, cx| {
3702 p.find_or_create_local_worktree("/root-1", true, cx)
3703 })
3704 .await
3705 .unwrap();
3706 worktree_a
3707 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3708 .await;
3709 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3710
3711 // Join the project as client B.
3712 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3713
3714 // Open the file on client B.
3715 let buffer_b = cx_b
3716 .background()
3717 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3718 .await
3719 .unwrap();
3720
3721 // Request the definition of a symbol as the guest.
3722 let fake_language_server = fake_language_servers.next().await.unwrap();
3723 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3724 |_, _| async move {
3725 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3726 lsp::Location::new(
3727 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3728 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3729 ),
3730 )))
3731 },
3732 );
3733
3734 let definitions_1 = project_b
3735 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3736 .await
3737 .unwrap();
3738 cx_b.read(|cx| {
3739 assert_eq!(definitions_1.len(), 1);
3740 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3741 let target_buffer = definitions_1[0].buffer.read(cx);
3742 assert_eq!(
3743 target_buffer.text(),
3744 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3745 );
3746 assert_eq!(
3747 definitions_1[0].range.to_point(target_buffer),
3748 Point::new(0, 6)..Point::new(0, 9)
3749 );
3750 });
3751
3752 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3753 // the previous call to `definition`.
3754 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3755 |_, _| async move {
3756 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3757 lsp::Location::new(
3758 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3759 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3760 ),
3761 )))
3762 },
3763 );
3764
3765 let definitions_2 = project_b
3766 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3767 .await
3768 .unwrap();
3769 cx_b.read(|cx| {
3770 assert_eq!(definitions_2.len(), 1);
3771 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3772 let target_buffer = definitions_2[0].buffer.read(cx);
3773 assert_eq!(
3774 target_buffer.text(),
3775 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3776 );
3777 assert_eq!(
3778 definitions_2[0].range.to_point(target_buffer),
3779 Point::new(1, 6)..Point::new(1, 11)
3780 );
3781 });
3782 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3783 }
3784
3785 #[gpui::test(iterations = 10)]
3786 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3787 cx_a.foreground().forbid_parking();
3788 let lang_registry = Arc::new(LanguageRegistry::test());
3789 let fs = FakeFs::new(cx_a.background());
3790 fs.insert_tree(
3791 "/root-1",
3792 json!({
3793 "one.rs": "const ONE: usize = 1;",
3794 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3795 }),
3796 )
3797 .await;
3798 fs.insert_tree(
3799 "/root-2",
3800 json!({
3801 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3802 }),
3803 )
3804 .await;
3805
3806 // Set up a fake language server.
3807 let mut language = Language::new(
3808 LanguageConfig {
3809 name: "Rust".into(),
3810 path_suffixes: vec!["rs".to_string()],
3811 ..Default::default()
3812 },
3813 Some(tree_sitter_rust::language()),
3814 );
3815 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3816 lang_registry.add(Arc::new(language));
3817
3818 // Connect to a server as 2 clients.
3819 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3820 let client_a = server.create_client(cx_a, "user_a").await;
3821 let mut client_b = server.create_client(cx_b, "user_b").await;
3822 server
3823 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3824 .await;
3825
3826 // Share a project as client A
3827 let project_a = cx_a.update(|cx| {
3828 Project::local(
3829 client_a.clone(),
3830 client_a.user_store.clone(),
3831 lang_registry.clone(),
3832 fs.clone(),
3833 cx,
3834 )
3835 });
3836 let (worktree_a, _) = project_a
3837 .update(cx_a, |p, cx| {
3838 p.find_or_create_local_worktree("/root-1", true, cx)
3839 })
3840 .await
3841 .unwrap();
3842 worktree_a
3843 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3844 .await;
3845 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3846
3847 // Join the worktree as client B.
3848 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3849
3850 // Open the file on client B.
3851 let buffer_b = cx_b
3852 .background()
3853 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3854 .await
3855 .unwrap();
3856
3857 // Request references to a symbol as the guest.
3858 let fake_language_server = fake_language_servers.next().await.unwrap();
3859 fake_language_server.handle_request::<lsp::request::References, _, _>(
3860 |params, _| async move {
3861 assert_eq!(
3862 params.text_document_position.text_document.uri.as_str(),
3863 "file:///root-1/one.rs"
3864 );
3865 Ok(Some(vec![
3866 lsp::Location {
3867 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3868 range: lsp::Range::new(
3869 lsp::Position::new(0, 24),
3870 lsp::Position::new(0, 27),
3871 ),
3872 },
3873 lsp::Location {
3874 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3875 range: lsp::Range::new(
3876 lsp::Position::new(0, 35),
3877 lsp::Position::new(0, 38),
3878 ),
3879 },
3880 lsp::Location {
3881 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3882 range: lsp::Range::new(
3883 lsp::Position::new(0, 37),
3884 lsp::Position::new(0, 40),
3885 ),
3886 },
3887 ]))
3888 },
3889 );
3890
3891 let references = project_b
3892 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3893 .await
3894 .unwrap();
3895 cx_b.read(|cx| {
3896 assert_eq!(references.len(), 3);
3897 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3898
3899 let two_buffer = references[0].buffer.read(cx);
3900 let three_buffer = references[2].buffer.read(cx);
3901 assert_eq!(
3902 two_buffer.file().unwrap().path().as_ref(),
3903 Path::new("two.rs")
3904 );
3905 assert_eq!(references[1].buffer, references[0].buffer);
3906 assert_eq!(
3907 three_buffer.file().unwrap().full_path(cx),
3908 Path::new("three.rs")
3909 );
3910
3911 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3912 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3913 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3914 });
3915 }
3916
3917 #[gpui::test(iterations = 10)]
3918 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3919 cx_a.foreground().forbid_parking();
3920 let lang_registry = Arc::new(LanguageRegistry::test());
3921 let fs = FakeFs::new(cx_a.background());
3922 fs.insert_tree(
3923 "/root-1",
3924 json!({
3925 "a": "hello world",
3926 "b": "goodnight moon",
3927 "c": "a world of goo",
3928 "d": "world champion of clown world",
3929 }),
3930 )
3931 .await;
3932 fs.insert_tree(
3933 "/root-2",
3934 json!({
3935 "e": "disney world is fun",
3936 }),
3937 )
3938 .await;
3939
3940 // Connect to a server as 2 clients.
3941 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3942 let client_a = server.create_client(cx_a, "user_a").await;
3943 let mut client_b = server.create_client(cx_b, "user_b").await;
3944 server
3945 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3946 .await;
3947
3948 // Share a project as client A
3949 let project_a = cx_a.update(|cx| {
3950 Project::local(
3951 client_a.clone(),
3952 client_a.user_store.clone(),
3953 lang_registry.clone(),
3954 fs.clone(),
3955 cx,
3956 )
3957 });
3958
3959 let (worktree_1, _) = project_a
3960 .update(cx_a, |p, cx| {
3961 p.find_or_create_local_worktree("/root-1", true, cx)
3962 })
3963 .await
3964 .unwrap();
3965 worktree_1
3966 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3967 .await;
3968 let (worktree_2, _) = project_a
3969 .update(cx_a, |p, cx| {
3970 p.find_or_create_local_worktree("/root-2", true, cx)
3971 })
3972 .await
3973 .unwrap();
3974 worktree_2
3975 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3976 .await;
3977
3978 // Join the worktree as client B.
3979 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3980 let results = project_b
3981 .update(cx_b, |project, cx| {
3982 project.search(SearchQuery::text("world", false, false), cx)
3983 })
3984 .await
3985 .unwrap();
3986
3987 let mut ranges_by_path = results
3988 .into_iter()
3989 .map(|(buffer, ranges)| {
3990 buffer.read_with(cx_b, |buffer, cx| {
3991 let path = buffer.file().unwrap().full_path(cx);
3992 let offset_ranges = ranges
3993 .into_iter()
3994 .map(|range| range.to_offset(buffer))
3995 .collect::<Vec<_>>();
3996 (path, offset_ranges)
3997 })
3998 })
3999 .collect::<Vec<_>>();
4000 ranges_by_path.sort_by_key(|(path, _)| path.clone());
4001
4002 assert_eq!(
4003 ranges_by_path,
4004 &[
4005 (PathBuf::from("root-1/a"), vec![6..11]),
4006 (PathBuf::from("root-1/c"), vec![2..7]),
4007 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
4008 (PathBuf::from("root-2/e"), vec![7..12]),
4009 ]
4010 );
4011 }
4012
4013 #[gpui::test(iterations = 10)]
4014 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4015 cx_a.foreground().forbid_parking();
4016 let lang_registry = Arc::new(LanguageRegistry::test());
4017 let fs = FakeFs::new(cx_a.background());
4018 fs.insert_tree(
4019 "/root-1",
4020 json!({
4021 "main.rs": "fn double(number: i32) -> i32 { number + number }",
4022 }),
4023 )
4024 .await;
4025
4026 // Set up a fake language server.
4027 let mut language = Language::new(
4028 LanguageConfig {
4029 name: "Rust".into(),
4030 path_suffixes: vec!["rs".to_string()],
4031 ..Default::default()
4032 },
4033 Some(tree_sitter_rust::language()),
4034 );
4035 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4036 lang_registry.add(Arc::new(language));
4037
4038 // Connect to a server as 2 clients.
4039 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4040 let client_a = server.create_client(cx_a, "user_a").await;
4041 let mut client_b = server.create_client(cx_b, "user_b").await;
4042 server
4043 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4044 .await;
4045
4046 // Share a project as client A
4047 let project_a = cx_a.update(|cx| {
4048 Project::local(
4049 client_a.clone(),
4050 client_a.user_store.clone(),
4051 lang_registry.clone(),
4052 fs.clone(),
4053 cx,
4054 )
4055 });
4056 let (worktree_a, _) = project_a
4057 .update(cx_a, |p, cx| {
4058 p.find_or_create_local_worktree("/root-1", true, cx)
4059 })
4060 .await
4061 .unwrap();
4062 worktree_a
4063 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4064 .await;
4065 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4066
4067 // Join the worktree as client B.
4068 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4069
4070 // Open the file on client B.
4071 let buffer_b = cx_b
4072 .background()
4073 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
4074 .await
4075 .unwrap();
4076
4077 // Request document highlights as the guest.
4078 let fake_language_server = fake_language_servers.next().await.unwrap();
4079 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4080 |params, _| async move {
4081 assert_eq!(
4082 params
4083 .text_document_position_params
4084 .text_document
4085 .uri
4086 .as_str(),
4087 "file:///root-1/main.rs"
4088 );
4089 assert_eq!(
4090 params.text_document_position_params.position,
4091 lsp::Position::new(0, 34)
4092 );
4093 Ok(Some(vec![
4094 lsp::DocumentHighlight {
4095 kind: Some(lsp::DocumentHighlightKind::WRITE),
4096 range: lsp::Range::new(
4097 lsp::Position::new(0, 10),
4098 lsp::Position::new(0, 16),
4099 ),
4100 },
4101 lsp::DocumentHighlight {
4102 kind: Some(lsp::DocumentHighlightKind::READ),
4103 range: lsp::Range::new(
4104 lsp::Position::new(0, 32),
4105 lsp::Position::new(0, 38),
4106 ),
4107 },
4108 lsp::DocumentHighlight {
4109 kind: Some(lsp::DocumentHighlightKind::READ),
4110 range: lsp::Range::new(
4111 lsp::Position::new(0, 41),
4112 lsp::Position::new(0, 47),
4113 ),
4114 },
4115 ]))
4116 },
4117 );
4118
4119 let highlights = project_b
4120 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4121 .await
4122 .unwrap();
4123 buffer_b.read_with(cx_b, |buffer, _| {
4124 let snapshot = buffer.snapshot();
4125
4126 let highlights = highlights
4127 .into_iter()
4128 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4129 .collect::<Vec<_>>();
4130 assert_eq!(
4131 highlights,
4132 &[
4133 (lsp::DocumentHighlightKind::WRITE, 10..16),
4134 (lsp::DocumentHighlightKind::READ, 32..38),
4135 (lsp::DocumentHighlightKind::READ, 41..47)
4136 ]
4137 )
4138 });
4139 }
4140
4141 #[gpui::test(iterations = 10)]
4142 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4143 cx_a.foreground().forbid_parking();
4144 let lang_registry = Arc::new(LanguageRegistry::test());
4145 let fs = FakeFs::new(cx_a.background());
4146 fs.insert_tree(
4147 "/code",
4148 json!({
4149 "crate-1": {
4150 "one.rs": "const ONE: usize = 1;",
4151 },
4152 "crate-2": {
4153 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4154 },
4155 "private": {
4156 "passwords.txt": "the-password",
4157 }
4158 }),
4159 )
4160 .await;
4161
4162 // Set up a fake language server.
4163 let mut language = Language::new(
4164 LanguageConfig {
4165 name: "Rust".into(),
4166 path_suffixes: vec!["rs".to_string()],
4167 ..Default::default()
4168 },
4169 Some(tree_sitter_rust::language()),
4170 );
4171 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4172 lang_registry.add(Arc::new(language));
4173
4174 // Connect to a server as 2 clients.
4175 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4176 let client_a = server.create_client(cx_a, "user_a").await;
4177 let mut client_b = server.create_client(cx_b, "user_b").await;
4178 server
4179 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4180 .await;
4181
4182 // Share a project as client A
4183 let project_a = cx_a.update(|cx| {
4184 Project::local(
4185 client_a.clone(),
4186 client_a.user_store.clone(),
4187 lang_registry.clone(),
4188 fs.clone(),
4189 cx,
4190 )
4191 });
4192 let (worktree_a, _) = project_a
4193 .update(cx_a, |p, cx| {
4194 p.find_or_create_local_worktree("/code/crate-1", true, cx)
4195 })
4196 .await
4197 .unwrap();
4198 worktree_a
4199 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4200 .await;
4201 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4202
4203 // Join the worktree as client B.
4204 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4205
4206 // Cause the language server to start.
4207 let _buffer = cx_b
4208 .background()
4209 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4210 .await
4211 .unwrap();
4212
4213 let fake_language_server = fake_language_servers.next().await.unwrap();
4214 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4215 |_, _| async move {
4216 #[allow(deprecated)]
4217 Ok(Some(vec![lsp::SymbolInformation {
4218 name: "TWO".into(),
4219 location: lsp::Location {
4220 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4221 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4222 },
4223 kind: lsp::SymbolKind::CONSTANT,
4224 tags: None,
4225 container_name: None,
4226 deprecated: None,
4227 }]))
4228 },
4229 );
4230
4231 // Request the definition of a symbol as the guest.
4232 let symbols = project_b
4233 .update(cx_b, |p, cx| p.symbols("two", cx))
4234 .await
4235 .unwrap();
4236 assert_eq!(symbols.len(), 1);
4237 assert_eq!(symbols[0].name, "TWO");
4238
4239 // Open one of the returned symbols.
4240 let buffer_b_2 = project_b
4241 .update(cx_b, |project, cx| {
4242 project.open_buffer_for_symbol(&symbols[0], cx)
4243 })
4244 .await
4245 .unwrap();
4246 buffer_b_2.read_with(cx_b, |buffer, _| {
4247 assert_eq!(
4248 buffer.file().unwrap().path().as_ref(),
4249 Path::new("../crate-2/two.rs")
4250 );
4251 });
4252
4253 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4254 let mut fake_symbol = symbols[0].clone();
4255 fake_symbol.path = Path::new("/code/secrets").into();
4256 let error = project_b
4257 .update(cx_b, |project, cx| {
4258 project.open_buffer_for_symbol(&fake_symbol, cx)
4259 })
4260 .await
4261 .unwrap_err();
4262 assert!(error.to_string().contains("invalid symbol signature"));
4263 }
4264
4265 #[gpui::test(iterations = 10)]
4266 async fn test_open_buffer_while_getting_definition_pointing_to_it(
4267 cx_a: &mut TestAppContext,
4268 cx_b: &mut TestAppContext,
4269 mut rng: StdRng,
4270 ) {
4271 cx_a.foreground().forbid_parking();
4272 let lang_registry = Arc::new(LanguageRegistry::test());
4273 let fs = FakeFs::new(cx_a.background());
4274 fs.insert_tree(
4275 "/root",
4276 json!({
4277 "a.rs": "const ONE: usize = b::TWO;",
4278 "b.rs": "const TWO: usize = 2",
4279 }),
4280 )
4281 .await;
4282
4283 // Set up a fake language server.
4284 let mut language = Language::new(
4285 LanguageConfig {
4286 name: "Rust".into(),
4287 path_suffixes: vec!["rs".to_string()],
4288 ..Default::default()
4289 },
4290 Some(tree_sitter_rust::language()),
4291 );
4292 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4293 lang_registry.add(Arc::new(language));
4294
4295 // Connect to a server as 2 clients.
4296 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4297 let client_a = server.create_client(cx_a, "user_a").await;
4298 let mut client_b = server.create_client(cx_b, "user_b").await;
4299 server
4300 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4301 .await;
4302
4303 // Share a project as client A
4304 let project_a = cx_a.update(|cx| {
4305 Project::local(
4306 client_a.clone(),
4307 client_a.user_store.clone(),
4308 lang_registry.clone(),
4309 fs.clone(),
4310 cx,
4311 )
4312 });
4313
4314 let (worktree_a, _) = project_a
4315 .update(cx_a, |p, cx| {
4316 p.find_or_create_local_worktree("/root", true, cx)
4317 })
4318 .await
4319 .unwrap();
4320 worktree_a
4321 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4322 .await;
4323 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4324
4325 // Join the project as client B.
4326 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4327
4328 let buffer_b1 = cx_b
4329 .background()
4330 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4331 .await
4332 .unwrap();
4333
4334 let fake_language_server = fake_language_servers.next().await.unwrap();
4335 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4336 |_, _| async move {
4337 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4338 lsp::Location::new(
4339 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4340 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4341 ),
4342 )))
4343 },
4344 );
4345
4346 let definitions;
4347 let buffer_b2;
4348 if rng.gen() {
4349 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4350 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4351 } else {
4352 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4353 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4354 }
4355
4356 let buffer_b2 = buffer_b2.await.unwrap();
4357 let definitions = definitions.await.unwrap();
4358 assert_eq!(definitions.len(), 1);
4359 assert_eq!(definitions[0].buffer, buffer_b2);
4360 }
4361
4362 #[gpui::test(iterations = 10)]
4363 async fn test_collaborating_with_code_actions(
4364 cx_a: &mut TestAppContext,
4365 cx_b: &mut TestAppContext,
4366 ) {
4367 cx_a.foreground().forbid_parking();
4368 let lang_registry = Arc::new(LanguageRegistry::test());
4369 let fs = FakeFs::new(cx_a.background());
4370 cx_b.update(|cx| editor::init(cx));
4371
4372 // Set up a fake language server.
4373 let mut language = Language::new(
4374 LanguageConfig {
4375 name: "Rust".into(),
4376 path_suffixes: vec!["rs".to_string()],
4377 ..Default::default()
4378 },
4379 Some(tree_sitter_rust::language()),
4380 );
4381 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4382 lang_registry.add(Arc::new(language));
4383
4384 // Connect to a server as 2 clients.
4385 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4386 let client_a = server.create_client(cx_a, "user_a").await;
4387 let mut client_b = server.create_client(cx_b, "user_b").await;
4388 server
4389 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4390 .await;
4391
4392 // Share a project as client A
4393 fs.insert_tree(
4394 "/a",
4395 json!({
4396 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4397 "other.rs": "pub fn foo() -> usize { 4 }",
4398 }),
4399 )
4400 .await;
4401 let project_a = cx_a.update(|cx| {
4402 Project::local(
4403 client_a.clone(),
4404 client_a.user_store.clone(),
4405 lang_registry.clone(),
4406 fs.clone(),
4407 cx,
4408 )
4409 });
4410 let (worktree_a, _) = project_a
4411 .update(cx_a, |p, cx| {
4412 p.find_or_create_local_worktree("/a", true, cx)
4413 })
4414 .await
4415 .unwrap();
4416 worktree_a
4417 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4418 .await;
4419 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4420
4421 // Join the project as client B.
4422 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4423 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4424 let editor_b = workspace_b
4425 .update(cx_b, |workspace, cx| {
4426 workspace.open_path((worktree_id, "main.rs"), true, cx)
4427 })
4428 .await
4429 .unwrap()
4430 .downcast::<Editor>()
4431 .unwrap();
4432
4433 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4434 fake_language_server
4435 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4436 assert_eq!(
4437 params.text_document.uri,
4438 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4439 );
4440 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4441 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4442 Ok(None)
4443 })
4444 .next()
4445 .await;
4446
4447 // Move cursor to a location that contains code actions.
4448 editor_b.update(cx_b, |editor, cx| {
4449 editor.change_selections(None, cx, |s| {
4450 s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4451 });
4452 cx.focus(&editor_b);
4453 });
4454
4455 fake_language_server
4456 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4457 assert_eq!(
4458 params.text_document.uri,
4459 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4460 );
4461 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4462 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4463
4464 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4465 lsp::CodeAction {
4466 title: "Inline into all callers".to_string(),
4467 edit: Some(lsp::WorkspaceEdit {
4468 changes: Some(
4469 [
4470 (
4471 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4472 vec![lsp::TextEdit::new(
4473 lsp::Range::new(
4474 lsp::Position::new(1, 22),
4475 lsp::Position::new(1, 34),
4476 ),
4477 "4".to_string(),
4478 )],
4479 ),
4480 (
4481 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4482 vec![lsp::TextEdit::new(
4483 lsp::Range::new(
4484 lsp::Position::new(0, 0),
4485 lsp::Position::new(0, 27),
4486 ),
4487 "".to_string(),
4488 )],
4489 ),
4490 ]
4491 .into_iter()
4492 .collect(),
4493 ),
4494 ..Default::default()
4495 }),
4496 data: Some(json!({
4497 "codeActionParams": {
4498 "range": {
4499 "start": {"line": 1, "column": 31},
4500 "end": {"line": 1, "column": 31},
4501 }
4502 }
4503 })),
4504 ..Default::default()
4505 },
4506 )]))
4507 })
4508 .next()
4509 .await;
4510
4511 // Toggle code actions and wait for them to display.
4512 editor_b.update(cx_b, |editor, cx| {
4513 editor.toggle_code_actions(
4514 &ToggleCodeActions {
4515 deployed_from_indicator: false,
4516 },
4517 cx,
4518 );
4519 });
4520 editor_b
4521 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4522 .await;
4523
4524 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4525
4526 // Confirming the code action will trigger a resolve request.
4527 let confirm_action = workspace_b
4528 .update(cx_b, |workspace, cx| {
4529 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4530 })
4531 .unwrap();
4532 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4533 |_, _| async move {
4534 Ok(lsp::CodeAction {
4535 title: "Inline into all callers".to_string(),
4536 edit: Some(lsp::WorkspaceEdit {
4537 changes: Some(
4538 [
4539 (
4540 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4541 vec![lsp::TextEdit::new(
4542 lsp::Range::new(
4543 lsp::Position::new(1, 22),
4544 lsp::Position::new(1, 34),
4545 ),
4546 "4".to_string(),
4547 )],
4548 ),
4549 (
4550 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4551 vec![lsp::TextEdit::new(
4552 lsp::Range::new(
4553 lsp::Position::new(0, 0),
4554 lsp::Position::new(0, 27),
4555 ),
4556 "".to_string(),
4557 )],
4558 ),
4559 ]
4560 .into_iter()
4561 .collect(),
4562 ),
4563 ..Default::default()
4564 }),
4565 ..Default::default()
4566 })
4567 },
4568 );
4569
4570 // After the action is confirmed, an editor containing both modified files is opened.
4571 confirm_action.await.unwrap();
4572 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4573 workspace
4574 .active_item(cx)
4575 .unwrap()
4576 .downcast::<Editor>()
4577 .unwrap()
4578 });
4579 code_action_editor.update(cx_b, |editor, cx| {
4580 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4581 editor.undo(&Undo, cx);
4582 assert_eq!(
4583 editor.text(cx),
4584 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4585 );
4586 editor.redo(&Redo, cx);
4587 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4588 });
4589 }
4590
4591 #[gpui::test(iterations = 10)]
4592 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4593 cx_a.foreground().forbid_parking();
4594 let lang_registry = Arc::new(LanguageRegistry::test());
4595 let fs = FakeFs::new(cx_a.background());
4596 cx_b.update(|cx| editor::init(cx));
4597
4598 // Set up a fake language server.
4599 let mut language = Language::new(
4600 LanguageConfig {
4601 name: "Rust".into(),
4602 path_suffixes: vec!["rs".to_string()],
4603 ..Default::default()
4604 },
4605 Some(tree_sitter_rust::language()),
4606 );
4607 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4608 capabilities: lsp::ServerCapabilities {
4609 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4610 prepare_provider: Some(true),
4611 work_done_progress_options: Default::default(),
4612 })),
4613 ..Default::default()
4614 },
4615 ..Default::default()
4616 });
4617 lang_registry.add(Arc::new(language));
4618
4619 // Connect to a server as 2 clients.
4620 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4621 let client_a = server.create_client(cx_a, "user_a").await;
4622 let mut client_b = server.create_client(cx_b, "user_b").await;
4623 server
4624 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4625 .await;
4626
4627 // Share a project as client A
4628 fs.insert_tree(
4629 "/dir",
4630 json!({
4631 "one.rs": "const ONE: usize = 1;",
4632 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4633 }),
4634 )
4635 .await;
4636 let project_a = cx_a.update(|cx| {
4637 Project::local(
4638 client_a.clone(),
4639 client_a.user_store.clone(),
4640 lang_registry.clone(),
4641 fs.clone(),
4642 cx,
4643 )
4644 });
4645 let (worktree_a, _) = project_a
4646 .update(cx_a, |p, cx| {
4647 p.find_or_create_local_worktree("/dir", true, cx)
4648 })
4649 .await
4650 .unwrap();
4651 worktree_a
4652 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4653 .await;
4654 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4655
4656 // Join the worktree as client B.
4657 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4658 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4659 let editor_b = workspace_b
4660 .update(cx_b, |workspace, cx| {
4661 workspace.open_path((worktree_id, "one.rs"), true, cx)
4662 })
4663 .await
4664 .unwrap()
4665 .downcast::<Editor>()
4666 .unwrap();
4667 let fake_language_server = fake_language_servers.next().await.unwrap();
4668
4669 // Move cursor to a location that can be renamed.
4670 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4671 editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4672 editor.rename(&Rename, cx).unwrap()
4673 });
4674
4675 fake_language_server
4676 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4677 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4678 assert_eq!(params.position, lsp::Position::new(0, 7));
4679 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4680 lsp::Position::new(0, 6),
4681 lsp::Position::new(0, 9),
4682 ))))
4683 })
4684 .next()
4685 .await
4686 .unwrap();
4687 prepare_rename.await.unwrap();
4688 editor_b.update(cx_b, |editor, cx| {
4689 let rename = editor.pending_rename().unwrap();
4690 let buffer = editor.buffer().read(cx).snapshot(cx);
4691 assert_eq!(
4692 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4693 6..9
4694 );
4695 rename.editor.update(cx, |rename_editor, cx| {
4696 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4697 rename_buffer.edit([(0..3, "THREE")], cx);
4698 });
4699 });
4700 });
4701
4702 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4703 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4704 });
4705 fake_language_server
4706 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4707 assert_eq!(
4708 params.text_document_position.text_document.uri.as_str(),
4709 "file:///dir/one.rs"
4710 );
4711 assert_eq!(
4712 params.text_document_position.position,
4713 lsp::Position::new(0, 6)
4714 );
4715 assert_eq!(params.new_name, "THREE");
4716 Ok(Some(lsp::WorkspaceEdit {
4717 changes: Some(
4718 [
4719 (
4720 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4721 vec![lsp::TextEdit::new(
4722 lsp::Range::new(
4723 lsp::Position::new(0, 6),
4724 lsp::Position::new(0, 9),
4725 ),
4726 "THREE".to_string(),
4727 )],
4728 ),
4729 (
4730 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4731 vec![
4732 lsp::TextEdit::new(
4733 lsp::Range::new(
4734 lsp::Position::new(0, 24),
4735 lsp::Position::new(0, 27),
4736 ),
4737 "THREE".to_string(),
4738 ),
4739 lsp::TextEdit::new(
4740 lsp::Range::new(
4741 lsp::Position::new(0, 35),
4742 lsp::Position::new(0, 38),
4743 ),
4744 "THREE".to_string(),
4745 ),
4746 ],
4747 ),
4748 ]
4749 .into_iter()
4750 .collect(),
4751 ),
4752 ..Default::default()
4753 }))
4754 })
4755 .next()
4756 .await
4757 .unwrap();
4758 confirm_rename.await.unwrap();
4759
4760 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4761 workspace
4762 .active_item(cx)
4763 .unwrap()
4764 .downcast::<Editor>()
4765 .unwrap()
4766 });
4767 rename_editor.update(cx_b, |editor, cx| {
4768 assert_eq!(
4769 editor.text(cx),
4770 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4771 );
4772 editor.undo(&Undo, cx);
4773 assert_eq!(
4774 editor.text(cx),
4775 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4776 );
4777 editor.redo(&Redo, cx);
4778 assert_eq!(
4779 editor.text(cx),
4780 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4781 );
4782 });
4783
4784 // Ensure temporary rename edits cannot be undone/redone.
4785 editor_b.update(cx_b, |editor, cx| {
4786 editor.undo(&Undo, cx);
4787 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4788 editor.undo(&Undo, cx);
4789 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4790 editor.redo(&Redo, cx);
4791 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4792 })
4793 }
4794
4795 #[gpui::test(iterations = 10)]
4796 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4797 cx_a.foreground().forbid_parking();
4798
4799 // Connect to a server as 2 clients.
4800 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4801 let client_a = server.create_client(cx_a, "user_a").await;
4802 let client_b = server.create_client(cx_b, "user_b").await;
4803
4804 // Create an org that includes these 2 users.
4805 let db = &server.app_state.db;
4806 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4807 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4808 .await
4809 .unwrap();
4810 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4811 .await
4812 .unwrap();
4813
4814 // Create a channel that includes all the users.
4815 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4816 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4817 .await
4818 .unwrap();
4819 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4820 .await
4821 .unwrap();
4822 db.create_channel_message(
4823 channel_id,
4824 client_b.current_user_id(&cx_b),
4825 "hello A, it's B.",
4826 OffsetDateTime::now_utc(),
4827 1,
4828 )
4829 .await
4830 .unwrap();
4831
4832 let channels_a = cx_a
4833 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4834 channels_a
4835 .condition(cx_a, |list, _| list.available_channels().is_some())
4836 .await;
4837 channels_a.read_with(cx_a, |list, _| {
4838 assert_eq!(
4839 list.available_channels().unwrap(),
4840 &[ChannelDetails {
4841 id: channel_id.to_proto(),
4842 name: "test-channel".to_string()
4843 }]
4844 )
4845 });
4846 let channel_a = channels_a.update(cx_a, |this, cx| {
4847 this.get_channel(channel_id.to_proto(), cx).unwrap()
4848 });
4849 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4850 channel_a
4851 .condition(&cx_a, |channel, _| {
4852 channel_messages(channel)
4853 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4854 })
4855 .await;
4856
4857 let channels_b = cx_b
4858 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4859 channels_b
4860 .condition(cx_b, |list, _| list.available_channels().is_some())
4861 .await;
4862 channels_b.read_with(cx_b, |list, _| {
4863 assert_eq!(
4864 list.available_channels().unwrap(),
4865 &[ChannelDetails {
4866 id: channel_id.to_proto(),
4867 name: "test-channel".to_string()
4868 }]
4869 )
4870 });
4871
4872 let channel_b = channels_b.update(cx_b, |this, cx| {
4873 this.get_channel(channel_id.to_proto(), cx).unwrap()
4874 });
4875 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4876 channel_b
4877 .condition(&cx_b, |channel, _| {
4878 channel_messages(channel)
4879 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4880 })
4881 .await;
4882
4883 channel_a
4884 .update(cx_a, |channel, cx| {
4885 channel
4886 .send_message("oh, hi B.".to_string(), cx)
4887 .unwrap()
4888 .detach();
4889 let task = channel.send_message("sup".to_string(), cx).unwrap();
4890 assert_eq!(
4891 channel_messages(channel),
4892 &[
4893 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4894 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4895 ("user_a".to_string(), "sup".to_string(), true)
4896 ]
4897 );
4898 task
4899 })
4900 .await
4901 .unwrap();
4902
4903 channel_b
4904 .condition(&cx_b, |channel, _| {
4905 channel_messages(channel)
4906 == [
4907 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4908 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4909 ("user_a".to_string(), "sup".to_string(), false),
4910 ]
4911 })
4912 .await;
4913
4914 assert_eq!(
4915 server
4916 .state()
4917 .await
4918 .channel(channel_id)
4919 .unwrap()
4920 .connection_ids
4921 .len(),
4922 2
4923 );
4924 cx_b.update(|_| drop(channel_b));
4925 server
4926 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4927 .await;
4928
4929 cx_a.update(|_| drop(channel_a));
4930 server
4931 .condition(|state| state.channel(channel_id).is_none())
4932 .await;
4933 }
4934
4935 #[gpui::test(iterations = 10)]
4936 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4937 cx_a.foreground().forbid_parking();
4938
4939 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4940 let client_a = server.create_client(cx_a, "user_a").await;
4941
4942 let db = &server.app_state.db;
4943 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4944 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4945 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4946 .await
4947 .unwrap();
4948 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4949 .await
4950 .unwrap();
4951
4952 let channels_a = cx_a
4953 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4954 channels_a
4955 .condition(cx_a, |list, _| list.available_channels().is_some())
4956 .await;
4957 let channel_a = channels_a.update(cx_a, |this, cx| {
4958 this.get_channel(channel_id.to_proto(), cx).unwrap()
4959 });
4960
4961 // Messages aren't allowed to be too long.
4962 channel_a
4963 .update(cx_a, |channel, cx| {
4964 let long_body = "this is long.\n".repeat(1024);
4965 channel.send_message(long_body, cx).unwrap()
4966 })
4967 .await
4968 .unwrap_err();
4969
4970 // Messages aren't allowed to be blank.
4971 channel_a.update(cx_a, |channel, cx| {
4972 channel.send_message(String::new(), cx).unwrap_err()
4973 });
4974
4975 // Leading and trailing whitespace are trimmed.
4976 channel_a
4977 .update(cx_a, |channel, cx| {
4978 channel
4979 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4980 .unwrap()
4981 })
4982 .await
4983 .unwrap();
4984 assert_eq!(
4985 db.get_channel_messages(channel_id, 10, None)
4986 .await
4987 .unwrap()
4988 .iter()
4989 .map(|m| &m.body)
4990 .collect::<Vec<_>>(),
4991 &["surrounded by whitespace"]
4992 );
4993 }
4994
4995 #[gpui::test(iterations = 10)]
4996 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4997 cx_a.foreground().forbid_parking();
4998
4999 // Connect to a server as 2 clients.
5000 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5001 let client_a = server.create_client(cx_a, "user_a").await;
5002 let client_b = server.create_client(cx_b, "user_b").await;
5003 let mut status_b = client_b.status();
5004
5005 // Create an org that includes these 2 users.
5006 let db = &server.app_state.db;
5007 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
5008 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
5009 .await
5010 .unwrap();
5011 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
5012 .await
5013 .unwrap();
5014
5015 // Create a channel that includes all the users.
5016 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
5017 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
5018 .await
5019 .unwrap();
5020 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
5021 .await
5022 .unwrap();
5023 db.create_channel_message(
5024 channel_id,
5025 client_b.current_user_id(&cx_b),
5026 "hello A, it's B.",
5027 OffsetDateTime::now_utc(),
5028 2,
5029 )
5030 .await
5031 .unwrap();
5032
5033 let channels_a = cx_a
5034 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
5035 channels_a
5036 .condition(cx_a, |list, _| list.available_channels().is_some())
5037 .await;
5038
5039 channels_a.read_with(cx_a, |list, _| {
5040 assert_eq!(
5041 list.available_channels().unwrap(),
5042 &[ChannelDetails {
5043 id: channel_id.to_proto(),
5044 name: "test-channel".to_string()
5045 }]
5046 )
5047 });
5048 let channel_a = channels_a.update(cx_a, |this, cx| {
5049 this.get_channel(channel_id.to_proto(), cx).unwrap()
5050 });
5051 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
5052 channel_a
5053 .condition(&cx_a, |channel, _| {
5054 channel_messages(channel)
5055 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5056 })
5057 .await;
5058
5059 let channels_b = cx_b
5060 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
5061 channels_b
5062 .condition(cx_b, |list, _| list.available_channels().is_some())
5063 .await;
5064 channels_b.read_with(cx_b, |list, _| {
5065 assert_eq!(
5066 list.available_channels().unwrap(),
5067 &[ChannelDetails {
5068 id: channel_id.to_proto(),
5069 name: "test-channel".to_string()
5070 }]
5071 )
5072 });
5073
5074 let channel_b = channels_b.update(cx_b, |this, cx| {
5075 this.get_channel(channel_id.to_proto(), cx).unwrap()
5076 });
5077 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5078 channel_b
5079 .condition(&cx_b, |channel, _| {
5080 channel_messages(channel)
5081 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5082 })
5083 .await;
5084
5085 // Disconnect client B, ensuring we can still access its cached channel data.
5086 server.forbid_connections();
5087 server.disconnect_client(client_b.current_user_id(&cx_b));
5088 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5089 while !matches!(
5090 status_b.next().await,
5091 Some(client::Status::ReconnectionError { .. })
5092 ) {}
5093
5094 channels_b.read_with(cx_b, |channels, _| {
5095 assert_eq!(
5096 channels.available_channels().unwrap(),
5097 [ChannelDetails {
5098 id: channel_id.to_proto(),
5099 name: "test-channel".to_string()
5100 }]
5101 )
5102 });
5103 channel_b.read_with(cx_b, |channel, _| {
5104 assert_eq!(
5105 channel_messages(channel),
5106 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5107 )
5108 });
5109
5110 // Send a message from client B while it is disconnected.
5111 channel_b
5112 .update(cx_b, |channel, cx| {
5113 let task = channel
5114 .send_message("can you see this?".to_string(), cx)
5115 .unwrap();
5116 assert_eq!(
5117 channel_messages(channel),
5118 &[
5119 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5120 ("user_b".to_string(), "can you see this?".to_string(), true)
5121 ]
5122 );
5123 task
5124 })
5125 .await
5126 .unwrap_err();
5127
5128 // Send a message from client A while B is disconnected.
5129 channel_a
5130 .update(cx_a, |channel, cx| {
5131 channel
5132 .send_message("oh, hi B.".to_string(), cx)
5133 .unwrap()
5134 .detach();
5135 let task = channel.send_message("sup".to_string(), cx).unwrap();
5136 assert_eq!(
5137 channel_messages(channel),
5138 &[
5139 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5140 ("user_a".to_string(), "oh, hi B.".to_string(), true),
5141 ("user_a".to_string(), "sup".to_string(), true)
5142 ]
5143 );
5144 task
5145 })
5146 .await
5147 .unwrap();
5148
5149 // Give client B a chance to reconnect.
5150 server.allow_connections();
5151 cx_b.foreground().advance_clock(Duration::from_secs(10));
5152
5153 // Verify that B sees the new messages upon reconnection, as well as the message client B
5154 // sent while offline.
5155 channel_b
5156 .condition(&cx_b, |channel, _| {
5157 channel_messages(channel)
5158 == [
5159 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5160 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5161 ("user_a".to_string(), "sup".to_string(), false),
5162 ("user_b".to_string(), "can you see this?".to_string(), false),
5163 ]
5164 })
5165 .await;
5166
5167 // Ensure client A and B can communicate normally after reconnection.
5168 channel_a
5169 .update(cx_a, |channel, cx| {
5170 channel.send_message("you online?".to_string(), cx).unwrap()
5171 })
5172 .await
5173 .unwrap();
5174 channel_b
5175 .condition(&cx_b, |channel, _| {
5176 channel_messages(channel)
5177 == [
5178 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5179 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5180 ("user_a".to_string(), "sup".to_string(), false),
5181 ("user_b".to_string(), "can you see this?".to_string(), false),
5182 ("user_a".to_string(), "you online?".to_string(), false),
5183 ]
5184 })
5185 .await;
5186
5187 channel_b
5188 .update(cx_b, |channel, cx| {
5189 channel.send_message("yep".to_string(), cx).unwrap()
5190 })
5191 .await
5192 .unwrap();
5193 channel_a
5194 .condition(&cx_a, |channel, _| {
5195 channel_messages(channel)
5196 == [
5197 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5198 ("user_a".to_string(), "oh, hi B.".to_string(), false),
5199 ("user_a".to_string(), "sup".to_string(), false),
5200 ("user_b".to_string(), "can you see this?".to_string(), false),
5201 ("user_a".to_string(), "you online?".to_string(), false),
5202 ("user_b".to_string(), "yep".to_string(), false),
5203 ]
5204 })
5205 .await;
5206 }
5207
5208 #[gpui::test(iterations = 10)]
5209 async fn test_contacts(
5210 deterministic: Arc<Deterministic>,
5211 cx_a: &mut TestAppContext,
5212 cx_b: &mut TestAppContext,
5213 cx_c: &mut TestAppContext,
5214 ) {
5215 cx_a.foreground().forbid_parking();
5216
5217 // Connect to a server as 3 clients.
5218 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5219 let mut client_a = server.create_client(cx_a, "user_a").await;
5220 let mut client_b = server.create_client(cx_b, "user_b").await;
5221 let client_c = server.create_client(cx_c, "user_c").await;
5222 server
5223 .make_contacts(vec![
5224 (&client_a, cx_a),
5225 (&client_b, cx_b),
5226 (&client_c, cx_c),
5227 ])
5228 .await;
5229
5230 deterministic.run_until_parked();
5231 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5232 client.user_store.read_with(*cx, |store, _| {
5233 assert_eq!(
5234 contacts(store),
5235 [
5236 ("user_a", true, vec![]),
5237 ("user_b", true, vec![]),
5238 ("user_c", true, vec![])
5239 ],
5240 "{} has the wrong contacts",
5241 client.username
5242 )
5243 });
5244 }
5245
5246 // Share a project as client A.
5247 let fs = FakeFs::new(cx_a.background());
5248 fs.create_dir(Path::new("/a")).await.unwrap();
5249 let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5250
5251 deterministic.run_until_parked();
5252 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5253 client.user_store.read_with(*cx, |store, _| {
5254 assert_eq!(
5255 contacts(store),
5256 [
5257 ("user_a", true, vec![("a", vec![])]),
5258 ("user_b", true, vec![]),
5259 ("user_c", true, vec![])
5260 ],
5261 "{} has the wrong contacts",
5262 client.username
5263 )
5264 });
5265 }
5266
5267 let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5268
5269 deterministic.run_until_parked();
5270 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5271 client.user_store.read_with(*cx, |store, _| {
5272 assert_eq!(
5273 contacts(store),
5274 [
5275 ("user_a", true, vec![("a", vec!["user_b"])]),
5276 ("user_b", true, vec![]),
5277 ("user_c", true, vec![])
5278 ],
5279 "{} has the wrong contacts",
5280 client.username
5281 )
5282 });
5283 }
5284
5285 // Add a local project as client B
5286 let fs = FakeFs::new(cx_b.background());
5287 fs.create_dir(Path::new("/b")).await.unwrap();
5288 let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5289
5290 deterministic.run_until_parked();
5291 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5292 client.user_store.read_with(*cx, |store, _| {
5293 assert_eq!(
5294 contacts(store),
5295 [
5296 ("user_a", true, vec![("a", vec!["user_b"])]),
5297 ("user_b", true, vec![("b", vec![])]),
5298 ("user_c", true, vec![])
5299 ],
5300 "{} has the wrong contacts",
5301 client.username
5302 )
5303 });
5304 }
5305
5306 project_a
5307 .condition(&cx_a, |project, _| {
5308 project.collaborators().contains_key(&client_b.peer_id)
5309 })
5310 .await;
5311
5312 client_a.project.take();
5313 cx_a.update(move |_| drop(project_a));
5314 deterministic.run_until_parked();
5315 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5316 client.user_store.read_with(*cx, |store, _| {
5317 assert_eq!(
5318 contacts(store),
5319 [
5320 ("user_a", true, vec![]),
5321 ("user_b", true, vec![("b", vec![])]),
5322 ("user_c", true, vec![])
5323 ],
5324 "{} has the wrong contacts",
5325 client.username
5326 )
5327 });
5328 }
5329
5330 server.disconnect_client(client_c.current_user_id(cx_c));
5331 server.forbid_connections();
5332 deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5333 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5334 client.user_store.read_with(*cx, |store, _| {
5335 assert_eq!(
5336 contacts(store),
5337 [
5338 ("user_a", true, vec![]),
5339 ("user_b", true, vec![("b", vec![])]),
5340 ("user_c", false, vec![])
5341 ],
5342 "{} has the wrong contacts",
5343 client.username
5344 )
5345 });
5346 }
5347 client_c
5348 .user_store
5349 .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5350
5351 server.allow_connections();
5352 client_c
5353 .authenticate_and_connect(false, &cx_c.to_async())
5354 .await
5355 .unwrap();
5356
5357 deterministic.run_until_parked();
5358 for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5359 client.user_store.read_with(*cx, |store, _| {
5360 assert_eq!(
5361 contacts(store),
5362 [
5363 ("user_a", true, vec![]),
5364 ("user_b", true, vec![("b", vec![])]),
5365 ("user_c", true, vec![])
5366 ],
5367 "{} has the wrong contacts",
5368 client.username
5369 )
5370 });
5371 }
5372
5373 fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5374 user_store
5375 .contacts()
5376 .iter()
5377 .map(|contact| {
5378 let projects = contact
5379 .projects
5380 .iter()
5381 .map(|p| {
5382 (
5383 p.worktree_root_names[0].as_str(),
5384 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5385 )
5386 })
5387 .collect();
5388 (contact.user.github_login.as_str(), contact.online, projects)
5389 })
5390 .collect()
5391 }
5392 }
5393
5394 #[gpui::test(iterations = 10)]
5395 async fn test_contact_requests(
5396 executor: Arc<Deterministic>,
5397 cx_a: &mut TestAppContext,
5398 cx_a2: &mut TestAppContext,
5399 cx_b: &mut TestAppContext,
5400 cx_b2: &mut TestAppContext,
5401 cx_c: &mut TestAppContext,
5402 cx_c2: &mut TestAppContext,
5403 ) {
5404 cx_a.foreground().forbid_parking();
5405
5406 // Connect to a server as 3 clients.
5407 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5408 let client_a = server.create_client(cx_a, "user_a").await;
5409 let client_a2 = server.create_client(cx_a2, "user_a").await;
5410 let client_b = server.create_client(cx_b, "user_b").await;
5411 let client_b2 = server.create_client(cx_b2, "user_b").await;
5412 let client_c = server.create_client(cx_c, "user_c").await;
5413 let client_c2 = server.create_client(cx_c2, "user_c").await;
5414
5415 assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5416 assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5417 assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5418
5419 // User A and User C request that user B become their contact.
5420 client_a
5421 .user_store
5422 .update(cx_a, |store, cx| {
5423 store.request_contact(client_b.user_id().unwrap(), cx)
5424 })
5425 .await
5426 .unwrap();
5427 client_c
5428 .user_store
5429 .update(cx_c, |store, cx| {
5430 store.request_contact(client_b.user_id().unwrap(), cx)
5431 })
5432 .await
5433 .unwrap();
5434 executor.run_until_parked();
5435
5436 // All users see the pending request appear in all their clients.
5437 assert_eq!(
5438 client_a.summarize_contacts(&cx_a).outgoing_requests,
5439 &["user_b"]
5440 );
5441 assert_eq!(
5442 client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5443 &["user_b"]
5444 );
5445 assert_eq!(
5446 client_b.summarize_contacts(&cx_b).incoming_requests,
5447 &["user_a", "user_c"]
5448 );
5449 assert_eq!(
5450 client_b2.summarize_contacts(&cx_b2).incoming_requests,
5451 &["user_a", "user_c"]
5452 );
5453 assert_eq!(
5454 client_c.summarize_contacts(&cx_c).outgoing_requests,
5455 &["user_b"]
5456 );
5457 assert_eq!(
5458 client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5459 &["user_b"]
5460 );
5461
5462 // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5463 disconnect_and_reconnect(&client_a, cx_a).await;
5464 disconnect_and_reconnect(&client_b, cx_b).await;
5465 disconnect_and_reconnect(&client_c, cx_c).await;
5466 executor.run_until_parked();
5467 assert_eq!(
5468 client_a.summarize_contacts(&cx_a).outgoing_requests,
5469 &["user_b"]
5470 );
5471 assert_eq!(
5472 client_b.summarize_contacts(&cx_b).incoming_requests,
5473 &["user_a", "user_c"]
5474 );
5475 assert_eq!(
5476 client_c.summarize_contacts(&cx_c).outgoing_requests,
5477 &["user_b"]
5478 );
5479
5480 // User B accepts the request from user A.
5481 client_b
5482 .user_store
5483 .update(cx_b, |store, cx| {
5484 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5485 })
5486 .await
5487 .unwrap();
5488
5489 executor.run_until_parked();
5490
5491 // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5492 let contacts_b = client_b.summarize_contacts(&cx_b);
5493 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5494 assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5495 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5496 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5497 assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5498
5499 // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5500 let contacts_a = client_a.summarize_contacts(&cx_a);
5501 assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5502 assert!(contacts_a.outgoing_requests.is_empty());
5503 let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5504 assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5505 assert!(contacts_a2.outgoing_requests.is_empty());
5506
5507 // Contacts are present upon connecting (tested here via disconnect/reconnect)
5508 disconnect_and_reconnect(&client_a, cx_a).await;
5509 disconnect_and_reconnect(&client_b, cx_b).await;
5510 disconnect_and_reconnect(&client_c, cx_c).await;
5511 executor.run_until_parked();
5512 assert_eq!(
5513 client_a.summarize_contacts(&cx_a).current,
5514 &["user_a", "user_b"]
5515 );
5516 assert_eq!(
5517 client_b.summarize_contacts(&cx_b).current,
5518 &["user_a", "user_b"]
5519 );
5520 assert_eq!(
5521 client_b.summarize_contacts(&cx_b).incoming_requests,
5522 &["user_c"]
5523 );
5524 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5525 assert_eq!(
5526 client_c.summarize_contacts(&cx_c).outgoing_requests,
5527 &["user_b"]
5528 );
5529
5530 // User B rejects the request from user C.
5531 client_b
5532 .user_store
5533 .update(cx_b, |store, cx| {
5534 store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5535 })
5536 .await
5537 .unwrap();
5538
5539 executor.run_until_parked();
5540
5541 // User B doesn't see user C as their contact, and the incoming request from them is removed.
5542 let contacts_b = client_b.summarize_contacts(&cx_b);
5543 assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5544 assert!(contacts_b.incoming_requests.is_empty());
5545 let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5546 assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5547 assert!(contacts_b2.incoming_requests.is_empty());
5548
5549 // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5550 let contacts_c = client_c.summarize_contacts(&cx_c);
5551 assert_eq!(contacts_c.current, &["user_c"]);
5552 assert!(contacts_c.outgoing_requests.is_empty());
5553 let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5554 assert_eq!(contacts_c2.current, &["user_c"]);
5555 assert!(contacts_c2.outgoing_requests.is_empty());
5556
5557 // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5558 disconnect_and_reconnect(&client_a, cx_a).await;
5559 disconnect_and_reconnect(&client_b, cx_b).await;
5560 disconnect_and_reconnect(&client_c, cx_c).await;
5561 executor.run_until_parked();
5562 assert_eq!(
5563 client_a.summarize_contacts(&cx_a).current,
5564 &["user_a", "user_b"]
5565 );
5566 assert_eq!(
5567 client_b.summarize_contacts(&cx_b).current,
5568 &["user_a", "user_b"]
5569 );
5570 assert!(client_b
5571 .summarize_contacts(&cx_b)
5572 .incoming_requests
5573 .is_empty());
5574 assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5575 assert!(client_c
5576 .summarize_contacts(&cx_c)
5577 .outgoing_requests
5578 .is_empty());
5579
5580 async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5581 client.disconnect(&cx.to_async()).unwrap();
5582 client.clear_contacts(cx).await;
5583 client
5584 .authenticate_and_connect(false, &cx.to_async())
5585 .await
5586 .unwrap();
5587 }
5588 }
5589
5590 #[gpui::test(iterations = 10)]
5591 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5592 cx_a.foreground().forbid_parking();
5593 let fs = FakeFs::new(cx_a.background());
5594
5595 // 2 clients connect to a server.
5596 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5597 let mut client_a = server.create_client(cx_a, "user_a").await;
5598 let mut client_b = server.create_client(cx_b, "user_b").await;
5599 server
5600 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5601 .await;
5602 cx_a.update(editor::init);
5603 cx_b.update(editor::init);
5604
5605 // Client A shares a project.
5606 fs.insert_tree(
5607 "/a",
5608 json!({
5609 "1.txt": "one",
5610 "2.txt": "two",
5611 "3.txt": "three",
5612 }),
5613 )
5614 .await;
5615 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5616
5617 // Client B joins the project.
5618 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5619
5620 // Client A opens some editors.
5621 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5622 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5623 let editor_a1 = workspace_a
5624 .update(cx_a, |workspace, cx| {
5625 workspace.open_path((worktree_id, "1.txt"), true, cx)
5626 })
5627 .await
5628 .unwrap()
5629 .downcast::<Editor>()
5630 .unwrap();
5631 let editor_a2 = workspace_a
5632 .update(cx_a, |workspace, cx| {
5633 workspace.open_path((worktree_id, "2.txt"), true, cx)
5634 })
5635 .await
5636 .unwrap()
5637 .downcast::<Editor>()
5638 .unwrap();
5639
5640 // Client B opens an editor.
5641 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5642 let editor_b1 = workspace_b
5643 .update(cx_b, |workspace, cx| {
5644 workspace.open_path((worktree_id, "1.txt"), true, cx)
5645 })
5646 .await
5647 .unwrap()
5648 .downcast::<Editor>()
5649 .unwrap();
5650
5651 let client_a_id = project_b.read_with(cx_b, |project, _| {
5652 project.collaborators().values().next().unwrap().peer_id
5653 });
5654 let client_b_id = project_a.read_with(cx_a, |project, _| {
5655 project.collaborators().values().next().unwrap().peer_id
5656 });
5657
5658 // When client B starts following client A, all visible view states are replicated to client B.
5659 editor_a1.update(cx_a, |editor, cx| {
5660 editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5661 });
5662 editor_a2.update(cx_a, |editor, cx| {
5663 editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5664 });
5665 workspace_b
5666 .update(cx_b, |workspace, cx| {
5667 workspace
5668 .toggle_follow(&ToggleFollow(client_a_id), cx)
5669 .unwrap()
5670 })
5671 .await
5672 .unwrap();
5673
5674 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5675 workspace
5676 .active_item(cx)
5677 .unwrap()
5678 .downcast::<Editor>()
5679 .unwrap()
5680 });
5681 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5682 assert_eq!(
5683 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5684 Some((worktree_id, "2.txt").into())
5685 );
5686 assert_eq!(
5687 editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5688 vec![2..3]
5689 );
5690 assert_eq!(
5691 editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5692 vec![0..1]
5693 );
5694
5695 // When client A activates a different editor, client B does so as well.
5696 workspace_a.update(cx_a, |workspace, cx| {
5697 workspace.activate_item(&editor_a1, cx)
5698 });
5699 workspace_b
5700 .condition(cx_b, |workspace, cx| {
5701 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5702 })
5703 .await;
5704
5705 // When client A navigates back and forth, client B does so as well.
5706 workspace_a
5707 .update(cx_a, |workspace, cx| {
5708 workspace::Pane::go_back(workspace, None, cx)
5709 })
5710 .await;
5711 workspace_b
5712 .condition(cx_b, |workspace, cx| {
5713 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5714 })
5715 .await;
5716
5717 workspace_a
5718 .update(cx_a, |workspace, cx| {
5719 workspace::Pane::go_forward(workspace, None, cx)
5720 })
5721 .await;
5722 workspace_b
5723 .condition(cx_b, |workspace, cx| {
5724 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5725 })
5726 .await;
5727
5728 // Changes to client A's editor are reflected on client B.
5729 editor_a1.update(cx_a, |editor, cx| {
5730 editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5731 });
5732 editor_b1
5733 .condition(cx_b, |editor, cx| {
5734 editor.selections.ranges(cx) == vec![1..1, 2..2]
5735 })
5736 .await;
5737
5738 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5739 editor_b1
5740 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5741 .await;
5742
5743 editor_a1.update(cx_a, |editor, cx| {
5744 editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5745 editor.set_scroll_position(vec2f(0., 100.), cx);
5746 });
5747 editor_b1
5748 .condition(cx_b, |editor, cx| {
5749 editor.selections.ranges(cx) == vec![3..3]
5750 })
5751 .await;
5752
5753 // After unfollowing, client B stops receiving updates from client A.
5754 workspace_b.update(cx_b, |workspace, cx| {
5755 workspace.unfollow(&workspace.active_pane().clone(), cx)
5756 });
5757 workspace_a.update(cx_a, |workspace, cx| {
5758 workspace.activate_item(&editor_a2, cx)
5759 });
5760 cx_a.foreground().run_until_parked();
5761 assert_eq!(
5762 workspace_b.read_with(cx_b, |workspace, cx| workspace
5763 .active_item(cx)
5764 .unwrap()
5765 .id()),
5766 editor_b1.id()
5767 );
5768
5769 // Client A starts following client B.
5770 workspace_a
5771 .update(cx_a, |workspace, cx| {
5772 workspace
5773 .toggle_follow(&ToggleFollow(client_b_id), cx)
5774 .unwrap()
5775 })
5776 .await
5777 .unwrap();
5778 assert_eq!(
5779 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5780 Some(client_b_id)
5781 );
5782 assert_eq!(
5783 workspace_a.read_with(cx_a, |workspace, cx| workspace
5784 .active_item(cx)
5785 .unwrap()
5786 .id()),
5787 editor_a1.id()
5788 );
5789
5790 // Following interrupts when client B disconnects.
5791 client_b.disconnect(&cx_b.to_async()).unwrap();
5792 cx_a.foreground().run_until_parked();
5793 assert_eq!(
5794 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5795 None
5796 );
5797 }
5798
5799 #[gpui::test(iterations = 10)]
5800 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5801 cx_a.foreground().forbid_parking();
5802 let fs = FakeFs::new(cx_a.background());
5803
5804 // 2 clients connect to a server.
5805 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5806 let mut client_a = server.create_client(cx_a, "user_a").await;
5807 let mut client_b = server.create_client(cx_b, "user_b").await;
5808 server
5809 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5810 .await;
5811 cx_a.update(editor::init);
5812 cx_b.update(editor::init);
5813
5814 // Client A shares a project.
5815 fs.insert_tree(
5816 "/a",
5817 json!({
5818 "1.txt": "one",
5819 "2.txt": "two",
5820 "3.txt": "three",
5821 "4.txt": "four",
5822 }),
5823 )
5824 .await;
5825 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5826
5827 // Client B joins the project.
5828 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5829
5830 // Client A opens some editors.
5831 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5832 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5833 let _editor_a1 = workspace_a
5834 .update(cx_a, |workspace, cx| {
5835 workspace.open_path((worktree_id, "1.txt"), true, cx)
5836 })
5837 .await
5838 .unwrap()
5839 .downcast::<Editor>()
5840 .unwrap();
5841
5842 // Client B opens an editor.
5843 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5844 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5845 let _editor_b1 = workspace_b
5846 .update(cx_b, |workspace, cx| {
5847 workspace.open_path((worktree_id, "2.txt"), true, cx)
5848 })
5849 .await
5850 .unwrap()
5851 .downcast::<Editor>()
5852 .unwrap();
5853
5854 // Clients A and B follow each other in split panes
5855 workspace_a
5856 .update(cx_a, |workspace, cx| {
5857 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5858 assert_ne!(*workspace.active_pane(), pane_a1);
5859 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5860 workspace
5861 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5862 .unwrap()
5863 })
5864 .await
5865 .unwrap();
5866 workspace_b
5867 .update(cx_b, |workspace, cx| {
5868 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5869 assert_ne!(*workspace.active_pane(), pane_b1);
5870 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5871 workspace
5872 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5873 .unwrap()
5874 })
5875 .await
5876 .unwrap();
5877
5878 workspace_a
5879 .update(cx_a, |workspace, cx| {
5880 workspace.activate_next_pane(cx);
5881 assert_eq!(*workspace.active_pane(), pane_a1);
5882 workspace.open_path((worktree_id, "3.txt"), true, cx)
5883 })
5884 .await
5885 .unwrap();
5886 workspace_b
5887 .update(cx_b, |workspace, cx| {
5888 workspace.activate_next_pane(cx);
5889 assert_eq!(*workspace.active_pane(), pane_b1);
5890 workspace.open_path((worktree_id, "4.txt"), true, cx)
5891 })
5892 .await
5893 .unwrap();
5894 cx_a.foreground().run_until_parked();
5895
5896 // Ensure leader updates don't change the active pane of followers
5897 workspace_a.read_with(cx_a, |workspace, _| {
5898 assert_eq!(*workspace.active_pane(), pane_a1);
5899 });
5900 workspace_b.read_with(cx_b, |workspace, _| {
5901 assert_eq!(*workspace.active_pane(), pane_b1);
5902 });
5903
5904 // Ensure peers following each other doesn't cause an infinite loop.
5905 assert_eq!(
5906 workspace_a.read_with(cx_a, |workspace, cx| workspace
5907 .active_item(cx)
5908 .unwrap()
5909 .project_path(cx)),
5910 Some((worktree_id, "3.txt").into())
5911 );
5912 workspace_a.update(cx_a, |workspace, cx| {
5913 assert_eq!(
5914 workspace.active_item(cx).unwrap().project_path(cx),
5915 Some((worktree_id, "3.txt").into())
5916 );
5917 workspace.activate_next_pane(cx);
5918 assert_eq!(
5919 workspace.active_item(cx).unwrap().project_path(cx),
5920 Some((worktree_id, "4.txt").into())
5921 );
5922 });
5923 workspace_b.update(cx_b, |workspace, cx| {
5924 assert_eq!(
5925 workspace.active_item(cx).unwrap().project_path(cx),
5926 Some((worktree_id, "4.txt").into())
5927 );
5928 workspace.activate_next_pane(cx);
5929 assert_eq!(
5930 workspace.active_item(cx).unwrap().project_path(cx),
5931 Some((worktree_id, "3.txt").into())
5932 );
5933 });
5934 }
5935
5936 #[gpui::test(iterations = 10)]
5937 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5938 cx_a.foreground().forbid_parking();
5939 let fs = FakeFs::new(cx_a.background());
5940
5941 // 2 clients connect to a server.
5942 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5943 let mut client_a = server.create_client(cx_a, "user_a").await;
5944 let mut client_b = server.create_client(cx_b, "user_b").await;
5945 server
5946 .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5947 .await;
5948 cx_a.update(editor::init);
5949 cx_b.update(editor::init);
5950
5951 // Client A shares a project.
5952 fs.insert_tree(
5953 "/a",
5954 json!({
5955 "1.txt": "one",
5956 "2.txt": "two",
5957 "3.txt": "three",
5958 }),
5959 )
5960 .await;
5961 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5962
5963 // Client B joins the project.
5964 let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5965
5966 // Client A opens some editors.
5967 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5968 let _editor_a1 = workspace_a
5969 .update(cx_a, |workspace, cx| {
5970 workspace.open_path((worktree_id, "1.txt"), true, cx)
5971 })
5972 .await
5973 .unwrap()
5974 .downcast::<Editor>()
5975 .unwrap();
5976
5977 // Client B starts following client A.
5978 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5979 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5980 let leader_id = project_b.read_with(cx_b, |project, _| {
5981 project.collaborators().values().next().unwrap().peer_id
5982 });
5983 workspace_b
5984 .update(cx_b, |workspace, cx| {
5985 workspace
5986 .toggle_follow(&ToggleFollow(leader_id), cx)
5987 .unwrap()
5988 })
5989 .await
5990 .unwrap();
5991 assert_eq!(
5992 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5993 Some(leader_id)
5994 );
5995 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5996 workspace
5997 .active_item(cx)
5998 .unwrap()
5999 .downcast::<Editor>()
6000 .unwrap()
6001 });
6002
6003 // When client B moves, it automatically stops following client A.
6004 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
6005 assert_eq!(
6006 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6007 None
6008 );
6009
6010 workspace_b
6011 .update(cx_b, |workspace, cx| {
6012 workspace
6013 .toggle_follow(&ToggleFollow(leader_id), cx)
6014 .unwrap()
6015 })
6016 .await
6017 .unwrap();
6018 assert_eq!(
6019 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6020 Some(leader_id)
6021 );
6022
6023 // When client B edits, it automatically stops following client A.
6024 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
6025 assert_eq!(
6026 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6027 None
6028 );
6029
6030 workspace_b
6031 .update(cx_b, |workspace, cx| {
6032 workspace
6033 .toggle_follow(&ToggleFollow(leader_id), cx)
6034 .unwrap()
6035 })
6036 .await
6037 .unwrap();
6038 assert_eq!(
6039 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6040 Some(leader_id)
6041 );
6042
6043 // When client B scrolls, it automatically stops following client A.
6044 editor_b2.update(cx_b, |editor, cx| {
6045 editor.set_scroll_position(vec2f(0., 3.), cx)
6046 });
6047 assert_eq!(
6048 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6049 None
6050 );
6051
6052 workspace_b
6053 .update(cx_b, |workspace, cx| {
6054 workspace
6055 .toggle_follow(&ToggleFollow(leader_id), cx)
6056 .unwrap()
6057 })
6058 .await
6059 .unwrap();
6060 assert_eq!(
6061 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6062 Some(leader_id)
6063 );
6064
6065 // When client B activates a different pane, it continues following client A in the original pane.
6066 workspace_b.update(cx_b, |workspace, cx| {
6067 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
6068 });
6069 assert_eq!(
6070 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6071 Some(leader_id)
6072 );
6073
6074 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6075 assert_eq!(
6076 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6077 Some(leader_id)
6078 );
6079
6080 // When client B activates a different item in the original pane, it automatically stops following client A.
6081 workspace_b
6082 .update(cx_b, |workspace, cx| {
6083 workspace.open_path((worktree_id, "2.txt"), true, cx)
6084 })
6085 .await
6086 .unwrap();
6087 assert_eq!(
6088 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6089 None
6090 );
6091 }
6092
6093 #[gpui::test(iterations = 100)]
6094 async fn test_random_collaboration(
6095 cx: &mut TestAppContext,
6096 deterministic: Arc<Deterministic>,
6097 rng: StdRng,
6098 ) {
6099 cx.foreground().forbid_parking();
6100 let max_peers = env::var("MAX_PEERS")
6101 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6102 .unwrap_or(5);
6103 assert!(max_peers <= 5);
6104
6105 let max_operations = env::var("OPERATIONS")
6106 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6107 .unwrap_or(10);
6108
6109 let rng = Arc::new(Mutex::new(rng));
6110
6111 let guest_lang_registry = Arc::new(LanguageRegistry::test());
6112 let host_language_registry = Arc::new(LanguageRegistry::test());
6113
6114 let fs = FakeFs::new(cx.background());
6115 fs.insert_tree("/_collab", json!({"init": ""})).await;
6116
6117 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6118 let db = server.app_state.db.clone();
6119 let host_user_id = db.create_user("host", None, false).await.unwrap();
6120 for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6121 let guest_user_id = db.create_user(username, None, false).await.unwrap();
6122 server
6123 .app_state
6124 .db
6125 .send_contact_request(guest_user_id, host_user_id)
6126 .await
6127 .unwrap();
6128 server
6129 .app_state
6130 .db
6131 .respond_to_contact_request(host_user_id, guest_user_id, true)
6132 .await
6133 .unwrap();
6134 }
6135
6136 let mut clients = Vec::new();
6137 let mut user_ids = Vec::new();
6138 let mut op_start_signals = Vec::new();
6139
6140 let mut next_entity_id = 100000;
6141 let mut host_cx = TestAppContext::new(
6142 cx.foreground_platform(),
6143 cx.platform(),
6144 deterministic.build_foreground(next_entity_id),
6145 deterministic.build_background(),
6146 cx.font_cache(),
6147 cx.leak_detector(),
6148 next_entity_id,
6149 );
6150 let host = server.create_client(&mut host_cx, "host").await;
6151 let host_project = host_cx.update(|cx| {
6152 Project::local(
6153 host.client.clone(),
6154 host.user_store.clone(),
6155 host_language_registry.clone(),
6156 fs.clone(),
6157 cx,
6158 )
6159 });
6160 let host_project_id = host_project
6161 .update(&mut host_cx, |p, _| p.next_remote_id())
6162 .await;
6163
6164 let (collab_worktree, _) = host_project
6165 .update(&mut host_cx, |project, cx| {
6166 project.find_or_create_local_worktree("/_collab", true, cx)
6167 })
6168 .await
6169 .unwrap();
6170 collab_worktree
6171 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6172 .await;
6173
6174 // Set up fake language servers.
6175 let mut language = Language::new(
6176 LanguageConfig {
6177 name: "Rust".into(),
6178 path_suffixes: vec!["rs".to_string()],
6179 ..Default::default()
6180 },
6181 None,
6182 );
6183 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6184 name: "the-fake-language-server",
6185 capabilities: lsp::LanguageServer::full_capabilities(),
6186 initializer: Some(Box::new({
6187 let rng = rng.clone();
6188 let fs = fs.clone();
6189 let project = host_project.downgrade();
6190 move |fake_server: &mut FakeLanguageServer| {
6191 fake_server.handle_request::<lsp::request::Completion, _, _>(
6192 |_, _| async move {
6193 Ok(Some(lsp::CompletionResponse::Array(vec![
6194 lsp::CompletionItem {
6195 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6196 range: lsp::Range::new(
6197 lsp::Position::new(0, 0),
6198 lsp::Position::new(0, 0),
6199 ),
6200 new_text: "the-new-text".to_string(),
6201 })),
6202 ..Default::default()
6203 },
6204 ])))
6205 },
6206 );
6207
6208 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6209 |_, _| async move {
6210 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6211 lsp::CodeAction {
6212 title: "the-code-action".to_string(),
6213 ..Default::default()
6214 },
6215 )]))
6216 },
6217 );
6218
6219 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6220 |params, _| async move {
6221 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6222 params.position,
6223 params.position,
6224 ))))
6225 },
6226 );
6227
6228 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6229 let fs = fs.clone();
6230 let rng = rng.clone();
6231 move |_, _| {
6232 let fs = fs.clone();
6233 let rng = rng.clone();
6234 async move {
6235 let files = fs.files().await;
6236 let mut rng = rng.lock();
6237 let count = rng.gen_range::<usize, _>(1..3);
6238 let files = (0..count)
6239 .map(|_| files.choose(&mut *rng).unwrap())
6240 .collect::<Vec<_>>();
6241 log::info!("LSP: Returning definitions in files {:?}", &files);
6242 Ok(Some(lsp::GotoDefinitionResponse::Array(
6243 files
6244 .into_iter()
6245 .map(|file| lsp::Location {
6246 uri: lsp::Url::from_file_path(file).unwrap(),
6247 range: Default::default(),
6248 })
6249 .collect(),
6250 )))
6251 }
6252 }
6253 });
6254
6255 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6256 let rng = rng.clone();
6257 let project = project.clone();
6258 move |params, mut cx| {
6259 let highlights = if let Some(project) = project.upgrade(&cx) {
6260 project.update(&mut cx, |project, cx| {
6261 let path = params
6262 .text_document_position_params
6263 .text_document
6264 .uri
6265 .to_file_path()
6266 .unwrap();
6267 let (worktree, relative_path) =
6268 project.find_local_worktree(&path, cx)?;
6269 let project_path =
6270 ProjectPath::from((worktree.read(cx).id(), relative_path));
6271 let buffer =
6272 project.get_open_buffer(&project_path, cx)?.read(cx);
6273
6274 let mut highlights = Vec::new();
6275 let highlight_count = rng.lock().gen_range(1..=5);
6276 let mut prev_end = 0;
6277 for _ in 0..highlight_count {
6278 let range =
6279 buffer.random_byte_range(prev_end, &mut *rng.lock());
6280
6281 highlights.push(lsp::DocumentHighlight {
6282 range: range_to_lsp(range.to_point_utf16(buffer)),
6283 kind: Some(lsp::DocumentHighlightKind::READ),
6284 });
6285 prev_end = range.end;
6286 }
6287 Some(highlights)
6288 })
6289 } else {
6290 None
6291 };
6292 async move { Ok(highlights) }
6293 }
6294 });
6295 }
6296 })),
6297 ..Default::default()
6298 });
6299 host_language_registry.add(Arc::new(language));
6300
6301 let op_start_signal = futures::channel::mpsc::unbounded();
6302 user_ids.push(host.current_user_id(&host_cx));
6303 op_start_signals.push(op_start_signal.0);
6304 clients.push(host_cx.foreground().spawn(host.simulate_host(
6305 host_project,
6306 op_start_signal.1,
6307 rng.clone(),
6308 host_cx,
6309 )));
6310
6311 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6312 rng.lock().gen_range(0..max_operations)
6313 } else {
6314 max_operations
6315 };
6316 let mut available_guests = vec![
6317 "guest-1".to_string(),
6318 "guest-2".to_string(),
6319 "guest-3".to_string(),
6320 "guest-4".to_string(),
6321 ];
6322 let mut operations = 0;
6323 while operations < max_operations {
6324 if operations == disconnect_host_at {
6325 server.disconnect_client(user_ids[0]);
6326 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6327 drop(op_start_signals);
6328 let mut clients = futures::future::join_all(clients).await;
6329 cx.foreground().run_until_parked();
6330
6331 let (host, mut host_cx, host_err) = clients.remove(0);
6332 if let Some(host_err) = host_err {
6333 log::error!("host error - {:?}", host_err);
6334 }
6335 host.project
6336 .as_ref()
6337 .unwrap()
6338 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6339 for (guest, mut guest_cx, guest_err) in clients {
6340 if let Some(guest_err) = guest_err {
6341 log::error!("{} error - {:?}", guest.username, guest_err);
6342 }
6343
6344 let contacts = server
6345 .app_state
6346 .db
6347 .get_contacts(guest.current_user_id(&guest_cx))
6348 .await
6349 .unwrap();
6350 let contacts = server
6351 .store
6352 .read()
6353 .await
6354 .build_initial_contacts_update(contacts)
6355 .contacts;
6356 assert!(!contacts
6357 .iter()
6358 .flat_map(|contact| &contact.projects)
6359 .any(|project| project.id == host_project_id));
6360 guest
6361 .project
6362 .as_ref()
6363 .unwrap()
6364 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6365 guest_cx.update(|_| drop(guest));
6366 }
6367 host_cx.update(|_| drop(host));
6368
6369 return;
6370 }
6371
6372 let distribution = rng.lock().gen_range(0..100);
6373 match distribution {
6374 0..=19 if !available_guests.is_empty() => {
6375 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6376 let guest_username = available_guests.remove(guest_ix);
6377 log::info!("Adding new connection for {}", guest_username);
6378 next_entity_id += 100000;
6379 let mut guest_cx = TestAppContext::new(
6380 cx.foreground_platform(),
6381 cx.platform(),
6382 deterministic.build_foreground(next_entity_id),
6383 deterministic.build_background(),
6384 cx.font_cache(),
6385 cx.leak_detector(),
6386 next_entity_id,
6387 );
6388 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6389 let guest_project = Project::remote(
6390 host_project_id,
6391 guest.client.clone(),
6392 guest.user_store.clone(),
6393 guest_lang_registry.clone(),
6394 FakeFs::new(cx.background()),
6395 &mut guest_cx.to_async(),
6396 )
6397 .await
6398 .unwrap();
6399 let op_start_signal = futures::channel::mpsc::unbounded();
6400 user_ids.push(guest.current_user_id(&guest_cx));
6401 op_start_signals.push(op_start_signal.0);
6402 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6403 guest_username.clone(),
6404 guest_project,
6405 op_start_signal.1,
6406 rng.clone(),
6407 guest_cx,
6408 )));
6409
6410 log::info!("Added connection for {}", guest_username);
6411 operations += 1;
6412 }
6413 20..=29 if clients.len() > 1 => {
6414 let guest_ix = rng.lock().gen_range(1..clients.len());
6415 log::info!("Removing guest {}", user_ids[guest_ix]);
6416 let removed_guest_id = user_ids.remove(guest_ix);
6417 let guest = clients.remove(guest_ix);
6418 op_start_signals.remove(guest_ix);
6419 server.forbid_connections();
6420 server.disconnect_client(removed_guest_id);
6421 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6422 let (guest, mut guest_cx, guest_err) = guest.await;
6423 server.allow_connections();
6424
6425 if let Some(guest_err) = guest_err {
6426 log::error!("{} error - {:?}", guest.username, guest_err);
6427 }
6428 guest
6429 .project
6430 .as_ref()
6431 .unwrap()
6432 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6433 for user_id in &user_ids {
6434 let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6435 let contacts = server
6436 .store
6437 .read()
6438 .await
6439 .build_initial_contacts_update(contacts)
6440 .contacts;
6441 for contact in contacts {
6442 if contact.online {
6443 assert_ne!(
6444 contact.user_id, removed_guest_id.0 as u64,
6445 "removed guest is still a contact of another peer"
6446 );
6447 }
6448 for project in contact.projects {
6449 for project_guest_id in project.guests {
6450 assert_ne!(
6451 project_guest_id, removed_guest_id.0 as u64,
6452 "removed guest appears as still participating on a project"
6453 );
6454 }
6455 }
6456 }
6457 }
6458
6459 log::info!("{} removed", guest.username);
6460 available_guests.push(guest.username.clone());
6461 guest_cx.update(|_| drop(guest));
6462
6463 operations += 1;
6464 }
6465 _ => {
6466 while operations < max_operations && rng.lock().gen_bool(0.7) {
6467 op_start_signals
6468 .choose(&mut *rng.lock())
6469 .unwrap()
6470 .unbounded_send(())
6471 .unwrap();
6472 operations += 1;
6473 }
6474
6475 if rng.lock().gen_bool(0.8) {
6476 cx.foreground().run_until_parked();
6477 }
6478 }
6479 }
6480 }
6481
6482 drop(op_start_signals);
6483 let mut clients = futures::future::join_all(clients).await;
6484 cx.foreground().run_until_parked();
6485
6486 let (host_client, mut host_cx, host_err) = clients.remove(0);
6487 if let Some(host_err) = host_err {
6488 panic!("host error - {:?}", host_err);
6489 }
6490 let host_project = host_client.project.as_ref().unwrap();
6491 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6492 project
6493 .worktrees(cx)
6494 .map(|worktree| {
6495 let snapshot = worktree.read(cx).snapshot();
6496 (snapshot.id(), snapshot)
6497 })
6498 .collect::<BTreeMap<_, _>>()
6499 });
6500
6501 host_client
6502 .project
6503 .as_ref()
6504 .unwrap()
6505 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6506
6507 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6508 if let Some(guest_err) = guest_err {
6509 panic!("{} error - {:?}", guest_client.username, guest_err);
6510 }
6511 let worktree_snapshots =
6512 guest_client
6513 .project
6514 .as_ref()
6515 .unwrap()
6516 .read_with(&guest_cx, |project, cx| {
6517 project
6518 .worktrees(cx)
6519 .map(|worktree| {
6520 let worktree = worktree.read(cx);
6521 (worktree.id(), worktree.snapshot())
6522 })
6523 .collect::<BTreeMap<_, _>>()
6524 });
6525
6526 assert_eq!(
6527 worktree_snapshots.keys().collect::<Vec<_>>(),
6528 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6529 "{} has different worktrees than the host",
6530 guest_client.username
6531 );
6532 for (id, host_snapshot) in &host_worktree_snapshots {
6533 let guest_snapshot = &worktree_snapshots[id];
6534 assert_eq!(
6535 guest_snapshot.root_name(),
6536 host_snapshot.root_name(),
6537 "{} has different root name than the host for worktree {}",
6538 guest_client.username,
6539 id
6540 );
6541 assert_eq!(
6542 guest_snapshot.entries(false).collect::<Vec<_>>(),
6543 host_snapshot.entries(false).collect::<Vec<_>>(),
6544 "{} has different snapshot than the host for worktree {}",
6545 guest_client.username,
6546 id
6547 );
6548 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6549 }
6550
6551 guest_client
6552 .project
6553 .as_ref()
6554 .unwrap()
6555 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6556
6557 for guest_buffer in &guest_client.buffers {
6558 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6559 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6560 project.buffer_for_id(buffer_id, cx).expect(&format!(
6561 "host does not have buffer for guest:{}, peer:{}, id:{}",
6562 guest_client.username, guest_client.peer_id, buffer_id
6563 ))
6564 });
6565 let path = host_buffer
6566 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6567
6568 assert_eq!(
6569 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6570 0,
6571 "{}, buffer {}, path {:?} has deferred operations",
6572 guest_client.username,
6573 buffer_id,
6574 path,
6575 );
6576 assert_eq!(
6577 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6578 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6579 "{}, buffer {}, path {:?}, differs from the host's buffer",
6580 guest_client.username,
6581 buffer_id,
6582 path
6583 );
6584 }
6585
6586 guest_cx.update(|_| drop(guest_client));
6587 }
6588
6589 host_cx.update(|_| drop(host_client));
6590 }
6591
/// In-process server harness used by the tests in this module.
///
/// Wraps a real `Server` together with the bookkeeping the tests need to
/// create clients, kill individual connections and wait on server state.
struct TestServer {
    /// Created with `Peer::new()` in `start`; reset when the harness is dropped.
    peer: Arc<Peer>,
    /// Server-side application state built on top of the test database.
    app_state: Arc<AppState>,
    /// The server under test; `TestServer` derefs to it.
    server: Arc<Server>,
    /// Foreground executor, used by `condition` to check/mark waiting state.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`;
    /// `condition` awaits it between predicate checks.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Per-user kill flags returned by `Connection::in_memory`; setting a flag
    /// to `true` is how `disconnect_client` severs that user's connection.
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    /// While `true`, the clients' establish-connection override fails.
    forbid_connections: Arc<AtomicBool>,
    /// Held only to keep the test database alive as long as the harness.
    _test_db: TestDb,
}
6602
6603 impl TestServer {
6604 async fn start(
6605 foreground: Rc<executor::Foreground>,
6606 background: Arc<executor::Background>,
6607 ) -> Self {
6608 let test_db = TestDb::fake(background);
6609 let app_state = Self::build_app_state(&test_db).await;
6610 let peer = Peer::new();
6611 let notifications = mpsc::unbounded();
6612 let server = Server::new(app_state.clone(), Some(notifications.0));
6613 Self {
6614 peer,
6615 app_state,
6616 server,
6617 foreground,
6618 notifications: notifications.1,
6619 connection_killers: Default::default(),
6620 forbid_connections: Default::default(),
6621 _test_db: test_db,
6622 }
6623 }
6624
/// Creates a `TestClient` named `name`, connected to this server over an
/// in-memory connection.
///
/// The database user with GitHub login `name` is reused if the lookup
/// succeeds, otherwise a new user is created. Authentication and connection
/// establishment are overridden on the client so no real network is used.
/// Returns once the client is connected and its current user is available.
///
/// Panics if user creation, authentication/connection, or the connection-id
/// hand-off fails.
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
    // Install test settings as a global in the client's app context.
    cx.update(|cx| {
        let settings = Settings::test(cx);
        cx.set_global(settings);
    });

    let http = FakeHttpClient::with_404_response();
    // Any lookup failure (error or missing user) falls through to creation.
    let user_id =
        if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
            user.id
        } else {
            self.app_state.db.create_user(name, None, false).await.unwrap()
        };
    let client_name = name.to_string();
    let mut client = Client::new(http.clone());
    let server = self.server.clone();
    let db = self.app_state.db.clone();
    let connection_killers = self.connection_killers.clone();
    let forbid_connections = self.forbid_connections.clone();
    let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

    // `Arc::get_mut` only succeeds while `client` has no other clones, so the
    // overrides must be installed before the client is shared below.
    Arc::get_mut(&mut client)
        .unwrap()
        .override_authenticate(move |cx| {
            // Always "authenticate" with fixed credentials for this user.
            cx.spawn(|_| async move {
                let access_token = "the-token".to_string();
                Ok(Credentials {
                    user_id: user_id.0 as u64,
                    access_token,
                })
            })
        })
        .override_establish_connection(move |credentials, cx| {
            assert_eq!(credentials.user_id, user_id.0 as u64);
            assert_eq!(credentials.access_token, "the-token");

            let server = server.clone();
            let db = db.clone();
            let connection_killers = connection_killers.clone();
            let forbid_connections = forbid_connections.clone();
            let client_name = client_name.clone();
            let connection_id_tx = connection_id_tx.clone();
            cx.spawn(move |cx| async move {
                if forbid_connections.load(SeqCst) {
                    Err(EstablishConnectionError::other(anyhow!(
                        "server is forbidding connections"
                    )))
                } else {
                    let (client_conn, server_conn, killed) =
                        Connection::in_memory(cx.background());
                    // Remember the kill flag so `disconnect_client` can sever
                    // this connection later; a reconnect replaces the entry.
                    connection_killers.lock().insert(user_id, killed);
                    let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
                    // Serve the server half of the connection in the background.
                    cx.background()
                        .spawn(server.handle_connection(
                            server_conn,
                            client_name,
                            user,
                            Some(connection_id_tx),
                            cx.background(),
                        ))
                        .detach();
                    Ok(client_conn)
                }
            })
        });

    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
    let app_state = Arc::new(workspace::AppState {
        client: client.clone(),
        user_store: user_store.clone(),
        languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
        themes: ThemeRegistry::new((), cx.font_cache()),
        fs: FakeFs::new(cx.background()),
        build_window_options: || Default::default(),
        initialize_workspace: |_, _, _| unimplemented!(),
    });

    Channel::init(&client);
    Project::init(&client);
    cx.update(|cx| workspace::init(app_state.clone(), cx));

    client
        .authenticate_and_connect(false, &cx.to_async())
        .await
        .unwrap();
    // The peer id is taken from the first value received on the channel whose
    // sender was passed to `handle_connection` (presumably the connection id
    // the server assigned — confirm against `handle_connection`).
    let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);

    let client = TestClient {
        client,
        peer_id,
        username: name.to_string(),
        user_store,
        language_registry: Arc::new(LanguageRegistry::test()),
        project: Default::default(),
        buffers: Default::default(),
    };
    client.wait_for_current_user(cx).await;
    client
}
6724
6725 fn disconnect_client(&self, user_id: UserId) {
6726 self.connection_killers
6727 .lock()
6728 .remove(&user_id)
6729 .unwrap()
6730 .store(true, SeqCst);
6731 }
6732
6733 fn forbid_connections(&self) {
6734 self.forbid_connections.store(true, SeqCst);
6735 }
6736
6737 fn allow_connections(&self) {
6738 self.forbid_connections.store(false, SeqCst);
6739 }
6740
6741 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6742 while let Some((client_a, cx_a)) = clients.pop() {
6743 for (client_b, cx_b) in &mut clients {
6744 client_a
6745 .user_store
6746 .update(cx_a, |store, cx| {
6747 store.request_contact(client_b.user_id().unwrap(), cx)
6748 })
6749 .await
6750 .unwrap();
6751 cx_a.foreground().run_until_parked();
6752 client_b
6753 .user_store
6754 .update(*cx_b, |store, cx| {
6755 store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6756 })
6757 .await
6758 .unwrap();
6759 }
6760 }
6761 }
6762
6763 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6764 Arc::new(AppState {
6765 db: test_db.db().clone(),
6766 api_token: Default::default(),
6767 invite_link_prefix: Default::default(),
6768 })
6769 }
6770
6771 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6772 self.server.store.read().await
6773 }
6774
6775 async fn condition<F>(&mut self, mut predicate: F)
6776 where
6777 F: FnMut(&Store) -> bool,
6778 {
6779 assert!(
6780 self.foreground.parking_forbidden(),
6781 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6782 );
6783 while !(predicate)(&*self.server.store.read().await) {
6784 self.foreground.start_waiting();
6785 self.notifications.next().await;
6786 self.foreground.finish_waiting();
6787 }
6788 }
6789 }
6790
6791 impl Deref for TestServer {
6792 type Target = Server;
6793
6794 fn deref(&self) -> &Self::Target {
6795 &self.server
6796 }
6797 }
6798
6799 impl Drop for TestServer {
6800 fn drop(&mut self) {
6801 self.peer.reset();
6802 }
6803 }
6804
/// A client connected to a `TestServer`, plus the state the tests and the
/// randomized simulations track for it.
struct TestClient {
    /// The underlying client; `TestClient` derefs to it and tears it down on drop.
    client: Arc<Client>,
    /// Name the client was created with; used in log and assertion messages.
    username: String,
    /// Peer id derived from the value received while connecting in `create_client`.
    pub peer_id: PeerId,
    /// User store model created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry handed to projects built by `build_local_project`.
    language_registry: Arc<LanguageRegistry>,
    /// Project most recently built for, or simulated by, this client, if any.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulations have opened and are keeping alive.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6814
6815 impl Deref for TestClient {
6816 type Target = Arc<Client>;
6817
6818 fn deref(&self) -> &Self::Target {
6819 &self.client
6820 }
6821 }
6822
/// GitHub logins describing a client's contact state, as produced by
/// `TestClient::summarize_contacts`.
struct ContactsSummary {
    /// Logins of the users returned by the user store's `contacts()`.
    pub current: Vec<String>,
    /// Logins of the users returned by `outgoing_contact_requests()`.
    pub outgoing_requests: Vec<String>,
    /// Logins of the users returned by `incoming_contact_requests()`.
    pub incoming_requests: Vec<String>,
}
6828
6829 impl TestClient {
6830 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6831 UserId::from_proto(
6832 self.user_store
6833 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6834 )
6835 }
6836
6837 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6838 let mut authed_user = self
6839 .user_store
6840 .read_with(cx, |user_store, _| user_store.watch_current_user());
6841 while authed_user.next().await.unwrap().is_none() {}
6842 }
6843
6844 async fn clear_contacts(&self, cx: &mut TestAppContext) {
6845 self.user_store
6846 .update(cx, |store, _| store.clear_contacts())
6847 .await;
6848 }
6849
6850 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6851 self.user_store.read_with(cx, |store, _| ContactsSummary {
6852 current: store
6853 .contacts()
6854 .iter()
6855 .map(|contact| contact.user.github_login.clone())
6856 .collect(),
6857 outgoing_requests: store
6858 .outgoing_contact_requests()
6859 .iter()
6860 .map(|user| user.github_login.clone())
6861 .collect(),
6862 incoming_requests: store
6863 .incoming_contact_requests()
6864 .iter()
6865 .map(|user| user.github_login.clone())
6866 .collect(),
6867 })
6868 }
6869
6870 async fn build_local_project(
6871 &mut self,
6872 fs: Arc<FakeFs>,
6873 root_path: impl AsRef<Path>,
6874 cx: &mut TestAppContext,
6875 ) -> (ModelHandle<Project>, WorktreeId) {
6876 let project = cx.update(|cx| {
6877 Project::local(
6878 self.client.clone(),
6879 self.user_store.clone(),
6880 self.language_registry.clone(),
6881 fs,
6882 cx,
6883 )
6884 });
6885 self.project = Some(project.clone());
6886 let (worktree, _) = project
6887 .update(cx, |p, cx| {
6888 p.find_or_create_local_worktree(root_path, true, cx)
6889 })
6890 .await
6891 .unwrap();
6892 worktree
6893 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6894 .await;
6895 project
6896 .update(cx, |project, _| project.next_remote_id())
6897 .await;
6898 (project, worktree.read_with(cx, |tree, _| tree.id()))
6899 }
6900
6901 async fn build_remote_project(
6902 &mut self,
6903 host_project: &ModelHandle<Project>,
6904 host_cx: &mut TestAppContext,
6905 guest_cx: &mut TestAppContext,
6906 ) -> ModelHandle<Project> {
6907 let host_project_id = host_project
6908 .read_with(host_cx, |project, _| project.next_remote_id())
6909 .await;
6910 let guest_user_id = self.user_id().unwrap();
6911 let languages =
6912 host_project.read_with(host_cx, |project, _| project.languages().clone());
6913 let project_b = guest_cx.spawn(|mut cx| {
6914 let user_store = self.user_store.clone();
6915 let guest_client = self.client.clone();
6916 async move {
6917 Project::remote(
6918 host_project_id,
6919 guest_client,
6920 user_store.clone(),
6921 languages,
6922 FakeFs::new(cx.background()),
6923 &mut cx,
6924 )
6925 .await
6926 .unwrap()
6927 }
6928 });
6929 host_cx.foreground().run_until_parked();
6930 host_project.update(host_cx, |project, cx| {
6931 project.respond_to_join_request(guest_user_id, true, cx)
6932 });
6933 let project = project_b.await;
6934 self.project = Some(project.clone());
6935 project
6936 }
6937
6938 fn build_workspace(
6939 &self,
6940 project: &ModelHandle<Project>,
6941 cx: &mut TestAppContext,
6942 ) -> ViewHandle<Workspace> {
6943 let (window_id, _) = cx.add_window(|_| EmptyView);
6944 cx.add_view(window_id, |cx| Workspace::new(project.clone(), cx))
6945 }
6946
/// Drives the host side of the randomized collaboration test.
///
/// Performs one random operation on `project` for every message received on
/// `op_start_signal` and finishes when that channel is closed. Returns the
/// client and its context together with the error, if any, that stopped the
/// simulation early.
///
/// All randomness is drawn from the shared `rng`, so the order of the draws
/// below is part of the test's behavior.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // Inner function so that `?` can be used while the outer function still
    // returns the client and context on failure.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        // Accept every join request as soon as the project reports it.
        cx.update(|cx| {
            cx.subscribe(&project, move |project, event, cx| {
                if let project::Event::ContactRequestedJoin(user) = event {
                    log::info!("Host: accepting join request from {}", user.github_login);
                    project.update(cx, |project, cx| {
                        project.respond_to_join_request(user.id, true, cx)
                    });
                }
            })
            .detach();
        });

        while op_start_signal.next().await.is_some() {
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            let files = fs.as_fake().files().await;
            match distribution {
                // Find or create a worktree at a randomly chosen ancestor of
                // a random existing file.
                0..=19 if !files.is_empty() => {
                    let path = files.choose(&mut *rng.lock()).unwrap();
                    let mut path = path.as_path();
                    // Walk up the directory tree, stopping at a random level.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either let the request finish in the background
                    // or wait for it (propagating failure).
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open (or reuse) a buffer, then either drop it or mutate it.
                20..=79 if !files.is_empty() => {
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.choose(&mut *rng.lock()).unwrap();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // 10% of the time forget the buffer, otherwise change it.
                    if rng.lock().gen_bool(0.1) {
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            // 70% random edits, 30% random undo/redo.
                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Taken for 80..=99, and for any draw while no files exist:
                // create a new `.rs` file at a random path of 1 to 5
                // single-letter components under `/`, retrying with a new
                // path until both the directory and the file are created.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result =
        simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
            .await;
    log::info!("Host done");
    // Keep the project reachable from the returned client for later checks.
    self.project = Some(project);
    (self, cx, result.err())
}
7095
/// Drives a guest in the randomized collaboration test.
///
/// Performs one random operation against the remote `project` for every
/// message received on `op_start_signal` and finishes when that channel is
/// closed. Returns the client and its context together with the error, if
/// any, that stopped the simulation early.
///
/// All randomness is drawn from the shared `rng`, so the order of the draws
/// below is part of the test's behavior. Most requests are detached (their
/// errors only logged) 30% of the time and awaited otherwise.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // Inner function so that `?` can be used while the outer function still
    // returns the client and context on failure.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Pick the buffer to operate on: open a random file from a random
            // visible worktree, or reuse one of the buffers already open.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    // No visible worktree with a file yet; skip this operation.
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Forget the buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Request completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Request code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save the buffer, but only if it is dirty; a clean buffer
                // with a draw in this range falls through to the final arm.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must include everything that was
                        // in the buffer when the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Request definitions at a random offset; when awaited, keep
                // the buffers of the returned locations open.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Request document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide search for a random lowercase letter; when
                // awaited, keep the buffers that had matches open.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Create a new `.rs` file with a random 10-letter name at the
                // root of a random visible worktree whose root is a directory.
                // NOTE(review): panics if no worktree passes the filter or if
                // `create_entry` returns `None` — confirm that cannot happen here.
                60..=69 => {
                    let worktree = project
                        .read_with(cx, |project, cx| {
                            project
                                .worktrees(&cx)
                                .filter(|worktree| {
                                    let worktree = worktree.read(cx);
                                    worktree.is_visible()
                                        && worktree.entries(false).any(|e| e.is_file())
                                        && worktree
                                            .root_entry()
                                            .map_or(false, |e| e.is_dir())
                                })
                                .choose(&mut *rng.lock())
                        })
                        .unwrap();
                    let (worktree_id, worktree_root_name) = worktree
                        .read_with(cx, |worktree, _| {
                            (worktree.id(), worktree.root_name().to_string())
                        });

                    let mut new_name = String::new();
                    for _ in 0..10 {
                        let letter = rng.lock().gen_range('a'..='z');
                        new_name.push(letter);
                    }
                    let mut new_path = PathBuf::new();
                    new_path.push(new_name);
                    new_path.set_extension("rs");
                    log::info!(
                        "{}: creating {:?} in worktree {} ({})",
                        guest_username,
                        new_path,
                        worktree_id,
                        worktree_root_name,
                    );
                    project
                        .update(cx, |project, cx| {
                            project.create_entry((worktree_id, new_path), false, cx)
                        })
                        .unwrap()
                        .await?;
                }
                // Everything else: mutate the buffer (70% random edits,
                // 30% random undo/redo).
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    // Keep the project reachable from the returned client for later checks.
    self.project = Some(project);
    (self, cx, result.err())
}
7424 }
7425
7426 impl Drop for TestClient {
7427 fn drop(&mut self) {
7428 self.client.tear_down();
7429 }
7430 }
7431
7432 impl Executor for Arc<gpui::executor::Background> {
7433 type Sleep = gpui::executor::Timer;
7434
7435 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7436 self.spawn(future).detach();
7437 }
7438
7439 fn sleep(&self, duration: Duration) -> Self::Sleep {
7440 self.as_ref().timer(duration)
7441 }
7442 }
7443
7444 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7445 channel
7446 .messages()
7447 .cursor::<()>()
7448 .map(|m| {
7449 (
7450 m.sender.github_login.clone(),
7451 m.body.clone(),
7452 m.is_pending(),
7453 )
7454 })
7455 .collect()
7456 }
7457
7458 struct EmptyView;
7459
7460 impl gpui::Entity for EmptyView {
7461 type Event = ();
7462 }
7463
7464 impl gpui::View for EmptyView {
7465 fn ui_name() -> &'static str {
7466 "empty view"
7467 }
7468
7469 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7470 gpui::Element::boxed(gpui::elements::Empty::new())
7471 }
7472 }
7473}