1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::HashMap;
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, Instrument};
53
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It receives the server and a boxed, dynamically typed envelope and returns a boxed
/// future resolving to `()`; `add_message_handler` builds these by downcasting the
/// envelope to the concrete `TypedEnvelope<M>` before invoking the typed handler.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
/// Handle given to request handlers for answering a single request of type `R`.
struct Response<R> {
    /// Server whose peer is used to deliver the response.
    server: Arc<Server>,
    /// Receipt identifying the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `add_request_handler` inspects it to detect handlers that
    /// returned `Ok(())` without responding.
    responded: Arc<AtomicBool>,
}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69}
70
/// RPC server state shared by every connection.
pub struct Server {
    /// Peer used to send, forward and respond to messages on registered connections.
    peer: Arc<Peer>,
    /// In-memory store, guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Shared application state; the handlers in this file use its `db` field.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each handled message
    /// (see `handle_connection`).
    notifications: Option<mpsc::UnboundedSender<()>>,
}
78
/// Abstraction over the async runtime facilities `handle_connection` needs:
/// spawning detached tasks and creating timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` without giving the caller a handle to await it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future intended to complete after `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
84
/// Zero-sized executor type.
// NOTE(review): presumably the production `Executor` implementation; the impl is not
// visible in this part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
87
/// Number of channel messages requested per page; `join_channel` also uses it to decide
/// whether the returned page was the last one.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; its use
// is not visible in this part of the file — confirm.
const MAX_MESSAGE_LEN: usize = 1024;
90
/// Read guard over the store.
struct StoreReadGuard<'a> {
    /// Underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the guard `!Send`; a future that keeps the
    /// guard alive across an `.await` therefore cannot be `Send`.
    _not_send: PhantomData<Rc<()>>,
}
95
/// Write guard over the store.
struct StoreWriteGuard<'a> {
    /// Underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the guard `!Send`; a future that keeps the
    /// guard alive across an `.await` therefore cannot be `Send`.
    _not_send: PhantomData<Rc<()>>,
}
100
101impl Server {
102 pub fn new(
103 app_state: Arc<AppState>,
104 notifications: Option<mpsc::UnboundedSender<()>>,
105 ) -> Arc<Self> {
106 let mut server = Self {
107 peer: Peer::new(),
108 app_state,
109 store: Default::default(),
110 handlers: Default::default(),
111 notifications,
112 };
113
114 server
115 .add_request_handler(Server::ping)
116 .add_request_handler(Server::register_project)
117 .add_message_handler(Server::unregister_project)
118 .add_request_handler(Server::share_project)
119 .add_message_handler(Server::unshare_project)
120 .add_request_handler(Server::join_project)
121 .add_message_handler(Server::leave_project)
122 .add_request_handler(Server::register_worktree)
123 .add_message_handler(Server::unregister_worktree)
124 .add_request_handler(Server::update_worktree)
125 .add_message_handler(Server::start_language_server)
126 .add_message_handler(Server::update_language_server)
127 .add_message_handler(Server::update_diagnostic_summary)
128 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
129 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
130 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
131 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
132 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
133 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
134 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
135 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
136 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
137 .add_request_handler(
138 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
139 )
140 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
141 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
142 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
143 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
144 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
145 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
146 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
147 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
148 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
149 .add_request_handler(Server::update_buffer)
150 .add_message_handler(Server::update_buffer_file)
151 .add_message_handler(Server::buffer_reloaded)
152 .add_message_handler(Server::buffer_saved)
153 .add_request_handler(Server::save_buffer)
154 .add_request_handler(Server::get_channels)
155 .add_request_handler(Server::get_users)
156 .add_request_handler(Server::fuzzy_search_users)
157 .add_request_handler(Server::request_contact)
158 .add_request_handler(Server::respond_to_contact_request)
159 .add_request_handler(Server::join_channel)
160 .add_message_handler(Server::leave_channel)
161 .add_request_handler(Server::send_channel_message)
162 .add_request_handler(Server::follow)
163 .add_message_handler(Server::unfollow)
164 .add_message_handler(Server::update_followers)
165 .add_request_handler(Server::get_channel_messages);
166
167 Arc::new(server)
168 }
169
170 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
171 where
172 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
173 Fut: 'static + Send + Future<Output = Result<()>>,
174 M: EnvelopedMessage,
175 {
176 let prev_handler = self.handlers.insert(
177 TypeId::of::<M>(),
178 Box::new(move |server, envelope| {
179 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
180 let span = info_span!(
181 "handle message",
182 payload_type = envelope.payload_type_name(),
183 payload = format!("{:?}", envelope.payload).as_str(),
184 );
185 let future = (handler)(server, *envelope);
186 async move {
187 if let Err(error) = future.await {
188 tracing::error!(%error, "error handling message");
189 }
190 }
191 .instrument(span)
192 .boxed()
193 }),
194 );
195 if prev_handler.is_some() {
196 panic!("registered a handler for the same message twice");
197 }
198 self
199 }
200
201 /// Handle a request while holding a lock to the store. This is useful when we're registering
202 /// a connection but we want to respond on the connection before anybody else can send on it.
203 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
204 where
205 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
206 Fut: Send + Future<Output = Result<()>>,
207 M: RequestMessage,
208 {
209 let handler = Arc::new(handler);
210 self.add_message_handler(move |server, envelope| {
211 let receipt = envelope.receipt();
212 let handler = handler.clone();
213 async move {
214 let responded = Arc::new(AtomicBool::default());
215 let response = Response {
216 server: server.clone(),
217 responded: responded.clone(),
218 receipt: envelope.receipt(),
219 };
220 match (handler)(server.clone(), envelope, response).await {
221 Ok(()) => {
222 if responded.load(std::sync::atomic::Ordering::SeqCst) {
223 Ok(())
224 } else {
225 Err(anyhow!("handler did not send a response"))?
226 }
227 }
228 Err(error) => {
229 server.peer.respond_with_error(
230 receipt,
231 proto::Error {
232 message: error.to_string(),
233 },
234 )?;
235 Err(error)
236 }
237 }
238 }
239 })
240 }
241
    /// Drives a single client connection until its I/O finishes or its message stream ends.
    ///
    /// The returned future:
    /// 1. registers `connection` with the peer, using `executor.sleep` to build timers;
    /// 2. reports the new connection id through `send_connection_id`, when provided;
    /// 3. loads the user's contacts, records the connection in the store and sends the
    ///    initial contacts update;
    /// 4. dispatches incoming messages to the registered handlers until the connection ends;
    /// 5. signs the connection out, logging (not returning) any sign-out error.
    ///
    /// `address` and `user_id` are used for tracing spans and log lines; `user_id` is also
    /// what the connection is registered under in the store.
    ///
    /// # Errors
    /// Returns an error if loading the contacts or sending the initial contacts update fails.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = Result<()>> {
        let mut this = self.clone();
        let span = info_span!("handle connection", %user_id, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    // Timer factory handed to the peer; it only needs a clone of the executor.
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %connection_id, %address, "connection opened");

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                // A failure to deliver the id is deliberately ignored.
                let _ = send_connection_id.send(connection_id).await;
            }

            // NOTE(review): the two `?` below return before `sign_out` runs, i.e. after the
            // connection was added to the peer (and, for the second one, to the store).
            // Confirm whether that state is cleaned up elsewhere.
            let contacts = this.app_state.db.get_contacts(user_id).await?;

            {
                // The write guard is held while the initial update is sent and released at
                // the end of this block, before the next `.await`.
                let mut store = this.store_mut().await;
                store.add_connection(connection_id, user_id);
                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `handle_io` is listed first in the biased select, so its completion wins
                // over a pending message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(%error, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Signal the optional notifications channel once the
                                    // handler has finished.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages run detached; all others are awaited
                                    // here, so the next message is not read until they finish.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!("no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            // The incoming stream ended.
                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
331
332 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
333 self.peer.disconnect(connection_id);
334 let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
335
336 for (project_id, project) in removed_connection.hosted_projects {
337 if let Some(share) = project.share {
338 broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
339 self.peer
340 .send(conn_id, proto::UnshareProject { project_id })
341 });
342 }
343 }
344
345 for (project_id, peer_ids) in removed_connection.guest_project_ids {
346 broadcast(connection_id, peer_ids, |conn_id| {
347 self.peer.send(
348 conn_id,
349 proto::RemoveProjectCollaborator {
350 project_id,
351 peer_id: connection_id.0,
352 },
353 )
354 });
355 }
356
357 let contacts_to_update = self
358 .app_state
359 .db
360 .get_contacts(removed_connection.user_id)
361 .await?;
362 let store = self.store().await;
363 let mut update = proto::UpdateContacts::default();
364 update
365 .contacts
366 .push(store.contact_for_user(removed_connection.user_id));
367
368 for user_id in contacts_to_update.current {
369 for connection_id in store.connection_ids_for_user(user_id) {
370 self.peer.send(connection_id, update.clone()).trace_err();
371 }
372 }
373
374 Ok(())
375 }
376
377 async fn ping(
378 self: Arc<Server>,
379 _: TypedEnvelope<proto::Ping>,
380 response: Response<proto::Ping>,
381 ) -> Result<()> {
382 response.send(proto::Ack {})?;
383 Ok(())
384 }
385
386 async fn register_project(
387 self: Arc<Server>,
388 request: TypedEnvelope<proto::RegisterProject>,
389 response: Response<proto::RegisterProject>,
390 ) -> Result<()> {
391 let project_id = {
392 let mut state = self.store_mut().await;
393 let user_id = state.user_id_for_connection(request.sender_id)?;
394 state.register_project(request.sender_id, user_id)
395 };
396 response.send(proto::RegisterProjectResponse { project_id })?;
397 Ok(())
398 }
399
400 async fn unregister_project(
401 self: Arc<Server>,
402 request: TypedEnvelope<proto::UnregisterProject>,
403 ) -> Result<()> {
404 let user_id = {
405 let mut state = self.store_mut().await;
406 state.unregister_project(request.payload.project_id, request.sender_id)?;
407 state.user_id_for_connection(request.sender_id)?
408 };
409
410 self.update_user_contacts(user_id).await?;
411 Ok(())
412 }
413
414 async fn share_project(
415 self: Arc<Server>,
416 request: TypedEnvelope<proto::ShareProject>,
417 response: Response<proto::ShareProject>,
418 ) -> Result<()> {
419 let user_id = {
420 let mut state = self.store_mut().await;
421 state.share_project(request.payload.project_id, request.sender_id)?;
422 state.user_id_for_connection(request.sender_id)?
423 };
424 self.update_user_contacts(user_id).await?;
425 response.send(proto::Ack {})?;
426 Ok(())
427 }
428
429 async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
430 let contacts = self.app_state.db.get_contacts(user_id).await?;
431 let store = self.store().await;
432 let updated_contact = store.contact_for_user(user_id);
433 for contact_user_id in contacts.current {
434 for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
435 self.peer
436 .send(
437 contact_conn_id,
438 proto::UpdateContacts {
439 contacts: vec![updated_contact.clone()],
440 remove_contacts: Default::default(),
441 incoming_requests: Default::default(),
442 remove_incoming_requests: Default::default(),
443 outgoing_requests: Default::default(),
444 remove_outgoing_requests: Default::default(),
445 },
446 )
447 .trace_err();
448 }
449 }
450 Ok(())
451 }
452
453 async fn unshare_project(
454 self: Arc<Server>,
455 request: TypedEnvelope<proto::UnshareProject>,
456 ) -> Result<()> {
457 let project_id = request.payload.project_id;
458 let project;
459 {
460 let mut state = self.store_mut().await;
461 project = state.unshare_project(project_id, request.sender_id)?;
462 broadcast(request.sender_id, project.connection_ids, |conn_id| {
463 self.peer
464 .send(conn_id, proto::UnshareProject { project_id })
465 });
466 }
467 self.update_user_contacts(project.host_user_id).await?;
468 Ok(())
469 }
470
471 async fn join_project(
472 self: Arc<Server>,
473 request: TypedEnvelope<proto::JoinProject>,
474 response: Response<proto::JoinProject>,
475 ) -> Result<()> {
476 let project_id = request.payload.project_id;
477 let response_payload;
478 let host_user_id;
479 {
480 let state = &mut *self.store_mut().await;
481 let user_id = state.user_id_for_connection(request.sender_id)?;
482 let joined = state.join_project(request.sender_id, user_id, project_id)?;
483 let share = joined.project.share()?;
484 let peer_count = share.guests.len();
485 let mut collaborators = Vec::with_capacity(peer_count);
486 collaborators.push(proto::Collaborator {
487 peer_id: joined.project.host_connection_id.0,
488 replica_id: 0,
489 user_id: joined.project.host_user_id.to_proto(),
490 });
491 let worktrees = share
492 .worktrees
493 .iter()
494 .filter_map(|(id, shared_worktree)| {
495 let worktree = joined.project.worktrees.get(&id)?;
496 Some(proto::Worktree {
497 id: *id,
498 root_name: worktree.root_name.clone(),
499 entries: shared_worktree.entries.values().cloned().collect(),
500 diagnostic_summaries: shared_worktree
501 .diagnostic_summaries
502 .values()
503 .cloned()
504 .collect(),
505 visible: worktree.visible,
506 scan_id: shared_worktree.scan_id,
507 })
508 })
509 .collect();
510 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
511 if *peer_conn_id != request.sender_id {
512 collaborators.push(proto::Collaborator {
513 peer_id: peer_conn_id.0,
514 replica_id: *peer_replica_id as u32,
515 user_id: peer_user_id.to_proto(),
516 });
517 }
518 }
519 response_payload = proto::JoinProjectResponse {
520 worktrees,
521 replica_id: joined.replica_id as u32,
522 collaborators,
523 language_servers: joined.project.language_servers.clone(),
524 };
525 host_user_id = joined.project.host_user_id;
526 broadcast(
527 request.sender_id,
528 joined.project.connection_ids(),
529 |conn_id| {
530 self.peer.send(
531 conn_id,
532 proto::AddProjectCollaborator {
533 project_id,
534 collaborator: Some(proto::Collaborator {
535 peer_id: request.sender_id.0,
536 replica_id: response_payload.replica_id,
537 user_id: user_id.to_proto(),
538 }),
539 },
540 )
541 },
542 );
543 }
544 self.update_user_contacts(host_user_id).await?;
545 response.send(response_payload)?;
546 Ok(())
547 }
548
549 async fn leave_project(
550 self: Arc<Server>,
551 request: TypedEnvelope<proto::LeaveProject>,
552 ) -> Result<()> {
553 let sender_id = request.sender_id;
554 let project_id = request.payload.project_id;
555 let project;
556 {
557 let mut state = self.store_mut().await;
558 project = state.leave_project(sender_id, project_id)?;
559 broadcast(sender_id, project.connection_ids, |conn_id| {
560 self.peer.send(
561 conn_id,
562 proto::RemoveProjectCollaborator {
563 project_id,
564 peer_id: sender_id.0,
565 },
566 )
567 });
568 }
569 self.update_user_contacts(project.host_user_id).await?;
570 Ok(())
571 }
572
573 async fn register_worktree(
574 self: Arc<Server>,
575 request: TypedEnvelope<proto::RegisterWorktree>,
576 response: Response<proto::RegisterWorktree>,
577 ) -> Result<()> {
578 let host_user_id;
579 {
580 let mut state = self.store_mut().await;
581 host_user_id = state.user_id_for_connection(request.sender_id)?;
582
583 let guest_connection_ids = state
584 .read_project(request.payload.project_id, request.sender_id)?
585 .guest_connection_ids();
586 state.register_worktree(
587 request.payload.project_id,
588 request.payload.worktree_id,
589 request.sender_id,
590 Worktree {
591 root_name: request.payload.root_name.clone(),
592 visible: request.payload.visible,
593 },
594 )?;
595
596 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
597 self.peer
598 .forward_send(request.sender_id, connection_id, request.payload.clone())
599 });
600 }
601 self.update_user_contacts(host_user_id).await?;
602 response.send(proto::Ack {})?;
603 Ok(())
604 }
605
606 async fn unregister_worktree(
607 self: Arc<Server>,
608 request: TypedEnvelope<proto::UnregisterWorktree>,
609 ) -> Result<()> {
610 let host_user_id;
611 let project_id = request.payload.project_id;
612 let worktree_id = request.payload.worktree_id;
613 {
614 let mut state = self.store_mut().await;
615 let (_, guest_connection_ids) =
616 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
617 host_user_id = state.user_id_for_connection(request.sender_id)?;
618 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
619 self.peer.send(
620 conn_id,
621 proto::UnregisterWorktree {
622 project_id,
623 worktree_id,
624 },
625 )
626 });
627 }
628 self.update_user_contacts(host_user_id).await?;
629 Ok(())
630 }
631
632 async fn update_worktree(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::UpdateWorktree>,
635 response: Response<proto::UpdateWorktree>,
636 ) -> Result<()> {
637 let connection_ids = self.store_mut().await.update_worktree(
638 request.sender_id,
639 request.payload.project_id,
640 request.payload.worktree_id,
641 &request.payload.removed_entries,
642 &request.payload.updated_entries,
643 request.payload.scan_id,
644 )?;
645
646 broadcast(request.sender_id, connection_ids, |connection_id| {
647 self.peer
648 .forward_send(request.sender_id, connection_id, request.payload.clone())
649 });
650 response.send(proto::Ack {})?;
651 Ok(())
652 }
653
654 async fn update_diagnostic_summary(
655 self: Arc<Server>,
656 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
657 ) -> Result<()> {
658 let summary = request
659 .payload
660 .summary
661 .clone()
662 .ok_or_else(|| anyhow!("invalid summary"))?;
663 let receiver_ids = self.store_mut().await.update_diagnostic_summary(
664 request.payload.project_id,
665 request.payload.worktree_id,
666 request.sender_id,
667 summary,
668 )?;
669
670 broadcast(request.sender_id, receiver_ids, |connection_id| {
671 self.peer
672 .forward_send(request.sender_id, connection_id, request.payload.clone())
673 });
674 Ok(())
675 }
676
677 async fn start_language_server(
678 self: Arc<Server>,
679 request: TypedEnvelope<proto::StartLanguageServer>,
680 ) -> Result<()> {
681 let receiver_ids = self.store_mut().await.start_language_server(
682 request.payload.project_id,
683 request.sender_id,
684 request
685 .payload
686 .server
687 .clone()
688 .ok_or_else(|| anyhow!("invalid language server"))?,
689 )?;
690 broadcast(request.sender_id, receiver_ids, |connection_id| {
691 self.peer
692 .forward_send(request.sender_id, connection_id, request.payload.clone())
693 });
694 Ok(())
695 }
696
697 async fn update_language_server(
698 self: Arc<Server>,
699 request: TypedEnvelope<proto::UpdateLanguageServer>,
700 ) -> Result<()> {
701 let receiver_ids = self
702 .store()
703 .await
704 .project_connection_ids(request.payload.project_id, request.sender_id)?;
705 broadcast(request.sender_id, receiver_ids, |connection_id| {
706 self.peer
707 .forward_send(request.sender_id, connection_id, request.payload.clone())
708 });
709 Ok(())
710 }
711
712 async fn forward_project_request<T>(
713 self: Arc<Server>,
714 request: TypedEnvelope<T>,
715 response: Response<T>,
716 ) -> Result<()>
717 where
718 T: EntityMessage + RequestMessage,
719 {
720 let host_connection_id = self
721 .store()
722 .await
723 .read_project(request.payload.remote_entity_id(), request.sender_id)?
724 .host_connection_id;
725
726 response.send(
727 self.peer
728 .forward_request(request.sender_id, host_connection_id, request.payload)
729 .await?,
730 )?;
731 Ok(())
732 }
733
734 async fn save_buffer(
735 self: Arc<Server>,
736 request: TypedEnvelope<proto::SaveBuffer>,
737 response: Response<proto::SaveBuffer>,
738 ) -> Result<()> {
739 let host = self
740 .store()
741 .await
742 .read_project(request.payload.project_id, request.sender_id)?
743 .host_connection_id;
744 let response_payload = self
745 .peer
746 .forward_request(request.sender_id, host, request.payload.clone())
747 .await?;
748
749 let mut guests = self
750 .store()
751 .await
752 .read_project(request.payload.project_id, request.sender_id)?
753 .connection_ids();
754 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
755 broadcast(host, guests, |conn_id| {
756 self.peer
757 .forward_send(host, conn_id, response_payload.clone())
758 });
759 response.send(response_payload)?;
760 Ok(())
761 }
762
763 async fn update_buffer(
764 self: Arc<Server>,
765 request: TypedEnvelope<proto::UpdateBuffer>,
766 response: Response<proto::UpdateBuffer>,
767 ) -> Result<()> {
768 let receiver_ids = self
769 .store()
770 .await
771 .project_connection_ids(request.payload.project_id, request.sender_id)?;
772 broadcast(request.sender_id, receiver_ids, |connection_id| {
773 self.peer
774 .forward_send(request.sender_id, connection_id, request.payload.clone())
775 });
776 response.send(proto::Ack {})?;
777 Ok(())
778 }
779
780 async fn update_buffer_file(
781 self: Arc<Server>,
782 request: TypedEnvelope<proto::UpdateBufferFile>,
783 ) -> Result<()> {
784 let receiver_ids = self
785 .store()
786 .await
787 .project_connection_ids(request.payload.project_id, request.sender_id)?;
788 broadcast(request.sender_id, receiver_ids, |connection_id| {
789 self.peer
790 .forward_send(request.sender_id, connection_id, request.payload.clone())
791 });
792 Ok(())
793 }
794
795 async fn buffer_reloaded(
796 self: Arc<Server>,
797 request: TypedEnvelope<proto::BufferReloaded>,
798 ) -> Result<()> {
799 let receiver_ids = self
800 .store()
801 .await
802 .project_connection_ids(request.payload.project_id, request.sender_id)?;
803 broadcast(request.sender_id, receiver_ids, |connection_id| {
804 self.peer
805 .forward_send(request.sender_id, connection_id, request.payload.clone())
806 });
807 Ok(())
808 }
809
810 async fn buffer_saved(
811 self: Arc<Server>,
812 request: TypedEnvelope<proto::BufferSaved>,
813 ) -> Result<()> {
814 let receiver_ids = self
815 .store()
816 .await
817 .project_connection_ids(request.payload.project_id, request.sender_id)?;
818 broadcast(request.sender_id, receiver_ids, |connection_id| {
819 self.peer
820 .forward_send(request.sender_id, connection_id, request.payload.clone())
821 });
822 Ok(())
823 }
824
825 async fn follow(
826 self: Arc<Self>,
827 request: TypedEnvelope<proto::Follow>,
828 response: Response<proto::Follow>,
829 ) -> Result<()> {
830 let leader_id = ConnectionId(request.payload.leader_id);
831 let follower_id = request.sender_id;
832 if !self
833 .store()
834 .await
835 .project_connection_ids(request.payload.project_id, follower_id)?
836 .contains(&leader_id)
837 {
838 Err(anyhow!("no such peer"))?;
839 }
840 let mut response_payload = self
841 .peer
842 .forward_request(request.sender_id, leader_id, request.payload)
843 .await?;
844 response_payload
845 .views
846 .retain(|view| view.leader_id != Some(follower_id.0));
847 response.send(response_payload)?;
848 Ok(())
849 }
850
851 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
852 let leader_id = ConnectionId(request.payload.leader_id);
853 if !self
854 .store()
855 .await
856 .project_connection_ids(request.payload.project_id, request.sender_id)?
857 .contains(&leader_id)
858 {
859 Err(anyhow!("no such peer"))?;
860 }
861 self.peer
862 .forward_send(request.sender_id, leader_id, request.payload)?;
863 Ok(())
864 }
865
866 async fn update_followers(
867 self: Arc<Self>,
868 request: TypedEnvelope<proto::UpdateFollowers>,
869 ) -> Result<()> {
870 let connection_ids = self
871 .store()
872 .await
873 .project_connection_ids(request.payload.project_id, request.sender_id)?;
874 let leader_id = request
875 .payload
876 .variant
877 .as_ref()
878 .and_then(|variant| match variant {
879 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
880 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
881 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
882 });
883 for follower_id in &request.payload.follower_ids {
884 let follower_id = ConnectionId(*follower_id);
885 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
886 self.peer
887 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
888 }
889 }
890 Ok(())
891 }
892
893 async fn get_channels(
894 self: Arc<Server>,
895 request: TypedEnvelope<proto::GetChannels>,
896 response: Response<proto::GetChannels>,
897 ) -> Result<()> {
898 let user_id = self
899 .store()
900 .await
901 .user_id_for_connection(request.sender_id)?;
902 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
903 response.send(proto::GetChannelsResponse {
904 channels: channels
905 .into_iter()
906 .map(|chan| proto::Channel {
907 id: chan.id.to_proto(),
908 name: chan.name,
909 })
910 .collect(),
911 })?;
912 Ok(())
913 }
914
915 async fn get_users(
916 self: Arc<Server>,
917 request: TypedEnvelope<proto::GetUsers>,
918 response: Response<proto::GetUsers>,
919 ) -> Result<()> {
920 let user_ids = request
921 .payload
922 .user_ids
923 .into_iter()
924 .map(UserId::from_proto)
925 .collect();
926 let users = self
927 .app_state
928 .db
929 .get_users_by_ids(user_ids)
930 .await?
931 .into_iter()
932 .map(|user| proto::User {
933 id: user.id.to_proto(),
934 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
935 github_login: user.github_login,
936 })
937 .collect();
938 response.send(proto::UsersResponse { users })?;
939 Ok(())
940 }
941
942 async fn fuzzy_search_users(
943 self: Arc<Server>,
944 request: TypedEnvelope<proto::FuzzySearchUsers>,
945 response: Response<proto::FuzzySearchUsers>,
946 ) -> Result<()> {
947 let query = request.payload.query;
948 let db = &self.app_state.db;
949 let users = match query.len() {
950 0 => vec![],
951 1 | 2 => db
952 .get_user_by_github_login(&query)
953 .await?
954 .into_iter()
955 .collect(),
956 _ => db.fuzzy_search_users(&query, 10).await?,
957 };
958 let users = users
959 .into_iter()
960 .map(|user| proto::User {
961 id: user.id.to_proto(),
962 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
963 github_login: user.github_login,
964 })
965 .collect();
966 response.send(proto::UsersResponse { users })?;
967 Ok(())
968 }
969
970 async fn request_contact(
971 self: Arc<Server>,
972 request: TypedEnvelope<proto::RequestContact>,
973 response: Response<proto::RequestContact>,
974 ) -> Result<()> {
975 let requester_id = self
976 .store()
977 .await
978 .user_id_for_connection(request.sender_id)?;
979 let responder_id = UserId::from_proto(request.payload.responder_id);
980 self.app_state
981 .db
982 .send_contact_request(requester_id, responder_id)
983 .await?;
984
985 // Update outgoing contact requests of requester
986 let mut update = proto::UpdateContacts::default();
987 update.outgoing_requests.push(responder_id.to_proto());
988 for connection_id in self.store().await.connection_ids_for_user(requester_id) {
989 self.peer.send(connection_id, update.clone())?;
990 }
991
992 // Update incoming contact requests of responder
993 let mut update = proto::UpdateContacts::default();
994 update
995 .incoming_requests
996 .push(proto::IncomingContactRequest {
997 requester_id: requester_id.to_proto(),
998 should_notify: true,
999 });
1000 for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1001 self.peer.send(connection_id, update.clone())?;
1002 }
1003
1004 response.send(proto::Ack {})?;
1005 Ok(())
1006 }
1007
1008 async fn respond_to_contact_request(
1009 self: Arc<Server>,
1010 request: TypedEnvelope<proto::RespondToContactRequest>,
1011 response: Response<proto::RespondToContactRequest>,
1012 ) -> Result<()> {
1013 let responder_id = self
1014 .store()
1015 .await
1016 .user_id_for_connection(request.sender_id)?;
1017 let requester_id = UserId::from_proto(request.payload.requester_id);
1018 let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1019 self.app_state
1020 .db
1021 .respond_to_contact_request(responder_id, requester_id, accept)
1022 .await?;
1023
1024 let store = self.store().await;
1025 // Update responder with new contact
1026 let mut update = proto::UpdateContacts::default();
1027 if accept {
1028 update.contacts.push(store.contact_for_user(requester_id));
1029 }
1030 update
1031 .remove_incoming_requests
1032 .push(requester_id.to_proto());
1033 for connection_id in store.connection_ids_for_user(responder_id) {
1034 self.peer.send(connection_id, update.clone())?;
1035 }
1036
1037 // Update requester with new contact
1038 let mut update = proto::UpdateContacts::default();
1039 if accept {
1040 update.contacts.push(store.contact_for_user(responder_id));
1041 }
1042 update
1043 .remove_outgoing_requests
1044 .push(responder_id.to_proto());
1045 for connection_id in store.connection_ids_for_user(requester_id) {
1046 self.peer.send(connection_id, update.clone())?;
1047 }
1048
1049 response.send(proto::Ack {})?;
1050 Ok(())
1051 }
1052
1053 // #[instrument(skip(self, state, user_ids))]
1054 // fn update_contacts_for_users<'a>(
1055 // self: &Arc<Self>,
1056 // state: &Store,
1057 // user_ids: impl IntoIterator<Item = &'a UserId>,
1058 // ) {
1059 // for user_id in user_ids {
1060 // let contacts = state.contacts_for_user(*user_id);
1061 // for connection_id in state.connection_ids_for_user(*user_id) {
1062 // self.peer
1063 // .send(
1064 // connection_id,
1065 // proto::UpdateContacts {
1066 // contacts: contacts.clone(),
1067 // pending_requests_from_user_ids: Default::default(),
1068 // pending_requests_to_user_ids: Default::default(),
1069 // },
1070 // )
1071 // .trace_err();
1072 // }
1073 // }
1074 // }
1075
1076 async fn join_channel(
1077 self: Arc<Self>,
1078 request: TypedEnvelope<proto::JoinChannel>,
1079 response: Response<proto::JoinChannel>,
1080 ) -> Result<()> {
1081 let user_id = self
1082 .store()
1083 .await
1084 .user_id_for_connection(request.sender_id)?;
1085 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1086 if !self
1087 .app_state
1088 .db
1089 .can_user_access_channel(user_id, channel_id)
1090 .await?
1091 {
1092 Err(anyhow!("access denied"))?;
1093 }
1094
1095 self.store_mut()
1096 .await
1097 .join_channel(request.sender_id, channel_id);
1098 let messages = self
1099 .app_state
1100 .db
1101 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1102 .await?
1103 .into_iter()
1104 .map(|msg| proto::ChannelMessage {
1105 id: msg.id.to_proto(),
1106 body: msg.body,
1107 timestamp: msg.sent_at.unix_timestamp() as u64,
1108 sender_id: msg.sender_id.to_proto(),
1109 nonce: Some(msg.nonce.as_u128().into()),
1110 })
1111 .collect::<Vec<_>>();
1112 response.send(proto::JoinChannelResponse {
1113 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1114 messages,
1115 })?;
1116 Ok(())
1117 }
1118
1119 async fn leave_channel(
1120 self: Arc<Self>,
1121 request: TypedEnvelope<proto::LeaveChannel>,
1122 ) -> Result<()> {
1123 let user_id = self
1124 .store()
1125 .await
1126 .user_id_for_connection(request.sender_id)?;
1127 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1128 if !self
1129 .app_state
1130 .db
1131 .can_user_access_channel(user_id, channel_id)
1132 .await?
1133 {
1134 Err(anyhow!("access denied"))?;
1135 }
1136
1137 self.store_mut()
1138 .await
1139 .leave_channel(request.sender_id, channel_id);
1140
1141 Ok(())
1142 }
1143
1144 async fn send_channel_message(
1145 self: Arc<Self>,
1146 request: TypedEnvelope<proto::SendChannelMessage>,
1147 response: Response<proto::SendChannelMessage>,
1148 ) -> Result<()> {
1149 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1150 let user_id;
1151 let connection_ids;
1152 {
1153 let state = self.store().await;
1154 user_id = state.user_id_for_connection(request.sender_id)?;
1155 connection_ids = state.channel_connection_ids(channel_id)?;
1156 }
1157
1158 // Validate the message body.
1159 let body = request.payload.body.trim().to_string();
1160 if body.len() > MAX_MESSAGE_LEN {
1161 return Err(anyhow!("message is too long"))?;
1162 }
1163 if body.is_empty() {
1164 return Err(anyhow!("message can't be blank"))?;
1165 }
1166
1167 let timestamp = OffsetDateTime::now_utc();
1168 let nonce = request
1169 .payload
1170 .nonce
1171 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1172
1173 let message_id = self
1174 .app_state
1175 .db
1176 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1177 .await?
1178 .to_proto();
1179 let message = proto::ChannelMessage {
1180 sender_id: user_id.to_proto(),
1181 id: message_id,
1182 body,
1183 timestamp: timestamp.unix_timestamp() as u64,
1184 nonce: Some(nonce),
1185 };
1186 broadcast(request.sender_id, connection_ids, |conn_id| {
1187 self.peer.send(
1188 conn_id,
1189 proto::ChannelMessageSent {
1190 channel_id: channel_id.to_proto(),
1191 message: Some(message.clone()),
1192 },
1193 )
1194 });
1195 response.send(proto::SendChannelMessageResponse {
1196 message: Some(message),
1197 })?;
1198 Ok(())
1199 }
1200
1201 async fn get_channel_messages(
1202 self: Arc<Self>,
1203 request: TypedEnvelope<proto::GetChannelMessages>,
1204 response: Response<proto::GetChannelMessages>,
1205 ) -> Result<()> {
1206 let user_id = self
1207 .store()
1208 .await
1209 .user_id_for_connection(request.sender_id)?;
1210 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1211 if !self
1212 .app_state
1213 .db
1214 .can_user_access_channel(user_id, channel_id)
1215 .await?
1216 {
1217 Err(anyhow!("access denied"))?;
1218 }
1219
1220 let messages = self
1221 .app_state
1222 .db
1223 .get_channel_messages(
1224 channel_id,
1225 MESSAGE_COUNT_PER_PAGE,
1226 Some(MessageId::from_proto(request.payload.before_message_id)),
1227 )
1228 .await?
1229 .into_iter()
1230 .map(|msg| proto::ChannelMessage {
1231 id: msg.id.to_proto(),
1232 body: msg.body,
1233 timestamp: msg.sent_at.unix_timestamp() as u64,
1234 sender_id: msg.sender_id.to_proto(),
1235 nonce: Some(msg.nonce.as_u128().into()),
1236 })
1237 .collect::<Vec<_>>();
1238 response.send(proto::GetChannelMessagesResponse {
1239 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1240 messages,
1241 })?;
1242 Ok(())
1243 }
1244
/// Acquires read access to the server's `Store` and wraps the lock guard in a
/// `StoreReadGuard`.
///
/// In test builds the task additionally yields to the scheduler once before and once
/// after taking the lock (presumably to provoke more task interleavings — confirm
/// intent).
async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
    #[cfg(test)]
    tokio::task::yield_now().await;
    let guard = self.store.read().await;
    #[cfg(test)]
    tokio::task::yield_now().await;
    // The wrapper is built only after the last await point.
    // NOTE(review): `_not_send` presumably makes the wrapper `!Send` so callers cannot
    // hold it across an await — confirm against the `StoreReadGuard` definition.
    StoreReadGuard {
        guard,
        _not_send: PhantomData,
    }
}
1256
/// Acquires write access to the server's `Store` and wraps the lock guard in a
/// `StoreWriteGuard`.
///
/// In test builds the task additionally yields to the scheduler once before and once
/// after taking the lock (presumably to provoke more task interleavings — confirm
/// intent).
async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
    #[cfg(test)]
    tokio::task::yield_now().await;
    let guard = self.store.write().await;
    #[cfg(test)]
    tokio::task::yield_now().await;
    // The wrapper is built only after the last await point.
    // NOTE(review): `_not_send` presumably makes the wrapper `!Send` so callers cannot
    // hold it across an await — confirm against the `StoreWriteGuard` definition.
    StoreWriteGuard {
        guard,
        _not_send: PhantomData,
    }
}
1268}
1269
1270impl<'a> Deref for StoreReadGuard<'a> {
1271 type Target = Store;
1272
1273 fn deref(&self) -> &Self::Target {
1274 &*self.guard
1275 }
1276}
1277
1278impl<'a> Deref for StoreWriteGuard<'a> {
1279 type Target = Store;
1280
1281 fn deref(&self) -> &Self::Target {
1282 &*self.guard
1283 }
1284}
1285
1286impl<'a> DerefMut for StoreWriteGuard<'a> {
1287 fn deref_mut(&mut self) -> &mut Self::Target {
1288 &mut *self.guard
1289 }
1290}
1291
1292impl<'a> Drop for StoreWriteGuard<'a> {
1293 fn drop(&mut self) {
1294 #[cfg(test)]
1295 self.check_invariants();
1296
1297 let metrics = self.metrics();
1298 tracing::info!(
1299 connections = metrics.connections,
1300 registered_projects = metrics.registered_projects,
1301 shared_projects = metrics.shared_projects,
1302 "metrics"
1303 );
1304 }
1305}
1306
1307impl Executor for RealExecutor {
1308 type Sleep = Sleep;
1309
1310 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1311 tokio::task::spawn(future);
1312 }
1313
1314 fn sleep(&self, duration: Duration) -> Self::Sleep {
1315 tokio::time::sleep(duration)
1316 }
1317}
1318
1319fn broadcast<F>(
1320 sender_id: ConnectionId,
1321 receiver_ids: impl IntoIterator<Item = ConnectionId>,
1322 mut f: F,
1323) where
1324 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1325{
1326 for receiver_id in receiver_ids {
1327 if receiver_id != sender_id {
1328 f(receiver_id).trace_err();
1329 }
1330 }
1331}
1332
lazy_static! {
    // Name of the HTTP header used by the `Header` implementation for `ProtocolVersion`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1336
/// Typed value of the `x-zed-protocol-version` header: the RPC protocol version number
/// that the websocket handler compares against `rpc::PROTOCOL_VERSION`.
pub struct ProtocolVersion(u32);
1338
1339impl Header for ProtocolVersion {
1340 fn name() -> &'static HeaderName {
1341 &ZED_PROTOCOL_VERSION
1342 }
1343
1344 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1345 where
1346 Self: Sized,
1347 I: Iterator<Item = &'i axum::http::HeaderValue>,
1348 {
1349 let version = values
1350 .next()
1351 .ok_or_else(|| axum::headers::Error::invalid())?
1352 .to_str()
1353 .map_err(|_| axum::headers::Error::invalid())?
1354 .parse()
1355 .map_err(|_| axum::headers::Error::invalid())?;
1356 Ok(Self(version))
1357 }
1358
1359 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1360 values.extend([self.0.to_string().parse().unwrap()]);
1361 }
1362}
1363
1364pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1365 let server = Server::new(app_state.clone(), None);
1366 Router::new()
1367 .route("/rpc", get(handle_websocket_request))
1368 .layer(
1369 ServiceBuilder::new()
1370 .layer(Extension(app_state))
1371 .layer(middleware::from_fn(auth::validate_header))
1372 .layer(Extension(server)),
1373 )
1374}
1375
1376pub async fn handle_websocket_request(
1377 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1378 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1379 Extension(server): Extension<Arc<Server>>,
1380 Extension(user_id): Extension<UserId>,
1381 ws: WebSocketUpgrade,
1382) -> axum::response::Response {
1383 if protocol_version != rpc::PROTOCOL_VERSION {
1384 return (
1385 StatusCode::UPGRADE_REQUIRED,
1386 "client must be upgraded".to_string(),
1387 )
1388 .into_response();
1389 }
1390 let socket_address = socket_address.to_string();
1391 ws.on_upgrade(move |socket| {
1392 use util::ResultExt;
1393 let socket = socket
1394 .map_ok(to_tungstenite_message)
1395 .err_into()
1396 .with(|message| async move { Ok(to_axum_message(message)) });
1397 let connection = Connection::new(Box::pin(socket));
1398 async move {
1399 server
1400 .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1401 .await
1402 .log_err();
1403 }
1404 })
1405}
1406
1407fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1408 match message {
1409 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1410 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1411 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1412 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1413 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1414 code: frame.code.into(),
1415 reason: frame.reason,
1416 })),
1417 }
1418}
1419
1420fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1421 match message {
1422 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1423 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1424 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1425 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1426 AxumMessage::Close(frame) => {
1427 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1428 code: frame.code.into(),
1429 reason: frame.reason,
1430 }))
1431 }
1432 }
1433}
1434
/// Extension trait for turning a fallible value into an `Option` via `trace_err`.
pub trait ResultExt {
    /// The success type produced by `trace_err`.
    type Ok;

