1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::Arc,
40 time::Duration,
41};
42use store::{Store, Worktree};
43use time::OffsetDateTime;
44use tokio::{
45 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
46 time::Sleep,
47};
48use tower::ServiceBuilder;
49use tracing::{info_span, instrument, Instrument};
50
51type MessageHandler =
52 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
53
54pub struct Server {
55 peer: Arc<Peer>,
56 store: RwLock<Store>,
57 app_state: Arc<AppState>,
58 handlers: HashMap<TypeId, MessageHandler>,
59 notifications: Option<mpsc::UnboundedSender<()>>,
60}
61
/// Minimal runtime abstraction needed to drive a connection: spawning detached tasks and
/// creating sleep timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;

    /// Runs `future` in the background without waiting for it to finish.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;

    /// Returns a future that completes once `duration` has elapsed.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
67
/// [`Executor`] implementation backed by the tokio runtime.
#[derive(Clone)]
pub struct RealExecutor;
70
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;

/// Maximum length, in bytes, of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1_024;
73
74struct StoreReadGuard<'a> {
75 guard: RwLockReadGuard<'a, Store>,
76 _not_send: PhantomData<Rc<()>>,
77}
78
79struct StoreWriteGuard<'a> {
80 guard: RwLockWriteGuard<'a, Store>,
81 _not_send: PhantomData<Rc<()>>,
82}
83
84impl Server {
85 pub fn new(
86 app_state: Arc<AppState>,
87 notifications: Option<mpsc::UnboundedSender<()>>,
88 ) -> Arc<Self> {
89 let mut server = Self {
90 peer: Peer::new(),
91 app_state,
92 store: Default::default(),
93 handlers: Default::default(),
94 notifications,
95 };
96
97 server
98 .add_request_handler(Server::ping)
99 .add_request_handler(Server::register_project)
100 .add_message_handler(Server::unregister_project)
101 .add_request_handler(Server::share_project)
102 .add_message_handler(Server::unshare_project)
103 .add_sync_request_handler(Server::join_project)
104 .add_message_handler(Server::leave_project)
105 .add_request_handler(Server::register_worktree)
106 .add_message_handler(Server::unregister_worktree)
107 .add_request_handler(Server::update_worktree)
108 .add_message_handler(Server::start_language_server)
109 .add_message_handler(Server::update_language_server)
110 .add_message_handler(Server::update_diagnostic_summary)
111 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
112 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
113 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
114 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
115 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
116 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
117 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
118 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
119 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
120 .add_request_handler(
121 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
122 )
123 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
124 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
125 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
126 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
127 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
128 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
129 .add_request_handler(Server::update_buffer)
130 .add_message_handler(Server::update_buffer_file)
131 .add_message_handler(Server::buffer_reloaded)
132 .add_message_handler(Server::buffer_saved)
133 .add_request_handler(Server::save_buffer)
134 .add_request_handler(Server::get_channels)
135 .add_request_handler(Server::get_users)
136 .add_request_handler(Server::join_channel)
137 .add_message_handler(Server::leave_channel)
138 .add_request_handler(Server::send_channel_message)
139 .add_request_handler(Server::follow)
140 .add_message_handler(Server::unfollow)
141 .add_message_handler(Server::update_followers)
142 .add_request_handler(Server::get_channel_messages);
143
144 Arc::new(server)
145 }
146
147 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
148 where
149 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
150 Fut: 'static + Send + Future<Output = Result<()>>,
151 M: EnvelopedMessage,
152 {
153 let prev_handler = self.handlers.insert(
154 TypeId::of::<M>(),
155 Box::new(move |server, envelope| {
156 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
157 let span = info_span!(
158 "handle message",
159 payload_type = envelope.payload_type_name(),
160 payload = serde_json::to_string_pretty(&envelope.payload)
161 .unwrap()
162 .as_str(),
163 );
164 let future = (handler)(server, *envelope);
165 async move {
166 if let Err(error) = future.await {
167 tracing::error!(%error, "error handling message");
168 }
169 }
170 .instrument(span)
171 .boxed()
172 }),
173 );
174 if prev_handler.is_some() {
175 panic!("registered a handler for the same message twice");
176 }
177 self
178 }
179
180 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
181 where
182 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
183 Fut: 'static + Send + Future<Output = Result<M::Response>>,
184 M: RequestMessage,
185 {
186 self.add_message_handler(move |server, envelope| {
187 let receipt = envelope.receipt();
188 let response = (handler)(server.clone(), envelope);
189 async move {
190 match response.await {
191 Ok(response) => {
192 server.peer.respond(receipt, response)?;
193 Ok(())
194 }
195 Err(error) => {
196 server.peer.respond_with_error(
197 receipt,
198 proto::Error {
199 message: error.to_string(),
200 },
201 )?;
202 Err(error)
203 }
204 }
205 }
206 })
207 }
208
209 /// Handle a request while holding a lock to the store. This is useful when we're registering
210 /// a connection but we want to respond on the connection before anybody else can send on it.
211 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
212 where
213 F: 'static
214 + Send
215 + Sync
216 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
217 M: RequestMessage,
218 {
219 let handler = Arc::new(handler);
220 self.add_message_handler(move |server, envelope| {
221 let receipt = envelope.receipt();
222 let handler = handler.clone();
223 async move {
224 let mut store = server.state_mut().await;
225 let response = (handler)(server.clone(), &mut *store, envelope);
226 match response {
227 Ok(response) => {
228 server.peer.respond(receipt, response)?;
229 Ok(())
230 }
231 Err(error) => {
232 server.peer.respond_with_error(
233 receipt,
234 proto::Error {
235 message: error.to_string(),
236 },
237 )?;
238 Err(error)
239 }
240 }
241 }
242 })
243 }
244
245 pub fn handle_connection<E: Executor>(
246 self: &Arc<Self>,
247 connection: Connection,
248 address: String,
249 user_id: UserId,
250 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
251 executor: E,
252 ) -> impl Future<Output = ()> {
253 let mut this = self.clone();
254 let span = info_span!("handle connection", %user_id, %address);
255 async move {
256 let (connection_id, handle_io, mut incoming_rx) = this
257 .peer
258 .add_connection(connection, {
259 let executor = executor.clone();
260 move |duration| {
261 let timer = executor.sleep(duration);
262 async move {
263 timer.await;
264 }
265 }
266 })
267 .await;
268
269 tracing::info!(%user_id, %connection_id, %address, "connection opened");
270
271 if let Some(send_connection_id) = send_connection_id.as_mut() {
272 let _ = send_connection_id.send(connection_id).await;
273 }
274
275 {
276 let mut state = this.state_mut().await;
277 state.add_connection(connection_id, user_id);
278 this.update_contacts_for_users(&*state, &[user_id]);
279 }
280
281 let handle_io = handle_io.fuse();
282 futures::pin_mut!(handle_io);
283 loop {
284 let next_message = incoming_rx.next().fuse();
285 futures::pin_mut!(next_message);
286 futures::select_biased! {
287 result = handle_io => {
288 if let Err(error) = result {
289 tracing::error!(%error, "error handling I/O");
290 }
291 break;
292 }
293 message = next_message => {
294 if let Some(message) = message {
295 let type_name = message.payload_type_name();
296 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
297 async {
298 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
299 let notifications = this.notifications.clone();
300 let is_background = message.is_background();
301 let handle_message = (handler)(this.clone(), message);
302 let handle_message = async move {
303 handle_message.await;
304 if let Some(mut notifications) = notifications {
305 let _ = notifications.send(()).await;
306 }
307 };
308 if is_background {
309 executor.spawn_detached(handle_message);
310 } else {
311 handle_message.await;
312 }
313 } else {
314 tracing::error!("no message handler");
315 }
316 }.instrument(span).await;
317 } else {
318 tracing::info!(%user_id, %connection_id, %address, "connection closed");
319 break;
320 }
321 }
322 }
323 }
324
325 if let Err(error) = this.sign_out(connection_id).await {
326 tracing::error!(%error, "error signing out");
327 }
328 }.instrument(span)
329 }
330
331 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
332 self.peer.disconnect(connection_id);
333 let mut state = self.state_mut().await;
334 let removed_connection = state.remove_connection(connection_id)?;
335
336 for (project_id, project) in removed_connection.hosted_projects {
337 if let Some(share) = project.share {
338 broadcast(
339 connection_id,
340 share.guests.keys().copied().collect(),
341 |conn_id| {
342 self.peer
343 .send(conn_id, proto::UnshareProject { project_id })
344 },
345 );
346 }
347 }
348
349 for (project_id, peer_ids) in removed_connection.guest_project_ids {
350 broadcast(connection_id, peer_ids, |conn_id| {
351 self.peer.send(
352 conn_id,
353 proto::RemoveProjectCollaborator {
354 project_id,
355 peer_id: connection_id.0,
356 },
357 )
358 });
359 }
360
361 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
362 Ok(())
363 }
364
365 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
366 Ok(proto::Ack {})
367 }
368
369 async fn register_project(
370 self: Arc<Server>,
371 request: TypedEnvelope<proto::RegisterProject>,
372 ) -> Result<proto::RegisterProjectResponse> {
373 let project_id = {
374 let mut state = self.state_mut().await;
375 let user_id = state.user_id_for_connection(request.sender_id)?;
376 state.register_project(request.sender_id, user_id)
377 };
378 Ok(proto::RegisterProjectResponse { project_id })
379 }
380
381 async fn unregister_project(
382 self: Arc<Server>,
383 request: TypedEnvelope<proto::UnregisterProject>,
384 ) -> Result<()> {
385 let mut state = self.state_mut().await;
386 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
387 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
388 Ok(())
389 }
390
391 async fn share_project(
392 self: Arc<Server>,
393 request: TypedEnvelope<proto::ShareProject>,
394 ) -> Result<proto::Ack> {
395 let mut state = self.state_mut().await;
396 let project = state.share_project(request.payload.project_id, request.sender_id)?;
397 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
398 Ok(proto::Ack {})
399 }
400
401 async fn unshare_project(
402 self: Arc<Server>,
403 request: TypedEnvelope<proto::UnshareProject>,
404 ) -> Result<()> {
405 let project_id = request.payload.project_id;
406 let mut state = self.state_mut().await;
407 let project = state.unshare_project(project_id, request.sender_id)?;
408 broadcast(request.sender_id, project.connection_ids, |conn_id| {
409 self.peer
410 .send(conn_id, proto::UnshareProject { project_id })
411 });
412 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
413 Ok(())
414 }
415
416 fn join_project(
417 self: Arc<Server>,
418 state: &mut Store,
419 request: TypedEnvelope<proto::JoinProject>,
420 ) -> Result<proto::JoinProjectResponse> {
421 let project_id = request.payload.project_id;
422
423 let user_id = state.user_id_for_connection(request.sender_id)?;
424 let (response, connection_ids, contact_user_ids) = state
425 .join_project(request.sender_id, user_id, project_id)
426 .and_then(|joined| {
427 let share = joined.project.share()?;
428 let peer_count = share.guests.len();
429 let mut collaborators = Vec::with_capacity(peer_count);
430 collaborators.push(proto::Collaborator {
431 peer_id: joined.project.host_connection_id.0,
432 replica_id: 0,
433 user_id: joined.project.host_user_id.to_proto(),
434 });
435 let worktrees = share
436 .worktrees
437 .iter()
438 .filter_map(|(id, shared_worktree)| {
439 let worktree = joined.project.worktrees.get(&id)?;
440 Some(proto::Worktree {
441 id: *id,
442 root_name: worktree.root_name.clone(),
443 entries: shared_worktree.entries.values().cloned().collect(),
444 diagnostic_summaries: shared_worktree
445 .diagnostic_summaries
446 .values()
447 .cloned()
448 .collect(),
449 visible: worktree.visible,
450 })
451 })
452 .collect();
453 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
454 if *peer_conn_id != request.sender_id {
455 collaborators.push(proto::Collaborator {
456 peer_id: peer_conn_id.0,
457 replica_id: *peer_replica_id as u32,
458 user_id: peer_user_id.to_proto(),
459 });
460 }
461 }
462 let response = proto::JoinProjectResponse {
463 worktrees,
464 replica_id: joined.replica_id as u32,
465 collaborators,
466 language_servers: joined.project.language_servers.clone(),
467 };
468 let connection_ids = joined.project.connection_ids();
469 let contact_user_ids = joined.project.authorized_user_ids();
470 Ok((response, connection_ids, contact_user_ids))
471 })?;
472
473 broadcast(request.sender_id, connection_ids, |conn_id| {
474 self.peer.send(
475 conn_id,
476 proto::AddProjectCollaborator {
477 project_id,
478 collaborator: Some(proto::Collaborator {
479 peer_id: request.sender_id.0,
480 replica_id: response.replica_id,
481 user_id: user_id.to_proto(),
482 }),
483 },
484 )
485 });
486 self.update_contacts_for_users(state, &contact_user_ids);
487 Ok(response)
488 }
489
490 async fn leave_project(
491 self: Arc<Server>,
492 request: TypedEnvelope<proto::LeaveProject>,
493 ) -> Result<()> {
494 let sender_id = request.sender_id;
495 let project_id = request.payload.project_id;
496 let mut state = self.state_mut().await;
497 let worktree = state.leave_project(sender_id, project_id)?;
498 broadcast(sender_id, worktree.connection_ids, |conn_id| {
499 self.peer.send(
500 conn_id,
501 proto::RemoveProjectCollaborator {
502 project_id,
503 peer_id: sender_id.0,
504 },
505 )
506 });
507 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
508 Ok(())
509 }
510
511 async fn register_worktree(
512 self: Arc<Server>,
513 request: TypedEnvelope<proto::RegisterWorktree>,
514 ) -> Result<proto::Ack> {
515 let mut contact_user_ids = HashSet::default();
516 for github_login in &request.payload.authorized_logins {
517 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
518 contact_user_ids.insert(contact_user_id);
519 }
520
521 let mut state = self.state_mut().await;
522 let host_user_id = state.user_id_for_connection(request.sender_id)?;
523 contact_user_ids.insert(host_user_id);
524
525 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
526 let guest_connection_ids = state
527 .read_project(request.payload.project_id, request.sender_id)?
528 .guest_connection_ids();
529 state.register_worktree(
530 request.payload.project_id,
531 request.payload.worktree_id,
532 request.sender_id,
533 Worktree {
534 authorized_user_ids: contact_user_ids.clone(),
535 root_name: request.payload.root_name.clone(),
536 visible: request.payload.visible,
537 },
538 )?;
539
540 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
541 self.peer
542 .forward_send(request.sender_id, connection_id, request.payload.clone())
543 });
544 self.update_contacts_for_users(&*state, &contact_user_ids);
545 Ok(proto::Ack {})
546 }
547
548 async fn unregister_worktree(
549 self: Arc<Server>,
550 request: TypedEnvelope<proto::UnregisterWorktree>,
551 ) -> Result<()> {
552 let project_id = request.payload.project_id;
553 let worktree_id = request.payload.worktree_id;
554 let mut state = self.state_mut().await;
555 let (worktree, guest_connection_ids) =
556 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
557 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
558 self.peer.send(
559 conn_id,
560 proto::UnregisterWorktree {
561 project_id,
562 worktree_id,
563 },
564 )
565 });
566 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
567 Ok(())
568 }
569
570 async fn update_worktree(
571 self: Arc<Server>,
572 request: TypedEnvelope<proto::UpdateWorktree>,
573 ) -> Result<proto::Ack> {
574 let connection_ids = self.state_mut().await.update_worktree(
575 request.sender_id,
576 request.payload.project_id,
577 request.payload.worktree_id,
578 &request.payload.removed_entries,
579 &request.payload.updated_entries,
580 )?;
581
582 broadcast(request.sender_id, connection_ids, |connection_id| {
583 self.peer
584 .forward_send(request.sender_id, connection_id, request.payload.clone())
585 });
586
587 Ok(proto::Ack {})
588 }
589
590 async fn update_diagnostic_summary(
591 self: Arc<Server>,
592 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
593 ) -> Result<()> {
594 let summary = request
595 .payload
596 .summary
597 .clone()
598 .ok_or_else(|| anyhow!("invalid summary"))?;
599 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
600 request.payload.project_id,
601 request.payload.worktree_id,
602 request.sender_id,
603 summary,
604 )?;
605
606 broadcast(request.sender_id, receiver_ids, |connection_id| {
607 self.peer
608 .forward_send(request.sender_id, connection_id, request.payload.clone())
609 });
610 Ok(())
611 }
612
613 async fn start_language_server(
614 self: Arc<Server>,
615 request: TypedEnvelope<proto::StartLanguageServer>,
616 ) -> Result<()> {
617 let receiver_ids = self.state_mut().await.start_language_server(
618 request.payload.project_id,
619 request.sender_id,
620 request
621 .payload
622 .server
623 .clone()
624 .ok_or_else(|| anyhow!("invalid language server"))?,
625 )?;
626 broadcast(request.sender_id, receiver_ids, |connection_id| {
627 self.peer
628 .forward_send(request.sender_id, connection_id, request.payload.clone())
629 });
630 Ok(())
631 }
632
633 async fn update_language_server(
634 self: Arc<Server>,
635 request: TypedEnvelope<proto::UpdateLanguageServer>,
636 ) -> Result<()> {
637 let receiver_ids = self
638 .state()
639 .await
640 .project_connection_ids(request.payload.project_id, request.sender_id)?;
641 broadcast(request.sender_id, receiver_ids, |connection_id| {
642 self.peer
643 .forward_send(request.sender_id, connection_id, request.payload.clone())
644 });
645 Ok(())
646 }
647
648 async fn forward_project_request<T>(
649 self: Arc<Server>,
650 request: TypedEnvelope<T>,
651 ) -> Result<T::Response>
652 where
653 T: EntityMessage + RequestMessage,
654 {
655 let host_connection_id = self
656 .state()
657 .await
658 .read_project(request.payload.remote_entity_id(), request.sender_id)?
659 .host_connection_id;
660 Ok(self
661 .peer
662 .forward_request(request.sender_id, host_connection_id, request.payload)
663 .await?)
664 }
665
666 async fn save_buffer(
667 self: Arc<Server>,
668 request: TypedEnvelope<proto::SaveBuffer>,
669 ) -> Result<proto::BufferSaved> {
670 let host = self
671 .state()
672 .await
673 .read_project(request.payload.project_id, request.sender_id)?
674 .host_connection_id;
675 let response = self
676 .peer
677 .forward_request(request.sender_id, host, request.payload.clone())
678 .await?;
679
680 let mut guests = self
681 .state()
682 .await
683 .read_project(request.payload.project_id, request.sender_id)?
684 .connection_ids();
685 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
686 broadcast(host, guests, |conn_id| {
687 self.peer.forward_send(host, conn_id, response.clone())
688 });
689
690 Ok(response)
691 }
692
693 async fn update_buffer(
694 self: Arc<Server>,
695 request: TypedEnvelope<proto::UpdateBuffer>,
696 ) -> Result<proto::Ack> {
697 let receiver_ids = self
698 .state()
699 .await
700 .project_connection_ids(request.payload.project_id, request.sender_id)?;
701 broadcast(request.sender_id, receiver_ids, |connection_id| {
702 self.peer
703 .forward_send(request.sender_id, connection_id, request.payload.clone())
704 });
705 Ok(proto::Ack {})
706 }
707
708 async fn update_buffer_file(
709 self: Arc<Server>,
710 request: TypedEnvelope<proto::UpdateBufferFile>,
711 ) -> Result<()> {
712 let receiver_ids = self
713 .state()
714 .await
715 .project_connection_ids(request.payload.project_id, request.sender_id)?;
716 broadcast(request.sender_id, receiver_ids, |connection_id| {
717 self.peer
718 .forward_send(request.sender_id, connection_id, request.payload.clone())
719 });
720 Ok(())
721 }
722
723 async fn buffer_reloaded(
724 self: Arc<Server>,
725 request: TypedEnvelope<proto::BufferReloaded>,
726 ) -> Result<()> {
727 let receiver_ids = self
728 .state()
729 .await
730 .project_connection_ids(request.payload.project_id, request.sender_id)?;
731 broadcast(request.sender_id, receiver_ids, |connection_id| {
732 self.peer
733 .forward_send(request.sender_id, connection_id, request.payload.clone())
734 });
735 Ok(())
736 }
737
738 async fn buffer_saved(
739 self: Arc<Server>,
740 request: TypedEnvelope<proto::BufferSaved>,
741 ) -> Result<()> {
742 let receiver_ids = self
743 .state()
744 .await
745 .project_connection_ids(request.payload.project_id, request.sender_id)?;
746 broadcast(request.sender_id, receiver_ids, |connection_id| {
747 self.peer
748 .forward_send(request.sender_id, connection_id, request.payload.clone())
749 });
750 Ok(())
751 }
752
753 async fn follow(
754 self: Arc<Self>,
755 request: TypedEnvelope<proto::Follow>,
756 ) -> Result<proto::FollowResponse> {
757 let leader_id = ConnectionId(request.payload.leader_id);
758 let follower_id = request.sender_id;
759 if !self
760 .state()
761 .await
762 .project_connection_ids(request.payload.project_id, follower_id)?
763 .contains(&leader_id)
764 {
765 Err(anyhow!("no such peer"))?;
766 }
767 let mut response = self
768 .peer
769 .forward_request(request.sender_id, leader_id, request.payload)
770 .await?;
771 response
772 .views
773 .retain(|view| view.leader_id != Some(follower_id.0));
774 Ok(response)
775 }
776
777 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
778 let leader_id = ConnectionId(request.payload.leader_id);
779 if !self
780 .state()
781 .await
782 .project_connection_ids(request.payload.project_id, request.sender_id)?
783 .contains(&leader_id)
784 {
785 Err(anyhow!("no such peer"))?;
786 }
787 self.peer
788 .forward_send(request.sender_id, leader_id, request.payload)?;
789 Ok(())
790 }
791
792 async fn update_followers(
793 self: Arc<Self>,
794 request: TypedEnvelope<proto::UpdateFollowers>,
795 ) -> Result<()> {
796 let connection_ids = self
797 .state()
798 .await
799 .project_connection_ids(request.payload.project_id, request.sender_id)?;
800 let leader_id = request
801 .payload
802 .variant
803 .as_ref()
804 .and_then(|variant| match variant {
805 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
806 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
807 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
808 });
809 for follower_id in &request.payload.follower_ids {
810 let follower_id = ConnectionId(*follower_id);
811 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
812 self.peer
813 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
814 }
815 }
816 Ok(())
817 }
818
819 async fn get_channels(
820 self: Arc<Server>,
821 request: TypedEnvelope<proto::GetChannels>,
822 ) -> Result<proto::GetChannelsResponse> {
823 let user_id = self
824 .state()
825 .await
826 .user_id_for_connection(request.sender_id)?;
827 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
828 Ok(proto::GetChannelsResponse {
829 channels: channels
830 .into_iter()
831 .map(|chan| proto::Channel {
832 id: chan.id.to_proto(),
833 name: chan.name,
834 })
835 .collect(),
836 })
837 }
838
839 async fn get_users(
840 self: Arc<Server>,
841 request: TypedEnvelope<proto::GetUsers>,
842 ) -> Result<proto::GetUsersResponse> {
843 let user_ids = request
844 .payload
845 .user_ids
846 .into_iter()
847 .map(UserId::from_proto)
848 .collect();
849 let users = self
850 .app_state
851 .db
852 .get_users_by_ids(user_ids)
853 .await?
854 .into_iter()
855 .map(|user| proto::User {
856 id: user.id.to_proto(),
857 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
858 github_login: user.github_login,
859 })
860 .collect();
861 Ok(proto::GetUsersResponse { users })
862 }
863
864 #[instrument(skip(self, state, user_ids))]
865 fn update_contacts_for_users<'a>(
866 self: &Arc<Self>,
867 state: &Store,
868 user_ids: impl IntoIterator<Item = &'a UserId>,
869 ) {
870 for user_id in user_ids {
871 let contacts = state.contacts_for_user(*user_id);
872 for connection_id in state.connection_ids_for_user(*user_id) {
873 self.peer
874 .send(
875 connection_id,
876 proto::UpdateContacts {
877 contacts: contacts.clone(),
878 },
879 )
880 .trace_err();
881 }
882 }
883 }
884
885 async fn join_channel(
886 self: Arc<Self>,
887 request: TypedEnvelope<proto::JoinChannel>,
888 ) -> Result<proto::JoinChannelResponse> {
889 let user_id = self
890 .state()
891 .await
892 .user_id_for_connection(request.sender_id)?;
893 let channel_id = ChannelId::from_proto(request.payload.channel_id);
894 if !self
895 .app_state
896 .db
897 .can_user_access_channel(user_id, channel_id)
898 .await?
899 {
900 Err(anyhow!("access denied"))?;
901 }
902
903 self.state_mut()
904 .await
905 .join_channel(request.sender_id, channel_id);
906 let messages = self
907 .app_state
908 .db
909 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
910 .await?
911 .into_iter()
912 .map(|msg| proto::ChannelMessage {
913 id: msg.id.to_proto(),
914 body: msg.body,
915 timestamp: msg.sent_at.unix_timestamp() as u64,
916 sender_id: msg.sender_id.to_proto(),
917 nonce: Some(msg.nonce.as_u128().into()),
918 })
919 .collect::<Vec<_>>();
920 Ok(proto::JoinChannelResponse {
921 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
922 messages,
923 })
924 }
925
926 async fn leave_channel(
927 self: Arc<Self>,
928 request: TypedEnvelope<proto::LeaveChannel>,
929 ) -> Result<()> {
930 let user_id = self
931 .state()
932 .await
933 .user_id_for_connection(request.sender_id)?;
934 let channel_id = ChannelId::from_proto(request.payload.channel_id);
935 if !self
936 .app_state
937 .db
938 .can_user_access_channel(user_id, channel_id)
939 .await?
940 {
941 Err(anyhow!("access denied"))?;
942 }
943
944 self.state_mut()
945 .await
946 .leave_channel(request.sender_id, channel_id);
947
948 Ok(())
949 }
950
951 async fn send_channel_message(
952 self: Arc<Self>,
953 request: TypedEnvelope<proto::SendChannelMessage>,
954 ) -> Result<proto::SendChannelMessageResponse> {
955 let channel_id = ChannelId::from_proto(request.payload.channel_id);
956 let user_id;
957 let connection_ids;
958 {
959 let state = self.state().await;
960 user_id = state.user_id_for_connection(request.sender_id)?;
961 connection_ids = state.channel_connection_ids(channel_id)?;
962 }
963
964 // Validate the message body.
965 let body = request.payload.body.trim().to_string();
966 if body.len() > MAX_MESSAGE_LEN {
967 return Err(anyhow!("message is too long"))?;
968 }
969 if body.is_empty() {
970 return Err(anyhow!("message can't be blank"))?;
971 }
972
973 let timestamp = OffsetDateTime::now_utc();
974 let nonce = request
975 .payload
976 .nonce
977 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
978
979 let message_id = self
980 .app_state
981 .db
982 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
983 .await?
984 .to_proto();
985 let message = proto::ChannelMessage {
986 sender_id: user_id.to_proto(),
987 id: message_id,
988 body,
989 timestamp: timestamp.unix_timestamp() as u64,
990 nonce: Some(nonce),
991 };
992 broadcast(request.sender_id, connection_ids, |conn_id| {
993 self.peer.send(
994 conn_id,
995 proto::ChannelMessageSent {
996 channel_id: channel_id.to_proto(),
997 message: Some(message.clone()),
998 },
999 )
1000 });
1001 Ok(proto::SendChannelMessageResponse {
1002 message: Some(message),
1003 })
1004 }
1005
1006 async fn get_channel_messages(
1007 self: Arc<Self>,
1008 request: TypedEnvelope<proto::GetChannelMessages>,
1009 ) -> Result<proto::GetChannelMessagesResponse> {
1010 let user_id = self
1011 .state()
1012 .await
1013 .user_id_for_connection(request.sender_id)?;
1014 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1015 if !self
1016 .app_state
1017 .db
1018 .can_user_access_channel(user_id, channel_id)
1019 .await?
1020 {
1021 Err(anyhow!("access denied"))?;
1022 }
1023
1024 let messages = self
1025 .app_state
1026 .db
1027 .get_channel_messages(
1028 channel_id,
1029 MESSAGE_COUNT_PER_PAGE,
1030 Some(MessageId::from_proto(request.payload.before_message_id)),
1031 )
1032 .await?
1033 .into_iter()
1034 .map(|msg| proto::ChannelMessage {
1035 id: msg.id.to_proto(),
1036 body: msg.body,
1037 timestamp: msg.sent_at.unix_timestamp() as u64,
1038 sender_id: msg.sender_id.to_proto(),
1039 nonce: Some(msg.nonce.as_u128().into()),
1040 })
1041 .collect::<Vec<_>>();
1042
1043 Ok(proto::GetChannelMessagesResponse {
1044 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1045 messages,
1046 })
1047 }
1048
1049 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1050 #[cfg(test)]
1051 tokio::task::yield_now().await;
1052 let guard = self.store.read().await;
1053 #[cfg(test)]
1054 tokio::task::yield_now().await;
1055 StoreReadGuard {
1056 guard,
1057 _not_send: PhantomData,
1058 }
1059 }
1060
1061 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1062 #[cfg(test)]
1063 tokio::task::yield_now().await;
1064 let guard = self.store.write().await;
1065 #[cfg(test)]
1066 tokio::task::yield_now().await;
1067 StoreWriteGuard {
1068 guard,
1069 _not_send: PhantomData,
1070 }
1071 }
1072}
1073
1074impl<'a> Deref for StoreReadGuard<'a> {
1075 type Target = Store;
1076
1077 fn deref(&self) -> &Self::Target {
1078 &*self.guard
1079 }
1080}
1081
1082impl<'a> Deref for StoreWriteGuard<'a> {
1083 type Target = Store;
1084
1085 fn deref(&self) -> &Self::Target {
1086 &*self.guard
1087 }
1088}
1089
1090impl<'a> DerefMut for StoreWriteGuard<'a> {
1091 fn deref_mut(&mut self) -> &mut Self::Target {
1092 &mut *self.guard
1093 }
1094}
1095
1096impl<'a> Drop for StoreWriteGuard<'a> {
1097 fn drop(&mut self) {
1098 #[cfg(test)]
1099 self.check_invariants();
1100
1101 let metrics = self.metrics();
1102 tracing::info!(
1103 connections = metrics.connections,
1104 registered_projects = metrics.registered_projects,
1105 shared_projects = metrics.shared_projects,
1106 "metrics"
1107 );
1108 }
1109}
1110
1111impl Executor for RealExecutor {
1112 type Sleep = Sleep;
1113
1114 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1115 tokio::task::spawn(future);
1116 }
1117
1118 fn sleep(&self, duration: Duration) -> Self::Sleep {
1119 tokio::time::sleep(duration)
1120 }
1121}
1122
1123#[instrument(skip(f))]
1124fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1125where
1126 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1127{
1128 for receiver_id in receiver_ids {
1129 if receiver_id != sender_id {
1130 f(receiver_id).trace_err();
1131 }
1132 }
1133}
1134
1135lazy_static! {
1136 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1137}
1138
1139pub struct ProtocolVersion(u32);
1140
1141impl Header for ProtocolVersion {
1142 fn name() -> &'static HeaderName {
1143 &ZED_PROTOCOL_VERSION
1144 }
1145
1146 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1147 where
1148 Self: Sized,
1149 I: Iterator<Item = &'i axum::http::HeaderValue>,
1150 {
1151 let version = values
1152 .next()
1153 .ok_or_else(|| axum::headers::Error::invalid())?
1154 .to_str()
1155 .map_err(|_| axum::headers::Error::invalid())?
1156 .parse()
1157 .map_err(|_| axum::headers::Error::invalid())?;
1158 Ok(Self(version))
1159 }
1160
1161 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1162 values.extend([self.0.to_string().parse().unwrap()]);
1163 }
1164}
1165
1166pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1167 let server = Server::new(app_state.clone(), None);
1168 Router::new()
1169 .route("/rpc", get(handle_websocket_request))
1170 .layer(
1171 ServiceBuilder::new()
1172 .layer(Extension(app_state))
1173 .layer(middleware::from_fn(auth::validate_header))
1174 .layer(Extension(server)),
1175 )
1176}
1177
1178pub async fn handle_websocket_request(
1179 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1180 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1181 Extension(server): Extension<Arc<Server>>,
1182 Extension(user_id): Extension<UserId>,
1183 ws: WebSocketUpgrade,
1184) -> Response {
1185 if protocol_version != rpc::PROTOCOL_VERSION {
1186 return (
1187 StatusCode::UPGRADE_REQUIRED,
1188 "client must be upgraded".to_string(),
1189 )
1190 .into_response();
1191 }
1192 let socket_address = socket_address.to_string();
1193 ws.on_upgrade(move |socket| {
1194 let socket = socket
1195 .map_ok(to_tungstenite_message)
1196 .err_into()
1197 .with(|message| async move { Ok(to_axum_message(message)) });
1198 let connection = Connection::new(Box::pin(socket));
1199 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1200 })
1201}
1202
1203fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1204 match message {
1205 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1206 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1207 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1208 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1209 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1210 code: frame.code.into(),
1211 reason: frame.reason,
1212 })),
1213 }
1214}
1215
1216fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1217 match message {
1218 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1219 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1220 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1221 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1222 AxumMessage::Close(frame) => {
1223 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1224 code: frame.code.into(),
1225 reason: frame.reason,
1226 }))
1227 }
1228 }
1229}
1230
1231pub trait ResultExt {
1232 type Ok;
1233
1234 fn trace_err(self) -> Option<Self::Ok>;
1235}
1236
1237impl<T, E> ResultExt for Result<T, E>
1238where
1239 E: std::fmt::Debug,
1240{
1241 type Ok = T;
1242
1243 fn trace_err(self) -> Option<T> {
1244 match self {
1245 Ok(value) => Some(value),
1246 Err(error) => {
1247 tracing::error!("{:?}", error);
1248 None
1249 }
1250 }
1251 }
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256 use super::*;
1257 use crate::{
1258 db::{tests::TestDb, UserId},
1259 AppState,
1260 };
1261 use ::rpc::Peer;
1262 use client::{
1263 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1264 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1265 };
1266 use collections::BTreeMap;
1267 use editor::{
1268 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1269 ToOffset, ToggleCodeActions, Undo,
1270 };
1271 use gpui::{
1272 executor::{self, Deterministic},
1273 geometry::vector::vec2f,
1274 ModelHandle, TestAppContext, ViewHandle,
1275 };
1276 use language::{
1277 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1278 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1279 };
1280 use lsp::{self, FakeLanguageServer};
1281 use parking_lot::Mutex;
1282 use project::{
1283 fs::{FakeFs, Fs as _},
1284 search::SearchQuery,
1285 worktree::WorktreeHandle,
1286 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1287 };
1288 use rand::prelude::*;
1289 use rpc::PeerId;
1290 use serde_json::json;
1291 use settings::Settings;
1292 use sqlx::types::time::OffsetDateTime;
1293 use std::{
1294 env,
1295 ops::Deref,
1296 path::{Path, PathBuf},
1297 rc::Rc,
1298 sync::{
1299 atomic::{AtomicBool, Ordering::SeqCst},
1300 Arc,
1301 },
1302 time::Duration,
1303 };
1304 use theme::ThemeRegistry;
1305 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1306
1307 #[cfg(test)]
1308 #[ctor::ctor]
1309 fn init_logger() {
1310 if std::env::var("RUST_LOG").is_ok() {
1311 env_logger::init();
1312 }
1313 }
1314
1315 #[gpui::test(iterations = 10)]
1316 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1317 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1318 let lang_registry = Arc::new(LanguageRegistry::test());
1319 let fs = FakeFs::new(cx_a.background());
1320 cx_a.foreground().forbid_parking();
1321
1322 // Connect to a server as 2 clients.
1323 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1324 let client_a = server.create_client(cx_a, "user_a").await;
1325 let client_b = server.create_client(cx_b, "user_b").await;
1326
1327 // Share a project as client A
1328 fs.insert_tree(
1329 "/a",
1330 json!({
1331 ".zed.toml": r#"collaborators = ["user_b"]"#,
1332 "a.txt": "a-contents",
1333 "b.txt": "b-contents",
1334 }),
1335 )
1336 .await;
1337 let project_a = cx_a.update(|cx| {
1338 Project::local(
1339 client_a.clone(),
1340 client_a.user_store.clone(),
1341 lang_registry.clone(),
1342 fs.clone(),
1343 cx,
1344 )
1345 });
1346 let (worktree_a, _) = project_a
1347 .update(cx_a, |p, cx| {
1348 p.find_or_create_local_worktree("/a", true, cx)
1349 })
1350 .await
1351 .unwrap();
1352 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1353 worktree_a
1354 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1355 .await;
1356 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1357 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1358
1359 // Join that project as client B
1360 let project_b = Project::remote(
1361 project_id,
1362 client_b.clone(),
1363 client_b.user_store.clone(),
1364 lang_registry.clone(),
1365 fs.clone(),
1366 &mut cx_b.to_async(),
1367 )
1368 .await
1369 .unwrap();
1370
1371 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1372 assert_eq!(
1373 project
1374 .collaborators()
1375 .get(&client_a.peer_id)
1376 .unwrap()
1377 .user
1378 .github_login,
1379 "user_a"
1380 );
1381 project.replica_id()
1382 });
1383 project_a
1384 .condition(&cx_a, |tree, _| {
1385 tree.collaborators()
1386 .get(&client_b.peer_id)
1387 .map_or(false, |collaborator| {
1388 collaborator.replica_id == replica_id_b
1389 && collaborator.user.github_login == "user_b"
1390 })
1391 })
1392 .await;
1393
1394 // Open the same file as client B and client A.
1395 let buffer_b = project_b
1396 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1397 .await
1398 .unwrap();
1399 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1400 project_a.read_with(cx_a, |project, cx| {
1401 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1402 });
1403 let buffer_a = project_a
1404 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1405 .await
1406 .unwrap();
1407
1408 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1409
1410 // TODO
1411 // // Create a selection set as client B and see that selection set as client A.
1412 // buffer_a
1413 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1414 // .await;
1415
1416 // Edit the buffer as client B and see that edit as client A.
1417 editor_b.update(cx_b, |editor, cx| {
1418 editor.handle_input(&Input("ok, ".into()), cx)
1419 });
1420 buffer_a
1421 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1422 .await;
1423
1424 // TODO
1425 // // Remove the selection set as client B, see those selections disappear as client A.
1426 cx_b.update(move |_| drop(editor_b));
1427 // buffer_a
1428 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1429 // .await;
1430
1431 // Dropping the client B's project removes client B from client A's collaborators.
1432 cx_b.update(move |_| drop(project_b));
1433 project_a
1434 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1435 .await;
1436 }
1437
1438 #[gpui::test(iterations = 10)]
1439 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1440 let lang_registry = Arc::new(LanguageRegistry::test());
1441 let fs = FakeFs::new(cx_a.background());
1442 cx_a.foreground().forbid_parking();
1443
1444 // Connect to a server as 2 clients.
1445 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1446 let client_a = server.create_client(cx_a, "user_a").await;
1447 let client_b = server.create_client(cx_b, "user_b").await;
1448
1449 // Share a project as client A
1450 fs.insert_tree(
1451 "/a",
1452 json!({
1453 ".zed.toml": r#"collaborators = ["user_b"]"#,
1454 "a.txt": "a-contents",
1455 "b.txt": "b-contents",
1456 }),
1457 )
1458 .await;
1459 let project_a = cx_a.update(|cx| {
1460 Project::local(
1461 client_a.clone(),
1462 client_a.user_store.clone(),
1463 lang_registry.clone(),
1464 fs.clone(),
1465 cx,
1466 )
1467 });
1468 let (worktree_a, _) = project_a
1469 .update(cx_a, |p, cx| {
1470 p.find_or_create_local_worktree("/a", true, cx)
1471 })
1472 .await
1473 .unwrap();
1474 worktree_a
1475 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1476 .await;
1477 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1478 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1479 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1480 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1481
1482 // Join that project as client B
1483 let project_b = Project::remote(
1484 project_id,
1485 client_b.clone(),
1486 client_b.user_store.clone(),
1487 lang_registry.clone(),
1488 fs.clone(),
1489 &mut cx_b.to_async(),
1490 )
1491 .await
1492 .unwrap();
1493 project_b
1494 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1495 .await
1496 .unwrap();
1497
1498 // Unshare the project as client A
1499 project_a.update(cx_a, |project, cx| project.unshare(cx));
1500 project_b
1501 .condition(cx_b, |project, _| project.is_read_only())
1502 .await;
1503 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1504 cx_b.update(|_| {
1505 drop(project_b);
1506 });
1507
1508 // Share the project again and ensure guests can still join.
1509 project_a
1510 .update(cx_a, |project, cx| project.share(cx))
1511 .await
1512 .unwrap();
1513 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1514
1515 let project_b2 = Project::remote(
1516 project_id,
1517 client_b.clone(),
1518 client_b.user_store.clone(),
1519 lang_registry.clone(),
1520 fs.clone(),
1521 &mut cx_b.to_async(),
1522 )
1523 .await
1524 .unwrap();
1525 project_b2
1526 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1527 .await
1528 .unwrap();
1529 }
1530
1531 #[gpui::test(iterations = 10)]
1532 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1533 let lang_registry = Arc::new(LanguageRegistry::test());
1534 let fs = FakeFs::new(cx_a.background());
1535 cx_a.foreground().forbid_parking();
1536
1537 // Connect to a server as 2 clients.
1538 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1539 let client_a = server.create_client(cx_a, "user_a").await;
1540 let client_b = server.create_client(cx_b, "user_b").await;
1541
1542 // Share a project as client A
1543 fs.insert_tree(
1544 "/a",
1545 json!({
1546 ".zed.toml": r#"collaborators = ["user_b"]"#,
1547 "a.txt": "a-contents",
1548 "b.txt": "b-contents",
1549 }),
1550 )
1551 .await;
1552 let project_a = cx_a.update(|cx| {
1553 Project::local(
1554 client_a.clone(),
1555 client_a.user_store.clone(),
1556 lang_registry.clone(),
1557 fs.clone(),
1558 cx,
1559 )
1560 });
1561 let (worktree_a, _) = project_a
1562 .update(cx_a, |p, cx| {
1563 p.find_or_create_local_worktree("/a", true, cx)
1564 })
1565 .await
1566 .unwrap();
1567 worktree_a
1568 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1569 .await;
1570 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1571 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1572 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1573 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1574
1575 // Join that project as client B
1576 let project_b = Project::remote(
1577 project_id,
1578 client_b.clone(),
1579 client_b.user_store.clone(),
1580 lang_registry.clone(),
1581 fs.clone(),
1582 &mut cx_b.to_async(),
1583 )
1584 .await
1585 .unwrap();
1586 project_b
1587 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1588 .await
1589 .unwrap();
1590
1591 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1592 server.disconnect_client(client_a.current_user_id(cx_a));
1593 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1594 project_a
1595 .condition(cx_a, |project, _| project.collaborators().is_empty())
1596 .await;
1597 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1598 project_b
1599 .condition(cx_b, |project, _| project.is_read_only())
1600 .await;
1601 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1602 cx_b.update(|_| {
1603 drop(project_b);
1604 });
1605
1606 // Await reconnection
1607 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1608
1609 // Share the project again and ensure guests can still join.
1610 project_a
1611 .update(cx_a, |project, cx| project.share(cx))
1612 .await
1613 .unwrap();
1614 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1615
1616 let project_b2 = Project::remote(
1617 project_id,
1618 client_b.clone(),
1619 client_b.user_store.clone(),
1620 lang_registry.clone(),
1621 fs.clone(),
1622 &mut cx_b.to_async(),
1623 )
1624 .await
1625 .unwrap();
1626 project_b2
1627 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1628 .await
1629 .unwrap();
1630 }
1631
1632 #[gpui::test(iterations = 10)]
1633 async fn test_propagate_saves_and_fs_changes(
1634 cx_a: &mut TestAppContext,
1635 cx_b: &mut TestAppContext,
1636 cx_c: &mut TestAppContext,
1637 ) {
1638 let lang_registry = Arc::new(LanguageRegistry::test());
1639 let fs = FakeFs::new(cx_a.background());
1640 cx_a.foreground().forbid_parking();
1641
1642 // Connect to a server as 3 clients.
1643 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1644 let client_a = server.create_client(cx_a, "user_a").await;
1645 let client_b = server.create_client(cx_b, "user_b").await;
1646 let client_c = server.create_client(cx_c, "user_c").await;
1647
1648 // Share a worktree as client A.
1649 fs.insert_tree(
1650 "/a",
1651 json!({
1652 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653 "file1": "",
1654 "file2": ""
1655 }),
1656 )
1657 .await;
1658 let project_a = cx_a.update(|cx| {
1659 Project::local(
1660 client_a.clone(),
1661 client_a.user_store.clone(),
1662 lang_registry.clone(),
1663 fs.clone(),
1664 cx,
1665 )
1666 });
1667 let (worktree_a, _) = project_a
1668 .update(cx_a, |p, cx| {
1669 p.find_or_create_local_worktree("/a", true, cx)
1670 })
1671 .await
1672 .unwrap();
1673 worktree_a
1674 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675 .await;
1676 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1677 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1678 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1679
1680 // Join that worktree as clients B and C.
1681 let project_b = Project::remote(
1682 project_id,
1683 client_b.clone(),
1684 client_b.user_store.clone(),
1685 lang_registry.clone(),
1686 fs.clone(),
1687 &mut cx_b.to_async(),
1688 )
1689 .await
1690 .unwrap();
1691 let project_c = Project::remote(
1692 project_id,
1693 client_c.clone(),
1694 client_c.user_store.clone(),
1695 lang_registry.clone(),
1696 fs.clone(),
1697 &mut cx_c.to_async(),
1698 )
1699 .await
1700 .unwrap();
1701 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1702 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1703
1704 // Open and edit a buffer as both guests B and C.
1705 let buffer_b = project_b
1706 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1707 .await
1708 .unwrap();
1709 let buffer_c = project_c
1710 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1711 .await
1712 .unwrap();
1713 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1714 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1715
1716 // Open and edit that buffer as the host.
1717 let buffer_a = project_a
1718 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1719 .await
1720 .unwrap();
1721
1722 buffer_a
1723 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1724 .await;
1725 buffer_a.update(cx_a, |buf, cx| {
1726 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1727 });
1728
1729 // Wait for edits to propagate
1730 buffer_a
1731 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1732 .await;
1733 buffer_b
1734 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1735 .await;
1736 buffer_c
1737 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1738 .await;
1739
1740 // Edit the buffer as the host and concurrently save as guest B.
1741 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1742 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1743 save_b.await.unwrap();
1744 assert_eq!(
1745 fs.load("/a/file1".as_ref()).await.unwrap(),
1746 "hi-a, i-am-c, i-am-b, i-am-a"
1747 );
1748 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1749 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1750 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1751
1752 worktree_a.flush_fs_events(cx_a).await;
1753
1754 // Make changes on host's file system, see those changes on guest worktrees.
1755 fs.rename(
1756 "/a/file1".as_ref(),
1757 "/a/file1-renamed".as_ref(),
1758 Default::default(),
1759 )
1760 .await
1761 .unwrap();
1762
1763 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1764 .await
1765 .unwrap();
1766 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1767
1768 worktree_a
1769 .condition(&cx_a, |tree, _| {
1770 tree.paths()
1771 .map(|p| p.to_string_lossy())
1772 .collect::<Vec<_>>()
1773 == [".zed.toml", "file1-renamed", "file3", "file4"]
1774 })
1775 .await;
1776 worktree_b
1777 .condition(&cx_b, |tree, _| {
1778 tree.paths()
1779 .map(|p| p.to_string_lossy())
1780 .collect::<Vec<_>>()
1781 == [".zed.toml", "file1-renamed", "file3", "file4"]
1782 })
1783 .await;
1784 worktree_c
1785 .condition(&cx_c, |tree, _| {
1786 tree.paths()
1787 .map(|p| p.to_string_lossy())
1788 .collect::<Vec<_>>()
1789 == [".zed.toml", "file1-renamed", "file3", "file4"]
1790 })
1791 .await;
1792
1793 // Ensure buffer files are updated as well.
1794 buffer_a
1795 .condition(&cx_a, |buf, _| {
1796 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1797 })
1798 .await;
1799 buffer_b
1800 .condition(&cx_b, |buf, _| {
1801 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1802 })
1803 .await;
1804 buffer_c
1805 .condition(&cx_c, |buf, _| {
1806 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1807 })
1808 .await;
1809 }
1810
1811 #[gpui::test(iterations = 10)]
1812 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1813 cx_a.foreground().forbid_parking();
1814 let lang_registry = Arc::new(LanguageRegistry::test());
1815 let fs = FakeFs::new(cx_a.background());
1816
1817 // Connect to a server as 2 clients.
1818 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1819 let client_a = server.create_client(cx_a, "user_a").await;
1820 let client_b = server.create_client(cx_b, "user_b").await;
1821
1822 // Share a project as client A
1823 fs.insert_tree(
1824 "/dir",
1825 json!({
1826 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1827 "a.txt": "a-contents",
1828 }),
1829 )
1830 .await;
1831
1832 let project_a = cx_a.update(|cx| {
1833 Project::local(
1834 client_a.clone(),
1835 client_a.user_store.clone(),
1836 lang_registry.clone(),
1837 fs.clone(),
1838 cx,
1839 )
1840 });
1841 let (worktree_a, _) = project_a
1842 .update(cx_a, |p, cx| {
1843 p.find_or_create_local_worktree("/dir", true, cx)
1844 })
1845 .await
1846 .unwrap();
1847 worktree_a
1848 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1849 .await;
1850 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1851 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1852 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1853
1854 // Join that project as client B
1855 let project_b = Project::remote(
1856 project_id,
1857 client_b.clone(),
1858 client_b.user_store.clone(),
1859 lang_registry.clone(),
1860 fs.clone(),
1861 &mut cx_b.to_async(),
1862 )
1863 .await
1864 .unwrap();
1865
1866 // Open a buffer as client B
1867 let buffer_b = project_b
1868 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1869 .await
1870 .unwrap();
1871
1872 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
1873 buffer_b.read_with(cx_b, |buf, _| {
1874 assert!(buf.is_dirty());
1875 assert!(!buf.has_conflict());
1876 });
1877
1878 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1879 buffer_b
1880 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1881 .await;
1882 buffer_b.read_with(cx_b, |buf, _| {
1883 assert!(!buf.has_conflict());
1884 });
1885
1886 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
1887 buffer_b.read_with(cx_b, |buf, _| {
1888 assert!(buf.is_dirty());
1889 assert!(!buf.has_conflict());
1890 });
1891 }
1892
1893 #[gpui::test(iterations = 10)]
1894 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1895 cx_a.foreground().forbid_parking();
1896 let lang_registry = Arc::new(LanguageRegistry::test());
1897 let fs = FakeFs::new(cx_a.background());
1898
1899 // Connect to a server as 2 clients.
1900 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1901 let client_a = server.create_client(cx_a, "user_a").await;
1902 let client_b = server.create_client(cx_b, "user_b").await;
1903
1904 // Share a project as client A
1905 fs.insert_tree(
1906 "/dir",
1907 json!({
1908 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1909 "a.txt": "a-contents",
1910 }),
1911 )
1912 .await;
1913
1914 let project_a = cx_a.update(|cx| {
1915 Project::local(
1916 client_a.clone(),
1917 client_a.user_store.clone(),
1918 lang_registry.clone(),
1919 fs.clone(),
1920 cx,
1921 )
1922 });
1923 let (worktree_a, _) = project_a
1924 .update(cx_a, |p, cx| {
1925 p.find_or_create_local_worktree("/dir", true, cx)
1926 })
1927 .await
1928 .unwrap();
1929 worktree_a
1930 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1931 .await;
1932 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1933 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1934 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1935
1936 // Join that project as client B
1937 let project_b = Project::remote(
1938 project_id,
1939 client_b.clone(),
1940 client_b.user_store.clone(),
1941 lang_registry.clone(),
1942 fs.clone(),
1943 &mut cx_b.to_async(),
1944 )
1945 .await
1946 .unwrap();
1947 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1948
1949 // Open a buffer as client B
1950 let buffer_b = project_b
1951 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1952 .await
1953 .unwrap();
1954 buffer_b.read_with(cx_b, |buf, _| {
1955 assert!(!buf.is_dirty());
1956 assert!(!buf.has_conflict());
1957 });
1958
1959 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1960 .await
1961 .unwrap();
1962 buffer_b
1963 .condition(&cx_b, |buf, _| {
1964 buf.text() == "new contents" && !buf.is_dirty()
1965 })
1966 .await;
1967 buffer_b.read_with(cx_b, |buf, _| {
1968 assert!(!buf.has_conflict());
1969 });
1970 }
1971
1972 #[gpui::test(iterations = 10)]
1973 async fn test_editing_while_guest_opens_buffer(
1974 cx_a: &mut TestAppContext,
1975 cx_b: &mut TestAppContext,
1976 ) {
1977 cx_a.foreground().forbid_parking();
1978 let lang_registry = Arc::new(LanguageRegistry::test());
1979 let fs = FakeFs::new(cx_a.background());
1980
1981 // Connect to a server as 2 clients.
1982 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1983 let client_a = server.create_client(cx_a, "user_a").await;
1984 let client_b = server.create_client(cx_b, "user_b").await;
1985
1986 // Share a project as client A
1987 fs.insert_tree(
1988 "/dir",
1989 json!({
1990 ".zed.toml": r#"collaborators = ["user_b"]"#,
1991 "a.txt": "a-contents",
1992 }),
1993 )
1994 .await;
1995 let project_a = cx_a.update(|cx| {
1996 Project::local(
1997 client_a.clone(),
1998 client_a.user_store.clone(),
1999 lang_registry.clone(),
2000 fs.clone(),
2001 cx,
2002 )
2003 });
2004 let (worktree_a, _) = project_a
2005 .update(cx_a, |p, cx| {
2006 p.find_or_create_local_worktree("/dir", true, cx)
2007 })
2008 .await
2009 .unwrap();
2010 worktree_a
2011 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2012 .await;
2013 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2014 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2015 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2016
2017 // Join that project as client B
2018 let project_b = Project::remote(
2019 project_id,
2020 client_b.clone(),
2021 client_b.user_store.clone(),
2022 lang_registry.clone(),
2023 fs.clone(),
2024 &mut cx_b.to_async(),
2025 )
2026 .await
2027 .unwrap();
2028
2029 // Open a buffer as client A
2030 let buffer_a = project_a
2031 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2032 .await
2033 .unwrap();
2034
2035 // Start opening the same buffer as client B
2036 let buffer_b = cx_b
2037 .background()
2038 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2039
2040 // Edit the buffer as client A while client B is still opening it.
2041 cx_b.background().simulate_random_delay().await;
2042 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2043 cx_b.background().simulate_random_delay().await;
2044 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2045
2046 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2047 let buffer_b = buffer_b.await.unwrap();
2048 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2049 }
2050
2051 #[gpui::test(iterations = 10)]
2052 async fn test_leaving_worktree_while_opening_buffer(
2053 cx_a: &mut TestAppContext,
2054 cx_b: &mut TestAppContext,
2055 ) {
2056 cx_a.foreground().forbid_parking();
2057 let lang_registry = Arc::new(LanguageRegistry::test());
2058 let fs = FakeFs::new(cx_a.background());
2059
2060 // Connect to a server as 2 clients.
2061 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2062 let client_a = server.create_client(cx_a, "user_a").await;
2063 let client_b = server.create_client(cx_b, "user_b").await;
2064
2065 // Share a project as client A
2066 fs.insert_tree(
2067 "/dir",
2068 json!({
2069 ".zed.toml": r#"collaborators = ["user_b"]"#,
2070 "a.txt": "a-contents",
2071 }),
2072 )
2073 .await;
2074 let project_a = cx_a.update(|cx| {
2075 Project::local(
2076 client_a.clone(),
2077 client_a.user_store.clone(),
2078 lang_registry.clone(),
2079 fs.clone(),
2080 cx,
2081 )
2082 });
2083 let (worktree_a, _) = project_a
2084 .update(cx_a, |p, cx| {
2085 p.find_or_create_local_worktree("/dir", true, cx)
2086 })
2087 .await
2088 .unwrap();
2089 worktree_a
2090 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2091 .await;
2092 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2093 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2094 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2095
2096 // Join that project as client B
2097 let project_b = Project::remote(
2098 project_id,
2099 client_b.clone(),
2100 client_b.user_store.clone(),
2101 lang_registry.clone(),
2102 fs.clone(),
2103 &mut cx_b.to_async(),
2104 )
2105 .await
2106 .unwrap();
2107
2108 // See that a guest has joined as client A.
2109 project_a
2110 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2111 .await;
2112
2113 // Begin opening a buffer as client B, but leave the project before the open completes.
2114 let buffer_b = cx_b
2115 .background()
2116 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2117 cx_b.update(|_| drop(project_b));
2118 drop(buffer_b);
2119
2120 // See that the guest has left.
2121 project_a
2122 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2123 .await;
2124 }
2125
2126 #[gpui::test(iterations = 10)]
2127 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2128 cx_a.foreground().forbid_parking();
2129 let lang_registry = Arc::new(LanguageRegistry::test());
2130 let fs = FakeFs::new(cx_a.background());
2131
2132 // Connect to a server as 2 clients.
2133 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2134 let client_a = server.create_client(cx_a, "user_a").await;
2135 let client_b = server.create_client(cx_b, "user_b").await;
2136
2137 // Share a project as client A
2138 fs.insert_tree(
2139 "/a",
2140 json!({
2141 ".zed.toml": r#"collaborators = ["user_b"]"#,
2142 "a.txt": "a-contents",
2143 "b.txt": "b-contents",
2144 }),
2145 )
2146 .await;
2147 let project_a = cx_a.update(|cx| {
2148 Project::local(
2149 client_a.clone(),
2150 client_a.user_store.clone(),
2151 lang_registry.clone(),
2152 fs.clone(),
2153 cx,
2154 )
2155 });
2156 let (worktree_a, _) = project_a
2157 .update(cx_a, |p, cx| {
2158 p.find_or_create_local_worktree("/a", true, cx)
2159 })
2160 .await
2161 .unwrap();
2162 worktree_a
2163 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2164 .await;
2165 let project_id = project_a
2166 .update(cx_a, |project, _| project.next_remote_id())
2167 .await;
2168 project_a
2169 .update(cx_a, |project, cx| project.share(cx))
2170 .await
2171 .unwrap();
2172
2173 // Join that project as client B
2174 let _project_b = Project::remote(
2175 project_id,
2176 client_b.clone(),
2177 client_b.user_store.clone(),
2178 lang_registry.clone(),
2179 fs.clone(),
2180 &mut cx_b.to_async(),
2181 )
2182 .await
2183 .unwrap();
2184
2185 // Client A sees that a guest has joined.
2186 project_a
2187 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2188 .await;
2189
2190 // Drop client B's connection and ensure client A observes client B leaving the project.
2191 client_b.disconnect(&cx_b.to_async()).unwrap();
2192 project_a
2193 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2194 .await;
2195
2196 // Rejoin the project as client B
2197 let _project_b = Project::remote(
2198 project_id,
2199 client_b.clone(),
2200 client_b.user_store.clone(),
2201 lang_registry.clone(),
2202 fs.clone(),
2203 &mut cx_b.to_async(),
2204 )
2205 .await
2206 .unwrap();
2207
2208 // Client A sees that a guest has re-joined.
2209 project_a
2210 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2211 .await;
2212
2213 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2214 client_b.wait_for_current_user(cx_b).await;
2215 server.disconnect_client(client_b.current_user_id(cx_b));
2216 cx_a.foreground().advance_clock(Duration::from_secs(3));
2217 project_a
2218 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2219 .await;
2220 }
2221
2222 #[gpui::test(iterations = 10)]
2223 async fn test_collaborating_with_diagnostics(
2224 cx_a: &mut TestAppContext,
2225 cx_b: &mut TestAppContext,
2226 ) {
2227 cx_a.foreground().forbid_parking();
2228 let lang_registry = Arc::new(LanguageRegistry::test());
2229 let fs = FakeFs::new(cx_a.background());
2230
2231 // Set up a fake language server.
2232 let mut language = Language::new(
2233 LanguageConfig {
2234 name: "Rust".into(),
2235 path_suffixes: vec!["rs".to_string()],
2236 ..Default::default()
2237 },
2238 Some(tree_sitter_rust::language()),
2239 );
2240 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2241 lang_registry.add(Arc::new(language));
2242
2243 // Connect to a server as 2 clients.
2244 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2245 let client_a = server.create_client(cx_a, "user_a").await;
2246 let client_b = server.create_client(cx_b, "user_b").await;
2247
2248 // Share a project as client A
2249 fs.insert_tree(
2250 "/a",
2251 json!({
2252 ".zed.toml": r#"collaborators = ["user_b"]"#,
2253 "a.rs": "let one = two",
2254 "other.rs": "",
2255 }),
2256 )
2257 .await;
2258 let project_a = cx_a.update(|cx| {
2259 Project::local(
2260 client_a.clone(),
2261 client_a.user_store.clone(),
2262 lang_registry.clone(),
2263 fs.clone(),
2264 cx,
2265 )
2266 });
2267 let (worktree_a, _) = project_a
2268 .update(cx_a, |p, cx| {
2269 p.find_or_create_local_worktree("/a", true, cx)
2270 })
2271 .await
2272 .unwrap();
2273 worktree_a
2274 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2275 .await;
2276 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2277 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2278 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2279
2280 // Cause the language server to start.
2281 let _ = cx_a
2282 .background()
2283 .spawn(project_a.update(cx_a, |project, cx| {
2284 project.open_buffer(
2285 ProjectPath {
2286 worktree_id,
2287 path: Path::new("other.rs").into(),
2288 },
2289 cx,
2290 )
2291 }))
2292 .await
2293 .unwrap();
2294
2295 // Simulate a language server reporting errors for a file.
2296 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2297 fake_language_server
2298 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2299 .await;
2300 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2301 lsp::PublishDiagnosticsParams {
2302 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2303 version: None,
2304 diagnostics: vec![lsp::Diagnostic {
2305 severity: Some(lsp::DiagnosticSeverity::ERROR),
2306 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2307 message: "message 1".to_string(),
2308 ..Default::default()
2309 }],
2310 },
2311 );
2312
2313 // Wait for server to see the diagnostics update.
2314 server
2315 .condition(|store| {
2316 let worktree = store
2317 .project(project_id)
2318 .unwrap()
2319 .share
2320 .as_ref()
2321 .unwrap()
2322 .worktrees
2323 .get(&worktree_id.to_proto())
2324 .unwrap();
2325
2326 !worktree.diagnostic_summaries.is_empty()
2327 })
2328 .await;
2329
2330 // Join the worktree as client B.
2331 let project_b = Project::remote(
2332 project_id,
2333 client_b.clone(),
2334 client_b.user_store.clone(),
2335 lang_registry.clone(),
2336 fs.clone(),
2337 &mut cx_b.to_async(),
2338 )
2339 .await
2340 .unwrap();
2341
2342 project_b.read_with(cx_b, |project, cx| {
2343 assert_eq!(
2344 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2345 &[(
2346 ProjectPath {
2347 worktree_id,
2348 path: Arc::from(Path::new("a.rs")),
2349 },
2350 DiagnosticSummary {
2351 error_count: 1,
2352 warning_count: 0,
2353 ..Default::default()
2354 },
2355 )]
2356 )
2357 });
2358
2359 // Simulate a language server reporting more errors for a file.
2360 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2361 lsp::PublishDiagnosticsParams {
2362 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2363 version: None,
2364 diagnostics: vec![
2365 lsp::Diagnostic {
2366 severity: Some(lsp::DiagnosticSeverity::ERROR),
2367 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2368 message: "message 1".to_string(),
2369 ..Default::default()
2370 },
2371 lsp::Diagnostic {
2372 severity: Some(lsp::DiagnosticSeverity::WARNING),
2373 range: lsp::Range::new(
2374 lsp::Position::new(0, 10),
2375 lsp::Position::new(0, 13),
2376 ),
2377 message: "message 2".to_string(),
2378 ..Default::default()
2379 },
2380 ],
2381 },
2382 );
2383
2384 // Client b gets the updated summaries
2385 project_b
2386 .condition(&cx_b, |project, cx| {
2387 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2388 == &[(
2389 ProjectPath {
2390 worktree_id,
2391 path: Arc::from(Path::new("a.rs")),
2392 },
2393 DiagnosticSummary {
2394 error_count: 1,
2395 warning_count: 1,
2396 ..Default::default()
2397 },
2398 )]
2399 })
2400 .await;
2401
2402 // Open the file with the errors on client B. They should be present.
2403 let buffer_b = cx_b
2404 .background()
2405 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2406 .await
2407 .unwrap();
2408
2409 buffer_b.read_with(cx_b, |buffer, _| {
2410 assert_eq!(
2411 buffer
2412 .snapshot()
2413 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2414 .map(|entry| entry)
2415 .collect::<Vec<_>>(),
2416 &[
2417 DiagnosticEntry {
2418 range: Point::new(0, 4)..Point::new(0, 7),
2419 diagnostic: Diagnostic {
2420 group_id: 0,
2421 message: "message 1".to_string(),
2422 severity: lsp::DiagnosticSeverity::ERROR,
2423 is_primary: true,
2424 ..Default::default()
2425 }
2426 },
2427 DiagnosticEntry {
2428 range: Point::new(0, 10)..Point::new(0, 13),
2429 diagnostic: Diagnostic {
2430 group_id: 1,
2431 severity: lsp::DiagnosticSeverity::WARNING,
2432 message: "message 2".to_string(),
2433 is_primary: true,
2434 ..Default::default()
2435 }
2436 }
2437 ]
2438 );
2439 });
2440
2441 // Simulate a language server reporting no errors for a file.
2442 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2443 lsp::PublishDiagnosticsParams {
2444 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2445 version: None,
2446 diagnostics: vec![],
2447 },
2448 );
2449 project_a
2450 .condition(cx_a, |project, cx| {
2451 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2452 })
2453 .await;
2454 project_b
2455 .condition(cx_b, |project, cx| {
2456 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2457 })
2458 .await;
2459 }
2460
2461 #[gpui::test(iterations = 10)]
2462 async fn test_collaborating_with_completion(
2463 cx_a: &mut TestAppContext,
2464 cx_b: &mut TestAppContext,
2465 ) {
2466 cx_a.foreground().forbid_parking();
2467 let lang_registry = Arc::new(LanguageRegistry::test());
2468 let fs = FakeFs::new(cx_a.background());
2469
2470 // Set up a fake language server.
2471 let mut language = Language::new(
2472 LanguageConfig {
2473 name: "Rust".into(),
2474 path_suffixes: vec!["rs".to_string()],
2475 ..Default::default()
2476 },
2477 Some(tree_sitter_rust::language()),
2478 );
2479 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2480 capabilities: lsp::ServerCapabilities {
2481 completion_provider: Some(lsp::CompletionOptions {
2482 trigger_characters: Some(vec![".".to_string()]),
2483 ..Default::default()
2484 }),
2485 ..Default::default()
2486 },
2487 ..Default::default()
2488 });
2489 lang_registry.add(Arc::new(language));
2490
2491 // Connect to a server as 2 clients.
2492 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2493 let client_a = server.create_client(cx_a, "user_a").await;
2494 let client_b = server.create_client(cx_b, "user_b").await;
2495
2496 // Share a project as client A
2497 fs.insert_tree(
2498 "/a",
2499 json!({
2500 ".zed.toml": r#"collaborators = ["user_b"]"#,
2501 "main.rs": "fn main() { a }",
2502 "other.rs": "",
2503 }),
2504 )
2505 .await;
2506 let project_a = cx_a.update(|cx| {
2507 Project::local(
2508 client_a.clone(),
2509 client_a.user_store.clone(),
2510 lang_registry.clone(),
2511 fs.clone(),
2512 cx,
2513 )
2514 });
2515 let (worktree_a, _) = project_a
2516 .update(cx_a, |p, cx| {
2517 p.find_or_create_local_worktree("/a", true, cx)
2518 })
2519 .await
2520 .unwrap();
2521 worktree_a
2522 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2523 .await;
2524 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2525 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2526 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2527
2528 // Join the worktree as client B.
2529 let project_b = Project::remote(
2530 project_id,
2531 client_b.clone(),
2532 client_b.user_store.clone(),
2533 lang_registry.clone(),
2534 fs.clone(),
2535 &mut cx_b.to_async(),
2536 )
2537 .await
2538 .unwrap();
2539
2540 // Open a file in an editor as the guest.
2541 let buffer_b = project_b
2542 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2543 .await
2544 .unwrap();
2545 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2546 let editor_b = cx_b.add_view(window_b, |cx| {
2547 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2548 });
2549
2550 let fake_language_server = fake_language_servers.next().await.unwrap();
2551 buffer_b
2552 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2553 .await;
2554
2555 // Type a completion trigger character as the guest.
2556 editor_b.update(cx_b, |editor, cx| {
2557 editor.select_ranges([13..13], None, cx);
2558 editor.handle_input(&Input(".".into()), cx);
2559 cx.focus(&editor_b);
2560 });
2561
2562 // Receive a completion request as the host's language server.
2563 // Return some completions from the host's language server.
2564 cx_a.foreground().start_waiting();
2565 fake_language_server
2566 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2567 assert_eq!(
2568 params.text_document_position.text_document.uri,
2569 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2570 );
2571 assert_eq!(
2572 params.text_document_position.position,
2573 lsp::Position::new(0, 14),
2574 );
2575
2576 Ok(Some(lsp::CompletionResponse::Array(vec![
2577 lsp::CompletionItem {
2578 label: "first_method(…)".into(),
2579 detail: Some("fn(&mut self, B) -> C".into()),
2580 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2581 new_text: "first_method($1)".to_string(),
2582 range: lsp::Range::new(
2583 lsp::Position::new(0, 14),
2584 lsp::Position::new(0, 14),
2585 ),
2586 })),
2587 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2588 ..Default::default()
2589 },
2590 lsp::CompletionItem {
2591 label: "second_method(…)".into(),
2592 detail: Some("fn(&mut self, C) -> D<E>".into()),
2593 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2594 new_text: "second_method()".to_string(),
2595 range: lsp::Range::new(
2596 lsp::Position::new(0, 14),
2597 lsp::Position::new(0, 14),
2598 ),
2599 })),
2600 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2601 ..Default::default()
2602 },
2603 ])))
2604 })
2605 .next()
2606 .await
2607 .unwrap();
2608 cx_a.foreground().finish_waiting();
2609
2610 // Open the buffer on the host.
2611 let buffer_a = project_a
2612 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2613 .await
2614 .unwrap();
2615 buffer_a
2616 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2617 .await;
2618
2619 // Confirm a completion on the guest.
2620 editor_b
2621 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2622 .await;
2623 editor_b.update(cx_b, |editor, cx| {
2624 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2625 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2626 });
2627
2628 // Return a resolved completion from the host's language server.
2629 // The resolved completion has an additional text edit.
2630 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2631 |params, _| async move {
2632 assert_eq!(params.label, "first_method(…)");
2633 Ok(lsp::CompletionItem {
2634 label: "first_method(…)".into(),
2635 detail: Some("fn(&mut self, B) -> C".into()),
2636 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2637 new_text: "first_method($1)".to_string(),
2638 range: lsp::Range::new(
2639 lsp::Position::new(0, 14),
2640 lsp::Position::new(0, 14),
2641 ),
2642 })),
2643 additional_text_edits: Some(vec![lsp::TextEdit {
2644 new_text: "use d::SomeTrait;\n".to_string(),
2645 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2646 }]),
2647 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2648 ..Default::default()
2649 })
2650 },
2651 );
2652
2653 // The additional edit is applied.
2654 buffer_a
2655 .condition(&cx_a, |buffer, _| {
2656 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2657 })
2658 .await;
2659 buffer_b
2660 .condition(&cx_b, |buffer, _| {
2661 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2662 })
2663 .await;
2664 }
2665
2666 #[gpui::test(iterations = 10)]
2667 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2668 cx_a.foreground().forbid_parking();
2669 let lang_registry = Arc::new(LanguageRegistry::test());
2670 let fs = FakeFs::new(cx_a.background());
2671
2672 // Connect to a server as 2 clients.
2673 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2674 let client_a = server.create_client(cx_a, "user_a").await;
2675 let client_b = server.create_client(cx_b, "user_b").await;
2676
2677 // Share a project as client A
2678 fs.insert_tree(
2679 "/a",
2680 json!({
2681 ".zed.toml": r#"collaborators = ["user_b"]"#,
2682 "a.rs": "let one = 1;",
2683 }),
2684 )
2685 .await;
2686 let project_a = cx_a.update(|cx| {
2687 Project::local(
2688 client_a.clone(),
2689 client_a.user_store.clone(),
2690 lang_registry.clone(),
2691 fs.clone(),
2692 cx,
2693 )
2694 });
2695 let (worktree_a, _) = project_a
2696 .update(cx_a, |p, cx| {
2697 p.find_or_create_local_worktree("/a", true, cx)
2698 })
2699 .await
2700 .unwrap();
2701 worktree_a
2702 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2703 .await;
2704 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2705 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2706 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2707 let buffer_a = project_a
2708 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2709 .await
2710 .unwrap();
2711
2712 // Join the worktree as client B.
2713 let project_b = Project::remote(
2714 project_id,
2715 client_b.clone(),
2716 client_b.user_store.clone(),
2717 lang_registry.clone(),
2718 fs.clone(),
2719 &mut cx_b.to_async(),
2720 )
2721 .await
2722 .unwrap();
2723
2724 let buffer_b = cx_b
2725 .background()
2726 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2727 .await
2728 .unwrap();
2729 buffer_b.update(cx_b, |buffer, cx| {
2730 buffer.edit([(4..7, "six")], cx);
2731 buffer.edit([(10..11, "6")], cx);
2732 assert_eq!(buffer.text(), "let six = 6;");
2733 assert!(buffer.is_dirty());
2734 assert!(!buffer.has_conflict());
2735 });
2736 buffer_a
2737 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2738 .await;
2739
2740 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2741 .await
2742 .unwrap();
2743 buffer_a
2744 .condition(cx_a, |buffer, _| buffer.has_conflict())
2745 .await;
2746 buffer_b
2747 .condition(cx_b, |buffer, _| buffer.has_conflict())
2748 .await;
2749
2750 project_b
2751 .update(cx_b, |project, cx| {
2752 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2753 })
2754 .await
2755 .unwrap();
2756 buffer_a.read_with(cx_a, |buffer, _| {
2757 assert_eq!(buffer.text(), "let seven = 7;");
2758 assert!(!buffer.is_dirty());
2759 assert!(!buffer.has_conflict());
2760 });
2761 buffer_b.read_with(cx_b, |buffer, _| {
2762 assert_eq!(buffer.text(), "let seven = 7;");
2763 assert!(!buffer.is_dirty());
2764 assert!(!buffer.has_conflict());
2765 });
2766
2767 buffer_a.update(cx_a, |buffer, cx| {
2768 // Undoing on the host is a no-op when the reload was initiated by the guest.
2769 buffer.undo(cx);
2770 assert_eq!(buffer.text(), "let seven = 7;");
2771 assert!(!buffer.is_dirty());
2772 assert!(!buffer.has_conflict());
2773 });
2774 buffer_b.update(cx_b, |buffer, cx| {
2775 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2776 buffer.undo(cx);
2777 assert_eq!(buffer.text(), "let six = 6;");
2778 assert!(buffer.is_dirty());
2779 assert!(!buffer.has_conflict());
2780 });
2781 }
2782
2783 #[gpui::test(iterations = 10)]
2784 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2785 cx_a.foreground().forbid_parking();
2786 let lang_registry = Arc::new(LanguageRegistry::test());
2787 let fs = FakeFs::new(cx_a.background());
2788
2789 // Set up a fake language server.
2790 let mut language = Language::new(
2791 LanguageConfig {
2792 name: "Rust".into(),
2793 path_suffixes: vec!["rs".to_string()],
2794 ..Default::default()
2795 },
2796 Some(tree_sitter_rust::language()),
2797 );
2798 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2799 lang_registry.add(Arc::new(language));
2800
2801 // Connect to a server as 2 clients.
2802 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2803 let client_a = server.create_client(cx_a, "user_a").await;
2804 let client_b = server.create_client(cx_b, "user_b").await;
2805
2806 // Share a project as client A
2807 fs.insert_tree(
2808 "/a",
2809 json!({
2810 ".zed.toml": r#"collaborators = ["user_b"]"#,
2811 "a.rs": "let one = two",
2812 }),
2813 )
2814 .await;
2815 let project_a = cx_a.update(|cx| {
2816 Project::local(
2817 client_a.clone(),
2818 client_a.user_store.clone(),
2819 lang_registry.clone(),
2820 fs.clone(),
2821 cx,
2822 )
2823 });
2824 let (worktree_a, _) = project_a
2825 .update(cx_a, |p, cx| {
2826 p.find_or_create_local_worktree("/a", true, cx)
2827 })
2828 .await
2829 .unwrap();
2830 worktree_a
2831 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2832 .await;
2833 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2834 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2835 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2836
2837 // Join the worktree as client B.
2838 let project_b = Project::remote(
2839 project_id,
2840 client_b.clone(),
2841 client_b.user_store.clone(),
2842 lang_registry.clone(),
2843 fs.clone(),
2844 &mut cx_b.to_async(),
2845 )
2846 .await
2847 .unwrap();
2848
2849 let buffer_b = cx_b
2850 .background()
2851 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2852 .await
2853 .unwrap();
2854
2855 let fake_language_server = fake_language_servers.next().await.unwrap();
2856 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2857 Ok(Some(vec![
2858 lsp::TextEdit {
2859 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2860 new_text: "h".to_string(),
2861 },
2862 lsp::TextEdit {
2863 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2864 new_text: "y".to_string(),
2865 },
2866 ]))
2867 });
2868
2869 project_b
2870 .update(cx_b, |project, cx| {
2871 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2872 })
2873 .await
2874 .unwrap();
2875 assert_eq!(
2876 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2877 "let honey = two"
2878 );
2879 }
2880
2881 #[gpui::test(iterations = 10)]
2882 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2883 cx_a.foreground().forbid_parking();
2884 let lang_registry = Arc::new(LanguageRegistry::test());
2885 let fs = FakeFs::new(cx_a.background());
2886 fs.insert_tree(
2887 "/root-1",
2888 json!({
2889 ".zed.toml": r#"collaborators = ["user_b"]"#,
2890 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2891 }),
2892 )
2893 .await;
2894 fs.insert_tree(
2895 "/root-2",
2896 json!({
2897 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2898 }),
2899 )
2900 .await;
2901
2902 // Set up a fake language server.
2903 let mut language = Language::new(
2904 LanguageConfig {
2905 name: "Rust".into(),
2906 path_suffixes: vec!["rs".to_string()],
2907 ..Default::default()
2908 },
2909 Some(tree_sitter_rust::language()),
2910 );
2911 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2912 lang_registry.add(Arc::new(language));
2913
2914 // Connect to a server as 2 clients.
2915 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2916 let client_a = server.create_client(cx_a, "user_a").await;
2917 let client_b = server.create_client(cx_b, "user_b").await;
2918
2919 // Share a project as client A
2920 let project_a = cx_a.update(|cx| {
2921 Project::local(
2922 client_a.clone(),
2923 client_a.user_store.clone(),
2924 lang_registry.clone(),
2925 fs.clone(),
2926 cx,
2927 )
2928 });
2929 let (worktree_a, _) = project_a
2930 .update(cx_a, |p, cx| {
2931 p.find_or_create_local_worktree("/root-1", true, cx)
2932 })
2933 .await
2934 .unwrap();
2935 worktree_a
2936 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2937 .await;
2938 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2939 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2940 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2941
2942 // Join the worktree as client B.
2943 let project_b = Project::remote(
2944 project_id,
2945 client_b.clone(),
2946 client_b.user_store.clone(),
2947 lang_registry.clone(),
2948 fs.clone(),
2949 &mut cx_b.to_async(),
2950 )
2951 .await
2952 .unwrap();
2953
2954 // Open the file on client B.
2955 let buffer_b = cx_b
2956 .background()
2957 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2958 .await
2959 .unwrap();
2960
2961 // Request the definition of a symbol as the guest.
2962 let fake_language_server = fake_language_servers.next().await.unwrap();
2963 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2964 |_, _| async move {
2965 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2966 lsp::Location::new(
2967 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2968 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2969 ),
2970 )))
2971 },
2972 );
2973
2974 let definitions_1 = project_b
2975 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2976 .await
2977 .unwrap();
2978 cx_b.read(|cx| {
2979 assert_eq!(definitions_1.len(), 1);
2980 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2981 let target_buffer = definitions_1[0].buffer.read(cx);
2982 assert_eq!(
2983 target_buffer.text(),
2984 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2985 );
2986 assert_eq!(
2987 definitions_1[0].range.to_point(target_buffer),
2988 Point::new(0, 6)..Point::new(0, 9)
2989 );
2990 });
2991
2992 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2993 // the previous call to `definition`.
2994 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2995 |_, _| async move {
2996 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2997 lsp::Location::new(
2998 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2999 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3000 ),
3001 )))
3002 },
3003 );
3004
3005 let definitions_2 = project_b
3006 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3007 .await
3008 .unwrap();
3009 cx_b.read(|cx| {
3010 assert_eq!(definitions_2.len(), 1);
3011 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3012 let target_buffer = definitions_2[0].buffer.read(cx);
3013 assert_eq!(
3014 target_buffer.text(),
3015 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3016 );
3017 assert_eq!(
3018 definitions_2[0].range.to_point(target_buffer),
3019 Point::new(1, 6)..Point::new(1, 11)
3020 );
3021 });
3022 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3023 }
3024
3025 #[gpui::test(iterations = 10)]
3026 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3027 cx_a.foreground().forbid_parking();
3028 let lang_registry = Arc::new(LanguageRegistry::test());
3029 let fs = FakeFs::new(cx_a.background());
3030 fs.insert_tree(
3031 "/root-1",
3032 json!({
3033 ".zed.toml": r#"collaborators = ["user_b"]"#,
3034 "one.rs": "const ONE: usize = 1;",
3035 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3036 }),
3037 )
3038 .await;
3039 fs.insert_tree(
3040 "/root-2",
3041 json!({
3042 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3043 }),
3044 )
3045 .await;
3046
3047 // Set up a fake language server.
3048 let mut language = Language::new(
3049 LanguageConfig {
3050 name: "Rust".into(),
3051 path_suffixes: vec!["rs".to_string()],
3052 ..Default::default()
3053 },
3054 Some(tree_sitter_rust::language()),
3055 );
3056 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3057 lang_registry.add(Arc::new(language));
3058
3059 // Connect to a server as 2 clients.
3060 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3061 let client_a = server.create_client(cx_a, "user_a").await;
3062 let client_b = server.create_client(cx_b, "user_b").await;
3063
3064 // Share a project as client A
3065 let project_a = cx_a.update(|cx| {
3066 Project::local(
3067 client_a.clone(),
3068 client_a.user_store.clone(),
3069 lang_registry.clone(),
3070 fs.clone(),
3071 cx,
3072 )
3073 });
3074 let (worktree_a, _) = project_a
3075 .update(cx_a, |p, cx| {
3076 p.find_or_create_local_worktree("/root-1", true, cx)
3077 })
3078 .await
3079 .unwrap();
3080 worktree_a
3081 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3082 .await;
3083 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3084 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3085 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3086
3087 // Join the worktree as client B.
3088 let project_b = Project::remote(
3089 project_id,
3090 client_b.clone(),
3091 client_b.user_store.clone(),
3092 lang_registry.clone(),
3093 fs.clone(),
3094 &mut cx_b.to_async(),
3095 )
3096 .await
3097 .unwrap();
3098
3099 // Open the file on client B.
3100 let buffer_b = cx_b
3101 .background()
3102 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3103 .await
3104 .unwrap();
3105
3106 // Request references to a symbol as the guest.
3107 let fake_language_server = fake_language_servers.next().await.unwrap();
3108 fake_language_server.handle_request::<lsp::request::References, _, _>(
3109 |params, _| async move {
3110 assert_eq!(
3111 params.text_document_position.text_document.uri.as_str(),
3112 "file:///root-1/one.rs"
3113 );
3114 Ok(Some(vec![
3115 lsp::Location {
3116 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3117 range: lsp::Range::new(
3118 lsp::Position::new(0, 24),
3119 lsp::Position::new(0, 27),
3120 ),
3121 },
3122 lsp::Location {
3123 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3124 range: lsp::Range::new(
3125 lsp::Position::new(0, 35),
3126 lsp::Position::new(0, 38),
3127 ),
3128 },
3129 lsp::Location {
3130 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3131 range: lsp::Range::new(
3132 lsp::Position::new(0, 37),
3133 lsp::Position::new(0, 40),
3134 ),
3135 },
3136 ]))
3137 },
3138 );
3139
3140 let references = project_b
3141 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3142 .await
3143 .unwrap();
3144 cx_b.read(|cx| {
3145 assert_eq!(references.len(), 3);
3146 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3147
3148 let two_buffer = references[0].buffer.read(cx);
3149 let three_buffer = references[2].buffer.read(cx);
3150 assert_eq!(
3151 two_buffer.file().unwrap().path().as_ref(),
3152 Path::new("two.rs")
3153 );
3154 assert_eq!(references[1].buffer, references[0].buffer);
3155 assert_eq!(
3156 three_buffer.file().unwrap().full_path(cx),
3157 Path::new("three.rs")
3158 );
3159
3160 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3161 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3162 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3163 });
3164 }
3165
3166 #[gpui::test(iterations = 10)]
3167 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3168 cx_a.foreground().forbid_parking();
3169 let lang_registry = Arc::new(LanguageRegistry::test());
3170 let fs = FakeFs::new(cx_a.background());
3171 fs.insert_tree(
3172 "/root-1",
3173 json!({
3174 ".zed.toml": r#"collaborators = ["user_b"]"#,
3175 "a": "hello world",
3176 "b": "goodnight moon",
3177 "c": "a world of goo",
3178 "d": "world champion of clown world",
3179 }),
3180 )
3181 .await;
3182 fs.insert_tree(
3183 "/root-2",
3184 json!({
3185 "e": "disney world is fun",
3186 }),
3187 )
3188 .await;
3189
3190 // Connect to a server as 2 clients.
3191 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3192 let client_a = server.create_client(cx_a, "user_a").await;
3193 let client_b = server.create_client(cx_b, "user_b").await;
3194
3195 // Share a project as client A
3196 let project_a = cx_a.update(|cx| {
3197 Project::local(
3198 client_a.clone(),
3199 client_a.user_store.clone(),
3200 lang_registry.clone(),
3201 fs.clone(),
3202 cx,
3203 )
3204 });
3205 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3206
3207 let (worktree_1, _) = project_a
3208 .update(cx_a, |p, cx| {
3209 p.find_or_create_local_worktree("/root-1", true, cx)
3210 })
3211 .await
3212 .unwrap();
3213 worktree_1
3214 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3215 .await;
3216 let (worktree_2, _) = project_a
3217 .update(cx_a, |p, cx| {
3218 p.find_or_create_local_worktree("/root-2", true, cx)
3219 })
3220 .await
3221 .unwrap();
3222 worktree_2
3223 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3224 .await;
3225
3226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3227
3228 // Join the worktree as client B.
3229 let project_b = Project::remote(
3230 project_id,
3231 client_b.clone(),
3232 client_b.user_store.clone(),
3233 lang_registry.clone(),
3234 fs.clone(),
3235 &mut cx_b.to_async(),
3236 )
3237 .await
3238 .unwrap();
3239
3240 let results = project_b
3241 .update(cx_b, |project, cx| {
3242 project.search(SearchQuery::text("world", false, false), cx)
3243 })
3244 .await
3245 .unwrap();
3246
3247 let mut ranges_by_path = results
3248 .into_iter()
3249 .map(|(buffer, ranges)| {
3250 buffer.read_with(cx_b, |buffer, cx| {
3251 let path = buffer.file().unwrap().full_path(cx);
3252 let offset_ranges = ranges
3253 .into_iter()
3254 .map(|range| range.to_offset(buffer))
3255 .collect::<Vec<_>>();
3256 (path, offset_ranges)
3257 })
3258 })
3259 .collect::<Vec<_>>();
3260 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3261
3262 assert_eq!(
3263 ranges_by_path,
3264 &[
3265 (PathBuf::from("root-1/a"), vec![6..11]),
3266 (PathBuf::from("root-1/c"), vec![2..7]),
3267 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3268 (PathBuf::from("root-2/e"), vec![7..12]),
3269 ]
3270 );
3271 }
3272
3273 #[gpui::test(iterations = 10)]
3274 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3275 cx_a.foreground().forbid_parking();
3276 let lang_registry = Arc::new(LanguageRegistry::test());
3277 let fs = FakeFs::new(cx_a.background());
3278 fs.insert_tree(
3279 "/root-1",
3280 json!({
3281 ".zed.toml": r#"collaborators = ["user_b"]"#,
3282 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3283 }),
3284 )
3285 .await;
3286
3287 // Set up a fake language server.
3288 let mut language = Language::new(
3289 LanguageConfig {
3290 name: "Rust".into(),
3291 path_suffixes: vec!["rs".to_string()],
3292 ..Default::default()
3293 },
3294 Some(tree_sitter_rust::language()),
3295 );
3296 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3297 lang_registry.add(Arc::new(language));
3298
3299 // Connect to a server as 2 clients.
3300 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3301 let client_a = server.create_client(cx_a, "user_a").await;
3302 let client_b = server.create_client(cx_b, "user_b").await;
3303
3304 // Share a project as client A
3305 let project_a = cx_a.update(|cx| {
3306 Project::local(
3307 client_a.clone(),
3308 client_a.user_store.clone(),
3309 lang_registry.clone(),
3310 fs.clone(),
3311 cx,
3312 )
3313 });
3314 let (worktree_a, _) = project_a
3315 .update(cx_a, |p, cx| {
3316 p.find_or_create_local_worktree("/root-1", true, cx)
3317 })
3318 .await
3319 .unwrap();
3320 worktree_a
3321 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3322 .await;
3323 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3324 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3325 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3326
3327 // Join the worktree as client B.
3328 let project_b = Project::remote(
3329 project_id,
3330 client_b.clone(),
3331 client_b.user_store.clone(),
3332 lang_registry.clone(),
3333 fs.clone(),
3334 &mut cx_b.to_async(),
3335 )
3336 .await
3337 .unwrap();
3338
3339 // Open the file on client B.
3340 let buffer_b = cx_b
3341 .background()
3342 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3343 .await
3344 .unwrap();
3345
3346 // Request document highlights as the guest.
3347 let fake_language_server = fake_language_servers.next().await.unwrap();
3348 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3349 |params, _| async move {
3350 assert_eq!(
3351 params
3352 .text_document_position_params
3353 .text_document
3354 .uri
3355 .as_str(),
3356 "file:///root-1/main.rs"
3357 );
3358 assert_eq!(
3359 params.text_document_position_params.position,
3360 lsp::Position::new(0, 34)
3361 );
3362 Ok(Some(vec![
3363 lsp::DocumentHighlight {
3364 kind: Some(lsp::DocumentHighlightKind::WRITE),
3365 range: lsp::Range::new(
3366 lsp::Position::new(0, 10),
3367 lsp::Position::new(0, 16),
3368 ),
3369 },
3370 lsp::DocumentHighlight {
3371 kind: Some(lsp::DocumentHighlightKind::READ),
3372 range: lsp::Range::new(
3373 lsp::Position::new(0, 32),
3374 lsp::Position::new(0, 38),
3375 ),
3376 },
3377 lsp::DocumentHighlight {
3378 kind: Some(lsp::DocumentHighlightKind::READ),
3379 range: lsp::Range::new(
3380 lsp::Position::new(0, 41),
3381 lsp::Position::new(0, 47),
3382 ),
3383 },
3384 ]))
3385 },
3386 );
3387
3388 let highlights = project_b
3389 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3390 .await
3391 .unwrap();
3392 buffer_b.read_with(cx_b, |buffer, _| {
3393 let snapshot = buffer.snapshot();
3394
3395 let highlights = highlights
3396 .into_iter()
3397 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3398 .collect::<Vec<_>>();
3399 assert_eq!(
3400 highlights,
3401 &[
3402 (lsp::DocumentHighlightKind::WRITE, 10..16),
3403 (lsp::DocumentHighlightKind::READ, 32..38),
3404 (lsp::DocumentHighlightKind::READ, 41..47)
3405 ]
3406 )
3407 });
3408 }
3409
3410 #[gpui::test(iterations = 10)]
3411 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3412 cx_a.foreground().forbid_parking();
3413 let lang_registry = Arc::new(LanguageRegistry::test());
3414 let fs = FakeFs::new(cx_a.background());
3415 fs.insert_tree(
3416 "/code",
3417 json!({
3418 "crate-1": {
3419 ".zed.toml": r#"collaborators = ["user_b"]"#,
3420 "one.rs": "const ONE: usize = 1;",
3421 },
3422 "crate-2": {
3423 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3424 },
3425 "private": {
3426 "passwords.txt": "the-password",
3427 }
3428 }),
3429 )
3430 .await;
3431
3432 // Set up a fake language server.
3433 let mut language = Language::new(
3434 LanguageConfig {
3435 name: "Rust".into(),
3436 path_suffixes: vec!["rs".to_string()],
3437 ..Default::default()
3438 },
3439 Some(tree_sitter_rust::language()),
3440 );
3441 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3442 lang_registry.add(Arc::new(language));
3443
3444 // Connect to a server as 2 clients.
3445 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3446 let client_a = server.create_client(cx_a, "user_a").await;
3447 let client_b = server.create_client(cx_b, "user_b").await;
3448
3449 // Share a project as client A
3450 let project_a = cx_a.update(|cx| {
3451 Project::local(
3452 client_a.clone(),
3453 client_a.user_store.clone(),
3454 lang_registry.clone(),
3455 fs.clone(),
3456 cx,
3457 )
3458 });
3459 let (worktree_a, _) = project_a
3460 .update(cx_a, |p, cx| {
3461 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3462 })
3463 .await
3464 .unwrap();
3465 worktree_a
3466 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3467 .await;
3468 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3469 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3470 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3471
3472 // Join the worktree as client B.
3473 let project_b = Project::remote(
3474 project_id,
3475 client_b.clone(),
3476 client_b.user_store.clone(),
3477 lang_registry.clone(),
3478 fs.clone(),
3479 &mut cx_b.to_async(),
3480 )
3481 .await
3482 .unwrap();
3483
3484 // Cause the language server to start.
3485 let _buffer = cx_b
3486 .background()
3487 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3488 .await
3489 .unwrap();
3490
3491 let fake_language_server = fake_language_servers.next().await.unwrap();
3492 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3493 |_, _| async move {
3494 #[allow(deprecated)]
3495 Ok(Some(vec![lsp::SymbolInformation {
3496 name: "TWO".into(),
3497 location: lsp::Location {
3498 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3499 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3500 },
3501 kind: lsp::SymbolKind::CONSTANT,
3502 tags: None,
3503 container_name: None,
3504 deprecated: None,
3505 }]))
3506 },
3507 );
3508
3509 // Request the definition of a symbol as the guest.
3510 let symbols = project_b
3511 .update(cx_b, |p, cx| p.symbols("two", cx))
3512 .await
3513 .unwrap();
3514 assert_eq!(symbols.len(), 1);
3515 assert_eq!(symbols[0].name, "TWO");
3516
3517 // Open one of the returned symbols.
3518 let buffer_b_2 = project_b
3519 .update(cx_b, |project, cx| {
3520 project.open_buffer_for_symbol(&symbols[0], cx)
3521 })
3522 .await
3523 .unwrap();
3524 buffer_b_2.read_with(cx_b, |buffer, _| {
3525 assert_eq!(
3526 buffer.file().unwrap().path().as_ref(),
3527 Path::new("../crate-2/two.rs")
3528 );
3529 });
3530
3531 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3532 let mut fake_symbol = symbols[0].clone();
3533 fake_symbol.path = Path::new("/code/secrets").into();
3534 let error = project_b
3535 .update(cx_b, |project, cx| {
3536 project.open_buffer_for_symbol(&fake_symbol, cx)
3537 })
3538 .await
3539 .unwrap_err();
3540 assert!(error.to_string().contains("invalid symbol signature"));
3541 }
3542
3543 #[gpui::test(iterations = 10)]
3544 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3545 cx_a: &mut TestAppContext,
3546 cx_b: &mut TestAppContext,
3547 mut rng: StdRng,
3548 ) {
3549 cx_a.foreground().forbid_parking();
3550 let lang_registry = Arc::new(LanguageRegistry::test());
3551 let fs = FakeFs::new(cx_a.background());
3552 fs.insert_tree(
3553 "/root",
3554 json!({
3555 ".zed.toml": r#"collaborators = ["user_b"]"#,
3556 "a.rs": "const ONE: usize = b::TWO;",
3557 "b.rs": "const TWO: usize = 2",
3558 }),
3559 )
3560 .await;
3561
3562 // Set up a fake language server.
3563 let mut language = Language::new(
3564 LanguageConfig {
3565 name: "Rust".into(),
3566 path_suffixes: vec!["rs".to_string()],
3567 ..Default::default()
3568 },
3569 Some(tree_sitter_rust::language()),
3570 );
3571 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3572 lang_registry.add(Arc::new(language));
3573
3574 // Connect to a server as 2 clients.
3575 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3576 let client_a = server.create_client(cx_a, "user_a").await;
3577 let client_b = server.create_client(cx_b, "user_b").await;
3578
3579 // Share a project as client A
3580 let project_a = cx_a.update(|cx| {
3581 Project::local(
3582 client_a.clone(),
3583 client_a.user_store.clone(),
3584 lang_registry.clone(),
3585 fs.clone(),
3586 cx,
3587 )
3588 });
3589
3590 let (worktree_a, _) = project_a
3591 .update(cx_a, |p, cx| {
3592 p.find_or_create_local_worktree("/root", true, cx)
3593 })
3594 .await
3595 .unwrap();
3596 worktree_a
3597 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3598 .await;
3599 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3600 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3601 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3602
3603 // Join the worktree as client B.
3604 let project_b = Project::remote(
3605 project_id,
3606 client_b.clone(),
3607 client_b.user_store.clone(),
3608 lang_registry.clone(),
3609 fs.clone(),
3610 &mut cx_b.to_async(),
3611 )
3612 .await
3613 .unwrap();
3614
3615 let buffer_b1 = cx_b
3616 .background()
3617 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3618 .await
3619 .unwrap();
3620
3621 let fake_language_server = fake_language_servers.next().await.unwrap();
3622 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3623 |_, _| async move {
3624 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3625 lsp::Location::new(
3626 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3627 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3628 ),
3629 )))
3630 },
3631 );
3632
3633 let definitions;
3634 let buffer_b2;
3635 if rng.gen() {
3636 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3637 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3638 } else {
3639 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3640 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3641 }
3642
3643 let buffer_b2 = buffer_b2.await.unwrap();
3644 let definitions = definitions.await.unwrap();
3645 assert_eq!(definitions.len(), 1);
3646 assert_eq!(definitions[0].buffer, buffer_b2);
3647 }
3648
3649 #[gpui::test(iterations = 10)]
3650 async fn test_collaborating_with_code_actions(
3651 cx_a: &mut TestAppContext,
3652 cx_b: &mut TestAppContext,
3653 ) {
3654 cx_a.foreground().forbid_parking();
3655 let lang_registry = Arc::new(LanguageRegistry::test());
3656 let fs = FakeFs::new(cx_a.background());
3657 cx_b.update(|cx| editor::init(cx));
3658
3659 // Set up a fake language server.
3660 let mut language = Language::new(
3661 LanguageConfig {
3662 name: "Rust".into(),
3663 path_suffixes: vec!["rs".to_string()],
3664 ..Default::default()
3665 },
3666 Some(tree_sitter_rust::language()),
3667 );
3668 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3669 lang_registry.add(Arc::new(language));
3670
3671 // Connect to a server as 2 clients.
3672 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3673 let client_a = server.create_client(cx_a, "user_a").await;
3674 let client_b = server.create_client(cx_b, "user_b").await;
3675
3676 // Share a project as client A
3677 fs.insert_tree(
3678 "/a",
3679 json!({
3680 ".zed.toml": r#"collaborators = ["user_b"]"#,
3681 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3682 "other.rs": "pub fn foo() -> usize { 4 }",
3683 }),
3684 )
3685 .await;
3686 let project_a = cx_a.update(|cx| {
3687 Project::local(
3688 client_a.clone(),
3689 client_a.user_store.clone(),
3690 lang_registry.clone(),
3691 fs.clone(),
3692 cx,
3693 )
3694 });
3695 let (worktree_a, _) = project_a
3696 .update(cx_a, |p, cx| {
3697 p.find_or_create_local_worktree("/a", true, cx)
3698 })
3699 .await
3700 .unwrap();
3701 worktree_a
3702 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3703 .await;
3704 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3705 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3706 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3707
3708 // Join the worktree as client B.
3709 let project_b = Project::remote(
3710 project_id,
3711 client_b.clone(),
3712 client_b.user_store.clone(),
3713 lang_registry.clone(),
3714 fs.clone(),
3715 &mut cx_b.to_async(),
3716 )
3717 .await
3718 .unwrap();
3719 let mut params = cx_b.update(WorkspaceParams::test);
3720 params.languages = lang_registry.clone();
3721 params.client = client_b.client.clone();
3722 params.user_store = client_b.user_store.clone();
3723 params.project = project_b;
3724
3725 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3726 let editor_b = workspace_b
3727 .update(cx_b, |workspace, cx| {
3728 workspace.open_path((worktree_id, "main.rs"), cx)
3729 })
3730 .await
3731 .unwrap()
3732 .downcast::<Editor>()
3733 .unwrap();
3734
3735 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3736 fake_language_server
3737 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3738 assert_eq!(
3739 params.text_document.uri,
3740 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3741 );
3742 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3743 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3744 Ok(None)
3745 })
3746 .next()
3747 .await;
3748
3749 // Move cursor to a location that contains code actions.
3750 editor_b.update(cx_b, |editor, cx| {
3751 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3752 cx.focus(&editor_b);
3753 });
3754
3755 fake_language_server
3756 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3757 assert_eq!(
3758 params.text_document.uri,
3759 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3760 );
3761 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3762 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3763
3764 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3765 lsp::CodeAction {
3766 title: "Inline into all callers".to_string(),
3767 edit: Some(lsp::WorkspaceEdit {
3768 changes: Some(
3769 [
3770 (
3771 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3772 vec![lsp::TextEdit::new(
3773 lsp::Range::new(
3774 lsp::Position::new(1, 22),
3775 lsp::Position::new(1, 34),
3776 ),
3777 "4".to_string(),
3778 )],
3779 ),
3780 (
3781 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3782 vec![lsp::TextEdit::new(
3783 lsp::Range::new(
3784 lsp::Position::new(0, 0),
3785 lsp::Position::new(0, 27),
3786 ),
3787 "".to_string(),
3788 )],
3789 ),
3790 ]
3791 .into_iter()
3792 .collect(),
3793 ),
3794 ..Default::default()
3795 }),
3796 data: Some(json!({
3797 "codeActionParams": {
3798 "range": {
3799 "start": {"line": 1, "column": 31},
3800 "end": {"line": 1, "column": 31},
3801 }
3802 }
3803 })),
3804 ..Default::default()
3805 },
3806 )]))
3807 })
3808 .next()
3809 .await;
3810
3811 // Toggle code actions and wait for them to display.
3812 editor_b.update(cx_b, |editor, cx| {
3813 editor.toggle_code_actions(
3814 &ToggleCodeActions {
3815 deployed_from_indicator: false,
3816 },
3817 cx,
3818 );
3819 });
3820 editor_b
3821 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3822 .await;
3823
3824 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3825
3826 // Confirming the code action will trigger a resolve request.
3827 let confirm_action = workspace_b
3828 .update(cx_b, |workspace, cx| {
3829 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3830 })
3831 .unwrap();
3832 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3833 |_, _| async move {
3834 Ok(lsp::CodeAction {
3835 title: "Inline into all callers".to_string(),
3836 edit: Some(lsp::WorkspaceEdit {
3837 changes: Some(
3838 [
3839 (
3840 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3841 vec![lsp::TextEdit::new(
3842 lsp::Range::new(
3843 lsp::Position::new(1, 22),
3844 lsp::Position::new(1, 34),
3845 ),
3846 "4".to_string(),
3847 )],
3848 ),
3849 (
3850 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3851 vec![lsp::TextEdit::new(
3852 lsp::Range::new(
3853 lsp::Position::new(0, 0),
3854 lsp::Position::new(0, 27),
3855 ),
3856 "".to_string(),
3857 )],
3858 ),
3859 ]
3860 .into_iter()
3861 .collect(),
3862 ),
3863 ..Default::default()
3864 }),
3865 ..Default::default()
3866 })
3867 },
3868 );
3869
3870 // After the action is confirmed, an editor containing both modified files is opened.
3871 confirm_action.await.unwrap();
3872 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3873 workspace
3874 .active_item(cx)
3875 .unwrap()
3876 .downcast::<Editor>()
3877 .unwrap()
3878 });
3879 code_action_editor.update(cx_b, |editor, cx| {
3880 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3881 editor.undo(&Undo, cx);
3882 assert_eq!(
3883 editor.text(cx),
3884 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
3885 );
3886 editor.redo(&Redo, cx);
3887 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3888 });
3889 }
3890
3891 #[gpui::test(iterations = 10)]
3892 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3893 cx_a.foreground().forbid_parking();
3894 let lang_registry = Arc::new(LanguageRegistry::test());
3895 let fs = FakeFs::new(cx_a.background());
3896 cx_b.update(|cx| editor::init(cx));
3897
3898 // Set up a fake language server.
3899 let mut language = Language::new(
3900 LanguageConfig {
3901 name: "Rust".into(),
3902 path_suffixes: vec!["rs".to_string()],
3903 ..Default::default()
3904 },
3905 Some(tree_sitter_rust::language()),
3906 );
3907 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3908 capabilities: lsp::ServerCapabilities {
3909 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3910 prepare_provider: Some(true),
3911 work_done_progress_options: Default::default(),
3912 })),
3913 ..Default::default()
3914 },
3915 ..Default::default()
3916 });
3917 lang_registry.add(Arc::new(language));
3918
3919 // Connect to a server as 2 clients.
3920 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3921 let client_a = server.create_client(cx_a, "user_a").await;
3922 let client_b = server.create_client(cx_b, "user_b").await;
3923
3924 // Share a project as client A
3925 fs.insert_tree(
3926 "/dir",
3927 json!({
3928 ".zed.toml": r#"collaborators = ["user_b"]"#,
3929 "one.rs": "const ONE: usize = 1;",
3930 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3931 }),
3932 )
3933 .await;
3934 let project_a = cx_a.update(|cx| {
3935 Project::local(
3936 client_a.clone(),
3937 client_a.user_store.clone(),
3938 lang_registry.clone(),
3939 fs.clone(),
3940 cx,
3941 )
3942 });
3943 let (worktree_a, _) = project_a
3944 .update(cx_a, |p, cx| {
3945 p.find_or_create_local_worktree("/dir", true, cx)
3946 })
3947 .await
3948 .unwrap();
3949 worktree_a
3950 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3951 .await;
3952 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3953 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3954 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3955
3956 // Join the worktree as client B.
3957 let project_b = Project::remote(
3958 project_id,
3959 client_b.clone(),
3960 client_b.user_store.clone(),
3961 lang_registry.clone(),
3962 fs.clone(),
3963 &mut cx_b.to_async(),
3964 )
3965 .await
3966 .unwrap();
3967 let mut params = cx_b.update(WorkspaceParams::test);
3968 params.languages = lang_registry.clone();
3969 params.client = client_b.client.clone();
3970 params.user_store = client_b.user_store.clone();
3971 params.project = project_b;
3972
3973 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3974 let editor_b = workspace_b
3975 .update(cx_b, |workspace, cx| {
3976 workspace.open_path((worktree_id, "one.rs"), cx)
3977 })
3978 .await
3979 .unwrap()
3980 .downcast::<Editor>()
3981 .unwrap();
3982 let fake_language_server = fake_language_servers.next().await.unwrap();
3983
3984 // Move cursor to a location that can be renamed.
3985 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3986 editor.select_ranges([7..7], None, cx);
3987 editor.rename(&Rename, cx).unwrap()
3988 });
3989
3990 fake_language_server
3991 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3992 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3993 assert_eq!(params.position, lsp::Position::new(0, 7));
3994 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3995 lsp::Position::new(0, 6),
3996 lsp::Position::new(0, 9),
3997 ))))
3998 })
3999 .next()
4000 .await
4001 .unwrap();
4002 prepare_rename.await.unwrap();
4003 editor_b.update(cx_b, |editor, cx| {
4004 let rename = editor.pending_rename().unwrap();
4005 let buffer = editor.buffer().read(cx).snapshot(cx);
4006 assert_eq!(
4007 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4008 6..9
4009 );
4010 rename.editor.update(cx, |rename_editor, cx| {
4011 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4012 rename_buffer.edit([(0..3, "THREE")], cx);
4013 });
4014 });
4015 });
4016
4017 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4018 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4019 });
4020 fake_language_server
4021 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4022 assert_eq!(
4023 params.text_document_position.text_document.uri.as_str(),
4024 "file:///dir/one.rs"
4025 );
4026 assert_eq!(
4027 params.text_document_position.position,
4028 lsp::Position::new(0, 6)
4029 );
4030 assert_eq!(params.new_name, "THREE");
4031 Ok(Some(lsp::WorkspaceEdit {
4032 changes: Some(
4033 [
4034 (
4035 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4036 vec![lsp::TextEdit::new(
4037 lsp::Range::new(
4038 lsp::Position::new(0, 6),
4039 lsp::Position::new(0, 9),
4040 ),
4041 "THREE".to_string(),
4042 )],
4043 ),
4044 (
4045 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4046 vec![
4047 lsp::TextEdit::new(
4048 lsp::Range::new(
4049 lsp::Position::new(0, 24),
4050 lsp::Position::new(0, 27),
4051 ),
4052 "THREE".to_string(),
4053 ),
4054 lsp::TextEdit::new(
4055 lsp::Range::new(
4056 lsp::Position::new(0, 35),
4057 lsp::Position::new(0, 38),
4058 ),
4059 "THREE".to_string(),
4060 ),
4061 ],
4062 ),
4063 ]
4064 .into_iter()
4065 .collect(),
4066 ),
4067 ..Default::default()
4068 }))
4069 })
4070 .next()
4071 .await
4072 .unwrap();
4073 confirm_rename.await.unwrap();
4074
4075 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4076 workspace
4077 .active_item(cx)
4078 .unwrap()
4079 .downcast::<Editor>()
4080 .unwrap()
4081 });
4082 rename_editor.update(cx_b, |editor, cx| {
4083 assert_eq!(
4084 editor.text(cx),
4085 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4086 );
4087 editor.undo(&Undo, cx);
4088 assert_eq!(
4089 editor.text(cx),
4090 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4091 );
4092 editor.redo(&Redo, cx);
4093 assert_eq!(
4094 editor.text(cx),
4095 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4096 );
4097 });
4098
4099 // Ensure temporary rename edits cannot be undone/redone.
4100 editor_b.update(cx_b, |editor, cx| {
4101 editor.undo(&Undo, cx);
4102 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4103 editor.undo(&Undo, cx);
4104 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4105 editor.redo(&Redo, cx);
4106 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4107 })
4108 }
4109
4110 #[gpui::test(iterations = 10)]
4111 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4112 cx_a.foreground().forbid_parking();
4113
4114 // Connect to a server as 2 clients.
4115 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4116 let client_a = server.create_client(cx_a, "user_a").await;
4117 let client_b = server.create_client(cx_b, "user_b").await;
4118
4119 // Create an org that includes these 2 users.
4120 let db = &server.app_state.db;
4121 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4122 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4123 .await
4124 .unwrap();
4125 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4126 .await
4127 .unwrap();
4128
4129 // Create a channel that includes all the users.
4130 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4131 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4132 .await
4133 .unwrap();
4134 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4135 .await
4136 .unwrap();
4137 db.create_channel_message(
4138 channel_id,
4139 client_b.current_user_id(&cx_b),
4140 "hello A, it's B.",
4141 OffsetDateTime::now_utc(),
4142 1,
4143 )
4144 .await
4145 .unwrap();
4146
4147 let channels_a = cx_a
4148 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4149 channels_a
4150 .condition(cx_a, |list, _| list.available_channels().is_some())
4151 .await;
4152 channels_a.read_with(cx_a, |list, _| {
4153 assert_eq!(
4154 list.available_channels().unwrap(),
4155 &[ChannelDetails {
4156 id: channel_id.to_proto(),
4157 name: "test-channel".to_string()
4158 }]
4159 )
4160 });
4161 let channel_a = channels_a.update(cx_a, |this, cx| {
4162 this.get_channel(channel_id.to_proto(), cx).unwrap()
4163 });
4164 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4165 channel_a
4166 .condition(&cx_a, |channel, _| {
4167 channel_messages(channel)
4168 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4169 })
4170 .await;
4171
4172 let channels_b = cx_b
4173 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4174 channels_b
4175 .condition(cx_b, |list, _| list.available_channels().is_some())
4176 .await;
4177 channels_b.read_with(cx_b, |list, _| {
4178 assert_eq!(
4179 list.available_channels().unwrap(),
4180 &[ChannelDetails {
4181 id: channel_id.to_proto(),
4182 name: "test-channel".to_string()
4183 }]
4184 )
4185 });
4186
4187 let channel_b = channels_b.update(cx_b, |this, cx| {
4188 this.get_channel(channel_id.to_proto(), cx).unwrap()
4189 });
4190 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4191 channel_b
4192 .condition(&cx_b, |channel, _| {
4193 channel_messages(channel)
4194 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4195 })
4196 .await;
4197
4198 channel_a
4199 .update(cx_a, |channel, cx| {
4200 channel
4201 .send_message("oh, hi B.".to_string(), cx)
4202 .unwrap()
4203 .detach();
4204 let task = channel.send_message("sup".to_string(), cx).unwrap();
4205 assert_eq!(
4206 channel_messages(channel),
4207 &[
4208 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4209 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4210 ("user_a".to_string(), "sup".to_string(), true)
4211 ]
4212 );
4213 task
4214 })
4215 .await
4216 .unwrap();
4217
4218 channel_b
4219 .condition(&cx_b, |channel, _| {
4220 channel_messages(channel)
4221 == [
4222 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4223 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4224 ("user_a".to_string(), "sup".to_string(), false),
4225 ]
4226 })
4227 .await;
4228
4229 assert_eq!(
4230 server
4231 .state()
4232 .await
4233 .channel(channel_id)
4234 .unwrap()
4235 .connection_ids
4236 .len(),
4237 2
4238 );
4239 cx_b.update(|_| drop(channel_b));
4240 server
4241 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4242 .await;
4243
4244 cx_a.update(|_| drop(channel_a));
4245 server
4246 .condition(|state| state.channel(channel_id).is_none())
4247 .await;
4248 }
4249
4250 #[gpui::test(iterations = 10)]
4251 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4252 cx_a.foreground().forbid_parking();
4253
4254 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4255 let client_a = server.create_client(cx_a, "user_a").await;
4256
4257 let db = &server.app_state.db;
4258 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4259 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4260 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4261 .await
4262 .unwrap();
4263 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4264 .await
4265 .unwrap();
4266
4267 let channels_a = cx_a
4268 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4269 channels_a
4270 .condition(cx_a, |list, _| list.available_channels().is_some())
4271 .await;
4272 let channel_a = channels_a.update(cx_a, |this, cx| {
4273 this.get_channel(channel_id.to_proto(), cx).unwrap()
4274 });
4275
4276 // Messages aren't allowed to be too long.
4277 channel_a
4278 .update(cx_a, |channel, cx| {
4279 let long_body = "this is long.\n".repeat(1024);
4280 channel.send_message(long_body, cx).unwrap()
4281 })
4282 .await
4283 .unwrap_err();
4284
4285 // Messages aren't allowed to be blank.
4286 channel_a.update(cx_a, |channel, cx| {
4287 channel.send_message(String::new(), cx).unwrap_err()
4288 });
4289
4290 // Leading and trailing whitespace are trimmed.
4291 channel_a
4292 .update(cx_a, |channel, cx| {
4293 channel
4294 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4295 .unwrap()
4296 })
4297 .await
4298 .unwrap();
4299 assert_eq!(
4300 db.get_channel_messages(channel_id, 10, None)
4301 .await
4302 .unwrap()
4303 .iter()
4304 .map(|m| &m.body)
4305 .collect::<Vec<_>>(),
4306 &["surrounded by whitespace"]
4307 );
4308 }
4309
4310 #[gpui::test(iterations = 10)]
4311 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4312 cx_a.foreground().forbid_parking();
4313
4314 // Connect to a server as 2 clients.
4315 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4316 let client_a = server.create_client(cx_a, "user_a").await;
4317 let client_b = server.create_client(cx_b, "user_b").await;
4318 let mut status_b = client_b.status();
4319
4320 // Create an org that includes these 2 users.
4321 let db = &server.app_state.db;
4322 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4323 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4324 .await
4325 .unwrap();
4326 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4327 .await
4328 .unwrap();
4329
4330 // Create a channel that includes all the users.
4331 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4332 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4333 .await
4334 .unwrap();
4335 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4336 .await
4337 .unwrap();
4338 db.create_channel_message(
4339 channel_id,
4340 client_b.current_user_id(&cx_b),
4341 "hello A, it's B.",
4342 OffsetDateTime::now_utc(),
4343 2,
4344 )
4345 .await
4346 .unwrap();
4347
4348 let channels_a = cx_a
4349 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4350 channels_a
4351 .condition(cx_a, |list, _| list.available_channels().is_some())
4352 .await;
4353
4354 channels_a.read_with(cx_a, |list, _| {
4355 assert_eq!(
4356 list.available_channels().unwrap(),
4357 &[ChannelDetails {
4358 id: channel_id.to_proto(),
4359 name: "test-channel".to_string()
4360 }]
4361 )
4362 });
4363 let channel_a = channels_a.update(cx_a, |this, cx| {
4364 this.get_channel(channel_id.to_proto(), cx).unwrap()
4365 });
4366 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4367 channel_a
4368 .condition(&cx_a, |channel, _| {
4369 channel_messages(channel)
4370 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4371 })
4372 .await;
4373
4374 let channels_b = cx_b
4375 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4376 channels_b
4377 .condition(cx_b, |list, _| list.available_channels().is_some())
4378 .await;
4379 channels_b.read_with(cx_b, |list, _| {
4380 assert_eq!(
4381 list.available_channels().unwrap(),
4382 &[ChannelDetails {
4383 id: channel_id.to_proto(),
4384 name: "test-channel".to_string()
4385 }]
4386 )
4387 });
4388
4389 let channel_b = channels_b.update(cx_b, |this, cx| {
4390 this.get_channel(channel_id.to_proto(), cx).unwrap()
4391 });
4392 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4393 channel_b
4394 .condition(&cx_b, |channel, _| {
4395 channel_messages(channel)
4396 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4397 })
4398 .await;
4399
4400 // Disconnect client B, ensuring we can still access its cached channel data.
4401 server.forbid_connections();
4402 server.disconnect_client(client_b.current_user_id(&cx_b));
4403 cx_b.foreground().advance_clock(Duration::from_secs(3));
4404 while !matches!(
4405 status_b.next().await,
4406 Some(client::Status::ReconnectionError { .. })
4407 ) {}
4408
4409 channels_b.read_with(cx_b, |channels, _| {
4410 assert_eq!(
4411 channels.available_channels().unwrap(),
4412 [ChannelDetails {
4413 id: channel_id.to_proto(),
4414 name: "test-channel".to_string()
4415 }]
4416 )
4417 });
4418 channel_b.read_with(cx_b, |channel, _| {
4419 assert_eq!(
4420 channel_messages(channel),
4421 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4422 )
4423 });
4424
4425 // Send a message from client B while it is disconnected.
4426 channel_b
4427 .update(cx_b, |channel, cx| {
4428 let task = channel
4429 .send_message("can you see this?".to_string(), cx)
4430 .unwrap();
4431 assert_eq!(
4432 channel_messages(channel),
4433 &[
4434 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4435 ("user_b".to_string(), "can you see this?".to_string(), true)
4436 ]
4437 );
4438 task
4439 })
4440 .await
4441 .unwrap_err();
4442
4443 // Send a message from client A while B is disconnected.
4444 channel_a
4445 .update(cx_a, |channel, cx| {
4446 channel
4447 .send_message("oh, hi B.".to_string(), cx)
4448 .unwrap()
4449 .detach();
4450 let task = channel.send_message("sup".to_string(), cx).unwrap();
4451 assert_eq!(
4452 channel_messages(channel),
4453 &[
4454 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4455 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4456 ("user_a".to_string(), "sup".to_string(), true)
4457 ]
4458 );
4459 task
4460 })
4461 .await
4462 .unwrap();
4463
4464 // Give client B a chance to reconnect.
4465 server.allow_connections();
4466 cx_b.foreground().advance_clock(Duration::from_secs(10));
4467
4468 // Verify that B sees the new messages upon reconnection, as well as the message client B
4469 // sent while offline.
4470 channel_b
4471 .condition(&cx_b, |channel, _| {
4472 channel_messages(channel)
4473 == [
4474 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4475 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4476 ("user_a".to_string(), "sup".to_string(), false),
4477 ("user_b".to_string(), "can you see this?".to_string(), false),
4478 ]
4479 })
4480 .await;
4481
4482 // Ensure client A and B can communicate normally after reconnection.
4483 channel_a
4484 .update(cx_a, |channel, cx| {
4485 channel.send_message("you online?".to_string(), cx).unwrap()
4486 })
4487 .await
4488 .unwrap();
4489 channel_b
4490 .condition(&cx_b, |channel, _| {
4491 channel_messages(channel)
4492 == [
4493 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4494 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4495 ("user_a".to_string(), "sup".to_string(), false),
4496 ("user_b".to_string(), "can you see this?".to_string(), false),
4497 ("user_a".to_string(), "you online?".to_string(), false),
4498 ]
4499 })
4500 .await;
4501
4502 channel_b
4503 .update(cx_b, |channel, cx| {
4504 channel.send_message("yep".to_string(), cx).unwrap()
4505 })
4506 .await
4507 .unwrap();
4508 channel_a
4509 .condition(&cx_a, |channel, _| {
4510 channel_messages(channel)
4511 == [
4512 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4513 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4514 ("user_a".to_string(), "sup".to_string(), false),
4515 ("user_b".to_string(), "can you see this?".to_string(), false),
4516 ("user_a".to_string(), "you online?".to_string(), false),
4517 ("user_b".to_string(), "yep".to_string(), false),
4518 ]
4519 })
4520 .await;
4521 }
4522
4523 #[gpui::test(iterations = 10)]
4524 async fn test_contacts(
4525 cx_a: &mut TestAppContext,
4526 cx_b: &mut TestAppContext,
4527 cx_c: &mut TestAppContext,
4528 ) {
4529 cx_a.foreground().forbid_parking();
4530 let lang_registry = Arc::new(LanguageRegistry::test());
4531 let fs = FakeFs::new(cx_a.background());
4532
4533 // Connect to a server as 3 clients.
4534 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4535 let client_a = server.create_client(cx_a, "user_a").await;
4536 let client_b = server.create_client(cx_b, "user_b").await;
4537 let client_c = server.create_client(cx_c, "user_c").await;
4538
4539 // Share a worktree as client A.
4540 fs.insert_tree(
4541 "/a",
4542 json!({
4543 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4544 }),
4545 )
4546 .await;
4547
4548 let project_a = cx_a.update(|cx| {
4549 Project::local(
4550 client_a.clone(),
4551 client_a.user_store.clone(),
4552 lang_registry.clone(),
4553 fs.clone(),
4554 cx,
4555 )
4556 });
4557 let (worktree_a, _) = project_a
4558 .update(cx_a, |p, cx| {
4559 p.find_or_create_local_worktree("/a", true, cx)
4560 })
4561 .await
4562 .unwrap();
4563 worktree_a
4564 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4565 .await;
4566
4567 client_a
4568 .user_store
4569 .condition(&cx_a, |user_store, _| {
4570 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4571 })
4572 .await;
4573 client_b
4574 .user_store
4575 .condition(&cx_b, |user_store, _| {
4576 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4577 })
4578 .await;
4579 client_c
4580 .user_store
4581 .condition(&cx_c, |user_store, _| {
4582 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4583 })
4584 .await;
4585
4586 let project_id = project_a
4587 .update(cx_a, |project, _| project.next_remote_id())
4588 .await;
4589 project_a
4590 .update(cx_a, |project, cx| project.share(cx))
4591 .await
4592 .unwrap();
4593 client_a
4594 .user_store
4595 .condition(&cx_a, |user_store, _| {
4596 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4597 })
4598 .await;
4599 client_b
4600 .user_store
4601 .condition(&cx_b, |user_store, _| {
4602 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4603 })
4604 .await;
4605 client_c
4606 .user_store
4607 .condition(&cx_c, |user_store, _| {
4608 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4609 })
4610 .await;
4611
4612 let _project_b = Project::remote(
4613 project_id,
4614 client_b.clone(),
4615 client_b.user_store.clone(),
4616 lang_registry.clone(),
4617 fs.clone(),
4618 &mut cx_b.to_async(),
4619 )
4620 .await
4621 .unwrap();
4622
4623 client_a
4624 .user_store
4625 .condition(&cx_a, |user_store, _| {
4626 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4627 })
4628 .await;
4629 client_b
4630 .user_store
4631 .condition(&cx_b, |user_store, _| {
4632 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4633 })
4634 .await;
4635 client_c
4636 .user_store
4637 .condition(&cx_c, |user_store, _| {
4638 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4639 })
4640 .await;
4641
4642 project_a
4643 .condition(&cx_a, |project, _| {
4644 project.collaborators().contains_key(&client_b.peer_id)
4645 })
4646 .await;
4647
4648 cx_a.update(move |_| drop(project_a));
4649 client_a
4650 .user_store
4651 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4652 .await;
4653 client_b
4654 .user_store
4655 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4656 .await;
4657 client_c
4658 .user_store
4659 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4660 .await;
4661
4662 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4663 user_store
4664 .contacts()
4665 .iter()
4666 .map(|contact| {
4667 let worktrees = contact
4668 .projects
4669 .iter()
4670 .map(|p| {
4671 (
4672 p.worktree_root_names[0].as_str(),
4673 p.is_shared,
4674 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4675 )
4676 })
4677 .collect();
4678 (contact.user.github_login.as_str(), worktrees)
4679 })
4680 .collect()
4681 }
4682 }
4683
4684 #[gpui::test(iterations = 10)]
4685 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4686 cx_a.foreground().forbid_parking();
4687 let fs = FakeFs::new(cx_a.background());
4688
4689 // 2 clients connect to a server.
4690 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4691 let mut client_a = server.create_client(cx_a, "user_a").await;
4692 let mut client_b = server.create_client(cx_b, "user_b").await;
4693 cx_a.update(editor::init);
4694 cx_b.update(editor::init);
4695
4696 // Client A shares a project.
4697 fs.insert_tree(
4698 "/a",
4699 json!({
4700 ".zed.toml": r#"collaborators = ["user_b"]"#,
4701 "1.txt": "one",
4702 "2.txt": "two",
4703 "3.txt": "three",
4704 }),
4705 )
4706 .await;
4707 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4708 project_a
4709 .update(cx_a, |project, cx| project.share(cx))
4710 .await
4711 .unwrap();
4712
4713 // Client B joins the project.
4714 let project_b = client_b
4715 .build_remote_project(
4716 project_a
4717 .read_with(cx_a, |project, _| project.remote_id())
4718 .unwrap(),
4719 cx_b,
4720 )
4721 .await;
4722
4723 // Client A opens some editors.
4724 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4725 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4726 let editor_a1 = workspace_a
4727 .update(cx_a, |workspace, cx| {
4728 workspace.open_path((worktree_id, "1.txt"), cx)
4729 })
4730 .await
4731 .unwrap()
4732 .downcast::<Editor>()
4733 .unwrap();
4734 let editor_a2 = workspace_a
4735 .update(cx_a, |workspace, cx| {
4736 workspace.open_path((worktree_id, "2.txt"), cx)
4737 })
4738 .await
4739 .unwrap()
4740 .downcast::<Editor>()
4741 .unwrap();
4742
4743 // Client B opens an editor.
4744 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4745 let editor_b1 = workspace_b
4746 .update(cx_b, |workspace, cx| {
4747 workspace.open_path((worktree_id, "1.txt"), cx)
4748 })
4749 .await
4750 .unwrap()
4751 .downcast::<Editor>()
4752 .unwrap();
4753
4754 let client_a_id = project_b.read_with(cx_b, |project, _| {
4755 project.collaborators().values().next().unwrap().peer_id
4756 });
4757 let client_b_id = project_a.read_with(cx_a, |project, _| {
4758 project.collaborators().values().next().unwrap().peer_id
4759 });
4760
4761 // When client B starts following client A, all visible view states are replicated to client B.
4762 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4763 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4764 workspace_b
4765 .update(cx_b, |workspace, cx| {
4766 workspace
4767 .toggle_follow(&ToggleFollow(client_a_id), cx)
4768 .unwrap()
4769 })
4770 .await
4771 .unwrap();
4772 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4773 workspace
4774 .active_item(cx)
4775 .unwrap()
4776 .downcast::<Editor>()
4777 .unwrap()
4778 });
4779 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4780 assert_eq!(
4781 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4782 Some((worktree_id, "2.txt").into())
4783 );
4784 assert_eq!(
4785 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4786 vec![2..3]
4787 );
4788 assert_eq!(
4789 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4790 vec![0..1]
4791 );
4792
4793 // When client A activates a different editor, client B does so as well.
4794 workspace_a.update(cx_a, |workspace, cx| {
4795 workspace.activate_item(&editor_a1, cx)
4796 });
4797 workspace_b
4798 .condition(cx_b, |workspace, cx| {
4799 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4800 })
4801 .await;
4802
4803 // When client A navigates back and forth, client B does so as well.
4804 workspace_a
4805 .update(cx_a, |workspace, cx| {
4806 workspace::Pane::go_back(workspace, None, cx)
4807 })
4808 .await;
4809 workspace_b
4810 .condition(cx_b, |workspace, cx| {
4811 workspace.active_item(cx).unwrap().id() == editor_b2.id()
4812 })
4813 .await;
4814
4815 workspace_a
4816 .update(cx_a, |workspace, cx| {
4817 workspace::Pane::go_forward(workspace, None, cx)
4818 })
4819 .await;
4820 workspace_b
4821 .condition(cx_b, |workspace, cx| {
4822 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4823 })
4824 .await;
4825
4826 // Changes to client A's editor are reflected on client B.
4827 editor_a1.update(cx_a, |editor, cx| {
4828 editor.select_ranges([1..1, 2..2], None, cx);
4829 });
4830 editor_b1
4831 .condition(cx_b, |editor, cx| {
4832 editor.selected_ranges(cx) == vec![1..1, 2..2]
4833 })
4834 .await;
4835
4836 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4837 editor_b1
4838 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4839 .await;
4840
4841 editor_a1.update(cx_a, |editor, cx| {
4842 editor.select_ranges([3..3], None, cx);
4843 editor.set_scroll_position(vec2f(0., 100.), cx);
4844 });
4845 editor_b1
4846 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4847 .await;
4848
4849 // After unfollowing, client B stops receiving updates from client A.
4850 workspace_b.update(cx_b, |workspace, cx| {
4851 workspace.unfollow(&workspace.active_pane().clone(), cx)
4852 });
4853 workspace_a.update(cx_a, |workspace, cx| {
4854 workspace.activate_item(&editor_a2, cx)
4855 });
4856 cx_a.foreground().run_until_parked();
4857 assert_eq!(
4858 workspace_b.read_with(cx_b, |workspace, cx| workspace
4859 .active_item(cx)
4860 .unwrap()
4861 .id()),
4862 editor_b1.id()
4863 );
4864
4865 // Client A starts following client B.
4866 workspace_a
4867 .update(cx_a, |workspace, cx| {
4868 workspace
4869 .toggle_follow(&ToggleFollow(client_b_id), cx)
4870 .unwrap()
4871 })
4872 .await
4873 .unwrap();
4874 assert_eq!(
4875 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4876 Some(client_b_id)
4877 );
4878 assert_eq!(
4879 workspace_a.read_with(cx_a, |workspace, cx| workspace
4880 .active_item(cx)
4881 .unwrap()
4882 .id()),
4883 editor_a1.id()
4884 );
4885
4886 // Following interrupts when client B disconnects.
4887 client_b.disconnect(&cx_b.to_async()).unwrap();
4888 cx_a.foreground().run_until_parked();
4889 assert_eq!(
4890 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4891 None
4892 );
4893 }
4894
4895 #[gpui::test(iterations = 10)]
4896 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4897 cx_a.foreground().forbid_parking();
4898 let fs = FakeFs::new(cx_a.background());
4899
4900 // 2 clients connect to a server.
4901 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4902 let mut client_a = server.create_client(cx_a, "user_a").await;
4903 let mut client_b = server.create_client(cx_b, "user_b").await;
4904 cx_a.update(editor::init);
4905 cx_b.update(editor::init);
4906
4907 // Client A shares a project.
4908 fs.insert_tree(
4909 "/a",
4910 json!({
4911 ".zed.toml": r#"collaborators = ["user_b"]"#,
4912 "1.txt": "one",
4913 "2.txt": "two",
4914 "3.txt": "three",
4915 "4.txt": "four",
4916 }),
4917 )
4918 .await;
4919 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4920 project_a
4921 .update(cx_a, |project, cx| project.share(cx))
4922 .await
4923 .unwrap();
4924
4925 // Client B joins the project.
4926 let project_b = client_b
4927 .build_remote_project(
4928 project_a
4929 .read_with(cx_a, |project, _| project.remote_id())
4930 .unwrap(),
4931 cx_b,
4932 )
4933 .await;
4934
4935 // Client A opens some editors.
4936 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4937 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4938 let _editor_a1 = workspace_a
4939 .update(cx_a, |workspace, cx| {
4940 workspace.open_path((worktree_id, "1.txt"), cx)
4941 })
4942 .await
4943 .unwrap()
4944 .downcast::<Editor>()
4945 .unwrap();
4946
4947 // Client B opens an editor.
4948 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4949 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4950 let _editor_b1 = workspace_b
4951 .update(cx_b, |workspace, cx| {
4952 workspace.open_path((worktree_id, "2.txt"), cx)
4953 })
4954 .await
4955 .unwrap()
4956 .downcast::<Editor>()
4957 .unwrap();
4958
4959 // Clients A and B follow each other in split panes
4960 workspace_a
4961 .update(cx_a, |workspace, cx| {
4962 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4963 assert_ne!(*workspace.active_pane(), pane_a1);
4964 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4965 workspace
4966 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4967 .unwrap()
4968 })
4969 .await
4970 .unwrap();
4971 workspace_b
4972 .update(cx_b, |workspace, cx| {
4973 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4974 assert_ne!(*workspace.active_pane(), pane_b1);
4975 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4976 workspace
4977 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4978 .unwrap()
4979 })
4980 .await
4981 .unwrap();
4982
4983 workspace_a
4984 .update(cx_a, |workspace, cx| {
4985 workspace.activate_next_pane(cx);
4986 assert_eq!(*workspace.active_pane(), pane_a1);
4987 workspace.open_path((worktree_id, "3.txt"), cx)
4988 })
4989 .await
4990 .unwrap();
4991 workspace_b
4992 .update(cx_b, |workspace, cx| {
4993 workspace.activate_next_pane(cx);
4994 assert_eq!(*workspace.active_pane(), pane_b1);
4995 workspace.open_path((worktree_id, "4.txt"), cx)
4996 })
4997 .await
4998 .unwrap();
4999 cx_a.foreground().run_until_parked();
5000
5001 // Ensure leader updates don't change the active pane of followers
5002 workspace_a.read_with(cx_a, |workspace, _| {
5003 assert_eq!(*workspace.active_pane(), pane_a1);
5004 });
5005 workspace_b.read_with(cx_b, |workspace, _| {
5006 assert_eq!(*workspace.active_pane(), pane_b1);
5007 });
5008
5009 // Ensure peers following each other doesn't cause an infinite loop.
5010 assert_eq!(
5011 workspace_a.read_with(cx_a, |workspace, cx| workspace
5012 .active_item(cx)
5013 .unwrap()
5014 .project_path(cx)),
5015 Some((worktree_id, "3.txt").into())
5016 );
5017 workspace_a.update(cx_a, |workspace, cx| {
5018 assert_eq!(
5019 workspace.active_item(cx).unwrap().project_path(cx),
5020 Some((worktree_id, "3.txt").into())
5021 );
5022 workspace.activate_next_pane(cx);
5023 assert_eq!(
5024 workspace.active_item(cx).unwrap().project_path(cx),
5025 Some((worktree_id, "4.txt").into())
5026 );
5027 });
5028 workspace_b.update(cx_b, |workspace, cx| {
5029 assert_eq!(
5030 workspace.active_item(cx).unwrap().project_path(cx),
5031 Some((worktree_id, "4.txt").into())
5032 );
5033 workspace.activate_next_pane(cx);
5034 assert_eq!(
5035 workspace.active_item(cx).unwrap().project_path(cx),
5036 Some((worktree_id, "3.txt").into())
5037 );
5038 });
5039 }
5040
5041 #[gpui::test(iterations = 10)]
5042 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5043 cx_a.foreground().forbid_parking();
5044 let fs = FakeFs::new(cx_a.background());
5045
5046 // 2 clients connect to a server.
5047 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5048 let mut client_a = server.create_client(cx_a, "user_a").await;
5049 let mut client_b = server.create_client(cx_b, "user_b").await;
5050 cx_a.update(editor::init);
5051 cx_b.update(editor::init);
5052
5053 // Client A shares a project.
5054 fs.insert_tree(
5055 "/a",
5056 json!({
5057 ".zed.toml": r#"collaborators = ["user_b"]"#,
5058 "1.txt": "one",
5059 "2.txt": "two",
5060 "3.txt": "three",
5061 }),
5062 )
5063 .await;
5064 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5065 project_a
5066 .update(cx_a, |project, cx| project.share(cx))
5067 .await
5068 .unwrap();
5069
5070 // Client B joins the project.
5071 let project_b = client_b
5072 .build_remote_project(
5073 project_a
5074 .read_with(cx_a, |project, _| project.remote_id())
5075 .unwrap(),
5076 cx_b,
5077 )
5078 .await;
5079
5080 // Client A opens some editors.
5081 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5082 let _editor_a1 = workspace_a
5083 .update(cx_a, |workspace, cx| {
5084 workspace.open_path((worktree_id, "1.txt"), cx)
5085 })
5086 .await
5087 .unwrap()
5088 .downcast::<Editor>()
5089 .unwrap();
5090
5091 // Client B starts following client A.
5092 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5093 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5094 let leader_id = project_b.read_with(cx_b, |project, _| {
5095 project.collaborators().values().next().unwrap().peer_id
5096 });
5097 workspace_b
5098 .update(cx_b, |workspace, cx| {
5099 workspace
5100 .toggle_follow(&ToggleFollow(leader_id), cx)
5101 .unwrap()
5102 })
5103 .await
5104 .unwrap();
5105 assert_eq!(
5106 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5107 Some(leader_id)
5108 );
5109 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5110 workspace
5111 .active_item(cx)
5112 .unwrap()
5113 .downcast::<Editor>()
5114 .unwrap()
5115 });
5116
5117 // When client B moves, it automatically stops following client A.
5118 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5119 assert_eq!(
5120 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5121 None
5122 );
5123
5124 workspace_b
5125 .update(cx_b, |workspace, cx| {
5126 workspace
5127 .toggle_follow(&ToggleFollow(leader_id), cx)
5128 .unwrap()
5129 })
5130 .await
5131 .unwrap();
5132 assert_eq!(
5133 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5134 Some(leader_id)
5135 );
5136
5137 // When client B edits, it automatically stops following client A.
5138 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5139 assert_eq!(
5140 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5141 None
5142 );
5143
5144 workspace_b
5145 .update(cx_b, |workspace, cx| {
5146 workspace
5147 .toggle_follow(&ToggleFollow(leader_id), cx)
5148 .unwrap()
5149 })
5150 .await
5151 .unwrap();
5152 assert_eq!(
5153 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5154 Some(leader_id)
5155 );
5156
5157 // When client B scrolls, it automatically stops following client A.
5158 editor_b2.update(cx_b, |editor, cx| {
5159 editor.set_scroll_position(vec2f(0., 3.), cx)
5160 });
5161 assert_eq!(
5162 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5163 None
5164 );
5165
5166 workspace_b
5167 .update(cx_b, |workspace, cx| {
5168 workspace
5169 .toggle_follow(&ToggleFollow(leader_id), cx)
5170 .unwrap()
5171 })
5172 .await
5173 .unwrap();
5174 assert_eq!(
5175 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5176 Some(leader_id)
5177 );
5178
5179 // When client B activates a different pane, it continues following client A in the original pane.
5180 workspace_b.update(cx_b, |workspace, cx| {
5181 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5182 });
5183 assert_eq!(
5184 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5185 Some(leader_id)
5186 );
5187
5188 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5189 assert_eq!(
5190 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5191 Some(leader_id)
5192 );
5193
5194 // When client B activates a different item in the original pane, it automatically stops following client A.
5195 workspace_b
5196 .update(cx_b, |workspace, cx| {
5197 workspace.open_path((worktree_id, "2.txt"), cx)
5198 })
5199 .await
5200 .unwrap();
5201 assert_eq!(
5202 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5203 None
5204 );
5205 }
5206
5207 #[gpui::test(iterations = 100)]
5208 async fn test_random_collaboration(
5209 cx: &mut TestAppContext,
5210 deterministic: Arc<Deterministic>,
5211 rng: StdRng,
5212 ) {
5213 cx.foreground().forbid_parking();
5214 let max_peers = env::var("MAX_PEERS")
5215 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5216 .unwrap_or(5);
5217 assert!(max_peers <= 5);
5218
5219 let max_operations = env::var("OPERATIONS")
5220 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5221 .unwrap_or(10);
5222
5223 let rng = Arc::new(Mutex::new(rng));
5224
5225 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5226 let host_language_registry = Arc::new(LanguageRegistry::test());
5227
5228 let fs = FakeFs::new(cx.background());
5229 fs.insert_tree(
5230 "/_collab",
5231 json!({
5232 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5233 }),
5234 )
5235 .await;
5236
5237 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5238 let mut clients = Vec::new();
5239 let mut user_ids = Vec::new();
5240 let mut op_start_signals = Vec::new();
5241 let files = Arc::new(Mutex::new(Vec::new()));
5242
5243 let mut next_entity_id = 100000;
5244 let mut host_cx = TestAppContext::new(
5245 cx.foreground_platform(),
5246 cx.platform(),
5247 deterministic.build_foreground(next_entity_id),
5248 deterministic.build_background(),
5249 cx.font_cache(),
5250 cx.leak_detector(),
5251 next_entity_id,
5252 );
5253 let host = server.create_client(&mut host_cx, "host").await;
5254 let host_project = host_cx.update(|cx| {
5255 Project::local(
5256 host.client.clone(),
5257 host.user_store.clone(),
5258 host_language_registry.clone(),
5259 fs.clone(),
5260 cx,
5261 )
5262 });
5263 let host_project_id = host_project
5264 .update(&mut host_cx, |p, _| p.next_remote_id())
5265 .await;
5266
5267 let (collab_worktree, _) = host_project
5268 .update(&mut host_cx, |project, cx| {
5269 project.find_or_create_local_worktree("/_collab", true, cx)
5270 })
5271 .await
5272 .unwrap();
5273 collab_worktree
5274 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5275 .await;
5276 host_project
5277 .update(&mut host_cx, |project, cx| project.share(cx))
5278 .await
5279 .unwrap();
5280
5281 // Set up fake language servers.
5282 let mut language = Language::new(
5283 LanguageConfig {
5284 name: "Rust".into(),
5285 path_suffixes: vec!["rs".to_string()],
5286 ..Default::default()
5287 },
5288 None,
5289 );
5290 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5291 name: "the-fake-language-server",
5292 capabilities: lsp::LanguageServer::full_capabilities(),
5293 initializer: Some(Box::new({
5294 let rng = rng.clone();
5295 let files = files.clone();
5296 let project = host_project.downgrade();
5297 move |fake_server: &mut FakeLanguageServer| {
5298 fake_server.handle_request::<lsp::request::Completion, _, _>(
5299 |_, _| async move {
5300 Ok(Some(lsp::CompletionResponse::Array(vec![
5301 lsp::CompletionItem {
5302 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5303 range: lsp::Range::new(
5304 lsp::Position::new(0, 0),
5305 lsp::Position::new(0, 0),
5306 ),
5307 new_text: "the-new-text".to_string(),
5308 })),
5309 ..Default::default()
5310 },
5311 ])))
5312 },
5313 );
5314
5315 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5316 |_, _| async move {
5317 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5318 lsp::CodeAction {
5319 title: "the-code-action".to_string(),
5320 ..Default::default()
5321 },
5322 )]))
5323 },
5324 );
5325
5326 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5327 |params, _| async move {
5328 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5329 params.position,
5330 params.position,
5331 ))))
5332 },
5333 );
5334
5335 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5336 let files = files.clone();
5337 let rng = rng.clone();
5338 move |_, _| {
5339 let files = files.clone();
5340 let rng = rng.clone();
5341 async move {
5342 let files = files.lock();
5343 let mut rng = rng.lock();
5344 let count = rng.gen_range::<usize, _>(1..3);
5345 let files = (0..count)
5346 .map(|_| files.choose(&mut *rng).unwrap())
5347 .collect::<Vec<_>>();
5348 log::info!("LSP: Returning definitions in files {:?}", &files);
5349 Ok(Some(lsp::GotoDefinitionResponse::Array(
5350 files
5351 .into_iter()
5352 .map(|file| lsp::Location {
5353 uri: lsp::Url::from_file_path(file).unwrap(),
5354 range: Default::default(),
5355 })
5356 .collect(),
5357 )))
5358 }
5359 }
5360 });
5361
5362 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5363 let rng = rng.clone();
5364 let project = project.clone();
5365 move |params, mut cx| {
5366 let highlights = if let Some(project) = project.upgrade(&cx) {
5367 project.update(&mut cx, |project, cx| {
5368 let path = params
5369 .text_document_position_params
5370 .text_document
5371 .uri
5372 .to_file_path()
5373 .unwrap();
5374 let (worktree, relative_path) =
5375 project.find_local_worktree(&path, cx)?;
5376 let project_path =
5377 ProjectPath::from((worktree.read(cx).id(), relative_path));
5378 let buffer =
5379 project.get_open_buffer(&project_path, cx)?.read(cx);
5380
5381 let mut highlights = Vec::new();
5382 let highlight_count = rng.lock().gen_range(1..=5);
5383 let mut prev_end = 0;
5384 for _ in 0..highlight_count {
5385 let range =
5386 buffer.random_byte_range(prev_end, &mut *rng.lock());
5387
5388 highlights.push(lsp::DocumentHighlight {
5389 range: range_to_lsp(range.to_point_utf16(buffer)),
5390 kind: Some(lsp::DocumentHighlightKind::READ),
5391 });
5392 prev_end = range.end;
5393 }
5394 Some(highlights)
5395 })
5396 } else {
5397 None
5398 };
5399 async move { Ok(highlights) }
5400 }
5401 });
5402 }
5403 })),
5404 ..Default::default()
5405 });
5406 host_language_registry.add(Arc::new(language));
5407
5408 let op_start_signal = futures::channel::mpsc::unbounded();
5409 user_ids.push(host.current_user_id(&host_cx));
5410 op_start_signals.push(op_start_signal.0);
5411 clients.push(host_cx.foreground().spawn(host.simulate_host(
5412 host_project,
5413 files,
5414 op_start_signal.1,
5415 rng.clone(),
5416 host_cx,
5417 )));
5418
5419 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5420 rng.lock().gen_range(0..max_operations)
5421 } else {
5422 max_operations
5423 };
5424 let mut available_guests = vec![
5425 "guest-1".to_string(),
5426 "guest-2".to_string(),
5427 "guest-3".to_string(),
5428 "guest-4".to_string(),
5429 ];
5430 let mut operations = 0;
5431 while operations < max_operations {
5432 if operations == disconnect_host_at {
5433 server.disconnect_client(user_ids[0]);
5434 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5435 drop(op_start_signals);
5436 let mut clients = futures::future::join_all(clients).await;
5437 cx.foreground().run_until_parked();
5438
5439 let (host, mut host_cx, host_err) = clients.remove(0);
5440 if let Some(host_err) = host_err {
5441 log::error!("host error - {}", host_err);
5442 }
5443 host.project
5444 .as_ref()
5445 .unwrap()
5446 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5447 for (guest, mut guest_cx, guest_err) in clients {
5448 if let Some(guest_err) = guest_err {
5449 log::error!("{} error - {}", guest.username, guest_err);
5450 }
5451 let contacts = server
5452 .store
5453 .read()
5454 .await
5455 .contacts_for_user(guest.current_user_id(&guest_cx));
5456 assert!(!contacts
5457 .iter()
5458 .flat_map(|contact| &contact.projects)
5459 .any(|project| project.id == host_project_id));
5460 guest
5461 .project
5462 .as_ref()
5463 .unwrap()
5464 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5465 guest_cx.update(|_| drop(guest));
5466 }
5467 host_cx.update(|_| drop(host));
5468
5469 return;
5470 }
5471
5472 let distribution = rng.lock().gen_range(0..100);
5473 match distribution {
5474 0..=19 if !available_guests.is_empty() => {
5475 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5476 let guest_username = available_guests.remove(guest_ix);
5477 log::info!("Adding new connection for {}", guest_username);
5478 next_entity_id += 100000;
5479 let mut guest_cx = TestAppContext::new(
5480 cx.foreground_platform(),
5481 cx.platform(),
5482 deterministic.build_foreground(next_entity_id),
5483 deterministic.build_background(),
5484 cx.font_cache(),
5485 cx.leak_detector(),
5486 next_entity_id,
5487 );
5488 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5489 let guest_project = Project::remote(
5490 host_project_id,
5491 guest.client.clone(),
5492 guest.user_store.clone(),
5493 guest_lang_registry.clone(),
5494 FakeFs::new(cx.background()),
5495 &mut guest_cx.to_async(),
5496 )
5497 .await
5498 .unwrap();
5499 let op_start_signal = futures::channel::mpsc::unbounded();
5500 user_ids.push(guest.current_user_id(&guest_cx));
5501 op_start_signals.push(op_start_signal.0);
5502 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5503 guest_username.clone(),
5504 guest_project,
5505 op_start_signal.1,
5506 rng.clone(),
5507 guest_cx,
5508 )));
5509
5510 log::info!("Added connection for {}", guest_username);
5511 operations += 1;
5512 }
5513 20..=29 if clients.len() > 1 => {
5514 log::info!("Removing guest");
5515 let guest_ix = rng.lock().gen_range(1..clients.len());
5516 let removed_guest_id = user_ids.remove(guest_ix);
5517 let guest = clients.remove(guest_ix);
5518 op_start_signals.remove(guest_ix);
5519 server.disconnect_client(removed_guest_id);
5520 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5521 let (guest, mut guest_cx, guest_err) = guest.await;
5522 if let Some(guest_err) = guest_err {
5523 log::error!("{} error - {}", guest.username, guest_err);
5524 }
5525 guest
5526 .project
5527 .as_ref()
5528 .unwrap()
5529 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5530 for user_id in &user_ids {
5531 for contact in server.store.read().await.contacts_for_user(*user_id) {
5532 assert_ne!(
5533 contact.user_id, removed_guest_id.0 as u64,
5534 "removed guest is still a contact of another peer"
5535 );
5536 for project in contact.projects {
5537 for project_guest_id in project.guests {
5538 assert_ne!(
5539 project_guest_id, removed_guest_id.0 as u64,
5540 "removed guest appears as still participating on a project"
5541 );
5542 }
5543 }
5544 }
5545 }
5546
5547 log::info!("{} removed", guest.username);
5548 available_guests.push(guest.username.clone());
5549 guest_cx.update(|_| drop(guest));
5550
5551 operations += 1;
5552 }
5553 _ => {
5554 while operations < max_operations && rng.lock().gen_bool(0.7) {
5555 op_start_signals
5556 .choose(&mut *rng.lock())
5557 .unwrap()
5558 .unbounded_send(())
5559 .unwrap();
5560 operations += 1;
5561 }
5562
5563 if rng.lock().gen_bool(0.8) {
5564 cx.foreground().run_until_parked();
5565 }
5566 }
5567 }
5568 }
5569
5570 drop(op_start_signals);
5571 let mut clients = futures::future::join_all(clients).await;
5572 cx.foreground().run_until_parked();
5573
5574 let (host_client, mut host_cx, host_err) = clients.remove(0);
5575 if let Some(host_err) = host_err {
5576 panic!("host error - {}", host_err);
5577 }
5578 let host_project = host_client.project.as_ref().unwrap();
5579 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5580 project
5581 .worktrees(cx)
5582 .map(|worktree| {
5583 let snapshot = worktree.read(cx).snapshot();
5584 (snapshot.id(), snapshot)
5585 })
5586 .collect::<BTreeMap<_, _>>()
5587 });
5588
5589 host_client
5590 .project
5591 .as_ref()
5592 .unwrap()
5593 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5594
5595 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5596 if let Some(guest_err) = guest_err {
5597 panic!("{} error - {}", guest_client.username, guest_err);
5598 }
5599 let worktree_snapshots =
5600 guest_client
5601 .project
5602 .as_ref()
5603 .unwrap()
5604 .read_with(&guest_cx, |project, cx| {
5605 project
5606 .worktrees(cx)
5607 .map(|worktree| {
5608 let worktree = worktree.read(cx);
5609 (worktree.id(), worktree.snapshot())
5610 })
5611 .collect::<BTreeMap<_, _>>()
5612 });
5613
5614 assert_eq!(
5615 worktree_snapshots.keys().collect::<Vec<_>>(),
5616 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5617 "{} has different worktrees than the host",
5618 guest_client.username
5619 );
5620 for (id, host_snapshot) in &host_worktree_snapshots {
5621 let guest_snapshot = &worktree_snapshots[id];
5622 assert_eq!(
5623 guest_snapshot.root_name(),
5624 host_snapshot.root_name(),
5625 "{} has different root name than the host for worktree {}",
5626 guest_client.username,
5627 id
5628 );
5629 assert_eq!(
5630 guest_snapshot.entries(false).collect::<Vec<_>>(),
5631 host_snapshot.entries(false).collect::<Vec<_>>(),
5632 "{} has different snapshot than the host for worktree {}",
5633 guest_client.username,
5634 id
5635 );
5636 }
5637
5638 guest_client
5639 .project
5640 .as_ref()
5641 .unwrap()
5642 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5643
5644 for guest_buffer in &guest_client.buffers {
5645 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5646 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5647 project.buffer_for_id(buffer_id, cx).expect(&format!(
5648 "host does not have buffer for guest:{}, peer:{}, id:{}",
5649 guest_client.username, guest_client.peer_id, buffer_id
5650 ))
5651 });
5652 let path = host_buffer
5653 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5654
5655 assert_eq!(
5656 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5657 0,
5658 "{}, buffer {}, path {:?} has deferred operations",
5659 guest_client.username,
5660 buffer_id,
5661 path,
5662 );
5663 assert_eq!(
5664 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5665 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5666 "{}, buffer {}, path {:?}, differs from the host's buffer",
5667 guest_client.username,
5668 buffer_id,
5669 path
5670 );
5671 }
5672
5673 guest_cx.update(|_| drop(guest_client));
5674 }
5675
5676 host_cx.update(|_| drop(host_client));
5677 }
5678
    /// Test harness that bundles a `Server` instance with the handles the tests
    /// in this module need to create clients, sever connections and wait on
    /// server-side state.
    struct TestServer {
        // Created with `Peer::new()` in `start`; reset when the `TestServer` is dropped.
        peer: Arc<Peer>,
        // Application state built from `_test_db`; used to create users in `create_client`.
        app_state: Arc<AppState>,
        // The server under test; also the `Deref` target of `TestServer`.
        server: Arc<Server>,
        // Foreground executor consulted by `condition` (parking check, start/finish waiting).
        foreground: Rc<executor::Foreground>,
        // Receiving half of the channel whose sender is handed to `Server::new`;
        // `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        // One flag per connected user, obtained from `Connection::in_memory`;
        // `disconnect_client` removes the entry and sets the flag to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        // While `true`, the connection override installed by `create_client`
        // fails with "server is forbidding connections".
        forbid_connections: Arc<AtomicBool>,
        // Held so the fake database lives as long as the server.
        _test_db: TestDb,
    }
