1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::Arc,
40 time::Duration,
41};
42use store::{Store, Worktree};
43use time::OffsetDateTime;
44use tokio::{
45 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
46 time::Sleep,
47};
48use tower::ServiceBuilder;
49use tracing::{info_span, instrument, Instrument};
50
/// Type-erased handler stored in `Server::handlers`: it receives the server and a boxed
/// envelope and returns a boxed future that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
53
/// State shared by all connections: the peer used to talk to them, the lock-protected store,
/// the application state and the table of registered message handlers.
pub struct Server {
    /// Peer through which responses and forwarded messages are sent.
    peer: Arc<Peer>,
    /// In-memory store; accessed through `state()` / `state_mut()`.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they were registered for.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each handled message.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
61
/// Runtime facilities the server relies on: spawning detached futures and creating timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` without handing any handle back to the caller.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a timer future for `duration`.
    // NOTE(review): presumably resolves once `duration` has elapsed — confirm per implementation.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
67
/// Unit-struct [`Executor`] implementation whose `spawn_detached` delegates to
/// `tokio::task::spawn`.
#[derive(Clone)]
pub struct RealExecutor;
70
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`String::len`) of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
73
/// Read guard over the `Store`, produced by `Server::state`.
struct StoreReadGuard<'a> {
    /// The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    // `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
78
/// Write guard over the `Store`, produced by `Server::state_mut`.
struct StoreWriteGuard<'a> {
    /// The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    // `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
83
84impl Server {
85 pub fn new(
86 app_state: Arc<AppState>,
87 notifications: Option<mpsc::UnboundedSender<()>>,
88 ) -> Arc<Self> {
89 let mut server = Self {
90 peer: Peer::new(),
91 app_state,
92 store: Default::default(),
93 handlers: Default::default(),
94 notifications,
95 };
96
97 server
98 .add_request_handler(Server::ping)
99 .add_request_handler(Server::register_project)
100 .add_message_handler(Server::unregister_project)
101 .add_request_handler(Server::share_project)
102 .add_message_handler(Server::unshare_project)
103 .add_sync_request_handler(Server::join_project)
104 .add_message_handler(Server::leave_project)
105 .add_request_handler(Server::register_worktree)
106 .add_message_handler(Server::unregister_worktree)
107 .add_request_handler(Server::update_worktree)
108 .add_message_handler(Server::start_language_server)
109 .add_message_handler(Server::update_language_server)
110 .add_message_handler(Server::update_diagnostic_summary)
111 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
112 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
113 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
114 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
115 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
116 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
117 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
118 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
119 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
120 .add_request_handler(
121 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
122 )
123 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
124 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
125 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
126 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
127 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
128 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
129 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
130 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
131 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
132 .add_request_handler(Server::update_buffer)
133 .add_message_handler(Server::update_buffer_file)
134 .add_message_handler(Server::buffer_reloaded)
135 .add_message_handler(Server::buffer_saved)
136 .add_request_handler(Server::save_buffer)
137 .add_request_handler(Server::get_channels)
138 .add_request_handler(Server::get_users)
139 .add_request_handler(Server::fuzzy_search_users)
140 .add_request_handler(Server::join_channel)
141 .add_message_handler(Server::leave_channel)
142 .add_request_handler(Server::send_channel_message)
143 .add_request_handler(Server::follow)
144 .add_message_handler(Server::unfollow)
145 .add_message_handler(Server::update_followers)
146 .add_request_handler(Server::get_channel_messages);
147
148 Arc::new(server)
149 }
150
151 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
152 where
153 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
154 Fut: 'static + Send + Future<Output = Result<()>>,
155 M: EnvelopedMessage,
156 {
157 let prev_handler = self.handlers.insert(
158 TypeId::of::<M>(),
159 Box::new(move |server, envelope| {
160 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
161 let span = info_span!(
162 "handle message",
163 payload_type = envelope.payload_type_name(),
164 payload = format!("{:?}", envelope.payload).as_str(),
165 );
166 let future = (handler)(server, *envelope);
167 async move {
168 if let Err(error) = future.await {
169 tracing::error!(%error, "error handling message");
170 }
171 }
172 .instrument(span)
173 .boxed()
174 }),
175 );
176 if prev_handler.is_some() {
177 panic!("registered a handler for the same message twice");
178 }
179 self
180 }
181
182 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
183 where
184 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
185 Fut: 'static + Send + Future<Output = Result<M::Response>>,
186 M: RequestMessage,
187 {
188 self.add_message_handler(move |server, envelope| {
189 let receipt = envelope.receipt();
190 let response = (handler)(server.clone(), envelope);
191 async move {
192 match response.await {
193 Ok(response) => {
194 server.peer.respond(receipt, response)?;
195 Ok(())
196 }
197 Err(error) => {
198 server.peer.respond_with_error(
199 receipt,
200 proto::Error {
201 message: error.to_string(),
202 },
203 )?;
204 Err(error)
205 }
206 }
207 }
208 })
209 }
210
211 /// Handle a request while holding a lock to the store. This is useful when we're registering
212 /// a connection but we want to respond on the connection before anybody else can send on it.
213 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
214 where
215 F: 'static
216 + Send
217 + Sync
218 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
219 M: RequestMessage,
220 {
221 let handler = Arc::new(handler);
222 self.add_message_handler(move |server, envelope| {
223 let receipt = envelope.receipt();
224 let handler = handler.clone();
225 async move {
226 let mut store = server.state_mut().await;
227 let response = (handler)(server.clone(), &mut *store, envelope);
228 match response {
229 Ok(response) => {
230 server.peer.respond(receipt, response)?;
231 Ok(())
232 }
233 Err(error) => {
234 server.peer.respond_with_error(
235 receipt,
236 proto::Error {
237 message: error.to_string(),
238 },
239 )?;
240 Err(error)
241 }
242 }
243 }
244 })
245 }
246
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future registers `connection` with the peer, optionally reports the new
    /// connection id through `send_connection_id`, records the connection in the store, and
    /// then loops until either the I/O future finishes or the incoming stream ends. Each
    /// incoming message is dispatched to the handler registered for its payload type;
    /// messages for which `is_background()` is true are spawned on `executor`, all others
    /// are awaited in place. When the loop ends the connection is signed out.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // `sign_out` takes `&mut Arc<Self>`, hence the mutable binding.
        let mut this = self.clone();
        let span = info_span!("handle connection", %user_id, %address);
        async move {
            // The closure lets the peer create timers through the executor.
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %connection_id, %address, "connection opened");

            // A failure to report the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            // Scoped so the write guard is released before the message loop starts.
            {
                let mut state = this.state_mut().await;
                state.add_connection(connection_id, user_id);
                this.update_contacts_for_users(&*state, &[user_id]);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `handle_io` is listed first in the biased select.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(%error, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Wrap the handler so a notification is sent once it completes.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!("no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            // The incoming stream yielded `None`.
                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%error, "error signing out");
            }
        }.instrument(span)
    }
332
333 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
334 self.peer.disconnect(connection_id);
335 let mut state = self.state_mut().await;
336 let removed_connection = state.remove_connection(connection_id)?;
337
338 for (project_id, project) in removed_connection.hosted_projects {
339 if let Some(share) = project.share {
340 broadcast(
341 connection_id,
342 share.guests.keys().copied().collect(),
343 |conn_id| {
344 self.peer
345 .send(conn_id, proto::UnshareProject { project_id })
346 },
347 );
348 }
349 }
350
351 for (project_id, peer_ids) in removed_connection.guest_project_ids {
352 broadcast(connection_id, peer_ids, |conn_id| {
353 self.peer.send(
354 conn_id,
355 proto::RemoveProjectCollaborator {
356 project_id,
357 peer_id: connection_id.0,
358 },
359 )
360 });
361 }
362
363 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
364 Ok(())
365 }
366
367 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
368 Ok(proto::Ack {})
369 }
370
371 async fn register_project(
372 self: Arc<Server>,
373 request: TypedEnvelope<proto::RegisterProject>,
374 ) -> Result<proto::RegisterProjectResponse> {
375 let project_id = {
376 let mut state = self.state_mut().await;
377 let user_id = state.user_id_for_connection(request.sender_id)?;
378 state.register_project(request.sender_id, user_id)
379 };
380 Ok(proto::RegisterProjectResponse { project_id })
381 }
382
383 async fn unregister_project(
384 self: Arc<Server>,
385 request: TypedEnvelope<proto::UnregisterProject>,
386 ) -> Result<()> {
387 let mut state = self.state_mut().await;
388 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
389 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
390 Ok(())
391 }
392
393 async fn share_project(
394 self: Arc<Server>,
395 request: TypedEnvelope<proto::ShareProject>,
396 ) -> Result<proto::Ack> {
397 let mut state = self.state_mut().await;
398 let project = state.share_project(request.payload.project_id, request.sender_id)?;
399 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
400 Ok(proto::Ack {})
401 }
402
403 async fn unshare_project(
404 self: Arc<Server>,
405 request: TypedEnvelope<proto::UnshareProject>,
406 ) -> Result<()> {
407 let project_id = request.payload.project_id;
408 let mut state = self.state_mut().await;
409 let project = state.unshare_project(project_id, request.sender_id)?;
410 broadcast(request.sender_id, project.connection_ids, |conn_id| {
411 self.peer
412 .send(conn_id, proto::UnshareProject { project_id })
413 });
414 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
415 Ok(())
416 }
417
418 fn join_project(
419 self: Arc<Server>,
420 state: &mut Store,
421 request: TypedEnvelope<proto::JoinProject>,
422 ) -> Result<proto::JoinProjectResponse> {
423 let project_id = request.payload.project_id;
424
425 let user_id = state.user_id_for_connection(request.sender_id)?;
426 let (response, connection_ids, contact_user_ids) = state
427 .join_project(request.sender_id, user_id, project_id)
428 .and_then(|joined| {
429 let share = joined.project.share()?;
430 let peer_count = share.guests.len();
431 let mut collaborators = Vec::with_capacity(peer_count);
432 collaborators.push(proto::Collaborator {
433 peer_id: joined.project.host_connection_id.0,
434 replica_id: 0,
435 user_id: joined.project.host_user_id.to_proto(),
436 });
437 let worktrees = share
438 .worktrees
439 .iter()
440 .filter_map(|(id, shared_worktree)| {
441 let worktree = joined.project.worktrees.get(&id)?;
442 Some(proto::Worktree {
443 id: *id,
444 root_name: worktree.root_name.clone(),
445 entries: shared_worktree.entries.values().cloned().collect(),
446 diagnostic_summaries: shared_worktree
447 .diagnostic_summaries
448 .values()
449 .cloned()
450 .collect(),
451 visible: worktree.visible,
452 scan_id: shared_worktree.scan_id,
453 })
454 })
455 .collect();
456 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
457 if *peer_conn_id != request.sender_id {
458 collaborators.push(proto::Collaborator {
459 peer_id: peer_conn_id.0,
460 replica_id: *peer_replica_id as u32,
461 user_id: peer_user_id.to_proto(),
462 });
463 }
464 }
465 let response = proto::JoinProjectResponse {
466 worktrees,
467 replica_id: joined.replica_id as u32,
468 collaborators,
469 language_servers: joined.project.language_servers.clone(),
470 };
471 let connection_ids = joined.project.connection_ids();
472 let contact_user_ids = joined.project.authorized_user_ids();
473 Ok((response, connection_ids, contact_user_ids))
474 })?;
475
476 broadcast(request.sender_id, connection_ids, |conn_id| {
477 self.peer.send(
478 conn_id,
479 proto::AddProjectCollaborator {
480 project_id,
481 collaborator: Some(proto::Collaborator {
482 peer_id: request.sender_id.0,
483 replica_id: response.replica_id,
484 user_id: user_id.to_proto(),
485 }),
486 },
487 )
488 });
489 self.update_contacts_for_users(state, &contact_user_ids);
490 Ok(response)
491 }
492
493 async fn leave_project(
494 self: Arc<Server>,
495 request: TypedEnvelope<proto::LeaveProject>,
496 ) -> Result<()> {
497 let sender_id = request.sender_id;
498 let project_id = request.payload.project_id;
499 let mut state = self.state_mut().await;
500 let worktree = state.leave_project(sender_id, project_id)?;
501 broadcast(sender_id, worktree.connection_ids, |conn_id| {
502 self.peer.send(
503 conn_id,
504 proto::RemoveProjectCollaborator {
505 project_id,
506 peer_id: sender_id.0,
507 },
508 )
509 });
510 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
511 Ok(())
512 }
513
514 async fn register_worktree(
515 self: Arc<Server>,
516 request: TypedEnvelope<proto::RegisterWorktree>,
517 ) -> Result<proto::Ack> {
518 let mut contact_user_ids = HashSet::default();
519 for github_login in &request.payload.authorized_logins {
520 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
521 contact_user_ids.insert(contact_user_id);
522 }
523
524 let mut state = self.state_mut().await;
525 let host_user_id = state.user_id_for_connection(request.sender_id)?;
526 contact_user_ids.insert(host_user_id);
527
528 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
529 let guest_connection_ids = state
530 .read_project(request.payload.project_id, request.sender_id)?
531 .guest_connection_ids();
532 state.register_worktree(
533 request.payload.project_id,
534 request.payload.worktree_id,
535 request.sender_id,
536 Worktree {
537 authorized_user_ids: contact_user_ids.clone(),
538 root_name: request.payload.root_name.clone(),
539 visible: request.payload.visible,
540 },
541 )?;
542
543 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
544 self.peer
545 .forward_send(request.sender_id, connection_id, request.payload.clone())
546 });
547 self.update_contacts_for_users(&*state, &contact_user_ids);
548 Ok(proto::Ack {})
549 }
550
551 async fn unregister_worktree(
552 self: Arc<Server>,
553 request: TypedEnvelope<proto::UnregisterWorktree>,
554 ) -> Result<()> {
555 let project_id = request.payload.project_id;
556 let worktree_id = request.payload.worktree_id;
557 let mut state = self.state_mut().await;
558 let (worktree, guest_connection_ids) =
559 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
560 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
561 self.peer.send(
562 conn_id,
563 proto::UnregisterWorktree {
564 project_id,
565 worktree_id,
566 },
567 )
568 });
569 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
570 Ok(())
571 }
572
573 async fn update_worktree(
574 self: Arc<Server>,
575 request: TypedEnvelope<proto::UpdateWorktree>,
576 ) -> Result<proto::Ack> {
577 let connection_ids = self.state_mut().await.update_worktree(
578 request.sender_id,
579 request.payload.project_id,
580 request.payload.worktree_id,
581 &request.payload.removed_entries,
582 &request.payload.updated_entries,
583 request.payload.scan_id,
584 )?;
585
586 broadcast(request.sender_id, connection_ids, |connection_id| {
587 self.peer
588 .forward_send(request.sender_id, connection_id, request.payload.clone())
589 });
590
591 Ok(proto::Ack {})
592 }
593
594 async fn update_diagnostic_summary(
595 self: Arc<Server>,
596 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
597 ) -> Result<()> {
598 let summary = request
599 .payload
600 .summary
601 .clone()
602 .ok_or_else(|| anyhow!("invalid summary"))?;
603 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
604 request.payload.project_id,
605 request.payload.worktree_id,
606 request.sender_id,
607 summary,
608 )?;
609
610 broadcast(request.sender_id, receiver_ids, |connection_id| {
611 self.peer
612 .forward_send(request.sender_id, connection_id, request.payload.clone())
613 });
614 Ok(())
615 }
616
617 async fn start_language_server(
618 self: Arc<Server>,
619 request: TypedEnvelope<proto::StartLanguageServer>,
620 ) -> Result<()> {
621 let receiver_ids = self.state_mut().await.start_language_server(
622 request.payload.project_id,
623 request.sender_id,
624 request
625 .payload
626 .server
627 .clone()
628 .ok_or_else(|| anyhow!("invalid language server"))?,
629 )?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 });
634 Ok(())
635 }
636
637 async fn update_language_server(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::UpdateLanguageServer>,
640 ) -> Result<()> {
641 let receiver_ids = self
642 .state()
643 .await
644 .project_connection_ids(request.payload.project_id, request.sender_id)?;
645 broadcast(request.sender_id, receiver_ids, |connection_id| {
646 self.peer
647 .forward_send(request.sender_id, connection_id, request.payload.clone())
648 });
649 Ok(())
650 }
651
652 async fn forward_project_request<T>(
653 self: Arc<Server>,
654 request: TypedEnvelope<T>,
655 ) -> Result<T::Response>
656 where
657 T: EntityMessage + RequestMessage,
658 {
659 let host_connection_id = self
660 .state()
661 .await
662 .read_project(request.payload.remote_entity_id(), request.sender_id)?
663 .host_connection_id;
664 Ok(self
665 .peer
666 .forward_request(request.sender_id, host_connection_id, request.payload)
667 .await?)
668 }
669
670 async fn save_buffer(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::SaveBuffer>,
673 ) -> Result<proto::BufferSaved> {
674 let host = self
675 .state()
676 .await
677 .read_project(request.payload.project_id, request.sender_id)?
678 .host_connection_id;
679 let response = self
680 .peer
681 .forward_request(request.sender_id, host, request.payload.clone())
682 .await?;
683
684 let mut guests = self
685 .state()
686 .await
687 .read_project(request.payload.project_id, request.sender_id)?
688 .connection_ids();
689 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
690 broadcast(host, guests, |conn_id| {
691 self.peer.forward_send(host, conn_id, response.clone())
692 });
693
694 Ok(response)
695 }
696
697 async fn update_buffer(
698 self: Arc<Server>,
699 request: TypedEnvelope<proto::UpdateBuffer>,
700 ) -> Result<proto::Ack> {
701 let receiver_ids = self
702 .state()
703 .await
704 .project_connection_ids(request.payload.project_id, request.sender_id)?;
705 broadcast(request.sender_id, receiver_ids, |connection_id| {
706 self.peer
707 .forward_send(request.sender_id, connection_id, request.payload.clone())
708 });
709 Ok(proto::Ack {})
710 }
711
712 async fn update_buffer_file(
713 self: Arc<Server>,
714 request: TypedEnvelope<proto::UpdateBufferFile>,
715 ) -> Result<()> {
716 let receiver_ids = self
717 .state()
718 .await
719 .project_connection_ids(request.payload.project_id, request.sender_id)?;
720 broadcast(request.sender_id, receiver_ids, |connection_id| {
721 self.peer
722 .forward_send(request.sender_id, connection_id, request.payload.clone())
723 });
724 Ok(())
725 }
726
727 async fn buffer_reloaded(
728 self: Arc<Server>,
729 request: TypedEnvelope<proto::BufferReloaded>,
730 ) -> Result<()> {
731 let receiver_ids = self
732 .state()
733 .await
734 .project_connection_ids(request.payload.project_id, request.sender_id)?;
735 broadcast(request.sender_id, receiver_ids, |connection_id| {
736 self.peer
737 .forward_send(request.sender_id, connection_id, request.payload.clone())
738 });
739 Ok(())
740 }
741
742 async fn buffer_saved(
743 self: Arc<Server>,
744 request: TypedEnvelope<proto::BufferSaved>,
745 ) -> Result<()> {
746 let receiver_ids = self
747 .state()
748 .await
749 .project_connection_ids(request.payload.project_id, request.sender_id)?;
750 broadcast(request.sender_id, receiver_ids, |connection_id| {
751 self.peer
752 .forward_send(request.sender_id, connection_id, request.payload.clone())
753 });
754 Ok(())
755 }
756
757 async fn follow(
758 self: Arc<Self>,
759 request: TypedEnvelope<proto::Follow>,
760 ) -> Result<proto::FollowResponse> {
761 let leader_id = ConnectionId(request.payload.leader_id);
762 let follower_id = request.sender_id;
763 if !self
764 .state()
765 .await
766 .project_connection_ids(request.payload.project_id, follower_id)?
767 .contains(&leader_id)
768 {
769 Err(anyhow!("no such peer"))?;
770 }
771 let mut response = self
772 .peer
773 .forward_request(request.sender_id, leader_id, request.payload)
774 .await?;
775 response
776 .views
777 .retain(|view| view.leader_id != Some(follower_id.0));
778 Ok(response)
779 }
780
781 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
782 let leader_id = ConnectionId(request.payload.leader_id);
783 if !self
784 .state()
785 .await
786 .project_connection_ids(request.payload.project_id, request.sender_id)?
787 .contains(&leader_id)
788 {
789 Err(anyhow!("no such peer"))?;
790 }
791 self.peer
792 .forward_send(request.sender_id, leader_id, request.payload)?;
793 Ok(())
794 }
795
796 async fn update_followers(
797 self: Arc<Self>,
798 request: TypedEnvelope<proto::UpdateFollowers>,
799 ) -> Result<()> {
800 let connection_ids = self
801 .state()
802 .await
803 .project_connection_ids(request.payload.project_id, request.sender_id)?;
804 let leader_id = request
805 .payload
806 .variant
807 .as_ref()
808 .and_then(|variant| match variant {
809 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
810 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
811 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
812 });
813 for follower_id in &request.payload.follower_ids {
814 let follower_id = ConnectionId(*follower_id);
815 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
816 self.peer
817 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
818 }
819 }
820 Ok(())
821 }
822
823 async fn get_channels(
824 self: Arc<Server>,
825 request: TypedEnvelope<proto::GetChannels>,
826 ) -> Result<proto::GetChannelsResponse> {
827 let user_id = self
828 .state()
829 .await
830 .user_id_for_connection(request.sender_id)?;
831 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
832 Ok(proto::GetChannelsResponse {
833 channels: channels
834 .into_iter()
835 .map(|chan| proto::Channel {
836 id: chan.id.to_proto(),
837 name: chan.name,
838 })
839 .collect(),
840 })
841 }
842
843 async fn get_users(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::GetUsers>,
846 ) -> Result<proto::UsersResponse> {
847 let user_ids = request
848 .payload
849 .user_ids
850 .into_iter()
851 .map(UserId::from_proto)
852 .collect();
853 let users = self
854 .app_state
855 .db
856 .get_users_by_ids(user_ids)
857 .await?
858 .into_iter()
859 .map(|user| proto::User {
860 id: user.id.to_proto(),
861 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
862 github_login: user.github_login,
863 })
864 .collect();
865 Ok(proto::UsersResponse { users })
866 }
867
868 async fn fuzzy_search_users(
869 self: Arc<Server>,
870 request: TypedEnvelope<proto::FuzzySearchUsers>,
871 ) -> Result<proto::UsersResponse> {
872 let query = request.payload.query;
873 let db = &self.app_state.db;
874 let users = match query.len() {
875 0 => vec![],
876 1 | 2 => db
877 .get_user_by_github_login(&query)
878 .await?
879 .into_iter()
880 .collect(),
881 _ => db.fuzzy_search_users(&query, 10).await?,
882 };
883 let users = users
884 .into_iter()
885 .map(|user| proto::User {
886 id: user.id.to_proto(),
887 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
888 github_login: user.github_login,
889 })
890 .collect();
891 Ok(proto::UsersResponse { users })
892 }
893
894 #[instrument(skip(self, state, user_ids))]
895 fn update_contacts_for_users<'a>(
896 self: &Arc<Self>,
897 state: &Store,
898 user_ids: impl IntoIterator<Item = &'a UserId>,
899 ) {
900 for user_id in user_ids {
901 let contacts = state.contacts_for_user(*user_id);
902 for connection_id in state.connection_ids_for_user(*user_id) {
903 self.peer
904 .send(
905 connection_id,
906 proto::UpdateContacts {
907 contacts: contacts.clone(),
908 pending_requests_from_user_ids: Default::default(),
909 pending_requests_to_user_ids: Default::default(),
910 },
911 )
912 .trace_err();
913 }
914 }
915 }
916
917 async fn join_channel(
918 self: Arc<Self>,
919 request: TypedEnvelope<proto::JoinChannel>,
920 ) -> Result<proto::JoinChannelResponse> {
921 let user_id = self
922 .state()
923 .await
924 .user_id_for_connection(request.sender_id)?;
925 let channel_id = ChannelId::from_proto(request.payload.channel_id);
926 if !self
927 .app_state
928 .db
929 .can_user_access_channel(user_id, channel_id)
930 .await?
931 {
932 Err(anyhow!("access denied"))?;
933 }
934
935 self.state_mut()
936 .await
937 .join_channel(request.sender_id, channel_id);
938 let messages = self
939 .app_state
940 .db
941 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
942 .await?
943 .into_iter()
944 .map(|msg| proto::ChannelMessage {
945 id: msg.id.to_proto(),
946 body: msg.body,
947 timestamp: msg.sent_at.unix_timestamp() as u64,
948 sender_id: msg.sender_id.to_proto(),
949 nonce: Some(msg.nonce.as_u128().into()),
950 })
951 .collect::<Vec<_>>();
952 Ok(proto::JoinChannelResponse {
953 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
954 messages,
955 })
956 }
957
958 async fn leave_channel(
959 self: Arc<Self>,
960 request: TypedEnvelope<proto::LeaveChannel>,
961 ) -> Result<()> {
962 let user_id = self
963 .state()
964 .await
965 .user_id_for_connection(request.sender_id)?;
966 let channel_id = ChannelId::from_proto(request.payload.channel_id);
967 if !self
968 .app_state
969 .db
970 .can_user_access_channel(user_id, channel_id)
971 .await?
972 {
973 Err(anyhow!("access denied"))?;
974 }
975
976 self.state_mut()
977 .await
978 .leave_channel(request.sender_id, channel_id);
979
980 Ok(())
981 }
982
983 async fn send_channel_message(
984 self: Arc<Self>,
985 request: TypedEnvelope<proto::SendChannelMessage>,
986 ) -> Result<proto::SendChannelMessageResponse> {
987 let channel_id = ChannelId::from_proto(request.payload.channel_id);
988 let user_id;
989 let connection_ids;
990 {
991 let state = self.state().await;
992 user_id = state.user_id_for_connection(request.sender_id)?;
993 connection_ids = state.channel_connection_ids(channel_id)?;
994 }
995
996 // Validate the message body.
997 let body = request.payload.body.trim().to_string();
998 if body.len() > MAX_MESSAGE_LEN {
999 return Err(anyhow!("message is too long"))?;
1000 }
1001 if body.is_empty() {
1002 return Err(anyhow!("message can't be blank"))?;
1003 }
1004
1005 let timestamp = OffsetDateTime::now_utc();
1006 let nonce = request
1007 .payload
1008 .nonce
1009 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1010
1011 let message_id = self
1012 .app_state
1013 .db
1014 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1015 .await?
1016 .to_proto();
1017 let message = proto::ChannelMessage {
1018 sender_id: user_id.to_proto(),
1019 id: message_id,
1020 body,
1021 timestamp: timestamp.unix_timestamp() as u64,
1022 nonce: Some(nonce),
1023 };
1024 broadcast(request.sender_id, connection_ids, |conn_id| {
1025 self.peer.send(
1026 conn_id,
1027 proto::ChannelMessageSent {
1028 channel_id: channel_id.to_proto(),
1029 message: Some(message.clone()),
1030 },
1031 )
1032 });
1033 Ok(proto::SendChannelMessageResponse {
1034 message: Some(message),
1035 })
1036 }
1037
1038 async fn get_channel_messages(
1039 self: Arc<Self>,
1040 request: TypedEnvelope<proto::GetChannelMessages>,
1041 ) -> Result<proto::GetChannelMessagesResponse> {
1042 let user_id = self
1043 .state()
1044 .await
1045 .user_id_for_connection(request.sender_id)?;
1046 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1047 if !self
1048 .app_state
1049 .db
1050 .can_user_access_channel(user_id, channel_id)
1051 .await?
1052 {
1053 Err(anyhow!("access denied"))?;
1054 }
1055
1056 let messages = self
1057 .app_state
1058 .db
1059 .get_channel_messages(
1060 channel_id,
1061 MESSAGE_COUNT_PER_PAGE,
1062 Some(MessageId::from_proto(request.payload.before_message_id)),
1063 )
1064 .await?
1065 .into_iter()
1066 .map(|msg| proto::ChannelMessage {
1067 id: msg.id.to_proto(),
1068 body: msg.body,
1069 timestamp: msg.sent_at.unix_timestamp() as u64,
1070 sender_id: msg.sender_id.to_proto(),
1071 nonce: Some(msg.nonce.as_u128().into()),
1072 })
1073 .collect::<Vec<_>>();
1074
1075 Ok(proto::GetChannelMessagesResponse {
1076 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1077 messages,
1078 })
1079 }
1080
1081 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1082 #[cfg(test)]
1083 tokio::task::yield_now().await;
1084 let guard = self.store.read().await;
1085 #[cfg(test)]
1086 tokio::task::yield_now().await;
1087 StoreReadGuard {
1088 guard,
1089 _not_send: PhantomData,
1090 }
1091 }
1092
1093 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1094 #[cfg(test)]
1095 tokio::task::yield_now().await;
1096 let guard = self.store.write().await;
1097 #[cfg(test)]
1098 tokio::task::yield_now().await;
1099 StoreWriteGuard {
1100 guard,
1101 _not_send: PhantomData,
1102 }
1103 }
1104}
1105
1106impl<'a> Deref for StoreReadGuard<'a> {
1107 type Target = Store;
1108
1109 fn deref(&self) -> &Self::Target {
1110 &*self.guard
1111 }
1112}
1113
1114impl<'a> Deref for StoreWriteGuard<'a> {
1115 type Target = Store;
1116
1117 fn deref(&self) -> &Self::Target {
1118 &*self.guard
1119 }
1120}
1121
1122impl<'a> DerefMut for StoreWriteGuard<'a> {
1123 fn deref_mut(&mut self) -> &mut Self::Target {
1124 &mut *self.guard
1125 }
1126}
1127
1128impl<'a> Drop for StoreWriteGuard<'a> {
1129 fn drop(&mut self) {
1130 #[cfg(test)]
1131 self.check_invariants();
1132
1133 let metrics = self.metrics();
1134 tracing::info!(
1135 connections = metrics.connections,
1136 registered_projects = metrics.registered_projects,
1137 shared_projects = metrics.shared_projects,
1138 "metrics"
1139 );
1140 }
1141}
1142
1143impl Executor for RealExecutor {
1144 type Sleep = Sleep;
1145
1146 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1147 tokio::task::spawn(future);
1148 }
1149
1150 fn sleep(&self, duration: Duration) -> Self::Sleep {
1151 tokio::time::sleep(duration)
1152 }
1153}
1154
1155#[instrument(skip(f))]
1156fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1157where
1158 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1159{
1160 for receiver_id in receiver_ids {
1161 if receiver_id != sender_id {
1162 f(receiver_id).trace_err();
1163 }
1164 }
1165}
1166
// Lazily-initialized name of the `x-zed-protocol-version` HTTP header. It is what
// `ProtocolVersion`'s `Header::name` implementation returns.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1170
/// Typed representation of the `x-zed-protocol-version` header: the protocol version,
/// as a `u32`, that a client sends with its request.
pub struct ProtocolVersion(u32);
1172
1173impl Header for ProtocolVersion {
1174 fn name() -> &'static HeaderName {
1175 &ZED_PROTOCOL_VERSION
1176 }
1177
1178 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1179 where
1180 Self: Sized,
1181 I: Iterator<Item = &'i axum::http::HeaderValue>,
1182 {
1183 let version = values
1184 .next()
1185 .ok_or_else(|| axum::headers::Error::invalid())?
1186 .to_str()
1187 .map_err(|_| axum::headers::Error::invalid())?
1188 .parse()
1189 .map_err(|_| axum::headers::Error::invalid())?;
1190 Ok(Self(version))
1191 }
1192
1193 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1194 values.extend([self.0.to_string().parse().unwrap()]);
1195 }
1196}
1197
1198pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1199 let server = Server::new(app_state.clone(), None);
1200 Router::new()
1201 .route("/rpc", get(handle_websocket_request))
1202 .layer(
1203 ServiceBuilder::new()
1204 .layer(Extension(app_state))
1205 .layer(middleware::from_fn(auth::validate_header))
1206 .layer(Extension(server)),
1207 )
1208}
1209
1210pub async fn handle_websocket_request(
1211 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1212 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1213 Extension(server): Extension<Arc<Server>>,
1214 Extension(user_id): Extension<UserId>,
1215 ws: WebSocketUpgrade,
1216) -> Response {
1217 if protocol_version != rpc::PROTOCOL_VERSION {
1218 return (
1219 StatusCode::UPGRADE_REQUIRED,
1220 "client must be upgraded".to_string(),
1221 )
1222 .into_response();
1223 }
1224 let socket_address = socket_address.to_string();
1225 ws.on_upgrade(move |socket| {
1226 let socket = socket
1227 .map_ok(to_tungstenite_message)
1228 .err_into()
1229 .with(|message| async move { Ok(to_axum_message(message)) });
1230 let connection = Connection::new(Box::pin(socket));
1231 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1232 })
1233}
1234
1235fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1236 match message {
1237 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1238 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1239 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1240 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1241 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1242 code: frame.code.into(),
1243 reason: frame.reason,
1244 })),
1245 }
1246}
1247
1248fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1249 match message {
1250 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1251 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1252 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1253 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1254 AxumMessage::Close(frame) => {
1255 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1256 code: frame.code.into(),
1257 reason: frame.reason,
1258 }))
1259 }
1260 }
1261}
1262
/// Extension trait for result-like values whose errors should be logged rather than
/// propagated.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Consumes the value and returns its success payload, if any. The implementation
    /// for `Result` in this file logs the error at `error` level and returns `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1268