    /// Consumes `self` and returns the success value, if any.
    fn trace_err(self) -> Option<Self::Ok>;
}
1440
1441impl<T, E> ResultExt for Result<T, E>
1442where
1443 E: std::fmt::Debug,
1444{
1445 type Ok = T;
1446
1447 fn trace_err(self) -> Option<T> {
1448 match self {
1449 Ok(value) => Some(value),
1450 Err(error) => {
1451 tracing::error!("{:?}", error);
1452 None
1453 }
1454 }
1455 }
1456}
1457
1458#[cfg(test)]
1459mod tests {
1460 use super::*;
1461 use crate::{
1462 db::{tests::TestDb, UserId},
1463 AppState,
1464 };
1465 use ::rpc::Peer;
1466 use client::{
1467 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1468 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1469 };
1470 use collections::{BTreeMap, HashSet};
1471 use editor::{
1472 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1473 ToOffset, ToggleCodeActions, Undo,
1474 };
1475 use gpui::{
1476 executor::{self, Deterministic},
1477 geometry::vector::vec2f,
1478 ModelHandle, TestAppContext, ViewHandle,
1479 };
1480 use language::{
1481 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1482 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1483 };
1484 use lsp::{self, FakeLanguageServer};
1485 use parking_lot::Mutex;
1486 use project::{
1487 fs::{FakeFs, Fs as _},
1488 search::SearchQuery,
1489 worktree::WorktreeHandle,
1490 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1491 };
1492 use rand::prelude::*;
1493 use rpc::PeerId;
1494 use serde_json::json;
1495 use settings::Settings;
1496 use sqlx::types::time::OffsetDateTime;
1497 use std::{
1498 env,
1499 ops::Deref,
1500 path::{Path, PathBuf},
1501 rc::Rc,
1502 sync::{
1503 atomic::{AtomicBool, Ordering::SeqCst},
1504 Arc,
1505 },
1506 time::Duration,
1507 };
1508 use theme::ThemeRegistry;
1509 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1510
// Initializes `env_logger`, but only when the `RUST_LOG` environment variable is set.
// Registered through `#[ctor::ctor]` (presumably so that it runs once before any test —
// confirm against the `ctor` crate documentation).
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let log_filter_configured = std::env::var("RUST_LOG").is_ok();
    if !log_filter_configured {
        return;
    }
    env_logger::init();
}
1518
// Exercises the basic sharing flow between two clients: client A shares a local
// project, client B joins it as a remote project, both sides see each other as
// collaborators, an edit typed by B reaches A's buffer, and dropping B's project
// removes B from A's collaborator list.
#[gpui::test(iterations = 10)]
async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    let (window_b, _) = cx_b.add_window(|_| EmptyView);
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    // Share a project as client A
    // The `.zed.toml` in the fake tree lists user_b as a collaborator.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    // Wait for the initial scan of the worktree before sharing.
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
    project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

    // Join that project as client B
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // B must see A (by GitHub login) among the collaborators; remember B's replica id.
    let replica_id_b = project_b.read_with(cx_b, |project, _| {
        assert_eq!(
            project
                .collaborators()
                .get(&client_a.peer_id)
                .unwrap()
                .user
                .github_login,
            "user_a"
        );
        project.replica_id()
    });
    // Wait until A sees B as a collaborator with the same replica id and login.
    project_a
        .condition(&cx_a, |tree, _| {
            tree.collaborators()
                .get(&client_b.peer_id)
                .map_or(false, |collaborator| {
                    collaborator.replica_id == replica_id_b
                        && collaborator.user.github_login == "user_b"
                })
        })
        .await;

    // Open the same file as client B and client A.
    let buffer_b = project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
        .await
        .unwrap();
    buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
    // The host's project must already report the buffer as open at this point.
    project_a.read_with(cx_a, |project, cx| {
        assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
    });
    let buffer_a = project_a
        .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
        .await
        .unwrap();

    let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

    // TODO
    // // Create a selection set as client B and see that selection set as client A.
    // buffer_a
    //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
    //     .await;

    // Edit the buffer as client B and see that edit as client A.
    editor_b.update(cx_b, |editor, cx| {
        editor.handle_input(&Input("ok, ".into()), cx)
    });
    buffer_a
        .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
        .await;

    // TODO
    // // Remove the selection set as client B, see those selections disappear as client A.
    cx_b.update(move |_| drop(editor_b));
    // buffer_a
    //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
    //     .await;

    // Dropping the client B's project removes client B from client A's collaborators.
    cx_b.update(move |_| drop(project_b));
    project_a
        .condition(&cx_a, |project, _| project.collaborators().is_empty())
        .await;
}
1641
// Checks that a host can unshare a project (turning the guest's copy read-only and
// marking the host's worktree as not shared) and then share it again so that a guest
// can join and open buffers once more.
#[gpui::test(iterations = 10)]
async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    // Wait for the initial scan of the worktree before sharing.
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

    // Join that project as client B
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();
    project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();

    // Unshare the project as client A
    project_a.update(cx_a, |project, cx| project.unshare(cx));
    // The guest's project becomes read-only and the host's worktree is no longer shared.
    project_b
        .condition(cx_b, |project, _| project.is_read_only())
        .await;
    assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
    cx_b.update(|_| {
        drop(project_b);
    });

    // Share the project again and ensure guests can still join.
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

    // The same project id is reused for the second join.
    let project_b2 = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();
    project_b2
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();
}
1734
// Checks what happens when the host's connection is dropped: the host loses its
// collaborators and the shared state, the guest's project becomes read-only, and after
// the host obtains a remote id again the project can be re-shared and joined.
#[gpui::test(iterations = 10)]
async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    // Wait for the initial scan of the worktree before sharing.
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

    // Join that project as client B
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();
    project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();

    // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
    server.disconnect_client(client_a.current_user_id(cx_a));
    // Advance the fake clock by the RPC receive timeout
    // (presumably so that the dropped connection is detected — confirm).
    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
    project_a
        .condition(cx_a, |project, _| project.collaborators().is_empty())
        .await;
    project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
    project_b
        .condition(cx_b, |project, _| project.is_read_only())
        .await;
    assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
    cx_b.update(|_| {
        drop(project_b);
    });

    // Await reconnection
    // The id obtained here shadows the earlier `project_id` and is used for the re-join.
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

    // Share the project again and ensure guests can still join.
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();
    assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

    let project_b2 = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();
    project_b2
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
        .await
        .unwrap();
}
1835
// Uses one host (A) and two guests (B, C) to check that buffer edits converge on all
// three replicas, that a save requested by a guest writes the host's latest text to
// disk and clears the dirty flag everywhere, and that renames and new files on the
// host's file system show up in the guests' worktrees and in the open buffers' paths.
#[gpui::test(iterations = 10)]
async fn test_propagate_saves_and_fs_changes(
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
    cx_c: &mut TestAppContext,
) {
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());
    cx_a.foreground().forbid_parking();

    // Connect to a server as 3 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;
    let client_c = server.create_client(cx_c, "user_c").await;

    // Share a worktree as client A.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            "file1": "",
            "file2": ""
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    // Wait for the initial scan of the worktree before sharing.
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

    // Join that worktree as clients B and C.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();
    let project_c = Project::remote(
        project_id,
        client_c.clone(),
        client_c.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_c.to_async(),
    )
    .await
    .unwrap();
    let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
    let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

    // Open and edit a buffer as both guests B and C.
    let buffer_b = project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
        .await
        .unwrap();
    let buffer_c = project_c
        .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
        .await
        .unwrap();
    // Both guests insert at offset 0 of the same buffer.
    buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
    buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

    // Open and edit that buffer as the host.
    let buffer_a = project_a
        .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
        .await
        .unwrap();

    // Wait until the host has received both guest edits, then append the host's text.
    buffer_a
        .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
        .await;
    buffer_a.update(cx_a, |buf, cx| {
        buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
    });

    // Wait for edits to propagate
    buffer_a
        .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
        .await;
    buffer_b
        .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
        .await;
    buffer_c
        .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
        .await;

    // Edit the buffer as the host and concurrently save as guest B.
    let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
    buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
    save_b.await.unwrap();
    // The file on disk is expected to include the host's concurrent edit.
    assert_eq!(
        fs.load("/a/file1".as_ref()).await.unwrap(),
        "hi-a, i-am-c, i-am-b, i-am-a"
    );
    buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
    buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
    buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

    worktree_a.flush_fs_events(cx_a).await;

    // Make changes on host's file system, see those changes on guest worktrees.
    fs.rename(
        "/a/file1".as_ref(),
        "/a/file1-renamed".as_ref(),
        Default::default(),
    )
    .await
    .unwrap();

    fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
        .await
        .unwrap();
    fs.insert_file(Path::new("/a/file4"), "4".into()).await;

    // All three worktrees must converge on the same list of paths.
    worktree_a
        .condition(&cx_a, |tree, _| {
            tree.paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>()
                == [".zed.toml", "file1-renamed", "file3", "file4"]
        })
        .await;
    worktree_b
        .condition(&cx_b, |tree, _| {
            tree.paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>()
                == [".zed.toml", "file1-renamed", "file3", "file4"]
        })
        .await;
    worktree_c
        .condition(&cx_c, |tree, _| {
            tree.paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>()
                == [".zed.toml", "file1-renamed", "file3", "file4"]
        })
        .await;

    // Ensure buffer files are updated as well.
    buffer_a
        .condition(&cx_a, |buf, _| {
            buf.file().unwrap().path().to_str() == Some("file1-renamed")
        })
        .await;
    buffer_b
        .condition(&cx_b, |buf, _| {
            buf.file().unwrap().path().to_str() == Some("file1-renamed")
        })
        .await;
    buffer_c
        .condition(&cx_c, |buf, _| {
            buf.file().unwrap().path().to_str() == Some("file1-renamed")
        })
        .await;
}
2014
// Checks that entry operations issued by a guest on a remote project (create file,
// rename, create directory, delete) are reflected in both the host's and the guest's
// worktree by the time each operation's future has resolved.
#[gpui::test(iterations = 10)]
async fn test_fs_operations(
    executor: Arc<Deterministic>,
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    executor.forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/dir",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.txt": "a-contents",
            "b.txt": "b-contents",
        }),
    )
    .await;

    let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
    let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    let project_b = client_b.build_remote_project(project_id, cx_b).await;

    let worktree_a =
        project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
    let worktree_b =
        project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());

    // Create a file ("c.txt") as the guest; both worktrees must list it.
    let entry = project_b
        .update(cx_b, |project, cx| {
            project
                .create_entry((worktree_id, "c.txt"), false, cx)
                .unwrap()
        })
        .await
        .unwrap();
    worktree_a.read_with(cx_a, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "c.txt"]
        );
    });
    worktree_b.read_with(cx_b, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "c.txt"]
        );
    });

    // Rename the new file to "d.txt" as the guest.
    project_b
        .update(cx_b, |project, cx| {
            project.rename_entry(entry.id, Path::new("d.txt"), cx)
        })
        .unwrap()
        .await
        .unwrap();
    worktree_a.read_with(cx_a, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "d.txt"]
        );
    });
    worktree_b.read_with(cx_b, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "d.txt"]
        );
    });

    // Create a directory ("DIR") as the guest; the `true` argument is passed where the
    // file creation above passed `false`.
    let dir_entry = project_b
        .update(cx_b, |project, cx| {
            project
                .create_entry((worktree_id, "DIR"), true, cx)
                .unwrap()
        })
        .await
        .unwrap();
    worktree_a.read_with(cx_a, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
        );
    });
    worktree_b.read_with(cx_b, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
        );
    });

    // Delete the directory again as the guest.
    project_b
        .update(cx_b, |project, cx| {
            project.delete_entry(dir_entry.id, cx).unwrap()
        })
        .await
        .unwrap();
    worktree_a.read_with(cx_a, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "d.txt"]
        );
    });
    worktree_b.read_with(cx_b, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt", "d.txt"]
        );
    });

    // Delete the renamed file via its original entry id; "d.txt" disappears.
    project_b
        .update(cx_b, |project, cx| {
            project.delete_entry(entry.id, cx).unwrap()
        })
        .await
        .unwrap();
    worktree_a.read_with(cx_a, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt"]
        );
    });
    worktree_b.read_with(cx_b, |worktree, _| {
        assert_eq!(
            worktree
                .paths()
                .map(|p| p.to_string_lossy())
                .collect::<Vec<_>>(),
            [".zed.toml", "a.txt", "b.txt"]
        );
    });
}
2184
2185 #[gpui::test(iterations = 10)]
2186 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2187 cx_a.foreground().forbid_parking();
2188 let lang_registry = Arc::new(LanguageRegistry::test());
2189 let fs = FakeFs::new(cx_a.background());
2190
2191 // Connect to a server as 2 clients.
2192 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2193 let client_a = server.create_client(cx_a, "user_a").await;
2194 let client_b = server.create_client(cx_b, "user_b").await;
2195
2196 // Share a project as client A
2197 fs.insert_tree(
2198 "/dir",
2199 json!({
2200 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2201 "a.txt": "a-contents",
2202 }),
2203 )
2204 .await;
2205
2206 let project_a = cx_a.update(|cx| {
2207 Project::local(
2208 client_a.clone(),
2209 client_a.user_store.clone(),
2210 lang_registry.clone(),
2211 fs.clone(),
2212 cx,
2213 )
2214 });
2215 let (worktree_a, _) = project_a
2216 .update(cx_a, |p, cx| {
2217 p.find_or_create_local_worktree("/dir", true, cx)
2218 })
2219 .await
2220 .unwrap();
2221 worktree_a
2222 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2223 .await;
2224 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2225 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2227
2228 // Join that project as client B
2229 let project_b = Project::remote(
2230 project_id,
2231 client_b.clone(),
2232 client_b.user_store.clone(),
2233 lang_registry.clone(),
2234 fs.clone(),
2235 &mut cx_b.to_async(),
2236 )
2237 .await
2238 .unwrap();
2239
2240 // Open a buffer as client B
2241 let buffer_b = project_b
2242 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2243 .await
2244 .unwrap();
2245
2246 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2247 buffer_b.read_with(cx_b, |buf, _| {
2248 assert!(buf.is_dirty());
2249 assert!(!buf.has_conflict());
2250 });
2251
2252 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2253 buffer_b
2254 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2255 .await;
2256 buffer_b.read_with(cx_b, |buf, _| {
2257 assert!(!buf.has_conflict());
2258 });
2259
2260 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2261 buffer_b.read_with(cx_b, |buf, _| {
2262 assert!(buf.is_dirty());
2263 assert!(!buf.has_conflict());
2264 });
2265 }
2266
2267 #[gpui::test(iterations = 10)]
2268 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2269 cx_a.foreground().forbid_parking();
2270 let lang_registry = Arc::new(LanguageRegistry::test());
2271 let fs = FakeFs::new(cx_a.background());
2272
2273 // Connect to a server as 2 clients.
2274 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2275 let client_a = server.create_client(cx_a, "user_a").await;
2276 let client_b = server.create_client(cx_b, "user_b").await;
2277
2278 // Share a project as client A
2279 fs.insert_tree(
2280 "/dir",
2281 json!({
2282 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2283 "a.txt": "a-contents",
2284 }),
2285 )
2286 .await;
2287
2288 let project_a = cx_a.update(|cx| {
2289 Project::local(
2290 client_a.clone(),
2291 client_a.user_store.clone(),
2292 lang_registry.clone(),
2293 fs.clone(),
2294 cx,
2295 )
2296 });
2297 let (worktree_a, _) = project_a
2298 .update(cx_a, |p, cx| {
2299 p.find_or_create_local_worktree("/dir", true, cx)
2300 })
2301 .await
2302 .unwrap();
2303 worktree_a
2304 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2305 .await;
2306 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2307 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2308 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2309
2310 // Join that project as client B
2311 let project_b = Project::remote(
2312 project_id,
2313 client_b.clone(),
2314 client_b.user_store.clone(),
2315 lang_registry.clone(),
2316 fs.clone(),
2317 &mut cx_b.to_async(),
2318 )
2319 .await
2320 .unwrap();
2321 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2322
2323 // Open a buffer as client B
2324 let buffer_b = project_b
2325 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2326 .await
2327 .unwrap();
2328 buffer_b.read_with(cx_b, |buf, _| {
2329 assert!(!buf.is_dirty());
2330 assert!(!buf.has_conflict());
2331 });
2332
2333 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2334 .await
2335 .unwrap();
2336 buffer_b
2337 .condition(&cx_b, |buf, _| {
2338 buf.text() == "new contents" && !buf.is_dirty()
2339 })
2340 .await;
2341 buffer_b.read_with(cx_b, |buf, _| {
2342 assert!(!buf.has_conflict());
2343 });
2344 }
2345
2346 #[gpui::test(iterations = 10)]
2347 async fn test_editing_while_guest_opens_buffer(
2348 cx_a: &mut TestAppContext,
2349 cx_b: &mut TestAppContext,
2350 ) {
2351 cx_a.foreground().forbid_parking();
2352 let lang_registry = Arc::new(LanguageRegistry::test());
2353 let fs = FakeFs::new(cx_a.background());
2354
2355 // Connect to a server as 2 clients.
2356 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2357 let client_a = server.create_client(cx_a, "user_a").await;
2358 let client_b = server.create_client(cx_b, "user_b").await;
2359
2360 // Share a project as client A
2361 fs.insert_tree(
2362 "/dir",
2363 json!({
2364 ".zed.toml": r#"collaborators = ["user_b"]"#,
2365 "a.txt": "a-contents",
2366 }),
2367 )
2368 .await;
2369 let project_a = cx_a.update(|cx| {
2370 Project::local(
2371 client_a.clone(),
2372 client_a.user_store.clone(),
2373 lang_registry.clone(),
2374 fs.clone(),
2375 cx,
2376 )
2377 });
2378 let (worktree_a, _) = project_a
2379 .update(cx_a, |p, cx| {
2380 p.find_or_create_local_worktree("/dir", true, cx)
2381 })
2382 .await
2383 .unwrap();
2384 worktree_a
2385 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2386 .await;
2387 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2388 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2389 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2390
2391 // Join that project as client B
2392 let project_b = Project::remote(
2393 project_id,
2394 client_b.clone(),
2395 client_b.user_store.clone(),
2396 lang_registry.clone(),
2397 fs.clone(),
2398 &mut cx_b.to_async(),
2399 )
2400 .await
2401 .unwrap();
2402
2403 // Open a buffer as client A
2404 let buffer_a = project_a
2405 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2406 .await
2407 .unwrap();
2408
2409 // Start opening the same buffer as client B
2410 let buffer_b = cx_b
2411 .background()
2412 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2413
2414 // Edit the buffer as client A while client B is still opening it.
2415 cx_b.background().simulate_random_delay().await;
2416 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2417 cx_b.background().simulate_random_delay().await;
2418 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2419
2420 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2421 let buffer_b = buffer_b.await.unwrap();
2422 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2423 }
2424
2425 #[gpui::test(iterations = 10)]
2426 async fn test_leaving_worktree_while_opening_buffer(
2427 cx_a: &mut TestAppContext,
2428 cx_b: &mut TestAppContext,
2429 ) {
2430 cx_a.foreground().forbid_parking();
2431 let lang_registry = Arc::new(LanguageRegistry::test());
2432 let fs = FakeFs::new(cx_a.background());
2433
2434 // Connect to a server as 2 clients.
2435 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2436 let client_a = server.create_client(cx_a, "user_a").await;
2437 let client_b = server.create_client(cx_b, "user_b").await;
2438
2439 // Share a project as client A
2440 fs.insert_tree(
2441 "/dir",
2442 json!({
2443 ".zed.toml": r#"collaborators = ["user_b"]"#,
2444 "a.txt": "a-contents",
2445 }),
2446 )
2447 .await;
2448 let project_a = cx_a.update(|cx| {
2449 Project::local(
2450 client_a.clone(),
2451 client_a.user_store.clone(),
2452 lang_registry.clone(),
2453 fs.clone(),
2454 cx,
2455 )
2456 });
2457 let (worktree_a, _) = project_a
2458 .update(cx_a, |p, cx| {
2459 p.find_or_create_local_worktree("/dir", true, cx)
2460 })
2461 .await
2462 .unwrap();
2463 worktree_a
2464 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2465 .await;
2466 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2467 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2468 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2469
2470 // Join that project as client B
2471 let project_b = Project::remote(
2472 project_id,
2473 client_b.clone(),
2474 client_b.user_store.clone(),
2475 lang_registry.clone(),
2476 fs.clone(),
2477 &mut cx_b.to_async(),
2478 )
2479 .await
2480 .unwrap();
2481
2482 // See that a guest has joined as client A.
2483 project_a
2484 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2485 .await;
2486
2487 // Begin opening a buffer as client B, but leave the project before the open completes.
2488 let buffer_b = cx_b
2489 .background()
2490 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2491 cx_b.update(|_| drop(project_b));
2492 drop(buffer_b);
2493
2494 // See that the guest has left.
2495 project_a
2496 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2497 .await;
2498 }
2499
2500 #[gpui::test(iterations = 10)]
2501 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2502 cx_a.foreground().forbid_parking();
2503 let lang_registry = Arc::new(LanguageRegistry::test());
2504 let fs = FakeFs::new(cx_a.background());
2505
2506 // Connect to a server as 2 clients.
2507 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2508 let client_a = server.create_client(cx_a, "user_a").await;
2509 let client_b = server.create_client(cx_b, "user_b").await;
2510
2511 // Share a project as client A
2512 fs.insert_tree(
2513 "/a",
2514 json!({
2515 ".zed.toml": r#"collaborators = ["user_b"]"#,
2516 "a.txt": "a-contents",
2517 "b.txt": "b-contents",
2518 }),
2519 )
2520 .await;
2521 let project_a = cx_a.update(|cx| {
2522 Project::local(
2523 client_a.clone(),
2524 client_a.user_store.clone(),
2525 lang_registry.clone(),
2526 fs.clone(),
2527 cx,
2528 )
2529 });
2530 let (worktree_a, _) = project_a
2531 .update(cx_a, |p, cx| {
2532 p.find_or_create_local_worktree("/a", true, cx)
2533 })
2534 .await
2535 .unwrap();
2536 worktree_a
2537 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2538 .await;
2539 let project_id = project_a
2540 .update(cx_a, |project, _| project.next_remote_id())
2541 .await;
2542 project_a
2543 .update(cx_a, |project, cx| project.share(cx))
2544 .await
2545 .unwrap();
2546
2547 // Join that project as client B
2548 let _project_b = Project::remote(
2549 project_id,
2550 client_b.clone(),
2551 client_b.user_store.clone(),
2552 lang_registry.clone(),
2553 fs.clone(),
2554 &mut cx_b.to_async(),
2555 )
2556 .await
2557 .unwrap();
2558
2559 // Client A sees that a guest has joined.
2560 project_a
2561 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2562 .await;
2563
2564 // Drop client B's connection and ensure client A observes client B leaving the project.
2565 client_b.disconnect(&cx_b.to_async()).unwrap();
2566 project_a
2567 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2568 .await;
2569
2570 // Rejoin the project as client B
2571 let _project_b = Project::remote(
2572 project_id,
2573 client_b.clone(),
2574 client_b.user_store.clone(),
2575 lang_registry.clone(),
2576 fs.clone(),
2577 &mut cx_b.to_async(),
2578 )
2579 .await
2580 .unwrap();
2581
2582 // Client A sees that a guest has re-joined.
2583 project_a
2584 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2585 .await;
2586
2587 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2588 client_b.wait_for_current_user(cx_b).await;
2589 server.disconnect_client(client_b.current_user_id(cx_b));
2590 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2591 project_a
2592 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2593 .await;
2594 }
2595
2596 #[gpui::test(iterations = 10)]
2597 async fn test_collaborating_with_diagnostics(
2598 cx_a: &mut TestAppContext,
2599 cx_b: &mut TestAppContext,
2600 ) {
2601 cx_a.foreground().forbid_parking();
2602 let lang_registry = Arc::new(LanguageRegistry::test());
2603 let fs = FakeFs::new(cx_a.background());
2604
2605 // Set up a fake language server.
2606 let mut language = Language::new(
2607 LanguageConfig {
2608 name: "Rust".into(),
2609 path_suffixes: vec!["rs".to_string()],
2610 ..Default::default()
2611 },
2612 Some(tree_sitter_rust::language()),
2613 );
2614 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2615 lang_registry.add(Arc::new(language));
2616
2617 // Connect to a server as 2 clients.
2618 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2619 let client_a = server.create_client(cx_a, "user_a").await;
2620 let client_b = server.create_client(cx_b, "user_b").await;
2621
2622 // Share a project as client A
2623 fs.insert_tree(
2624 "/a",
2625 json!({
2626 ".zed.toml": r#"collaborators = ["user_b"]"#,
2627 "a.rs": "let one = two",
2628 "other.rs": "",
2629 }),
2630 )
2631 .await;
2632 let project_a = cx_a.update(|cx| {
2633 Project::local(
2634 client_a.clone(),
2635 client_a.user_store.clone(),
2636 lang_registry.clone(),
2637 fs.clone(),
2638 cx,
2639 )
2640 });
2641 let (worktree_a, _) = project_a
2642 .update(cx_a, |p, cx| {
2643 p.find_or_create_local_worktree("/a", true, cx)
2644 })
2645 .await
2646 .unwrap();
2647 worktree_a
2648 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2649 .await;
2650 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2651 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2652 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2653
2654 // Cause the language server to start.
2655 let _ = cx_a
2656 .background()
2657 .spawn(project_a.update(cx_a, |project, cx| {
2658 project.open_buffer(
2659 ProjectPath {
2660 worktree_id,
2661 path: Path::new("other.rs").into(),
2662 },
2663 cx,
2664 )
2665 }))
2666 .await
2667 .unwrap();
2668
2669 // Simulate a language server reporting errors for a file.
2670 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2671 fake_language_server
2672 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2673 .await;
2674 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2675 lsp::PublishDiagnosticsParams {
2676 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2677 version: None,
2678 diagnostics: vec![lsp::Diagnostic {
2679 severity: Some(lsp::DiagnosticSeverity::ERROR),
2680 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2681 message: "message 1".to_string(),
2682 ..Default::default()
2683 }],
2684 },
2685 );
2686
2687 // Wait for server to see the diagnostics update.
2688 server
2689 .condition(|store| {
2690 let worktree = store
2691 .project(project_id)
2692 .unwrap()
2693 .share
2694 .as_ref()
2695 .unwrap()
2696 .worktrees
2697 .get(&worktree_id.to_proto())
2698 .unwrap();
2699
2700 !worktree.diagnostic_summaries.is_empty()
2701 })
2702 .await;
2703
2704 // Join the worktree as client B.
2705 let project_b = Project::remote(
2706 project_id,
2707 client_b.clone(),
2708 client_b.user_store.clone(),
2709 lang_registry.clone(),
2710 fs.clone(),
2711 &mut cx_b.to_async(),
2712 )
2713 .await
2714 .unwrap();
2715
2716 project_b.read_with(cx_b, |project, cx| {
2717 assert_eq!(
2718 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2719 &[(
2720 ProjectPath {
2721 worktree_id,
2722 path: Arc::from(Path::new("a.rs")),
2723 },
2724 DiagnosticSummary {
2725 error_count: 1,
2726 warning_count: 0,
2727 ..Default::default()
2728 },
2729 )]
2730 )
2731 });
2732
2733 // Simulate a language server reporting more errors for a file.
2734 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2735 lsp::PublishDiagnosticsParams {
2736 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2737 version: None,
2738 diagnostics: vec![
2739 lsp::Diagnostic {
2740 severity: Some(lsp::DiagnosticSeverity::ERROR),
2741 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2742 message: "message 1".to_string(),
2743 ..Default::default()
2744 },
2745 lsp::Diagnostic {
2746 severity: Some(lsp::DiagnosticSeverity::WARNING),
2747 range: lsp::Range::new(
2748 lsp::Position::new(0, 10),
2749 lsp::Position::new(0, 13),
2750 ),
2751 message: "message 2".to_string(),
2752 ..Default::default()
2753 },
2754 ],
2755 },
2756 );
2757
2758 // Client b gets the updated summaries
2759 project_b
2760 .condition(&cx_b, |project, cx| {
2761 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2762 == &[(
2763 ProjectPath {
2764 worktree_id,
2765 path: Arc::from(Path::new("a.rs")),
2766 },
2767 DiagnosticSummary {
2768 error_count: 1,
2769 warning_count: 1,
2770 ..Default::default()
2771 },
2772 )]
2773 })
2774 .await;
2775
2776 // Open the file with the errors on client B. They should be present.
2777 let buffer_b = cx_b
2778 .background()
2779 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2780 .await
2781 .unwrap();
2782
2783 buffer_b.read_with(cx_b, |buffer, _| {
2784 assert_eq!(
2785 buffer
2786 .snapshot()
2787 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2788 .map(|entry| entry)
2789 .collect::<Vec<_>>(),
2790 &[
2791 DiagnosticEntry {
2792 range: Point::new(0, 4)..Point::new(0, 7),
2793 diagnostic: Diagnostic {
2794 group_id: 0,
2795 message: "message 1".to_string(),
2796 severity: lsp::DiagnosticSeverity::ERROR,
2797 is_primary: true,
2798 ..Default::default()
2799 }
2800 },
2801 DiagnosticEntry {
2802 range: Point::new(0, 10)..Point::new(0, 13),
2803 diagnostic: Diagnostic {
2804 group_id: 1,
2805 severity: lsp::DiagnosticSeverity::WARNING,
2806 message: "message 2".to_string(),
2807 is_primary: true,
2808 ..Default::default()
2809 }
2810 }
2811 ]
2812 );
2813 });
2814
2815 // Simulate a language server reporting no errors for a file.
2816 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2817 lsp::PublishDiagnosticsParams {
2818 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2819 version: None,
2820 diagnostics: vec![],
2821 },
2822 );
2823 project_a
2824 .condition(cx_a, |project, cx| {
2825 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2826 })
2827 .await;
2828 project_b
2829 .condition(cx_b, |project, cx| {
2830 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2831 })
2832 .await;
2833 }
2834
2835 #[gpui::test(iterations = 10)]
2836 async fn test_collaborating_with_completion(
2837 cx_a: &mut TestAppContext,
2838 cx_b: &mut TestAppContext,
2839 ) {
2840 cx_a.foreground().forbid_parking();
2841 let lang_registry = Arc::new(LanguageRegistry::test());
2842 let fs = FakeFs::new(cx_a.background());
2843
2844 // Set up a fake language server.
2845 let mut language = Language::new(
2846 LanguageConfig {
2847 name: "Rust".into(),
2848 path_suffixes: vec!["rs".to_string()],
2849 ..Default::default()
2850 },
2851 Some(tree_sitter_rust::language()),
2852 );
2853 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2854 capabilities: lsp::ServerCapabilities {
2855 completion_provider: Some(lsp::CompletionOptions {
2856 trigger_characters: Some(vec![".".to_string()]),
2857 ..Default::default()
2858 }),
2859 ..Default::default()
2860 },
2861 ..Default::default()
2862 });
2863 lang_registry.add(Arc::new(language));
2864
2865 // Connect to a server as 2 clients.
2866 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2867 let client_a = server.create_client(cx_a, "user_a").await;
2868 let client_b = server.create_client(cx_b, "user_b").await;
2869
2870 // Share a project as client A
2871 fs.insert_tree(
2872 "/a",
2873 json!({
2874 ".zed.toml": r#"collaborators = ["user_b"]"#,
2875 "main.rs": "fn main() { a }",
2876 "other.rs": "",
2877 }),
2878 )
2879 .await;
2880 let project_a = cx_a.update(|cx| {
2881 Project::local(
2882 client_a.clone(),
2883 client_a.user_store.clone(),
2884 lang_registry.clone(),
2885 fs.clone(),
2886 cx,
2887 )
2888 });
2889 let (worktree_a, _) = project_a
2890 .update(cx_a, |p, cx| {
2891 p.find_or_create_local_worktree("/a", true, cx)
2892 })
2893 .await
2894 .unwrap();
2895 worktree_a
2896 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2897 .await;
2898 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2899 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2900 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2901
2902 // Join the worktree as client B.
2903 let project_b = Project::remote(
2904 project_id,
2905 client_b.clone(),
2906 client_b.user_store.clone(),
2907 lang_registry.clone(),
2908 fs.clone(),
2909 &mut cx_b.to_async(),
2910 )
2911 .await
2912 .unwrap();
2913
2914 // Open a file in an editor as the guest.
2915 let buffer_b = project_b
2916 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2917 .await
2918 .unwrap();
2919 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2920 let editor_b = cx_b.add_view(window_b, |cx| {
2921 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2922 });
2923
2924 let fake_language_server = fake_language_servers.next().await.unwrap();
2925 buffer_b
2926 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2927 .await;
2928
2929 // Type a completion trigger character as the guest.
2930 editor_b.update(cx_b, |editor, cx| {
2931 editor.select_ranges([13..13], None, cx);
2932 editor.handle_input(&Input(".".into()), cx);
2933 cx.focus(&editor_b);
2934 });
2935
2936 // Receive a completion request as the host's language server.
2937 // Return some completions from the host's language server.
2938 cx_a.foreground().start_waiting();
2939 fake_language_server
2940 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2941 assert_eq!(
2942 params.text_document_position.text_document.uri,
2943 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2944 );
2945 assert_eq!(
2946 params.text_document_position.position,
2947 lsp::Position::new(0, 14),
2948 );
2949
2950 Ok(Some(lsp::CompletionResponse::Array(vec![
2951 lsp::CompletionItem {
2952 label: "first_method(…)".into(),
2953 detail: Some("fn(&mut self, B) -> C".into()),
2954 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2955 new_text: "first_method($1)".to_string(),
2956 range: lsp::Range::new(
2957 lsp::Position::new(0, 14),
2958 lsp::Position::new(0, 14),
2959 ),
2960 })),
2961 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2962 ..Default::default()
2963 },
2964 lsp::CompletionItem {
2965 label: "second_method(…)".into(),
2966 detail: Some("fn(&mut self, C) -> D<E>".into()),
2967 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2968 new_text: "second_method()".to_string(),
2969 range: lsp::Range::new(
2970 lsp::Position::new(0, 14),
2971 lsp::Position::new(0, 14),
2972 ),
2973 })),
2974 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2975 ..Default::default()
2976 },
2977 ])))
2978 })
2979 .next()
2980 .await
2981 .unwrap();
2982 cx_a.foreground().finish_waiting();
2983
2984 // Open the buffer on the host.
2985 let buffer_a = project_a
2986 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2987 .await
2988 .unwrap();
2989 buffer_a
2990 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2991 .await;
2992
2993 // Confirm a completion on the guest.
2994 editor_b
2995 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2996 .await;
2997 editor_b.update(cx_b, |editor, cx| {
2998 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2999 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3000 });
3001
3002 // Return a resolved completion from the host's language server.
3003 // The resolved completion has an additional text edit.
3004 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3005 |params, _| async move {
3006 assert_eq!(params.label, "first_method(…)");
3007 Ok(lsp::CompletionItem {
3008 label: "first_method(…)".into(),
3009 detail: Some("fn(&mut self, B) -> C".into()),
3010 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3011 new_text: "first_method($1)".to_string(),
3012 range: lsp::Range::new(
3013 lsp::Position::new(0, 14),
3014 lsp::Position::new(0, 14),
3015 ),
3016 })),
3017 additional_text_edits: Some(vec![lsp::TextEdit {
3018 new_text: "use d::SomeTrait;\n".to_string(),
3019 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3020 }]),
3021 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3022 ..Default::default()
3023 })
3024 },
3025 );
3026
3027 // The additional edit is applied.
3028 buffer_a
3029 .condition(&cx_a, |buffer, _| {
3030 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3031 })
3032 .await;
3033 buffer_b
3034 .condition(&cx_b, |buffer, _| {
3035 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3036 })
3037 .await;
3038 }
3039
3040 #[gpui::test(iterations = 10)]
3041 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3042 cx_a.foreground().forbid_parking();
3043 let lang_registry = Arc::new(LanguageRegistry::test());
3044 let fs = FakeFs::new(cx_a.background());
3045
3046 // Connect to a server as 2 clients.
3047 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3048 let client_a = server.create_client(cx_a, "user_a").await;
3049 let client_b = server.create_client(cx_b, "user_b").await;
3050
3051 // Share a project as client A
3052 fs.insert_tree(
3053 "/a",
3054 json!({
3055 ".zed.toml": r#"collaborators = ["user_b"]"#,
3056 "a.rs": "let one = 1;",
3057 }),
3058 )
3059 .await;
3060 let project_a = cx_a.update(|cx| {
3061 Project::local(
3062 client_a.clone(),
3063 client_a.user_store.clone(),
3064 lang_registry.clone(),
3065 fs.clone(),
3066 cx,
3067 )
3068 });
3069 let (worktree_a, _) = project_a
3070 .update(cx_a, |p, cx| {
3071 p.find_or_create_local_worktree("/a", true, cx)
3072 })
3073 .await
3074 .unwrap();
3075 worktree_a
3076 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3077 .await;
3078 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3079 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3080 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3081 let buffer_a = project_a
3082 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3083 .await
3084 .unwrap();
3085
3086 // Join the worktree as client B.
3087 let project_b = Project::remote(
3088 project_id,
3089 client_b.clone(),
3090 client_b.user_store.clone(),
3091 lang_registry.clone(),
3092 fs.clone(),
3093 &mut cx_b.to_async(),
3094 )
3095 .await
3096 .unwrap();
3097
3098 let buffer_b = cx_b
3099 .background()
3100 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3101 .await
3102 .unwrap();
3103 buffer_b.update(cx_b, |buffer, cx| {
3104 buffer.edit([(4..7, "six")], cx);
3105 buffer.edit([(10..11, "6")], cx);
3106 assert_eq!(buffer.text(), "let six = 6;");
3107 assert!(buffer.is_dirty());
3108 assert!(!buffer.has_conflict());
3109 });
3110 buffer_a
3111 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3112 .await;
3113
3114 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3115 .await
3116 .unwrap();
3117 buffer_a
3118 .condition(cx_a, |buffer, _| buffer.has_conflict())
3119 .await;
3120 buffer_b
3121 .condition(cx_b, |buffer, _| buffer.has_conflict())
3122 .await;
3123
3124 project_b
3125 .update(cx_b, |project, cx| {
3126 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3127 })
3128 .await
3129 .unwrap();
3130 buffer_a.read_with(cx_a, |buffer, _| {
3131 assert_eq!(buffer.text(), "let seven = 7;");
3132 assert!(!buffer.is_dirty());
3133 assert!(!buffer.has_conflict());
3134 });
3135 buffer_b.read_with(cx_b, |buffer, _| {
3136 assert_eq!(buffer.text(), "let seven = 7;");
3137 assert!(!buffer.is_dirty());
3138 assert!(!buffer.has_conflict());
3139 });
3140
3141 buffer_a.update(cx_a, |buffer, cx| {
3142 // Undoing on the host is a no-op when the reload was initiated by the guest.
3143 buffer.undo(cx);
3144 assert_eq!(buffer.text(), "let seven = 7;");
3145 assert!(!buffer.is_dirty());
3146 assert!(!buffer.has_conflict());
3147 });
3148 buffer_b.update(cx_b, |buffer, cx| {
3149 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3150 buffer.undo(cx);
3151 assert_eq!(buffer.text(), "let six = 6;");
3152 assert!(buffer.is_dirty());
3153 assert!(!buffer.has_conflict());
3154 });
3155 }
3156
3157 #[gpui::test(iterations = 10)]
3158 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3159 cx_a.foreground().forbid_parking();
3160 let lang_registry = Arc::new(LanguageRegistry::test());
3161 let fs = FakeFs::new(cx_a.background());
3162
3163 // Set up a fake language server.
3164 let mut language = Language::new(
3165 LanguageConfig {
3166 name: "Rust".into(),
3167 path_suffixes: vec!["rs".to_string()],
3168 ..Default::default()
3169 },
3170 Some(tree_sitter_rust::language()),
3171 );
3172 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3173 lang_registry.add(Arc::new(language));
3174
3175 // Connect to a server as 2 clients.
3176 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3177 let client_a = server.create_client(cx_a, "user_a").await;
3178 let client_b = server.create_client(cx_b, "user_b").await;
3179
3180 // Share a project as client A
3181 fs.insert_tree(
3182 "/a",
3183 json!({
3184 ".zed.toml": r#"collaborators = ["user_b"]"#,
3185 "a.rs": "let one = two",
3186 }),
3187 )
3188 .await;
3189 let project_a = cx_a.update(|cx| {
3190 Project::local(
3191 client_a.clone(),
3192 client_a.user_store.clone(),
3193 lang_registry.clone(),
3194 fs.clone(),
3195 cx,
3196 )
3197 });
3198 let (worktree_a, _) = project_a
3199 .update(cx_a, |p, cx| {
3200 p.find_or_create_local_worktree("/a", true, cx)
3201 })
3202 .await
3203 .unwrap();
3204 worktree_a
3205 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3206 .await;
3207 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3208 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3209 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3210
3211 // Join the worktree as client B.
3212 let project_b = Project::remote(
3213 project_id,
3214 client_b.clone(),
3215 client_b.user_store.clone(),
3216 lang_registry.clone(),
3217 fs.clone(),
3218 &mut cx_b.to_async(),
3219 )
3220 .await
3221 .unwrap();
3222
3223 let buffer_b = cx_b
3224 .background()
3225 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3226 .await
3227 .unwrap();
3228
3229 let fake_language_server = fake_language_servers.next().await.unwrap();
3230 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3231 Ok(Some(vec![
3232 lsp::TextEdit {
3233 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3234 new_text: "h".to_string(),
3235 },
3236 lsp::TextEdit {
3237 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3238 new_text: "y".to_string(),
3239 },
3240 ]))
3241 });
3242
3243 project_b
3244 .update(cx_b, |project, cx| {
3245 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3246 })
3247 .await
3248 .unwrap();
3249 assert_eq!(
3250 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3251 "let honey = two"
3252 );
3253 }
3254
3255 #[gpui::test(iterations = 10)]
3256 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3257 cx_a.foreground().forbid_parking();
3258 let lang_registry = Arc::new(LanguageRegistry::test());
3259 let fs = FakeFs::new(cx_a.background());
3260 fs.insert_tree(
3261 "/root-1",
3262 json!({
3263 ".zed.toml": r#"collaborators = ["user_b"]"#,
3264 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3265 }),
3266 )
3267 .await;
3268 fs.insert_tree(
3269 "/root-2",
3270 json!({
3271 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3272 }),
3273 )
3274 .await;
3275
3276 // Set up a fake language server.
3277 let mut language = Language::new(
3278 LanguageConfig {
3279 name: "Rust".into(),
3280 path_suffixes: vec!["rs".to_string()],
3281 ..Default::default()
3282 },
3283 Some(tree_sitter_rust::language()),
3284 );
3285 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3286 lang_registry.add(Arc::new(language));
3287
3288 // Connect to a server as 2 clients.
3289 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3290 let client_a = server.create_client(cx_a, "user_a").await;
3291 let client_b = server.create_client(cx_b, "user_b").await;
3292
3293 // Share a project as client A
3294 let project_a = cx_a.update(|cx| {
3295 Project::local(
3296 client_a.clone(),
3297 client_a.user_store.clone(),
3298 lang_registry.clone(),
3299 fs.clone(),
3300 cx,
3301 )
3302 });
3303 let (worktree_a, _) = project_a
3304 .update(cx_a, |p, cx| {
3305 p.find_or_create_local_worktree("/root-1", true, cx)
3306 })
3307 .await
3308 .unwrap();
3309 worktree_a
3310 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3311 .await;
3312 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3313 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3314 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3315
3316 // Join the worktree as client B.
3317 let project_b = Project::remote(
3318 project_id,
3319 client_b.clone(),
3320 client_b.user_store.clone(),
3321 lang_registry.clone(),
3322 fs.clone(),
3323 &mut cx_b.to_async(),
3324 )
3325 .await
3326 .unwrap();
3327
3328 // Open the file on client B.
3329 let buffer_b = cx_b
3330 .background()
3331 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3332 .await
3333 .unwrap();
3334
3335 // Request the definition of a symbol as the guest.
3336 let fake_language_server = fake_language_servers.next().await.unwrap();
3337 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3338 |_, _| async move {
3339 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3340 lsp::Location::new(
3341 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3342 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3343 ),
3344 )))
3345 },
3346 );
3347
3348 let definitions_1 = project_b
3349 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3350 .await
3351 .unwrap();
3352 cx_b.read(|cx| {
3353 assert_eq!(definitions_1.len(), 1);
3354 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3355 let target_buffer = definitions_1[0].buffer.read(cx);
3356 assert_eq!(
3357 target_buffer.text(),
3358 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3359 );
3360 assert_eq!(
3361 definitions_1[0].range.to_point(target_buffer),
3362 Point::new(0, 6)..Point::new(0, 9)
3363 );
3364 });
3365
3366 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3367 // the previous call to `definition`.
3368 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3369 |_, _| async move {
3370 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3371 lsp::Location::new(
3372 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3373 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3374 ),
3375 )))
3376 },
3377 );
3378
3379 let definitions_2 = project_b
3380 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3381 .await
3382 .unwrap();
3383 cx_b.read(|cx| {
3384 assert_eq!(definitions_2.len(), 1);
3385 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3386 let target_buffer = definitions_2[0].buffer.read(cx);
3387 assert_eq!(
3388 target_buffer.text(),
3389 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3390 );
3391 assert_eq!(
3392 definitions_2[0].range.to_point(target_buffer),
3393 Point::new(1, 6)..Point::new(1, 11)
3394 );
3395 });
3396 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3397 }
3398
3399 #[gpui::test(iterations = 10)]
3400 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3401 cx_a.foreground().forbid_parking();
3402 let lang_registry = Arc::new(LanguageRegistry::test());
3403 let fs = FakeFs::new(cx_a.background());
3404 fs.insert_tree(
3405 "/root-1",
3406 json!({
3407 ".zed.toml": r#"collaborators = ["user_b"]"#,
3408 "one.rs": "const ONE: usize = 1;",
3409 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3410 }),
3411 )
3412 .await;
3413 fs.insert_tree(
3414 "/root-2",
3415 json!({
3416 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3417 }),
3418 )
3419 .await;
3420
3421 // Set up a fake language server.
3422 let mut language = Language::new(
3423 LanguageConfig {
3424 name: "Rust".into(),
3425 path_suffixes: vec!["rs".to_string()],
3426 ..Default::default()
3427 },
3428 Some(tree_sitter_rust::language()),
3429 );
3430 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3431 lang_registry.add(Arc::new(language));
3432
3433 // Connect to a server as 2 clients.
3434 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3435 let client_a = server.create_client(cx_a, "user_a").await;
3436 let client_b = server.create_client(cx_b, "user_b").await;
3437
3438 // Share a project as client A
3439 let project_a = cx_a.update(|cx| {
3440 Project::local(
3441 client_a.clone(),
3442 client_a.user_store.clone(),
3443 lang_registry.clone(),
3444 fs.clone(),
3445 cx,
3446 )
3447 });
3448 let (worktree_a, _) = project_a
3449 .update(cx_a, |p, cx| {
3450 p.find_or_create_local_worktree("/root-1", true, cx)
3451 })
3452 .await
3453 .unwrap();
3454 worktree_a
3455 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3456 .await;
3457 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3458 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3459 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3460
3461 // Join the worktree as client B.
3462 let project_b = Project::remote(
3463 project_id,
3464 client_b.clone(),
3465 client_b.user_store.clone(),
3466 lang_registry.clone(),
3467 fs.clone(),
3468 &mut cx_b.to_async(),
3469 )
3470 .await
3471 .unwrap();
3472
3473 // Open the file on client B.
3474 let buffer_b = cx_b
3475 .background()
3476 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3477 .await
3478 .unwrap();
3479
3480 // Request references to a symbol as the guest.
3481 let fake_language_server = fake_language_servers.next().await.unwrap();
3482 fake_language_server.handle_request::<lsp::request::References, _, _>(
3483 |params, _| async move {
3484 assert_eq!(
3485 params.text_document_position.text_document.uri.as_str(),
3486 "file:///root-1/one.rs"
3487 );
3488 Ok(Some(vec![
3489 lsp::Location {
3490 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3491 range: lsp::Range::new(
3492 lsp::Position::new(0, 24),
3493 lsp::Position::new(0, 27),
3494 ),
3495 },
3496 lsp::Location {
3497 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3498 range: lsp::Range::new(
3499 lsp::Position::new(0, 35),
3500 lsp::Position::new(0, 38),
3501 ),
3502 },
3503 lsp::Location {
3504 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3505 range: lsp::Range::new(
3506 lsp::Position::new(0, 37),
3507 lsp::Position::new(0, 40),
3508 ),
3509 },
3510 ]))
3511 },
3512 );
3513
3514 let references = project_b
3515 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3516 .await
3517 .unwrap();
3518 cx_b.read(|cx| {
3519 assert_eq!(references.len(), 3);
3520 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3521
3522 let two_buffer = references[0].buffer.read(cx);
3523 let three_buffer = references[2].buffer.read(cx);
3524 assert_eq!(
3525 two_buffer.file().unwrap().path().as_ref(),
3526 Path::new("two.rs")
3527 );
3528 assert_eq!(references[1].buffer, references[0].buffer);
3529 assert_eq!(
3530 three_buffer.file().unwrap().full_path(cx),
3531 Path::new("three.rs")
3532 );
3533
3534 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3535 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3536 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3537 });
3538 }
3539
3540 #[gpui::test(iterations = 10)]
3541 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3542 cx_a.foreground().forbid_parking();
3543 let lang_registry = Arc::new(LanguageRegistry::test());
3544 let fs = FakeFs::new(cx_a.background());
3545 fs.insert_tree(
3546 "/root-1",
3547 json!({
3548 ".zed.toml": r#"collaborators = ["user_b"]"#,
3549 "a": "hello world",
3550 "b": "goodnight moon",
3551 "c": "a world of goo",
3552 "d": "world champion of clown world",
3553 }),
3554 )
3555 .await;
3556 fs.insert_tree(
3557 "/root-2",
3558 json!({
3559 "e": "disney world is fun",
3560 }),
3561 )
3562 .await;
3563
3564 // Connect to a server as 2 clients.
3565 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3566 let client_a = server.create_client(cx_a, "user_a").await;
3567 let client_b = server.create_client(cx_b, "user_b").await;
3568
3569 // Share a project as client A
3570 let project_a = cx_a.update(|cx| {
3571 Project::local(
3572 client_a.clone(),
3573 client_a.user_store.clone(),
3574 lang_registry.clone(),
3575 fs.clone(),
3576 cx,
3577 )
3578 });
3579 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3580
3581 let (worktree_1, _) = project_a
3582 .update(cx_a, |p, cx| {
3583 p.find_or_create_local_worktree("/root-1", true, cx)
3584 })
3585 .await
3586 .unwrap();
3587 worktree_1
3588 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3589 .await;
3590 let (worktree_2, _) = project_a
3591 .update(cx_a, |p, cx| {
3592 p.find_or_create_local_worktree("/root-2", true, cx)
3593 })
3594 .await
3595 .unwrap();
3596 worktree_2
3597 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3598 .await;
3599
3600 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3601
3602 // Join the worktree as client B.
3603 let project_b = Project::remote(
3604 project_id,
3605 client_b.clone(),
3606 client_b.user_store.clone(),
3607 lang_registry.clone(),
3608 fs.clone(),
3609 &mut cx_b.to_async(),
3610 )
3611 .await
3612 .unwrap();
3613
3614 let results = project_b
3615 .update(cx_b, |project, cx| {
3616 project.search(SearchQuery::text("world", false, false), cx)
3617 })
3618 .await
3619 .unwrap();
3620
3621 let mut ranges_by_path = results
3622 .into_iter()
3623 .map(|(buffer, ranges)| {
3624 buffer.read_with(cx_b, |buffer, cx| {
3625 let path = buffer.file().unwrap().full_path(cx);
3626 let offset_ranges = ranges
3627 .into_iter()
3628 .map(|range| range.to_offset(buffer))
3629 .collect::<Vec<_>>();
3630 (path, offset_ranges)
3631 })
3632 })
3633 .collect::<Vec<_>>();
3634 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3635
3636 assert_eq!(
3637 ranges_by_path,
3638 &[
3639 (PathBuf::from("root-1/a"), vec![6..11]),
3640 (PathBuf::from("root-1/c"), vec![2..7]),
3641 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3642 (PathBuf::from("root-2/e"), vec![7..12]),
3643 ]
3644 );
3645 }
3646
3647 #[gpui::test(iterations = 10)]
3648 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3649 cx_a.foreground().forbid_parking();
3650 let lang_registry = Arc::new(LanguageRegistry::test());
3651 let fs = FakeFs::new(cx_a.background());
3652 fs.insert_tree(
3653 "/root-1",
3654 json!({
3655 ".zed.toml": r#"collaborators = ["user_b"]"#,
3656 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3657 }),
3658 )
3659 .await;
3660
3661 // Set up a fake language server.
3662 let mut language = Language::new(
3663 LanguageConfig {
3664 name: "Rust".into(),
3665 path_suffixes: vec!["rs".to_string()],
3666 ..Default::default()
3667 },
3668 Some(tree_sitter_rust::language()),
3669 );
3670 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3671 lang_registry.add(Arc::new(language));
3672
3673 // Connect to a server as 2 clients.
3674 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3675 let client_a = server.create_client(cx_a, "user_a").await;
3676 let client_b = server.create_client(cx_b, "user_b").await;
3677
3678 // Share a project as client A
3679 let project_a = cx_a.update(|cx| {
3680 Project::local(
3681 client_a.clone(),
3682 client_a.user_store.clone(),
3683 lang_registry.clone(),
3684 fs.clone(),
3685 cx,
3686 )
3687 });
3688 let (worktree_a, _) = project_a
3689 .update(cx_a, |p, cx| {
3690 p.find_or_create_local_worktree("/root-1", true, cx)
3691 })
3692 .await
3693 .unwrap();
3694 worktree_a
3695 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3696 .await;
3697 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3698 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3699 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3700
3701 // Join the worktree as client B.
3702 let project_b = Project::remote(
3703 project_id,
3704 client_b.clone(),
3705 client_b.user_store.clone(),
3706 lang_registry.clone(),
3707 fs.clone(),
3708 &mut cx_b.to_async(),
3709 )
3710 .await
3711 .unwrap();
3712
3713 // Open the file on client B.
3714 let buffer_b = cx_b
3715 .background()
3716 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3717 .await
3718 .unwrap();
3719
3720 // Request document highlights as the guest.
3721 let fake_language_server = fake_language_servers.next().await.unwrap();
3722 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3723 |params, _| async move {
3724 assert_eq!(
3725 params
3726 .text_document_position_params
3727 .text_document
3728 .uri
3729 .as_str(),
3730 "file:///root-1/main.rs"
3731 );
3732 assert_eq!(
3733 params.text_document_position_params.position,
3734 lsp::Position::new(0, 34)
3735 );
3736 Ok(Some(vec![
3737 lsp::DocumentHighlight {
3738 kind: Some(lsp::DocumentHighlightKind::WRITE),
3739 range: lsp::Range::new(
3740 lsp::Position::new(0, 10),
3741 lsp::Position::new(0, 16),
3742 ),
3743 },
3744 lsp::DocumentHighlight {
3745 kind: Some(lsp::DocumentHighlightKind::READ),
3746 range: lsp::Range::new(
3747 lsp::Position::new(0, 32),
3748 lsp::Position::new(0, 38),
3749 ),
3750 },
3751 lsp::DocumentHighlight {
3752 kind: Some(lsp::DocumentHighlightKind::READ),
3753 range: lsp::Range::new(
3754 lsp::Position::new(0, 41),
3755 lsp::Position::new(0, 47),
3756 ),
3757 },
3758 ]))
3759 },
3760 );
3761
3762 let highlights = project_b
3763 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3764 .await
3765 .unwrap();
3766 buffer_b.read_with(cx_b, |buffer, _| {
3767 let snapshot = buffer.snapshot();
3768
3769 let highlights = highlights
3770 .into_iter()
3771 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3772 .collect::<Vec<_>>();
3773 assert_eq!(
3774 highlights,
3775 &[
3776 (lsp::DocumentHighlightKind::WRITE, 10..16),
3777 (lsp::DocumentHighlightKind::READ, 32..38),
3778 (lsp::DocumentHighlightKind::READ, 41..47)
3779 ]
3780 )
3781 });
3782 }
3783
3784 #[gpui::test(iterations = 10)]
3785 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3786 cx_a.foreground().forbid_parking();
3787 let lang_registry = Arc::new(LanguageRegistry::test());
3788 let fs = FakeFs::new(cx_a.background());
3789 fs.insert_tree(
3790 "/code",
3791 json!({
3792 "crate-1": {
3793 ".zed.toml": r#"collaborators = ["user_b"]"#,
3794 "one.rs": "const ONE: usize = 1;",
3795 },
3796 "crate-2": {
3797 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3798 },
3799 "private": {
3800 "passwords.txt": "the-password",
3801 }
3802 }),
3803 )
3804 .await;
3805
3806 // Set up a fake language server.
3807 let mut language = Language::new(
3808 LanguageConfig {
3809 name: "Rust".into(),
3810 path_suffixes: vec!["rs".to_string()],
3811 ..Default::default()
3812 },
3813 Some(tree_sitter_rust::language()),
3814 );
3815 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3816 lang_registry.add(Arc::new(language));
3817
3818 // Connect to a server as 2 clients.
3819 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3820 let client_a = server.create_client(cx_a, "user_a").await;
3821 let client_b = server.create_client(cx_b, "user_b").await;
3822
3823 // Share a project as client A
3824 let project_a = cx_a.update(|cx| {
3825 Project::local(
3826 client_a.clone(),
3827 client_a.user_store.clone(),
3828 lang_registry.clone(),
3829 fs.clone(),
3830 cx,
3831 )
3832 });
3833 let (worktree_a, _) = project_a
3834 .update(cx_a, |p, cx| {
3835 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3836 })
3837 .await
3838 .unwrap();
3839 worktree_a
3840 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3841 .await;
3842 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3843 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3844 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3845
3846 // Join the worktree as client B.
3847 let project_b = Project::remote(
3848 project_id,
3849 client_b.clone(),
3850 client_b.user_store.clone(),
3851 lang_registry.clone(),
3852 fs.clone(),
3853 &mut cx_b.to_async(),
3854 )
3855 .await
3856 .unwrap();
3857
3858 // Cause the language server to start.
3859 let _buffer = cx_b
3860 .background()
3861 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3862 .await
3863 .unwrap();
3864
3865 let fake_language_server = fake_language_servers.next().await.unwrap();
3866 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3867 |_, _| async move {
3868 #[allow(deprecated)]
3869 Ok(Some(vec![lsp::SymbolInformation {
3870 name: "TWO".into(),
3871 location: lsp::Location {
3872 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3873 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3874 },
3875 kind: lsp::SymbolKind::CONSTANT,
3876 tags: None,
3877 container_name: None,
3878 deprecated: None,
3879 }]))
3880 },
3881 );
3882
3883 // Request the definition of a symbol as the guest.
3884 let symbols = project_b
3885 .update(cx_b, |p, cx| p.symbols("two", cx))
3886 .await
3887 .unwrap();
3888 assert_eq!(symbols.len(), 1);
3889 assert_eq!(symbols[0].name, "TWO");
3890
3891 // Open one of the returned symbols.
3892 let buffer_b_2 = project_b
3893 .update(cx_b, |project, cx| {
3894 project.open_buffer_for_symbol(&symbols[0], cx)
3895 })
3896 .await
3897 .unwrap();
3898 buffer_b_2.read_with(cx_b, |buffer, _| {
3899 assert_eq!(
3900 buffer.file().unwrap().path().as_ref(),
3901 Path::new("../crate-2/two.rs")
3902 );
3903 });
3904
3905 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3906 let mut fake_symbol = symbols[0].clone();
3907 fake_symbol.path = Path::new("/code/secrets").into();
3908 let error = project_b
3909 .update(cx_b, |project, cx| {
3910 project.open_buffer_for_symbol(&fake_symbol, cx)
3911 })
3912 .await
3913 .unwrap_err();
3914 assert!(error.to_string().contains("invalid symbol signature"));
3915 }
3916
3917 #[gpui::test(iterations = 10)]
3918 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3919 cx_a: &mut TestAppContext,
3920 cx_b: &mut TestAppContext,
3921 mut rng: StdRng,
3922 ) {
3923 cx_a.foreground().forbid_parking();
3924 let lang_registry = Arc::new(LanguageRegistry::test());
3925 let fs = FakeFs::new(cx_a.background());
3926 fs.insert_tree(
3927 "/root",
3928 json!({
3929 ".zed.toml": r#"collaborators = ["user_b"]"#,
3930 "a.rs": "const ONE: usize = b::TWO;",
3931 "b.rs": "const TWO: usize = 2",
3932 }),
3933 )
3934 .await;
3935
3936 // Set up a fake language server.
3937 let mut language = Language::new(
3938 LanguageConfig {
3939 name: "Rust".into(),
3940 path_suffixes: vec!["rs".to_string()],
3941 ..Default::default()
3942 },
3943 Some(tree_sitter_rust::language()),
3944 );
3945 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3946 lang_registry.add(Arc::new(language));
3947
3948 // Connect to a server as 2 clients.
3949 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3950 let client_a = server.create_client(cx_a, "user_a").await;
3951 let client_b = server.create_client(cx_b, "user_b").await;
3952
3953 // Share a project as client A
3954 let project_a = cx_a.update(|cx| {
3955 Project::local(
3956 client_a.clone(),
3957 client_a.user_store.clone(),
3958 lang_registry.clone(),
3959 fs.clone(),
3960 cx,
3961 )
3962 });
3963
3964 let (worktree_a, _) = project_a
3965 .update(cx_a, |p, cx| {
3966 p.find_or_create_local_worktree("/root", true, cx)
3967 })
3968 .await
3969 .unwrap();
3970 worktree_a
3971 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3972 .await;
3973 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3974 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3975 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3976
3977 // Join the worktree as client B.
3978 let project_b = Project::remote(
3979 project_id,
3980 client_b.clone(),
3981 client_b.user_store.clone(),
3982 lang_registry.clone(),
3983 fs.clone(),
3984 &mut cx_b.to_async(),
3985 )
3986 .await
3987 .unwrap();
3988
3989 let buffer_b1 = cx_b
3990 .background()
3991 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3992 .await
3993 .unwrap();
3994
3995 let fake_language_server = fake_language_servers.next().await.unwrap();
3996 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3997 |_, _| async move {
3998 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3999 lsp::Location::new(
4000 lsp::Url::from_file_path("/root/b.rs").unwrap(),
4001 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4002 ),
4003 )))
4004 },
4005 );
4006
4007 let definitions;
4008 let buffer_b2;
4009 if rng.gen() {
4010 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4011 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4012 } else {
4013 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4014 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4015 }
4016
4017 let buffer_b2 = buffer_b2.await.unwrap();
4018 let definitions = definitions.await.unwrap();
4019 assert_eq!(definitions.len(), 1);
4020 assert_eq!(definitions[0].buffer, buffer_b2);
4021 }
4022
4023 #[gpui::test(iterations = 10)]
4024 async fn test_collaborating_with_code_actions(
4025 cx_a: &mut TestAppContext,
4026 cx_b: &mut TestAppContext,
4027 ) {
4028 cx_a.foreground().forbid_parking();
4029 let lang_registry = Arc::new(LanguageRegistry::test());
4030 let fs = FakeFs::new(cx_a.background());
4031 cx_b.update(|cx| editor::init(cx));
4032
4033 // Set up a fake language server.
4034 let mut language = Language::new(
4035 LanguageConfig {
4036 name: "Rust".into(),
4037 path_suffixes: vec!["rs".to_string()],
4038 ..Default::default()
4039 },
4040 Some(tree_sitter_rust::language()),
4041 );
4042 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4043 lang_registry.add(Arc::new(language));
4044
4045 // Connect to a server as 2 clients.
4046 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4047 let client_a = server.create_client(cx_a, "user_a").await;
4048 let client_b = server.create_client(cx_b, "user_b").await;
4049
4050 // Share a project as client A
4051 fs.insert_tree(
4052 "/a",
4053 json!({
4054 ".zed.toml": r#"collaborators = ["user_b"]"#,
4055 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4056 "other.rs": "pub fn foo() -> usize { 4 }",
4057 }),
4058 )
4059 .await;
4060 let project_a = cx_a.update(|cx| {
4061 Project::local(
4062 client_a.clone(),
4063 client_a.user_store.clone(),
4064 lang_registry.clone(),
4065 fs.clone(),
4066 cx,
4067 )
4068 });
4069 let (worktree_a, _) = project_a
4070 .update(cx_a, |p, cx| {
4071 p.find_or_create_local_worktree("/a", true, cx)
4072 })
4073 .await
4074 .unwrap();
4075 worktree_a
4076 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4077 .await;
4078 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4079 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4080 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4081
4082 // Join the worktree as client B.
4083 let project_b = Project::remote(
4084 project_id,
4085 client_b.clone(),
4086 client_b.user_store.clone(),
4087 lang_registry.clone(),
4088 fs.clone(),
4089 &mut cx_b.to_async(),
4090 )
4091 .await
4092 .unwrap();
4093 let mut params = cx_b.update(WorkspaceParams::test);
4094 params.languages = lang_registry.clone();
4095 params.client = client_b.client.clone();
4096 params.user_store = client_b.user_store.clone();
4097 params.project = project_b;
4098
4099 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4100 let editor_b = workspace_b
4101 .update(cx_b, |workspace, cx| {
4102 workspace.open_path((worktree_id, "main.rs"), true, cx)
4103 })
4104 .await
4105 .unwrap()
4106 .downcast::<Editor>()
4107 .unwrap();
4108
4109 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4110 fake_language_server
4111 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4112 assert_eq!(
4113 params.text_document.uri,
4114 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4115 );
4116 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4117 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4118 Ok(None)
4119 })
4120 .next()
4121 .await;
4122
4123 // Move cursor to a location that contains code actions.
4124 editor_b.update(cx_b, |editor, cx| {
4125 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4126 cx.focus(&editor_b);
4127 });
4128
4129 fake_language_server
4130 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4131 assert_eq!(
4132 params.text_document.uri,
4133 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4134 );
4135 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4136 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4137
4138 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4139 lsp::CodeAction {
4140 title: "Inline into all callers".to_string(),
4141 edit: Some(lsp::WorkspaceEdit {
4142 changes: Some(
4143 [
4144 (
4145 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4146 vec![lsp::TextEdit::new(
4147 lsp::Range::new(
4148 lsp::Position::new(1, 22),
4149 lsp::Position::new(1, 34),
4150 ),
4151 "4".to_string(),
4152 )],
4153 ),
4154 (
4155 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4156 vec![lsp::TextEdit::new(
4157 lsp::Range::new(
4158 lsp::Position::new(0, 0),
4159 lsp::Position::new(0, 27),
4160 ),
4161 "".to_string(),
4162 )],
4163 ),
4164 ]
4165 .into_iter()
4166 .collect(),
4167 ),
4168 ..Default::default()
4169 }),
4170 data: Some(json!({
4171 "codeActionParams": {
4172 "range": {
4173 "start": {"line": 1, "column": 31},
4174 "end": {"line": 1, "column": 31},
4175 }
4176 }
4177 })),
4178 ..Default::default()
4179 },
4180 )]))
4181 })
4182 .next()
4183 .await;
4184
4185 // Toggle code actions and wait for them to display.
4186 editor_b.update(cx_b, |editor, cx| {
4187 editor.toggle_code_actions(
4188 &ToggleCodeActions {
4189 deployed_from_indicator: false,
4190 },
4191 cx,
4192 );
4193 });
4194 editor_b
4195 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4196 .await;
4197
4198 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4199
4200 // Confirming the code action will trigger a resolve request.
4201 let confirm_action = workspace_b
4202 .update(cx_b, |workspace, cx| {
4203 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4204 })
4205 .unwrap();
4206 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4207 |_, _| async move {
4208 Ok(lsp::CodeAction {
4209 title: "Inline into all callers".to_string(),
4210 edit: Some(lsp::WorkspaceEdit {
4211 changes: Some(
4212 [
4213 (
4214 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4215 vec![lsp::TextEdit::new(
4216 lsp::Range::new(
4217 lsp::Position::new(1, 22),
4218 lsp::Position::new(1, 34),
4219 ),
4220 "4".to_string(),
4221 )],
4222 ),
4223 (
4224 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4225 vec![lsp::TextEdit::new(
4226 lsp::Range::new(
4227 lsp::Position::new(0, 0),
4228 lsp::Position::new(0, 27),
4229 ),
4230 "".to_string(),
4231 )],
4232 ),
4233 ]
4234 .into_iter()
4235 .collect(),
4236 ),
4237 ..Default::default()
4238 }),
4239 ..Default::default()
4240 })
4241 },
4242 );
4243
4244 // After the action is confirmed, an editor containing both modified files is opened.
4245 confirm_action.await.unwrap();
4246 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4247 workspace
4248 .active_item(cx)
4249 .unwrap()
4250 .downcast::<Editor>()
4251 .unwrap()
4252 });
4253 code_action_editor.update(cx_b, |editor, cx| {
4254 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4255 editor.undo(&Undo, cx);
4256 assert_eq!(
4257 editor.text(cx),
4258 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4259 );
4260 editor.redo(&Redo, cx);
4261 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4262 });
4263 }
4264
4265 #[gpui::test(iterations = 10)]
4266 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4267 cx_a.foreground().forbid_parking();
4268 let lang_registry = Arc::new(LanguageRegistry::test());
4269 let fs = FakeFs::new(cx_a.background());
4270 cx_b.update(|cx| editor::init(cx));
4271
4272 // Set up a fake language server.
4273 let mut language = Language::new(
4274 LanguageConfig {
4275 name: "Rust".into(),
4276 path_suffixes: vec!["rs".to_string()],
4277 ..Default::default()
4278 },
4279 Some(tree_sitter_rust::language()),
4280 );
4281 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4282 capabilities: lsp::ServerCapabilities {
4283 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4284 prepare_provider: Some(true),
4285 work_done_progress_options: Default::default(),
4286 })),
4287 ..Default::default()
4288 },
4289 ..Default::default()
4290 });
4291 lang_registry.add(Arc::new(language));
4292
4293 // Connect to a server as 2 clients.
4294 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4295 let client_a = server.create_client(cx_a, "user_a").await;
4296 let client_b = server.create_client(cx_b, "user_b").await;
4297
4298 // Share a project as client A
4299 fs.insert_tree(
4300 "/dir",
4301 json!({
4302 ".zed.toml": r#"collaborators = ["user_b"]"#,
4303 "one.rs": "const ONE: usize = 1;",
4304 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4305 }),
4306 )
4307 .await;
4308 let project_a = cx_a.update(|cx| {
4309 Project::local(
4310 client_a.clone(),
4311 client_a.user_store.clone(),
4312 lang_registry.clone(),
4313 fs.clone(),
4314 cx,
4315 )
4316 });
4317 let (worktree_a, _) = project_a
4318 .update(cx_a, |p, cx| {
4319 p.find_or_create_local_worktree("/dir", true, cx)
4320 })
4321 .await
4322 .unwrap();
4323 worktree_a
4324 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4325 .await;
4326 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4327 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4328 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4329
4330 // Join the worktree as client B.
4331 let project_b = Project::remote(
4332 project_id,
4333 client_b.clone(),
4334 client_b.user_store.clone(),
4335 lang_registry.clone(),
4336 fs.clone(),
4337 &mut cx_b.to_async(),
4338 )
4339 .await
4340 .unwrap();
4341 let mut params = cx_b.update(WorkspaceParams::test);
4342 params.languages = lang_registry.clone();
4343 params.client = client_b.client.clone();
4344 params.user_store = client_b.user_store.clone();
4345 params.project = project_b;
4346
4347 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4348 let editor_b = workspace_b
4349 .update(cx_b, |workspace, cx| {
4350 workspace.open_path((worktree_id, "one.rs"), true, cx)
4351 })
4352 .await
4353 .unwrap()
4354 .downcast::<Editor>()
4355 .unwrap();
4356 let fake_language_server = fake_language_servers.next().await.unwrap();
4357
4358 // Move cursor to a location that can be renamed.
4359 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4360 editor.select_ranges([7..7], None, cx);
4361 editor.rename(&Rename, cx).unwrap()
4362 });
4363
4364 fake_language_server
4365 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4366 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4367 assert_eq!(params.position, lsp::Position::new(0, 7));
4368 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4369 lsp::Position::new(0, 6),
4370 lsp::Position::new(0, 9),
4371 ))))
4372 })
4373 .next()
4374 .await
4375 .unwrap();
4376 prepare_rename.await.unwrap();
4377 editor_b.update(cx_b, |editor, cx| {
4378 let rename = editor.pending_rename().unwrap();
4379 let buffer = editor.buffer().read(cx).snapshot(cx);
4380 assert_eq!(
4381 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4382 6..9
4383 );
4384 rename.editor.update(cx, |rename_editor, cx| {
4385 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4386 rename_buffer.edit([(0..3, "THREE")], cx);
4387 });
4388 });
4389 });
4390
4391 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4392 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4393 });
4394 fake_language_server
4395 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4396 assert_eq!(
4397 params.text_document_position.text_document.uri.as_str(),
4398 "file:///dir/one.rs"
4399 );
4400 assert_eq!(
4401 params.text_document_position.position,
4402 lsp::Position::new(0, 6)
4403 );
4404 assert_eq!(params.new_name, "THREE");
4405 Ok(Some(lsp::WorkspaceEdit {
4406 changes: Some(
4407 [
4408 (
4409 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4410 vec![lsp::TextEdit::new(
4411 lsp::Range::new(
4412 lsp::Position::new(0, 6),
4413 lsp::Position::new(0, 9),
4414 ),
4415 "THREE".to_string(),
4416 )],
4417 ),
4418 (
4419 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4420 vec![
4421 lsp::TextEdit::new(
4422 lsp::Range::new(
4423 lsp::Position::new(0, 24),
4424 lsp::Position::new(0, 27),
4425 ),
4426 "THREE".to_string(),
4427 ),
4428 lsp::TextEdit::new(
4429 lsp::Range::new(
4430 lsp::Position::new(0, 35),
4431 lsp::Position::new(0, 38),
4432 ),
4433 "THREE".to_string(),
4434 ),
4435 ],
4436 ),
4437 ]
4438 .into_iter()
4439 .collect(),
4440 ),
4441 ..Default::default()
4442 }))
4443 })
4444 .next()
4445 .await
4446 .unwrap();
4447 confirm_rename.await.unwrap();
4448
4449 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4450 workspace
4451 .active_item(cx)
4452 .unwrap()
4453 .downcast::<Editor>()
4454 .unwrap()
4455 });
4456 rename_editor.update(cx_b, |editor, cx| {
4457 assert_eq!(
4458 editor.text(cx),
4459 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4460 );
4461 editor.undo(&Undo, cx);
4462 assert_eq!(
4463 editor.text(cx),
4464 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4465 );
4466 editor.redo(&Redo, cx);
4467 assert_eq!(
4468 editor.text(cx),
4469 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4470 );
4471 });
4472
4473 // Ensure temporary rename edits cannot be undone/redone.
4474 editor_b.update(cx_b, |editor, cx| {
4475 editor.undo(&Undo, cx);
4476 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4477 editor.undo(&Undo, cx);
4478 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4479 editor.redo(&Redo, cx);
4480 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4481 })
4482 }
4483
4484 #[gpui::test(iterations = 10)]
4485 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4486 cx_a.foreground().forbid_parking();
4487
4488 // Connect to a server as 2 clients.
4489 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4490 let client_a = server.create_client(cx_a, "user_a").await;
4491 let client_b = server.create_client(cx_b, "user_b").await;
4492
4493 // Create an org that includes these 2 users.
4494 let db = &server.app_state.db;
4495 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4496 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4497 .await
4498 .unwrap();
4499 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4500 .await
4501 .unwrap();
4502
4503 // Create a channel that includes all the users.
4504 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4505 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4506 .await
4507 .unwrap();
4508 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4509 .await
4510 .unwrap();
4511 db.create_channel_message(
4512 channel_id,
4513 client_b.current_user_id(&cx_b),
4514 "hello A, it's B.",
4515 OffsetDateTime::now_utc(),
4516 1,
4517 )
4518 .await
4519 .unwrap();
4520
4521 let channels_a = cx_a
4522 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4523 channels_a
4524 .condition(cx_a, |list, _| list.available_channels().is_some())
4525 .await;
4526 channels_a.read_with(cx_a, |list, _| {
4527 assert_eq!(
4528 list.available_channels().unwrap(),
4529 &[ChannelDetails {
4530 id: channel_id.to_proto(),
4531 name: "test-channel".to_string()
4532 }]
4533 )
4534 });
4535 let channel_a = channels_a.update(cx_a, |this, cx| {
4536 this.get_channel(channel_id.to_proto(), cx).unwrap()
4537 });
4538 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4539 channel_a
4540 .condition(&cx_a, |channel, _| {
4541 channel_messages(channel)
4542 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4543 })
4544 .await;
4545
4546 let channels_b = cx_b
4547 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4548 channels_b
4549 .condition(cx_b, |list, _| list.available_channels().is_some())
4550 .await;
4551 channels_b.read_with(cx_b, |list, _| {
4552 assert_eq!(
4553 list.available_channels().unwrap(),
4554 &[ChannelDetails {
4555 id: channel_id.to_proto(),
4556 name: "test-channel".to_string()
4557 }]
4558 )
4559 });
4560
4561 let channel_b = channels_b.update(cx_b, |this, cx| {
4562 this.get_channel(channel_id.to_proto(), cx).unwrap()
4563 });
4564 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4565 channel_b
4566 .condition(&cx_b, |channel, _| {
4567 channel_messages(channel)
4568 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4569 })
4570 .await;
4571
4572 channel_a
4573 .update(cx_a, |channel, cx| {
4574 channel
4575 .send_message("oh, hi B.".to_string(), cx)
4576 .unwrap()
4577 .detach();
4578 let task = channel.send_message("sup".to_string(), cx).unwrap();
4579 assert_eq!(
4580 channel_messages(channel),
4581 &[
4582 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4583 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4584 ("user_a".to_string(), "sup".to_string(), true)
4585 ]
4586 );
4587 task
4588 })
4589 .await
4590 .unwrap();
4591
4592 channel_b
4593 .condition(&cx_b, |channel, _| {
4594 channel_messages(channel)
4595 == [
4596 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4597 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4598 ("user_a".to_string(), "sup".to_string(), false),
4599 ]
4600 })
4601 .await;
4602
4603 assert_eq!(
4604 server
4605 .state()
4606 .await
4607 .channel(channel_id)
4608 .unwrap()
4609 .connection_ids
4610 .len(),
4611 2
4612 );
4613 cx_b.update(|_| drop(channel_b));
4614 server
4615 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4616 .await;
4617
4618 cx_a.update(|_| drop(channel_a));
4619 server
4620 .condition(|state| state.channel(channel_id).is_none())
4621 .await;
4622 }
4623
4624 #[gpui::test(iterations = 10)]
4625 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4626 cx_a.foreground().forbid_parking();
4627
4628 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4629 let client_a = server.create_client(cx_a, "user_a").await;
4630
4631 let db = &server.app_state.db;
4632 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4633 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4634 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4635 .await
4636 .unwrap();
4637 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4638 .await
4639 .unwrap();
4640
4641 let channels_a = cx_a
4642 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4643 channels_a
4644 .condition(cx_a, |list, _| list.available_channels().is_some())
4645 .await;
4646 let channel_a = channels_a.update(cx_a, |this, cx| {
4647 this.get_channel(channel_id.to_proto(), cx).unwrap()
4648 });
4649
4650 // Messages aren't allowed to be too long.
4651 channel_a
4652 .update(cx_a, |channel, cx| {
4653 let long_body = "this is long.\n".repeat(1024);
4654 channel.send_message(long_body, cx).unwrap()
4655 })
4656 .await
4657 .unwrap_err();
4658
4659 // Messages aren't allowed to be blank.
4660 channel_a.update(cx_a, |channel, cx| {
4661 channel.send_message(String::new(), cx).unwrap_err()
4662 });
4663
4664 // Leading and trailing whitespace are trimmed.
4665 channel_a
4666 .update(cx_a, |channel, cx| {
4667 channel
4668 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4669 .unwrap()
4670 })
4671 .await
4672 .unwrap();
4673 assert_eq!(
4674 db.get_channel_messages(channel_id, 10, None)
4675 .await
4676 .unwrap()
4677 .iter()
4678 .map(|m| &m.body)
4679 .collect::<Vec<_>>(),
4680 &["surrounded by whitespace"]
4681 );
4682 }
4683
4684 #[gpui::test(iterations = 10)]
4685 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4686 cx_a.foreground().forbid_parking();
4687
4688 // Connect to a server as 2 clients.
4689 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4690 let client_a = server.create_client(cx_a, "user_a").await;
4691 let client_b = server.create_client(cx_b, "user_b").await;
4692 let mut status_b = client_b.status();
4693
4694 // Create an org that includes these 2 users.
4695 let db = &server.app_state.db;
4696 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4697 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4698 .await
4699 .unwrap();
4700 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4701 .await
4702 .unwrap();
4703
4704 // Create a channel that includes all the users.
4705 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4706 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4707 .await
4708 .unwrap();
4709 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4710 .await
4711 .unwrap();
4712 db.create_channel_message(
4713 channel_id,
4714 client_b.current_user_id(&cx_b),
4715 "hello A, it's B.",
4716 OffsetDateTime::now_utc(),
4717 2,
4718 )
4719 .await
4720 .unwrap();
4721
4722 let channels_a = cx_a
4723 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4724 channels_a
4725 .condition(cx_a, |list, _| list.available_channels().is_some())
4726 .await;
4727
4728 channels_a.read_with(cx_a, |list, _| {
4729 assert_eq!(
4730 list.available_channels().unwrap(),
4731 &[ChannelDetails {
4732 id: channel_id.to_proto(),
4733 name: "test-channel".to_string()
4734 }]
4735 )
4736 });
4737 let channel_a = channels_a.update(cx_a, |this, cx| {
4738 this.get_channel(channel_id.to_proto(), cx).unwrap()
4739 });
4740 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4741 channel_a
4742 .condition(&cx_a, |channel, _| {
4743 channel_messages(channel)
4744 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4745 })
4746 .await;
4747
4748 let channels_b = cx_b
4749 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4750 channels_b
4751 .condition(cx_b, |list, _| list.available_channels().is_some())
4752 .await;
4753 channels_b.read_with(cx_b, |list, _| {
4754 assert_eq!(
4755 list.available_channels().unwrap(),
4756 &[ChannelDetails {
4757 id: channel_id.to_proto(),
4758 name: "test-channel".to_string()
4759 }]
4760 )
4761 });
4762
4763 let channel_b = channels_b.update(cx_b, |this, cx| {
4764 this.get_channel(channel_id.to_proto(), cx).unwrap()
4765 });
4766 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4767 channel_b
4768 .condition(&cx_b, |channel, _| {
4769 channel_messages(channel)
4770 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4771 })
4772 .await;
4773
4774 // Disconnect client B, ensuring we can still access its cached channel data.
4775 server.forbid_connections();
4776 server.disconnect_client(client_b.current_user_id(&cx_b));
4777 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4778 while !matches!(
4779 status_b.next().await,
4780 Some(client::Status::ReconnectionError { .. })
4781 ) {}
4782
4783 channels_b.read_with(cx_b, |channels, _| {
4784 assert_eq!(
4785 channels.available_channels().unwrap(),
4786 [ChannelDetails {
4787 id: channel_id.to_proto(),
4788 name: "test-channel".to_string()
4789 }]
4790 )
4791 });
4792 channel_b.read_with(cx_b, |channel, _| {
4793 assert_eq!(
4794 channel_messages(channel),
4795 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4796 )
4797 });
4798
4799 // Send a message from client B while it is disconnected.
4800 channel_b
4801 .update(cx_b, |channel, cx| {
4802 let task = channel
4803 .send_message("can you see this?".to_string(), cx)
4804 .unwrap();
4805 assert_eq!(
4806 channel_messages(channel),
4807 &[
4808 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4809 ("user_b".to_string(), "can you see this?".to_string(), true)
4810 ]
4811 );
4812 task
4813 })
4814 .await
4815 .unwrap_err();
4816
4817 // Send a message from client A while B is disconnected.
4818 channel_a
4819 .update(cx_a, |channel, cx| {
4820 channel
4821 .send_message("oh, hi B.".to_string(), cx)
4822 .unwrap()
4823 .detach();
4824 let task = channel.send_message("sup".to_string(), cx).unwrap();
4825 assert_eq!(
4826 channel_messages(channel),
4827 &[
4828 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4829 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4830 ("user_a".to_string(), "sup".to_string(), true)
4831 ]
4832 );
4833 task
4834 })
4835 .await
4836 .unwrap();
4837
4838 // Give client B a chance to reconnect.
4839 server.allow_connections();
4840 cx_b.foreground().advance_clock(Duration::from_secs(10));
4841
4842 // Verify that B sees the new messages upon reconnection, as well as the message client B
4843 // sent while offline.
4844 channel_b
4845 .condition(&cx_b, |channel, _| {
4846 channel_messages(channel)
4847 == [
4848 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4849 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4850 ("user_a".to_string(), "sup".to_string(), false),
4851 ("user_b".to_string(), "can you see this?".to_string(), false),
4852 ]
4853 })
4854 .await;
4855
4856 // Ensure client A and B can communicate normally after reconnection.
4857 channel_a
4858 .update(cx_a, |channel, cx| {
4859 channel.send_message("you online?".to_string(), cx).unwrap()
4860 })
4861 .await
4862 .unwrap();
4863 channel_b
4864 .condition(&cx_b, |channel, _| {
4865 channel_messages(channel)
4866 == [
4867 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4868 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4869 ("user_a".to_string(), "sup".to_string(), false),
4870 ("user_b".to_string(), "can you see this?".to_string(), false),
4871 ("user_a".to_string(), "you online?".to_string(), false),
4872 ]
4873 })
4874 .await;
4875
4876 channel_b
4877 .update(cx_b, |channel, cx| {
4878 channel.send_message("yep".to_string(), cx).unwrap()
4879 })
4880 .await
4881 .unwrap();
4882 channel_a
4883 .condition(&cx_a, |channel, _| {
4884 channel_messages(channel)
4885 == [
4886 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4887 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4888 ("user_a".to_string(), "sup".to_string(), false),
4889 ("user_b".to_string(), "can you see this?".to_string(), false),
4890 ("user_a".to_string(), "you online?".to_string(), false),
4891 ("user_b".to_string(), "yep".to_string(), false),
4892 ]
4893 })
4894 .await;
4895 }
4896
4897 #[gpui::test(iterations = 10)]
4898 async fn test_contacts(
4899 deterministic: Arc<Deterministic>,
4900 cx_a: &mut TestAppContext,
4901 cx_b: &mut TestAppContext,
4902 cx_c: &mut TestAppContext,
4903 ) {
4904 cx_a.foreground().forbid_parking();
4905 let lang_registry = Arc::new(LanguageRegistry::test());
4906 let fs = FakeFs::new(cx_a.background());
4907
4908 // Connect to a server as 3 clients.
4909 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4910 let client_a = server.create_client(cx_a, "user_a").await;
4911 let client_b = server.create_client(cx_b, "user_b").await;
4912 let client_c = server.create_client(cx_c, "user_c").await;
4913 server
4914 .make_contacts(vec![
4915 (&client_a, cx_a),
4916 (&client_b, cx_b),
4917 (&client_c, cx_c),
4918 ])
4919 .await;
4920 deterministic.run_until_parked();
4921 client_a.user_store.read_with(cx_a, |store, _| {
4922 assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4923 });
4924 client_b.user_store.read_with(cx_b, |store, _| {
4925 assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
4926 });
4927 client_c.user_store.read_with(cx_c, |store, _| {
4928 assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
4929 });
4930
4931 // Share a worktree as client A.
4932 fs.create_dir(Path::new("/a")).await.unwrap();
4933
4934 let project_a = cx_a.update(|cx| {
4935 Project::local(
4936 client_a.clone(),
4937 client_a.user_store.clone(),
4938 lang_registry.clone(),
4939 fs.clone(),
4940 cx,
4941 )
4942 });
4943 let (worktree_a, _) = project_a
4944 .update(cx_a, |p, cx| {
4945 p.find_or_create_local_worktree("/a", true, cx)
4946 })
4947 .await
4948 .unwrap();
4949 worktree_a
4950 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4951 .await;
4952
4953 deterministic.run_until_parked();
4954 client_a.user_store.read_with(cx_a, |store, _| {
4955 assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4956 });
4957 client_b.user_store.read_with(cx_b, |store, _| {
4958 assert_eq!(
4959 contacts(store),
4960 [("user_a", vec![("a", false, vec![])]), ("user_c", vec![])]
4961 )
4962 });
4963 client_c.user_store.read_with(cx_c, |store, _| {
4964 assert_eq!(
4965 contacts(store),
4966 [("user_a", vec![("a", false, vec![])]), ("user_b", vec![])]
4967 )
4968 });
4969
4970 let project_id = project_a
4971 .update(cx_a, |project, _| project.next_remote_id())
4972 .await;
4973 project_a
4974 .update(cx_a, |project, cx| project.share(cx))
4975 .await
4976 .unwrap();
4977 deterministic.run_until_parked();
4978 client_a.user_store.read_with(cx_a, |store, _| {
4979 assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4980 });
4981 client_b.user_store.read_with(cx_b, |store, _| {
4982 assert_eq!(
4983 contacts(store),
4984 [("user_a", vec![("a", true, vec![])]), ("user_c", vec![])]
4985 )
4986 });
4987 client_c.user_store.read_with(cx_c, |store, _| {
4988 assert_eq!(
4989 contacts(store),
4990 [("user_a", vec![("a", true, vec![])]), ("user_b", vec![])]
4991 )
4992 });
4993
4994 let _project_b = Project::remote(
4995 project_id,
4996 client_b.clone(),
4997 client_b.user_store.clone(),
4998 lang_registry.clone(),
4999 fs.clone(),
5000 &mut cx_b.to_async(),
5001 )
5002 .await
5003 .unwrap();
5004 deterministic.run_until_parked();
5005 client_a.user_store.read_with(cx_a, |store, _| {
5006 assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5007 });
5008 client_b.user_store.read_with(cx_b, |store, _| {
5009 assert_eq!(
5010 contacts(store),
5011 [
5012 ("user_a", vec![("a", true, vec!["user_b"])]),
5013 ("user_c", vec![])
5014 ]
5015 )
5016 });
5017 client_c.user_store.read_with(cx_c, |store, _| {
5018 assert_eq!(
5019 contacts(store),
5020 [
5021 ("user_a", vec![("a", true, vec!["user_b"])]),
5022 ("user_b", vec![])
5023 ]
5024 )
5025 });
5026
5027 project_a
5028 .condition(&cx_a, |project, _| {
5029 project.collaborators().contains_key(&client_b.peer_id)
5030 })
5031 .await;
5032
5033 cx_a.update(move |_| drop(project_a));
5034 deterministic.run_until_parked();
5035 client_a.user_store.read_with(cx_a, |store, _| {
5036 assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5037 });
5038 client_b.user_store.read_with(cx_b, |store, _| {
5039 assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
5040 });
5041 client_c.user_store.read_with(cx_c, |store, _| {
5042 assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
5043 });
5044
5045 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
5046 user_store
5047 .contacts()
5048 .iter()
5049 .map(|contact| {
5050 let worktrees = contact
5051 .projects
5052 .iter()
5053 .map(|p| {
5054 (
5055 p.worktree_root_names[0].as_str(),
5056 p.is_shared,
5057 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5058 )
5059 })
5060 .collect();
5061 (contact.user.github_login.as_str(), worktrees)
5062 })
5063 .collect()
5064 }
5065 }
5066
    // Walks through the contact-request lifecycle (request, accept, reject) for three users,
    // each connected from two clients, and checks after every step that all of a user's
    // clients agree and that the state is still there after a disconnect/reconnect.
    #[gpui::test(iterations = 10)]
    async fn test_contact_requests(
        executor: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_a2: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_b2: &mut TestAppContext,
        cx_c: &mut TestAppContext,
        cx_c2: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 users, with 2 clients per user.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_a2 = server.create_client(cx_a2, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_b2 = server.create_client(cx_b2, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;
        let client_c2 = server.create_client(cx_c2, "user_c").await;

        // Both clients created with the same login must resolve to the same user id.
        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());

        // User A and User C request that user B become their contact.
        client_a
            .user_store
            .read_with(cx_a, |store, _| {
                store.request_contact(client_b.user_id().unwrap())
            })
            .await
            .unwrap();
        client_c
            .user_store
            .read_with(cx_c, |store, _| {
                store.request_contact(client_b.user_id().unwrap())
            })
            .await
            .unwrap();
        executor.run_until_parked();

        // All users see the pending request appear in all their clients.
        assert_eq!(
            client_a.summarize_contacts(&cx_a).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_b2.summarize_contacts(&cx_b2).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
            &["user_b"]
        );

        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(
            client_a.summarize_contacts(&cx_a).outgoing_requests,
            &["user_b"]
        );
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_a", "user_c"]
        );
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );

        // User B accepts the request from user A.
        client_b
            .user_store
            .read_with(cx_b, |store, _| {
                store.respond_to_contact_request(client_a.user_id().unwrap(), true)
            })
            .await
            .unwrap();

        executor.run_until_parked();

        // User B sees user A as their contact now in all clients, and the incoming request from
        // them is removed. The request from user C is still pending.
        let contacts_b = client_b.summarize_contacts(&cx_b);
        assert_eq!(contacts_b.current, &["user_a"]);
        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
        assert_eq!(contacts_b2.current, &["user_a"]);
        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);

        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
        let contacts_a = client_a.summarize_contacts(&cx_a);
        assert_eq!(contacts_a.current, &["user_b"]);
        assert!(contacts_a.outgoing_requests.is_empty());
        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
        assert_eq!(contacts_a2.current, &["user_b"]);
        assert!(contacts_a2.outgoing_requests.is_empty());

        // Contacts are present upon connecting (tested here via disconnect/reconnect)
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
        assert_eq!(
            client_b.summarize_contacts(&cx_b).incoming_requests,
            &["user_c"]
        );
        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
        assert_eq!(
            client_c.summarize_contacts(&cx_c).outgoing_requests,
            &["user_b"]
        );

        // User B rejects the request from user C.
        client_b
            .user_store
            .read_with(cx_b, |store, _| {
                store.respond_to_contact_request(client_c.user_id().unwrap(), false)
            })
            .await
            .unwrap();

        executor.run_until_parked();

        // User B doesn't see user C as their contact, and the incoming request from them is removed.
        let contacts_b = client_b.summarize_contacts(&cx_b);
        assert_eq!(contacts_b.current, &["user_a"]);
        assert!(contacts_b.incoming_requests.is_empty());
        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
        assert_eq!(contacts_b2.current, &["user_a"]);
        assert!(contacts_b2.incoming_requests.is_empty());

        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
        let contacts_c = client_c.summarize_contacts(&cx_c);
        assert!(contacts_c.current.is_empty());
        assert!(contacts_c.outgoing_requests.is_empty());
        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
        assert!(contacts_c2.current.is_empty());
        assert!(contacts_c2.outgoing_requests.is_empty());

        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
        disconnect_and_reconnect(&client_a, cx_a).await;
        disconnect_and_reconnect(&client_b, cx_b).await;
        disconnect_and_reconnect(&client_c, cx_c).await;
        executor.run_until_parked();
        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
        assert!(client_b
            .summarize_contacts(&cx_b)
            .incoming_requests
            .is_empty());
        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
        assert!(client_c
            .summarize_contacts(&cx_c)
            .outgoing_requests
            .is_empty());

        // Disconnects `client`, clears its locally held contacts, and connects it again, so
        // that any contact state observed afterwards must have been re-sent on connect.
        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
            client.disconnect(&cx.to_async()).unwrap();
            client.clear_contacts(cx);
            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();
        }
    }