5689
5690 impl TestServer {
5691 async fn start(
5692 foreground: Rc<executor::Foreground>,
5693 background: Arc<executor::Background>,
5694 ) -> Self {
5695 let test_db = TestDb::fake(background);
5696 let app_state = Self::build_app_state(&test_db).await;
5697 let peer = Peer::new();
5698 let notifications = mpsc::unbounded();
5699 let server = Server::new(app_state.clone(), Some(notifications.0));
5700 Self {
5701 peer,
5702 app_state,
5703 server,
5704 foreground,
5705 notifications: notifications.1,
5706 connection_killers: Default::default(),
5707 forbid_connections: Default::default(),
5708 _test_db: test_db,
5709 }
5710 }
5711
        /// Creates a user named `name` in the test database, connects a new
        /// `Client` for that user to this server over an in-memory connection,
        /// and returns it wrapped in a `TestClient`.
        ///
        /// Panics (via `unwrap`) if the user cannot be created, if
        /// authentication/connection fails, or if no connection id is received.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            // Install test settings as a global in the client's app context.
            cx.update(|cx| {
                let settings = Settings::test(cx);
                cx.set_global(settings);
            });

            let http = FakeHttpClient::with_404_response();
            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            let server = self.server.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // The sender is passed to `handle_connection`; the id read from the
            // receiver below becomes the client's `PeerId`.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` only succeeds while `client` is the sole handle to
            // the `Client`, so the overrides must be installed before it is cloned.
            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always succeeds with a fixed token for this user.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                // Connections are in-memory and handled by the test server
                // directly, unless connections are currently forbidden.
                .override_establish_connection(move |credentials, cx| {
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    let server = server.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, killed) =
                                Connection::in_memory(cx.background());
                            // Remember the kill flag so `disconnect_client` can
                            // find it by user id. A later connection for the same
                            // user replaces the earlier entry.
                            connection_killers.lock().insert(user_id, killed);
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user_id,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();

            Channel::init(&client);
            Project::init(&client);
            cx.update(|cx| {
                workspace::init(&client, cx);
            });

            // First connection id reported through `connection_id_tx`.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));

            let client = TestClient {
                client,
                peer_id,
                username: name.to_string(),
                user_store,
                language_registry: Arc::new(LanguageRegistry::test()),
                project: Default::default(),
                buffers: Default::default(),
            };
            // Don't hand the client back until its user store reports a current user.
            client.wait_for_current_user(cx).await;
            client
        }
5796
5797 fn disconnect_client(&self, user_id: UserId) {
5798 self.connection_killers
5799 .lock()
5800 .remove(&user_id)
5801 .unwrap()
5802 .store(true, SeqCst);
5803 }
5804
        /// Makes subsequent connection attempts by clients created through
        /// `create_client` fail with "server is forbidding connections".
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
5808
        /// Clears the flag set by `forbid_connections`, so connection attempts
        /// are accepted again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
5812
5813 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5814 Arc::new(AppState {
5815 db: test_db.db().clone(),
5816 api_token: Default::default(),
5817 })
5818 }
5819
        /// Acquires a read lock on the server's `Store` and returns the guard.
        /// The lock is held for as long as the returned guard is alive.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read().await
        }
