1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::Arc,
40 time::Duration,
41};
42use store::{Store, Worktree};
43use time::OffsetDateTime;
44use tokio::{
45 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
46 time::Sleep,
47};
48use tower::ServiceBuilder;
49use tracing::{info_span, instrument, Instrument};
50
51type MessageHandler =
52 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
53
54pub struct Server {
55 peer: Arc<Peer>,
56 store: RwLock<Store>,
57 app_state: Arc<AppState>,
58 handlers: HashMap<TypeId, MessageHandler>,
59 notifications: Option<mpsc::UnboundedSender<()>>,
60}
61
62pub trait Executor: Send + Clone {
63 type Sleep: Send + Future;
64 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
65 fn sleep(&self, duration: Duration) -> Self::Sleep;
66}
67
68#[derive(Clone)]
69pub struct RealExecutor;
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73
74struct StoreReadGuard<'a> {
75 guard: RwLockReadGuard<'a, Store>,
76 _not_send: PhantomData<Rc<()>>,
77}
78
79struct StoreWriteGuard<'a> {
80 guard: RwLockWriteGuard<'a, Store>,
81 _not_send: PhantomData<Rc<()>>,
82}
83
84impl Server {
85 pub fn new(
86 app_state: Arc<AppState>,
87 notifications: Option<mpsc::UnboundedSender<()>>,
88 ) -> Arc<Self> {
89 let mut server = Self {
90 peer: Peer::new(),
91 app_state,
92 store: Default::default(),
93 handlers: Default::default(),
94 notifications,
95 };
96
97 server
98 .add_request_handler(Server::ping)
99 .add_request_handler(Server::register_project)
100 .add_message_handler(Server::unregister_project)
101 .add_request_handler(Server::share_project)
102 .add_message_handler(Server::unshare_project)
103 .add_sync_request_handler(Server::join_project)
104 .add_message_handler(Server::leave_project)
105 .add_request_handler(Server::register_worktree)
106 .add_message_handler(Server::unregister_worktree)
107 .add_request_handler(Server::update_worktree)
108 .add_message_handler(Server::start_language_server)
109 .add_message_handler(Server::update_language_server)
110 .add_message_handler(Server::update_diagnostic_summary)
111 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
112 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
113 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
114 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
115 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
116 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
117 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
118 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
119 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
120 .add_request_handler(
121 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
122 )
123 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
124 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
125 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
126 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
127 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
128 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
129 .add_request_handler(Server::update_buffer)
130 .add_message_handler(Server::update_buffer_file)
131 .add_message_handler(Server::buffer_reloaded)
132 .add_message_handler(Server::buffer_saved)
133 .add_request_handler(Server::save_buffer)
134 .add_request_handler(Server::get_channels)
135 .add_request_handler(Server::get_users)
136 .add_request_handler(Server::join_channel)
137 .add_message_handler(Server::leave_channel)
138 .add_request_handler(Server::send_channel_message)
139 .add_request_handler(Server::follow)
140 .add_message_handler(Server::unfollow)
141 .add_message_handler(Server::update_followers)
142 .add_request_handler(Server::get_channel_messages);
143
144 Arc::new(server)
145 }
146
147 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
148 where
149 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
150 Fut: 'static + Send + Future<Output = Result<()>>,
151 M: EnvelopedMessage,
152 {
153 let prev_handler = self.handlers.insert(
154 TypeId::of::<M>(),
155 Box::new(move |server, envelope| {
156 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
157 let span = info_span!(
158 "handle message",
159 payload_type = envelope.payload_type_name()
160 );
161 let future = (handler)(server, *envelope);
162 async move {
163 if let Err(error) = future.await {
164 tracing::error!(%error, "error handling message");
165 }
166 }
167 .instrument(span)
168 .boxed()
169 }),
170 );
171 if prev_handler.is_some() {
172 panic!("registered a handler for the same message twice");
173 }
174 self
175 }
176
177 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
178 where
179 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
180 Fut: 'static + Send + Future<Output = Result<M::Response>>,
181 M: RequestMessage,
182 {
183 self.add_message_handler(move |server, envelope| {
184 let receipt = envelope.receipt();
185 let response = (handler)(server.clone(), envelope);
186 async move {
187 match response.await {
188 Ok(response) => {
189 server.peer.respond(receipt, response)?;
190 Ok(())
191 }
192 Err(error) => {
193 server.peer.respond_with_error(
194 receipt,
195 proto::Error {
196 message: error.to_string(),
197 },
198 )?;
199 Err(error)
200 }
201 }
202 }
203 })
204 }
205
206 /// Handle a request while holding a lock to the store. This is useful when we're registering
207 /// a connection but we want to respond on the connection before anybody else can send on it.
208 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
209 where
210 F: 'static
211 + Send
212 + Sync
213 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
214 M: RequestMessage,
215 {
216 let handler = Arc::new(handler);
217 self.add_message_handler(move |server, envelope| {
218 let receipt = envelope.receipt();
219 let handler = handler.clone();
220 async move {
221 let mut store = server.state_mut().await;
222 let response = (handler)(server.clone(), &mut *store, envelope);
223 match response {
224 Ok(response) => {
225 server.peer.respond(receipt, response)?;
226 Ok(())
227 }
228 Err(error) => {
229 server.peer.respond_with_error(
230 receipt,
231 proto::Error {
232 message: error.to_string(),
233 },
234 )?;
235 Err(error)
236 }
237 }
238 }
239 })
240 }
241
242 pub fn handle_connection<E: Executor>(
243 self: &Arc<Self>,
244 connection: Connection,
245 address: String,
246 user_id: UserId,
247 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
248 executor: E,
249 ) -> impl Future<Output = ()> {
250 let mut this = self.clone();
251 let span = info_span!("handle connection", %user_id, %address);
252 async move {
253 let (connection_id, handle_io, mut incoming_rx) = this
254 .peer
255 .add_connection(connection, {
256 let executor = executor.clone();
257 move |duration| {
258 let timer = executor.sleep(duration);
259 async move {
260 timer.await;
261 }
262 }
263 })
264 .await;
265
266 tracing::info!(%user_id, %connection_id, %address, "connection opened");
267
268 if let Some(send_connection_id) = send_connection_id.as_mut() {
269 let _ = send_connection_id.send(connection_id).await;
270 }
271
272 {
273 let mut state = this.state_mut().await;
274 state.add_connection(connection_id, user_id);
275 this.update_contacts_for_users(&*state, &[user_id]);
276 }
277
278 let handle_io = handle_io.fuse();
279 futures::pin_mut!(handle_io);
280 loop {
281 let next_message = incoming_rx.next().fuse();
282 futures::pin_mut!(next_message);
283 futures::select_biased! {
284 result = handle_io => {
285 if let Err(error) = result {
286 tracing::error!(%error, "error handling I/O");
287 }
288 break;
289 }
290 message = next_message => {
291 if let Some(message) = message {
292 let type_name = message.payload_type_name();
293 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
294 async {
295 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
296 let notifications = this.notifications.clone();
297 let is_background = message.is_background();
298 let handle_message = (handler)(this.clone(), message);
299 let handle_message = async move {
300 handle_message.await;
301 if let Some(mut notifications) = notifications {
302 let _ = notifications.send(()).await;
303 }
304 };
305 if is_background {
306 executor.spawn_detached(handle_message);
307 } else {
308 handle_message.await;
309 }
310 } else {
311 tracing::error!("no message handler");
312 }
313 }.instrument(span).await;
314 } else {
315 tracing::info!(%user_id, %connection_id, %address, "connection closed");
316 break;
317 }
318 }
319 }
320 }
321
322 if let Err(error) = this.sign_out(connection_id).await {
323 tracing::error!(%error, "error signing out");
324 }
325 }.instrument(span)
326 }
327
328 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
329 self.peer.disconnect(connection_id);
330 let mut state = self.state_mut().await;
331 let removed_connection = state.remove_connection(connection_id)?;
332
333 for (project_id, project) in removed_connection.hosted_projects {
334 if let Some(share) = project.share {
335 broadcast(
336 connection_id,
337 share.guests.keys().copied().collect(),
338 |conn_id| {
339 self.peer
340 .send(conn_id, proto::UnshareProject { project_id })
341 },
342 );
343 }
344 }
345
346 for (project_id, peer_ids) in removed_connection.guest_project_ids {
347 broadcast(connection_id, peer_ids, |conn_id| {
348 self.peer.send(
349 conn_id,
350 proto::RemoveProjectCollaborator {
351 project_id,
352 peer_id: connection_id.0,
353 },
354 )
355 });
356 }
357
358 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
359 Ok(())
360 }
361
362 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
363 Ok(proto::Ack {})
364 }
365
366 async fn register_project(
367 self: Arc<Server>,
368 request: TypedEnvelope<proto::RegisterProject>,
369 ) -> Result<proto::RegisterProjectResponse> {
370 let project_id = {
371 let mut state = self.state_mut().await;
372 let user_id = state.user_id_for_connection(request.sender_id)?;
373 state.register_project(request.sender_id, user_id)
374 };
375 Ok(proto::RegisterProjectResponse { project_id })
376 }
377
378 async fn unregister_project(
379 self: Arc<Server>,
380 request: TypedEnvelope<proto::UnregisterProject>,
381 ) -> Result<()> {
382 let mut state = self.state_mut().await;
383 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
384 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
385 Ok(())
386 }
387
388 async fn share_project(
389 self: Arc<Server>,
390 request: TypedEnvelope<proto::ShareProject>,
391 ) -> Result<proto::Ack> {
392 let mut state = self.state_mut().await;
393 let project = state.share_project(request.payload.project_id, request.sender_id)?;
394 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
395 Ok(proto::Ack {})
396 }
397
398 async fn unshare_project(
399 self: Arc<Server>,
400 request: TypedEnvelope<proto::UnshareProject>,
401 ) -> Result<()> {
402 let project_id = request.payload.project_id;
403 let mut state = self.state_mut().await;
404 let project = state.unshare_project(project_id, request.sender_id)?;
405 broadcast(request.sender_id, project.connection_ids, |conn_id| {
406 self.peer
407 .send(conn_id, proto::UnshareProject { project_id })
408 });
409 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
410 Ok(())
411 }
412
413 fn join_project(
414 self: Arc<Server>,
415 state: &mut Store,
416 request: TypedEnvelope<proto::JoinProject>,
417 ) -> Result<proto::JoinProjectResponse> {
418 let project_id = request.payload.project_id;
419
420 let user_id = state.user_id_for_connection(request.sender_id)?;
421 let (response, connection_ids, contact_user_ids) = state
422 .join_project(request.sender_id, user_id, project_id)
423 .and_then(|joined| {
424 let share = joined.project.share()?;
425 let peer_count = share.guests.len();
426 let mut collaborators = Vec::with_capacity(peer_count);
427 collaborators.push(proto::Collaborator {
428 peer_id: joined.project.host_connection_id.0,
429 replica_id: 0,
430 user_id: joined.project.host_user_id.to_proto(),
431 });
432 let worktrees = share
433 .worktrees
434 .iter()
435 .filter_map(|(id, shared_worktree)| {
436 let worktree = joined.project.worktrees.get(&id)?;
437 Some(proto::Worktree {
438 id: *id,
439 root_name: worktree.root_name.clone(),
440 entries: shared_worktree.entries.values().cloned().collect(),
441 diagnostic_summaries: shared_worktree
442 .diagnostic_summaries
443 .values()
444 .cloned()
445 .collect(),
446 visible: worktree.visible,
447 })
448 })
449 .collect();
450 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
451 if *peer_conn_id != request.sender_id {
452 collaborators.push(proto::Collaborator {
453 peer_id: peer_conn_id.0,
454 replica_id: *peer_replica_id as u32,
455 user_id: peer_user_id.to_proto(),
456 });
457 }
458 }
459 let response = proto::JoinProjectResponse {
460 worktrees,
461 replica_id: joined.replica_id as u32,
462 collaborators,
463 language_servers: joined.project.language_servers.clone(),
464 };
465 let connection_ids = joined.project.connection_ids();
466 let contact_user_ids = joined.project.authorized_user_ids();
467 Ok((response, connection_ids, contact_user_ids))
468 })?;
469
470 broadcast(request.sender_id, connection_ids, |conn_id| {
471 self.peer.send(
472 conn_id,
473 proto::AddProjectCollaborator {
474 project_id,
475 collaborator: Some(proto::Collaborator {
476 peer_id: request.sender_id.0,
477 replica_id: response.replica_id,
478 user_id: user_id.to_proto(),
479 }),
480 },
481 )
482 });
483 self.update_contacts_for_users(state, &contact_user_ids);
484 Ok(response)
485 }
486
487 async fn leave_project(
488 self: Arc<Server>,
489 request: TypedEnvelope<proto::LeaveProject>,
490 ) -> Result<()> {
491 let sender_id = request.sender_id;
492 let project_id = request.payload.project_id;
493 let mut state = self.state_mut().await;
494 let worktree = state.leave_project(sender_id, project_id)?;
495 broadcast(sender_id, worktree.connection_ids, |conn_id| {
496 self.peer.send(
497 conn_id,
498 proto::RemoveProjectCollaborator {
499 project_id,
500 peer_id: sender_id.0,
501 },
502 )
503 });
504 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
505 Ok(())
506 }
507
508 async fn register_worktree(
509 self: Arc<Server>,
510 request: TypedEnvelope<proto::RegisterWorktree>,
511 ) -> Result<proto::Ack> {
512 let mut contact_user_ids = HashSet::default();
513 for github_login in &request.payload.authorized_logins {
514 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
515 contact_user_ids.insert(contact_user_id);
516 }
517
518 let mut state = self.state_mut().await;
519 let host_user_id = state.user_id_for_connection(request.sender_id)?;
520 contact_user_ids.insert(host_user_id);
521
522 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
523 let guest_connection_ids = state
524 .read_project(request.payload.project_id, request.sender_id)?
525 .guest_connection_ids();
526 state.register_worktree(
527 request.payload.project_id,
528 request.payload.worktree_id,
529 request.sender_id,
530 Worktree {
531 authorized_user_ids: contact_user_ids.clone(),
532 root_name: request.payload.root_name.clone(),
533 visible: request.payload.visible,
534 },
535 )?;
536
537 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
538 self.peer
539 .forward_send(request.sender_id, connection_id, request.payload.clone())
540 });
541 self.update_contacts_for_users(&*state, &contact_user_ids);
542 Ok(proto::Ack {})
543 }
544
545 async fn unregister_worktree(
546 self: Arc<Server>,
547 request: TypedEnvelope<proto::UnregisterWorktree>,
548 ) -> Result<()> {
549 let project_id = request.payload.project_id;
550 let worktree_id = request.payload.worktree_id;
551 let mut state = self.state_mut().await;
552 let (worktree, guest_connection_ids) =
553 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
554 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
555 self.peer.send(
556 conn_id,
557 proto::UnregisterWorktree {
558 project_id,
559 worktree_id,
560 },
561 )
562 });
563 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
564 Ok(())
565 }
566
567 async fn update_worktree(
568 self: Arc<Server>,
569 request: TypedEnvelope<proto::UpdateWorktree>,
570 ) -> Result<proto::Ack> {
571 let connection_ids = self.state_mut().await.update_worktree(
572 request.sender_id,
573 request.payload.project_id,
574 request.payload.worktree_id,
575 &request.payload.removed_entries,
576 &request.payload.updated_entries,
577 )?;
578
579 broadcast(request.sender_id, connection_ids, |connection_id| {
580 self.peer
581 .forward_send(request.sender_id, connection_id, request.payload.clone())
582 });
583
584 Ok(proto::Ack {})
585 }
586
587 async fn update_diagnostic_summary(
588 self: Arc<Server>,
589 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
590 ) -> Result<()> {
591 let summary = request
592 .payload
593 .summary
594 .clone()
595 .ok_or_else(|| anyhow!("invalid summary"))?;
596 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
597 request.payload.project_id,
598 request.payload.worktree_id,
599 request.sender_id,
600 summary,
601 )?;
602
603 broadcast(request.sender_id, receiver_ids, |connection_id| {
604 self.peer
605 .forward_send(request.sender_id, connection_id, request.payload.clone())
606 });
607 Ok(())
608 }
609
610 async fn start_language_server(
611 self: Arc<Server>,
612 request: TypedEnvelope<proto::StartLanguageServer>,
613 ) -> Result<()> {
614 let receiver_ids = self.state_mut().await.start_language_server(
615 request.payload.project_id,
616 request.sender_id,
617 request
618 .payload
619 .server
620 .clone()
621 .ok_or_else(|| anyhow!("invalid language server"))?,
622 )?;
623 broadcast(request.sender_id, receiver_ids, |connection_id| {
624 self.peer
625 .forward_send(request.sender_id, connection_id, request.payload.clone())
626 });
627 Ok(())
628 }
629
630 async fn update_language_server(
631 self: Arc<Server>,
632 request: TypedEnvelope<proto::UpdateLanguageServer>,
633 ) -> Result<()> {
634 let receiver_ids = self
635 .state()
636 .await
637 .project_connection_ids(request.payload.project_id, request.sender_id)?;
638 broadcast(request.sender_id, receiver_ids, |connection_id| {
639 self.peer
640 .forward_send(request.sender_id, connection_id, request.payload.clone())
641 });
642 Ok(())
643 }
644
645 async fn forward_project_request<T>(
646 self: Arc<Server>,
647 request: TypedEnvelope<T>,
648 ) -> Result<T::Response>
649 where
650 T: EntityMessage + RequestMessage,
651 {
652 let host_connection_id = self
653 .state()
654 .await
655 .read_project(request.payload.remote_entity_id(), request.sender_id)?
656 .host_connection_id;
657 Ok(self
658 .peer
659 .forward_request(request.sender_id, host_connection_id, request.payload)
660 .await?)
661 }
662
663 async fn save_buffer(
664 self: Arc<Server>,
665 request: TypedEnvelope<proto::SaveBuffer>,
666 ) -> Result<proto::BufferSaved> {
667 let host = self
668 .state()
669 .await
670 .read_project(request.payload.project_id, request.sender_id)?
671 .host_connection_id;
672 let response = self
673 .peer
674 .forward_request(request.sender_id, host, request.payload.clone())
675 .await?;
676
677 let mut guests = self
678 .state()
679 .await
680 .read_project(request.payload.project_id, request.sender_id)?
681 .connection_ids();
682 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
683 broadcast(host, guests, |conn_id| {
684 self.peer.forward_send(host, conn_id, response.clone())
685 });
686
687 Ok(response)
688 }
689
690 async fn update_buffer(
691 self: Arc<Server>,
692 request: TypedEnvelope<proto::UpdateBuffer>,
693 ) -> Result<proto::Ack> {
694 let receiver_ids = self
695 .state()
696 .await
697 .project_connection_ids(request.payload.project_id, request.sender_id)?;
698 broadcast(request.sender_id, receiver_ids, |connection_id| {
699 self.peer
700 .forward_send(request.sender_id, connection_id, request.payload.clone())
701 });
702 Ok(proto::Ack {})
703 }
704
705 async fn update_buffer_file(
706 self: Arc<Server>,
707 request: TypedEnvelope<proto::UpdateBufferFile>,
708 ) -> Result<()> {
709 let receiver_ids = self
710 .state()
711 .await
712 .project_connection_ids(request.payload.project_id, request.sender_id)?;
713 broadcast(request.sender_id, receiver_ids, |connection_id| {
714 self.peer
715 .forward_send(request.sender_id, connection_id, request.payload.clone())
716 });
717 Ok(())
718 }
719
720 async fn buffer_reloaded(
721 self: Arc<Server>,
722 request: TypedEnvelope<proto::BufferReloaded>,
723 ) -> Result<()> {
724 let receiver_ids = self
725 .state()
726 .await
727 .project_connection_ids(request.payload.project_id, request.sender_id)?;
728 broadcast(request.sender_id, receiver_ids, |connection_id| {
729 self.peer
730 .forward_send(request.sender_id, connection_id, request.payload.clone())
731 });
732 Ok(())
733 }
734
735 async fn buffer_saved(
736 self: Arc<Server>,
737 request: TypedEnvelope<proto::BufferSaved>,
738 ) -> Result<()> {
739 let receiver_ids = self
740 .state()
741 .await
742 .project_connection_ids(request.payload.project_id, request.sender_id)?;
743 broadcast(request.sender_id, receiver_ids, |connection_id| {
744 self.peer
745 .forward_send(request.sender_id, connection_id, request.payload.clone())
746 });
747 Ok(())
748 }
749
750 async fn follow(
751 self: Arc<Self>,
752 request: TypedEnvelope<proto::Follow>,
753 ) -> Result<proto::FollowResponse> {
754 let leader_id = ConnectionId(request.payload.leader_id);
755 let follower_id = request.sender_id;
756 if !self
757 .state()
758 .await
759 .project_connection_ids(request.payload.project_id, follower_id)?
760 .contains(&leader_id)
761 {
762 Err(anyhow!("no such peer"))?;
763 }
764 let mut response = self
765 .peer
766 .forward_request(request.sender_id, leader_id, request.payload)
767 .await?;
768 response
769 .views
770 .retain(|view| view.leader_id != Some(follower_id.0));
771 Ok(response)
772 }
773
774 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
775 let leader_id = ConnectionId(request.payload.leader_id);
776 if !self
777 .state()
778 .await
779 .project_connection_ids(request.payload.project_id, request.sender_id)?
780 .contains(&leader_id)
781 {
782 Err(anyhow!("no such peer"))?;
783 }
784 self.peer
785 .forward_send(request.sender_id, leader_id, request.payload)?;
786 Ok(())
787 }
788
789 async fn update_followers(
790 self: Arc<Self>,
791 request: TypedEnvelope<proto::UpdateFollowers>,
792 ) -> Result<()> {
793 let connection_ids = self
794 .state()
795 .await
796 .project_connection_ids(request.payload.project_id, request.sender_id)?;
797 let leader_id = request
798 .payload
799 .variant
800 .as_ref()
801 .and_then(|variant| match variant {
802 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
803 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
804 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
805 });
806 for follower_id in &request.payload.follower_ids {
807 let follower_id = ConnectionId(*follower_id);
808 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
809 self.peer
810 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
811 }
812 }
813 Ok(())
814 }
815
816 async fn get_channels(
817 self: Arc<Server>,
818 request: TypedEnvelope<proto::GetChannels>,
819 ) -> Result<proto::GetChannelsResponse> {
820 let user_id = self
821 .state()
822 .await
823 .user_id_for_connection(request.sender_id)?;
824 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
825 Ok(proto::GetChannelsResponse {
826 channels: channels
827 .into_iter()
828 .map(|chan| proto::Channel {
829 id: chan.id.to_proto(),
830 name: chan.name,
831 })
832 .collect(),
833 })
834 }
835
836 async fn get_users(
837 self: Arc<Server>,
838 request: TypedEnvelope<proto::GetUsers>,
839 ) -> Result<proto::GetUsersResponse> {
840 let user_ids = request
841 .payload
842 .user_ids
843 .into_iter()
844 .map(UserId::from_proto)
845 .collect();
846 let users = self
847 .app_state
848 .db
849 .get_users_by_ids(user_ids)
850 .await?
851 .into_iter()
852 .map(|user| proto::User {
853 id: user.id.to_proto(),
854 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
855 github_login: user.github_login,
856 })
857 .collect();
858 Ok(proto::GetUsersResponse { users })
859 }
860
861 #[instrument(skip(self, state, user_ids))]
862 fn update_contacts_for_users<'a>(
863 self: &Arc<Self>,
864 state: &Store,
865 user_ids: impl IntoIterator<Item = &'a UserId>,
866 ) {
867 for user_id in user_ids {
868 let contacts = state.contacts_for_user(*user_id);
869 for connection_id in state.connection_ids_for_user(*user_id) {
870 self.peer
871 .send(
872 connection_id,
873 proto::UpdateContacts {
874 contacts: contacts.clone(),
875 },
876 )
877 .trace_err();
878 }
879 }
880 }
881
882 async fn join_channel(
883 self: Arc<Self>,
884 request: TypedEnvelope<proto::JoinChannel>,
885 ) -> Result<proto::JoinChannelResponse> {
886 let user_id = self
887 .state()
888 .await
889 .user_id_for_connection(request.sender_id)?;
890 let channel_id = ChannelId::from_proto(request.payload.channel_id);
891 if !self
892 .app_state
893 .db
894 .can_user_access_channel(user_id, channel_id)
895 .await?
896 {
897 Err(anyhow!("access denied"))?;
898 }
899
900 self.state_mut()
901 .await
902 .join_channel(request.sender_id, channel_id);
903 let messages = self
904 .app_state
905 .db
906 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
907 .await?
908 .into_iter()
909 .map(|msg| proto::ChannelMessage {
910 id: msg.id.to_proto(),
911 body: msg.body,
912 timestamp: msg.sent_at.unix_timestamp() as u64,
913 sender_id: msg.sender_id.to_proto(),
914 nonce: Some(msg.nonce.as_u128().into()),
915 })
916 .collect::<Vec<_>>();
917 Ok(proto::JoinChannelResponse {
918 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
919 messages,
920 })
921 }
922
923 async fn leave_channel(
924 self: Arc<Self>,
925 request: TypedEnvelope<proto::LeaveChannel>,
926 ) -> Result<()> {
927 let user_id = self
928 .state()
929 .await
930 .user_id_for_connection(request.sender_id)?;
931 let channel_id = ChannelId::from_proto(request.payload.channel_id);
932 if !self
933 .app_state
934 .db
935 .can_user_access_channel(user_id, channel_id)
936 .await?
937 {
938 Err(anyhow!("access denied"))?;
939 }
940
941 self.state_mut()
942 .await
943 .leave_channel(request.sender_id, channel_id);
944
945 Ok(())
946 }
947
948 async fn send_channel_message(
949 self: Arc<Self>,
950 request: TypedEnvelope<proto::SendChannelMessage>,
951 ) -> Result<proto::SendChannelMessageResponse> {
952 let channel_id = ChannelId::from_proto(request.payload.channel_id);
953 let user_id;
954 let connection_ids;
955 {
956 let state = self.state().await;
957 user_id = state.user_id_for_connection(request.sender_id)?;
958 connection_ids = state.channel_connection_ids(channel_id)?;
959 }
960
961 // Validate the message body.
962 let body = request.payload.body.trim().to_string();
963 if body.len() > MAX_MESSAGE_LEN {
964 return Err(anyhow!("message is too long"))?;
965 }
966 if body.is_empty() {
967 return Err(anyhow!("message can't be blank"))?;
968 }
969
970 let timestamp = OffsetDateTime::now_utc();
971 let nonce = request
972 .payload
973 .nonce
974 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
975
976 let message_id = self
977 .app_state
978 .db
979 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
980 .await?
981 .to_proto();
982 let message = proto::ChannelMessage {
983 sender_id: user_id.to_proto(),
984 id: message_id,
985 body,
986 timestamp: timestamp.unix_timestamp() as u64,
987 nonce: Some(nonce),
988 };
989 broadcast(request.sender_id, connection_ids, |conn_id| {
990 self.peer.send(
991 conn_id,
992 proto::ChannelMessageSent {
993 channel_id: channel_id.to_proto(),
994 message: Some(message.clone()),
995 },
996 )
997 });
998 Ok(proto::SendChannelMessageResponse {
999 message: Some(message),
1000 })
1001 }
1002
1003 async fn get_channel_messages(
1004 self: Arc<Self>,
1005 request: TypedEnvelope<proto::GetChannelMessages>,
1006 ) -> Result<proto::GetChannelMessagesResponse> {
1007 let user_id = self
1008 .state()
1009 .await
1010 .user_id_for_connection(request.sender_id)?;
1011 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1012 if !self
1013 .app_state
1014 .db
1015 .can_user_access_channel(user_id, channel_id)
1016 .await?
1017 {
1018 Err(anyhow!("access denied"))?;
1019 }
1020
1021 let messages = self
1022 .app_state
1023 .db
1024 .get_channel_messages(
1025 channel_id,
1026 MESSAGE_COUNT_PER_PAGE,
1027 Some(MessageId::from_proto(request.payload.before_message_id)),
1028 )
1029 .await?
1030 .into_iter()
1031 .map(|msg| proto::ChannelMessage {
1032 id: msg.id.to_proto(),
1033 body: msg.body,
1034 timestamp: msg.sent_at.unix_timestamp() as u64,
1035 sender_id: msg.sender_id.to_proto(),
1036 nonce: Some(msg.nonce.as_u128().into()),
1037 })
1038 .collect::<Vec<_>>();
1039
1040 Ok(proto::GetChannelMessagesResponse {
1041 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1042 messages,
1043 })
1044 }
1045
1046 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1047 #[cfg(test)]
1048 tokio::task::yield_now().await;
1049 let guard = self.store.read().await;
1050 #[cfg(test)]
1051 tokio::task::yield_now().await;
1052 StoreReadGuard {
1053 guard,
1054 _not_send: PhantomData,
1055 }
1056 }
1057
1058 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1059 #[cfg(test)]
1060 tokio::task::yield_now().await;
1061 let guard = self.store.write().await;
1062 #[cfg(test)]
1063 tokio::task::yield_now().await;
1064 StoreWriteGuard {
1065 guard,
1066 _not_send: PhantomData,
1067 }
1068 }
1069}
1070
1071impl<'a> Deref for StoreReadGuard<'a> {
1072 type Target = Store;
1073
1074 fn deref(&self) -> &Self::Target {
1075 &*self.guard
1076 }
1077}
1078
1079impl<'a> Deref for StoreWriteGuard<'a> {
1080 type Target = Store;
1081
1082 fn deref(&self) -> &Self::Target {
1083 &*self.guard
1084 }
1085}
1086
1087impl<'a> DerefMut for StoreWriteGuard<'a> {
1088 fn deref_mut(&mut self) -> &mut Self::Target {
1089 &mut *self.guard
1090 }
1091}
1092
1093impl<'a> Drop for StoreWriteGuard<'a> {
1094 fn drop(&mut self) {
1095 #[cfg(test)]
1096 self.check_invariants();
1097
1098 let metrics = self.metrics();
1099 tracing::info!(
1100 connections = metrics.connections,
1101 registered_projects = metrics.registered_projects,
1102 shared_projects = metrics.shared_projects,
1103 collaborators_per_project = metrics.collaborators_per_project,
1104 "metrics"
1105 );
1106 }
1107}
1108
1109impl Executor for RealExecutor {
1110 type Sleep = Sleep;
1111
1112 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1113 tokio::task::spawn(future);
1114 }
1115
1116 fn sleep(&self, duration: Duration) -> Self::Sleep {
1117 tokio::time::sleep(duration)
1118 }
1119}
1120
1121#[instrument(skip(f))]
1122fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1123where
1124 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1125{
1126 for receiver_id in receiver_ids {
1127 if receiver_id != sender_id {
1128 f(receiver_id).trace_err();
1129 }
1130 }
1131}
1132
1133lazy_static! {
1134 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1135}
1136
1137pub struct ProtocolVersion(u32);
1138
1139impl Header for ProtocolVersion {
1140 fn name() -> &'static HeaderName {
1141 &ZED_PROTOCOL_VERSION
1142 }
1143
1144 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1145 where
1146 Self: Sized,
1147 I: Iterator<Item = &'i axum::http::HeaderValue>,
1148 {
1149 let version = values
1150 .next()
1151 .ok_or_else(|| axum::headers::Error::invalid())?
1152 .to_str()
1153 .map_err(|_| axum::headers::Error::invalid())?
1154 .parse()
1155 .map_err(|_| axum::headers::Error::invalid())?;
1156 Ok(Self(version))
1157 }
1158
1159 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1160 values.extend([self.0.to_string().parse().unwrap()]);
1161 }
1162}
1163
1164pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1165 let server = Server::new(app_state.clone(), None);
1166 Router::new()
1167 .route("/rpc", get(handle_websocket_request))
1168 .layer(
1169 ServiceBuilder::new()
1170 .layer(Extension(app_state))
1171 .layer(middleware::from_fn(auth::validate_header))
1172 .layer(Extension(server)),
1173 )
1174}
1175
1176pub async fn handle_websocket_request(
1177 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1178 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1179 Extension(server): Extension<Arc<Server>>,
1180 Extension(user_id): Extension<UserId>,
1181 ws: WebSocketUpgrade,
1182) -> Response {
1183 if protocol_version != rpc::PROTOCOL_VERSION {
1184 return (
1185 StatusCode::UPGRADE_REQUIRED,
1186 "client must be upgraded".to_string(),
1187 )
1188 .into_response();
1189 }
1190 let socket_address = socket_address.to_string();
1191 ws.on_upgrade(move |socket| {
1192 let socket = socket
1193 .map_ok(to_tungstenite_message)
1194 .err_into()
1195 .with(|message| async move { Ok(to_axum_message(message)) });
1196 let connection = Connection::new(Box::pin(socket));
1197 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1198 })
1199}
1200
1201fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1202 match message {
1203 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1204 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1205 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1206 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1207 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1208 code: frame.code.into(),
1209 reason: frame.reason,
1210 })),
1211 }
1212}
1213
1214fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1215 match message {
1216 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1217 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1218 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1219 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1220 AxumMessage::Close(frame) => {
1221 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1222 code: frame.code.into(),
1223 reason: frame.reason,
1224 }))
1225 }
1226 }
1227}
1228
1229pub trait ResultExt {
1230 type Ok;
1231
1232 fn trace_err(self) -> Option<Self::Ok>;
1233}
1234
1235impl<T, E> ResultExt for Result<T, E>
1236where
1237 E: std::fmt::Debug,
1238{
1239 type Ok = T;
1240
1241 fn trace_err(self) -> Option<T> {
1242 match self {
1243 Ok(value) => Some(value),
1244 Err(error) => {
1245 tracing::error!("{:?}", error);
1246 None
1247 }
1248 }
1249 }
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254 use super::*;
1255 use crate::{
1256 db::{tests::TestDb, UserId},
1257 AppState,
1258 };
1259 use ::rpc::Peer;
1260 use client::{
1261 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1262 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1263 };
1264 use collections::BTreeMap;
1265 use editor::{
1266 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1267 ToOffset, ToggleCodeActions, Undo,
1268 };
1269 use gpui::{
1270 executor::{self, Deterministic},
1271 geometry::vector::vec2f,
1272 ModelHandle, TestAppContext, ViewHandle,
1273 };
1274 use language::{
1275 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1276 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1277 };
1278 use lsp::{self, FakeLanguageServer};
1279 use parking_lot::Mutex;
1280 use project::{
1281 fs::{FakeFs, Fs as _},
1282 search::SearchQuery,
1283 worktree::WorktreeHandle,
1284 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1285 };
1286 use rand::prelude::*;
1287 use rpc::PeerId;
1288 use serde_json::json;
1289 use settings::Settings;
1290 use sqlx::types::time::OffsetDateTime;
1291 use std::{
1292 env,
1293 ops::Deref,
1294 path::{Path, PathBuf},
1295 rc::Rc,
1296 sync::{
1297 atomic::{AtomicBool, Ordering::SeqCst},
1298 Arc,
1299 },
1300 time::Duration,
1301 };
1302 use theme::ThemeRegistry;
1303 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1304
1305 #[cfg(test)]
1306 #[ctor::ctor]
1307 fn init_logger() {
1308 if std::env::var("RUST_LOG").is_ok() {
1309 env_logger::init();
1310 }
1311 }
1312
1313 #[gpui::test(iterations = 10)]
1314 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1315 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1316 let lang_registry = Arc::new(LanguageRegistry::test());
1317 let fs = FakeFs::new(cx_a.background());
1318 cx_a.foreground().forbid_parking();
1319
1320 // Connect to a server as 2 clients.
1321 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1322 let client_a = server.create_client(cx_a, "user_a").await;
1323 let client_b = server.create_client(cx_b, "user_b").await;
1324
1325 // Share a project as client A
1326 fs.insert_tree(
1327 "/a",
1328 json!({
1329 ".zed.toml": r#"collaborators = ["user_b"]"#,
1330 "a.txt": "a-contents",
1331 "b.txt": "b-contents",
1332 }),
1333 )
1334 .await;
1335 let project_a = cx_a.update(|cx| {
1336 Project::local(
1337 client_a.clone(),
1338 client_a.user_store.clone(),
1339 lang_registry.clone(),
1340 fs.clone(),
1341 cx,
1342 )
1343 });
1344 let (worktree_a, _) = project_a
1345 .update(cx_a, |p, cx| {
1346 p.find_or_create_local_worktree("/a", true, cx)
1347 })
1348 .await
1349 .unwrap();
1350 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1351 worktree_a
1352 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1353 .await;
1354 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1355 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1356
1357 // Join that project as client B
1358 let project_b = Project::remote(
1359 project_id,
1360 client_b.clone(),
1361 client_b.user_store.clone(),
1362 lang_registry.clone(),
1363 fs.clone(),
1364 &mut cx_b.to_async(),
1365 )
1366 .await
1367 .unwrap();
1368
1369 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1370 assert_eq!(
1371 project
1372 .collaborators()
1373 .get(&client_a.peer_id)
1374 .unwrap()
1375 .user
1376 .github_login,
1377 "user_a"
1378 );
1379 project.replica_id()
1380 });
1381 project_a
1382 .condition(&cx_a, |tree, _| {
1383 tree.collaborators()
1384 .get(&client_b.peer_id)
1385 .map_or(false, |collaborator| {
1386 collaborator.replica_id == replica_id_b
1387 && collaborator.user.github_login == "user_b"
1388 })
1389 })
1390 .await;
1391
1392 // Open the same file as client B and client A.
1393 let buffer_b = project_b
1394 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1395 .await
1396 .unwrap();
1397 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1398 project_a.read_with(cx_a, |project, cx| {
1399 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1400 });
1401 let buffer_a = project_a
1402 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1403 .await
1404 .unwrap();
1405
1406 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1407
1408 // TODO
1409 // // Create a selection set as client B and see that selection set as client A.
1410 // buffer_a
1411 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1412 // .await;
1413
1414 // Edit the buffer as client B and see that edit as client A.
1415 editor_b.update(cx_b, |editor, cx| {
1416 editor.handle_input(&Input("ok, ".into()), cx)
1417 });
1418 buffer_a
1419 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1420 .await;
1421
1422 // TODO
1423 // // Remove the selection set as client B, see those selections disappear as client A.
1424 cx_b.update(move |_| drop(editor_b));
1425 // buffer_a
1426 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1427 // .await;
1428
1429 // Dropping the client B's project removes client B from client A's collaborators.
1430 cx_b.update(move |_| drop(project_b));
1431 project_a
1432 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1433 .await;
1434 }
1435
1436 #[gpui::test(iterations = 10)]
1437 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1438 let lang_registry = Arc::new(LanguageRegistry::test());
1439 let fs = FakeFs::new(cx_a.background());
1440 cx_a.foreground().forbid_parking();
1441
1442 // Connect to a server as 2 clients.
1443 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1444 let client_a = server.create_client(cx_a, "user_a").await;
1445 let client_b = server.create_client(cx_b, "user_b").await;
1446
1447 // Share a project as client A
1448 fs.insert_tree(
1449 "/a",
1450 json!({
1451 ".zed.toml": r#"collaborators = ["user_b"]"#,
1452 "a.txt": "a-contents",
1453 "b.txt": "b-contents",
1454 }),
1455 )
1456 .await;
1457 let project_a = cx_a.update(|cx| {
1458 Project::local(
1459 client_a.clone(),
1460 client_a.user_store.clone(),
1461 lang_registry.clone(),
1462 fs.clone(),
1463 cx,
1464 )
1465 });
1466 let (worktree_a, _) = project_a
1467 .update(cx_a, |p, cx| {
1468 p.find_or_create_local_worktree("/a", true, cx)
1469 })
1470 .await
1471 .unwrap();
1472 worktree_a
1473 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1474 .await;
1475 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1476 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1477 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1478 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1479
1480 // Join that project as client B
1481 let project_b = Project::remote(
1482 project_id,
1483 client_b.clone(),
1484 client_b.user_store.clone(),
1485 lang_registry.clone(),
1486 fs.clone(),
1487 &mut cx_b.to_async(),
1488 )
1489 .await
1490 .unwrap();
1491 project_b
1492 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1493 .await
1494 .unwrap();
1495
1496 // Unshare the project as client A
1497 project_a.update(cx_a, |project, cx| project.unshare(cx));
1498 project_b
1499 .condition(cx_b, |project, _| project.is_read_only())
1500 .await;
1501 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1502 cx_b.update(|_| {
1503 drop(project_b);
1504 });
1505
1506 // Share the project again and ensure guests can still join.
1507 project_a
1508 .update(cx_a, |project, cx| project.share(cx))
1509 .await
1510 .unwrap();
1511 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1512
1513 let project_b2 = Project::remote(
1514 project_id,
1515 client_b.clone(),
1516 client_b.user_store.clone(),
1517 lang_registry.clone(),
1518 fs.clone(),
1519 &mut cx_b.to_async(),
1520 )
1521 .await
1522 .unwrap();
1523 project_b2
1524 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1525 .await
1526 .unwrap();
1527 }
1528
1529 #[gpui::test(iterations = 10)]
1530 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1531 let lang_registry = Arc::new(LanguageRegistry::test());
1532 let fs = FakeFs::new(cx_a.background());
1533 cx_a.foreground().forbid_parking();
1534
1535 // Connect to a server as 2 clients.
1536 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1537 let client_a = server.create_client(cx_a, "user_a").await;
1538 let client_b = server.create_client(cx_b, "user_b").await;
1539
1540 // Share a project as client A
1541 fs.insert_tree(
1542 "/a",
1543 json!({
1544 ".zed.toml": r#"collaborators = ["user_b"]"#,
1545 "a.txt": "a-contents",
1546 "b.txt": "b-contents",
1547 }),
1548 )
1549 .await;
1550 let project_a = cx_a.update(|cx| {
1551 Project::local(
1552 client_a.clone(),
1553 client_a.user_store.clone(),
1554 lang_registry.clone(),
1555 fs.clone(),
1556 cx,
1557 )
1558 });
1559 let (worktree_a, _) = project_a
1560 .update(cx_a, |p, cx| {
1561 p.find_or_create_local_worktree("/a", true, cx)
1562 })
1563 .await
1564 .unwrap();
1565 worktree_a
1566 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1567 .await;
1568 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1569 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1570 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1571 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1572
1573 // Join that project as client B
1574 let project_b = Project::remote(
1575 project_id,
1576 client_b.clone(),
1577 client_b.user_store.clone(),
1578 lang_registry.clone(),
1579 fs.clone(),
1580 &mut cx_b.to_async(),
1581 )
1582 .await
1583 .unwrap();
1584 project_b
1585 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1586 .await
1587 .unwrap();
1588
1589 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1590 server.disconnect_client(client_a.current_user_id(cx_a));
1591 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1592 project_a
1593 .condition(cx_a, |project, _| project.collaborators().is_empty())
1594 .await;
1595 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1596 project_b
1597 .condition(cx_b, |project, _| project.is_read_only())
1598 .await;
1599 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1600 cx_b.update(|_| {
1601 drop(project_b);
1602 });
1603
1604 // Await reconnection
1605 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1606
1607 // Share the project again and ensure guests can still join.
1608 project_a
1609 .update(cx_a, |project, cx| project.share(cx))
1610 .await
1611 .unwrap();
1612 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1613
1614 let project_b2 = Project::remote(
1615 project_id,
1616 client_b.clone(),
1617 client_b.user_store.clone(),
1618 lang_registry.clone(),
1619 fs.clone(),
1620 &mut cx_b.to_async(),
1621 )
1622 .await
1623 .unwrap();
1624 project_b2
1625 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1626 .await
1627 .unwrap();
1628 }
1629
1630 #[gpui::test(iterations = 10)]
1631 async fn test_propagate_saves_and_fs_changes(
1632 cx_a: &mut TestAppContext,
1633 cx_b: &mut TestAppContext,
1634 cx_c: &mut TestAppContext,
1635 ) {
1636 let lang_registry = Arc::new(LanguageRegistry::test());
1637 let fs = FakeFs::new(cx_a.background());
1638 cx_a.foreground().forbid_parking();
1639
1640 // Connect to a server as 3 clients.
1641 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1642 let client_a = server.create_client(cx_a, "user_a").await;
1643 let client_b = server.create_client(cx_b, "user_b").await;
1644 let client_c = server.create_client(cx_c, "user_c").await;
1645
1646 // Share a worktree as client A.
1647 fs.insert_tree(
1648 "/a",
1649 json!({
1650 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1651 "file1": "",
1652 "file2": ""
1653 }),
1654 )
1655 .await;
1656 let project_a = cx_a.update(|cx| {
1657 Project::local(
1658 client_a.clone(),
1659 client_a.user_store.clone(),
1660 lang_registry.clone(),
1661 fs.clone(),
1662 cx,
1663 )
1664 });
1665 let (worktree_a, _) = project_a
1666 .update(cx_a, |p, cx| {
1667 p.find_or_create_local_worktree("/a", true, cx)
1668 })
1669 .await
1670 .unwrap();
1671 worktree_a
1672 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1673 .await;
1674 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1675 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1676 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1677
1678 // Join that worktree as clients B and C.
1679 let project_b = Project::remote(
1680 project_id,
1681 client_b.clone(),
1682 client_b.user_store.clone(),
1683 lang_registry.clone(),
1684 fs.clone(),
1685 &mut cx_b.to_async(),
1686 )
1687 .await
1688 .unwrap();
1689 let project_c = Project::remote(
1690 project_id,
1691 client_c.clone(),
1692 client_c.user_store.clone(),
1693 lang_registry.clone(),
1694 fs.clone(),
1695 &mut cx_c.to_async(),
1696 )
1697 .await
1698 .unwrap();
1699 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1700 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1701
1702 // Open and edit a buffer as both guests B and C.
1703 let buffer_b = project_b
1704 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1705 .await
1706 .unwrap();
1707 let buffer_c = project_c
1708 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1709 .await
1710 .unwrap();
1711 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1712 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1713
1714 // Open and edit that buffer as the host.
1715 let buffer_a = project_a
1716 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1717 .await
1718 .unwrap();
1719
1720 buffer_a
1721 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1722 .await;
1723 buffer_a.update(cx_a, |buf, cx| {
1724 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1725 });
1726
1727 // Wait for edits to propagate
1728 buffer_a
1729 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1730 .await;
1731 buffer_b
1732 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1733 .await;
1734 buffer_c
1735 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1736 .await;
1737
1738 // Edit the buffer as the host and concurrently save as guest B.
1739 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1740 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1741 save_b.await.unwrap();
1742 assert_eq!(
1743 fs.load("/a/file1".as_ref()).await.unwrap(),
1744 "hi-a, i-am-c, i-am-b, i-am-a"
1745 );
1746 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1747 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1748 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1749
1750 worktree_a.flush_fs_events(cx_a).await;
1751
1752 // Make changes on host's file system, see those changes on guest worktrees.
1753 fs.rename(
1754 "/a/file1".as_ref(),
1755 "/a/file1-renamed".as_ref(),
1756 Default::default(),
1757 )
1758 .await
1759 .unwrap();
1760
1761 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1762 .await
1763 .unwrap();
1764 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1765
1766 worktree_a
1767 .condition(&cx_a, |tree, _| {
1768 tree.paths()
1769 .map(|p| p.to_string_lossy())
1770 .collect::<Vec<_>>()
1771 == [".zed.toml", "file1-renamed", "file3", "file4"]
1772 })
1773 .await;
1774 worktree_b
1775 .condition(&cx_b, |tree, _| {
1776 tree.paths()
1777 .map(|p| p.to_string_lossy())
1778 .collect::<Vec<_>>()
1779 == [".zed.toml", "file1-renamed", "file3", "file4"]
1780 })
1781 .await;
1782 worktree_c
1783 .condition(&cx_c, |tree, _| {
1784 tree.paths()
1785 .map(|p| p.to_string_lossy())
1786 .collect::<Vec<_>>()
1787 == [".zed.toml", "file1-renamed", "file3", "file4"]
1788 })
1789 .await;
1790
1791 // Ensure buffer files are updated as well.
1792 buffer_a
1793 .condition(&cx_a, |buf, _| {
1794 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1795 })
1796 .await;
1797 buffer_b
1798 .condition(&cx_b, |buf, _| {
1799 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1800 })
1801 .await;
1802 buffer_c
1803 .condition(&cx_c, |buf, _| {
1804 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1805 })
1806 .await;
1807 }
1808
1809 #[gpui::test(iterations = 10)]
1810 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1811 cx_a.foreground().forbid_parking();
1812 let lang_registry = Arc::new(LanguageRegistry::test());
1813 let fs = FakeFs::new(cx_a.background());
1814
1815 // Connect to a server as 2 clients.
1816 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1817 let client_a = server.create_client(cx_a, "user_a").await;
1818 let client_b = server.create_client(cx_b, "user_b").await;
1819
1820 // Share a project as client A
1821 fs.insert_tree(
1822 "/dir",
1823 json!({
1824 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1825 "a.txt": "a-contents",
1826 }),
1827 )
1828 .await;
1829
1830 let project_a = cx_a.update(|cx| {
1831 Project::local(
1832 client_a.clone(),
1833 client_a.user_store.clone(),
1834 lang_registry.clone(),
1835 fs.clone(),
1836 cx,
1837 )
1838 });
1839 let (worktree_a, _) = project_a
1840 .update(cx_a, |p, cx| {
1841 p.find_or_create_local_worktree("/dir", true, cx)
1842 })
1843 .await
1844 .unwrap();
1845 worktree_a
1846 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847 .await;
1848 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1849 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1850 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1851
1852 // Join that project as client B
1853 let project_b = Project::remote(
1854 project_id,
1855 client_b.clone(),
1856 client_b.user_store.clone(),
1857 lang_registry.clone(),
1858 fs.clone(),
1859 &mut cx_b.to_async(),
1860 )
1861 .await
1862 .unwrap();
1863
1864 // Open a buffer as client B
1865 let buffer_b = project_b
1866 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1867 .await
1868 .unwrap();
1869
1870 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1871 buffer_b.read_with(cx_b, |buf, _| {
1872 assert!(buf.is_dirty());
1873 assert!(!buf.has_conflict());
1874 });
1875
1876 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1877 buffer_b
1878 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1879 .await;
1880 buffer_b.read_with(cx_b, |buf, _| {
1881 assert!(!buf.has_conflict());
1882 });
1883
1884 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1885 buffer_b.read_with(cx_b, |buf, _| {
1886 assert!(buf.is_dirty());
1887 assert!(!buf.has_conflict());
1888 });
1889 }
1890
1891 #[gpui::test(iterations = 10)]
1892 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1893 cx_a.foreground().forbid_parking();
1894 let lang_registry = Arc::new(LanguageRegistry::test());
1895 let fs = FakeFs::new(cx_a.background());
1896
1897 // Connect to a server as 2 clients.
1898 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1899 let client_a = server.create_client(cx_a, "user_a").await;
1900 let client_b = server.create_client(cx_b, "user_b").await;
1901
1902 // Share a project as client A
1903 fs.insert_tree(
1904 "/dir",
1905 json!({
1906 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1907 "a.txt": "a-contents",
1908 }),
1909 )
1910 .await;
1911
1912 let project_a = cx_a.update(|cx| {
1913 Project::local(
1914 client_a.clone(),
1915 client_a.user_store.clone(),
1916 lang_registry.clone(),
1917 fs.clone(),
1918 cx,
1919 )
1920 });
1921 let (worktree_a, _) = project_a
1922 .update(cx_a, |p, cx| {
1923 p.find_or_create_local_worktree("/dir", true, cx)
1924 })
1925 .await
1926 .unwrap();
1927 worktree_a
1928 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929 .await;
1930 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1931 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1932 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1933
1934 // Join that project as client B
1935 let project_b = Project::remote(
1936 project_id,
1937 client_b.clone(),
1938 client_b.user_store.clone(),
1939 lang_registry.clone(),
1940 fs.clone(),
1941 &mut cx_b.to_async(),
1942 )
1943 .await
1944 .unwrap();
1945 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1946
1947 // Open a buffer as client B
1948 let buffer_b = project_b
1949 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1950 .await
1951 .unwrap();
1952 buffer_b.read_with(cx_b, |buf, _| {
1953 assert!(!buf.is_dirty());
1954 assert!(!buf.has_conflict());
1955 });
1956
1957 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1958 .await
1959 .unwrap();
1960 buffer_b
1961 .condition(&cx_b, |buf, _| {
1962 buf.text() == "new contents" && !buf.is_dirty()
1963 })
1964 .await;
1965 buffer_b.read_with(cx_b, |buf, _| {
1966 assert!(!buf.has_conflict());
1967 });
1968 }
1969
1970 #[gpui::test(iterations = 10)]
1971 async fn test_editing_while_guest_opens_buffer(
1972 cx_a: &mut TestAppContext,
1973 cx_b: &mut TestAppContext,
1974 ) {
1975 cx_a.foreground().forbid_parking();
1976 let lang_registry = Arc::new(LanguageRegistry::test());
1977 let fs = FakeFs::new(cx_a.background());
1978
1979 // Connect to a server as 2 clients.
1980 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1981 let client_a = server.create_client(cx_a, "user_a").await;
1982 let client_b = server.create_client(cx_b, "user_b").await;
1983
1984 // Share a project as client A
1985 fs.insert_tree(
1986 "/dir",
1987 json!({
1988 ".zed.toml": r#"collaborators = ["user_b"]"#,
1989 "a.txt": "a-contents",
1990 }),
1991 )
1992 .await;
1993 let project_a = cx_a.update(|cx| {
1994 Project::local(
1995 client_a.clone(),
1996 client_a.user_store.clone(),
1997 lang_registry.clone(),
1998 fs.clone(),
1999 cx,
2000 )
2001 });
2002 let (worktree_a, _) = project_a
2003 .update(cx_a, |p, cx| {
2004 p.find_or_create_local_worktree("/dir", true, cx)
2005 })
2006 .await
2007 .unwrap();
2008 worktree_a
2009 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2010 .await;
2011 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2012 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2013 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2014
2015 // Join that project as client B
2016 let project_b = Project::remote(
2017 project_id,
2018 client_b.clone(),
2019 client_b.user_store.clone(),
2020 lang_registry.clone(),
2021 fs.clone(),
2022 &mut cx_b.to_async(),
2023 )
2024 .await
2025 .unwrap();
2026
2027 // Open a buffer as client A
2028 let buffer_a = project_a
2029 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2030 .await
2031 .unwrap();
2032
2033 // Start opening the same buffer as client B
2034 let buffer_b = cx_b
2035 .background()
2036 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2037
2038 // Edit the buffer as client A while client B is still opening it.
2039 cx_b.background().simulate_random_delay().await;
2040 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
2041 cx_b.background().simulate_random_delay().await;
2042 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2043
2044 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2045 let buffer_b = buffer_b.await.unwrap();
2046 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2047 }
2048
2049 #[gpui::test(iterations = 10)]
2050 async fn test_leaving_worktree_while_opening_buffer(
2051 cx_a: &mut TestAppContext,
2052 cx_b: &mut TestAppContext,
2053 ) {
2054 cx_a.foreground().forbid_parking();
2055 let lang_registry = Arc::new(LanguageRegistry::test());
2056 let fs = FakeFs::new(cx_a.background());
2057
2058 // Connect to a server as 2 clients.
2059 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2060 let client_a = server.create_client(cx_a, "user_a").await;
2061 let client_b = server.create_client(cx_b, "user_b").await;
2062
2063 // Share a project as client A
2064 fs.insert_tree(
2065 "/dir",
2066 json!({
2067 ".zed.toml": r#"collaborators = ["user_b"]"#,
2068 "a.txt": "a-contents",
2069 }),
2070 )
2071 .await;
2072 let project_a = cx_a.update(|cx| {
2073 Project::local(
2074 client_a.clone(),
2075 client_a.user_store.clone(),
2076 lang_registry.clone(),
2077 fs.clone(),
2078 cx,
2079 )
2080 });
2081 let (worktree_a, _) = project_a
2082 .update(cx_a, |p, cx| {
2083 p.find_or_create_local_worktree("/dir", true, cx)
2084 })
2085 .await
2086 .unwrap();
2087 worktree_a
2088 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2089 .await;
2090 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2091 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2092 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2093
2094 // Join that project as client B
2095 let project_b = Project::remote(
2096 project_id,
2097 client_b.clone(),
2098 client_b.user_store.clone(),
2099 lang_registry.clone(),
2100 fs.clone(),
2101 &mut cx_b.to_async(),
2102 )
2103 .await
2104 .unwrap();
2105
2106 // See that a guest has joined as client A.
2107 project_a
2108 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2109 .await;
2110
2111 // Begin opening a buffer as client B, but leave the project before the open completes.
2112 let buffer_b = cx_b
2113 .background()
2114 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2115 cx_b.update(|_| drop(project_b));
2116 drop(buffer_b);
2117
2118 // See that the guest has left.
2119 project_a
2120 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2121 .await;
2122 }
2123
2124 #[gpui::test(iterations = 10)]
2125 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2126 cx_a.foreground().forbid_parking();
2127 let lang_registry = Arc::new(LanguageRegistry::test());
2128 let fs = FakeFs::new(cx_a.background());
2129
2130 // Connect to a server as 2 clients.
2131 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132 let client_a = server.create_client(cx_a, "user_a").await;
2133 let client_b = server.create_client(cx_b, "user_b").await;
2134
2135 // Share a project as client A
2136 fs.insert_tree(
2137 "/a",
2138 json!({
2139 ".zed.toml": r#"collaborators = ["user_b"]"#,
2140 "a.txt": "a-contents",
2141 "b.txt": "b-contents",
2142 }),
2143 )
2144 .await;
2145 let project_a = cx_a.update(|cx| {
2146 Project::local(
2147 client_a.clone(),
2148 client_a.user_store.clone(),
2149 lang_registry.clone(),
2150 fs.clone(),
2151 cx,
2152 )
2153 });
2154 let (worktree_a, _) = project_a
2155 .update(cx_a, |p, cx| {
2156 p.find_or_create_local_worktree("/a", true, cx)
2157 })
2158 .await
2159 .unwrap();
2160 worktree_a
2161 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162 .await;
2163 let project_id = project_a
2164 .update(cx_a, |project, _| project.next_remote_id())
2165 .await;
2166 project_a
2167 .update(cx_a, |project, cx| project.share(cx))
2168 .await
2169 .unwrap();
2170
2171 // Join that project as client B
2172 let _project_b = Project::remote(
2173 project_id,
2174 client_b.clone(),
2175 client_b.user_store.clone(),
2176 lang_registry.clone(),
2177 fs.clone(),
2178 &mut cx_b.to_async(),
2179 )
2180 .await
2181 .unwrap();
2182
2183 // Client A sees that a guest has joined.
2184 project_a
2185 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2186 .await;
2187
2188 // Drop client B's connection and ensure client A observes client B leaving the project.
2189 client_b.disconnect(&cx_b.to_async()).unwrap();
2190 project_a
2191 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2192 .await;
2193
2194 // Rejoin the project as client B
2195 let _project_b = Project::remote(
2196 project_id,
2197 client_b.clone(),
2198 client_b.user_store.clone(),
2199 lang_registry.clone(),
2200 fs.clone(),
2201 &mut cx_b.to_async(),
2202 )
2203 .await
2204 .unwrap();
2205
2206 // Client A sees that a guest has re-joined.
2207 project_a
2208 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2209 .await;
2210
2211 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2212 client_b.wait_for_current_user(cx_b).await;
2213 server.disconnect_client(client_b.current_user_id(cx_b));
2214 cx_a.foreground().advance_clock(Duration::from_secs(3));
2215 project_a
2216 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2217 .await;
2218 }
2219
2220 #[gpui::test(iterations = 10)]
2221 async fn test_collaborating_with_diagnostics(
2222 cx_a: &mut TestAppContext,
2223 cx_b: &mut TestAppContext,
2224 ) {
2225 cx_a.foreground().forbid_parking();
2226 let lang_registry = Arc::new(LanguageRegistry::test());
2227 let fs = FakeFs::new(cx_a.background());
2228
2229 // Set up a fake language server.
2230 let mut language = Language::new(
2231 LanguageConfig {
2232 name: "Rust".into(),
2233 path_suffixes: vec!["rs".to_string()],
2234 ..Default::default()
2235 },
2236 Some(tree_sitter_rust::language()),
2237 );
2238 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2239 lang_registry.add(Arc::new(language));
2240
2241 // Connect to a server as 2 clients.
2242 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2243 let client_a = server.create_client(cx_a, "user_a").await;
2244 let client_b = server.create_client(cx_b, "user_b").await;
2245
2246 // Share a project as client A
2247 fs.insert_tree(
2248 "/a",
2249 json!({
2250 ".zed.toml": r#"collaborators = ["user_b"]"#,
2251 "a.rs": "let one = two",
2252 "other.rs": "",
2253 }),
2254 )
2255 .await;
2256 let project_a = cx_a.update(|cx| {
2257 Project::local(
2258 client_a.clone(),
2259 client_a.user_store.clone(),
2260 lang_registry.clone(),
2261 fs.clone(),
2262 cx,
2263 )
2264 });
2265 let (worktree_a, _) = project_a
2266 .update(cx_a, |p, cx| {
2267 p.find_or_create_local_worktree("/a", true, cx)
2268 })
2269 .await
2270 .unwrap();
2271 worktree_a
2272 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2273 .await;
2274 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2275 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2276 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2277
2278 // Cause the language server to start.
2279 let _ = cx_a
2280 .background()
2281 .spawn(project_a.update(cx_a, |project, cx| {
2282 project.open_buffer(
2283 ProjectPath {
2284 worktree_id,
2285 path: Path::new("other.rs").into(),
2286 },
2287 cx,
2288 )
2289 }))
2290 .await
2291 .unwrap();
2292
2293 // Simulate a language server reporting errors for a file.
2294 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2295 fake_language_server
2296 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2297 .await;
2298 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2299 lsp::PublishDiagnosticsParams {
2300 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2301 version: None,
2302 diagnostics: vec![lsp::Diagnostic {
2303 severity: Some(lsp::DiagnosticSeverity::ERROR),
2304 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2305 message: "message 1".to_string(),
2306 ..Default::default()
2307 }],
2308 },
2309 );
2310
2311 // Wait for server to see the diagnostics update.
2312 server
2313 .condition(|store| {
2314 let worktree = store
2315 .project(project_id)
2316 .unwrap()
2317 .share
2318 .as_ref()
2319 .unwrap()
2320 .worktrees
2321 .get(&worktree_id.to_proto())
2322 .unwrap();
2323
2324 !worktree.diagnostic_summaries.is_empty()
2325 })
2326 .await;
2327
2328 // Join the worktree as client B.
2329 let project_b = Project::remote(
2330 project_id,
2331 client_b.clone(),
2332 client_b.user_store.clone(),
2333 lang_registry.clone(),
2334 fs.clone(),
2335 &mut cx_b.to_async(),
2336 )
2337 .await
2338 .unwrap();
2339
2340 project_b.read_with(cx_b, |project, cx| {
2341 assert_eq!(
2342 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2343 &[(
2344 ProjectPath {
2345 worktree_id,
2346 path: Arc::from(Path::new("a.rs")),
2347 },
2348 DiagnosticSummary {
2349 error_count: 1,
2350 warning_count: 0,
2351 ..Default::default()
2352 },
2353 )]
2354 )
2355 });
2356
2357 // Simulate a language server reporting more errors for a file.
2358 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2359 lsp::PublishDiagnosticsParams {
2360 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2361 version: None,
2362 diagnostics: vec![
2363 lsp::Diagnostic {
2364 severity: Some(lsp::DiagnosticSeverity::ERROR),
2365 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2366 message: "message 1".to_string(),
2367 ..Default::default()
2368 },
2369 lsp::Diagnostic {
2370 severity: Some(lsp::DiagnosticSeverity::WARNING),
2371 range: lsp::Range::new(
2372 lsp::Position::new(0, 10),
2373 lsp::Position::new(0, 13),
2374 ),
2375 message: "message 2".to_string(),
2376 ..Default::default()
2377 },
2378 ],
2379 },
2380 );
2381
2382 // Client b gets the updated summaries
2383 project_b
2384 .condition(&cx_b, |project, cx| {
2385 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2386 == &[(
2387 ProjectPath {
2388 worktree_id,
2389 path: Arc::from(Path::new("a.rs")),
2390 },
2391 DiagnosticSummary {
2392 error_count: 1,
2393 warning_count: 1,
2394 ..Default::default()
2395 },
2396 )]
2397 })
2398 .await;
2399
2400 // Open the file with the errors on client B. They should be present.
2401 let buffer_b = cx_b
2402 .background()
2403 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2404 .await
2405 .unwrap();
2406
2407 buffer_b.read_with(cx_b, |buffer, _| {
2408 assert_eq!(
2409 buffer
2410 .snapshot()
2411 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2412 .map(|entry| entry)
2413 .collect::<Vec<_>>(),
2414 &[
2415 DiagnosticEntry {
2416 range: Point::new(0, 4)..Point::new(0, 7),
2417 diagnostic: Diagnostic {
2418 group_id: 0,
2419 message: "message 1".to_string(),
2420 severity: lsp::DiagnosticSeverity::ERROR,
2421 is_primary: true,
2422 ..Default::default()
2423 }
2424 },
2425 DiagnosticEntry {
2426 range: Point::new(0, 10)..Point::new(0, 13),
2427 diagnostic: Diagnostic {
2428 group_id: 1,
2429 severity: lsp::DiagnosticSeverity::WARNING,
2430 message: "message 2".to_string(),
2431 is_primary: true,
2432 ..Default::default()
2433 }
2434 }
2435 ]
2436 );
2437 });
2438
2439 // Simulate a language server reporting no errors for a file.
2440 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2441 lsp::PublishDiagnosticsParams {
2442 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2443 version: None,
2444 diagnostics: vec![],
2445 },
2446 );
2447 project_a
2448 .condition(cx_a, |project, cx| {
2449 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2450 })
2451 .await;
2452 project_b
2453 .condition(cx_b, |project, cx| {
2454 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2455 })
2456 .await;
2457 }
2458
2459 #[gpui::test(iterations = 10)]
2460 async fn test_collaborating_with_completion(
2461 cx_a: &mut TestAppContext,
2462 cx_b: &mut TestAppContext,
2463 ) {
2464 cx_a.foreground().forbid_parking();
2465 let lang_registry = Arc::new(LanguageRegistry::test());
2466 let fs = FakeFs::new(cx_a.background());
2467
2468 // Set up a fake language server.
2469 let mut language = Language::new(
2470 LanguageConfig {
2471 name: "Rust".into(),
2472 path_suffixes: vec!["rs".to_string()],
2473 ..Default::default()
2474 },
2475 Some(tree_sitter_rust::language()),
2476 );
2477 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2478 capabilities: lsp::ServerCapabilities {
2479 completion_provider: Some(lsp::CompletionOptions {
2480 trigger_characters: Some(vec![".".to_string()]),
2481 ..Default::default()
2482 }),
2483 ..Default::default()
2484 },
2485 ..Default::default()
2486 });
2487 lang_registry.add(Arc::new(language));
2488
2489 // Connect to a server as 2 clients.
2490 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2491 let client_a = server.create_client(cx_a, "user_a").await;
2492 let client_b = server.create_client(cx_b, "user_b").await;
2493
2494 // Share a project as client A
2495 fs.insert_tree(
2496 "/a",
2497 json!({
2498 ".zed.toml": r#"collaborators = ["user_b"]"#,
2499 "main.rs": "fn main() { a }",
2500 "other.rs": "",
2501 }),
2502 )
2503 .await;
2504 let project_a = cx_a.update(|cx| {
2505 Project::local(
2506 client_a.clone(),
2507 client_a.user_store.clone(),
2508 lang_registry.clone(),
2509 fs.clone(),
2510 cx,
2511 )
2512 });
2513 let (worktree_a, _) = project_a
2514 .update(cx_a, |p, cx| {
2515 p.find_or_create_local_worktree("/a", true, cx)
2516 })
2517 .await
2518 .unwrap();
2519 worktree_a
2520 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2521 .await;
2522 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2523 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2524 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2525
2526 // Join the worktree as client B.
2527 let project_b = Project::remote(
2528 project_id,
2529 client_b.clone(),
2530 client_b.user_store.clone(),
2531 lang_registry.clone(),
2532 fs.clone(),
2533 &mut cx_b.to_async(),
2534 )
2535 .await
2536 .unwrap();
2537
2538 // Open a file in an editor as the guest.
2539 let buffer_b = project_b
2540 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2541 .await
2542 .unwrap();
2543 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2544 let editor_b = cx_b.add_view(window_b, |cx| {
2545 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2546 });
2547
2548 let fake_language_server = fake_language_servers.next().await.unwrap();
2549 buffer_b
2550 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2551 .await;
2552
2553 // Type a completion trigger character as the guest.
2554 editor_b.update(cx_b, |editor, cx| {
2555 editor.select_ranges([13..13], None, cx);
2556 editor.handle_input(&Input(".".into()), cx);
2557 cx.focus(&editor_b);
2558 });
2559
2560 // Receive a completion request as the host's language server.
2561 // Return some completions from the host's language server.
2562 cx_a.foreground().start_waiting();
2563 fake_language_server
2564 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2565 assert_eq!(
2566 params.text_document_position.text_document.uri,
2567 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2568 );
2569 assert_eq!(
2570 params.text_document_position.position,
2571 lsp::Position::new(0, 14),
2572 );
2573
2574 Ok(Some(lsp::CompletionResponse::Array(vec![
2575 lsp::CompletionItem {
2576 label: "first_method(…)".into(),
2577 detail: Some("fn(&mut self, B) -> C".into()),
2578 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2579 new_text: "first_method($1)".to_string(),
2580 range: lsp::Range::new(
2581 lsp::Position::new(0, 14),
2582 lsp::Position::new(0, 14),
2583 ),
2584 })),
2585 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2586 ..Default::default()
2587 },
2588 lsp::CompletionItem {
2589 label: "second_method(…)".into(),
2590 detail: Some("fn(&mut self, C) -> D<E>".into()),
2591 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2592 new_text: "second_method()".to_string(),
2593 range: lsp::Range::new(
2594 lsp::Position::new(0, 14),
2595 lsp::Position::new(0, 14),
2596 ),
2597 })),
2598 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2599 ..Default::default()
2600 },
2601 ])))
2602 })
2603 .next()
2604 .await
2605 .unwrap();
2606 cx_a.foreground().finish_waiting();
2607
2608 // Open the buffer on the host.
2609 let buffer_a = project_a
2610 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2611 .await
2612 .unwrap();
2613 buffer_a
2614 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2615 .await;
2616
2617 // Confirm a completion on the guest.
2618 editor_b
2619 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2620 .await;
2621 editor_b.update(cx_b, |editor, cx| {
2622 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2623 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2624 });
2625
2626 // Return a resolved completion from the host's language server.
2627 // The resolved completion has an additional text edit.
2628 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2629 |params, _| async move {
2630 assert_eq!(params.label, "first_method(…)");
2631 Ok(lsp::CompletionItem {
2632 label: "first_method(…)".into(),
2633 detail: Some("fn(&mut self, B) -> C".into()),
2634 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2635 new_text: "first_method($1)".to_string(),
2636 range: lsp::Range::new(
2637 lsp::Position::new(0, 14),
2638 lsp::Position::new(0, 14),
2639 ),
2640 })),
2641 additional_text_edits: Some(vec![lsp::TextEdit {
2642 new_text: "use d::SomeTrait;\n".to_string(),
2643 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2644 }]),
2645 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2646 ..Default::default()
2647 })
2648 },
2649 );
2650
2651 // The additional edit is applied.
2652 buffer_a
2653 .condition(&cx_a, |buffer, _| {
2654 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2655 })
2656 .await;
2657 buffer_b
2658 .condition(&cx_b, |buffer, _| {
2659 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2660 })
2661 .await;
2662 }
2663
2664 #[gpui::test(iterations = 10)]
2665 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2666 cx_a.foreground().forbid_parking();
2667 let lang_registry = Arc::new(LanguageRegistry::test());
2668 let fs = FakeFs::new(cx_a.background());
2669
2670 // Connect to a server as 2 clients.
2671 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2672 let client_a = server.create_client(cx_a, "user_a").await;
2673 let client_b = server.create_client(cx_b, "user_b").await;
2674
2675 // Share a project as client A
2676 fs.insert_tree(
2677 "/a",
2678 json!({
2679 ".zed.toml": r#"collaborators = ["user_b"]"#,
2680 "a.rs": "let one = 1;",
2681 }),
2682 )
2683 .await;
2684 let project_a = cx_a.update(|cx| {
2685 Project::local(
2686 client_a.clone(),
2687 client_a.user_store.clone(),
2688 lang_registry.clone(),
2689 fs.clone(),
2690 cx,
2691 )
2692 });
2693 let (worktree_a, _) = project_a
2694 .update(cx_a, |p, cx| {
2695 p.find_or_create_local_worktree("/a", true, cx)
2696 })
2697 .await
2698 .unwrap();
2699 worktree_a
2700 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2701 .await;
2702 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2703 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2704 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2705 let buffer_a = project_a
2706 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2707 .await
2708 .unwrap();
2709
2710 // Join the worktree as client B.
2711 let project_b = Project::remote(
2712 project_id,
2713 client_b.clone(),
2714 client_b.user_store.clone(),
2715 lang_registry.clone(),
2716 fs.clone(),
2717 &mut cx_b.to_async(),
2718 )
2719 .await
2720 .unwrap();
2721
2722 let buffer_b = cx_b
2723 .background()
2724 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2725 .await
2726 .unwrap();
2727 buffer_b.update(cx_b, |buffer, cx| {
2728 buffer.edit([4..7], "six", cx);
2729 buffer.edit([10..11], "6", cx);
2730 assert_eq!(buffer.text(), "let six = 6;");
2731 assert!(buffer.is_dirty());
2732 assert!(!buffer.has_conflict());
2733 });
2734 buffer_a
2735 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2736 .await;
2737
2738 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2739 .await
2740 .unwrap();
2741 buffer_a
2742 .condition(cx_a, |buffer, _| buffer.has_conflict())
2743 .await;
2744 buffer_b
2745 .condition(cx_b, |buffer, _| buffer.has_conflict())
2746 .await;
2747
2748 project_b
2749 .update(cx_b, |project, cx| {
2750 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2751 })
2752 .await
2753 .unwrap();
2754 buffer_a.read_with(cx_a, |buffer, _| {
2755 assert_eq!(buffer.text(), "let seven = 7;");
2756 assert!(!buffer.is_dirty());
2757 assert!(!buffer.has_conflict());
2758 });
2759 buffer_b.read_with(cx_b, |buffer, _| {
2760 assert_eq!(buffer.text(), "let seven = 7;");
2761 assert!(!buffer.is_dirty());
2762 assert!(!buffer.has_conflict());
2763 });
2764
2765 buffer_a.update(cx_a, |buffer, cx| {
2766 // Undoing on the host is a no-op when the reload was initiated by the guest.
2767 buffer.undo(cx);
2768 assert_eq!(buffer.text(), "let seven = 7;");
2769 assert!(!buffer.is_dirty());
2770 assert!(!buffer.has_conflict());
2771 });
2772 buffer_b.update(cx_b, |buffer, cx| {
2773 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2774 buffer.undo(cx);
2775 assert_eq!(buffer.text(), "let six = 6;");
2776 assert!(buffer.is_dirty());
2777 assert!(!buffer.has_conflict());
2778 });
2779 }
2780
2781 #[gpui::test(iterations = 10)]
2782 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2783 cx_a.foreground().forbid_parking();
2784 let lang_registry = Arc::new(LanguageRegistry::test());
2785 let fs = FakeFs::new(cx_a.background());
2786
2787 // Set up a fake language server.
2788 let mut language = Language::new(
2789 LanguageConfig {
2790 name: "Rust".into(),
2791 path_suffixes: vec!["rs".to_string()],
2792 ..Default::default()
2793 },
2794 Some(tree_sitter_rust::language()),
2795 );
2796 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2797 lang_registry.add(Arc::new(language));
2798
2799 // Connect to a server as 2 clients.
2800 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2801 let client_a = server.create_client(cx_a, "user_a").await;
2802 let client_b = server.create_client(cx_b, "user_b").await;
2803
2804 // Share a project as client A
2805 fs.insert_tree(
2806 "/a",
2807 json!({
2808 ".zed.toml": r#"collaborators = ["user_b"]"#,
2809 "a.rs": "let one = two",
2810 }),
2811 )
2812 .await;
2813 let project_a = cx_a.update(|cx| {
2814 Project::local(
2815 client_a.clone(),
2816 client_a.user_store.clone(),
2817 lang_registry.clone(),
2818 fs.clone(),
2819 cx,
2820 )
2821 });
2822 let (worktree_a, _) = project_a
2823 .update(cx_a, |p, cx| {
2824 p.find_or_create_local_worktree("/a", true, cx)
2825 })
2826 .await
2827 .unwrap();
2828 worktree_a
2829 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2830 .await;
2831 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2832 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2833 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2834
2835 // Join the worktree as client B.
2836 let project_b = Project::remote(
2837 project_id,
2838 client_b.clone(),
2839 client_b.user_store.clone(),
2840 lang_registry.clone(),
2841 fs.clone(),
2842 &mut cx_b.to_async(),
2843 )
2844 .await
2845 .unwrap();
2846
2847 let buffer_b = cx_b
2848 .background()
2849 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2850 .await
2851 .unwrap();
2852
2853 let fake_language_server = fake_language_servers.next().await.unwrap();
2854 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2855 Ok(Some(vec![
2856 lsp::TextEdit {
2857 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2858 new_text: "h".to_string(),
2859 },
2860 lsp::TextEdit {
2861 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2862 new_text: "y".to_string(),
2863 },
2864 ]))
2865 });
2866
2867 project_b
2868 .update(cx_b, |project, cx| {
2869 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2870 })
2871 .await
2872 .unwrap();
2873 assert_eq!(
2874 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2875 "let honey = two"
2876 );
2877 }
2878
2879 #[gpui::test(iterations = 10)]
2880 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2881 cx_a.foreground().forbid_parking();
2882 let lang_registry = Arc::new(LanguageRegistry::test());
2883 let fs = FakeFs::new(cx_a.background());
2884 fs.insert_tree(
2885 "/root-1",
2886 json!({
2887 ".zed.toml": r#"collaborators = ["user_b"]"#,
2888 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2889 }),
2890 )
2891 .await;
2892 fs.insert_tree(
2893 "/root-2",
2894 json!({
2895 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2896 }),
2897 )
2898 .await;
2899
2900 // Set up a fake language server.
2901 let mut language = Language::new(
2902 LanguageConfig {
2903 name: "Rust".into(),
2904 path_suffixes: vec!["rs".to_string()],
2905 ..Default::default()
2906 },
2907 Some(tree_sitter_rust::language()),
2908 );
2909 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2910 lang_registry.add(Arc::new(language));
2911
2912 // Connect to a server as 2 clients.
2913 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2914 let client_a = server.create_client(cx_a, "user_a").await;
2915 let client_b = server.create_client(cx_b, "user_b").await;
2916
2917 // Share a project as client A
2918 let project_a = cx_a.update(|cx| {
2919 Project::local(
2920 client_a.clone(),
2921 client_a.user_store.clone(),
2922 lang_registry.clone(),
2923 fs.clone(),
2924 cx,
2925 )
2926 });
2927 let (worktree_a, _) = project_a
2928 .update(cx_a, |p, cx| {
2929 p.find_or_create_local_worktree("/root-1", true, cx)
2930 })
2931 .await
2932 .unwrap();
2933 worktree_a
2934 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2935 .await;
2936 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2937 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2938 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2939
2940 // Join the worktree as client B.
2941 let project_b = Project::remote(
2942 project_id,
2943 client_b.clone(),
2944 client_b.user_store.clone(),
2945 lang_registry.clone(),
2946 fs.clone(),
2947 &mut cx_b.to_async(),
2948 )
2949 .await
2950 .unwrap();
2951
2952 // Open the file on client B.
2953 let buffer_b = cx_b
2954 .background()
2955 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2956 .await
2957 .unwrap();
2958
2959 // Request the definition of a symbol as the guest.
2960 let fake_language_server = fake_language_servers.next().await.unwrap();
2961 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2962 |_, _| async move {
2963 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2964 lsp::Location::new(
2965 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2966 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2967 ),
2968 )))
2969 },
2970 );
2971
2972 let definitions_1 = project_b
2973 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2974 .await
2975 .unwrap();
2976 cx_b.read(|cx| {
2977 assert_eq!(definitions_1.len(), 1);
2978 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2979 let target_buffer = definitions_1[0].buffer.read(cx);
2980 assert_eq!(
2981 target_buffer.text(),
2982 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2983 );
2984 assert_eq!(
2985 definitions_1[0].range.to_point(target_buffer),
2986 Point::new(0, 6)..Point::new(0, 9)
2987 );
2988 });
2989
2990 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2991 // the previous call to `definition`.
2992 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2993 |_, _| async move {
2994 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2995 lsp::Location::new(
2996 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2997 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2998 ),
2999 )))
3000 },
3001 );
3002
3003 let definitions_2 = project_b
3004 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3005 .await
3006 .unwrap();
3007 cx_b.read(|cx| {
3008 assert_eq!(definitions_2.len(), 1);
3009 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3010 let target_buffer = definitions_2[0].buffer.read(cx);
3011 assert_eq!(
3012 target_buffer.text(),
3013 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3014 );
3015 assert_eq!(
3016 definitions_2[0].range.to_point(target_buffer),
3017 Point::new(1, 6)..Point::new(1, 11)
3018 );
3019 });
3020 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3021 }
3022
3023 #[gpui::test(iterations = 10)]
3024 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3025 cx_a.foreground().forbid_parking();
3026 let lang_registry = Arc::new(LanguageRegistry::test());
3027 let fs = FakeFs::new(cx_a.background());
3028 fs.insert_tree(
3029 "/root-1",
3030 json!({
3031 ".zed.toml": r#"collaborators = ["user_b"]"#,
3032 "one.rs": "const ONE: usize = 1;",
3033 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3034 }),
3035 )
3036 .await;
3037 fs.insert_tree(
3038 "/root-2",
3039 json!({
3040 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3041 }),
3042 )
3043 .await;
3044
3045 // Set up a fake language server.
3046 let mut language = Language::new(
3047 LanguageConfig {
3048 name: "Rust".into(),
3049 path_suffixes: vec!["rs".to_string()],
3050 ..Default::default()
3051 },
3052 Some(tree_sitter_rust::language()),
3053 );
3054 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3055 lang_registry.add(Arc::new(language));
3056
3057 // Connect to a server as 2 clients.
3058 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3059 let client_a = server.create_client(cx_a, "user_a").await;
3060 let client_b = server.create_client(cx_b, "user_b").await;
3061
3062 // Share a project as client A
3063 let project_a = cx_a.update(|cx| {
3064 Project::local(
3065 client_a.clone(),
3066 client_a.user_store.clone(),
3067 lang_registry.clone(),
3068 fs.clone(),
3069 cx,
3070 )
3071 });
3072 let (worktree_a, _) = project_a
3073 .update(cx_a, |p, cx| {
3074 p.find_or_create_local_worktree("/root-1", true, cx)
3075 })
3076 .await
3077 .unwrap();
3078 worktree_a
3079 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3080 .await;
3081 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3082 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3083 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3084
3085 // Join the worktree as client B.
3086 let project_b = Project::remote(
3087 project_id,
3088 client_b.clone(),
3089 client_b.user_store.clone(),
3090 lang_registry.clone(),
3091 fs.clone(),
3092 &mut cx_b.to_async(),
3093 )
3094 .await
3095 .unwrap();
3096
3097 // Open the file on client B.
3098 let buffer_b = cx_b
3099 .background()
3100 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3101 .await
3102 .unwrap();
3103
3104 // Request references to a symbol as the guest.
3105 let fake_language_server = fake_language_servers.next().await.unwrap();
3106 fake_language_server.handle_request::<lsp::request::References, _, _>(
3107 |params, _| async move {
3108 assert_eq!(
3109 params.text_document_position.text_document.uri.as_str(),
3110 "file:///root-1/one.rs"
3111 );
3112 Ok(Some(vec![
3113 lsp::Location {
3114 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3115 range: lsp::Range::new(
3116 lsp::Position::new(0, 24),
3117 lsp::Position::new(0, 27),
3118 ),
3119 },
3120 lsp::Location {
3121 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3122 range: lsp::Range::new(
3123 lsp::Position::new(0, 35),
3124 lsp::Position::new(0, 38),
3125 ),
3126 },
3127 lsp::Location {
3128 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3129 range: lsp::Range::new(
3130 lsp::Position::new(0, 37),
3131 lsp::Position::new(0, 40),
3132 ),
3133 },
3134 ]))
3135 },
3136 );
3137
3138 let references = project_b
3139 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3140 .await
3141 .unwrap();
3142 cx_b.read(|cx| {
3143 assert_eq!(references.len(), 3);
3144 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3145
3146 let two_buffer = references[0].buffer.read(cx);
3147 let three_buffer = references[2].buffer.read(cx);
3148 assert_eq!(
3149 two_buffer.file().unwrap().path().as_ref(),
3150 Path::new("two.rs")
3151 );
3152 assert_eq!(references[1].buffer, references[0].buffer);
3153 assert_eq!(
3154 three_buffer.file().unwrap().full_path(cx),
3155 Path::new("three.rs")
3156 );
3157
3158 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3159 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3160 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3161 });
3162 }
3163
3164 #[gpui::test(iterations = 10)]
3165 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3166 cx_a.foreground().forbid_parking();
3167 let lang_registry = Arc::new(LanguageRegistry::test());
3168 let fs = FakeFs::new(cx_a.background());
3169 fs.insert_tree(
3170 "/root-1",
3171 json!({
3172 ".zed.toml": r#"collaborators = ["user_b"]"#,
3173 "a": "hello world",
3174 "b": "goodnight moon",
3175 "c": "a world of goo",
3176 "d": "world champion of clown world",
3177 }),
3178 )
3179 .await;
3180 fs.insert_tree(
3181 "/root-2",
3182 json!({
3183 "e": "disney world is fun",
3184 }),
3185 )
3186 .await;
3187
3188 // Connect to a server as 2 clients.
3189 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3190 let client_a = server.create_client(cx_a, "user_a").await;
3191 let client_b = server.create_client(cx_b, "user_b").await;
3192
3193 // Share a project as client A
3194 let project_a = cx_a.update(|cx| {
3195 Project::local(
3196 client_a.clone(),
3197 client_a.user_store.clone(),
3198 lang_registry.clone(),
3199 fs.clone(),
3200 cx,
3201 )
3202 });
3203 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3204
3205 let (worktree_1, _) = project_a
3206 .update(cx_a, |p, cx| {
3207 p.find_or_create_local_worktree("/root-1", true, cx)
3208 })
3209 .await
3210 .unwrap();
3211 worktree_1
3212 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3213 .await;
3214 let (worktree_2, _) = project_a
3215 .update(cx_a, |p, cx| {
3216 p.find_or_create_local_worktree("/root-2", true, cx)
3217 })
3218 .await
3219 .unwrap();
3220 worktree_2
3221 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3222 .await;
3223
3224 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3225
3226 // Join the worktree as client B.
3227 let project_b = Project::remote(
3228 project_id,
3229 client_b.clone(),
3230 client_b.user_store.clone(),
3231 lang_registry.clone(),
3232 fs.clone(),
3233 &mut cx_b.to_async(),
3234 )
3235 .await
3236 .unwrap();
3237
3238 let results = project_b
3239 .update(cx_b, |project, cx| {
3240 project.search(SearchQuery::text("world", false, false), cx)
3241 })
3242 .await
3243 .unwrap();
3244
3245 let mut ranges_by_path = results
3246 .into_iter()
3247 .map(|(buffer, ranges)| {
3248 buffer.read_with(cx_b, |buffer, cx| {
3249 let path = buffer.file().unwrap().full_path(cx);
3250 let offset_ranges = ranges
3251 .into_iter()
3252 .map(|range| range.to_offset(buffer))
3253 .collect::<Vec<_>>();
3254 (path, offset_ranges)
3255 })
3256 })
3257 .collect::<Vec<_>>();
3258 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3259
3260 assert_eq!(
3261 ranges_by_path,
3262 &[
3263 (PathBuf::from("root-1/a"), vec![6..11]),
3264 (PathBuf::from("root-1/c"), vec![2..7]),
3265 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3266 (PathBuf::from("root-2/e"), vec![7..12]),
3267 ]
3268 );
3269 }
3270
3271 #[gpui::test(iterations = 10)]
3272 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3273 cx_a.foreground().forbid_parking();
3274 let lang_registry = Arc::new(LanguageRegistry::test());
3275 let fs = FakeFs::new(cx_a.background());
3276 fs.insert_tree(
3277 "/root-1",
3278 json!({
3279 ".zed.toml": r#"collaborators = ["user_b"]"#,
3280 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3281 }),
3282 )
3283 .await;
3284
3285 // Set up a fake language server.
3286 let mut language = Language::new(
3287 LanguageConfig {
3288 name: "Rust".into(),
3289 path_suffixes: vec!["rs".to_string()],
3290 ..Default::default()
3291 },
3292 Some(tree_sitter_rust::language()),
3293 );
3294 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3295 lang_registry.add(Arc::new(language));
3296
3297 // Connect to a server as 2 clients.
3298 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3299 let client_a = server.create_client(cx_a, "user_a").await;
3300 let client_b = server.create_client(cx_b, "user_b").await;
3301
3302 // Share a project as client A
3303 let project_a = cx_a.update(|cx| {
3304 Project::local(
3305 client_a.clone(),
3306 client_a.user_store.clone(),
3307 lang_registry.clone(),
3308 fs.clone(),
3309 cx,
3310 )
3311 });
3312 let (worktree_a, _) = project_a
3313 .update(cx_a, |p, cx| {
3314 p.find_or_create_local_worktree("/root-1", true, cx)
3315 })
3316 .await
3317 .unwrap();
3318 worktree_a
3319 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3320 .await;
3321 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3322 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3323 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3324
3325 // Join the worktree as client B.
3326 let project_b = Project::remote(
3327 project_id,
3328 client_b.clone(),
3329 client_b.user_store.clone(),
3330 lang_registry.clone(),
3331 fs.clone(),
3332 &mut cx_b.to_async(),
3333 )
3334 .await
3335 .unwrap();
3336
3337 // Open the file on client B.
3338 let buffer_b = cx_b
3339 .background()
3340 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3341 .await
3342 .unwrap();
3343
3344 // Request document highlights as the guest.
3345 let fake_language_server = fake_language_servers.next().await.unwrap();
3346 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3347 |params, _| async move {
3348 assert_eq!(
3349 params
3350 .text_document_position_params
3351 .text_document
3352 .uri
3353 .as_str(),
3354 "file:///root-1/main.rs"
3355 );
3356 assert_eq!(
3357 params.text_document_position_params.position,
3358 lsp::Position::new(0, 34)
3359 );
3360 Ok(Some(vec![
3361 lsp::DocumentHighlight {
3362 kind: Some(lsp::DocumentHighlightKind::WRITE),
3363 range: lsp::Range::new(
3364 lsp::Position::new(0, 10),
3365 lsp::Position::new(0, 16),
3366 ),
3367 },
3368 lsp::DocumentHighlight {
3369 kind: Some(lsp::DocumentHighlightKind::READ),
3370 range: lsp::Range::new(
3371 lsp::Position::new(0, 32),
3372 lsp::Position::new(0, 38),
3373 ),
3374 },
3375 lsp::DocumentHighlight {
3376 kind: Some(lsp::DocumentHighlightKind::READ),
3377 range: lsp::Range::new(
3378 lsp::Position::new(0, 41),
3379 lsp::Position::new(0, 47),
3380 ),
3381 },
3382 ]))
3383 },
3384 );
3385
3386 let highlights = project_b
3387 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3388 .await
3389 .unwrap();
3390 buffer_b.read_with(cx_b, |buffer, _| {
3391 let snapshot = buffer.snapshot();
3392
3393 let highlights = highlights
3394 .into_iter()
3395 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3396 .collect::<Vec<_>>();
3397 assert_eq!(
3398 highlights,
3399 &[
3400 (lsp::DocumentHighlightKind::WRITE, 10..16),
3401 (lsp::DocumentHighlightKind::READ, 32..38),
3402 (lsp::DocumentHighlightKind::READ, 41..47)
3403 ]
3404 )
3405 });
3406 }
3407
3408 #[gpui::test(iterations = 10)]
3409 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3410 cx_a.foreground().forbid_parking();
3411 let lang_registry = Arc::new(LanguageRegistry::test());
3412 let fs = FakeFs::new(cx_a.background());
3413 fs.insert_tree(
3414 "/code",
3415 json!({
3416 "crate-1": {
3417 ".zed.toml": r#"collaborators = ["user_b"]"#,
3418 "one.rs": "const ONE: usize = 1;",
3419 },
3420 "crate-2": {
3421 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3422 },
3423 "private": {
3424 "passwords.txt": "the-password",
3425 }
3426 }),
3427 )
3428 .await;
3429
3430 // Set up a fake language server.
3431 let mut language = Language::new(
3432 LanguageConfig {
3433 name: "Rust".into(),
3434 path_suffixes: vec!["rs".to_string()],
3435 ..Default::default()
3436 },
3437 Some(tree_sitter_rust::language()),
3438 );
3439 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3440 lang_registry.add(Arc::new(language));
3441
3442 // Connect to a server as 2 clients.
3443 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3444 let client_a = server.create_client(cx_a, "user_a").await;
3445 let client_b = server.create_client(cx_b, "user_b").await;
3446
3447 // Share a project as client A
3448 let project_a = cx_a.update(|cx| {
3449 Project::local(
3450 client_a.clone(),
3451 client_a.user_store.clone(),
3452 lang_registry.clone(),
3453 fs.clone(),
3454 cx,
3455 )
3456 });
3457 let (worktree_a, _) = project_a
3458 .update(cx_a, |p, cx| {
3459 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3460 })
3461 .await
3462 .unwrap();
3463 worktree_a
3464 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3465 .await;
3466 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3467 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3468 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3469
3470 // Join the worktree as client B.
3471 let project_b = Project::remote(
3472 project_id,
3473 client_b.clone(),
3474 client_b.user_store.clone(),
3475 lang_registry.clone(),
3476 fs.clone(),
3477 &mut cx_b.to_async(),
3478 )
3479 .await
3480 .unwrap();
3481
3482 // Cause the language server to start.
3483 let _buffer = cx_b
3484 .background()
3485 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3486 .await
3487 .unwrap();
3488
3489 let fake_language_server = fake_language_servers.next().await.unwrap();
3490 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3491 |_, _| async move {
3492 #[allow(deprecated)]
3493 Ok(Some(vec![lsp::SymbolInformation {
3494 name: "TWO".into(),
3495 location: lsp::Location {
3496 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3497 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3498 },
3499 kind: lsp::SymbolKind::CONSTANT,
3500 tags: None,
3501 container_name: None,
3502 deprecated: None,
3503 }]))
3504 },
3505 );
3506
3507 // Request the definition of a symbol as the guest.
3508 let symbols = project_b
3509 .update(cx_b, |p, cx| p.symbols("two", cx))
3510 .await
3511 .unwrap();
3512 assert_eq!(symbols.len(), 1);
3513 assert_eq!(symbols[0].name, "TWO");
3514
3515 // Open one of the returned symbols.
3516 let buffer_b_2 = project_b
3517 .update(cx_b, |project, cx| {
3518 project.open_buffer_for_symbol(&symbols[0], cx)
3519 })
3520 .await
3521 .unwrap();
3522 buffer_b_2.read_with(cx_b, |buffer, _| {
3523 assert_eq!(
3524 buffer.file().unwrap().path().as_ref(),
3525 Path::new("../crate-2/two.rs")
3526 );
3527 });
3528
3529 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3530 let mut fake_symbol = symbols[0].clone();
3531 fake_symbol.path = Path::new("/code/secrets").into();
3532 let error = project_b
3533 .update(cx_b, |project, cx| {
3534 project.open_buffer_for_symbol(&fake_symbol, cx)
3535 })
3536 .await
3537 .unwrap_err();
3538 assert!(error.to_string().contains("invalid symbol signature"));
3539 }
3540
3541 #[gpui::test(iterations = 10)]
3542 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3543 cx_a: &mut TestAppContext,
3544 cx_b: &mut TestAppContext,
3545 mut rng: StdRng,
3546 ) {
3547 cx_a.foreground().forbid_parking();
3548 let lang_registry = Arc::new(LanguageRegistry::test());
3549 let fs = FakeFs::new(cx_a.background());
3550 fs.insert_tree(
3551 "/root",
3552 json!({
3553 ".zed.toml": r#"collaborators = ["user_b"]"#,
3554 "a.rs": "const ONE: usize = b::TWO;",
3555 "b.rs": "const TWO: usize = 2",
3556 }),
3557 )
3558 .await;
3559
3560 // Set up a fake language server.
3561 let mut language = Language::new(
3562 LanguageConfig {
3563 name: "Rust".into(),
3564 path_suffixes: vec!["rs".to_string()],
3565 ..Default::default()
3566 },
3567 Some(tree_sitter_rust::language()),
3568 );
3569 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3570 lang_registry.add(Arc::new(language));
3571
3572 // Connect to a server as 2 clients.
3573 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3574 let client_a = server.create_client(cx_a, "user_a").await;
3575 let client_b = server.create_client(cx_b, "user_b").await;
3576
3577 // Share a project as client A
3578 let project_a = cx_a.update(|cx| {
3579 Project::local(
3580 client_a.clone(),
3581 client_a.user_store.clone(),
3582 lang_registry.clone(),
3583 fs.clone(),
3584 cx,
3585 )
3586 });
3587
3588 let (worktree_a, _) = project_a
3589 .update(cx_a, |p, cx| {
3590 p.find_or_create_local_worktree("/root", true, cx)
3591 })
3592 .await
3593 .unwrap();
3594 worktree_a
3595 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596 .await;
3597 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601 // Join the worktree as client B.
3602 let project_b = Project::remote(
3603 project_id,
3604 client_b.clone(),
3605 client_b.user_store.clone(),
3606 lang_registry.clone(),
3607 fs.clone(),
3608 &mut cx_b.to_async(),
3609 )
3610 .await
3611 .unwrap();
3612
3613 let buffer_b1 = cx_b
3614 .background()
3615 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3616 .await
3617 .unwrap();
3618
3619 let fake_language_server = fake_language_servers.next().await.unwrap();
3620 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3621 |_, _| async move {
3622 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3623 lsp::Location::new(
3624 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3625 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3626 ),
3627 )))
3628 },
3629 );
3630
3631 let definitions;
3632 let buffer_b2;
3633 if rng.gen() {
3634 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3635 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3636 } else {
3637 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3638 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3639 }
3640
3641 let buffer_b2 = buffer_b2.await.unwrap();
3642 let definitions = definitions.await.unwrap();
3643 assert_eq!(definitions.len(), 1);
3644 assert_eq!(definitions[0].buffer, buffer_b2);
3645 }
3646
3647 #[gpui::test(iterations = 10)]
3648 async fn test_collaborating_with_code_actions(
3649 cx_a: &mut TestAppContext,
3650 cx_b: &mut TestAppContext,
3651 ) {
3652 cx_a.foreground().forbid_parking();
3653 let lang_registry = Arc::new(LanguageRegistry::test());
3654 let fs = FakeFs::new(cx_a.background());
3655 cx_b.update(|cx| editor::init(cx));
3656
3657 // Set up a fake language server.
3658 let mut language = Language::new(
3659 LanguageConfig {
3660 name: "Rust".into(),
3661 path_suffixes: vec!["rs".to_string()],
3662 ..Default::default()
3663 },
3664 Some(tree_sitter_rust::language()),
3665 );
3666 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3667 lang_registry.add(Arc::new(language));
3668
3669 // Connect to a server as 2 clients.
3670 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3671 let client_a = server.create_client(cx_a, "user_a").await;
3672 let client_b = server.create_client(cx_b, "user_b").await;
3673
3674 // Share a project as client A
3675 fs.insert_tree(
3676 "/a",
3677 json!({
3678 ".zed.toml": r#"collaborators = ["user_b"]"#,
3679 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3680 "other.rs": "pub fn foo() -> usize { 4 }",
3681 }),
3682 )
3683 .await;
3684 let project_a = cx_a.update(|cx| {
3685 Project::local(
3686 client_a.clone(),
3687 client_a.user_store.clone(),
3688 lang_registry.clone(),
3689 fs.clone(),
3690 cx,
3691 )
3692 });
3693 let (worktree_a, _) = project_a
3694 .update(cx_a, |p, cx| {
3695 p.find_or_create_local_worktree("/a", true, cx)
3696 })
3697 .await
3698 .unwrap();
3699 worktree_a
3700 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3701 .await;
3702 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3703 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3704 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3705
3706 // Join the worktree as client B.
3707 let project_b = Project::remote(
3708 project_id,
3709 client_b.clone(),
3710 client_b.user_store.clone(),
3711 lang_registry.clone(),
3712 fs.clone(),
3713 &mut cx_b.to_async(),
3714 )
3715 .await
3716 .unwrap();
3717 let mut params = cx_b.update(WorkspaceParams::test);
3718 params.languages = lang_registry.clone();
3719 params.client = client_b.client.clone();
3720 params.user_store = client_b.user_store.clone();
3721 params.project = project_b;
3722
3723 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3724 let editor_b = workspace_b
3725 .update(cx_b, |workspace, cx| {
3726 workspace.open_path((worktree_id, "main.rs"), cx)
3727 })
3728 .await
3729 .unwrap()
3730 .downcast::<Editor>()
3731 .unwrap();
3732
3733 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3734 fake_language_server
3735 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3736 assert_eq!(
3737 params.text_document.uri,
3738 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3739 );
3740 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3741 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3742 Ok(None)
3743 })
3744 .next()
3745 .await;
3746
3747 // Move cursor to a location that contains code actions.
3748 editor_b.update(cx_b, |editor, cx| {
3749 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3750 cx.focus(&editor_b);
3751 });
3752
3753 fake_language_server
3754 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3755 assert_eq!(
3756 params.text_document.uri,
3757 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3758 );
3759 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3760 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3761
3762 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3763 lsp::CodeAction {
3764 title: "Inline into all callers".to_string(),
3765 edit: Some(lsp::WorkspaceEdit {
3766 changes: Some(
3767 [
3768 (
3769 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3770 vec![lsp::TextEdit::new(
3771 lsp::Range::new(
3772 lsp::Position::new(1, 22),
3773 lsp::Position::new(1, 34),
3774 ),
3775 "4".to_string(),
3776 )],
3777 ),
3778 (
3779 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3780 vec![lsp::TextEdit::new(
3781 lsp::Range::new(
3782 lsp::Position::new(0, 0),
3783 lsp::Position::new(0, 27),
3784 ),
3785 "".to_string(),
3786 )],
3787 ),
3788 ]
3789 .into_iter()
3790 .collect(),
3791 ),
3792 ..Default::default()
3793 }),
3794 data: Some(json!({
3795 "codeActionParams": {
3796 "range": {
3797 "start": {"line": 1, "column": 31},
3798 "end": {"line": 1, "column": 31},
3799 }
3800 }
3801 })),
3802 ..Default::default()
3803 },
3804 )]))
3805 })
3806 .next()
3807 .await;
3808
3809 // Toggle code actions and wait for them to display.
3810 editor_b.update(cx_b, |editor, cx| {
3811 editor.toggle_code_actions(
3812 &ToggleCodeActions {
3813 deployed_from_indicator: false,
3814 },
3815 cx,
3816 );
3817 });
3818 editor_b
3819 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3820 .await;
3821
3822 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3823
3824 // Confirming the code action will trigger a resolve request.
3825 let confirm_action = workspace_b
3826 .update(cx_b, |workspace, cx| {
3827 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3828 })
3829 .unwrap();
3830 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3831 |_, _| async move {
3832 Ok(lsp::CodeAction {
3833 title: "Inline into all callers".to_string(),
3834 edit: Some(lsp::WorkspaceEdit {
3835 changes: Some(
3836 [
3837 (
3838 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3839 vec![lsp::TextEdit::new(
3840 lsp::Range::new(
3841 lsp::Position::new(1, 22),
3842 lsp::Position::new(1, 34),
3843 ),
3844 "4".to_string(),
3845 )],
3846 ),
3847 (
3848 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3849 vec![lsp::TextEdit::new(
3850 lsp::Range::new(
3851 lsp::Position::new(0, 0),
3852 lsp::Position::new(0, 27),
3853 ),
3854 "".to_string(),
3855 )],
3856 ),
3857 ]
3858 .into_iter()
3859 .collect(),
3860 ),
3861 ..Default::default()
3862 }),
3863 ..Default::default()
3864 })
3865 },
3866 );
3867
3868 // After the action is confirmed, an editor containing both modified files is opened.
3869 confirm_action.await.unwrap();
3870 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3871 workspace
3872 .active_item(cx)
3873 .unwrap()
3874 .downcast::<Editor>()
3875 .unwrap()
3876 });
3877 code_action_editor.update(cx_b, |editor, cx| {
3878 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3879 editor.undo(&Undo, cx);
3880 assert_eq!(
3881 editor.text(cx),
3882 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3883 );
3884 editor.redo(&Redo, cx);
3885 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3886 });
3887 }
3888
3889 #[gpui::test(iterations = 10)]
3890 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3891 cx_a.foreground().forbid_parking();
3892 let lang_registry = Arc::new(LanguageRegistry::test());
3893 let fs = FakeFs::new(cx_a.background());
3894 cx_b.update(|cx| editor::init(cx));
3895
3896 // Set up a fake language server.
3897 let mut language = Language::new(
3898 LanguageConfig {
3899 name: "Rust".into(),
3900 path_suffixes: vec!["rs".to_string()],
3901 ..Default::default()
3902 },
3903 Some(tree_sitter_rust::language()),
3904 );
3905 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3906 capabilities: lsp::ServerCapabilities {
3907 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3908 prepare_provider: Some(true),
3909 work_done_progress_options: Default::default(),
3910 })),
3911 ..Default::default()
3912 },
3913 ..Default::default()
3914 });
3915 lang_registry.add(Arc::new(language));
3916
3917 // Connect to a server as 2 clients.
3918 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3919 let client_a = server.create_client(cx_a, "user_a").await;
3920 let client_b = server.create_client(cx_b, "user_b").await;
3921
3922 // Share a project as client A
3923 fs.insert_tree(
3924 "/dir",
3925 json!({
3926 ".zed.toml": r#"collaborators = ["user_b"]"#,
3927 "one.rs": "const ONE: usize = 1;",
3928 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3929 }),
3930 )
3931 .await;
3932 let project_a = cx_a.update(|cx| {
3933 Project::local(
3934 client_a.clone(),
3935 client_a.user_store.clone(),
3936 lang_registry.clone(),
3937 fs.clone(),
3938 cx,
3939 )
3940 });
3941 let (worktree_a, _) = project_a
3942 .update(cx_a, |p, cx| {
3943 p.find_or_create_local_worktree("/dir", true, cx)
3944 })
3945 .await
3946 .unwrap();
3947 worktree_a
3948 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3949 .await;
3950 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3951 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3952 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3953
3954 // Join the worktree as client B.
3955 let project_b = Project::remote(
3956 project_id,
3957 client_b.clone(),
3958 client_b.user_store.clone(),
3959 lang_registry.clone(),
3960 fs.clone(),
3961 &mut cx_b.to_async(),
3962 )
3963 .await
3964 .unwrap();
3965 let mut params = cx_b.update(WorkspaceParams::test);
3966 params.languages = lang_registry.clone();
3967 params.client = client_b.client.clone();
3968 params.user_store = client_b.user_store.clone();
3969 params.project = project_b;
3970
3971 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3972 let editor_b = workspace_b
3973 .update(cx_b, |workspace, cx| {
3974 workspace.open_path((worktree_id, "one.rs"), cx)
3975 })
3976 .await
3977 .unwrap()
3978 .downcast::<Editor>()
3979 .unwrap();
3980 let fake_language_server = fake_language_servers.next().await.unwrap();
3981
3982 // Move cursor to a location that can be renamed.
3983 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3984 editor.select_ranges([7..7], None, cx);
3985 editor.rename(&Rename, cx).unwrap()
3986 });
3987
3988 fake_language_server
3989 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3990 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3991 assert_eq!(params.position, lsp::Position::new(0, 7));
3992 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3993 lsp::Position::new(0, 6),
3994 lsp::Position::new(0, 9),
3995 ))))
3996 })
3997 .next()
3998 .await
3999 .unwrap();
4000 prepare_rename.await.unwrap();
4001 editor_b.update(cx_b, |editor, cx| {
4002 let rename = editor.pending_rename().unwrap();
4003 let buffer = editor.buffer().read(cx).snapshot(cx);
4004 assert_eq!(
4005 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4006 6..9
4007 );
4008 rename.editor.update(cx, |rename_editor, cx| {
4009 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4010 rename_buffer.edit([0..3], "THREE", cx);
4011 });
4012 });
4013 });
4014
4015 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4016 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4017 });
4018 fake_language_server
4019 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4020 assert_eq!(
4021 params.text_document_position.text_document.uri.as_str(),
4022 "file:///dir/one.rs"
4023 );
4024 assert_eq!(
4025 params.text_document_position.position,
4026 lsp::Position::new(0, 6)
4027 );
4028 assert_eq!(params.new_name, "THREE");
4029 Ok(Some(lsp::WorkspaceEdit {
4030 changes: Some(
4031 [
4032 (
4033 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4034 vec![lsp::TextEdit::new(
4035 lsp::Range::new(
4036 lsp::Position::new(0, 6),
4037 lsp::Position::new(0, 9),
4038 ),
4039 "THREE".to_string(),
4040 )],
4041 ),
4042 (
4043 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4044 vec![
4045 lsp::TextEdit::new(
4046 lsp::Range::new(
4047 lsp::Position::new(0, 24),
4048 lsp::Position::new(0, 27),
4049 ),
4050 "THREE".to_string(),
4051 ),
4052 lsp::TextEdit::new(
4053 lsp::Range::new(
4054 lsp::Position::new(0, 35),
4055 lsp::Position::new(0, 38),
4056 ),
4057 "THREE".to_string(),
4058 ),
4059 ],
4060 ),
4061 ]
4062 .into_iter()
4063 .collect(),
4064 ),
4065 ..Default::default()
4066 }))
4067 })
4068 .next()
4069 .await
4070 .unwrap();
4071 confirm_rename.await.unwrap();
4072
4073 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4074 workspace
4075 .active_item(cx)
4076 .unwrap()
4077 .downcast::<Editor>()
4078 .unwrap()
4079 });
4080 rename_editor.update(cx_b, |editor, cx| {
4081 assert_eq!(
4082 editor.text(cx),
4083 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4084 );
4085 editor.undo(&Undo, cx);
4086 assert_eq!(
4087 editor.text(cx),
4088 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4089 );
4090 editor.redo(&Redo, cx);
4091 assert_eq!(
4092 editor.text(cx),
4093 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4094 );
4095 });
4096
4097 // Ensure temporary rename edits cannot be undone/redone.
4098 editor_b.update(cx_b, |editor, cx| {
4099 editor.undo(&Undo, cx);
4100 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4101 editor.undo(&Undo, cx);
4102 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4103 editor.redo(&Redo, cx);
4104 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4105 })
4106 }
4107
4108 #[gpui::test(iterations = 10)]
4109 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4110 cx_a.foreground().forbid_parking();
4111
4112 // Connect to a server as 2 clients.
4113 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4114 let client_a = server.create_client(cx_a, "user_a").await;
4115 let client_b = server.create_client(cx_b, "user_b").await;
4116
4117 // Create an org that includes these 2 users.
4118 let db = &server.app_state.db;
4119 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4120 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4121 .await
4122 .unwrap();
4123 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4124 .await
4125 .unwrap();
4126
4127 // Create a channel that includes all the users.
4128 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4129 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4130 .await
4131 .unwrap();
4132 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4133 .await
4134 .unwrap();
4135 db.create_channel_message(
4136 channel_id,
4137 client_b.current_user_id(&cx_b),
4138 "hello A, it's B.",
4139 OffsetDateTime::now_utc(),
4140 1,
4141 )
4142 .await
4143 .unwrap();
4144
4145 let channels_a = cx_a
4146 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4147 channels_a
4148 .condition(cx_a, |list, _| list.available_channels().is_some())
4149 .await;
4150 channels_a.read_with(cx_a, |list, _| {
4151 assert_eq!(
4152 list.available_channels().unwrap(),
4153 &[ChannelDetails {
4154 id: channel_id.to_proto(),
4155 name: "test-channel".to_string()
4156 }]
4157 )
4158 });
4159 let channel_a = channels_a.update(cx_a, |this, cx| {
4160 this.get_channel(channel_id.to_proto(), cx).unwrap()
4161 });
4162 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4163 channel_a
4164 .condition(&cx_a, |channel, _| {
4165 channel_messages(channel)
4166 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4167 })
4168 .await;
4169
4170 let channels_b = cx_b
4171 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4172 channels_b
4173 .condition(cx_b, |list, _| list.available_channels().is_some())
4174 .await;
4175 channels_b.read_with(cx_b, |list, _| {
4176 assert_eq!(
4177 list.available_channels().unwrap(),
4178 &[ChannelDetails {
4179 id: channel_id.to_proto(),
4180 name: "test-channel".to_string()
4181 }]
4182 )
4183 });
4184
4185 let channel_b = channels_b.update(cx_b, |this, cx| {
4186 this.get_channel(channel_id.to_proto(), cx).unwrap()
4187 });
4188 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4189 channel_b
4190 .condition(&cx_b, |channel, _| {
4191 channel_messages(channel)
4192 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4193 })
4194 .await;
4195
4196 channel_a
4197 .update(cx_a, |channel, cx| {
4198 channel
4199 .send_message("oh, hi B.".to_string(), cx)
4200 .unwrap()
4201 .detach();
4202 let task = channel.send_message("sup".to_string(), cx).unwrap();
4203 assert_eq!(
4204 channel_messages(channel),
4205 &[
4206 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4207 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4208 ("user_a".to_string(), "sup".to_string(), true)
4209 ]
4210 );
4211 task
4212 })
4213 .await
4214 .unwrap();
4215
4216 channel_b
4217 .condition(&cx_b, |channel, _| {
4218 channel_messages(channel)
4219 == [
4220 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4221 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4222 ("user_a".to_string(), "sup".to_string(), false),
4223 ]
4224 })
4225 .await;
4226
4227 assert_eq!(
4228 server
4229 .state()
4230 .await
4231 .channel(channel_id)
4232 .unwrap()
4233 .connection_ids
4234 .len(),
4235 2
4236 );
4237 cx_b.update(|_| drop(channel_b));
4238 server
4239 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4240 .await;
4241
4242 cx_a.update(|_| drop(channel_a));
4243 server
4244 .condition(|state| state.channel(channel_id).is_none())
4245 .await;
4246 }
4247
4248 #[gpui::test(iterations = 10)]
4249 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4250 cx_a.foreground().forbid_parking();
4251
4252 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4253 let client_a = server.create_client(cx_a, "user_a").await;
4254
4255 let db = &server.app_state.db;
4256 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4257 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4258 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4259 .await
4260 .unwrap();
4261 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4262 .await
4263 .unwrap();
4264
4265 let channels_a = cx_a
4266 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4267 channels_a
4268 .condition(cx_a, |list, _| list.available_channels().is_some())
4269 .await;
4270 let channel_a = channels_a.update(cx_a, |this, cx| {
4271 this.get_channel(channel_id.to_proto(), cx).unwrap()
4272 });
4273
4274 // Messages aren't allowed to be too long.
4275 channel_a
4276 .update(cx_a, |channel, cx| {
4277 let long_body = "this is long.\n".repeat(1024);
4278 channel.send_message(long_body, cx).unwrap()
4279 })
4280 .await
4281 .unwrap_err();
4282
4283 // Messages aren't allowed to be blank.
4284 channel_a.update(cx_a, |channel, cx| {
4285 channel.send_message(String::new(), cx).unwrap_err()
4286 });
4287
4288 // Leading and trailing whitespace are trimmed.
4289 channel_a
4290 .update(cx_a, |channel, cx| {
4291 channel
4292 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4293 .unwrap()
4294 })
4295 .await
4296 .unwrap();
4297 assert_eq!(
4298 db.get_channel_messages(channel_id, 10, None)
4299 .await
4300 .unwrap()
4301 .iter()
4302 .map(|m| &m.body)
4303 .collect::<Vec<_>>(),
4304 &["surrounded by whitespace"]
4305 );
4306 }
4307
4308 #[gpui::test(iterations = 10)]
4309 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4310 cx_a.foreground().forbid_parking();
4311
4312 // Connect to a server as 2 clients.
4313 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4314 let client_a = server.create_client(cx_a, "user_a").await;
4315 let client_b = server.create_client(cx_b, "user_b").await;
4316 let mut status_b = client_b.status();
4317
4318 // Create an org that includes these 2 users.
4319 let db = &server.app_state.db;
4320 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4321 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4322 .await
4323 .unwrap();
4324 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4325 .await
4326 .unwrap();
4327
4328 // Create a channel that includes all the users.
4329 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4330 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4331 .await
4332 .unwrap();
4333 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4334 .await
4335 .unwrap();
4336 db.create_channel_message(
4337 channel_id,
4338 client_b.current_user_id(&cx_b),
4339 "hello A, it's B.",
4340 OffsetDateTime::now_utc(),
4341 2,
4342 )
4343 .await
4344 .unwrap();
4345
4346 let channels_a = cx_a
4347 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4348 channels_a
4349 .condition(cx_a, |list, _| list.available_channels().is_some())
4350 .await;
4351
4352 channels_a.read_with(cx_a, |list, _| {
4353 assert_eq!(
4354 list.available_channels().unwrap(),
4355 &[ChannelDetails {
4356 id: channel_id.to_proto(),
4357 name: "test-channel".to_string()
4358 }]
4359 )
4360 });
4361 let channel_a = channels_a.update(cx_a, |this, cx| {
4362 this.get_channel(channel_id.to_proto(), cx).unwrap()
4363 });
4364 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4365 channel_a
4366 .condition(&cx_a, |channel, _| {
4367 channel_messages(channel)
4368 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4369 })
4370 .await;
4371
4372 let channels_b = cx_b
4373 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4374 channels_b
4375 .condition(cx_b, |list, _| list.available_channels().is_some())
4376 .await;
4377 channels_b.read_with(cx_b, |list, _| {
4378 assert_eq!(
4379 list.available_channels().unwrap(),
4380 &[ChannelDetails {
4381 id: channel_id.to_proto(),
4382 name: "test-channel".to_string()
4383 }]
4384 )
4385 });
4386
4387 let channel_b = channels_b.update(cx_b, |this, cx| {
4388 this.get_channel(channel_id.to_proto(), cx).unwrap()
4389 });
4390 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4391 channel_b
4392 .condition(&cx_b, |channel, _| {
4393 channel_messages(channel)
4394 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4395 })
4396 .await;
4397
4398 // Disconnect client B, ensuring we can still access its cached channel data.
4399 server.forbid_connections();
4400 server.disconnect_client(client_b.current_user_id(&cx_b));
4401 cx_b.foreground().advance_clock(Duration::from_secs(3));
4402 while !matches!(
4403 status_b.next().await,
4404 Some(client::Status::ReconnectionError { .. })
4405 ) {}
4406
4407 channels_b.read_with(cx_b, |channels, _| {
4408 assert_eq!(
4409 channels.available_channels().unwrap(),
4410 [ChannelDetails {
4411 id: channel_id.to_proto(),
4412 name: "test-channel".to_string()
4413 }]
4414 )
4415 });
4416 channel_b.read_with(cx_b, |channel, _| {
4417 assert_eq!(
4418 channel_messages(channel),
4419 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4420 )
4421 });
4422
4423 // Send a message from client B while it is disconnected.
4424 channel_b
4425 .update(cx_b, |channel, cx| {
4426 let task = channel
4427 .send_message("can you see this?".to_string(), cx)
4428 .unwrap();
4429 assert_eq!(
4430 channel_messages(channel),
4431 &[
4432 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4433 ("user_b".to_string(), "can you see this?".to_string(), true)
4434 ]
4435 );
4436 task
4437 })
4438 .await
4439 .unwrap_err();
4440
4441 // Send a message from client A while B is disconnected.
4442 channel_a
4443 .update(cx_a, |channel, cx| {
4444 channel
4445 .send_message("oh, hi B.".to_string(), cx)
4446 .unwrap()
4447 .detach();
4448 let task = channel.send_message("sup".to_string(), cx).unwrap();
4449 assert_eq!(
4450 channel_messages(channel),
4451 &[
4452 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4453 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4454 ("user_a".to_string(), "sup".to_string(), true)
4455 ]
4456 );
4457 task
4458 })
4459 .await
4460 .unwrap();
4461
4462 // Give client B a chance to reconnect.
4463 server.allow_connections();
4464 cx_b.foreground().advance_clock(Duration::from_secs(10));
4465
4466 // Verify that B sees the new messages upon reconnection, as well as the message client B
4467 // sent while offline.
4468 channel_b
4469 .condition(&cx_b, |channel, _| {
4470 channel_messages(channel)
4471 == [
4472 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4473 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4474 ("user_a".to_string(), "sup".to_string(), false),
4475 ("user_b".to_string(), "can you see this?".to_string(), false),
4476 ]
4477 })
4478 .await;
4479
4480 // Ensure client A and B can communicate normally after reconnection.
4481 channel_a
4482 .update(cx_a, |channel, cx| {
4483 channel.send_message("you online?".to_string(), cx).unwrap()
4484 })
4485 .await
4486 .unwrap();
4487 channel_b
4488 .condition(&cx_b, |channel, _| {
4489 channel_messages(channel)
4490 == [
4491 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4492 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4493 ("user_a".to_string(), "sup".to_string(), false),
4494 ("user_b".to_string(), "can you see this?".to_string(), false),
4495 ("user_a".to_string(), "you online?".to_string(), false),
4496 ]
4497 })
4498 .await;
4499
4500 channel_b
4501 .update(cx_b, |channel, cx| {
4502 channel.send_message("yep".to_string(), cx).unwrap()
4503 })
4504 .await
4505 .unwrap();
4506 channel_a
4507 .condition(&cx_a, |channel, _| {
4508 channel_messages(channel)
4509 == [
4510 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4511 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4512 ("user_a".to_string(), "sup".to_string(), false),
4513 ("user_b".to_string(), "can you see this?".to_string(), false),
4514 ("user_a".to_string(), "you online?".to_string(), false),
4515 ("user_b".to_string(), "yep".to_string(), false),
4516 ]
4517 })
4518 .await;
4519 }
4520
4521 #[gpui::test(iterations = 10)]
4522 async fn test_contacts(
4523 cx_a: &mut TestAppContext,
4524 cx_b: &mut TestAppContext,
4525 cx_c: &mut TestAppContext,
4526 ) {
4527 cx_a.foreground().forbid_parking();
4528 let lang_registry = Arc::new(LanguageRegistry::test());
4529 let fs = FakeFs::new(cx_a.background());
4530
4531 // Connect to a server as 3 clients.
4532 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4533 let client_a = server.create_client(cx_a, "user_a").await;
4534 let client_b = server.create_client(cx_b, "user_b").await;
4535 let client_c = server.create_client(cx_c, "user_c").await;
4536
4537 // Share a worktree as client A.
4538 fs.insert_tree(
4539 "/a",
4540 json!({
4541 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4542 }),
4543 )
4544 .await;
4545
4546 let project_a = cx_a.update(|cx| {
4547 Project::local(
4548 client_a.clone(),
4549 client_a.user_store.clone(),
4550 lang_registry.clone(),
4551 fs.clone(),
4552 cx,
4553 )
4554 });
4555 let (worktree_a, _) = project_a
4556 .update(cx_a, |p, cx| {
4557 p.find_or_create_local_worktree("/a", true, cx)
4558 })
4559 .await
4560 .unwrap();
4561 worktree_a
4562 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4563 .await;
4564
4565 client_a
4566 .user_store
4567 .condition(&cx_a, |user_store, _| {
4568 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4569 })
4570 .await;
4571 client_b
4572 .user_store
4573 .condition(&cx_b, |user_store, _| {
4574 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4575 })
4576 .await;
4577 client_c
4578 .user_store
4579 .condition(&cx_c, |user_store, _| {
4580 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4581 })
4582 .await;
4583
4584 let project_id = project_a
4585 .update(cx_a, |project, _| project.next_remote_id())
4586 .await;
4587 project_a
4588 .update(cx_a, |project, cx| project.share(cx))
4589 .await
4590 .unwrap();
4591 client_a
4592 .user_store
4593 .condition(&cx_a, |user_store, _| {
4594 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4595 })
4596 .await;
4597 client_b
4598 .user_store
4599 .condition(&cx_b, |user_store, _| {
4600 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4601 })
4602 .await;
4603 client_c
4604 .user_store
4605 .condition(&cx_c, |user_store, _| {
4606 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4607 })
4608 .await;
4609
4610 let _project_b = Project::remote(
4611 project_id,
4612 client_b.clone(),
4613 client_b.user_store.clone(),
4614 lang_registry.clone(),
4615 fs.clone(),
4616 &mut cx_b.to_async(),
4617 )
4618 .await
4619 .unwrap();
4620
4621 client_a
4622 .user_store
4623 .condition(&cx_a, |user_store, _| {
4624 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4625 })
4626 .await;
4627 client_b
4628 .user_store
4629 .condition(&cx_b, |user_store, _| {
4630 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4631 })
4632 .await;
4633 client_c
4634 .user_store
4635 .condition(&cx_c, |user_store, _| {
4636 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4637 })
4638 .await;
4639
4640 project_a
4641 .condition(&cx_a, |project, _| {
4642 project.collaborators().contains_key(&client_b.peer_id)
4643 })
4644 .await;
4645
4646 cx_a.update(move |_| drop(project_a));
4647 client_a
4648 .user_store
4649 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4650 .await;
4651 client_b
4652 .user_store
4653 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4654 .await;
4655 client_c
4656 .user_store
4657 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4658 .await;
4659
4660 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4661 user_store
4662 .contacts()
4663 .iter()
4664 .map(|contact| {
4665 let worktrees = contact
4666 .projects
4667 .iter()
4668 .map(|p| {
4669 (
4670 p.worktree_root_names[0].as_str(),
4671 p.is_shared,
4672 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4673 )
4674 })
4675 .collect();
4676 (contact.user.github_login.as_str(), worktrees)
4677 })
4678 .collect()
4679 }
4680 }
4681
4682 #[gpui::test(iterations = 10)]
4683 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4684 cx_a.foreground().forbid_parking();
4685 let fs = FakeFs::new(cx_a.background());
4686
4687 // 2 clients connect to a server.
4688 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4689 let mut client_a = server.create_client(cx_a, "user_a").await;
4690 let mut client_b = server.create_client(cx_b, "user_b").await;
4691 cx_a.update(editor::init);
4692 cx_b.update(editor::init);
4693
4694 // Client A shares a project.
4695 fs.insert_tree(
4696 "/a",
4697 json!({
4698 ".zed.toml": r#"collaborators = ["user_b"]"#,
4699 "1.txt": "one",
4700 "2.txt": "two",
4701 "3.txt": "three",
4702 }),
4703 )
4704 .await;
4705 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4706 project_a
4707 .update(cx_a, |project, cx| project.share(cx))
4708 .await
4709 .unwrap();
4710
4711 // Client B joins the project.
4712 let project_b = client_b
4713 .build_remote_project(
4714 project_a
4715 .read_with(cx_a, |project, _| project.remote_id())
4716 .unwrap(),
4717 cx_b,
4718 )
4719 .await;
4720
4721 // Client A opens some editors.
4722 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4723 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4724 let editor_a1 = workspace_a
4725 .update(cx_a, |workspace, cx| {
4726 workspace.open_path((worktree_id, "1.txt"), cx)
4727 })
4728 .await
4729 .unwrap()
4730 .downcast::<Editor>()
4731 .unwrap();
4732 let editor_a2 = workspace_a
4733 .update(cx_a, |workspace, cx| {
4734 workspace.open_path((worktree_id, "2.txt"), cx)
4735 })
4736 .await
4737 .unwrap()
4738 .downcast::<Editor>()
4739 .unwrap();
4740
4741 // Client B opens an editor.
4742 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4743 let editor_b1 = workspace_b
4744 .update(cx_b, |workspace, cx| {
4745 workspace.open_path((worktree_id, "1.txt"), cx)
4746 })
4747 .await
4748 .unwrap()
4749 .downcast::<Editor>()
4750 .unwrap();
4751
4752 let client_a_id = project_b.read_with(cx_b, |project, _| {
4753 project.collaborators().values().next().unwrap().peer_id
4754 });
4755 let client_b_id = project_a.read_with(cx_a, |project, _| {
4756 project.collaborators().values().next().unwrap().peer_id
4757 });
4758
4759 // When client B starts following client A, all visible view states are replicated to client B.
4760 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4761 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4762 workspace_b
4763 .update(cx_b, |workspace, cx| {
4764 workspace
4765 .toggle_follow(&ToggleFollow(client_a_id), cx)
4766 .unwrap()
4767 })
4768 .await
4769 .unwrap();
4770 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4771 workspace
4772 .active_item(cx)
4773 .unwrap()
4774 .downcast::<Editor>()
4775 .unwrap()
4776 });
4777 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4778 assert_eq!(
4779 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4780 Some((worktree_id, "2.txt").into())
4781 );
4782 assert_eq!(
4783 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4784 vec![2..3]
4785 );
4786 assert_eq!(
4787 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4788 vec![0..1]
4789 );
4790
4791 // When client A activates a different editor, client B does so as well.
4792 workspace_a.update(cx_a, |workspace, cx| {
4793 workspace.activate_item(&editor_a1, cx)
4794 });
4795 workspace_b
4796 .condition(cx_b, |workspace, cx| {
4797 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4798 })
4799 .await;
4800
4801 // When client A navigates back and forth, client B does so as well.
4802 workspace_a
4803 .update(cx_a, |workspace, cx| {
4804 workspace::Pane::go_back(workspace, None, cx)
4805 })
4806 .await;
4807 workspace_b
4808 .condition(cx_b, |workspace, cx| {
4809 workspace.active_item(cx).unwrap().id() == editor_b2.id()
4810 })
4811 .await;
4812
4813 workspace_a
4814 .update(cx_a, |workspace, cx| {
4815 workspace::Pane::go_forward(workspace, None, cx)
4816 })
4817 .await;
4818 workspace_b
4819 .condition(cx_b, |workspace, cx| {
4820 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4821 })
4822 .await;
4823
4824 // Changes to client A's editor are reflected on client B.
4825 editor_a1.update(cx_a, |editor, cx| {
4826 editor.select_ranges([1..1, 2..2], None, cx);
4827 });
4828 editor_b1
4829 .condition(cx_b, |editor, cx| {
4830 editor.selected_ranges(cx) == vec![1..1, 2..2]
4831 })
4832 .await;
4833
4834 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4835 editor_b1
4836 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4837 .await;
4838
4839 editor_a1.update(cx_a, |editor, cx| {
4840 editor.select_ranges([3..3], None, cx);
4841 editor.set_scroll_position(vec2f(0., 100.), cx);
4842 });
4843 editor_b1
4844 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4845 .await;
4846
4847 // After unfollowing, client B stops receiving updates from client A.
4848 workspace_b.update(cx_b, |workspace, cx| {
4849 workspace.unfollow(&workspace.active_pane().clone(), cx)
4850 });
4851 workspace_a.update(cx_a, |workspace, cx| {
4852 workspace.activate_item(&editor_a2, cx)
4853 });
4854 cx_a.foreground().run_until_parked();
4855 assert_eq!(
4856 workspace_b.read_with(cx_b, |workspace, cx| workspace
4857 .active_item(cx)
4858 .unwrap()
4859 .id()),
4860 editor_b1.id()
4861 );
4862
4863 // Client A starts following client B.
4864 workspace_a
4865 .update(cx_a, |workspace, cx| {
4866 workspace
4867 .toggle_follow(&ToggleFollow(client_b_id), cx)
4868 .unwrap()
4869 })
4870 .await
4871 .unwrap();
4872 assert_eq!(
4873 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4874 Some(client_b_id)
4875 );
4876 assert_eq!(
4877 workspace_a.read_with(cx_a, |workspace, cx| workspace
4878 .active_item(cx)
4879 .unwrap()
4880 .id()),
4881 editor_a1.id()
4882 );
4883
4884 // Following interrupts when client B disconnects.
4885 client_b.disconnect(&cx_b.to_async()).unwrap();
4886 cx_a.foreground().run_until_parked();
4887 assert_eq!(
4888 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4889 None
4890 );
4891 }
4892
4893 #[gpui::test(iterations = 10)]
4894 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4895 cx_a.foreground().forbid_parking();
4896 let fs = FakeFs::new(cx_a.background());
4897
4898 // 2 clients connect to a server.
4899 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4900 let mut client_a = server.create_client(cx_a, "user_a").await;
4901 let mut client_b = server.create_client(cx_b, "user_b").await;
4902 cx_a.update(editor::init);
4903 cx_b.update(editor::init);
4904
4905 // Client A shares a project.
4906 fs.insert_tree(
4907 "/a",
4908 json!({
4909 ".zed.toml": r#"collaborators = ["user_b"]"#,
4910 "1.txt": "one",
4911 "2.txt": "two",
4912 "3.txt": "three",
4913 "4.txt": "four",
4914 }),
4915 )
4916 .await;
4917 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4918 project_a
4919 .update(cx_a, |project, cx| project.share(cx))
4920 .await
4921 .unwrap();
4922
4923 // Client B joins the project.
4924 let project_b = client_b
4925 .build_remote_project(
4926 project_a
4927 .read_with(cx_a, |project, _| project.remote_id())
4928 .unwrap(),
4929 cx_b,
4930 )
4931 .await;
4932
4933 // Client A opens some editors.
4934 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4935 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4936 let _editor_a1 = workspace_a
4937 .update(cx_a, |workspace, cx| {
4938 workspace.open_path((worktree_id, "1.txt"), cx)
4939 })
4940 .await
4941 .unwrap()
4942 .downcast::<Editor>()
4943 .unwrap();
4944
4945 // Client B opens an editor.
4946 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4947 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4948 let _editor_b1 = workspace_b
4949 .update(cx_b, |workspace, cx| {
4950 workspace.open_path((worktree_id, "2.txt"), cx)
4951 })
4952 .await
4953 .unwrap()
4954 .downcast::<Editor>()
4955 .unwrap();
4956
4957 // Clients A and B follow each other in split panes
4958 workspace_a
4959 .update(cx_a, |workspace, cx| {
4960 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4961 assert_ne!(*workspace.active_pane(), pane_a1);
4962 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4963 workspace
4964 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4965 .unwrap()
4966 })
4967 .await
4968 .unwrap();
4969 workspace_b
4970 .update(cx_b, |workspace, cx| {
4971 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4972 assert_ne!(*workspace.active_pane(), pane_b1);
4973 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4974 workspace
4975 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4976 .unwrap()
4977 })
4978 .await
4979 .unwrap();
4980
4981 workspace_a
4982 .update(cx_a, |workspace, cx| {
4983 workspace.activate_next_pane(cx);
4984 assert_eq!(*workspace.active_pane(), pane_a1);
4985 workspace.open_path((worktree_id, "3.txt"), cx)
4986 })
4987 .await
4988 .unwrap();
4989 workspace_b
4990 .update(cx_b, |workspace, cx| {
4991 workspace.activate_next_pane(cx);
4992 assert_eq!(*workspace.active_pane(), pane_b1);
4993 workspace.open_path((worktree_id, "4.txt"), cx)
4994 })
4995 .await
4996 .unwrap();
4997 cx_a.foreground().run_until_parked();
4998
4999 // Ensure leader updates don't change the active pane of followers
5000 workspace_a.read_with(cx_a, |workspace, _| {
5001 assert_eq!(*workspace.active_pane(), pane_a1);
5002 });
5003 workspace_b.read_with(cx_b, |workspace, _| {
5004 assert_eq!(*workspace.active_pane(), pane_b1);
5005 });
5006
5007 // Ensure peers following each other doesn't cause an infinite loop.
5008 assert_eq!(
5009 workspace_a.read_with(cx_a, |workspace, cx| workspace
5010 .active_item(cx)
5011 .unwrap()
5012 .project_path(cx)),
5013 Some((worktree_id, "3.txt").into())
5014 );
5015 workspace_a.update(cx_a, |workspace, cx| {
5016 assert_eq!(
5017 workspace.active_item(cx).unwrap().project_path(cx),
5018 Some((worktree_id, "3.txt").into())
5019 );
5020 workspace.activate_next_pane(cx);
5021 assert_eq!(
5022 workspace.active_item(cx).unwrap().project_path(cx),
5023 Some((worktree_id, "4.txt").into())
5024 );
5025 });
5026 workspace_b.update(cx_b, |workspace, cx| {
5027 assert_eq!(
5028 workspace.active_item(cx).unwrap().project_path(cx),
5029 Some((worktree_id, "4.txt").into())
5030 );
5031 workspace.activate_next_pane(cx);
5032 assert_eq!(
5033 workspace.active_item(cx).unwrap().project_path(cx),
5034 Some((worktree_id, "3.txt").into())
5035 );
5036 });
5037 }
5038
5039 #[gpui::test(iterations = 10)]
5040 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5041 cx_a.foreground().forbid_parking();
5042 let fs = FakeFs::new(cx_a.background());
5043
5044 // 2 clients connect to a server.
5045 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5046 let mut client_a = server.create_client(cx_a, "user_a").await;
5047 let mut client_b = server.create_client(cx_b, "user_b").await;
5048 cx_a.update(editor::init);
5049 cx_b.update(editor::init);
5050
5051 // Client A shares a project.
5052 fs.insert_tree(
5053 "/a",
5054 json!({
5055 ".zed.toml": r#"collaborators = ["user_b"]"#,
5056 "1.txt": "one",
5057 "2.txt": "two",
5058 "3.txt": "three",
5059 }),
5060 )
5061 .await;
5062 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5063 project_a
5064 .update(cx_a, |project, cx| project.share(cx))
5065 .await
5066 .unwrap();
5067
5068 // Client B joins the project.
5069 let project_b = client_b
5070 .build_remote_project(
5071 project_a
5072 .read_with(cx_a, |project, _| project.remote_id())
5073 .unwrap(),
5074 cx_b,
5075 )
5076 .await;
5077
5078 // Client A opens some editors.
5079 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5080 let _editor_a1 = workspace_a
5081 .update(cx_a, |workspace, cx| {
5082 workspace.open_path((worktree_id, "1.txt"), cx)
5083 })
5084 .await
5085 .unwrap()
5086 .downcast::<Editor>()
5087 .unwrap();
5088
5089 // Client B starts following client A.
5090 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5091 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5092 let leader_id = project_b.read_with(cx_b, |project, _| {
5093 project.collaborators().values().next().unwrap().peer_id
5094 });
5095 workspace_b
5096 .update(cx_b, |workspace, cx| {
5097 workspace
5098 .toggle_follow(&ToggleFollow(leader_id), cx)
5099 .unwrap()
5100 })
5101 .await
5102 .unwrap();
5103 assert_eq!(
5104 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5105 Some(leader_id)
5106 );
5107 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5108 workspace
5109 .active_item(cx)
5110 .unwrap()
5111 .downcast::<Editor>()
5112 .unwrap()
5113 });
5114
5115 // When client B moves, it automatically stops following client A.
5116 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5117 assert_eq!(
5118 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5119 None
5120 );
5121
5122 workspace_b
5123 .update(cx_b, |workspace, cx| {
5124 workspace
5125 .toggle_follow(&ToggleFollow(leader_id), cx)
5126 .unwrap()
5127 })
5128 .await
5129 .unwrap();
5130 assert_eq!(
5131 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5132 Some(leader_id)
5133 );
5134
5135 // When client B edits, it automatically stops following client A.
5136 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5137 assert_eq!(
5138 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5139 None
5140 );
5141
5142 workspace_b
5143 .update(cx_b, |workspace, cx| {
5144 workspace
5145 .toggle_follow(&ToggleFollow(leader_id), cx)
5146 .unwrap()
5147 })
5148 .await
5149 .unwrap();
5150 assert_eq!(
5151 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5152 Some(leader_id)
5153 );
5154
5155 // When client B scrolls, it automatically stops following client A.
5156 editor_b2.update(cx_b, |editor, cx| {
5157 editor.set_scroll_position(vec2f(0., 3.), cx)
5158 });
5159 assert_eq!(
5160 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5161 None
5162 );
5163
5164 workspace_b
5165 .update(cx_b, |workspace, cx| {
5166 workspace
5167 .toggle_follow(&ToggleFollow(leader_id), cx)
5168 .unwrap()
5169 })
5170 .await
5171 .unwrap();
5172 assert_eq!(
5173 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5174 Some(leader_id)
5175 );
5176
5177 // When client B activates a different pane, it continues following client A in the original pane.
5178 workspace_b.update(cx_b, |workspace, cx| {
5179 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5180 });
5181 assert_eq!(
5182 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5183 Some(leader_id)
5184 );
5185
5186 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5187 assert_eq!(
5188 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5189 Some(leader_id)
5190 );
5191
5192 // When client B activates a different item in the original pane, it automatically stops following client A.
5193 workspace_b
5194 .update(cx_b, |workspace, cx| {
5195 workspace.open_path((worktree_id, "2.txt"), cx)
5196 })
5197 .await
5198 .unwrap();
5199 assert_eq!(
5200 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5201 None
5202 );
5203 }
5204
5205 #[gpui::test(iterations = 100)]
5206 async fn test_random_collaboration(
5207 cx: &mut TestAppContext,
5208 deterministic: Arc<Deterministic>,
5209 rng: StdRng,
5210 ) {
5211 cx.foreground().forbid_parking();
5212 let max_peers = env::var("MAX_PEERS")
5213 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5214 .unwrap_or(5);
5215 assert!(max_peers <= 5);
5216
5217 let max_operations = env::var("OPERATIONS")
5218 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5219 .unwrap_or(10);
5220
5221 let rng = Arc::new(Mutex::new(rng));
5222
5223 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5224 let host_language_registry = Arc::new(LanguageRegistry::test());
5225
5226 let fs = FakeFs::new(cx.background());
5227 fs.insert_tree(
5228 "/_collab",
5229 json!({
5230 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5231 }),
5232 )
5233 .await;
5234
5235 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5236 let mut clients = Vec::new();
5237 let mut user_ids = Vec::new();
5238 let mut op_start_signals = Vec::new();
5239 let files = Arc::new(Mutex::new(Vec::new()));
5240
5241 let mut next_entity_id = 100000;
5242 let mut host_cx = TestAppContext::new(
5243 cx.foreground_platform(),
5244 cx.platform(),
5245 deterministic.build_foreground(next_entity_id),
5246 deterministic.build_background(),
5247 cx.font_cache(),
5248 cx.leak_detector(),
5249 next_entity_id,
5250 );
5251 let host = server.create_client(&mut host_cx, "host").await;
5252 let host_project = host_cx.update(|cx| {
5253 Project::local(
5254 host.client.clone(),
5255 host.user_store.clone(),
5256 host_language_registry.clone(),
5257 fs.clone(),
5258 cx,
5259 )
5260 });
5261 let host_project_id = host_project
5262 .update(&mut host_cx, |p, _| p.next_remote_id())
5263 .await;
5264
5265 let (collab_worktree, _) = host_project
5266 .update(&mut host_cx, |project, cx| {
5267 project.find_or_create_local_worktree("/_collab", true, cx)
5268 })
5269 .await
5270 .unwrap();
5271 collab_worktree
5272 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5273 .await;
5274 host_project
5275 .update(&mut host_cx, |project, cx| project.share(cx))
5276 .await
5277 .unwrap();
5278
5279 // Set up fake language servers.
5280 let mut language = Language::new(
5281 LanguageConfig {
5282 name: "Rust".into(),
5283 path_suffixes: vec!["rs".to_string()],
5284 ..Default::default()
5285 },
5286 None,
5287 );
5288 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5289 name: "the-fake-language-server",
5290 capabilities: lsp::LanguageServer::full_capabilities(),
5291 initializer: Some(Box::new({
5292 let rng = rng.clone();
5293 let files = files.clone();
5294 let project = host_project.downgrade();
5295 move |fake_server: &mut FakeLanguageServer| {
5296 fake_server.handle_request::<lsp::request::Completion, _, _>(
5297 |_, _| async move {
5298 Ok(Some(lsp::CompletionResponse::Array(vec![
5299 lsp::CompletionItem {
5300 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5301 range: lsp::Range::new(
5302 lsp::Position::new(0, 0),
5303 lsp::Position::new(0, 0),
5304 ),
5305 new_text: "the-new-text".to_string(),
5306 })),
5307 ..Default::default()
5308 },
5309 ])))
5310 },
5311 );
5312
5313 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5314 |_, _| async move {
5315 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5316 lsp::CodeAction {
5317 title: "the-code-action".to_string(),
5318 ..Default::default()
5319 },
5320 )]))
5321 },
5322 );
5323
5324 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5325 |params, _| async move {
5326 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5327 params.position,
5328 params.position,
5329 ))))
5330 },
5331 );
5332
5333 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5334 let files = files.clone();
5335 let rng = rng.clone();
5336 move |_, _| {
5337 let files = files.clone();
5338 let rng = rng.clone();
5339 async move {
5340 let files = files.lock();
5341 let mut rng = rng.lock();
5342 let count = rng.gen_range::<usize, _>(1..3);
5343 let files = (0..count)
5344 .map(|_| files.choose(&mut *rng).unwrap())
5345 .collect::<Vec<_>>();
5346 log::info!("LSP: Returning definitions in files {:?}", &files);
5347 Ok(Some(lsp::GotoDefinitionResponse::Array(
5348 files
5349 .into_iter()
5350 .map(|file| lsp::Location {
5351 uri: lsp::Url::from_file_path(file).unwrap(),
5352 range: Default::default(),
5353 })
5354 .collect(),
5355 )))
5356 }
5357 }
5358 });
5359
5360 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5361 let rng = rng.clone();
5362 let project = project.clone();
5363 move |params, mut cx| {
5364 let highlights = if let Some(project) = project.upgrade(&cx) {
5365 project.update(&mut cx, |project, cx| {
5366 let path = params
5367 .text_document_position_params
5368 .text_document
5369 .uri
5370 .to_file_path()
5371 .unwrap();
5372 let (worktree, relative_path) =
5373 project.find_local_worktree(&path, cx)?;
5374 let project_path =
5375 ProjectPath::from((worktree.read(cx).id(), relative_path));
5376 let buffer =
5377 project.get_open_buffer(&project_path, cx)?.read(cx);
5378
5379 let mut highlights = Vec::new();
5380 let highlight_count = rng.lock().gen_range(1..=5);
5381 let mut prev_end = 0;
5382 for _ in 0..highlight_count {
5383 let range =
5384 buffer.random_byte_range(prev_end, &mut *rng.lock());
5385
5386 highlights.push(lsp::DocumentHighlight {
5387 range: range_to_lsp(range.to_point_utf16(buffer)),
5388 kind: Some(lsp::DocumentHighlightKind::READ),
5389 });
5390 prev_end = range.end;
5391 }
5392 Some(highlights)
5393 })
5394 } else {
5395 None
5396 };
5397 async move { Ok(highlights) }
5398 }
5399 });
5400 }
5401 })),
5402 ..Default::default()
5403 });
5404 host_language_registry.add(Arc::new(language));
5405
5406 let op_start_signal = futures::channel::mpsc::unbounded();
5407 user_ids.push(host.current_user_id(&host_cx));
5408 op_start_signals.push(op_start_signal.0);
5409 clients.push(host_cx.foreground().spawn(host.simulate_host(
5410 host_project,
5411 files,
5412 op_start_signal.1,
5413 rng.clone(),
5414 host_cx,
5415 )));
5416
5417 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5418 rng.lock().gen_range(0..max_operations)
5419 } else {
5420 max_operations
5421 };
5422 let mut available_guests = vec![
5423 "guest-1".to_string(),
5424 "guest-2".to_string(),
5425 "guest-3".to_string(),
5426 "guest-4".to_string(),
5427 ];
5428 let mut operations = 0;
5429 while operations < max_operations {
5430 if operations == disconnect_host_at {
5431 server.disconnect_client(user_ids[0]);
5432 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5433 drop(op_start_signals);
5434 let mut clients = futures::future::join_all(clients).await;
5435 cx.foreground().run_until_parked();
5436
5437 let (host, mut host_cx, host_err) = clients.remove(0);
5438 if let Some(host_err) = host_err {
5439 log::error!("host error - {}", host_err);
5440 }
5441 host.project
5442 .as_ref()
5443 .unwrap()
5444 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5445 for (guest, mut guest_cx, guest_err) in clients {
5446 if let Some(guest_err) = guest_err {
5447 log::error!("{} error - {}", guest.username, guest_err);
5448 }
5449 let contacts = server
5450 .store
5451 .read()
5452 .await
5453 .contacts_for_user(guest.current_user_id(&guest_cx));
5454 assert!(!contacts
5455 .iter()
5456 .flat_map(|contact| &contact.projects)
5457 .any(|project| project.id == host_project_id));
5458 guest
5459 .project
5460 .as_ref()
5461 .unwrap()
5462 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5463 guest_cx.update(|_| drop(guest));
5464 }
5465 host_cx.update(|_| drop(host));
5466
5467 return;
5468 }
5469
5470 let distribution = rng.lock().gen_range(0..100);
5471 match distribution {
5472 0..=19 if !available_guests.is_empty() => {
5473 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5474 let guest_username = available_guests.remove(guest_ix);
5475 log::info!("Adding new connection for {}", guest_username);
5476 next_entity_id += 100000;
5477 let mut guest_cx = TestAppContext::new(
5478 cx.foreground_platform(),
5479 cx.platform(),
5480 deterministic.build_foreground(next_entity_id),
5481 deterministic.build_background(),
5482 cx.font_cache(),
5483 cx.leak_detector(),
5484 next_entity_id,
5485 );
5486 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5487 let guest_project = Project::remote(
5488 host_project_id,
5489 guest.client.clone(),
5490 guest.user_store.clone(),
5491 guest_lang_registry.clone(),
5492 FakeFs::new(cx.background()),
5493 &mut guest_cx.to_async(),
5494 )
5495 .await
5496 .unwrap();
5497 let op_start_signal = futures::channel::mpsc::unbounded();
5498 user_ids.push(guest.current_user_id(&guest_cx));
5499 op_start_signals.push(op_start_signal.0);
5500 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5501 guest_username.clone(),
5502 guest_project,
5503 op_start_signal.1,
5504 rng.clone(),
5505 guest_cx,
5506 )));
5507
5508 log::info!("Added connection for {}", guest_username);
5509 operations += 1;
5510 }
5511 20..=29 if clients.len() > 1 => {
5512 log::info!("Removing guest");
5513 let guest_ix = rng.lock().gen_range(1..clients.len());
5514 let removed_guest_id = user_ids.remove(guest_ix);
5515 let guest = clients.remove(guest_ix);
5516 op_start_signals.remove(guest_ix);
5517 server.disconnect_client(removed_guest_id);
5518 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5519 let (guest, mut guest_cx, guest_err) = guest.await;
5520 if let Some(guest_err) = guest_err {
5521 log::error!("{} error - {}", guest.username, guest_err);
5522 }
5523 guest
5524 .project
5525 .as_ref()
5526 .unwrap()
5527 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5528 for user_id in &user_ids {
5529 for contact in server.store.read().await.contacts_for_user(*user_id) {
5530 assert_ne!(
5531 contact.user_id, removed_guest_id.0 as u64,
5532 "removed guest is still a contact of another peer"
5533 );
5534 for project in contact.projects {
5535 for project_guest_id in project.guests {
5536 assert_ne!(
5537 project_guest_id, removed_guest_id.0 as u64,
5538 "removed guest appears as still participating on a project"
5539 );
5540 }
5541 }
5542 }
5543 }
5544
5545 log::info!("{} removed", guest.username);
5546 available_guests.push(guest.username.clone());
5547 guest_cx.update(|_| drop(guest));
5548
5549 operations += 1;
5550 }
5551 _ => {
5552 while operations < max_operations && rng.lock().gen_bool(0.7) {
5553 op_start_signals
5554 .choose(&mut *rng.lock())
5555 .unwrap()
5556 .unbounded_send(())
5557 .unwrap();
5558 operations += 1;
5559 }
5560
5561 if rng.lock().gen_bool(0.8) {
5562 cx.foreground().run_until_parked();
5563 }
5564 }
5565 }
5566 }
5567
5568 drop(op_start_signals);
5569 let mut clients = futures::future::join_all(clients).await;
5570 cx.foreground().run_until_parked();
5571
5572 let (host_client, mut host_cx, host_err) = clients.remove(0);
5573 if let Some(host_err) = host_err {
5574 panic!("host error - {}", host_err);
5575 }
5576 let host_project = host_client.project.as_ref().unwrap();
5577 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5578 project
5579 .worktrees(cx)
5580 .map(|worktree| {
5581 let snapshot = worktree.read(cx).snapshot();
5582 (snapshot.id(), snapshot)
5583 })
5584 .collect::<BTreeMap<_, _>>()
5585 });
5586
5587 host_client
5588 .project
5589 .as_ref()
5590 .unwrap()
5591 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5592
5593 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5594 if let Some(guest_err) = guest_err {
5595 panic!("{} error - {}", guest_client.username, guest_err);
5596 }
5597 let worktree_snapshots =
5598 guest_client
5599 .project
5600 .as_ref()
5601 .unwrap()
5602 .read_with(&guest_cx, |project, cx| {
5603 project
5604 .worktrees(cx)
5605 .map(|worktree| {
5606 let worktree = worktree.read(cx);
5607 (worktree.id(), worktree.snapshot())
5608 })
5609 .collect::<BTreeMap<_, _>>()
5610 });
5611
5612 assert_eq!(
5613 worktree_snapshots.keys().collect::<Vec<_>>(),
5614 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5615 "{} has different worktrees than the host",
5616 guest_client.username
5617 );
5618 for (id, host_snapshot) in &host_worktree_snapshots {
5619 let guest_snapshot = &worktree_snapshots[id];
5620 assert_eq!(
5621 guest_snapshot.root_name(),
5622 host_snapshot.root_name(),
5623 "{} has different root name than the host for worktree {}",
5624 guest_client.username,
5625 id
5626 );
5627 assert_eq!(
5628 guest_snapshot.entries(false).collect::<Vec<_>>(),
5629 host_snapshot.entries(false).collect::<Vec<_>>(),
5630 "{} has different snapshot than the host for worktree {}",
5631 guest_client.username,
5632 id
5633 );
5634 }
5635
5636 guest_client
5637 .project
5638 .as_ref()
5639 .unwrap()
5640 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5641
5642 for guest_buffer in &guest_client.buffers {
5643 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5644 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5645 project.buffer_for_id(buffer_id, cx).expect(&format!(
5646 "host does not have buffer for guest:{}, peer:{}, id:{}",
5647 guest_client.username, guest_client.peer_id, buffer_id
5648 ))
5649 });
5650 let path = host_buffer
5651 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5652
5653 assert_eq!(
5654 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5655 0,
5656 "{}, buffer {}, path {:?} has deferred operations",
5657 guest_client.username,
5658 buffer_id,
5659 path,
5660 );
5661 assert_eq!(
5662 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5663 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5664 "{}, buffer {}, path {:?}, differs from the host's buffer",
5665 guest_client.username,
5666 buffer_id,
5667 path
5668 );
5669 }
5670
5671 guest_cx.update(|_| drop(guest_client));
5672 }
5673
5674 host_cx.update(|_| drop(host_client));
5675 }
5676
5677 struct TestServer {
5678 peer: Arc<Peer>,
5679 app_state: Arc<AppState>,
5680 server: Arc<Server>,
5681 foreground: Rc<executor::Foreground>,
5682 notifications: mpsc::UnboundedReceiver<()>,
5683 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5684 forbid_connections: Arc<AtomicBool>,
5685 _test_db: TestDb,
5686 }
5687
5688 impl TestServer {
5689 async fn start(
5690 foreground: Rc<executor::Foreground>,
5691 background: Arc<executor::Background>,
5692 ) -> Self {
5693 let test_db = TestDb::fake(background);
5694 let app_state = Self::build_app_state(&test_db).await;
5695 let peer = Peer::new();
5696 let notifications = mpsc::unbounded();
5697 let server = Server::new(app_state.clone(), Some(notifications.0));
5698 Self {
5699 peer,
5700 app_state,
5701 server,
5702 foreground,
5703 notifications: notifications.1,
5704 connection_killers: Default::default(),
5705 forbid_connections: Default::default(),
5706 _test_db: test_db,
5707 }
5708 }
5709
5710 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5711 cx.update(|cx| {
5712 let settings = Settings::test(cx);
5713 cx.set_global(settings);
5714 });
5715
5716 let http = FakeHttpClient::with_404_response();
5717 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5718 let client_name = name.to_string();
5719 let mut client = Client::new(http.clone());
5720 let server = self.server.clone();
5721 let connection_killers = self.connection_killers.clone();
5722 let forbid_connections = self.forbid_connections.clone();
5723 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5724
5725 Arc::get_mut(&mut client)
5726 .unwrap()
5727 .override_authenticate(move |cx| {
5728 cx.spawn(|_| async move {
5729 let access_token = "the-token".to_string();
5730 Ok(Credentials {
5731 user_id: user_id.0 as u64,
5732 access_token,
5733 })
5734 })
5735 })
5736 .override_establish_connection(move |credentials, cx| {
5737 assert_eq!(credentials.user_id, user_id.0 as u64);
5738 assert_eq!(credentials.access_token, "the-token");
5739
5740 let server = server.clone();
5741 let connection_killers = connection_killers.clone();
5742 let forbid_connections = forbid_connections.clone();
5743 let client_name = client_name.clone();
5744 let connection_id_tx = connection_id_tx.clone();
5745 cx.spawn(move |cx| async move {
5746 if forbid_connections.load(SeqCst) {
5747 Err(EstablishConnectionError::other(anyhow!(
5748 "server is forbidding connections"
5749 )))
5750 } else {
5751 let (client_conn, server_conn, killed) =
5752 Connection::in_memory(cx.background());
5753 connection_killers.lock().insert(user_id, killed);
5754 cx.background()
5755 .spawn(server.handle_connection(
5756 server_conn,
5757 client_name,
5758 user_id,
5759 Some(connection_id_tx),
5760 cx.background(),
5761 ))
5762 .detach();
5763 Ok(client_conn)
5764 }
5765 })
5766 });
5767
5768 client
5769 .authenticate_and_connect(false, &cx.to_async())
5770 .await
5771 .unwrap();
5772
5773 Channel::init(&client);
5774 Project::init(&client);
5775 cx.update(|cx| {
5776 workspace::init(&client, cx);
5777 });
5778
5779 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5780 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5781
5782 let client = TestClient {
5783 client,
5784 peer_id,
5785 username: name.to_string(),
5786 user_store,
5787 language_registry: Arc::new(LanguageRegistry::test()),
5788 project: Default::default(),
5789 buffers: Default::default(),
5790 };
5791 client.wait_for_current_user(cx).await;
5792 client
5793 }
5794
5795 fn disconnect_client(&self, user_id: UserId) {
5796 self.connection_killers
5797 .lock()
5798 .remove(&user_id)
5799 .unwrap()
5800 .store(true, SeqCst);
5801 }
5802
5803 fn forbid_connections(&self) {
5804 self.forbid_connections.store(true, SeqCst);
5805 }
5806
5807 fn allow_connections(&self) {
5808 self.forbid_connections.store(false, SeqCst);
5809 }
5810
5811 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5812 Arc::new(AppState {
5813 db: test_db.db().clone(),
5814 api_token: Default::default(),
5815 })
5816 }
5817
5818 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5819 self.server.store.read().await
5820 }
5821
5822 async fn condition<F>(&mut self, mut predicate: F)
5823 where
5824 F: FnMut(&Store) -> bool,
5825 {
5826 assert!(
5827 self.foreground.parking_forbidden(),
5828 "you must call forbid_parking to use server conditions so we don't block indefinitely"
5829 );
5830 while !(predicate)(&*self.server.store.read().await) {
5831 self.foreground.start_waiting();
5832 self.notifications.next().await;
5833 self.foreground.finish_waiting();
5834 }
5835 }
5836 }
5837
5838 impl Deref for TestServer {
5839 type Target = Server;
5840
5841 fn deref(&self) -> &Self::Target {
5842 &self.server
5843 }
5844 }
5845
5846 impl Drop for TestServer {
5847 fn drop(&mut self) {
5848 self.peer.reset();
5849 }
5850 }
5851
5852 struct TestClient {
5853 client: Arc<Client>,
5854 username: String,
5855 pub peer_id: PeerId,
5856 pub user_store: ModelHandle<UserStore>,
5857 language_registry: Arc<LanguageRegistry>,
5858 project: Option<ModelHandle<Project>>,
5859 buffers: HashSet<ModelHandle<language::Buffer>>,
5860 }
5861
5862 impl Deref for TestClient {
5863 type Target = Arc<Client>;
5864
5865 fn deref(&self) -> &Self::Target {
5866 &self.client
5867 }
5868 }
5869
5870 impl TestClient {
5871 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5872 UserId::from_proto(
5873 self.user_store
5874 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5875 )
5876 }
5877
5878 async fn wait_for_current_user(&self, cx: &TestAppContext) {
5879 let mut authed_user = self
5880 .user_store
5881 .read_with(cx, |user_store, _| user_store.watch_current_user());
5882 while authed_user.next().await.unwrap().is_none() {}
5883 }
5884
5885 async fn build_local_project(
5886 &mut self,
5887 fs: Arc<FakeFs>,
5888 root_path: impl AsRef<Path>,
5889 cx: &mut TestAppContext,
5890 ) -> (ModelHandle<Project>, WorktreeId) {
5891 let project = cx.update(|cx| {
5892 Project::local(
5893 self.client.clone(),
5894 self.user_store.clone(),
5895 self.language_registry.clone(),
5896 fs,
5897 cx,
5898 )
5899 });
5900 self.project = Some(project.clone());
5901 let (worktree, _) = project
5902 .update(cx, |p, cx| {
5903 p.find_or_create_local_worktree(root_path, true, cx)
5904 })
5905 .await
5906 .unwrap();
5907 worktree
5908 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5909 .await;
5910 project
5911 .update(cx, |project, _| project.next_remote_id())
5912 .await;
5913 (project, worktree.read_with(cx, |tree, _| tree.id()))
5914 }
5915
5916 async fn build_remote_project(
5917 &mut self,
5918 project_id: u64,
5919 cx: &mut TestAppContext,
5920 ) -> ModelHandle<Project> {
5921 let project = Project::remote(
5922 project_id,
5923 self.client.clone(),
5924 self.user_store.clone(),
5925 self.language_registry.clone(),
5926 FakeFs::new(cx.background()),
5927 &mut cx.to_async(),
5928 )
5929 .await
5930 .unwrap();
5931 self.project = Some(project.clone());
5932 project
5933 }
5934
5935 fn build_workspace(
5936 &self,
5937 project: &ModelHandle<Project>,
5938 cx: &mut TestAppContext,
5939 ) -> ViewHandle<Workspace> {
5940 let (window_id, _) = cx.add_window(|_| EmptyView);
5941 cx.add_view(window_id, |cx| {
5942 let fs = project.read(cx).fs().clone();
5943 Workspace::new(
5944 &WorkspaceParams {
5945 fs,
5946 project: project.clone(),
5947 user_store: self.user_store.clone(),
5948 languages: self.language_registry.clone(),
5949 themes: ThemeRegistry::new((), cx.font_cache().clone()),
5950 channel_list: cx.add_model(|cx| {
5951 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5952 }),
5953 client: self.client.clone(),
5954 },
5955 cx,
5956 )
5957 })
5958 }
5959
5960 async fn simulate_host(
5961 mut self,
5962 project: ModelHandle<Project>,
5963 files: Arc<Mutex<Vec<PathBuf>>>,
5964 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5965 rng: Arc<Mutex<StdRng>>,
5966 mut cx: TestAppContext,
5967 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5968 async fn simulate_host_internal(
5969 client: &mut TestClient,
5970 project: ModelHandle<Project>,
5971 files: Arc<Mutex<Vec<PathBuf>>>,
5972 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5973 rng: Arc<Mutex<StdRng>>,
5974 cx: &mut TestAppContext,
5975 ) -> anyhow::Result<()> {
5976 let fs = project.read_with(cx, |project, _| project.fs().clone());
5977
5978 while op_start_signal.next().await.is_some() {
5979 let distribution = rng.lock().gen_range::<usize, _>(0..100);
5980 match distribution {
5981 0..=20 if !files.lock().is_empty() => {
5982 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5983 let mut path = path.as_path();
5984 while let Some(parent_path) = path.parent() {
5985 path = parent_path;
5986 if rng.lock().gen() {
5987 break;
5988 }
5989 }
5990
5991 log::info!("Host: find/create local worktree {:?}", path);
5992 let find_or_create_worktree = project.update(cx, |project, cx| {
5993 project.find_or_create_local_worktree(path, true, cx)
5994 });
5995 if rng.lock().gen() {
5996 cx.background().spawn(find_or_create_worktree).detach();
5997 } else {
5998 find_or_create_worktree.await?;
5999 }
6000 }
6001 10..=80 if !files.lock().is_empty() => {
6002 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6003 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6004 let (worktree, path) = project
6005 .update(cx, |project, cx| {
6006 project.find_or_create_local_worktree(
6007 file.clone(),
6008 true,
6009 cx,
6010 )
6011 })
6012 .await?;
6013 let project_path =
6014 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6015 log::info!(
6016 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6017 file,
6018 project_path.0,
6019 project_path.1
6020 );
6021 let buffer = project
6022 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6023 .await
6024 .unwrap();
6025 client.buffers.insert(buffer.clone());
6026 buffer
6027 } else {
6028 client
6029 .buffers
6030 .iter()
6031 .choose(&mut *rng.lock())
6032 .unwrap()
6033 .clone()
6034 };
6035
6036 if rng.lock().gen_bool(0.1) {
6037 cx.update(|cx| {
6038 log::info!(
6039 "Host: dropping buffer {:?}",
6040 buffer.read(cx).file().unwrap().full_path(cx)
6041 );
6042 client.buffers.remove(&buffer);
6043 drop(buffer);
6044 });
6045 } else {
6046 buffer.update(cx, |buffer, cx| {
6047 log::info!(
6048 "Host: updating buffer {:?} ({})",
6049 buffer.file().unwrap().full_path(cx),
6050 buffer.remote_id()
6051 );
6052
6053 if rng.lock().gen_bool(0.7) {
6054 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6055 } else {
6056 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6057 }
6058 });
6059 }
6060 }
6061 _ => loop {
6062 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6063 let mut path = PathBuf::new();
6064 path.push("/");
6065 for _ in 0..path_component_count {
6066 let letter = rng.lock().gen_range(b'a'..=b'z');
6067 path.push(std::str::from_utf8(&[letter]).unwrap());
6068 }
6069 path.set_extension("rs");
6070 let parent_path = path.parent().unwrap();
6071
6072 log::info!("Host: creating file {:?}", path,);
6073
6074 if fs.create_dir(&parent_path).await.is_ok()
6075 && fs.create_file(&path, Default::default()).await.is_ok()
6076 {
6077 files.lock().push(path);
6078 break;
6079 } else {
6080 log::info!("Host: cannot create file");
6081 }
6082 },
6083 }
6084
6085 cx.background().simulate_random_delay().await;
6086 }
6087
6088 Ok(())
6089 }
6090
6091 let result = simulate_host_internal(
6092 &mut self,
6093 project.clone(),
6094 files,
6095 op_start_signal,
6096 rng,
6097 &mut cx,
6098 )
6099 .await;
6100 log::info!("Host done");
6101 self.project = Some(project);
6102 (self, cx, result.err())
6103 }
6104
6105 pub async fn simulate_guest(
6106 mut self,
6107 guest_username: String,
6108 project: ModelHandle<Project>,
6109 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6110 rng: Arc<Mutex<StdRng>>,
6111 mut cx: TestAppContext,
6112 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6113 async fn simulate_guest_internal(
6114 client: &mut TestClient,
6115 guest_username: &str,
6116 project: ModelHandle<Project>,
6117 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6118 rng: Arc<Mutex<StdRng>>,
6119 cx: &mut TestAppContext,
6120 ) -> anyhow::Result<()> {
6121 while op_start_signal.next().await.is_some() {
6122 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6123 let worktree = if let Some(worktree) =
6124 project.read_with(cx, |project, cx| {
6125 project
6126 .worktrees(&cx)
6127 .filter(|worktree| {
6128 let worktree = worktree.read(cx);
6129 worktree.is_visible()
6130 && worktree.entries(false).any(|e| e.is_file())
6131 })
6132 .choose(&mut *rng.lock())
6133 }) {
6134 worktree
6135 } else {
6136 cx.background().simulate_random_delay().await;
6137 continue;
6138 };
6139
6140 let (worktree_root_name, project_path) =
6141 worktree.read_with(cx, |worktree, _| {
6142 let entry = worktree
6143 .entries(false)
6144 .filter(|e| e.is_file())
6145 .choose(&mut *rng.lock())
6146 .unwrap();
6147 (
6148 worktree.root_name().to_string(),
6149 (worktree.id(), entry.path.clone()),
6150 )
6151 });
6152 log::info!(
6153 "{}: opening path {:?} in worktree {} ({})",
6154 guest_username,
6155 project_path.1,
6156 project_path.0,
6157 worktree_root_name,
6158 );
6159 let buffer = project
6160 .update(cx, |project, cx| {
6161 project.open_buffer(project_path.clone(), cx)
6162 })
6163 .await?;
6164 log::info!(
6165 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6166 guest_username,
6167 project_path.1,
6168 project_path.0,
6169 worktree_root_name,
6170 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6171 );
6172 client.buffers.insert(buffer.clone());
6173 buffer
6174 } else {
6175 client
6176 .buffers
6177 .iter()
6178 .choose(&mut *rng.lock())
6179 .unwrap()
6180 .clone()
6181 };
6182
6183 let choice = rng.lock().gen_range(0..100);
6184 match choice {
6185 0..=9 => {
6186 cx.update(|cx| {
6187 log::info!(
6188 "{}: dropping buffer {:?}",
6189 guest_username,
6190 buffer.read(cx).file().unwrap().full_path(cx)
6191 );
6192 client.buffers.remove(&buffer);
6193 drop(buffer);
6194 });
6195 }
6196 10..=19 => {
6197 let completions = project.update(cx, |project, cx| {
6198 log::info!(
6199 "{}: requesting completions for buffer {} ({:?})",
6200 guest_username,
6201 buffer.read(cx).remote_id(),
6202 buffer.read(cx).file().unwrap().full_path(cx)
6203 );
6204 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6205 project.completions(&buffer, offset, cx)
6206 });
6207 let completions = cx.background().spawn(async move {
6208 completions
6209 .await
6210 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6211 });
6212 if rng.lock().gen_bool(0.3) {
6213 log::info!("{}: detaching completions request", guest_username);
6214 cx.update(|cx| completions.detach_and_log_err(cx));
6215 } else {
6216 completions.await?;
6217 }
6218 }
6219 20..=29 => {
6220 let code_actions = project.update(cx, |project, cx| {
6221 log::info!(
6222 "{}: requesting code actions for buffer {} ({:?})",
6223 guest_username,
6224 buffer.read(cx).remote_id(),
6225 buffer.read(cx).file().unwrap().full_path(cx)
6226 );
6227 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6228 project.code_actions(&buffer, range, cx)
6229 });
6230 let code_actions = cx.background().spawn(async move {
6231 code_actions.await.map_err(|err| {
6232 anyhow!("code actions request failed: {:?}", err)
6233 })
6234 });
6235 if rng.lock().gen_bool(0.3) {
6236 log::info!("{}: detaching code actions request", guest_username);
6237 cx.update(|cx| code_actions.detach_and_log_err(cx));
6238 } else {
6239 code_actions.await?;
6240 }
6241 }
6242 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6243 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6244 log::info!(
6245 "{}: saving buffer {} ({:?})",
6246 guest_username,
6247 buffer.remote_id(),
6248 buffer.file().unwrap().full_path(cx)
6249 );
6250 (buffer.version(), buffer.save(cx))
6251 });
6252 let save = cx.background().spawn(async move {
6253 let (saved_version, _) = save
6254 .await
6255 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6256 assert!(saved_version.observed_all(&requested_version));
6257 Ok::<_, anyhow::Error>(())
6258 });
6259 if rng.lock().gen_bool(0.3) {
6260 log::info!("{}: detaching save request", guest_username);
6261 cx.update(|cx| save.detach_and_log_err(cx));
6262 } else {
6263 save.await?;
6264 }
6265 }
6266 40..=44 => {
6267 let prepare_rename = project.update(cx, |project, cx| {
6268 log::info!(
6269 "{}: preparing rename for buffer {} ({:?})",
6270 guest_username,
6271 buffer.read(cx).remote_id(),
6272 buffer.read(cx).file().unwrap().full_path(cx)
6273 );
6274 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6275 project.prepare_rename(buffer, offset, cx)
6276 });
6277 let prepare_rename = cx.background().spawn(async move {
6278 prepare_rename.await.map_err(|err| {
6279 anyhow!("prepare rename request failed: {:?}", err)
6280 })
6281 });
6282 if rng.lock().gen_bool(0.3) {
6283 log::info!("{}: detaching prepare rename request", guest_username);
6284 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6285 } else {
6286 prepare_rename.await?;
6287 }
6288 }
6289 45..=49 => {
6290 let definitions = project.update(cx, |project, cx| {
6291 log::info!(
6292 "{}: requesting definitions for buffer {} ({:?})",
6293 guest_username,
6294 buffer.read(cx).remote_id(),
6295 buffer.read(cx).file().unwrap().full_path(cx)
6296 );
6297 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6298 project.definition(&buffer, offset, cx)
6299 });
6300 let definitions = cx.background().spawn(async move {
6301 definitions
6302 .await
6303 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6304 });
6305 if rng.lock().gen_bool(0.3) {
6306 log::info!("{}: detaching definitions request", guest_username);
6307 cx.update(|cx| definitions.detach_and_log_err(cx));
6308 } else {
6309 client
6310 .buffers
6311 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6312 }
6313 }
6314 50..=54 => {
6315 let highlights = project.update(cx, |project, cx| {
6316 log::info!(
6317 "{}: requesting highlights for buffer {} ({:?})",
6318 guest_username,
6319 buffer.read(cx).remote_id(),
6320 buffer.read(cx).file().unwrap().full_path(cx)
6321 );
6322 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6323 project.document_highlights(&buffer, offset, cx)
6324 });
6325 let highlights = cx.background().spawn(async move {
6326 highlights
6327 .await
6328 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6329 });
6330 if rng.lock().gen_bool(0.3) {
6331 log::info!("{}: detaching highlights request", guest_username);
6332 cx.update(|cx| highlights.detach_and_log_err(cx));
6333 } else {
6334 highlights.await?;
6335 }
6336 }
6337 55..=59 => {
6338 let search = project.update(cx, |project, cx| {
6339 let query = rng.lock().gen_range('a'..='z');
6340 log::info!("{}: project-wide search {:?}", guest_username, query);
6341 project.search(SearchQuery::text(query, false, false), cx)
6342 });
6343 let search = cx.background().spawn(async move {
6344 search
6345 .await
6346 .map_err(|err| anyhow!("search request failed: {:?}", err))
6347 });
6348 if rng.lock().gen_bool(0.3) {
6349 log::info!("{}: detaching search request", guest_username);
6350 cx.update(|cx| search.detach_and_log_err(cx));
6351 } else {
6352 client.buffers.extend(search.await?.into_keys());
6353 }
6354 }
6355 _ => {
6356 buffer.update(cx, |buffer, cx| {
6357 log::info!(
6358 "{}: updating buffer {} ({:?})",
6359 guest_username,
6360 buffer.remote_id(),
6361 buffer.file().unwrap().full_path(cx)
6362 );
6363 if rng.lock().gen_bool(0.7) {
6364 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6365 } else {
6366 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6367 }
6368 });
6369 }
6370 }
6371 cx.background().simulate_random_delay().await;
6372 }
6373 Ok(())
6374 }
6375
6376 let result = simulate_guest_internal(
6377 &mut self,
6378 &guest_username,
6379 project.clone(),
6380 op_start_signal,
6381 rng,
6382 &mut cx,
6383 )
6384 .await;
6385 log::info!("{}: done", guest_username);
6386
6387 self.project = Some(project);
6388 (self, cx, result.err())
6389 }
6390 }
6391
6392 impl Drop for TestClient {
6393 fn drop(&mut self) {
6394 self.client.tear_down();
6395 }
6396 }
6397
6398 impl Executor for Arc<gpui::executor::Background> {
6399 type Sleep = gpui::executor::Timer;
6400
6401 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6402 self.spawn(future).detach();
6403 }
6404
6405 fn sleep(&self, duration: Duration) -> Self::Sleep {
6406 self.as_ref().timer(duration)
6407 }
6408 }
6409
6410 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6411 channel
6412 .messages()
6413 .cursor::<()>()
6414 .map(|m| {
6415 (
6416 m.sender.github_login.clone(),
6417 m.body.clone(),
6418 m.is_pending(),
6419 )
6420 })
6421 .collect()
6422 }
6423
6424 struct EmptyView;
6425
6426 impl gpui::Entity for EmptyView {
6427 type Event = ();
6428 }
6429
6430 impl gpui::View for EmptyView {
6431 fn ui_name() -> &'static str {
6432 "empty view"
6433 }
6434
6435 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6436 gpui::Element::boxed(gpui::elements::Empty)
6437 }
6438 }
6439}