1269impl<T, E> ResultExt for Result<T, E>
1270where
1271 E: std::fmt::Debug,
1272{
1273 type Ok = T;
1274
1275 fn trace_err(self) -> Option<T> {
1276 match self {
1277 Ok(value) => Some(value),
1278 Err(error) => {
1279 tracing::error!("{:?}", error);
1280 None
1281 }
1282 }
1283 }
1284}
1285
1286#[cfg(test)]
1287mod tests {
1288 use super::*;
1289 use crate::{
1290 db::{tests::TestDb, UserId},
1291 AppState,
1292 };
1293 use ::rpc::Peer;
1294 use client::{
1295 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1296 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1297 };
1298 use collections::BTreeMap;
1299 use editor::{
1300 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1301 ToOffset, ToggleCodeActions, Undo,
1302 };
1303 use gpui::{
1304 executor::{self, Deterministic},
1305 geometry::vector::vec2f,
1306 ModelHandle, TestAppContext, ViewHandle,
1307 };
1308 use language::{
1309 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1310 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1311 };
1312 use lsp::{self, FakeLanguageServer};
1313 use parking_lot::Mutex;
1314 use project::{
1315 fs::{FakeFs, Fs as _},
1316 search::SearchQuery,
1317 worktree::WorktreeHandle,
1318 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1319 };
1320 use rand::prelude::*;
1321 use rpc::PeerId;
1322 use serde_json::json;
1323 use settings::Settings;
1324 use sqlx::types::time::OffsetDateTime;
1325 use std::{
1326 env,
1327 ops::Deref,
1328 path::{Path, PathBuf},
1329 rc::Rc,
1330 sync::{
1331 atomic::{AtomicBool, Ordering::SeqCst},
1332 Arc,
1333 },
1334 time::Duration,
1335 };
1336 use theme::ThemeRegistry;
1337 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1338
1339 #[cfg(test)]
1340 #[ctor::ctor]
1341 fn init_logger() {
1342 if std::env::var("RUST_LOG").is_ok() {
1343 env_logger::init();
1344 }
1345 }
1346
    // End-to-end check of project sharing between two clients:
    // - client A shares a local project and client B joins it remotely;
    // - each side sees the other as a collaborator with the expected login;
    // - a buffer opened by B is also open on A, and text typed into B's editor shows
    //   up in A's copy of the buffer;
    // - dropping B's project empties A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B should already list A as a collaborator; remember B's replica id so it can
        // be matched against what A observes below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // B opening the buffer must have caused the host (A) to have it open as well.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1469
    // Checks the unshare/re-share cycle: after the host unshares, the guest's project
    // becomes read-only and the host's worktree is no longer shared; after the host
    // shares again, the guest can join anew and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        // The guest's copy must turn read-only, and the host's worktree must no longer
        // report itself as shared.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1562
    // Checks what happens when the host's connection is dropped: the host loses its
    // collaborators and its shared state, the guest's project becomes read-only, and
    // once the host has a remote id again it can re-share and the guest can re-join.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the RPC receive timeout (presumably so the dropped
        // connection is noticed — confirm against the rpc crate).
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1663
    // Three-client scenario (host A, guests B and C) covering:
    // - concurrent edits from both guests and the host converging to the same text;
    // - a guest-initiated save racing with a host edit, after which the file on disk
    //   holds the combined text and no replica is dirty;
    // - renames and a new file on the host's file system showing up in every
    //   worktree, and open buffers following the rename of their file.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0, so these are concurrent edits at the same
        // position.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until the host has received both guest edits before appending its own.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
        save_b.await.unwrap();
        // The saved file is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1842
    // Checks that entry operations issued by a guest (create file, rename, create
    // directory, delete directory, delete file) are reflected in both the host's and
    // the guest's worktree. After each operation the full path listing of both
    // worktrees is compared against the expected set.
    #[gpui::test(iterations = 10)]
    async fn test_fs_operations(
        executor: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        executor.forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;

        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        let project_b = client_b.build_remote_project(project_id, cx_b).await;

        let worktree_a =
            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
        let worktree_b =
            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());

        // Create a file ("c.txt") as the guest.
        let entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "c.txt"), false, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "c.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "c.txt"]
            );
        });

        // Rename the new file to "d.txt" as the guest.
        project_b
            .update(cx_b, |project, cx| {
                project.rename_entry(entry.id, Path::new("d.txt"), cx)
            })
            .unwrap()
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Create a directory ("DIR") as the guest.
        let dir_entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "DIR"), true, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Delete the directory again as the guest.
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(dir_entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Delete the file created at the start (since renamed to "d.txt"), addressed
        // by the entry id returned when it was created.
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt"]
            );
        });
    }
2012
    // Checks that a guest's buffer never reports a conflict across an
    // edit -> save -> edit sequence: it is dirty after each edit, clean after the
    // save, and `has_conflict()` stays false throughout.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: the buffer becomes dirty but must not be in conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as the guest and wait for the buffer to become clean again.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Second edit after the save: dirty again, still no conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
2094
    // Checks that when a file changes on the host's file system, a guest's clean
    // buffer for that file picks up the new contents and ends up neither dirty nor in
    // conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Bound to a local so the handle stays alive until the end of the test.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file directly through the (fake) file system, bypassing any
        // buffer, then wait for the guest's buffer to show the new contents.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