5823
5824 async fn condition<F>(&mut self, mut predicate: F)
5825 where
5826 F: FnMut(&Store) -> bool,
5827 {
5828 assert!(
5829 self.foreground.parking_forbidden(),
5830 "you must call forbid_parking to use server conditions so we don't block indefinitely"
5831 );
5832 while !(predicate)(&*self.server.store.read().await) {
5833 self.foreground.start_waiting();
5834 self.notifications.next().await;
5835 self.foreground.finish_waiting();
5836 }
5837 }
5838 }
5839
    /// Lets tests call `Server` methods and access its fields directly on a
    /// `TestServer`.
    impl Deref for TestServer {
        type Target = Server;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }
5847
    /// Resets the harness's `Peer` when the test server goes out of scope.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
5853
    /// A connected client plus the per-client state the tests track for it.
    /// Created by `TestServer::create_client`.
    struct TestClient {
        // Underlying client; also the `Deref` target of `TestClient`.
        client: Arc<Client>,
        // Name the user was created with.
        username: String,
        // Built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        pub user_store: ModelHandle<UserStore>,
        // Passed to the projects and workspaces this client builds.
        language_registry: Arc<LanguageRegistry>,
        // Set by `build_local_project`, `build_remote_project`, and the
        // `simulate_*` methods; `None` until one of them has run.
        project: Option<ModelHandle<Project>>,
        // Buffers the `simulate_*` methods have opened and not yet dropped.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
5863
    /// Exposes the wrapped `Arc<Client>` so `Client` methods can be called
    /// directly on a `TestClient`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
5871
5872 impl TestClient {
5873 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5874 UserId::from_proto(
5875 self.user_store
5876 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5877 )
5878 }
5879
5880 async fn wait_for_current_user(&self, cx: &TestAppContext) {
5881 let mut authed_user = self
5882 .user_store
5883 .read_with(cx, |user_store, _| user_store.watch_current_user());
5884 while authed_user.next().await.unwrap().is_none() {}
5885 }
5886
5887 async fn build_local_project(
5888 &mut self,
5889 fs: Arc<FakeFs>,
5890 root_path: impl AsRef<Path>,
5891 cx: &mut TestAppContext,
5892 ) -> (ModelHandle<Project>, WorktreeId) {
5893 let project = cx.update(|cx| {
5894 Project::local(
5895 self.client.clone(),
5896 self.user_store.clone(),
5897 self.language_registry.clone(),
5898 fs,
5899 cx,
5900 )
5901 });
5902 self.project = Some(project.clone());
5903 let (worktree, _) = project
5904 .update(cx, |p, cx| {
5905 p.find_or_create_local_worktree(root_path, true, cx)
5906 })
5907 .await
5908 .unwrap();
5909 worktree
5910 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5911 .await;
5912 project
5913 .update(cx, |project, _| project.next_remote_id())
5914 .await;
5915 (project, worktree.read_with(cx, |tree, _| tree.id()))
5916 }
5917
5918 async fn build_remote_project(
5919 &mut self,
5920 project_id: u64,
5921 cx: &mut TestAppContext,
5922 ) -> ModelHandle<Project> {
5923 let project = Project::remote(
5924 project_id,
5925 self.client.clone(),
5926 self.user_store.clone(),
5927 self.language_registry.clone(),
5928 FakeFs::new(cx.background()),
5929 &mut cx.to_async(),
5930 )
5931 .await
5932 .unwrap();
5933 self.project = Some(project.clone());
5934 project
5935 }
5936
5937 fn build_workspace(
5938 &self,
5939 project: &ModelHandle<Project>,
5940 cx: &mut TestAppContext,
5941 ) -> ViewHandle<Workspace> {
5942 let (window_id, _) = cx.add_window(|_| EmptyView);
5943 cx.add_view(window_id, |cx| {
5944 let fs = project.read(cx).fs().clone();
5945 Workspace::new(
5946 &WorkspaceParams {
5947 fs,
5948 project: project.clone(),
5949 user_store: self.user_store.clone(),
5950 languages: self.language_registry.clone(),
5951 themes: ThemeRegistry::new((), cx.font_cache().clone()),
5952 channel_list: cx.add_model(|cx| {
5953 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5954 }),
5955 client: self.client.clone(),
5956 },
5957 cx,
5958 )
5959 })
5960 }
5961
5962 async fn simulate_host(
5963 mut self,
5964 project: ModelHandle<Project>,
5965 files: Arc<Mutex<Vec<PathBuf>>>,
5966 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5967 rng: Arc<Mutex<StdRng>>,
5968 mut cx: TestAppContext,
5969 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5970 async fn simulate_host_internal(
5971 client: &mut TestClient,
5972 project: ModelHandle<Project>,
5973 files: Arc<Mutex<Vec<PathBuf>>>,
5974 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5975 rng: Arc<Mutex<StdRng>>,
5976 cx: &mut TestAppContext,
5977 ) -> anyhow::Result<()> {
5978 let fs = project.read_with(cx, |project, _| project.fs().clone());
5979
5980 while op_start_signal.next().await.is_some() {
5981 let distribution = rng.lock().gen_range::<usize, _>(0..100);
5982 match distribution {
5983 0..=20 if !files.lock().is_empty() => {
5984 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5985 let mut path = path.as_path();
5986 while let Some(parent_path) = path.parent() {
5987 path = parent_path;
5988 if rng.lock().gen() {
5989 break;
5990 }
5991 }
5992
5993 log::info!("Host: find/create local worktree {:?}", path);
5994 let find_or_create_worktree = project.update(cx, |project, cx| {
5995 project.find_or_create_local_worktree(path, true, cx)
5996 });
5997 if rng.lock().gen() {
5998 cx.background().spawn(find_or_create_worktree).detach();
5999 } else {
6000 find_or_create_worktree.await?;
6001 }
6002 }
6003 10..=80 if !files.lock().is_empty() => {
6004 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6005 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6006 let (worktree, path) = project
6007 .update(cx, |project, cx| {
6008 project.find_or_create_local_worktree(
6009 file.clone(),
6010 true,
6011 cx,
6012 )
6013 })
6014 .await?;
6015 let project_path =
6016 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6017 log::info!(
6018 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6019 file,
6020 project_path.0,
6021 project_path.1
6022 );
6023 let buffer = project
6024 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6025 .await
6026 .unwrap();
6027 client.buffers.insert(buffer.clone());
6028 buffer
6029 } else {
6030 client
6031 .buffers
6032 .iter()
6033 .choose(&mut *rng.lock())
6034 .unwrap()
6035 .clone()
6036 };
6037
6038 if rng.lock().gen_bool(0.1) {
6039 cx.update(|cx| {
6040 log::info!(
6041 "Host: dropping buffer {:?}",
6042 buffer.read(cx).file().unwrap().full_path(cx)
6043 );
6044 client.buffers.remove(&buffer);
6045 drop(buffer);
6046 });
6047 } else {
6048 buffer.update(cx, |buffer, cx| {
6049 log::info!(
6050 "Host: updating buffer {:?} ({})",
6051 buffer.file().unwrap().full_path(cx),
6052 buffer.remote_id()
6053 );
6054
6055 if rng.lock().gen_bool(0.7) {
6056 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6057 } else {
6058 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6059 }
6060 });
6061 }
6062 }
6063 _ => loop {
6064 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6065 let mut path = PathBuf::new();
6066 path.push("/");
6067 for _ in 0..path_component_count {
6068 let letter = rng.lock().gen_range(b'a'..=b'z');
6069 path.push(std::str::from_utf8(&[letter]).unwrap());
6070 }
6071 path.set_extension("rs");
6072 let parent_path = path.parent().unwrap();
6073
6074 log::info!("Host: creating file {:?}", path,);
6075
6076 if fs.create_dir(&parent_path).await.is_ok()
6077 && fs.create_file(&path, Default::default()).await.is_ok()
6078 {
6079 files.lock().push(path);
6080 break;
6081 } else {
6082 log::info!("Host: cannot create file");
6083 }
6084 },
6085 }
6086
6087 cx.background().simulate_random_delay().await;
6088 }
6089
6090 Ok(())
6091 }
6092
6093 let result = simulate_host_internal(
6094 &mut self,
6095 project.clone(),
6096 files,
6097 op_start_signal,
6098 rng,
6099 &mut cx,
6100 )
6101 .await;
6102 log::info!("Host done");
6103 self.project = Some(project);
6104 (self, cx, result.err())
6105 }
6106
        /// Drives the guest side of the randomized collaboration test.
        ///
        /// One random guest operation is performed for every item received on
        /// `op_start_signal`; the simulation ends when that channel is closed or
        /// an awaited request fails.
        ///
        /// * `guest_username` - used only as a prefix in log messages.
        /// * `project` - the guest's remote project; stored in `self.project` on return.
        /// * `rng` - shared random number generator.
        /// * `cx` - the guest's app context, returned to the caller when done.
        ///
        /// Returns the client, its context, and the error that stopped the
        /// simulation, if any.
        pub async fn simulate_guest(
            mut self,
            guest_username: String,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // The loop lives in an inner function so `?` can be used for early
            // exit while the outer function still returns the client and context.
            async fn simulate_guest_internal(
                client: &mut TestClient,
                guest_username: &str,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                while op_start_signal.next().await.is_some() {
                    // Pick the buffer to operate on: open a new one (always, if
                    // none are open yet) or reuse one that is already open.
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        // Only visible worktrees containing at least one file qualify.
                        let worktree = if let Some(worktree) =
                            project.read_with(cx, |project, cx| {
                                project
                                    .worktrees(&cx)
                                    .filter(|worktree| {
                                        let worktree = worktree.read(cx);
                                        worktree.is_visible()
                                            && worktree.entries(false).any(|e| e.is_file())
                                    })
                                    .choose(&mut *rng.lock())
                            }) {
                            worktree
                        } else {
                            // Nothing to open yet; this signal is consumed
                            // without performing an operation.
                            cx.background().simulate_random_delay().await;
                            continue;
                        };

                        let (worktree_root_name, project_path) =
                            worktree.read_with(cx, |worktree, _| {
                                let entry = worktree
                                    .entries(false)
                                    .filter(|e| e.is_file())
                                    .choose(&mut *rng.lock())
                                    .unwrap();
                                (
                                    worktree.root_name().to_string(),
                                    (worktree.id(), entry.path.clone()),
                                )
                            });
                        log::info!(
                            "{}: opening path {:?} in worktree {} ({})",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                        );
                        let buffer = project
                            .update(cx, |project, cx| {
                                project.open_buffer(project_path.clone(), cx)
                            })
                            .await?;
                        log::info!(
                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                            guest_username,
                            project_path.1,
                            project_path.0,
                            worktree_root_name,
                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
                        );
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // Pick the operation. Each request-style arm below spawns
                    // the request on the background executor and then, with
                    // probability 0.3, detaches it instead of awaiting it.
                    let choice = rng.lock().gen_range(0..100);
                    match choice {
                        // Drop the buffer handle.
                        0..=9 => {
                            cx.update(|cx| {
                                log::info!(
                                    "{}: dropping buffer {:?}",
                                    guest_username,
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                client.buffers.remove(&buffer);
                                drop(buffer);
                            });
                        }
                        // Request completions at a random offset.
                        10..=19 => {
                            let completions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting completions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.completions(&buffer, offset, cx)
                            });
                            let completions = cx.background().spawn(async move {
                                completions
                                    .await
                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching completions request", guest_username);
                                cx.update(|cx| completions.detach_and_log_err(cx));
                            } else {
                                completions.await?;
                            }
                        }
                        // Request code actions for a random byte range.
                        20..=29 => {
                            let code_actions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting code actions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                                project.code_actions(&buffer, range, cx)
                            });
                            let code_actions = cx.background().spawn(async move {
                                code_actions.await.map_err(|err| {
                                    anyhow!("code actions request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching code actions request", guest_username);
                                cx.update(|cx| code_actions.detach_and_log_err(cx));
                            } else {
                                code_actions.await?;
                            }
                        }
                        // Save the buffer, but only if it is dirty; a clean
                        // buffer falls through to the edit arm at the bottom.
                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: saving buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                (buffer.version(), buffer.save(cx))
                            });
                            let save = cx.background().spawn(async move {
                                let (saved_version, _) = save
                                    .await
                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                                // The saved version must include everything the
                                // buffer contained when the save was requested.
                                assert!(saved_version.observed_all(&requested_version));
                                Ok::<_, anyhow::Error>(())
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching save request", guest_username);
                                cx.update(|cx| save.detach_and_log_err(cx));
                            } else {
                                save.await?;
                            }
                        }
                        // Prepare a rename at a random offset.
                        40..=44 => {
                            let prepare_rename = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: preparing rename for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.prepare_rename(buffer, offset, cx)
                            });
                            let prepare_rename = cx.background().spawn(async move {
                                prepare_rename.await.map_err(|err| {
                                    anyhow!("prepare rename request failed: {:?}", err)
                                })
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching prepare rename request", guest_username);
                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                            } else {
                                prepare_rename.await?;
                            }
                        }
                        // Request definitions at a random offset; when awaited,
                        // the buffers of the returned locations are tracked too.
                        45..=49 => {
                            let definitions = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting definitions for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.definition(&buffer, offset, cx)
                            });
                            let definitions = cx.background().spawn(async move {
                                definitions
                                    .await
                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching definitions request", guest_username);
                                cx.update(|cx| definitions.detach_and_log_err(cx));
                            } else {
                                client
                                    .buffers
                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                            }
                        }
                        // Request document highlights at a random offset.
                        50..=54 => {
                            let highlights = project.update(cx, |project, cx| {
                                log::info!(
                                    "{}: requesting highlights for buffer {} ({:?})",
                                    guest_username,
                                    buffer.read(cx).remote_id(),
                                    buffer.read(cx).file().unwrap().full_path(cx)
                                );
                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                                project.document_highlights(&buffer, offset, cx)
                            });
                            let highlights = cx.background().spawn(async move {
                                highlights
                                    .await
                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching highlights request", guest_username);
                                cx.update(|cx| highlights.detach_and_log_err(cx));
                            } else {
                                highlights.await?;
                            }
                        }
                        // Project-wide search for a single random lowercase
                        // letter; when awaited, the result buffers are tracked too.
                        55..=59 => {
                            let search = project.update(cx, |project, cx| {
                                let query = rng.lock().gen_range('a'..='z');
                                log::info!("{}: project-wide search {:?}", guest_username, query);
                                project.search(SearchQuery::text(query, false, false), cx)
                            });
                            let search = cx.background().spawn(async move {
                                search
                                    .await
                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
                            });
                            if rng.lock().gen_bool(0.3) {
                                log::info!("{}: detaching search request", guest_username);
                                cx.update(|cx| search.detach_and_log_err(cx));
                            } else {
                                client.buffers.extend(search.await?.into_keys());
                            }
                        }
                        // Everything else: randomly edit the buffer, or undo/redo.
                        _ => {
                            buffer.update(cx, |buffer, cx| {
                                log::info!(
                                    "{}: updating buffer {} ({:?})",
                                    guest_username,
                                    buffer.remote_id(),
                                    buffer.file().unwrap().full_path(cx)
                                );
                                if rng.lock().gen_bool(0.7) {
                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                } else {
                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                }
                            });
                        }
                    }
                    cx.background().simulate_random_delay().await;
                }
                Ok(())
            }

            let result = simulate_guest_internal(
                &mut self,
                &guest_username,
                project.clone(),
                op_start_signal,
                rng,
                &mut cx,
            )
            .await;
            log::info!("{}: done", guest_username);

            self.project = Some(project);
            (self, cx, result.err())
        }