5250
5251 #[gpui::test(iterations = 10)]
5252 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5253 cx_a.foreground().forbid_parking();
5254 let fs = FakeFs::new(cx_a.background());
5255
5256 // 2 clients connect to a server.
5257 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5258 let mut client_a = server.create_client(cx_a, "user_a").await;
5259 let mut client_b = server.create_client(cx_b, "user_b").await;
5260 cx_a.update(editor::init);
5261 cx_b.update(editor::init);
5262
5263 // Client A shares a project.
5264 fs.insert_tree(
5265 "/a",
5266 json!({
5267 ".zed.toml": r#"collaborators = ["user_b"]"#,
5268 "1.txt": "one",
5269 "2.txt": "two",
5270 "3.txt": "three",
5271 }),
5272 )
5273 .await;
5274 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5275 project_a
5276 .update(cx_a, |project, cx| project.share(cx))
5277 .await
5278 .unwrap();
5279
5280 // Client B joins the project.
5281 let project_b = client_b
5282 .build_remote_project(
5283 project_a
5284 .read_with(cx_a, |project, _| project.remote_id())
5285 .unwrap(),
5286 cx_b,
5287 )
5288 .await;
5289
5290 // Client A opens some editors.
5291 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5292 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5293 let editor_a1 = workspace_a
5294 .update(cx_a, |workspace, cx| {
5295 workspace.open_path((worktree_id, "1.txt"), true, cx)
5296 })
5297 .await
5298 .unwrap()
5299 .downcast::<Editor>()
5300 .unwrap();
5301 let editor_a2 = workspace_a
5302 .update(cx_a, |workspace, cx| {
5303 workspace.open_path((worktree_id, "2.txt"), true, cx)
5304 })
5305 .await
5306 .unwrap()
5307 .downcast::<Editor>()
5308 .unwrap();
5309
5310 // Client B opens an editor.
5311 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5312 let editor_b1 = workspace_b
5313 .update(cx_b, |workspace, cx| {
5314 workspace.open_path((worktree_id, "1.txt"), true, cx)
5315 })
5316 .await
5317 .unwrap()
5318 .downcast::<Editor>()
5319 .unwrap();
5320
5321 let client_a_id = project_b.read_with(cx_b, |project, _| {
5322 project.collaborators().values().next().unwrap().peer_id
5323 });
5324 let client_b_id = project_a.read_with(cx_a, |project, _| {
5325 project.collaborators().values().next().unwrap().peer_id
5326 });
5327
5328 // When client B starts following client A, all visible view states are replicated to client B.
5329 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5330 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5331 workspace_b
5332 .update(cx_b, |workspace, cx| {
5333 workspace
5334 .toggle_follow(&ToggleFollow(client_a_id), cx)
5335 .unwrap()
5336 })
5337 .await
5338 .unwrap();
5339 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5340 workspace
5341 .active_item(cx)
5342 .unwrap()
5343 .downcast::<Editor>()
5344 .unwrap()
5345 });
5346 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5347 assert_eq!(
5348 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5349 Some((worktree_id, "2.txt").into())
5350 );
5351 assert_eq!(
5352 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5353 vec![2..3]
5354 );
5355 assert_eq!(
5356 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5357 vec![0..1]
5358 );
5359
5360 // When client A activates a different editor, client B does so as well.
5361 workspace_a.update(cx_a, |workspace, cx| {
5362 workspace.activate_item(&editor_a1, cx)
5363 });
5364 workspace_b
5365 .condition(cx_b, |workspace, cx| {
5366 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5367 })
5368 .await;
5369
5370 // When client A navigates back and forth, client B does so as well.
5371 workspace_a
5372 .update(cx_a, |workspace, cx| {
5373 workspace::Pane::go_back(workspace, None, cx)
5374 })
5375 .await;
5376 workspace_b
5377 .condition(cx_b, |workspace, cx| {
5378 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5379 })
5380 .await;
5381
5382 workspace_a
5383 .update(cx_a, |workspace, cx| {
5384 workspace::Pane::go_forward(workspace, None, cx)
5385 })
5386 .await;
5387 workspace_b
5388 .condition(cx_b, |workspace, cx| {
5389 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5390 })
5391 .await;
5392
5393 // Changes to client A's editor are reflected on client B.
5394 editor_a1.update(cx_a, |editor, cx| {
5395 editor.select_ranges([1..1, 2..2], None, cx);
5396 });
5397 editor_b1
5398 .condition(cx_b, |editor, cx| {
5399 editor.selected_ranges(cx) == vec![1..1, 2..2]
5400 })
5401 .await;
5402
5403 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5404 editor_b1
5405 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5406 .await;
5407
5408 editor_a1.update(cx_a, |editor, cx| {
5409 editor.select_ranges([3..3], None, cx);
5410 editor.set_scroll_position(vec2f(0., 100.), cx);
5411 });
5412 editor_b1
5413 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5414 .await;
5415
5416 // After unfollowing, client B stops receiving updates from client A.
5417 workspace_b.update(cx_b, |workspace, cx| {
5418 workspace.unfollow(&workspace.active_pane().clone(), cx)
5419 });
5420 workspace_a.update(cx_a, |workspace, cx| {
5421 workspace.activate_item(&editor_a2, cx)
5422 });
5423 cx_a.foreground().run_until_parked();
5424 assert_eq!(
5425 workspace_b.read_with(cx_b, |workspace, cx| workspace
5426 .active_item(cx)
5427 .unwrap()
5428 .id()),
5429 editor_b1.id()
5430 );
5431
5432 // Client A starts following client B.
5433 workspace_a
5434 .update(cx_a, |workspace, cx| {
5435 workspace
5436 .toggle_follow(&ToggleFollow(client_b_id), cx)
5437 .unwrap()
5438 })
5439 .await
5440 .unwrap();
5441 assert_eq!(
5442 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5443 Some(client_b_id)
5444 );
5445 assert_eq!(
5446 workspace_a.read_with(cx_a, |workspace, cx| workspace
5447 .active_item(cx)
5448 .unwrap()
5449 .id()),
5450 editor_a1.id()
5451 );
5452
5453 // Following interrupts when client B disconnects.
5454 client_b.disconnect(&cx_b.to_async()).unwrap();
5455 cx_a.foreground().run_until_parked();
5456 assert_eq!(
5457 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5458 None
5459 );
5460 }
5461
5462 #[gpui::test(iterations = 10)]
5463 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5464 cx_a.foreground().forbid_parking();
5465 let fs = FakeFs::new(cx_a.background());
5466
5467 // 2 clients connect to a server.
5468 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5469 let mut client_a = server.create_client(cx_a, "user_a").await;
5470 let mut client_b = server.create_client(cx_b, "user_b").await;
5471 cx_a.update(editor::init);
5472 cx_b.update(editor::init);
5473
5474 // Client A shares a project.
5475 fs.insert_tree(
5476 "/a",
5477 json!({
5478 ".zed.toml": r#"collaborators = ["user_b"]"#,
5479 "1.txt": "one",
5480 "2.txt": "two",
5481 "3.txt": "three",
5482 "4.txt": "four",
5483 }),
5484 )
5485 .await;
5486 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5487 project_a
5488 .update(cx_a, |project, cx| project.share(cx))
5489 .await
5490 .unwrap();
5491
5492 // Client B joins the project.
5493 let project_b = client_b
5494 .build_remote_project(
5495 project_a
5496 .read_with(cx_a, |project, _| project.remote_id())
5497 .unwrap(),
5498 cx_b,
5499 )
5500 .await;
5501
5502 // Client A opens some editors.
5503 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5504 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5505 let _editor_a1 = workspace_a
5506 .update(cx_a, |workspace, cx| {
5507 workspace.open_path((worktree_id, "1.txt"), true, cx)
5508 })
5509 .await
5510 .unwrap()
5511 .downcast::<Editor>()
5512 .unwrap();
5513
5514 // Client B opens an editor.
5515 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5516 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5517 let _editor_b1 = workspace_b
5518 .update(cx_b, |workspace, cx| {
5519 workspace.open_path((worktree_id, "2.txt"), true, cx)
5520 })
5521 .await
5522 .unwrap()
5523 .downcast::<Editor>()
5524 .unwrap();
5525
5526 // Clients A and B follow each other in split panes
5527 workspace_a
5528 .update(cx_a, |workspace, cx| {
5529 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5530 assert_ne!(*workspace.active_pane(), pane_a1);
5531 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5532 workspace
5533 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5534 .unwrap()
5535 })
5536 .await
5537 .unwrap();
5538 workspace_b
5539 .update(cx_b, |workspace, cx| {
5540 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5541 assert_ne!(*workspace.active_pane(), pane_b1);
5542 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5543 workspace
5544 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5545 .unwrap()
5546 })
5547 .await
5548 .unwrap();
5549
5550 workspace_a
5551 .update(cx_a, |workspace, cx| {
5552 workspace.activate_next_pane(cx);
5553 assert_eq!(*workspace.active_pane(), pane_a1);
5554 workspace.open_path((worktree_id, "3.txt"), true, cx)
5555 })
5556 .await
5557 .unwrap();
5558 workspace_b
5559 .update(cx_b, |workspace, cx| {
5560 workspace.activate_next_pane(cx);
5561 assert_eq!(*workspace.active_pane(), pane_b1);
5562 workspace.open_path((worktree_id, "4.txt"), true, cx)
5563 })
5564 .await
5565 .unwrap();
5566 cx_a.foreground().run_until_parked();
5567
5568 // Ensure leader updates don't change the active pane of followers
5569 workspace_a.read_with(cx_a, |workspace, _| {
5570 assert_eq!(*workspace.active_pane(), pane_a1);
5571 });
5572 workspace_b.read_with(cx_b, |workspace, _| {
5573 assert_eq!(*workspace.active_pane(), pane_b1);
5574 });
5575
5576 // Ensure peers following each other doesn't cause an infinite loop.
5577 assert_eq!(
5578 workspace_a.read_with(cx_a, |workspace, cx| workspace
5579 .active_item(cx)
5580 .unwrap()
5581 .project_path(cx)),
5582 Some((worktree_id, "3.txt").into())
5583 );
5584 workspace_a.update(cx_a, |workspace, cx| {
5585 assert_eq!(
5586 workspace.active_item(cx).unwrap().project_path(cx),
5587 Some((worktree_id, "3.txt").into())
5588 );
5589 workspace.activate_next_pane(cx);
5590 assert_eq!(
5591 workspace.active_item(cx).unwrap().project_path(cx),
5592 Some((worktree_id, "4.txt").into())
5593 );
5594 });
5595 workspace_b.update(cx_b, |workspace, cx| {
5596 assert_eq!(
5597 workspace.active_item(cx).unwrap().project_path(cx),
5598 Some((worktree_id, "4.txt").into())
5599 );
5600 workspace.activate_next_pane(cx);
5601 assert_eq!(
5602 workspace.active_item(cx).unwrap().project_path(cx),
5603 Some((worktree_id, "3.txt").into())
5604 );
5605 });
5606 }
5607
5608 #[gpui::test(iterations = 10)]
5609 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5610 cx_a.foreground().forbid_parking();
5611 let fs = FakeFs::new(cx_a.background());
5612
5613 // 2 clients connect to a server.
5614 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5615 let mut client_a = server.create_client(cx_a, "user_a").await;
5616 let mut client_b = server.create_client(cx_b, "user_b").await;
5617 cx_a.update(editor::init);
5618 cx_b.update(editor::init);
5619
5620 // Client A shares a project.
5621 fs.insert_tree(
5622 "/a",
5623 json!({
5624 ".zed.toml": r#"collaborators = ["user_b"]"#,
5625 "1.txt": "one",
5626 "2.txt": "two",
5627 "3.txt": "three",
5628 }),
5629 )
5630 .await;
5631 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5632 project_a
5633 .update(cx_a, |project, cx| project.share(cx))
5634 .await
5635 .unwrap();
5636
5637 // Client B joins the project.
5638 let project_b = client_b
5639 .build_remote_project(
5640 project_a
5641 .read_with(cx_a, |project, _| project.remote_id())
5642 .unwrap(),
5643 cx_b,
5644 )
5645 .await;
5646
5647 // Client A opens some editors.
5648 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5649 let _editor_a1 = workspace_a
5650 .update(cx_a, |workspace, cx| {
5651 workspace.open_path((worktree_id, "1.txt"), true, cx)
5652 })
5653 .await
5654 .unwrap()
5655 .downcast::<Editor>()
5656 .unwrap();
5657
5658 // Client B starts following client A.
5659 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5660 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5661 let leader_id = project_b.read_with(cx_b, |project, _| {
5662 project.collaborators().values().next().unwrap().peer_id
5663 });
5664 workspace_b
5665 .update(cx_b, |workspace, cx| {
5666 workspace
5667 .toggle_follow(&ToggleFollow(leader_id), cx)
5668 .unwrap()
5669 })
5670 .await
5671 .unwrap();
5672 assert_eq!(
5673 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5674 Some(leader_id)
5675 );
5676 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5677 workspace
5678 .active_item(cx)
5679 .unwrap()
5680 .downcast::<Editor>()
5681 .unwrap()
5682 });
5683
5684 // When client B moves, it automatically stops following client A.
5685 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5686 assert_eq!(
5687 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5688 None
5689 );
5690
5691 workspace_b
5692 .update(cx_b, |workspace, cx| {
5693 workspace
5694 .toggle_follow(&ToggleFollow(leader_id), cx)
5695 .unwrap()
5696 })
5697 .await
5698 .unwrap();
5699 assert_eq!(
5700 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5701 Some(leader_id)
5702 );
5703
5704 // When client B edits, it automatically stops following client A.
5705 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5706 assert_eq!(
5707 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5708 None
5709 );
5710
5711 workspace_b
5712 .update(cx_b, |workspace, cx| {
5713 workspace
5714 .toggle_follow(&ToggleFollow(leader_id), cx)
5715 .unwrap()
5716 })
5717 .await
5718 .unwrap();
5719 assert_eq!(
5720 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5721 Some(leader_id)
5722 );
5723
5724 // When client B scrolls, it automatically stops following client A.
5725 editor_b2.update(cx_b, |editor, cx| {
5726 editor.set_scroll_position(vec2f(0., 3.), cx)
5727 });
5728 assert_eq!(
5729 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5730 None
5731 );
5732
5733 workspace_b
5734 .update(cx_b, |workspace, cx| {
5735 workspace
5736 .toggle_follow(&ToggleFollow(leader_id), cx)
5737 .unwrap()
5738 })
5739 .await
5740 .unwrap();
5741 assert_eq!(
5742 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5743 Some(leader_id)
5744 );
5745
5746 // When client B activates a different pane, it continues following client A in the original pane.
5747 workspace_b.update(cx_b, |workspace, cx| {
5748 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5749 });
5750 assert_eq!(
5751 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5752 Some(leader_id)
5753 );
5754
5755 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5756 assert_eq!(
5757 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5758 Some(leader_id)
5759 );
5760
5761 // When client B activates a different item in the original pane, it automatically stops following client A.
5762 workspace_b
5763 .update(cx_b, |workspace, cx| {
5764 workspace.open_path((worktree_id, "2.txt"), true, cx)
5765 })
5766 .await
5767 .unwrap();
5768 assert_eq!(
5769 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5770 None
5771 );
5772 }
5773
5774 #[gpui::test(iterations = 100)]
5775 async fn test_random_collaboration(
5776 cx: &mut TestAppContext,
5777 deterministic: Arc<Deterministic>,
5778 rng: StdRng,
5779 ) {
5780 cx.foreground().forbid_parking();
5781 let max_peers = env::var("MAX_PEERS")
5782 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5783 .unwrap_or(5);
5784 assert!(max_peers <= 5);
5785
5786 let max_operations = env::var("OPERATIONS")
5787 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5788 .unwrap_or(10);
5789
5790 let rng = Arc::new(Mutex::new(rng));
5791
5792 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5793 let host_language_registry = Arc::new(LanguageRegistry::test());
5794
5795 let fs = FakeFs::new(cx.background());
5796 fs.insert_tree(
5797 "/_collab",
5798 json!({
5799 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5800 }),
5801 )
5802 .await;
5803
5804 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5805 let mut clients = Vec::new();
5806 let mut user_ids = Vec::new();
5807 let mut op_start_signals = Vec::new();
5808 let files = Arc::new(Mutex::new(Vec::new()));
5809
5810 let mut next_entity_id = 100000;
5811 let mut host_cx = TestAppContext::new(
5812 cx.foreground_platform(),
5813 cx.platform(),
5814 deterministic.build_foreground(next_entity_id),
5815 deterministic.build_background(),
5816 cx.font_cache(),
5817 cx.leak_detector(),
5818 next_entity_id,
5819 );
5820 let host = server.create_client(&mut host_cx, "host").await;
5821 let host_project = host_cx.update(|cx| {
5822 Project::local(
5823 host.client.clone(),
5824 host.user_store.clone(),
5825 host_language_registry.clone(),
5826 fs.clone(),
5827 cx,
5828 )
5829 });
5830 let host_project_id = host_project
5831 .update(&mut host_cx, |p, _| p.next_remote_id())
5832 .await;
5833
5834 let (collab_worktree, _) = host_project
5835 .update(&mut host_cx, |project, cx| {
5836 project.find_or_create_local_worktree("/_collab", true, cx)
5837 })
5838 .await
5839 .unwrap();
5840 collab_worktree
5841 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5842 .await;
5843 host_project
5844 .update(&mut host_cx, |project, cx| project.share(cx))
5845 .await
5846 .unwrap();
5847
5848 // Set up fake language servers.
5849 let mut language = Language::new(
5850 LanguageConfig {
5851 name: "Rust".into(),
5852 path_suffixes: vec!["rs".to_string()],
5853 ..Default::default()
5854 },
5855 None,
5856 );
5857 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5858 name: "the-fake-language-server",
5859 capabilities: lsp::LanguageServer::full_capabilities(),
5860 initializer: Some(Box::new({
5861 let rng = rng.clone();
5862 let files = files.clone();
5863 let project = host_project.downgrade();
5864 move |fake_server: &mut FakeLanguageServer| {
5865 fake_server.handle_request::<lsp::request::Completion, _, _>(
5866 |_, _| async move {
5867 Ok(Some(lsp::CompletionResponse::Array(vec![
5868 lsp::CompletionItem {
5869 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5870 range: lsp::Range::new(
5871 lsp::Position::new(0, 0),
5872 lsp::Position::new(0, 0),
5873 ),
5874 new_text: "the-new-text".to_string(),
5875 })),
5876 ..Default::default()
5877 },
5878 ])))
5879 },
5880 );
5881
5882 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5883 |_, _| async move {
5884 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5885 lsp::CodeAction {
5886 title: "the-code-action".to_string(),
5887 ..Default::default()
5888 },
5889 )]))
5890 },
5891 );
5892
5893 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5894 |params, _| async move {
5895 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5896 params.position,
5897 params.position,
5898 ))))
5899 },
5900 );
5901
5902 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5903 let files = files.clone();
5904 let rng = rng.clone();
5905 move |_, _| {
5906 let files = files.clone();
5907 let rng = rng.clone();
5908 async move {
5909 let files = files.lock();
5910 let mut rng = rng.lock();
5911 let count = rng.gen_range::<usize, _>(1..3);
5912 let files = (0..count)
5913 .map(|_| files.choose(&mut *rng).unwrap())
5914 .collect::<Vec<_>>();
5915 log::info!("LSP: Returning definitions in files {:?}", &files);
5916 Ok(Some(lsp::GotoDefinitionResponse::Array(
5917 files
5918 .into_iter()
5919 .map(|file| lsp::Location {
5920 uri: lsp::Url::from_file_path(file).unwrap(),
5921 range: Default::default(),
5922 })
5923 .collect(),
5924 )))
5925 }
5926 }
5927 });
5928
5929 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5930 let rng = rng.clone();
5931 let project = project.clone();
5932 move |params, mut cx| {
5933 let highlights = if let Some(project) = project.upgrade(&cx) {
5934 project.update(&mut cx, |project, cx| {
5935 let path = params
5936 .text_document_position_params
5937 .text_document
5938 .uri
5939 .to_file_path()
5940 .unwrap();
5941 let (worktree, relative_path) =
5942 project.find_local_worktree(&path, cx)?;
5943 let project_path =
5944 ProjectPath::from((worktree.read(cx).id(), relative_path));
5945 let buffer =
5946 project.get_open_buffer(&project_path, cx)?.read(cx);
5947
5948 let mut highlights = Vec::new();
5949 let highlight_count = rng.lock().gen_range(1..=5);
5950 let mut prev_end = 0;
5951 for _ in 0..highlight_count {
5952 let range =
5953 buffer.random_byte_range(prev_end, &mut *rng.lock());
5954
5955 highlights.push(lsp::DocumentHighlight {
5956 range: range_to_lsp(range.to_point_utf16(buffer)),
5957 kind: Some(lsp::DocumentHighlightKind::READ),
5958 });
5959 prev_end = range.end;
5960 }
5961 Some(highlights)
5962 })
5963 } else {
5964 None
5965 };
5966 async move { Ok(highlights) }
5967 }
5968 });
5969 }
5970 })),
5971 ..Default::default()
5972 });
5973 host_language_registry.add(Arc::new(language));
5974
5975 let op_start_signal = futures::channel::mpsc::unbounded();
5976 user_ids.push(host.current_user_id(&host_cx));
5977 op_start_signals.push(op_start_signal.0);
5978 clients.push(host_cx.foreground().spawn(host.simulate_host(
5979 host_project,
5980 files,
5981 op_start_signal.1,
5982 rng.clone(),
5983 host_cx,
5984 )));
5985
5986 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5987 rng.lock().gen_range(0..max_operations)
5988 } else {
5989 max_operations
5990 };
5991 let mut available_guests = vec![
5992 "guest-1".to_string(),
5993 "guest-2".to_string(),
5994 "guest-3".to_string(),
5995 "guest-4".to_string(),
5996 ];
5997 let mut operations = 0;
5998 while operations < max_operations {
5999 if operations == disconnect_host_at {
6000 server.disconnect_client(user_ids[0]);
6001 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6002 drop(op_start_signals);
6003 let mut clients = futures::future::join_all(clients).await;
6004 cx.foreground().run_until_parked();
6005
6006 let (host, mut host_cx, host_err) = clients.remove(0);
6007 if let Some(host_err) = host_err {
6008 log::error!("host error - {}", host_err);
6009 }
6010 host.project
6011 .as_ref()
6012 .unwrap()
6013 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6014 for (guest, mut guest_cx, guest_err) in clients {
6015 if let Some(guest_err) = guest_err {
6016 log::error!("{} error - {}", guest.username, guest_err);
6017 }
6018 // TODO
6019 // let contacts = server
6020 // .store
6021 // .read()
6022 // .await
6023 // .contacts_for_user(guest.current_user_id(&guest_cx));
6024 // assert!(!contacts
6025 // .iter()
6026 // .flat_map(|contact| &contact.projects)
6027 // .any(|project| project.id == host_project_id));
6028 guest
6029 .project
6030 .as_ref()
6031 .unwrap()
6032 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6033 guest_cx.update(|_| drop(guest));
6034 }
6035 host_cx.update(|_| drop(host));
6036
6037 return;
6038 }
6039
6040 let distribution = rng.lock().gen_range(0..100);
6041 match distribution {
6042 0..=19 if !available_guests.is_empty() => {
6043 let guest_ix = rng.lock().gen_range(0..available_guests.len());
6044 let guest_username = available_guests.remove(guest_ix);
6045 log::info!("Adding new connection for {}", guest_username);
6046 next_entity_id += 100000;
6047 let mut guest_cx = TestAppContext::new(
6048 cx.foreground_platform(),
6049 cx.platform(),
6050 deterministic.build_foreground(next_entity_id),
6051 deterministic.build_background(),
6052 cx.font_cache(),
6053 cx.leak_detector(),
6054 next_entity_id,
6055 );
6056 let guest = server.create_client(&mut guest_cx, &guest_username).await;
6057 let guest_project = Project::remote(
6058 host_project_id,
6059 guest.client.clone(),
6060 guest.user_store.clone(),
6061 guest_lang_registry.clone(),
6062 FakeFs::new(cx.background()),
6063 &mut guest_cx.to_async(),
6064 )
6065 .await
6066 .unwrap();
6067 let op_start_signal = futures::channel::mpsc::unbounded();
6068 user_ids.push(guest.current_user_id(&guest_cx));
6069 op_start_signals.push(op_start_signal.0);
6070 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6071 guest_username.clone(),
6072 guest_project,
6073 op_start_signal.1,
6074 rng.clone(),
6075 guest_cx,
6076 )));
6077
6078 log::info!("Added connection for {}", guest_username);
6079 operations += 1;
6080 }
6081 20..=29 if clients.len() > 1 => {
6082 log::info!("Removing guest");
6083 let guest_ix = rng.lock().gen_range(1..clients.len());
6084 let removed_guest_id = user_ids.remove(guest_ix);
6085 let guest = clients.remove(guest_ix);
6086 op_start_signals.remove(guest_ix);
6087 server.disconnect_client(removed_guest_id);
6088 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6089 let (guest, mut guest_cx, guest_err) = guest.await;
6090 if let Some(guest_err) = guest_err {
6091 log::error!("{} error - {}", guest.username, guest_err);
6092 }
6093 guest
6094 .project
6095 .as_ref()
6096 .unwrap()
6097 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6098 // TODO
6099 // for user_id in &user_ids {
6100 // for contact in server.store.read().await.contacts_for_user(*user_id) {
6101 // assert_ne!(
6102 // contact.user_id, removed_guest_id.0 as u64,
6103 // "removed guest is still a contact of another peer"
6104 // );
6105 // for project in contact.projects {
6106 // for project_guest_id in project.guests {
6107 // assert_ne!(
6108 // project_guest_id, removed_guest_id.0 as u64,
6109 // "removed guest appears as still participating on a project"
6110 // );
6111 // }
6112 // }
6113 // }
6114 // }
6115
6116 log::info!("{} removed", guest.username);
6117 available_guests.push(guest.username.clone());
6118 guest_cx.update(|_| drop(guest));
6119
6120 operations += 1;
6121 }
6122 _ => {
6123 while operations < max_operations && rng.lock().gen_bool(0.7) {
6124 op_start_signals
6125 .choose(&mut *rng.lock())
6126 .unwrap()
6127 .unbounded_send(())
6128 .unwrap();
6129 operations += 1;
6130 }
6131
6132 if rng.lock().gen_bool(0.8) {
6133 cx.foreground().run_until_parked();
6134 }
6135 }
6136 }
6137 }
6138
6139 drop(op_start_signals);
6140 let mut clients = futures::future::join_all(clients).await;
6141 cx.foreground().run_until_parked();
6142
6143 let (host_client, mut host_cx, host_err) = clients.remove(0);
6144 if let Some(host_err) = host_err {
6145 panic!("host error - {}", host_err);
6146 }
6147 let host_project = host_client.project.as_ref().unwrap();
6148 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6149 project
6150 .worktrees(cx)
6151 .map(|worktree| {
6152 let snapshot = worktree.read(cx).snapshot();
6153 (snapshot.id(), snapshot)
6154 })
6155 .collect::<BTreeMap<_, _>>()
6156 });
6157
6158 host_client
6159 .project
6160 .as_ref()
6161 .unwrap()
6162 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6163
6164 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6165 if let Some(guest_err) = guest_err {
6166 panic!("{} error - {}", guest_client.username, guest_err);
6167 }
6168 let worktree_snapshots =
6169 guest_client
6170 .project
6171 .as_ref()
6172 .unwrap()
6173 .read_with(&guest_cx, |project, cx| {
6174 project
6175 .worktrees(cx)
6176 .map(|worktree| {
6177 let worktree = worktree.read(cx);
6178 (worktree.id(), worktree.snapshot())
6179 })
6180 .collect::<BTreeMap<_, _>>()
6181 });
6182
6183 assert_eq!(
6184 worktree_snapshots.keys().collect::<Vec<_>>(),
6185 host_worktree_snapshots.keys().collect::<Vec<_>>(),
6186 "{} has different worktrees than the host",
6187 guest_client.username
6188 );
6189 for (id, host_snapshot) in &host_worktree_snapshots {
6190 let guest_snapshot = &worktree_snapshots[id];
6191 assert_eq!(
6192 guest_snapshot.root_name(),
6193 host_snapshot.root_name(),
6194 "{} has different root name than the host for worktree {}",
6195 guest_client.username,
6196 id
6197 );
6198 assert_eq!(
6199 guest_snapshot.entries(false).collect::<Vec<_>>(),
6200 host_snapshot.entries(false).collect::<Vec<_>>(),
6201 "{} has different snapshot than the host for worktree {}",
6202 guest_client.username,
6203 id
6204 );
6205 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6206 }
6207
6208 guest_client
6209 .project
6210 .as_ref()
6211 .unwrap()
6212 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6213
6214 for guest_buffer in &guest_client.buffers {
6215 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6216 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6217 project.buffer_for_id(buffer_id, cx).expect(&format!(
6218 "host does not have buffer for guest:{}, peer:{}, id:{}",
6219 guest_client.username, guest_client.peer_id, buffer_id
6220 ))
6221 });
6222 let path = host_buffer
6223 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6224
6225 assert_eq!(
6226 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6227 0,
6228 "{}, buffer {}, path {:?} has deferred operations",
6229 guest_client.username,
6230 buffer_id,
6231 path,
6232 );
6233 assert_eq!(
6234 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6235 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6236 "{}, buffer {}, path {:?}, differs from the host's buffer",
6237 guest_client.username,
6238 buffer_id,
6239 path
6240 );
6241 }
6242
6243 guest_cx.update(|_| drop(guest_client));
6244 }
6245
6246 host_cx.update(|_| drop(host_client));
6247 }
6248
6249 struct TestServer {
6250 peer: Arc<Peer>,
6251 app_state: Arc<AppState>,
6252 server: Arc<Server>,
6253 foreground: Rc<executor::Foreground>,
6254 notifications: mpsc::UnboundedReceiver<()>,
6255 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6256 forbid_connections: Arc<AtomicBool>,
6257 _test_db: TestDb,
6258 }
6259
6260 impl TestServer {
6261 async fn start(
6262 foreground: Rc<executor::Foreground>,
6263 background: Arc<executor::Background>,
6264 ) -> Self {
6265 let test_db = TestDb::fake(background);
6266 let app_state = Self::build_app_state(&test_db).await;
6267 let peer = Peer::new();
6268 let notifications = mpsc::unbounded();
6269 let server = Server::new(app_state.clone(), Some(notifications.0));
6270 Self {
6271 peer,
6272 app_state,
6273 server,
6274 foreground,
6275 notifications: notifications.1,
6276 connection_killers: Default::default(),
6277 forbid_connections: Default::default(),
6278 _test_db: test_db,
6279 }
6280 }
6281
6282 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6283 cx.update(|cx| {
6284 let settings = Settings::test(cx);
6285 cx.set_global(settings);
6286 });
6287
6288 let http = FakeHttpClient::with_404_response();
6289 let user_id =
6290 if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6291 user.id
6292 } else {
6293 self.app_state.db.create_user(name, false).await.unwrap()
6294 };
6295 let client_name = name.to_string();
6296 let mut client = Client::new(http.clone());
6297 let server = self.server.clone();
6298 let connection_killers = self.connection_killers.clone();
6299 let forbid_connections = self.forbid_connections.clone();
6300 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6301
6302 Arc::get_mut(&mut client)
6303 .unwrap()
6304 .override_authenticate(move |cx| {
6305 cx.spawn(|_| async move {
6306 let access_token = "the-token".to_string();
6307 Ok(Credentials {
6308 user_id: user_id.0 as u64,
6309 access_token,
6310 })
6311 })
6312 })
6313 .override_establish_connection(move |credentials, cx| {
6314 assert_eq!(credentials.user_id, user_id.0 as u64);
6315 assert_eq!(credentials.access_token, "the-token");
6316
6317 let server = server.clone();
6318 let connection_killers = connection_killers.clone();
6319 let forbid_connections = forbid_connections.clone();
6320 let client_name = client_name.clone();
6321 let connection_id_tx = connection_id_tx.clone();
6322 cx.spawn(move |cx| async move {
6323 if forbid_connections.load(SeqCst) {
6324 Err(EstablishConnectionError::other(anyhow!(
6325 "server is forbidding connections"
6326 )))
6327 } else {
6328 let (client_conn, server_conn, killed) =
6329 Connection::in_memory(cx.background());
6330 connection_killers.lock().insert(user_id, killed);
6331 cx.background()
6332 .spawn(server.handle_connection(
6333 server_conn,
6334 client_name,
6335 user_id,
6336 Some(connection_id_tx),
6337 cx.background(),
6338 ))
6339 .detach();
6340 Ok(client_conn)
6341 }
6342 })
6343 });
6344
6345 client
6346 .authenticate_and_connect(false, &cx.to_async())
6347 .await
6348 .unwrap();
6349
6350 Channel::init(&client);
6351 Project::init(&client);
6352 cx.update(|cx| {
6353 workspace::init(&client, cx);
6354 });
6355
6356 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6357 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6358
6359 let client = TestClient {
6360 client,
6361 peer_id,
6362 username: name.to_string(),
6363 user_store,
6364 language_registry: Arc::new(LanguageRegistry::test()),
6365 project: Default::default(),
6366 buffers: Default::default(),
6367 };
6368 client.wait_for_current_user(cx).await;
6369 client
6370 }
6371
6372 fn disconnect_client(&self, user_id: UserId) {
6373 self.connection_killers
6374 .lock()
6375 .remove(&user_id)
6376 .unwrap()
6377 .store(true, SeqCst);
6378 }
6379
6380 fn forbid_connections(&self) {
6381 self.forbid_connections.store(true, SeqCst);
6382 }
6383
6384 fn allow_connections(&self) {
6385 self.forbid_connections.store(false, SeqCst);
6386 }
6387
6388 async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6389 while let Some((client_a, cx_a)) = clients.pop() {
6390 for (client_b, cx_b) in &mut clients {
6391 client_a
6392 .user_store
6393 .update(cx_a, |store, _| {
6394 store.request_contact(client_b.user_id().unwrap())
6395 })
6396 .await
6397 .unwrap();
6398 cx_a.foreground().run_until_parked();
6399 client_b
6400 .user_store
6401 .update(*cx_b, |store, _| {
6402 store.respond_to_contact_request(client_a.user_id().unwrap(), true)
6403 })
6404 .await
6405 .unwrap();
6406 }
6407 }
6408 }
6409
6410 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6411 Arc::new(AppState {
6412 db: test_db.db().clone(),
6413 api_token: Default::default(),
6414 })
6415 }
6416
6417 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6418 self.server.store.read().await
6419 }
6420
6421 async fn condition<F>(&mut self, mut predicate: F)
6422 where
6423 F: FnMut(&Store) -> bool,
6424 {
6425 assert!(
6426 self.foreground.parking_forbidden(),
6427 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6428 );
6429 while !(predicate)(&*self.server.store.read().await) {
6430 self.foreground.start_waiting();
6431 self.notifications.next().await;
6432 self.foreground.finish_waiting();
6433 }
6434 }
6435 }
6436
6437 impl Deref for TestServer {
6438 type Target = Server;
6439
6440 fn deref(&self) -> &Self::Target {
6441 &self.server
6442 }
6443 }
6444
6445 impl Drop for TestServer {
6446 fn drop(&mut self) {
6447 self.peer.reset();
6448 }
6449 }
6450
/// A client connected to the test server, bundled with the state that the
/// tests and the randomized simulations inspect.
struct TestClient {
    /// The underlying client; also the `Deref` target of `TestClient`.
    client: Arc<Client>,
    /// The name this client was created with.
    username: String,
    /// Peer id built from the connection id received in `create_client`.
    pub peer_id: PeerId,
    /// Handle to the `UserStore` model created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry handed to the projects and workspaces this client builds.
    language_registry: Arc<LanguageRegistry>,
    /// The project most recently assigned by `build_local_project`,
    /// `build_remote_project`, `simulate_host` or `simulate_guest`.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulation methods currently keep open.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6460
6461 impl Deref for TestClient {
6462 type Target = Arc<Client>;
6463
6464 fn deref(&self) -> &Self::Target {
6465 &self.client
6466 }
6467 }
6468
/// Snapshot of a client's contact lists, expressed as GitHub logins
/// (see `TestClient::summarize_contacts`).
struct ContactsSummary {
    /// Logins of the users that are currently contacts.
    pub current: Vec<String>,
    /// Logins taken from the store's outgoing contact requests.
    pub outgoing_requests: Vec<String>,
    /// Logins taken from the store's incoming contact requests.
    pub incoming_requests: Vec<String>,
}
6474
6475 impl TestClient {
6476 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6477 UserId::from_proto(
6478 self.user_store
6479 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6480 )
6481 }
6482
6483 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6484 let mut authed_user = self
6485 .user_store
6486 .read_with(cx, |user_store, _| user_store.watch_current_user());
6487 while authed_user.next().await.unwrap().is_none() {}
6488 }
6489
6490 fn clear_contacts(&self, cx: &mut TestAppContext) {
6491 self.user_store.update(cx, |store, _| {
6492 store.clear_contacts();
6493 });
6494 }
6495
6496 fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6497 self.user_store.read_with(cx, |store, _| ContactsSummary {
6498 current: store
6499 .contacts()
6500 .iter()
6501 .map(|contact| contact.user.github_login.clone())
6502 .collect(),
6503 outgoing_requests: store
6504 .outgoing_contact_requests()
6505 .iter()
6506 .map(|user| user.github_login.clone())
6507 .collect(),
6508 incoming_requests: store
6509 .incoming_contact_requests()
6510 .iter()
6511 .map(|user| user.github_login.clone())
6512 .collect(),
6513 })
6514 }
6515
6516 async fn build_local_project(
6517 &mut self,
6518 fs: Arc<FakeFs>,
6519 root_path: impl AsRef<Path>,
6520 cx: &mut TestAppContext,
6521 ) -> (ModelHandle<Project>, WorktreeId) {
6522 let project = cx.update(|cx| {
6523 Project::local(
6524 self.client.clone(),
6525 self.user_store.clone(),
6526 self.language_registry.clone(),
6527 fs,
6528 cx,
6529 )
6530 });
6531 self.project = Some(project.clone());
6532 let (worktree, _) = project
6533 .update(cx, |p, cx| {
6534 p.find_or_create_local_worktree(root_path, true, cx)
6535 })
6536 .await
6537 .unwrap();
6538 worktree
6539 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6540 .await;
6541 project
6542 .update(cx, |project, _| project.next_remote_id())
6543 .await;
6544 (project, worktree.read_with(cx, |tree, _| tree.id()))
6545 }
6546
6547 async fn build_remote_project(
6548 &mut self,
6549 project_id: u64,
6550 cx: &mut TestAppContext,
6551 ) -> ModelHandle<Project> {
6552 let project = Project::remote(
6553 project_id,
6554 self.client.clone(),
6555 self.user_store.clone(),
6556 self.language_registry.clone(),
6557 FakeFs::new(cx.background()),
6558 &mut cx.to_async(),
6559 )
6560 .await
6561 .unwrap();
6562 self.project = Some(project.clone());
6563 project
6564 }
6565
6566 fn build_workspace(
6567 &self,
6568 project: &ModelHandle<Project>,
6569 cx: &mut TestAppContext,
6570 ) -> ViewHandle<Workspace> {
6571 let (window_id, _) = cx.add_window(|_| EmptyView);
6572 cx.add_view(window_id, |cx| {
6573 let fs = project.read(cx).fs().clone();
6574 Workspace::new(
6575 &WorkspaceParams {
6576 fs,
6577 project: project.clone(),
6578 user_store: self.user_store.clone(),
6579 languages: self.language_registry.clone(),
6580 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6581 channel_list: cx.add_model(|cx| {
6582 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6583 }),
6584 client: self.client.clone(),
6585 },
6586 cx,
6587 )
6588 })
6589 }
6590
/// Plays the host's part in the randomized collaboration simulation.
///
/// One random operation is performed for every item received on
/// `op_start_signal`; the simulation ends when that channel is closed or an
/// operation fails. Afterwards `project` is stored in `self.project`, and the
/// client, the app context and the error (if any) are handed back to the caller.
///
/// `files` is the shared list of paths created so far. `rng` drives every
/// random decision, so the order of the draws below determines what a given
/// seed does.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    files: Arc<Mutex<Vec<PathBuf>>>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The loop lives in an inner function so `?` can be used for early exit
    // while the outer function still returns `self` and `cx`.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        files: Arc<Mutex<Vec<PathBuf>>>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        while op_start_signal.next().await.is_some() {
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            // NOTE: the first two ranges overlap (10..=20). Both arms carry the
            // same guard, so whenever files exist those values take the first
            // arm; when no files exist everything falls through to `_`.
            match distribution {
                // Find or create a worktree rooted at a random ancestor of a
                // random known file.
                0..=20 if !files.lock().is_empty() => {
                    let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                    let mut path = path.as_path();
                    // Step up at least one level, then keep going until a
                    // coin flip says stop or there is no parent left.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either let the operation finish in the
                    // background or wait for it (propagating its error).
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open a new buffer or pick an already open one, then drop or
                // mutate it.
                10..=80 if !files.lock().is_empty() => {
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        // Unlike the worktree lookup above, a failure to open
                        // the buffer panics instead of being returned.
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // With probability 0.1 release the buffer; otherwise edit
                    // it (probability 0.7) or undo/redo.
                    if rng.lock().gen_bool(0.1) {
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Create a new file at a random path, retrying with a fresh
                // path until both the directory and the file can be created.
                _ => loop {
                    // Path shape: "/" followed by 1 to 5 single-letter
                    // components, the last one given the "rs" extension.
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        files.lock().push(path);
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result = simulate_host_internal(
        &mut self,
        project.clone(),
        files,
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("Host done");
    self.project = Some(project);
    (self, cx, result.err())
}
6735
/// Plays a guest's part in the randomized collaboration simulation.
///
/// One random operation is performed for every item received on
/// `op_start_signal`; the simulation ends when that channel is closed or an
/// awaited operation fails. Afterwards `project` is stored in `self.project`,
/// and the client, the app context and the error (if any) are handed back.
///
/// `guest_username` is only used to label log messages. `rng` drives every
/// random decision, so the order of the draws below determines what a given
/// seed does.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The loop lives in an inner function so `?` can be used for early exit
    // while the outer function still returns `self` and `cx`.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Step 1: choose the buffer to operate on. Open a new one when
            // none is open or a coin flip says so; otherwise reuse one.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                // Only visible worktrees containing at least one file qualify.
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    // Nothing to open yet: skip this operation.
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            // Step 2: choose what to do with the buffer. Most request arms
            // follow the same shape: start the request, wrap it in a
            // background task that labels its error, then detach it with
            // probability 0.3 or await it and propagate the error.
            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Release the buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save, but only when the buffer is dirty; a clean buffer
                // falls through to the `_` arm and gets edited instead.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must include everything that was
                        // in the buffer when the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset. The buffer handle is
                // moved into the request here.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Go to definition at a random offset. When awaited, the
                // buffers of the returned locations are kept open.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide search for a single random lowercase letter.
                // When awaited, the buffers keyed in the result are kept open.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Create a new entry with a random 10-letter name and the
                // "rs" extension in a random worktree.
                60..=69 => {
                    // NOTE(review): unlike the worktree lookup in step 1, this
                    // one additionally requires a directory root entry and
                    // unwraps the result, so it panics when no worktree
                    // qualifies (e.g. if the buffer came from a worktree whose
                    // root is not a directory) — confirm that cannot happen.
                    let worktree = project
                        .read_with(cx, |project, cx| {
                            project
                                .worktrees(&cx)
                                .filter(|worktree| {
                                    let worktree = worktree.read(cx);
                                    worktree.is_visible()
                                        && worktree.entries(false).any(|e| e.is_file())
                                        && worktree
                                            .root_entry()
                                            .map_or(false, |e| e.is_dir())
                                })
                                .choose(&mut *rng.lock())
                        })
                        .unwrap();
                    let (worktree_id, worktree_root_name) = worktree
                        .read_with(cx, |worktree, _| {
                            (worktree.id(), worktree.root_name().to_string())
                        });

                    let mut new_name = String::new();
                    for _ in 0..10 {
                        let letter = rng.lock().gen_range('a'..='z');
                        new_name.push(letter);
                    }
                    let mut new_path = PathBuf::new();
                    new_path.push(new_name);
                    new_path.set_extension("rs");
                    log::info!(
                        "{}: creating {:?} in worktree {} ({})",
                        guest_username,
                        new_path,
                        worktree_id,
                        worktree_root_name,
                    );
                    // presumably the `false` argument means "not a directory"
                    // — TODO confirm against `Project::create_entry`.
                    project
                        .update(cx, |project, cx| {
                            project.create_entry((worktree_id, new_path), false, cx)
                        })
                        .unwrap()
                        .await?;
                }
                // Everything else (70..=99, plus 30..=39 on a clean buffer):
                // edit the buffer (probability 0.7) or undo/redo.
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    self.project = Some(project);
    (self, cx, result.err())
}
7064 }
7065
7066 impl Drop for TestClient {
7067 fn drop(&mut self) {
7068 self.client.tear_down();
7069 }
7070 }
7071
7072 impl Executor for Arc<gpui::executor::Background> {
7073 type Sleep = gpui::executor::Timer;
7074
7075 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7076 self.spawn(future).detach();
7077 }
7078
7079 fn sleep(&self, duration: Duration) -> Self::Sleep {
7080 self.as_ref().timer(duration)
7081 }
7082 }
7083
7084 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7085 channel
7086 .messages()
7087 .cursor::<()>()
7088 .map(|m| {
7089 (
7090 m.sender.github_login.clone(),
7091 m.body.clone(),
7092 m.is_pending(),
7093 )
7094 })
7095 .collect()
7096 }
7097
7098 struct EmptyView;
7099
7100 impl gpui::Entity for EmptyView {
7101 type Event = ();
7102 }
7103
7104 impl gpui::View for EmptyView {
7105 fn ui_name() -> &'static str {
7106 "empty view"
7107 }
7108
7109 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7110 gpui::Element::boxed(gpui::elements::Empty)
7111 }
7112 }
7113}