2173
2174 #[gpui::test(iterations = 10)]
2175 async fn test_editing_while_guest_opens_buffer(
2176 cx_a: &mut TestAppContext,
2177 cx_b: &mut TestAppContext,
2178 ) {
2179 cx_a.foreground().forbid_parking();
2180 let lang_registry = Arc::new(LanguageRegistry::test());
2181 let fs = FakeFs::new(cx_a.background());
2182
2183 // Connect to a server as 2 clients.
2184 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2185 let client_a = server.create_client(cx_a, "user_a").await;
2186 let client_b = server.create_client(cx_b, "user_b").await;
2187
2188 // Share a project as client A
2189 fs.insert_tree(
2190 "/dir",
2191 json!({
2192 ".zed.toml": r#"collaborators = ["user_b"]"#,
2193 "a.txt": "a-contents",
2194 }),
2195 )
2196 .await;
2197 let project_a = cx_a.update(|cx| {
2198 Project::local(
2199 client_a.clone(),
2200 client_a.user_store.clone(),
2201 lang_registry.clone(),
2202 fs.clone(),
2203 cx,
2204 )
2205 });
2206 let (worktree_a, _) = project_a
2207 .update(cx_a, |p, cx| {
2208 p.find_or_create_local_worktree("/dir", true, cx)
2209 })
2210 .await
2211 .unwrap();
2212 worktree_a
2213 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2214 .await;
2215 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2216 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2217 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2218
2219 // Join that project as client B
2220 let project_b = Project::remote(
2221 project_id,
2222 client_b.clone(),
2223 client_b.user_store.clone(),
2224 lang_registry.clone(),
2225 fs.clone(),
2226 &mut cx_b.to_async(),
2227 )
2228 .await
2229 .unwrap();
2230
2231 // Open a buffer as client A
2232 let buffer_a = project_a
2233 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2234 .await
2235 .unwrap();
2236
2237 // Start opening the same buffer as client B
2238 let buffer_b = cx_b
2239 .background()
2240 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2241
2242 // Edit the buffer as client A while client B is still opening it.
2243 cx_b.background().simulate_random_delay().await;
2244 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2245 cx_b.background().simulate_random_delay().await;
2246 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2247
2248 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2249 let buffer_b = buffer_b.await.unwrap();
2250 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2251 }
2252
2253 #[gpui::test(iterations = 10)]
2254 async fn test_leaving_worktree_while_opening_buffer(
2255 cx_a: &mut TestAppContext,
2256 cx_b: &mut TestAppContext,
2257 ) {
2258 cx_a.foreground().forbid_parking();
2259 let lang_registry = Arc::new(LanguageRegistry::test());
2260 let fs = FakeFs::new(cx_a.background());
2261
2262 // Connect to a server as 2 clients.
2263 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2264 let client_a = server.create_client(cx_a, "user_a").await;
2265 let client_b = server.create_client(cx_b, "user_b").await;
2266
2267 // Share a project as client A
2268 fs.insert_tree(
2269 "/dir",
2270 json!({
2271 ".zed.toml": r#"collaborators = ["user_b"]"#,
2272 "a.txt": "a-contents",
2273 }),
2274 )
2275 .await;
2276 let project_a = cx_a.update(|cx| {
2277 Project::local(
2278 client_a.clone(),
2279 client_a.user_store.clone(),
2280 lang_registry.clone(),
2281 fs.clone(),
2282 cx,
2283 )
2284 });
2285 let (worktree_a, _) = project_a
2286 .update(cx_a, |p, cx| {
2287 p.find_or_create_local_worktree("/dir", true, cx)
2288 })
2289 .await
2290 .unwrap();
2291 worktree_a
2292 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2293 .await;
2294 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2295 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2296 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2297
2298 // Join that project as client B
2299 let project_b = Project::remote(
2300 project_id,
2301 client_b.clone(),
2302 client_b.user_store.clone(),
2303 lang_registry.clone(),
2304 fs.clone(),
2305 &mut cx_b.to_async(),
2306 )
2307 .await
2308 .unwrap();
2309
2310 // See that a guest has joined as client A.
2311 project_a
2312 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2313 .await;
2314
2315 // Begin opening a buffer as client B, but leave the project before the open completes.
2316 let buffer_b = cx_b
2317 .background()
2318 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2319 cx_b.update(|_| drop(project_b));
2320 drop(buffer_b);
2321
2322 // See that the guest has left.
2323 project_a
2324 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2325 .await;
2326 }
2327
2328 #[gpui::test(iterations = 10)]
2329 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2330 cx_a.foreground().forbid_parking();
2331 let lang_registry = Arc::new(LanguageRegistry::test());
2332 let fs = FakeFs::new(cx_a.background());
2333
2334 // Connect to a server as 2 clients.
2335 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336 let client_a = server.create_client(cx_a, "user_a").await;
2337 let client_b = server.create_client(cx_b, "user_b").await;
2338
2339 // Share a project as client A
2340 fs.insert_tree(
2341 "/a",
2342 json!({
2343 ".zed.toml": r#"collaborators = ["user_b"]"#,
2344 "a.txt": "a-contents",
2345 "b.txt": "b-contents",
2346 }),
2347 )
2348 .await;
2349 let project_a = cx_a.update(|cx| {
2350 Project::local(
2351 client_a.clone(),
2352 client_a.user_store.clone(),
2353 lang_registry.clone(),
2354 fs.clone(),
2355 cx,
2356 )
2357 });
2358 let (worktree_a, _) = project_a
2359 .update(cx_a, |p, cx| {
2360 p.find_or_create_local_worktree("/a", true, cx)
2361 })
2362 .await
2363 .unwrap();
2364 worktree_a
2365 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366 .await;
2367 let project_id = project_a
2368 .update(cx_a, |project, _| project.next_remote_id())
2369 .await;
2370 project_a
2371 .update(cx_a, |project, cx| project.share(cx))
2372 .await
2373 .unwrap();
2374
2375 // Join that project as client B
2376 let _project_b = Project::remote(
2377 project_id,
2378 client_b.clone(),
2379 client_b.user_store.clone(),
2380 lang_registry.clone(),
2381 fs.clone(),
2382 &mut cx_b.to_async(),
2383 )
2384 .await
2385 .unwrap();
2386
2387 // Client A sees that a guest has joined.
2388 project_a
2389 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2390 .await;
2391
2392 // Drop client B's connection and ensure client A observes client B leaving the project.
2393 client_b.disconnect(&cx_b.to_async()).unwrap();
2394 project_a
2395 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2396 .await;
2397
2398 // Rejoin the project as client B
2399 let _project_b = Project::remote(
2400 project_id,
2401 client_b.clone(),
2402 client_b.user_store.clone(),
2403 lang_registry.clone(),
2404 fs.clone(),
2405 &mut cx_b.to_async(),
2406 )
2407 .await
2408 .unwrap();
2409
2410 // Client A sees that a guest has re-joined.
2411 project_a
2412 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2413 .await;
2414
2415 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2416 client_b.wait_for_current_user(cx_b).await;
2417 server.disconnect_client(client_b.current_user_id(cx_b));
2418 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2419 project_a
2420 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2421 .await;
2422 }
2423
2424 #[gpui::test(iterations = 10)]
2425 async fn test_collaborating_with_diagnostics(
2426 cx_a: &mut TestAppContext,
2427 cx_b: &mut TestAppContext,
2428 ) {
2429 cx_a.foreground().forbid_parking();
2430 let lang_registry = Arc::new(LanguageRegistry::test());
2431 let fs = FakeFs::new(cx_a.background());
2432
2433 // Set up a fake language server.
2434 let mut language = Language::new(
2435 LanguageConfig {
2436 name: "Rust".into(),
2437 path_suffixes: vec!["rs".to_string()],
2438 ..Default::default()
2439 },
2440 Some(tree_sitter_rust::language()),
2441 );
2442 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2443 lang_registry.add(Arc::new(language));
2444
2445 // Connect to a server as 2 clients.
2446 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2447 let client_a = server.create_client(cx_a, "user_a").await;
2448 let client_b = server.create_client(cx_b, "user_b").await;
2449
2450 // Share a project as client A
2451 fs.insert_tree(
2452 "/a",
2453 json!({
2454 ".zed.toml": r#"collaborators = ["user_b"]"#,
2455 "a.rs": "let one = two",
2456 "other.rs": "",
2457 }),
2458 )
2459 .await;
2460 let project_a = cx_a.update(|cx| {
2461 Project::local(
2462 client_a.clone(),
2463 client_a.user_store.clone(),
2464 lang_registry.clone(),
2465 fs.clone(),
2466 cx,
2467 )
2468 });
2469 let (worktree_a, _) = project_a
2470 .update(cx_a, |p, cx| {
2471 p.find_or_create_local_worktree("/a", true, cx)
2472 })
2473 .await
2474 .unwrap();
2475 worktree_a
2476 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2477 .await;
2478 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2479 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2480 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2481
2482 // Cause the language server to start.
2483 let _ = cx_a
2484 .background()
2485 .spawn(project_a.update(cx_a, |project, cx| {
2486 project.open_buffer(
2487 ProjectPath {
2488 worktree_id,
2489 path: Path::new("other.rs").into(),
2490 },
2491 cx,
2492 )
2493 }))
2494 .await
2495 .unwrap();
2496
2497 // Simulate a language server reporting errors for a file.
2498 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2499 fake_language_server
2500 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2501 .await;
2502 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2503 lsp::PublishDiagnosticsParams {
2504 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2505 version: None,
2506 diagnostics: vec![lsp::Diagnostic {
2507 severity: Some(lsp::DiagnosticSeverity::ERROR),
2508 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2509 message: "message 1".to_string(),
2510 ..Default::default()
2511 }],
2512 },
2513 );
2514
2515 // Wait for server to see the diagnostics update.
2516 server
2517 .condition(|store| {
2518 let worktree = store
2519 .project(project_id)
2520 .unwrap()
2521 .share
2522 .as_ref()
2523 .unwrap()
2524 .worktrees
2525 .get(&worktree_id.to_proto())
2526 .unwrap();
2527
2528 !worktree.diagnostic_summaries.is_empty()
2529 })
2530 .await;
2531
2532 // Join the worktree as client B.
2533 let project_b = Project::remote(
2534 project_id,
2535 client_b.clone(),
2536 client_b.user_store.clone(),
2537 lang_registry.clone(),
2538 fs.clone(),
2539 &mut cx_b.to_async(),
2540 )
2541 .await
2542 .unwrap();
2543
2544 project_b.read_with(cx_b, |project, cx| {
2545 assert_eq!(
2546 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2547 &[(
2548 ProjectPath {
2549 worktree_id,
2550 path: Arc::from(Path::new("a.rs")),
2551 },
2552 DiagnosticSummary {
2553 error_count: 1,
2554 warning_count: 0,
2555 ..Default::default()
2556 },
2557 )]
2558 )
2559 });
2560
2561 // Simulate a language server reporting more errors for a file.
2562 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2563 lsp::PublishDiagnosticsParams {
2564 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2565 version: None,
2566 diagnostics: vec![
2567 lsp::Diagnostic {
2568 severity: Some(lsp::DiagnosticSeverity::ERROR),
2569 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2570 message: "message 1".to_string(),
2571 ..Default::default()
2572 },
2573 lsp::Diagnostic {
2574 severity: Some(lsp::DiagnosticSeverity::WARNING),
2575 range: lsp::Range::new(
2576 lsp::Position::new(0, 10),
2577 lsp::Position::new(0, 13),
2578 ),
2579 message: "message 2".to_string(),
2580 ..Default::default()
2581 },
2582 ],
2583 },
2584 );
2585
2586 // Client b gets the updated summaries
2587 project_b
2588 .condition(&cx_b, |project, cx| {
2589 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2590 == &[(
2591 ProjectPath {
2592 worktree_id,
2593 path: Arc::from(Path::new("a.rs")),
2594 },
2595 DiagnosticSummary {
2596 error_count: 1,
2597 warning_count: 1,
2598 ..Default::default()
2599 },
2600 )]
2601 })
2602 .await;
2603
2604 // Open the file with the errors on client B. They should be present.
2605 let buffer_b = cx_b
2606 .background()
2607 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2608 .await
2609 .unwrap();
2610
2611 buffer_b.read_with(cx_b, |buffer, _| {
2612 assert_eq!(
2613 buffer
2614 .snapshot()
2615 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2616 .map(|entry| entry)
2617 .collect::<Vec<_>>(),
2618 &[
2619 DiagnosticEntry {
2620 range: Point::new(0, 4)..Point::new(0, 7),
2621 diagnostic: Diagnostic {
2622 group_id: 0,
2623 message: "message 1".to_string(),
2624 severity: lsp::DiagnosticSeverity::ERROR,
2625 is_primary: true,
2626 ..Default::default()
2627 }
2628 },
2629 DiagnosticEntry {
2630 range: Point::new(0, 10)..Point::new(0, 13),
2631 diagnostic: Diagnostic {
2632 group_id: 1,
2633 severity: lsp::DiagnosticSeverity::WARNING,
2634 message: "message 2".to_string(),
2635 is_primary: true,
2636 ..Default::default()
2637 }
2638 }
2639 ]
2640 );
2641 });
2642
2643 // Simulate a language server reporting no errors for a file.
2644 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2645 lsp::PublishDiagnosticsParams {
2646 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2647 version: None,
2648 diagnostics: vec![],
2649 },
2650 );
2651 project_a
2652 .condition(cx_a, |project, cx| {
2653 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2654 })
2655 .await;
2656 project_b
2657 .condition(cx_b, |project, cx| {
2658 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2659 })
2660 .await;
2661 }
2662
// Completions requested by a guest are served by the language server that the
// test registers for the host's project, and a resolved completion's
// additional text edits end up in both the host's and the guest's buffer.
//
// Flow: the guest types the trigger character ".", the fake language server
// answers the completion request with two items, the guest confirms the
// first item, and the fake server then answers the resolve request with an
// extra edit that inserts a `use` line at the top of the file.
#[gpui::test(iterations = 10)]
async fn test_collaborating_with_completion(
    cx_a: &mut TestAppContext,
    cx_b: &mut TestAppContext,
) {
    cx_a.foreground().forbid_parking();
    let lang_registry = Arc::new(LanguageRegistry::test());
    let fs = FakeFs::new(cx_a.background());

    // Set up a fake language server.
    let mut language = Language::new(
        LanguageConfig {
            name: "Rust".into(),
            path_suffixes: vec!["rs".to_string()],
            ..Default::default()
        },
        Some(tree_sitter_rust::language()),
    );
    // The fake server advertises "." as its only completion trigger character.
    let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
        capabilities: lsp::ServerCapabilities {
            completion_provider: Some(lsp::CompletionOptions {
                trigger_characters: Some(vec![".".to_string()]),
                ..Default::default()
            }),
            ..Default::default()
        },
        ..Default::default()
    });
    lang_registry.add(Arc::new(language));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let client_a = server.create_client(cx_a, "user_a").await;
    let client_b = server.create_client(cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "main.rs": "fn main() { a }",
            "other.rs": "",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", true, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
    project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open a file in an editor as the guest.
    let buffer_b = project_b
        .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
        .await
        .unwrap();
    let (window_b, _) = cx_b.add_window(|_| EmptyView);
    let editor_b = cx_b.add_view(window_b, |cx| {
        Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
    });

    // Wait for the fake language server to start and for the guest's buffer
    // to report a non-empty set of completion triggers.
    let fake_language_server = fake_language_servers.next().await.unwrap();
    buffer_b
        .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
        .await;

    // Type a completion trigger character as the guest.
    // Offset 13 is directly after the `a` in "fn main() { a }".
    editor_b.update(cx_b, |editor, cx| {
        editor.select_ranges([13..13], None, cx);
        editor.handle_input(&Input(".".into()), cx);
        cx.focus(&editor_b);
    });

    // Receive a completion request as the host's language server.
    // Return some completions from the host's language server.
    cx_a.foreground().start_waiting();
    fake_language_server
        .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
            // The request must target main.rs at the position just after the
            // typed "." (line 0, column 14).
            assert_eq!(
                params.text_document_position.text_document.uri,
                lsp::Url::from_file_path("/a/main.rs").unwrap(),
            );
            assert_eq!(
                params.text_document_position.position,
                lsp::Position::new(0, 14),
            );

            Ok(Some(lsp::CompletionResponse::Array(vec![
                lsp::CompletionItem {
                    label: "first_method(…)".into(),
                    detail: Some("fn(&mut self, B) -> C".into()),
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        new_text: "first_method($1)".to_string(),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 14),
                            lsp::Position::new(0, 14),
                        ),
                    })),
                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                    ..Default::default()
                },
                lsp::CompletionItem {
                    label: "second_method(…)".into(),
                    detail: Some("fn(&mut self, C) -> D<E>".into()),
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        new_text: "second_method()".to_string(),
                        range: lsp::Range::new(
                            lsp::Position::new(0, 14),
                            lsp::Position::new(0, 14),
                        ),
                    })),
                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                    ..Default::default()
                },
            ])))
        })
        // Wait until the handler has served one request before moving on.
        .next()
        .await
        .unwrap();
    cx_a.foreground().finish_waiting();

    // Open the buffer on the host.
    // It must already contain the "." typed by the guest.
    let buffer_a = project_a
        .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
        .await
        .unwrap();
    buffer_a
        .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
        .await;

    // Confirm a completion on the guest.
    editor_b
        .condition(&cx_b, |editor, _| editor.context_menu_visible())
        .await;
    editor_b.update(cx_b, |editor, cx| {
        editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
        assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
    });

    // Return a resolved completion from the host's language server.
    // The resolved completion has an additional text edit.
    fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
        |params, _| async move {
            assert_eq!(params.label, "first_method(…)");
            Ok(lsp::CompletionItem {
                label: "first_method(…)".into(),
                detail: Some("fn(&mut self, B) -> C".into()),
                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                    new_text: "first_method($1)".to_string(),
                    range: lsp::Range::new(
                        lsp::Position::new(0, 14),
                        lsp::Position::new(0, 14),
                    ),
                })),
                additional_text_edits: Some(vec![lsp::TextEdit {
                    new_text: "use d::SomeTrait;\n".to_string(),
                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
                }]),
                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                ..Default::default()
            })
        },
    );

    // The additional edit is applied.
    // Both the host's and the guest's buffer gain the `use` line at the top.
    buffer_a
        .condition(&cx_a, |buffer, _| {
            buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
        })
        .await;
    buffer_b
        .condition(&cx_b, |buffer, _| {
            buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
        })
        .await;
}
2867
2868 #[gpui::test(iterations = 10)]
2869 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2870 cx_a.foreground().forbid_parking();
2871 let lang_registry = Arc::new(LanguageRegistry::test());
2872 let fs = FakeFs::new(cx_a.background());
2873
2874 // Connect to a server as 2 clients.
2875 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2876 let client_a = server.create_client(cx_a, "user_a").await;
2877 let client_b = server.create_client(cx_b, "user_b").await;
2878
2879 // Share a project as client A
2880 fs.insert_tree(
2881 "/a",
2882 json!({
2883 ".zed.toml": r#"collaborators = ["user_b"]"#,
2884 "a.rs": "let one = 1;",
2885 }),
2886 )
2887 .await;
2888 let project_a = cx_a.update(|cx| {
2889 Project::local(
2890 client_a.clone(),
2891 client_a.user_store.clone(),
2892 lang_registry.clone(),
2893 fs.clone(),
2894 cx,
2895 )
2896 });
2897 let (worktree_a, _) = project_a
2898 .update(cx_a, |p, cx| {
2899 p.find_or_create_local_worktree("/a", true, cx)
2900 })
2901 .await
2902 .unwrap();
2903 worktree_a
2904 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2905 .await;
2906 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2907 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2908 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2909 let buffer_a = project_a
2910 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2911 .await
2912 .unwrap();
2913
2914 // Join the worktree as client B.
2915 let project_b = Project::remote(
2916 project_id,
2917 client_b.clone(),
2918 client_b.user_store.clone(),
2919 lang_registry.clone(),
2920 fs.clone(),
2921 &mut cx_b.to_async(),
2922 )
2923 .await
2924 .unwrap();
2925
2926 let buffer_b = cx_b
2927 .background()
2928 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2929 .await
2930 .unwrap();
2931 buffer_b.update(cx_b, |buffer, cx| {
2932 buffer.edit([(4..7, "six")], cx);
2933 buffer.edit([(10..11, "6")], cx);
2934 assert_eq!(buffer.text(), "let six = 6;");
2935 assert!(buffer.is_dirty());
2936 assert!(!buffer.has_conflict());
2937 });
2938 buffer_a
2939 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2940 .await;
2941
2942 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2943 .await
2944 .unwrap();
2945 buffer_a
2946 .condition(cx_a, |buffer, _| buffer.has_conflict())
2947 .await;
2948 buffer_b
2949 .condition(cx_b, |buffer, _| buffer.has_conflict())
2950 .await;
2951
2952 project_b
2953 .update(cx_b, |project, cx| {
2954 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2955 })
2956 .await
2957 .unwrap();
2958 buffer_a.read_with(cx_a, |buffer, _| {
2959 assert_eq!(buffer.text(), "let seven = 7;");
2960 assert!(!buffer.is_dirty());
2961 assert!(!buffer.has_conflict());
2962 });
2963 buffer_b.read_with(cx_b, |buffer, _| {
2964 assert_eq!(buffer.text(), "let seven = 7;");
2965 assert!(!buffer.is_dirty());
2966 assert!(!buffer.has_conflict());
2967 });
2968
2969 buffer_a.update(cx_a, |buffer, cx| {
2970 // Undoing on the host is a no-op when the reload was initiated by the guest.
2971 buffer.undo(cx);
2972 assert_eq!(buffer.text(), "let seven = 7;");
2973 assert!(!buffer.is_dirty());
2974 assert!(!buffer.has_conflict());
2975 });
2976 buffer_b.update(cx_b, |buffer, cx| {
2977 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2978 buffer.undo(cx);
2979 assert_eq!(buffer.text(), "let six = 6;");
2980 assert!(buffer.is_dirty());
2981 assert!(!buffer.has_conflict());
2982 });
2983 }
2984
2985 #[gpui::test(iterations = 10)]
2986 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2987 cx_a.foreground().forbid_parking();
2988 let lang_registry = Arc::new(LanguageRegistry::test());
2989 let fs = FakeFs::new(cx_a.background());
2990
2991 // Set up a fake language server.
2992 let mut language = Language::new(
2993 LanguageConfig {
2994 name: "Rust".into(),
2995 path_suffixes: vec!["rs".to_string()],
2996 ..Default::default()
2997 },
2998 Some(tree_sitter_rust::language()),
2999 );
3000 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3001 lang_registry.add(Arc::new(language));
3002
3003 // Connect to a server as 2 clients.
3004 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3005 let client_a = server.create_client(cx_a, "user_a").await;
3006 let client_b = server.create_client(cx_b, "user_b").await;
3007
3008 // Share a project as client A
3009 fs.insert_tree(
3010 "/a",
3011 json!({
3012 ".zed.toml": r#"collaborators = ["user_b"]"#,
3013 "a.rs": "let one = two",
3014 }),
3015 )
3016 .await;
3017 let project_a = cx_a.update(|cx| {
3018 Project::local(
3019 client_a.clone(),
3020 client_a.user_store.clone(),
3021 lang_registry.clone(),
3022 fs.clone(),
3023 cx,
3024 )
3025 });
3026 let (worktree_a, _) = project_a
3027 .update(cx_a, |p, cx| {
3028 p.find_or_create_local_worktree("/a", true, cx)
3029 })
3030 .await
3031 .unwrap();
3032 worktree_a
3033 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3034 .await;
3035 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3036 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3037 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3038
3039 // Join the worktree as client B.
3040 let project_b = Project::remote(
3041 project_id,
3042 client_b.clone(),
3043 client_b.user_store.clone(),
3044 lang_registry.clone(),
3045 fs.clone(),
3046 &mut cx_b.to_async(),
3047 )
3048 .await
3049 .unwrap();
3050
3051 let buffer_b = cx_b
3052 .background()
3053 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3054 .await
3055 .unwrap();
3056
3057 let fake_language_server = fake_language_servers.next().await.unwrap();
3058 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3059 Ok(Some(vec![
3060 lsp::TextEdit {
3061 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3062 new_text: "h".to_string(),
3063 },
3064 lsp::TextEdit {
3065 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3066 new_text: "y".to_string(),
3067 },
3068 ]))
3069 });
3070
3071 project_b
3072 .update(cx_b, |project, cx| {
3073 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3074 })
3075 .await
3076 .unwrap();
3077 assert_eq!(
3078 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3079 "let honey = two"
3080 );
3081 }
3082
3083 #[gpui::test(iterations = 10)]
3084 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3085 cx_a.foreground().forbid_parking();
3086 let lang_registry = Arc::new(LanguageRegistry::test());
3087 let fs = FakeFs::new(cx_a.background());
3088 fs.insert_tree(
3089 "/root-1",
3090 json!({
3091 ".zed.toml": r#"collaborators = ["user_b"]"#,
3092 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3093 }),
3094 )
3095 .await;
3096 fs.insert_tree(
3097 "/root-2",
3098 json!({
3099 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3100 }),
3101 )
3102 .await;
3103
3104 // Set up a fake language server.
3105 let mut language = Language::new(
3106 LanguageConfig {
3107 name: "Rust".into(),
3108 path_suffixes: vec!["rs".to_string()],
3109 ..Default::default()
3110 },
3111 Some(tree_sitter_rust::language()),
3112 );
3113 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3114 lang_registry.add(Arc::new(language));
3115
3116 // Connect to a server as 2 clients.
3117 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3118 let client_a = server.create_client(cx_a, "user_a").await;
3119 let client_b = server.create_client(cx_b, "user_b").await;
3120
3121 // Share a project as client A
3122 let project_a = cx_a.update(|cx| {
3123 Project::local(
3124 client_a.clone(),
3125 client_a.user_store.clone(),
3126 lang_registry.clone(),
3127 fs.clone(),
3128 cx,
3129 )
3130 });
3131 let (worktree_a, _) = project_a
3132 .update(cx_a, |p, cx| {
3133 p.find_or_create_local_worktree("/root-1", true, cx)
3134 })
3135 .await
3136 .unwrap();
3137 worktree_a
3138 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3139 .await;
3140 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3141 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3142 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3143
3144 // Join the worktree as client B.
3145 let project_b = Project::remote(
3146 project_id,
3147 client_b.clone(),
3148 client_b.user_store.clone(),
3149 lang_registry.clone(),
3150 fs.clone(),
3151 &mut cx_b.to_async(),
3152 )
3153 .await
3154 .unwrap();
3155
3156 // Open the file on client B.
3157 let buffer_b = cx_b
3158 .background()
3159 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3160 .await
3161 .unwrap();
3162
3163 // Request the definition of a symbol as the guest.
3164 let fake_language_server = fake_language_servers.next().await.unwrap();
3165 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3166 |_, _| async move {
3167 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3168 lsp::Location::new(
3169 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3170 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3171 ),
3172 )))
3173 },
3174 );
3175
3176 let definitions_1 = project_b
3177 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3178 .await
3179 .unwrap();
3180 cx_b.read(|cx| {
3181 assert_eq!(definitions_1.len(), 1);
3182 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3183 let target_buffer = definitions_1[0].buffer.read(cx);
3184 assert_eq!(
3185 target_buffer.text(),
3186 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3187 );
3188 assert_eq!(
3189 definitions_1[0].range.to_point(target_buffer),
3190 Point::new(0, 6)..Point::new(0, 9)
3191 );
3192 });
3193
3194 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3195 // the previous call to `definition`.
3196 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3197 |_, _| async move {
3198 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3199 lsp::Location::new(
3200 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3201 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3202 ),
3203 )))
3204 },
3205 );
3206
3207 let definitions_2 = project_b
3208 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3209 .await
3210 .unwrap();
3211 cx_b.read(|cx| {
3212 assert_eq!(definitions_2.len(), 1);
3213 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3214 let target_buffer = definitions_2[0].buffer.read(cx);
3215 assert_eq!(
3216 target_buffer.text(),
3217 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3218 );
3219 assert_eq!(
3220 definitions_2[0].range.to_point(target_buffer),
3221 Point::new(1, 6)..Point::new(1, 11)
3222 );
3223 });
3224 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3225 }
3226
3227 #[gpui::test(iterations = 10)]
3228 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3229 cx_a.foreground().forbid_parking();
3230 let lang_registry = Arc::new(LanguageRegistry::test());
3231 let fs = FakeFs::new(cx_a.background());
3232 fs.insert_tree(
3233 "/root-1",
3234 json!({
3235 ".zed.toml": r#"collaborators = ["user_b"]"#,
3236 "one.rs": "const ONE: usize = 1;",
3237 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3238 }),
3239 )
3240 .await;
3241 fs.insert_tree(
3242 "/root-2",
3243 json!({
3244 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3245 }),
3246 )
3247 .await;
3248
3249 // Set up a fake language server.
3250 let mut language = Language::new(
3251 LanguageConfig {
3252 name: "Rust".into(),
3253 path_suffixes: vec!["rs".to_string()],
3254 ..Default::default()
3255 },
3256 Some(tree_sitter_rust::language()),
3257 );
3258 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3259 lang_registry.add(Arc::new(language));
3260
3261 // Connect to a server as 2 clients.
3262 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3263 let client_a = server.create_client(cx_a, "user_a").await;
3264 let client_b = server.create_client(cx_b, "user_b").await;
3265
3266 // Share a project as client A
3267 let project_a = cx_a.update(|cx| {
3268 Project::local(
3269 client_a.clone(),
3270 client_a.user_store.clone(),
3271 lang_registry.clone(),
3272 fs.clone(),
3273 cx,
3274 )
3275 });
3276 let (worktree_a, _) = project_a
3277 .update(cx_a, |p, cx| {
3278 p.find_or_create_local_worktree("/root-1", true, cx)
3279 })
3280 .await
3281 .unwrap();
3282 worktree_a
3283 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3284 .await;
3285 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3286 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3287 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3288
3289 // Join the worktree as client B.
3290 let project_b = Project::remote(
3291 project_id,
3292 client_b.clone(),
3293 client_b.user_store.clone(),
3294 lang_registry.clone(),
3295 fs.clone(),
3296 &mut cx_b.to_async(),
3297 )
3298 .await
3299 .unwrap();
3300
3301 // Open the file on client B.
3302 let buffer_b = cx_b
3303 .background()
3304 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3305 .await
3306 .unwrap();
3307
3308 // Request references to a symbol as the guest.
3309 let fake_language_server = fake_language_servers.next().await.unwrap();
3310 fake_language_server.handle_request::<lsp::request::References, _, _>(
3311 |params, _| async move {
3312 assert_eq!(
3313 params.text_document_position.text_document.uri.as_str(),
3314 "file:///root-1/one.rs"
3315 );
3316 Ok(Some(vec![
3317 lsp::Location {
3318 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3319 range: lsp::Range::new(
3320 lsp::Position::new(0, 24),
3321 lsp::Position::new(0, 27),
3322 ),
3323 },
3324 lsp::Location {
3325 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3326 range: lsp::Range::new(
3327 lsp::Position::new(0, 35),
3328 lsp::Position::new(0, 38),
3329 ),
3330 },
3331 lsp::Location {
3332 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3333 range: lsp::Range::new(
3334 lsp::Position::new(0, 37),
3335 lsp::Position::new(0, 40),
3336 ),
3337 },
3338 ]))
3339 },
3340 );
3341
3342 let references = project_b
3343 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3344 .await
3345 .unwrap();
3346 cx_b.read(|cx| {
3347 assert_eq!(references.len(), 3);
3348 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3349
3350 let two_buffer = references[0].buffer.read(cx);
3351 let three_buffer = references[2].buffer.read(cx);
3352 assert_eq!(
3353 two_buffer.file().unwrap().path().as_ref(),
3354 Path::new("two.rs")
3355 );
3356 assert_eq!(references[1].buffer, references[0].buffer);
3357 assert_eq!(
3358 three_buffer.file().unwrap().full_path(cx),
3359 Path::new("three.rs")
3360 );
3361
3362 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3363 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3364 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3365 });
3366 }
3367
3368 #[gpui::test(iterations = 10)]
3369 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3370 cx_a.foreground().forbid_parking();
3371 let lang_registry = Arc::new(LanguageRegistry::test());
3372 let fs = FakeFs::new(cx_a.background());
3373 fs.insert_tree(
3374 "/root-1",
3375 json!({
3376 ".zed.toml": r#"collaborators = ["user_b"]"#,
3377 "a": "hello world",
3378 "b": "goodnight moon",
3379 "c": "a world of goo",
3380 "d": "world champion of clown world",
3381 }),
3382 )
3383 .await;
3384 fs.insert_tree(
3385 "/root-2",
3386 json!({
3387 "e": "disney world is fun",
3388 }),
3389 )
3390 .await;
3391
3392 // Connect to a server as 2 clients.
3393 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3394 let client_a = server.create_client(cx_a, "user_a").await;
3395 let client_b = server.create_client(cx_b, "user_b").await;
3396
3397 // Share a project as client A
3398 let project_a = cx_a.update(|cx| {
3399 Project::local(
3400 client_a.clone(),
3401 client_a.user_store.clone(),
3402 lang_registry.clone(),
3403 fs.clone(),
3404 cx,
3405 )
3406 });
3407 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3408
3409 let (worktree_1, _) = project_a
3410 .update(cx_a, |p, cx| {
3411 p.find_or_create_local_worktree("/root-1", true, cx)
3412 })
3413 .await
3414 .unwrap();
3415 worktree_1
3416 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3417 .await;
3418 let (worktree_2, _) = project_a
3419 .update(cx_a, |p, cx| {
3420 p.find_or_create_local_worktree("/root-2", true, cx)
3421 })
3422 .await
3423 .unwrap();
3424 worktree_2
3425 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3426 .await;
3427
3428 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3429
3430 // Join the worktree as client B.
3431 let project_b = Project::remote(
3432 project_id,
3433 client_b.clone(),
3434 client_b.user_store.clone(),
3435 lang_registry.clone(),
3436 fs.clone(),
3437 &mut cx_b.to_async(),
3438 )
3439 .await
3440 .unwrap();
3441
3442 let results = project_b
3443 .update(cx_b, |project, cx| {
3444 project.search(SearchQuery::text("world", false, false), cx)
3445 })
3446 .await
3447 .unwrap();
3448
3449 let mut ranges_by_path = results
3450 .into_iter()
3451 .map(|(buffer, ranges)| {
3452 buffer.read_with(cx_b, |buffer, cx| {
3453 let path = buffer.file().unwrap().full_path(cx);
3454 let offset_ranges = ranges
3455 .into_iter()
3456 .map(|range| range.to_offset(buffer))
3457 .collect::<Vec<_>>();
3458 (path, offset_ranges)
3459 })
3460 })
3461 .collect::<Vec<_>>();
3462 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3463
3464 assert_eq!(
3465 ranges_by_path,
3466 &[
3467 (PathBuf::from("root-1/a"), vec![6..11]),
3468 (PathBuf::from("root-1/c"), vec![2..7]),
3469 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3470 (PathBuf::from("root-2/e"), vec![7..12]),
3471 ]
3472 );
3473 }
3474
3475 #[gpui::test(iterations = 10)]
3476 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3477 cx_a.foreground().forbid_parking();
3478 let lang_registry = Arc::new(LanguageRegistry::test());
3479 let fs = FakeFs::new(cx_a.background());
3480 fs.insert_tree(
3481 "/root-1",
3482 json!({
3483 ".zed.toml": r#"collaborators = ["user_b"]"#,
3484 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3485 }),
3486 )
3487 .await;
3488
3489 // Set up a fake language server.
3490 let mut language = Language::new(
3491 LanguageConfig {
3492 name: "Rust".into(),
3493 path_suffixes: vec!["rs".to_string()],
3494 ..Default::default()
3495 },
3496 Some(tree_sitter_rust::language()),
3497 );
3498 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3499 lang_registry.add(Arc::new(language));
3500
3501 // Connect to a server as 2 clients.
3502 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3503 let client_a = server.create_client(cx_a, "user_a").await;
3504 let client_b = server.create_client(cx_b, "user_b").await;
3505
3506 // Share a project as client A
3507 let project_a = cx_a.update(|cx| {
3508 Project::local(
3509 client_a.clone(),
3510 client_a.user_store.clone(),
3511 lang_registry.clone(),
3512 fs.clone(),
3513 cx,
3514 )
3515 });
3516 let (worktree_a, _) = project_a
3517 .update(cx_a, |p, cx| {
3518 p.find_or_create_local_worktree("/root-1", true, cx)
3519 })
3520 .await
3521 .unwrap();
3522 worktree_a
3523 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3524 .await;
3525 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3526 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3527 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3528
3529 // Join the worktree as client B.
3530 let project_b = Project::remote(
3531 project_id,
3532 client_b.clone(),
3533 client_b.user_store.clone(),
3534 lang_registry.clone(),
3535 fs.clone(),
3536 &mut cx_b.to_async(),
3537 )
3538 .await
3539 .unwrap();
3540
3541 // Open the file on client B.
3542 let buffer_b = cx_b
3543 .background()
3544 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3545 .await
3546 .unwrap();
3547
3548 // Request document highlights as the guest.
3549 let fake_language_server = fake_language_servers.next().await.unwrap();
3550 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3551 |params, _| async move {
3552 assert_eq!(
3553 params
3554 .text_document_position_params
3555 .text_document
3556 .uri
3557 .as_str(),
3558 "file:///root-1/main.rs"
3559 );
3560 assert_eq!(
3561 params.text_document_position_params.position,
3562 lsp::Position::new(0, 34)
3563 );
3564 Ok(Some(vec![
3565 lsp::DocumentHighlight {
3566 kind: Some(lsp::DocumentHighlightKind::WRITE),
3567 range: lsp::Range::new(
3568 lsp::Position::new(0, 10),
3569 lsp::Position::new(0, 16),
3570 ),
3571 },
3572 lsp::DocumentHighlight {
3573 kind: Some(lsp::DocumentHighlightKind::READ),
3574 range: lsp::Range::new(
3575 lsp::Position::new(0, 32),
3576 lsp::Position::new(0, 38),
3577 ),
3578 },
3579 lsp::DocumentHighlight {
3580 kind: Some(lsp::DocumentHighlightKind::READ),
3581 range: lsp::Range::new(
3582 lsp::Position::new(0, 41),
3583 lsp::Position::new(0, 47),
3584 ),
3585 },
3586 ]))
3587 },
3588 );
3589
3590 let highlights = project_b
3591 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3592 .await
3593 .unwrap();
3594 buffer_b.read_with(cx_b, |buffer, _| {
3595 let snapshot = buffer.snapshot();
3596
3597 let highlights = highlights
3598 .into_iter()
3599 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3600 .collect::<Vec<_>>();
3601 assert_eq!(
3602 highlights,
3603 &[
3604 (lsp::DocumentHighlightKind::WRITE, 10..16),
3605 (lsp::DocumentHighlightKind::READ, 32..38),
3606 (lsp::DocumentHighlightKind::READ, 41..47)
3607 ]
3608 )
3609 });
3610 }
3611
3612 #[gpui::test(iterations = 10)]
3613 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3614 cx_a.foreground().forbid_parking();
3615 let lang_registry = Arc::new(LanguageRegistry::test());
3616 let fs = FakeFs::new(cx_a.background());
3617 fs.insert_tree(
3618 "/code",
3619 json!({
3620 "crate-1": {
3621 ".zed.toml": r#"collaborators = ["user_b"]"#,
3622 "one.rs": "const ONE: usize = 1;",
3623 },
3624 "crate-2": {
3625 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3626 },
3627 "private": {
3628 "passwords.txt": "the-password",
3629 }
3630 }),
3631 )
3632 .await;
3633
3634 // Set up a fake language server.
3635 let mut language = Language::new(
3636 LanguageConfig {
3637 name: "Rust".into(),
3638 path_suffixes: vec!["rs".to_string()],
3639 ..Default::default()
3640 },
3641 Some(tree_sitter_rust::language()),
3642 );
3643 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3644 lang_registry.add(Arc::new(language));
3645
3646 // Connect to a server as 2 clients.
3647 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3648 let client_a = server.create_client(cx_a, "user_a").await;
3649 let client_b = server.create_client(cx_b, "user_b").await;
3650
3651 // Share a project as client A
3652 let project_a = cx_a.update(|cx| {
3653 Project::local(
3654 client_a.clone(),
3655 client_a.user_store.clone(),
3656 lang_registry.clone(),
3657 fs.clone(),
3658 cx,
3659 )
3660 });
3661 let (worktree_a, _) = project_a
3662 .update(cx_a, |p, cx| {
3663 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3664 })
3665 .await
3666 .unwrap();
3667 worktree_a
3668 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3669 .await;
3670 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3671 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3672 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3673
3674 // Join the worktree as client B.
3675 let project_b = Project::remote(
3676 project_id,
3677 client_b.clone(),
3678 client_b.user_store.clone(),
3679 lang_registry.clone(),
3680 fs.clone(),
3681 &mut cx_b.to_async(),
3682 )
3683 .await
3684 .unwrap();
3685
3686 // Cause the language server to start.
3687 let _buffer = cx_b
3688 .background()
3689 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3690 .await
3691 .unwrap();
3692
3693 let fake_language_server = fake_language_servers.next().await.unwrap();
3694 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3695 |_, _| async move {
3696 #[allow(deprecated)]
3697 Ok(Some(vec![lsp::SymbolInformation {
3698 name: "TWO".into(),
3699 location: lsp::Location {
3700 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3701 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3702 },
3703 kind: lsp::SymbolKind::CONSTANT,
3704 tags: None,
3705 container_name: None,
3706 deprecated: None,
3707 }]))
3708 },
3709 );
3710
3711 // Request the definition of a symbol as the guest.
3712 let symbols = project_b
3713 .update(cx_b, |p, cx| p.symbols("two", cx))
3714 .await
3715 .unwrap();
3716 assert_eq!(symbols.len(), 1);
3717 assert_eq!(symbols[0].name, "TWO");
3718
3719 // Open one of the returned symbols.
3720 let buffer_b_2 = project_b
3721 .update(cx_b, |project, cx| {
3722 project.open_buffer_for_symbol(&symbols[0], cx)
3723 })
3724 .await
3725 .unwrap();
3726 buffer_b_2.read_with(cx_b, |buffer, _| {
3727 assert_eq!(
3728 buffer.file().unwrap().path().as_ref(),
3729 Path::new("../crate-2/two.rs")
3730 );
3731 });
3732
3733 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3734 let mut fake_symbol = symbols[0].clone();
3735 fake_symbol.path = Path::new("/code/secrets").into();
3736 let error = project_b
3737 .update(cx_b, |project, cx| {
3738 project.open_buffer_for_symbol(&fake_symbol, cx)
3739 })
3740 .await
3741 .unwrap_err();
3742 assert!(error.to_string().contains("invalid symbol signature"));
3743 }
3744
3745 #[gpui::test(iterations = 10)]
3746 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3747 cx_a: &mut TestAppContext,
3748 cx_b: &mut TestAppContext,
3749 mut rng: StdRng,
3750 ) {
3751 cx_a.foreground().forbid_parking();
3752 let lang_registry = Arc::new(LanguageRegistry::test());
3753 let fs = FakeFs::new(cx_a.background());
3754 fs.insert_tree(
3755 "/root",
3756 json!({
3757 ".zed.toml": r#"collaborators = ["user_b"]"#,
3758 "a.rs": "const ONE: usize = b::TWO;",
3759 "b.rs": "const TWO: usize = 2",
3760 }),
3761 )
3762 .await;
3763
3764 // Set up a fake language server.
3765 let mut language = Language::new(
3766 LanguageConfig {
3767 name: "Rust".into(),
3768 path_suffixes: vec!["rs".to_string()],
3769 ..Default::default()
3770 },
3771 Some(tree_sitter_rust::language()),
3772 );
3773 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3774 lang_registry.add(Arc::new(language));
3775
3776 // Connect to a server as 2 clients.
3777 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3778 let client_a = server.create_client(cx_a, "user_a").await;
3779 let client_b = server.create_client(cx_b, "user_b").await;
3780
3781 // Share a project as client A
3782 let project_a = cx_a.update(|cx| {
3783 Project::local(
3784 client_a.clone(),
3785 client_a.user_store.clone(),
3786 lang_registry.clone(),
3787 fs.clone(),
3788 cx,
3789 )
3790 });
3791
3792 let (worktree_a, _) = project_a
3793 .update(cx_a, |p, cx| {
3794 p.find_or_create_local_worktree("/root", true, cx)
3795 })
3796 .await
3797 .unwrap();
3798 worktree_a
3799 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3800 .await;
3801 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3802 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3803 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3804
3805 // Join the worktree as client B.
3806 let project_b = Project::remote(
3807 project_id,
3808 client_b.clone(),
3809 client_b.user_store.clone(),
3810 lang_registry.clone(),
3811 fs.clone(),
3812 &mut cx_b.to_async(),
3813 )
3814 .await
3815 .unwrap();
3816
3817 let buffer_b1 = cx_b
3818 .background()
3819 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3820 .await
3821 .unwrap();
3822
3823 let fake_language_server = fake_language_servers.next().await.unwrap();
3824 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3825 |_, _| async move {
3826 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3827 lsp::Location::new(
3828 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3829 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3830 ),
3831 )))
3832 },
3833 );
3834
3835 let definitions;
3836 let buffer_b2;
3837 if rng.gen() {
3838 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3839 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3840 } else {
3841 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3842 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3843 }
3844
3845 let buffer_b2 = buffer_b2.await.unwrap();
3846 let definitions = definitions.await.unwrap();
3847 assert_eq!(definitions.len(), 1);
3848 assert_eq!(definitions[0].buffer, buffer_b2);
3849 }
3850
3851 #[gpui::test(iterations = 10)]
3852 async fn test_collaborating_with_code_actions(
3853 cx_a: &mut TestAppContext,
3854 cx_b: &mut TestAppContext,
3855 ) {
3856 cx_a.foreground().forbid_parking();
3857 let lang_registry = Arc::new(LanguageRegistry::test());
3858 let fs = FakeFs::new(cx_a.background());
3859 cx_b.update(|cx| editor::init(cx));
3860
3861 // Set up a fake language server.
3862 let mut language = Language::new(
3863 LanguageConfig {
3864 name: "Rust".into(),
3865 path_suffixes: vec!["rs".to_string()],
3866 ..Default::default()
3867 },
3868 Some(tree_sitter_rust::language()),
3869 );
3870 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3871 lang_registry.add(Arc::new(language));
3872
3873 // Connect to a server as 2 clients.
3874 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3875 let client_a = server.create_client(cx_a, "user_a").await;
3876 let client_b = server.create_client(cx_b, "user_b").await;
3877
3878 // Share a project as client A
3879 fs.insert_tree(
3880 "/a",
3881 json!({
3882 ".zed.toml": r#"collaborators = ["user_b"]"#,
3883 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3884 "other.rs": "pub fn foo() -> usize { 4 }",
3885 }),
3886 )
3887 .await;
3888 let project_a = cx_a.update(|cx| {
3889 Project::local(
3890 client_a.clone(),
3891 client_a.user_store.clone(),
3892 lang_registry.clone(),
3893 fs.clone(),
3894 cx,
3895 )
3896 });
3897 let (worktree_a, _) = project_a
3898 .update(cx_a, |p, cx| {
3899 p.find_or_create_local_worktree("/a", true, cx)
3900 })
3901 .await
3902 .unwrap();
3903 worktree_a
3904 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3905 .await;
3906 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3907 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3908 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3909
3910 // Join the worktree as client B.
3911 let project_b = Project::remote(
3912 project_id,
3913 client_b.clone(),
3914 client_b.user_store.clone(),
3915 lang_registry.clone(),
3916 fs.clone(),
3917 &mut cx_b.to_async(),
3918 )
3919 .await
3920 .unwrap();
3921 let mut params = cx_b.update(WorkspaceParams::test);
3922 params.languages = lang_registry.clone();
3923 params.client = client_b.client.clone();
3924 params.user_store = client_b.user_store.clone();
3925 params.project = project_b;
3926
3927 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3928 let editor_b = workspace_b
3929 .update(cx_b, |workspace, cx| {
3930 workspace.open_path((worktree_id, "main.rs"), true, cx)
3931 })
3932 .await
3933 .unwrap()
3934 .downcast::<Editor>()
3935 .unwrap();
3936
3937 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3938 fake_language_server
3939 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3940 assert_eq!(
3941 params.text_document.uri,
3942 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3943 );
3944 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3945 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3946 Ok(None)
3947 })
3948 .next()
3949 .await;
3950
3951 // Move cursor to a location that contains code actions.
3952 editor_b.update(cx_b, |editor, cx| {
3953 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3954 cx.focus(&editor_b);
3955 });
3956
3957 fake_language_server
3958 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3959 assert_eq!(
3960 params.text_document.uri,
3961 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3962 );
3963 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3964 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3965
3966 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3967 lsp::CodeAction {
3968 title: "Inline into all callers".to_string(),
3969 edit: Some(lsp::WorkspaceEdit {
3970 changes: Some(
3971 [
3972 (
3973 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3974 vec![lsp::TextEdit::new(
3975 lsp::Range::new(
3976 lsp::Position::new(1, 22),
3977 lsp::Position::new(1, 34),
3978 ),
3979 "4".to_string(),
3980 )],
3981 ),
3982 (
3983 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3984 vec![lsp::TextEdit::new(
3985 lsp::Range::new(
3986 lsp::Position::new(0, 0),
3987 lsp::Position::new(0, 27),
3988 ),
3989 "".to_string(),
3990 )],
3991 ),
3992 ]
3993 .into_iter()
3994 .collect(),
3995 ),
3996 ..Default::default()
3997 }),
3998 data: Some(json!({
3999 "codeActionParams": {
4000 "range": {
4001 "start": {"line": 1, "column": 31},
4002 "end": {"line": 1, "column": 31},
4003 }
4004 }
4005 })),
4006 ..Default::default()
4007 },
4008 )]))
4009 })
4010 .next()
4011 .await;
4012
4013 // Toggle code actions and wait for them to display.
4014 editor_b.update(cx_b, |editor, cx| {
4015 editor.toggle_code_actions(
4016 &ToggleCodeActions {
4017 deployed_from_indicator: false,
4018 },
4019 cx,
4020 );
4021 });
4022 editor_b
4023 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4024 .await;
4025
4026 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4027
4028 // Confirming the code action will trigger a resolve request.
4029 let confirm_action = workspace_b
4030 .update(cx_b, |workspace, cx| {
4031 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4032 })
4033 .unwrap();
4034 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4035 |_, _| async move {
4036 Ok(lsp::CodeAction {
4037 title: "Inline into all callers".to_string(),
4038 edit: Some(lsp::WorkspaceEdit {
4039 changes: Some(
4040 [
4041 (
4042 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4043 vec![lsp::TextEdit::new(
4044 lsp::Range::new(
4045 lsp::Position::new(1, 22),
4046 lsp::Position::new(1, 34),
4047 ),
4048 "4".to_string(),
4049 )],
4050 ),
4051 (
4052 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4053 vec![lsp::TextEdit::new(
4054 lsp::Range::new(
4055 lsp::Position::new(0, 0),
4056 lsp::Position::new(0, 27),
4057 ),
4058 "".to_string(),
4059 )],
4060 ),
4061 ]
4062 .into_iter()
4063 .collect(),
4064 ),
4065 ..Default::default()
4066 }),
4067 ..Default::default()
4068 })
4069 },
4070 );
4071
4072 // After the action is confirmed, an editor containing both modified files is opened.
4073 confirm_action.await.unwrap();
4074 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4075 workspace
4076 .active_item(cx)
4077 .unwrap()
4078 .downcast::<Editor>()
4079 .unwrap()
4080 });
4081 code_action_editor.update(cx_b, |editor, cx| {
4082 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4083 editor.undo(&Undo, cx);
4084 assert_eq!(
4085 editor.text(cx),
4086 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4087 );
4088 editor.redo(&Redo, cx);
4089 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4090 });
4091 }
4092
4093 #[gpui::test(iterations = 10)]
4094 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4095 cx_a.foreground().forbid_parking();
4096 let lang_registry = Arc::new(LanguageRegistry::test());
4097 let fs = FakeFs::new(cx_a.background());
4098 cx_b.update(|cx| editor::init(cx));
4099
4100 // Set up a fake language server.
4101 let mut language = Language::new(
4102 LanguageConfig {
4103 name: "Rust".into(),
4104 path_suffixes: vec!["rs".to_string()],
4105 ..Default::default()
4106 },
4107 Some(tree_sitter_rust::language()),
4108 );
4109 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4110 capabilities: lsp::ServerCapabilities {
4111 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4112 prepare_provider: Some(true),
4113 work_done_progress_options: Default::default(),
4114 })),
4115 ..Default::default()
4116 },
4117 ..Default::default()
4118 });
4119 lang_registry.add(Arc::new(language));
4120
4121 // Connect to a server as 2 clients.
4122 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4123 let client_a = server.create_client(cx_a, "user_a").await;
4124 let client_b = server.create_client(cx_b, "user_b").await;
4125
4126 // Share a project as client A
4127 fs.insert_tree(
4128 "/dir",
4129 json!({
4130 ".zed.toml": r#"collaborators = ["user_b"]"#,
4131 "one.rs": "const ONE: usize = 1;",
4132 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4133 }),
4134 )
4135 .await;
4136 let project_a = cx_a.update(|cx| {
4137 Project::local(
4138 client_a.clone(),
4139 client_a.user_store.clone(),
4140 lang_registry.clone(),
4141 fs.clone(),
4142 cx,
4143 )
4144 });
4145 let (worktree_a, _) = project_a
4146 .update(cx_a, |p, cx| {
4147 p.find_or_create_local_worktree("/dir", true, cx)
4148 })
4149 .await
4150 .unwrap();
4151 worktree_a
4152 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4153 .await;
4154 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4155 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4156 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4157
4158 // Join the worktree as client B.
4159 let project_b = Project::remote(
4160 project_id,
4161 client_b.clone(),
4162 client_b.user_store.clone(),
4163 lang_registry.clone(),
4164 fs.clone(),
4165 &mut cx_b.to_async(),
4166 )
4167 .await
4168 .unwrap();
4169 let mut params = cx_b.update(WorkspaceParams::test);
4170 params.languages = lang_registry.clone();
4171 params.client = client_b.client.clone();
4172 params.user_store = client_b.user_store.clone();
4173 params.project = project_b;
4174
4175 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4176 let editor_b = workspace_b
4177 .update(cx_b, |workspace, cx| {
4178 workspace.open_path((worktree_id, "one.rs"), true, cx)
4179 })
4180 .await
4181 .unwrap()
4182 .downcast::<Editor>()
4183 .unwrap();
4184 let fake_language_server = fake_language_servers.next().await.unwrap();
4185
4186 // Move cursor to a location that can be renamed.
4187 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4188 editor.select_ranges([7..7], None, cx);
4189 editor.rename(&Rename, cx).unwrap()
4190 });
4191
4192 fake_language_server
4193 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4194 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4195 assert_eq!(params.position, lsp::Position::new(0, 7));
4196 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4197 lsp::Position::new(0, 6),
4198 lsp::Position::new(0, 9),
4199 ))))
4200 })
4201 .next()
4202 .await
4203 .unwrap();
4204 prepare_rename.await.unwrap();
4205 editor_b.update(cx_b, |editor, cx| {
4206 let rename = editor.pending_rename().unwrap();
4207 let buffer = editor.buffer().read(cx).snapshot(cx);
4208 assert_eq!(
4209 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4210 6..9
4211 );
4212 rename.editor.update(cx, |rename_editor, cx| {
4213 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4214 rename_buffer.edit([(0..3, "THREE")], cx);
4215 });
4216 });
4217 });
4218
4219 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4220 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4221 });
4222 fake_language_server
4223 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4224 assert_eq!(
4225 params.text_document_position.text_document.uri.as_str(),
4226 "file:///dir/one.rs"
4227 );
4228 assert_eq!(
4229 params.text_document_position.position,
4230 lsp::Position::new(0, 6)
4231 );
4232 assert_eq!(params.new_name, "THREE");
4233 Ok(Some(lsp::WorkspaceEdit {
4234 changes: Some(
4235 [
4236 (
4237 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4238 vec![lsp::TextEdit::new(
4239 lsp::Range::new(
4240 lsp::Position::new(0, 6),
4241 lsp::Position::new(0, 9),
4242 ),
4243 "THREE".to_string(),
4244 )],
4245 ),
4246 (
4247 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4248 vec![
4249 lsp::TextEdit::new(
4250 lsp::Range::new(
4251 lsp::Position::new(0, 24),
4252 lsp::Position::new(0, 27),
4253 ),
4254 "THREE".to_string(),
4255 ),
4256 lsp::TextEdit::new(
4257 lsp::Range::new(
4258 lsp::Position::new(0, 35),
4259 lsp::Position::new(0, 38),
4260 ),
4261 "THREE".to_string(),
4262 ),
4263 ],
4264 ),
4265 ]
4266 .into_iter()
4267 .collect(),
4268 ),
4269 ..Default::default()
4270 }))
4271 })
4272 .next()
4273 .await
4274 .unwrap();
4275 confirm_rename.await.unwrap();
4276
4277 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4278 workspace
4279 .active_item(cx)
4280 .unwrap()
4281 .downcast::<Editor>()
4282 .unwrap()
4283 });
4284 rename_editor.update(cx_b, |editor, cx| {
4285 assert_eq!(
4286 editor.text(cx),
4287 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4288 );
4289 editor.undo(&Undo, cx);
4290 assert_eq!(
4291 editor.text(cx),
4292 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4293 );
4294 editor.redo(&Redo, cx);
4295 assert_eq!(
4296 editor.text(cx),
4297 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4298 );
4299 });
4300
4301 // Ensure temporary rename edits cannot be undone/redone.
4302 editor_b.update(cx_b, |editor, cx| {
4303 editor.undo(&Undo, cx);
4304 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4305 editor.undo(&Undo, cx);
4306 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4307 editor.redo(&Redo, cx);
4308 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4309 })
4310 }
4311
4312 #[gpui::test(iterations = 10)]
4313 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4314 cx_a.foreground().forbid_parking();
4315
4316 // Connect to a server as 2 clients.
4317 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4318 let client_a = server.create_client(cx_a, "user_a").await;
4319 let client_b = server.create_client(cx_b, "user_b").await;
4320
4321 // Create an org that includes these 2 users.
4322 let db = &server.app_state.db;
4323 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4324 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4325 .await
4326 .unwrap();
4327 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4328 .await
4329 .unwrap();
4330
4331 // Create a channel that includes all the users.
4332 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4333 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4334 .await
4335 .unwrap();
4336 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4337 .await
4338 .unwrap();
4339 db.create_channel_message(
4340 channel_id,
4341 client_b.current_user_id(&cx_b),
4342 "hello A, it's B.",
4343 OffsetDateTime::now_utc(),
4344 1,
4345 )
4346 .await
4347 .unwrap();
4348
4349 let channels_a = cx_a
4350 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4351 channels_a
4352 .condition(cx_a, |list, _| list.available_channels().is_some())
4353 .await;
4354 channels_a.read_with(cx_a, |list, _| {
4355 assert_eq!(
4356 list.available_channels().unwrap(),
4357 &[ChannelDetails {
4358 id: channel_id.to_proto(),
4359 name: "test-channel".to_string()
4360 }]
4361 )
4362 });
4363 let channel_a = channels_a.update(cx_a, |this, cx| {
4364 this.get_channel(channel_id.to_proto(), cx).unwrap()
4365 });
4366 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4367 channel_a
4368 .condition(&cx_a, |channel, _| {
4369 channel_messages(channel)
4370 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4371 })
4372 .await;
4373
4374 let channels_b = cx_b
4375 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4376 channels_b
4377 .condition(cx_b, |list, _| list.available_channels().is_some())
4378 .await;
4379 channels_b.read_with(cx_b, |list, _| {
4380 assert_eq!(
4381 list.available_channels().unwrap(),
4382 &[ChannelDetails {
4383 id: channel_id.to_proto(),
4384 name: "test-channel".to_string()
4385 }]
4386 )
4387 });
4388
4389 let channel_b = channels_b.update(cx_b, |this, cx| {
4390 this.get_channel(channel_id.to_proto(), cx).unwrap()
4391 });
4392 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4393 channel_b
4394 .condition(&cx_b, |channel, _| {
4395 channel_messages(channel)
4396 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4397 })
4398 .await;
4399
4400 channel_a
4401 .update(cx_a, |channel, cx| {
4402 channel
4403 .send_message("oh, hi B.".to_string(), cx)
4404 .unwrap()
4405 .detach();
4406 let task = channel.send_message("sup".to_string(), cx).unwrap();
4407 assert_eq!(
4408 channel_messages(channel),
4409 &[
4410 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4411 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4412 ("user_a".to_string(), "sup".to_string(), true)
4413 ]
4414 );
4415 task
4416 })
4417 .await
4418 .unwrap();
4419
4420 channel_b
4421 .condition(&cx_b, |channel, _| {
4422 channel_messages(channel)
4423 == [
4424 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4425 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4426 ("user_a".to_string(), "sup".to_string(), false),
4427 ]
4428 })
4429 .await;
4430
4431 assert_eq!(
4432 server
4433 .state()
4434 .await
4435 .channel(channel_id)
4436 .unwrap()
4437 .connection_ids
4438 .len(),
4439 2
4440 );
4441 cx_b.update(|_| drop(channel_b));
4442 server
4443 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4444 .await;
4445
4446 cx_a.update(|_| drop(channel_a));
4447 server
4448 .condition(|state| state.channel(channel_id).is_none())
4449 .await;
4450 }
4451
4452 #[gpui::test(iterations = 10)]
4453 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4454 cx_a.foreground().forbid_parking();
4455
4456 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4457 let client_a = server.create_client(cx_a, "user_a").await;
4458
4459 let db = &server.app_state.db;
4460 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4461 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4462 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4463 .await
4464 .unwrap();
4465 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4466 .await
4467 .unwrap();
4468
4469 let channels_a = cx_a
4470 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4471 channels_a
4472 .condition(cx_a, |list, _| list.available_channels().is_some())
4473 .await;
4474 let channel_a = channels_a.update(cx_a, |this, cx| {
4475 this.get_channel(channel_id.to_proto(), cx).unwrap()
4476 });
4477
4478 // Messages aren't allowed to be too long.
4479 channel_a
4480 .update(cx_a, |channel, cx| {
4481 let long_body = "this is long.\n".repeat(1024);
4482 channel.send_message(long_body, cx).unwrap()
4483 })
4484 .await
4485 .unwrap_err();
4486
4487 // Messages aren't allowed to be blank.
4488 channel_a.update(cx_a, |channel, cx| {
4489 channel.send_message(String::new(), cx).unwrap_err()
4490 });
4491
4492 // Leading and trailing whitespace are trimmed.
4493 channel_a
4494 .update(cx_a, |channel, cx| {
4495 channel
4496 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4497 .unwrap()
4498 })
4499 .await
4500 .unwrap();
4501 assert_eq!(
4502 db.get_channel_messages(channel_id, 10, None)
4503 .await
4504 .unwrap()
4505 .iter()
4506 .map(|m| &m.body)
4507 .collect::<Vec<_>>(),
4508 &["surrounded by whitespace"]
4509 );
4510 }
4511
4512 #[gpui::test(iterations = 10)]
4513 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4514 cx_a.foreground().forbid_parking();
4515
4516 // Connect to a server as 2 clients.
4517 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4518 let client_a = server.create_client(cx_a, "user_a").await;
4519 let client_b = server.create_client(cx_b, "user_b").await;
4520 let mut status_b = client_b.status();
4521
4522 // Create an org that includes these 2 users.
4523 let db = &server.app_state.db;
4524 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4525 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4526 .await
4527 .unwrap();
4528 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4529 .await
4530 .unwrap();
4531
4532 // Create a channel that includes all the users.
4533 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4534 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4535 .await
4536 .unwrap();
4537 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4538 .await
4539 .unwrap();
4540 db.create_channel_message(
4541 channel_id,
4542 client_b.current_user_id(&cx_b),
4543 "hello A, it's B.",
4544 OffsetDateTime::now_utc(),
4545 2,
4546 )
4547 .await
4548 .unwrap();
4549
4550 let channels_a = cx_a
4551 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4552 channels_a
4553 .condition(cx_a, |list, _| list.available_channels().is_some())
4554 .await;
4555
4556 channels_a.read_with(cx_a, |list, _| {
4557 assert_eq!(
4558 list.available_channels().unwrap(),
4559 &[ChannelDetails {
4560 id: channel_id.to_proto(),
4561 name: "test-channel".to_string()
4562 }]
4563 )
4564 });
4565 let channel_a = channels_a.update(cx_a, |this, cx| {
4566 this.get_channel(channel_id.to_proto(), cx).unwrap()
4567 });
4568 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4569 channel_a
4570 .condition(&cx_a, |channel, _| {
4571 channel_messages(channel)
4572 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4573 })
4574 .await;
4575
4576 let channels_b = cx_b
4577 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4578 channels_b
4579 .condition(cx_b, |list, _| list.available_channels().is_some())
4580 .await;
4581 channels_b.read_with(cx_b, |list, _| {
4582 assert_eq!(
4583 list.available_channels().unwrap(),
4584 &[ChannelDetails {
4585 id: channel_id.to_proto(),
4586 name: "test-channel".to_string()
4587 }]
4588 )
4589 });
4590
4591 let channel_b = channels_b.update(cx_b, |this, cx| {
4592 this.get_channel(channel_id.to_proto(), cx).unwrap()
4593 });
4594 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4595 channel_b
4596 .condition(&cx_b, |channel, _| {
4597 channel_messages(channel)
4598 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4599 })
4600 .await;
4601
4602 // Disconnect client B, ensuring we can still access its cached channel data.
4603 server.forbid_connections();
4604 server.disconnect_client(client_b.current_user_id(&cx_b));
4605 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4606 while !matches!(
4607 status_b.next().await,
4608 Some(client::Status::ReconnectionError { .. })
4609 ) {}
4610
4611 channels_b.read_with(cx_b, |channels, _| {
4612 assert_eq!(
4613 channels.available_channels().unwrap(),
4614 [ChannelDetails {
4615 id: channel_id.to_proto(),
4616 name: "test-channel".to_string()
4617 }]
4618 )
4619 });
4620 channel_b.read_with(cx_b, |channel, _| {
4621 assert_eq!(
4622 channel_messages(channel),
4623 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4624 )
4625 });
4626
4627 // Send a message from client B while it is disconnected.
4628 channel_b
4629 .update(cx_b, |channel, cx| {
4630 let task = channel
4631 .send_message("can you see this?".to_string(), cx)
4632 .unwrap();
4633 assert_eq!(
4634 channel_messages(channel),
4635 &[
4636 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4637 ("user_b".to_string(), "can you see this?".to_string(), true)
4638 ]
4639 );
4640 task
4641 })
4642 .await
4643 .unwrap_err();
4644
4645 // Send a message from client A while B is disconnected.
4646 channel_a
4647 .update(cx_a, |channel, cx| {
4648 channel
4649 .send_message("oh, hi B.".to_string(), cx)
4650 .unwrap()
4651 .detach();
4652 let task = channel.send_message("sup".to_string(), cx).unwrap();
4653 assert_eq!(
4654 channel_messages(channel),
4655 &[
4656 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4657 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4658 ("user_a".to_string(), "sup".to_string(), true)
4659 ]
4660 );
4661 task
4662 })
4663 .await
4664 .unwrap();
4665
4666 // Give client B a chance to reconnect.
4667 server.allow_connections();
4668 cx_b.foreground().advance_clock(Duration::from_secs(10));
4669
4670 // Verify that B sees the new messages upon reconnection, as well as the message client B
4671 // sent while offline.
4672 channel_b
4673 .condition(&cx_b, |channel, _| {
4674 channel_messages(channel)
4675 == [
4676 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4677 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4678 ("user_a".to_string(), "sup".to_string(), false),
4679 ("user_b".to_string(), "can you see this?".to_string(), false),
4680 ]
4681 })
4682 .await;
4683
4684 // Ensure client A and B can communicate normally after reconnection.
4685 channel_a
4686 .update(cx_a, |channel, cx| {
4687 channel.send_message("you online?".to_string(), cx).unwrap()
4688 })
4689 .await
4690 .unwrap();
4691 channel_b
4692 .condition(&cx_b, |channel, _| {
4693 channel_messages(channel)
4694 == [
4695 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4696 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4697 ("user_a".to_string(), "sup".to_string(), false),
4698 ("user_b".to_string(), "can you see this?".to_string(), false),
4699 ("user_a".to_string(), "you online?".to_string(), false),
4700 ]
4701 })
4702 .await;
4703
4704 channel_b
4705 .update(cx_b, |channel, cx| {
4706 channel.send_message("yep".to_string(), cx).unwrap()
4707 })
4708 .await
4709 .unwrap();
4710 channel_a
4711 .condition(&cx_a, |channel, _| {
4712 channel_messages(channel)
4713 == [
4714 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4715 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4716 ("user_a".to_string(), "sup".to_string(), false),
4717 ("user_b".to_string(), "can you see this?".to_string(), false),
4718 ("user_a".to_string(), "you online?".to_string(), false),
4719 ("user_b".to_string(), "yep".to_string(), false),
4720 ]
4721 })
4722 .await;
4723 }
4724
4725 #[gpui::test(iterations = 10)]
4726 async fn test_contacts(
4727 cx_a: &mut TestAppContext,
4728 cx_b: &mut TestAppContext,
4729 cx_c: &mut TestAppContext,
4730 ) {
4731 cx_a.foreground().forbid_parking();
4732 let lang_registry = Arc::new(LanguageRegistry::test());
4733 let fs = FakeFs::new(cx_a.background());
4734
4735 // Connect to a server as 3 clients.
4736 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4737 let client_a = server.create_client(cx_a, "user_a").await;
4738 let client_b = server.create_client(cx_b, "user_b").await;
4739 let client_c = server.create_client(cx_c, "user_c").await;
4740
4741 // Share a worktree as client A.
4742 fs.insert_tree(
4743 "/a",
4744 json!({
4745 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4746 }),
4747 )
4748 .await;
4749
4750 let project_a = cx_a.update(|cx| {
4751 Project::local(
4752 client_a.clone(),
4753 client_a.user_store.clone(),
4754 lang_registry.clone(),
4755 fs.clone(),
4756 cx,
4757 )
4758 });
4759 let (worktree_a, _) = project_a
4760 .update(cx_a, |p, cx| {
4761 p.find_or_create_local_worktree("/a", true, cx)
4762 })
4763 .await
4764 .unwrap();
4765 worktree_a
4766 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4767 .await;
4768
4769 client_a
4770 .user_store
4771 .condition(&cx_a, |user_store, _| {
4772 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4773 })
4774 .await;
4775 client_b
4776 .user_store
4777 .condition(&cx_b, |user_store, _| {
4778 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4779 })
4780 .await;
4781 client_c
4782 .user_store
4783 .condition(&cx_c, |user_store, _| {
4784 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4785 })
4786 .await;
4787
4788 let project_id = project_a
4789 .update(cx_a, |project, _| project.next_remote_id())
4790 .await;
4791 project_a
4792 .update(cx_a, |project, cx| project.share(cx))
4793 .await
4794 .unwrap();
4795 client_a
4796 .user_store
4797 .condition(&cx_a, |user_store, _| {
4798 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4799 })
4800 .await;
4801 client_b
4802 .user_store
4803 .condition(&cx_b, |user_store, _| {
4804 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4805 })
4806 .await;
4807 client_c
4808 .user_store
4809 .condition(&cx_c, |user_store, _| {
4810 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4811 })
4812 .await;
4813
4814 let _project_b = Project::remote(
4815 project_id,
4816 client_b.clone(),
4817 client_b.user_store.clone(),
4818 lang_registry.clone(),
4819 fs.clone(),
4820 &mut cx_b.to_async(),
4821 )
4822 .await
4823 .unwrap();
4824
4825 client_a
4826 .user_store
4827 .condition(&cx_a, |user_store, _| {
4828 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4829 })
4830 .await;
4831 client_b
4832 .user_store
4833 .condition(&cx_b, |user_store, _| {
4834 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4835 })
4836 .await;
4837 client_c
4838 .user_store
4839 .condition(&cx_c, |user_store, _| {
4840 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4841 })
4842 .await;
4843
4844 project_a
4845 .condition(&cx_a, |project, _| {
4846 project.collaborators().contains_key(&client_b.peer_id)
4847 })
4848 .await;
4849
4850 cx_a.update(move |_| drop(project_a));
4851 client_a
4852 .user_store
4853 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4854 .await;
4855 client_b
4856 .user_store
4857 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4858 .await;
4859 client_c
4860 .user_store
4861 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4862 .await;
4863
4864 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4865 user_store
4866 .contacts()
4867 .iter()
4868 .map(|contact| {
4869 let worktrees = contact
4870 .projects
4871 .iter()
4872 .map(|p| {
4873 (
4874 p.worktree_root_names[0].as_str(),
4875 p.is_shared,
4876 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4877 )
4878 })
4879 .collect();
4880 (contact.user.github_login.as_str(), worktrees)
4881 })
4882 .collect()
4883 }
4884 }
4885
4886 #[gpui::test(iterations = 10)]
4887 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4888 cx_a.foreground().forbid_parking();
4889 let fs = FakeFs::new(cx_a.background());
4890
4891 // 2 clients connect to a server.
4892 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4893 let mut client_a = server.create_client(cx_a, "user_a").await;
4894 let mut client_b = server.create_client(cx_b, "user_b").await;
4895 cx_a.update(editor::init);
4896 cx_b.update(editor::init);
4897
4898 // Client A shares a project.
4899 fs.insert_tree(
4900 "/a",
4901 json!({
4902 ".zed.toml": r#"collaborators = ["user_b"]"#,
4903 "1.txt": "one",
4904 "2.txt": "two",
4905 "3.txt": "three",
4906 }),
4907 )
4908 .await;
4909 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4910 project_a
4911 .update(cx_a, |project, cx| project.share(cx))
4912 .await
4913 .unwrap();
4914
4915 // Client B joins the project.
4916 let project_b = client_b
4917 .build_remote_project(
4918 project_a
4919 .read_with(cx_a, |project, _| project.remote_id())
4920 .unwrap(),
4921 cx_b,
4922 )
4923 .await;
4924
4925 // Client A opens some editors.
4926 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4927 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4928 let editor_a1 = workspace_a
4929 .update(cx_a, |workspace, cx| {
4930 workspace.open_path((worktree_id, "1.txt"), true, cx)
4931 })
4932 .await
4933 .unwrap()
4934 .downcast::<Editor>()
4935 .unwrap();
4936 let editor_a2 = workspace_a
4937 .update(cx_a, |workspace, cx| {
4938 workspace.open_path((worktree_id, "2.txt"), true, cx)
4939 })
4940 .await
4941 .unwrap()
4942 .downcast::<Editor>()
4943 .unwrap();
4944
4945 // Client B opens an editor.
4946 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4947 let editor_b1 = workspace_b
4948 .update(cx_b, |workspace, cx| {
4949 workspace.open_path((worktree_id, "1.txt"), true, cx)
4950 })
4951 .await
4952 .unwrap()
4953 .downcast::<Editor>()
4954 .unwrap();
4955
4956 let client_a_id = project_b.read_with(cx_b, |project, _| {
4957 project.collaborators().values().next().unwrap().peer_id
4958 });
4959 let client_b_id = project_a.read_with(cx_a, |project, _| {
4960 project.collaborators().values().next().unwrap().peer_id
4961 });
4962
4963 // When client B starts following client A, all visible view states are replicated to client B.
4964 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4965 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4966 workspace_b
4967 .update(cx_b, |workspace, cx| {
4968 workspace
4969 .toggle_follow(&ToggleFollow(client_a_id), cx)
4970 .unwrap()
4971 })
4972 .await
4973 .unwrap();
4974 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4975 workspace
4976 .active_item(cx)
4977 .unwrap()
4978 .downcast::<Editor>()
4979 .unwrap()
4980 });
4981 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4982 assert_eq!(
4983 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4984 Some((worktree_id, "2.txt").into())
4985 );
4986 assert_eq!(
4987 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4988 vec![2..3]
4989 );
4990 assert_eq!(
4991 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4992 vec![0..1]
4993 );
4994
4995 // When client A activates a different editor, client B does so as well.
4996 workspace_a.update(cx_a, |workspace, cx| {
4997 workspace.activate_item(&editor_a1, cx)
4998 });
4999 workspace_b
5000 .condition(cx_b, |workspace, cx| {
5001 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5002 })
5003 .await;
5004
5005 // When client A navigates back and forth, client B does so as well.
5006 workspace_a
5007 .update(cx_a, |workspace, cx| {
5008 workspace::Pane::go_back(workspace, None, cx)
5009 })
5010 .await;
5011 workspace_b
5012 .condition(cx_b, |workspace, cx| {
5013 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5014 })
5015 .await;
5016
5017 workspace_a
5018 .update(cx_a, |workspace, cx| {
5019 workspace::Pane::go_forward(workspace, None, cx)
5020 })
5021 .await;
5022 workspace_b
5023 .condition(cx_b, |workspace, cx| {
5024 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5025 })
5026 .await;
5027
5028 // Changes to client A's editor are reflected on client B.
5029 editor_a1.update(cx_a, |editor, cx| {
5030 editor.select_ranges([1..1, 2..2], None, cx);
5031 });
5032 editor_b1
5033 .condition(cx_b, |editor, cx| {
5034 editor.selected_ranges(cx) == vec![1..1, 2..2]
5035 })
5036 .await;
5037
5038 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5039 editor_b1
5040 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5041 .await;
5042
5043 editor_a1.update(cx_a, |editor, cx| {
5044 editor.select_ranges([3..3], None, cx);
5045 editor.set_scroll_position(vec2f(0., 100.), cx);
5046 });
5047 editor_b1
5048 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5049 .await;
5050
5051 // After unfollowing, client B stops receiving updates from client A.
5052 workspace_b.update(cx_b, |workspace, cx| {
5053 workspace.unfollow(&workspace.active_pane().clone(), cx)
5054 });
5055 workspace_a.update(cx_a, |workspace, cx| {
5056 workspace.activate_item(&editor_a2, cx)
5057 });
5058 cx_a.foreground().run_until_parked();
5059 assert_eq!(
5060 workspace_b.read_with(cx_b, |workspace, cx| workspace
5061 .active_item(cx)
5062 .unwrap()
5063 .id()),
5064 editor_b1.id()
5065 );
5066
5067 // Client A starts following client B.
5068 workspace_a
5069 .update(cx_a, |workspace, cx| {
5070 workspace
5071 .toggle_follow(&ToggleFollow(client_b_id), cx)
5072 .unwrap()
5073 })
5074 .await
5075 .unwrap();
5076 assert_eq!(
5077 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5078 Some(client_b_id)
5079 );
5080 assert_eq!(
5081 workspace_a.read_with(cx_a, |workspace, cx| workspace
5082 .active_item(cx)
5083 .unwrap()
5084 .id()),
5085 editor_a1.id()
5086 );
5087
5088 // Following interrupts when client B disconnects.
5089 client_b.disconnect(&cx_b.to_async()).unwrap();
5090 cx_a.foreground().run_until_parked();
5091 assert_eq!(
5092 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5093 None
5094 );
5095 }
5096
5097 #[gpui::test(iterations = 10)]
5098 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5099 cx_a.foreground().forbid_parking();
5100 let fs = FakeFs::new(cx_a.background());
5101
5102 // 2 clients connect to a server.
5103 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5104 let mut client_a = server.create_client(cx_a, "user_a").await;
5105 let mut client_b = server.create_client(cx_b, "user_b").await;
5106 cx_a.update(editor::init);
5107 cx_b.update(editor::init);
5108
5109 // Client A shares a project.
5110 fs.insert_tree(
5111 "/a",
5112 json!({
5113 ".zed.toml": r#"collaborators = ["user_b"]"#,
5114 "1.txt": "one",
5115 "2.txt": "two",
5116 "3.txt": "three",
5117 "4.txt": "four",
5118 }),
5119 )
5120 .await;
5121 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5122 project_a
5123 .update(cx_a, |project, cx| project.share(cx))
5124 .await
5125 .unwrap();
5126
5127 // Client B joins the project.
5128 let project_b = client_b
5129 .build_remote_project(
5130 project_a
5131 .read_with(cx_a, |project, _| project.remote_id())
5132 .unwrap(),
5133 cx_b,
5134 )
5135 .await;
5136
5137 // Client A opens some editors.
5138 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5139 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5140 let _editor_a1 = workspace_a
5141 .update(cx_a, |workspace, cx| {
5142 workspace.open_path((worktree_id, "1.txt"), true, cx)
5143 })
5144 .await
5145 .unwrap()
5146 .downcast::<Editor>()
5147 .unwrap();
5148
5149 // Client B opens an editor.
5150 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5151 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5152 let _editor_b1 = workspace_b
5153 .update(cx_b, |workspace, cx| {
5154 workspace.open_path((worktree_id, "2.txt"), true, cx)
5155 })
5156 .await
5157 .unwrap()
5158 .downcast::<Editor>()
5159 .unwrap();
5160
5161 // Clients A and B follow each other in split panes
5162 workspace_a
5163 .update(cx_a, |workspace, cx| {
5164 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5165 assert_ne!(*workspace.active_pane(), pane_a1);
5166 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5167 workspace
5168 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5169 .unwrap()
5170 })
5171 .await
5172 .unwrap();
5173 workspace_b
5174 .update(cx_b, |workspace, cx| {
5175 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5176 assert_ne!(*workspace.active_pane(), pane_b1);
5177 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5178 workspace
5179 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5180 .unwrap()
5181 })
5182 .await
5183 .unwrap();
5184
5185 workspace_a
5186 .update(cx_a, |workspace, cx| {
5187 workspace.activate_next_pane(cx);
5188 assert_eq!(*workspace.active_pane(), pane_a1);
5189 workspace.open_path((worktree_id, "3.txt"), true, cx)
5190 })
5191 .await
5192 .unwrap();
5193 workspace_b
5194 .update(cx_b, |workspace, cx| {
5195 workspace.activate_next_pane(cx);
5196 assert_eq!(*workspace.active_pane(), pane_b1);
5197 workspace.open_path((worktree_id, "4.txt"), true, cx)
5198 })
5199 .await
5200 .unwrap();
5201 cx_a.foreground().run_until_parked();
5202
5203 // Ensure leader updates don't change the active pane of followers
5204 workspace_a.read_with(cx_a, |workspace, _| {
5205 assert_eq!(*workspace.active_pane(), pane_a1);
5206 });
5207 workspace_b.read_with(cx_b, |workspace, _| {
5208 assert_eq!(*workspace.active_pane(), pane_b1);
5209 });
5210
5211 // Ensure peers following each other doesn't cause an infinite loop.
5212 assert_eq!(
5213 workspace_a.read_with(cx_a, |workspace, cx| workspace
5214 .active_item(cx)
5215 .unwrap()
5216 .project_path(cx)),
5217 Some((worktree_id, "3.txt").into())
5218 );
5219 workspace_a.update(cx_a, |workspace, cx| {
5220 assert_eq!(
5221 workspace.active_item(cx).unwrap().project_path(cx),
5222 Some((worktree_id, "3.txt").into())
5223 );
5224 workspace.activate_next_pane(cx);
5225 assert_eq!(
5226 workspace.active_item(cx).unwrap().project_path(cx),
5227 Some((worktree_id, "4.txt").into())
5228 );
5229 });
5230 workspace_b.update(cx_b, |workspace, cx| {
5231 assert_eq!(
5232 workspace.active_item(cx).unwrap().project_path(cx),
5233 Some((worktree_id, "4.txt").into())
5234 );
5235 workspace.activate_next_pane(cx);
5236 assert_eq!(
5237 workspace.active_item(cx).unwrap().project_path(cx),
5238 Some((worktree_id, "3.txt").into())
5239 );
5240 });
5241 }
5242
5243 #[gpui::test(iterations = 10)]
5244 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5245 cx_a.foreground().forbid_parking();
5246 let fs = FakeFs::new(cx_a.background());
5247
5248 // 2 clients connect to a server.
5249 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5250 let mut client_a = server.create_client(cx_a, "user_a").await;
5251 let mut client_b = server.create_client(cx_b, "user_b").await;
5252 cx_a.update(editor::init);
5253 cx_b.update(editor::init);
5254
5255 // Client A shares a project.
5256 fs.insert_tree(
5257 "/a",
5258 json!({
5259 ".zed.toml": r#"collaborators = ["user_b"]"#,
5260 "1.txt": "one",
5261 "2.txt": "two",
5262 "3.txt": "three",
5263 }),
5264 )
5265 .await;
5266 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5267 project_a
5268 .update(cx_a, |project, cx| project.share(cx))
5269 .await
5270 .unwrap();
5271
5272 // Client B joins the project.
5273 let project_b = client_b
5274 .build_remote_project(
5275 project_a
5276 .read_with(cx_a, |project, _| project.remote_id())
5277 .unwrap(),
5278 cx_b,
5279 )
5280 .await;
5281
5282 // Client A opens some editors.
5283 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5284 let _editor_a1 = workspace_a
5285 .update(cx_a, |workspace, cx| {
5286 workspace.open_path((worktree_id, "1.txt"), true, cx)
5287 })
5288 .await
5289 .unwrap()
5290 .downcast::<Editor>()
5291 .unwrap();
5292
5293 // Client B starts following client A.
5294 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5295 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5296 let leader_id = project_b.read_with(cx_b, |project, _| {
5297 project.collaborators().values().next().unwrap().peer_id
5298 });
5299 workspace_b
5300 .update(cx_b, |workspace, cx| {
5301 workspace
5302 .toggle_follow(&ToggleFollow(leader_id), cx)
5303 .unwrap()
5304 })
5305 .await
5306 .unwrap();
5307 assert_eq!(
5308 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5309 Some(leader_id)
5310 );
5311 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5312 workspace
5313 .active_item(cx)
5314 .unwrap()
5315 .downcast::<Editor>()
5316 .unwrap()
5317 });
5318
5319 // When client B moves, it automatically stops following client A.
5320 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5321 assert_eq!(
5322 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5323 None
5324 );
5325
5326 workspace_b
5327 .update(cx_b, |workspace, cx| {
5328 workspace
5329 .toggle_follow(&ToggleFollow(leader_id), cx)
5330 .unwrap()
5331 })
5332 .await
5333 .unwrap();
5334 assert_eq!(
5335 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5336 Some(leader_id)
5337 );
5338
5339 // When client B edits, it automatically stops following client A.
5340 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5341 assert_eq!(
5342 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5343 None
5344 );
5345
5346 workspace_b
5347 .update(cx_b, |workspace, cx| {
5348 workspace
5349 .toggle_follow(&ToggleFollow(leader_id), cx)
5350 .unwrap()
5351 })
5352 .await
5353 .unwrap();
5354 assert_eq!(
5355 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5356 Some(leader_id)
5357 );
5358
5359 // When client B scrolls, it automatically stops following client A.
5360 editor_b2.update(cx_b, |editor, cx| {
5361 editor.set_scroll_position(vec2f(0., 3.), cx)
5362 });
5363 assert_eq!(
5364 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5365 None
5366 );
5367
5368 workspace_b
5369 .update(cx_b, |workspace, cx| {
5370 workspace
5371 .toggle_follow(&ToggleFollow(leader_id), cx)
5372 .unwrap()
5373 })
5374 .await
5375 .unwrap();
5376 assert_eq!(
5377 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5378 Some(leader_id)
5379 );
5380
5381 // When client B activates a different pane, it continues following client A in the original pane.
5382 workspace_b.update(cx_b, |workspace, cx| {
5383 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5384 });
5385 assert_eq!(
5386 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5387 Some(leader_id)
5388 );
5389
5390 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5391 assert_eq!(
5392 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5393 Some(leader_id)
5394 );
5395
5396 // When client B activates a different item in the original pane, it automatically stops following client A.
5397 workspace_b
5398 .update(cx_b, |workspace, cx| {
5399 workspace.open_path((worktree_id, "2.txt"), true, cx)
5400 })
5401 .await
5402 .unwrap();
5403 assert_eq!(
5404 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5405 None
5406 );
5407 }
5408
5409 #[gpui::test(iterations = 100)]
5410 async fn test_random_collaboration(
5411 cx: &mut TestAppContext,
5412 deterministic: Arc<Deterministic>,
5413 rng: StdRng,
5414 ) {
5415 cx.foreground().forbid_parking();
5416 let max_peers = env::var("MAX_PEERS")
5417 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5418 .unwrap_or(5);
5419 assert!(max_peers <= 5);
5420
5421 let max_operations = env::var("OPERATIONS")
5422 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5423 .unwrap_or(10);
5424
5425 let rng = Arc::new(Mutex::new(rng));
5426
5427 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5428 let host_language_registry = Arc::new(LanguageRegistry::test());
5429
5430 let fs = FakeFs::new(cx.background());
5431 fs.insert_tree(
5432 "/_collab",
5433 json!({
5434 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5435 }),
5436 )
5437 .await;
5438
5439 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5440 let mut clients = Vec::new();
5441 let mut user_ids = Vec::new();
5442 let mut op_start_signals = Vec::new();
5443 let files = Arc::new(Mutex::new(Vec::new()));
5444
5445 let mut next_entity_id = 100000;
5446 let mut host_cx = TestAppContext::new(
5447 cx.foreground_platform(),
5448 cx.platform(),
5449 deterministic.build_foreground(next_entity_id),
5450 deterministic.build_background(),
5451 cx.font_cache(),
5452 cx.leak_detector(),
5453 next_entity_id,
5454 );
5455 let host = server.create_client(&mut host_cx, "host").await;
5456 let host_project = host_cx.update(|cx| {
5457 Project::local(
5458 host.client.clone(),
5459 host.user_store.clone(),
5460 host_language_registry.clone(),
5461 fs.clone(),
5462 cx,
5463 )
5464 });
5465 let host_project_id = host_project
5466 .update(&mut host_cx, |p, _| p.next_remote_id())
5467 .await;
5468
5469 let (collab_worktree, _) = host_project
5470 .update(&mut host_cx, |project, cx| {
5471 project.find_or_create_local_worktree("/_collab", true, cx)
5472 })
5473 .await
5474 .unwrap();
5475 collab_worktree
5476 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5477 .await;
5478 host_project
5479 .update(&mut host_cx, |project, cx| project.share(cx))
5480 .await
5481 .unwrap();
5482
5483 // Set up fake language servers.
5484 let mut language = Language::new(
5485 LanguageConfig {
5486 name: "Rust".into(),
5487 path_suffixes: vec!["rs".to_string()],
5488 ..Default::default()
5489 },
5490 None,
5491 );
5492 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5493 name: "the-fake-language-server",
5494 capabilities: lsp::LanguageServer::full_capabilities(),
5495 initializer: Some(Box::new({
5496 let rng = rng.clone();
5497 let files = files.clone();
5498 let project = host_project.downgrade();
5499 move |fake_server: &mut FakeLanguageServer| {
5500 fake_server.handle_request::<lsp::request::Completion, _, _>(
5501 |_, _| async move {
5502 Ok(Some(lsp::CompletionResponse::Array(vec![
5503 lsp::CompletionItem {
5504 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5505 range: lsp::Range::new(
5506 lsp::Position::new(0, 0),
5507 lsp::Position::new(0, 0),
5508 ),
5509 new_text: "the-new-text".to_string(),
5510 })),
5511 ..Default::default()
5512 },
5513 ])))
5514 },
5515 );
5516
5517 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5518 |_, _| async move {
5519 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5520 lsp::CodeAction {
5521 title: "the-code-action".to_string(),
5522 ..Default::default()
5523 },
5524 )]))
5525 },
5526 );
5527
5528 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5529 |params, _| async move {
5530 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5531 params.position,
5532 params.position,
5533 ))))
5534 },
5535 );
5536
5537 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5538 let files = files.clone();
5539 let rng = rng.clone();
5540 move |_, _| {
5541 let files = files.clone();
5542 let rng = rng.clone();
5543 async move {
5544 let files = files.lock();
5545 let mut rng = rng.lock();
5546 let count = rng.gen_range::<usize, _>(1..3);
5547 let files = (0..count)
5548 .map(|_| files.choose(&mut *rng).unwrap())
5549 .collect::<Vec<_>>();
5550 log::info!("LSP: Returning definitions in files {:?}", &files);
5551 Ok(Some(lsp::GotoDefinitionResponse::Array(
5552 files
5553 .into_iter()
5554 .map(|file| lsp::Location {
5555 uri: lsp::Url::from_file_path(file).unwrap(),
5556 range: Default::default(),
5557 })
5558 .collect(),
5559 )))
5560 }
5561 }
5562 });
5563
5564 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5565 let rng = rng.clone();
5566 let project = project.clone();
5567 move |params, mut cx| {
5568 let highlights = if let Some(project) = project.upgrade(&cx) {
5569 project.update(&mut cx, |project, cx| {
5570 let path = params
5571 .text_document_position_params
5572 .text_document
5573 .uri
5574 .to_file_path()
5575 .unwrap();
5576 let (worktree, relative_path) =
5577 project.find_local_worktree(&path, cx)?;
5578 let project_path =
5579 ProjectPath::from((worktree.read(cx).id(), relative_path));
5580 let buffer =
5581 project.get_open_buffer(&project_path, cx)?.read(cx);
5582
5583 let mut highlights = Vec::new();
5584 let highlight_count = rng.lock().gen_range(1..=5);
5585 let mut prev_end = 0;
5586 for _ in 0..highlight_count {
5587 let range =
5588 buffer.random_byte_range(prev_end, &mut *rng.lock());
5589
5590 highlights.push(lsp::DocumentHighlight {
5591 range: range_to_lsp(range.to_point_utf16(buffer)),
5592 kind: Some(lsp::DocumentHighlightKind::READ),
5593 });
5594 prev_end = range.end;
5595 }
5596 Some(highlights)
5597 })
5598 } else {
5599 None
5600 };
5601 async move { Ok(highlights) }
5602 }
5603 });
5604 }
5605 })),
5606 ..Default::default()
5607 });
5608 host_language_registry.add(Arc::new(language));
5609
5610 let op_start_signal = futures::channel::mpsc::unbounded();
5611 user_ids.push(host.current_user_id(&host_cx));
5612 op_start_signals.push(op_start_signal.0);
5613 clients.push(host_cx.foreground().spawn(host.simulate_host(
5614 host_project,
5615 files,
5616 op_start_signal.1,
5617 rng.clone(),
5618 host_cx,
5619 )));
5620
5621 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5622 rng.lock().gen_range(0..max_operations)
5623 } else {
5624 max_operations
5625 };
5626 let mut available_guests = vec![
5627 "guest-1".to_string(),
5628 "guest-2".to_string(),
5629 "guest-3".to_string(),
5630 "guest-4".to_string(),
5631 ];
5632 let mut operations = 0;
5633 while operations < max_operations {
5634 if operations == disconnect_host_at {
5635 server.disconnect_client(user_ids[0]);
5636 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5637 drop(op_start_signals);
5638 let mut clients = futures::future::join_all(clients).await;
5639 cx.foreground().run_until_parked();
5640
5641 let (host, mut host_cx, host_err) = clients.remove(0);
5642 if let Some(host_err) = host_err {
5643 log::error!("host error - {}", host_err);
5644 }
5645 host.project
5646 .as_ref()
5647 .unwrap()
5648 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5649 for (guest, mut guest_cx, guest_err) in clients {
5650 if let Some(guest_err) = guest_err {
5651 log::error!("{} error - {}", guest.username, guest_err);
5652 }
5653 let contacts = server
5654 .store
5655 .read()
5656 .await
5657 .contacts_for_user(guest.current_user_id(&guest_cx));
5658 assert!(!contacts
5659 .iter()
5660 .flat_map(|contact| &contact.projects)
5661 .any(|project| project.id == host_project_id));
5662 guest
5663 .project
5664 .as_ref()
5665 .unwrap()
5666 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5667 guest_cx.update(|_| drop(guest));
5668 }
5669 host_cx.update(|_| drop(host));
5670
5671 return;
5672 }
5673
5674 let distribution = rng.lock().gen_range(0..100);
5675 match distribution {
5676 0..=19 if !available_guests.is_empty() => {
5677 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5678 let guest_username = available_guests.remove(guest_ix);
5679 log::info!("Adding new connection for {}", guest_username);
5680 next_entity_id += 100000;
5681 let mut guest_cx = TestAppContext::new(
5682 cx.foreground_platform(),
5683 cx.platform(),
5684 deterministic.build_foreground(next_entity_id),
5685 deterministic.build_background(),
5686 cx.font_cache(),
5687 cx.leak_detector(),
5688 next_entity_id,
5689 );
5690 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5691 let guest_project = Project::remote(
5692 host_project_id,
5693 guest.client.clone(),
5694 guest.user_store.clone(),
5695 guest_lang_registry.clone(),
5696 FakeFs::new(cx.background()),
5697 &mut guest_cx.to_async(),
5698 )
5699 .await
5700 .unwrap();
5701 let op_start_signal = futures::channel::mpsc::unbounded();
5702 user_ids.push(guest.current_user_id(&guest_cx));
5703 op_start_signals.push(op_start_signal.0);
5704 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5705 guest_username.clone(),
5706 guest_project,
5707 op_start_signal.1,
5708 rng.clone(),
5709 guest_cx,
5710 )));
5711
5712 log::info!("Added connection for {}", guest_username);
5713 operations += 1;
5714 }
5715 20..=29 if clients.len() > 1 => {
5716 log::info!("Removing guest");
5717 let guest_ix = rng.lock().gen_range(1..clients.len());
5718 let removed_guest_id = user_ids.remove(guest_ix);
5719 let guest = clients.remove(guest_ix);
5720 op_start_signals.remove(guest_ix);
5721 server.disconnect_client(removed_guest_id);
5722 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5723 let (guest, mut guest_cx, guest_err) = guest.await;
5724 if let Some(guest_err) = guest_err {
5725 log::error!("{} error - {}", guest.username, guest_err);
5726 }
5727 guest
5728 .project
5729 .as_ref()
5730 .unwrap()
5731 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5732 for user_id in &user_ids {
5733 for contact in server.store.read().await.contacts_for_user(*user_id) {
5734 assert_ne!(
5735 contact.user_id, removed_guest_id.0 as u64,
5736 "removed guest is still a contact of another peer"
5737 );
5738 for project in contact.projects {
5739 for project_guest_id in project.guests {
5740 assert_ne!(
5741 project_guest_id, removed_guest_id.0 as u64,
5742 "removed guest appears as still participating on a project"
5743 );
5744 }
5745 }
5746 }
5747 }
5748
5749 log::info!("{} removed", guest.username);
5750 available_guests.push(guest.username.clone());
5751 guest_cx.update(|_| drop(guest));
5752
5753 operations += 1;
5754 }
5755 _ => {
5756 while operations < max_operations && rng.lock().gen_bool(0.7) {
5757 op_start_signals
5758 .choose(&mut *rng.lock())
5759 .unwrap()
5760 .unbounded_send(())
5761 .unwrap();
5762 operations += 1;
5763 }
5764
5765 if rng.lock().gen_bool(0.8) {
5766 cx.foreground().run_until_parked();
5767 }
5768 }
5769 }
5770 }
5771
5772 drop(op_start_signals);
5773 let mut clients = futures::future::join_all(clients).await;
5774 cx.foreground().run_until_parked();
5775
5776 let (host_client, mut host_cx, host_err) = clients.remove(0);
5777 if let Some(host_err) = host_err {
5778 panic!("host error - {}", host_err);
5779 }
5780 let host_project = host_client.project.as_ref().unwrap();
5781 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5782 project
5783 .worktrees(cx)
5784 .map(|worktree| {
5785 let snapshot = worktree.read(cx).snapshot();
5786 (snapshot.id(), snapshot)
5787 })
5788 .collect::<BTreeMap<_, _>>()
5789 });
5790
5791 host_client
5792 .project
5793 .as_ref()
5794 .unwrap()
5795 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5796
5797 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5798 if let Some(guest_err) = guest_err {
5799 panic!("{} error - {}", guest_client.username, guest_err);
5800 }
5801 let worktree_snapshots =
5802 guest_client
5803 .project
5804 .as_ref()
5805 .unwrap()
5806 .read_with(&guest_cx, |project, cx| {
5807 project
5808 .worktrees(cx)
5809 .map(|worktree| {
5810 let worktree = worktree.read(cx);
5811 (worktree.id(), worktree.snapshot())
5812 })
5813 .collect::<BTreeMap<_, _>>()
5814 });
5815
5816 assert_eq!(
5817 worktree_snapshots.keys().collect::<Vec<_>>(),
5818 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5819 "{} has different worktrees than the host",
5820 guest_client.username
5821 );
5822 for (id, host_snapshot) in &host_worktree_snapshots {
5823 let guest_snapshot = &worktree_snapshots[id];
5824 assert_eq!(
5825 guest_snapshot.root_name(),
5826 host_snapshot.root_name(),
5827 "{} has different root name than the host for worktree {}",
5828 guest_client.username,
5829 id
5830 );
5831 assert_eq!(
5832 guest_snapshot.entries(false).collect::<Vec<_>>(),
5833 host_snapshot.entries(false).collect::<Vec<_>>(),
5834 "{} has different snapshot than the host for worktree {}",
5835 guest_client.username,
5836 id
5837 );
5838 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5839 }
5840
5841 guest_client
5842 .project
5843 .as_ref()
5844 .unwrap()
5845 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5846
5847 for guest_buffer in &guest_client.buffers {
5848 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5849 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5850 project.buffer_for_id(buffer_id, cx).expect(&format!(
5851 "host does not have buffer for guest:{}, peer:{}, id:{}",
5852 guest_client.username, guest_client.peer_id, buffer_id
5853 ))
5854 });
5855 let path = host_buffer
5856 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5857
5858 assert_eq!(
5859 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5860 0,
5861 "{}, buffer {}, path {:?} has deferred operations",
5862 guest_client.username,
5863 buffer_id,
5864 path,
5865 );
5866 assert_eq!(
5867 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5868 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5869 "{}, buffer {}, path {:?}, differs from the host's buffer",
5870 guest_client.username,
5871 buffer_id,
5872 path
5873 );
5874 }
5875
5876 guest_cx.update(|_| drop(guest_client));
5877 }
5878
5879 host_cx.update(|_| drop(host_client));
5880 }
5881
/// An in-process collaboration server used by the integration tests, together
/// with the handles the tests need to control client connections.
struct TestServer {
    /// Peer created in `start`; reset when the test server is dropped.
    peer: Arc<Peer>,
    /// Application state built on top of the test database.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor, used by `condition` while waiting on notifications.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Per-user flag obtained from `Connection::in_memory`; `disconnect_client`
    /// sets it to `true`.
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    /// While `true`, connection attempts made by test clients are rejected.
    forbid_connections: Arc<AtomicBool>,
    /// Never read; presumably held so the test database outlives the server.
    _test_db: TestDb,
}
5892
5893 impl TestServer {
5894 async fn start(
5895 foreground: Rc<executor::Foreground>,
5896 background: Arc<executor::Background>,
5897 ) -> Self {
5898 let test_db = TestDb::fake(background);
5899 let app_state = Self::build_app_state(&test_db).await;
5900 let peer = Peer::new();
5901 let notifications = mpsc::unbounded();
5902 let server = Server::new(app_state.clone(), Some(notifications.0));
5903 Self {
5904 peer,
5905 app_state,
5906 server,
5907 foreground,
5908 notifications: notifications.1,
5909 connection_killers: Default::default(),
5910 forbid_connections: Default::default(),
5911 _test_db: test_db,
5912 }
5913 }
5914
/// Creates a user called `name` in the test database and returns a
/// `TestClient` that is authenticated as that user and connected to this
/// server over an in-memory connection.
///
/// Panics if the user cannot be created or the client fails to connect.
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
    // Install test settings as a global in the client's app context.
    cx.update(|cx| {
        let settings = Settings::test(cx);
        cx.set_global(settings);
    });

    let http = FakeHttpClient::with_404_response();
    let user_id = self.app_state.db.create_user(name, false).await.unwrap();
    let client_name = name.to_string();
    let mut client = Client::new(http.clone());
    let server = self.server.clone();
    let connection_killers = self.connection_killers.clone();
    let forbid_connections = self.forbid_connections.clone();
    // The server reports the id of the new connection through this channel.
    let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

    // `Arc::get_mut` only succeeds while this is the sole handle to the
    // client, so the overrides must be installed before it is cloned.
    Arc::get_mut(&mut client)
        .unwrap()
        // Authentication always succeeds with a fixed token for this user.
        .override_authenticate(move |cx| {
            cx.spawn(|_| async move {
                let access_token = "the-token".to_string();
                Ok(Credentials {
                    user_id: user_id.0 as u64,
                    access_token,
                })
            })
        })
        // Connections are in-memory and handled by the test server directly,
        // unless the server is currently forbidding connections.
        .override_establish_connection(move |credentials, cx| {
            assert_eq!(credentials.user_id, user_id.0 as u64);
            assert_eq!(credentials.access_token, "the-token");

            let server = server.clone();
            let connection_killers = connection_killers.clone();
            let forbid_connections = forbid_connections.clone();
            let client_name = client_name.clone();
            let connection_id_tx = connection_id_tx.clone();
            cx.spawn(move |cx| async move {
                if forbid_connections.load(SeqCst) {
                    Err(EstablishConnectionError::other(anyhow!(
                        "server is forbidding connections"
                    )))
                } else {
                    let (client_conn, server_conn, killed) =
                        Connection::in_memory(cx.background());
                    // Remember the kill switch so `disconnect_client` can use it.
                    connection_killers.lock().insert(user_id, killed);
                    cx.background()
                        .spawn(server.handle_connection(
                            server_conn,
                            client_name,
                            user_id,
                            Some(connection_id_tx),
                            cx.background(),
                        ))
                        .detach();
                    Ok(client_conn)
                }
            })
        });

    client
        .authenticate_and_connect(false, &cx.to_async())
        .await
        .unwrap();

    Channel::init(&client);
    Project::init(&client);
    cx.update(|cx| {
        workspace::init(&client, cx);
    });

    // The peer id is derived from the connection id reported by the server.
    let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));

    let client = TestClient {
        client,
        peer_id,
        username: name.to_string(),
        user_store,
        language_registry: Arc::new(LanguageRegistry::test()),
        project: Default::default(),
        buffers: Default::default(),
    };
    // Do not hand the client out until its user store knows the current user.
    client.wait_for_current_user(cx).await;
    client
}
5999
6000 fn disconnect_client(&self, user_id: UserId) {
6001 self.connection_killers
6002 .lock()
6003 .remove(&user_id)
6004 .unwrap()
6005 .store(true, SeqCst);
6006 }
6007
6008 fn forbid_connections(&self) {
6009 self.forbid_connections.store(true, SeqCst);
6010 }
6011
6012 fn allow_connections(&self) {
6013 self.forbid_connections.store(false, SeqCst);
6014 }
6015
6016 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6017 Arc::new(AppState {
6018 db: test_db.db().clone(),
6019 api_token: Default::default(),
6020 })
6021 }
6022
6023 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6024 self.server.store.read().await
6025 }
6026
6027 async fn condition<F>(&mut self, mut predicate: F)
6028 where
6029 F: FnMut(&Store) -> bool,
6030 {
6031 assert!(
6032 self.foreground.parking_forbidden(),
6033 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6034 );
6035 while !(predicate)(&*self.server.store.read().await) {
6036 self.foreground.start_waiting();
6037 self.notifications.next().await;
6038 self.foreground.finish_waiting();
6039 }
6040 }
6041 }
6042
6043 impl Deref for TestServer {
6044 type Target = Server;
6045
6046 fn deref(&self) -> &Self::Target {
6047 &self.server
6048 }
6049 }
6050
6051 impl Drop for TestServer {
6052 fn drop(&mut self) {
6053 self.peer.reset();
6054 }
6055 }
6056
/// A client connected to a `TestServer`, plus the state that the randomized
/// simulations keep track of for it.
struct TestClient {
    /// The underlying RPC client; `TestClient` dereferences to it.
    client: Arc<Client>,
    /// Name the user was created with.
    username: String,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry passed to the projects and workspaces this client builds.
    language_registry: Arc<LanguageRegistry>,
    /// Most recently built or simulated project, if any.
    project: Option<ModelHandle<Project>>,
    /// Buffers the client currently keeps open during a simulation.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6066
6067 impl Deref for TestClient {
6068 type Target = Arc<Client>;
6069
6070 fn deref(&self) -> &Self::Target {
6071 &self.client
6072 }
6073 }
6074
6075 impl TestClient {
6076 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6077 UserId::from_proto(
6078 self.user_store
6079 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6080 )
6081 }
6082
6083 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6084 let mut authed_user = self
6085 .user_store
6086 .read_with(cx, |user_store, _| user_store.watch_current_user());
6087 while authed_user.next().await.unwrap().is_none() {}
6088 }
6089
6090 async fn build_local_project(
6091 &mut self,
6092 fs: Arc<FakeFs>,
6093 root_path: impl AsRef<Path>,
6094 cx: &mut TestAppContext,
6095 ) -> (ModelHandle<Project>, WorktreeId) {
6096 let project = cx.update(|cx| {
6097 Project::local(
6098 self.client.clone(),
6099 self.user_store.clone(),
6100 self.language_registry.clone(),
6101 fs,
6102 cx,
6103 )
6104 });
6105 self.project = Some(project.clone());
6106 let (worktree, _) = project
6107 .update(cx, |p, cx| {
6108 p.find_or_create_local_worktree(root_path, true, cx)
6109 })
6110 .await
6111 .unwrap();
6112 worktree
6113 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6114 .await;
6115 project
6116 .update(cx, |project, _| project.next_remote_id())
6117 .await;
6118 (project, worktree.read_with(cx, |tree, _| tree.id()))
6119 }
6120
6121 async fn build_remote_project(
6122 &mut self,
6123 project_id: u64,
6124 cx: &mut TestAppContext,
6125 ) -> ModelHandle<Project> {
6126 let project = Project::remote(
6127 project_id,
6128 self.client.clone(),
6129 self.user_store.clone(),
6130 self.language_registry.clone(),
6131 FakeFs::new(cx.background()),
6132 &mut cx.to_async(),
6133 )
6134 .await
6135 .unwrap();
6136 self.project = Some(project.clone());
6137 project
6138 }
6139
6140 fn build_workspace(
6141 &self,
6142 project: &ModelHandle<Project>,
6143 cx: &mut TestAppContext,
6144 ) -> ViewHandle<Workspace> {
6145 let (window_id, _) = cx.add_window(|_| EmptyView);
6146 cx.add_view(window_id, |cx| {
6147 let fs = project.read(cx).fs().clone();
6148 Workspace::new(
6149 &WorkspaceParams {
6150 fs,
6151 project: project.clone(),
6152 user_store: self.user_store.clone(),
6153 languages: self.language_registry.clone(),
6154 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6155 channel_list: cx.add_model(|cx| {
6156 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6157 }),
6158 client: self.client.clone(),
6159 },
6160 cx,
6161 )
6162 })
6163 }
6164
/// Runs the host side of the randomized collaboration simulation.
///
/// One random operation is performed for every value received on
/// `op_start_signal`; the simulation ends when that channel is closed.
/// Paths of files the host creates are appended to the shared `files` list.
///
/// Returns the client and its app context, together with the error that
/// stopped the simulation early, if any. `project` is stored on the returned
/// client.
async fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    files: Arc<Mutex<Vec<PathBuf>>>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The fallible part lives in an inner function so that `?` can be used
    // while the outer function still returns the client and context.
    async fn simulate_host_internal(
        client: &mut TestClient,
        project: ModelHandle<Project>,
        files: Arc<Mutex<Vec<PathBuf>>>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        let fs = project.read_with(cx, |project, _| project.fs().clone());

        while op_start_signal.next().await.is_some() {
            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            match distribution {
                // Find or create a worktree at a random ancestor of a known file.
                0..=20 if !files.lock().is_empty() => {
                    let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                    let mut path = path.as_path();
                    // Walk up the ancestors, stopping at a random one (or at
                    // the root when the coin never comes up).
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    let find_or_create_worktree = project.update(cx, |project, cx| {
                        project.find_or_create_local_worktree(path, true, cx)
                    });
                    // Randomly either wait for the worktree or let it load in
                    // the background.
                    if rng.lock().gen() {
                        cx.background().spawn(find_or_create_worktree).detach();
                    } else {
                        find_or_create_worktree.await?;
                    }
                }
                // Open a buffer (or pick an open one), then drop or mutate it.
                //
                // NOTE(review): this range overlaps the arm above on 10..=20.
                // Arms are tried in order, so those values are taken by the
                // first arm whenever its guard passes - confirm the intended
                // split.
                10..=80 if !files.lock().is_empty() => {
                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                        let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                        let (worktree, path) = project
                            .update(cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    true,
                                    cx,
                                )
                            })
                            .await?;
                        let project_path =
                            worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                        log::info!(
                            "Host: opening path {:?}, worktree {}, relative_path {:?}",
                            file,
                            project_path.0,
                            project_path.1
                        );
                        let buffer = project
                            .update(cx, |project, cx| project.open_buffer(project_path, cx))
                            .await
                            .unwrap();
                        client.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        client
                            .buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    if rng.lock().gen_bool(0.1) {
                        // The handle is dropped inside `cx.update`.
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            client.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );

                            if rng.lock().gen_bool(0.7) {
                                buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                            } else {
                                buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                            }
                        });
                    }
                }
                // Create a new `.rs` file at a random path of one to five
                // single-letter components, retrying until creation succeeds.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        files.lock().push(path);
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        Ok(())
    }

    let result = simulate_host_internal(
        &mut self,
        project.clone(),
        files,
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("Host done");
    // Keep the project on the client so the caller can inspect it afterwards.
    self.project = Some(project);
    (self, cx, result.err())
}
6309
6310 pub async fn simulate_guest(
6311 mut self,
6312 guest_username: String,
6313 project: ModelHandle<Project>,
6314 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6315 rng: Arc<Mutex<StdRng>>,
6316 mut cx: TestAppContext,
6317 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6318 async fn simulate_guest_internal(
6319 client: &mut TestClient,
6320 guest_username: &str,
6321 project: ModelHandle<Project>,
6322 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6323 rng: Arc<Mutex<StdRng>>,
6324 cx: &mut TestAppContext,
6325 ) -> anyhow::Result<()> {
6326 while op_start_signal.next().await.is_some() {
6327 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6328 let worktree = if let Some(worktree) =
6329 project.read_with(cx, |project, cx| {
6330 project
6331 .worktrees(&cx)
6332 .filter(|worktree| {
6333 let worktree = worktree.read(cx);
6334 worktree.is_visible()
6335 && worktree.entries(false).any(|e| e.is_file())
6336 })
6337 .choose(&mut *rng.lock())
6338 }) {
6339 worktree
6340 } else {
6341 cx.background().simulate_random_delay().await;
6342 continue;
6343 };
6344
6345 let (worktree_root_name, project_path) =
6346 worktree.read_with(cx, |worktree, _| {
6347 let entry = worktree
6348 .entries(false)
6349 .filter(|e| e.is_file())
6350 .choose(&mut *rng.lock())
6351 .unwrap();
6352 (
6353 worktree.root_name().to_string(),
6354 (worktree.id(), entry.path.clone()),
6355 )
6356 });
6357 log::info!(
6358 "{}: opening path {:?} in worktree {} ({})",
6359 guest_username,
6360 project_path.1,
6361 project_path.0,
6362 worktree_root_name,
6363 );
6364 let buffer = project
6365 .update(cx, |project, cx| {
6366 project.open_buffer(project_path.clone(), cx)
6367 })
6368 .await?;
6369 log::info!(
6370 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6371 guest_username,
6372 project_path.1,
6373 project_path.0,
6374 worktree_root_name,
6375 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6376 );
6377 client.buffers.insert(buffer.clone());
6378 buffer
6379 } else {
6380 client
6381 .buffers
6382 .iter()
6383 .choose(&mut *rng.lock())
6384 .unwrap()
6385 .clone()
6386 };
6387
6388 let choice = rng.lock().gen_range(0..100);
6389 match choice {
6390 0..=9 => {
6391 cx.update(|cx| {
6392 log::info!(
6393 "{}: dropping buffer {:?}",
6394 guest_username,
6395 buffer.read(cx).file().unwrap().full_path(cx)
6396 );
6397 client.buffers.remove(&buffer);
6398 drop(buffer);
6399 });
6400 }
6401 10..=19 => {
6402 let completions = project.update(cx, |project, cx| {
6403 log::info!(
6404 "{}: requesting completions for buffer {} ({:?})",
6405 guest_username,
6406 buffer.read(cx).remote_id(),
6407 buffer.read(cx).file().unwrap().full_path(cx)
6408 );
6409 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6410 project.completions(&buffer, offset, cx)
6411 });
6412 let completions = cx.background().spawn(async move {
6413 completions
6414 .await
6415 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6416 });
6417 if rng.lock().gen_bool(0.3) {
6418 log::info!("{}: detaching completions request", guest_username);
6419 cx.update(|cx| completions.detach_and_log_err(cx));
6420 } else {
6421 completions.await?;
6422 }
6423 }
6424 20..=29 => {
6425 let code_actions = project.update(cx, |project, cx| {
6426 log::info!(
6427 "{}: requesting code actions for buffer {} ({:?})",
6428 guest_username,
6429 buffer.read(cx).remote_id(),
6430 buffer.read(cx).file().unwrap().full_path(cx)
6431 );
6432 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6433 project.code_actions(&buffer, range, cx)
6434 });
6435 let code_actions = cx.background().spawn(async move {
6436 code_actions.await.map_err(|err| {
6437 anyhow!("code actions request failed: {:?}", err)
6438 })
6439 });
6440 if rng.lock().gen_bool(0.3) {
6441 log::info!("{}: detaching code actions request", guest_username);
6442 cx.update(|cx| code_actions.detach_and_log_err(cx));
6443 } else {
6444 code_actions.await?;
6445 }
6446 }
6447 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6448 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6449 log::info!(
6450 "{}: saving buffer {} ({:?})",
6451 guest_username,
6452 buffer.remote_id(),
6453 buffer.file().unwrap().full_path(cx)
6454 );
6455 (buffer.version(), buffer.save(cx))
6456 });
6457 let save = cx.background().spawn(async move {
6458 let (saved_version, _) = save
6459 .await
6460 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6461 assert!(saved_version.observed_all(&requested_version));
6462 Ok::<_, anyhow::Error>(())
6463 });
6464 if rng.lock().gen_bool(0.3) {
6465 log::info!("{}: detaching save request", guest_username);
6466 cx.update(|cx| save.detach_and_log_err(cx));
6467 } else {
6468 save.await?;
6469 }
6470 }
6471 40..=44 => {
6472 let prepare_rename = project.update(cx, |project, cx| {
6473 log::info!(
6474 "{}: preparing rename for buffer {} ({:?})",
6475 guest_username,
6476 buffer.read(cx).remote_id(),
6477 buffer.read(cx).file().unwrap().full_path(cx)
6478 );
6479 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6480 project.prepare_rename(buffer, offset, cx)
6481 });
6482 let prepare_rename = cx.background().spawn(async move {
6483 prepare_rename.await.map_err(|err| {
6484 anyhow!("prepare rename request failed: {:?}", err)
6485 })
6486 });
6487 if rng.lock().gen_bool(0.3) {
6488 log::info!("{}: detaching prepare rename request", guest_username);
6489 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6490 } else {
6491 prepare_rename.await?;
6492 }
6493 }
6494 45..=49 => {
6495 let definitions = project.update(cx, |project, cx| {
6496 log::info!(
6497 "{}: requesting definitions for buffer {} ({:?})",
6498 guest_username,
6499 buffer.read(cx).remote_id(),
6500 buffer.read(cx).file().unwrap().full_path(cx)
6501 );
6502 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6503 project.definition(&buffer, offset, cx)
6504 });
6505 let definitions = cx.background().spawn(async move {
6506 definitions
6507 .await
6508 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6509 });
6510 if rng.lock().gen_bool(0.3) {
6511 log::info!("{}: detaching definitions request", guest_username);
6512 cx.update(|cx| definitions.detach_and_log_err(cx));
6513 } else {
6514 client
6515 .buffers
6516 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6517 }
6518 }
6519 50..=54 => {
6520 let highlights = project.update(cx, |project, cx| {
6521 log::info!(
6522 "{}: requesting highlights for buffer {} ({:?})",
6523 guest_username,
6524 buffer.read(cx).remote_id(),
6525 buffer.read(cx).file().unwrap().full_path(cx)
6526 );
6527 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6528 project.document_highlights(&buffer, offset, cx)
6529 });
6530 let highlights = cx.background().spawn(async move {
6531 highlights
6532 .await
6533 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6534 });
6535 if rng.lock().gen_bool(0.3) {
6536 log::info!("{}: detaching highlights request", guest_username);
6537 cx.update(|cx| highlights.detach_and_log_err(cx));
6538 } else {
6539 highlights.await?;
6540 }
6541 }
6542 55..=59 => {
6543 let search = project.update(cx, |project, cx| {
6544 let query = rng.lock().gen_range('a'..='z');
6545 log::info!("{}: project-wide search {:?}", guest_username, query);
6546 project.search(SearchQuery::text(query, false, false), cx)
6547 });
6548 let search = cx.background().spawn(async move {
6549 search
6550 .await
6551 .map_err(|err| anyhow!("search request failed: {:?}", err))
6552 });
6553 if rng.lock().gen_bool(0.3) {
6554 log::info!("{}: detaching search request", guest_username);
6555 cx.update(|cx| search.detach_and_log_err(cx));
6556 } else {
6557 client.buffers.extend(search.await?.into_keys());
6558 }
6559 }
6560 60..=69 => {
6561 let worktree = project
6562 .read_with(cx, |project, cx| {
6563 project
6564 .worktrees(&cx)
6565 .filter(|worktree| {
6566 let worktree = worktree.read(cx);
6567 worktree.is_visible()
6568 && worktree.entries(false).any(|e| e.is_file())
6569 && worktree
6570 .root_entry()
6571 .map_or(false, |e| e.is_dir())
6572 })
6573 .choose(&mut *rng.lock())
6574 })
6575 .unwrap();
6576 let (worktree_id, worktree_root_name) = worktree
6577 .read_with(cx, |worktree, _| {
6578 (worktree.id(), worktree.root_name().to_string())
6579 });
6580
6581 let mut new_name = String::new();
6582 for _ in 0..10 {
6583 let letter = rng.lock().gen_range('a'..='z');
6584 new_name.push(letter);
6585 }
6586 let mut new_path = PathBuf::new();
6587 new_path.push(new_name);
6588 new_path.set_extension("rs");
6589 log::info!(
6590 "{}: creating {:?} in worktree {} ({})",
6591 guest_username,
6592 new_path,
6593 worktree_id,
6594 worktree_root_name,
6595 );
6596 project
6597 .update(cx, |project, cx| {
6598 project.create_entry((worktree_id, new_path), false, cx)
6599 })
6600 .unwrap()
6601 .await?;
6602 }
6603 _ => {
6604 buffer.update(cx, |buffer, cx| {
6605 log::info!(
6606 "{}: updating buffer {} ({:?})",
6607 guest_username,
6608 buffer.remote_id(),
6609 buffer.file().unwrap().full_path(cx)
6610 );
6611 if rng.lock().gen_bool(0.7) {
6612 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6613 } else {
6614 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6615 }
6616 });
6617 }
6618 }
6619 cx.background().simulate_random_delay().await;
6620 }
6621 Ok(())
6622 }
6623
6624 let result = simulate_guest_internal(
6625 &mut self,
6626 &guest_username,
6627 project.clone(),
6628 op_start_signal,
6629 rng,
6630 &mut cx,
6631 )
6632 .await;
6633 log::info!("{}: done", guest_username);
6634
6635 self.project = Some(project);
6636 (self, cx, result.err())
6637 }
6638 }
6639
6640 impl Drop for TestClient {
6641 fn drop(&mut self) {
6642 self.client.tear_down();
6643 }
6644 }
6645
6646 impl Executor for Arc<gpui::executor::Background> {
6647 type Sleep = gpui::executor::Timer;
6648
6649 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6650 self.spawn(future).detach();
6651 }
6652
6653 fn sleep(&self, duration: Duration) -> Self::Sleep {
6654 self.as_ref().timer(duration)
6655 }
6656 }
6657
6658 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6659 channel
6660 .messages()
6661 .cursor::<()>()
6662 .map(|m| {
6663 (
6664 m.sender.github_login.clone(),
6665 m.body.clone(),
6666 m.is_pending(),
6667 )
6668 })
6669 .collect()
6670 }
6671
6672 struct EmptyView;
6673
6674 impl gpui::Entity for EmptyView {
6675 type Event = ();
6676 }
6677
6678 impl gpui::View for EmptyView {
6679 fn ui_name() -> &'static str {
6680 "empty view"
6681 }
6682
6683 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6684 gpui::Element::boxed(gpui::elements::Empty)
6685 }
6686 }
6687}