6392 }
6393
    /// Tears down the underlying `Client` when the test client is dropped.
    impl Drop for TestClient {
        fn drop(&mut self) {
            self.client.tear_down();
        }
    }
6399
    /// Lets the gpui background executor be used wherever the server expects
    /// an `Executor`.
    impl Executor for Arc<gpui::executor::Background> {
        type Sleep = gpui::executor::Timer;

        /// Spawns `future` on the background executor and detaches the task.
        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
            self.spawn(future).detach();
        }

        /// Returns the executor's timer for `duration`.
        fn sleep(&self, duration: Duration) -> Self::Sleep {
            self.as_ref().timer(duration)
        }
    }
6411
6412 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6413 channel
6414 .messages()
6415 .cursor::<()>()
6416 .map(|m| {
6417 (
6418 m.sender.github_login.clone(),
6419 m.body.clone(),
6420 m.is_pending(),
6421 )
6422 })
6423 .collect()
6424 }
6425
    /// Placeholder view that renders an empty element; used by
    /// `TestClient::build_workspace` as the content of the window it opens.
    struct EmptyView;

    impl gpui::Entity for EmptyView {
        // The view emits no meaningful events.
        type Event = ();
    }

    impl gpui::View for EmptyView {
        fn ui_name() -> &'static str {
            "empty view"
        }

        // Renders an empty element.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
6441}