1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::Arc,
40 time::Duration,
41};
42use store::{Store, Worktree};
43use time::OffsetDateTime;
44use tokio::{
45 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
46 time::Sleep,
47};
48use tower::ServiceBuilder;
49use tracing::{info_span, instrument, Instrument};
50
51type MessageHandler =
52 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
53
54pub struct Server {
55 peer: Arc<Peer>,
56 store: RwLock<Store>,
57 app_state: Arc<AppState>,
58 handlers: HashMap<TypeId, MessageHandler>,
59 notifications: Option<mpsc::UnboundedSender<()>>,
60}
61
62pub trait Executor: Send + Clone {
63 type Sleep: Send + Future;
64 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
65 fn sleep(&self, duration: Duration) -> Self::Sleep;
66}
67
68#[derive(Clone)]
69pub struct RealExecutor;
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73
74struct StoreReadGuard<'a> {
75 guard: RwLockReadGuard<'a, Store>,
76 _not_send: PhantomData<Rc<()>>,
77}
78
79struct StoreWriteGuard<'a> {
80 guard: RwLockWriteGuard<'a, Store>,
81 _not_send: PhantomData<Rc<()>>,
82}
83
84impl Server {
85 pub fn new(
86 app_state: Arc<AppState>,
87 notifications: Option<mpsc::UnboundedSender<()>>,
88 ) -> Arc<Self> {
89 let mut server = Self {
90 peer: Peer::new(),
91 app_state,
92 store: Default::default(),
93 handlers: Default::default(),
94 notifications,
95 };
96
97 server
98 .add_request_handler(Server::ping)
99 .add_request_handler(Server::register_project)
100 .add_message_handler(Server::unregister_project)
101 .add_request_handler(Server::share_project)
102 .add_message_handler(Server::unshare_project)
103 .add_sync_request_handler(Server::join_project)
104 .add_message_handler(Server::leave_project)
105 .add_request_handler(Server::register_worktree)
106 .add_message_handler(Server::unregister_worktree)
107 .add_request_handler(Server::update_worktree)
108 .add_message_handler(Server::start_language_server)
109 .add_message_handler(Server::update_language_server)
110 .add_message_handler(Server::update_diagnostic_summary)
111 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
112 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
113 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
114 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
115 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
116 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
117 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
118 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
119 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
120 .add_request_handler(
121 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
122 )
123 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
124 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
125 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
126 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
127 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
128 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
129 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
130 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
131 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
132 .add_request_handler(Server::update_buffer)
133 .add_message_handler(Server::update_buffer_file)
134 .add_message_handler(Server::buffer_reloaded)
135 .add_message_handler(Server::buffer_saved)
136 .add_request_handler(Server::save_buffer)
137 .add_request_handler(Server::get_channels)
138 .add_request_handler(Server::get_users)
139 .add_request_handler(Server::fuzzy_search_users)
140 .add_request_handler(Server::join_channel)
141 .add_message_handler(Server::leave_channel)
142 .add_request_handler(Server::send_channel_message)
143 .add_request_handler(Server::follow)
144 .add_message_handler(Server::unfollow)
145 .add_message_handler(Server::update_followers)
146 .add_request_handler(Server::get_channel_messages);
147
148 Arc::new(server)
149 }
150
151 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
152 where
153 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
154 Fut: 'static + Send + Future<Output = Result<()>>,
155 M: EnvelopedMessage,
156 {
157 let prev_handler = self.handlers.insert(
158 TypeId::of::<M>(),
159 Box::new(move |server, envelope| {
160 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
161 let span = info_span!(
162 "handle message",
163 payload_type = envelope.payload_type_name(),
164 payload = format!("{:?}", envelope.payload).as_str(),
165 );
166 let future = (handler)(server, *envelope);
167 async move {
168 if let Err(error) = future.await {
169 tracing::error!(%error, "error handling message");
170 }
171 }
172 .instrument(span)
173 .boxed()
174 }),
175 );
176 if prev_handler.is_some() {
177 panic!("registered a handler for the same message twice");
178 }
179 self
180 }
181
182 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
183 where
184 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
185 Fut: 'static + Send + Future<Output = Result<M::Response>>,
186 M: RequestMessage,
187 {
188 self.add_message_handler(move |server, envelope| {
189 let receipt = envelope.receipt();
190 let response = (handler)(server.clone(), envelope);
191 async move {
192 match response.await {
193 Ok(response) => {
194 server.peer.respond(receipt, response)?;
195 Ok(())
196 }
197 Err(error) => {
198 server.peer.respond_with_error(
199 receipt,
200 proto::Error {
201 message: error.to_string(),
202 },
203 )?;
204 Err(error)
205 }
206 }
207 }
208 })
209 }
210
211 /// Handle a request while holding a lock to the store. This is useful when we're registering
212 /// a connection but we want to respond on the connection before anybody else can send on it.
213 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
214 where
215 F: 'static
216 + Send
217 + Sync
218 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
219 M: RequestMessage,
220 {
221 let handler = Arc::new(handler);
222 self.add_message_handler(move |server, envelope| {
223 let receipt = envelope.receipt();
224 let handler = handler.clone();
225 async move {
226 let mut store = server.state_mut().await;
227 let response = (handler)(server.clone(), &mut *store, envelope);
228 match response {
229 Ok(response) => {
230 server.peer.respond(receipt, response)?;
231 Ok(())
232 }
233 Err(error) => {
234 server.peer.respond_with_error(
235 receipt,
236 proto::Error {
237 message: error.to_string(),
238 },
239 )?;
240 Err(error)
241 }
242 }
243 }
244 })
245 }
246
247 pub fn handle_connection<E: Executor>(
248 self: &Arc<Self>,
249 connection: Connection,
250 address: String,
251 user_id: UserId,
252 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
253 executor: E,
254 ) -> impl Future<Output = ()> {
255 let mut this = self.clone();
256 let span = info_span!("handle connection", %user_id, %address);
257 async move {
258 let (connection_id, handle_io, mut incoming_rx) = this
259 .peer
260 .add_connection(connection, {
261 let executor = executor.clone();
262 move |duration| {
263 let timer = executor.sleep(duration);
264 async move {
265 timer.await;
266 }
267 }
268 })
269 .await;
270
271 tracing::info!(%user_id, %connection_id, %address, "connection opened");
272
273 if let Some(send_connection_id) = send_connection_id.as_mut() {
274 let _ = send_connection_id.send(connection_id).await;
275 }
276
277 {
278 let mut state = this.state_mut().await;
279 state.add_connection(connection_id, user_id);
280 this.update_contacts_for_users(&*state, &[user_id]);
281 }
282
283 let handle_io = handle_io.fuse();
284 futures::pin_mut!(handle_io);
285 loop {
286 let next_message = incoming_rx.next().fuse();
287 futures::pin_mut!(next_message);
288 futures::select_biased! {
289 result = handle_io => {
290 if let Err(error) = result {
291 tracing::error!(%error, "error handling I/O");
292 }
293 break;
294 }
295 message = next_message => {
296 if let Some(message) = message {
297 let type_name = message.payload_type_name();
298 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
299 async {
300 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
301 let notifications = this.notifications.clone();
302 let is_background = message.is_background();
303 let handle_message = (handler)(this.clone(), message);
304 let handle_message = async move {
305 handle_message.await;
306 if let Some(mut notifications) = notifications {
307 let _ = notifications.send(()).await;
308 }
309 };
310 if is_background {
311 executor.spawn_detached(handle_message);
312 } else {
313 handle_message.await;
314 }
315 } else {
316 tracing::error!("no message handler");
317 }
318 }.instrument(span).await;
319 } else {
320 tracing::info!(%user_id, %connection_id, %address, "connection closed");
321 break;
322 }
323 }
324 }
325 }
326
327 if let Err(error) = this.sign_out(connection_id).await {
328 tracing::error!(%error, "error signing out");
329 }
330 }.instrument(span)
331 }
332
333 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
334 self.peer.disconnect(connection_id);
335 let mut state = self.state_mut().await;
336 let removed_connection = state.remove_connection(connection_id)?;
337
338 for (project_id, project) in removed_connection.hosted_projects {
339 if let Some(share) = project.share {
340 broadcast(
341 connection_id,
342 share.guests.keys().copied().collect(),
343 |conn_id| {
344 self.peer
345 .send(conn_id, proto::UnshareProject { project_id })
346 },
347 );
348 }
349 }
350
351 for (project_id, peer_ids) in removed_connection.guest_project_ids {
352 broadcast(connection_id, peer_ids, |conn_id| {
353 self.peer.send(
354 conn_id,
355 proto::RemoveProjectCollaborator {
356 project_id,
357 peer_id: connection_id.0,
358 },
359 )
360 });
361 }
362
363 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
364 Ok(())
365 }
366
367 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
368 Ok(proto::Ack {})
369 }
370
371 async fn register_project(
372 self: Arc<Server>,
373 request: TypedEnvelope<proto::RegisterProject>,
374 ) -> Result<proto::RegisterProjectResponse> {
375 let project_id = {
376 let mut state = self.state_mut().await;
377 let user_id = state.user_id_for_connection(request.sender_id)?;
378 state.register_project(request.sender_id, user_id)
379 };
380 Ok(proto::RegisterProjectResponse { project_id })
381 }
382
383 async fn unregister_project(
384 self: Arc<Server>,
385 request: TypedEnvelope<proto::UnregisterProject>,
386 ) -> Result<()> {
387 let mut state = self.state_mut().await;
388 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
389 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
390 Ok(())
391 }
392
393 async fn share_project(
394 self: Arc<Server>,
395 request: TypedEnvelope<proto::ShareProject>,
396 ) -> Result<proto::Ack> {
397 let mut state = self.state_mut().await;
398 let project = state.share_project(request.payload.project_id, request.sender_id)?;
399 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
400 Ok(proto::Ack {})
401 }
402
403 async fn unshare_project(
404 self: Arc<Server>,
405 request: TypedEnvelope<proto::UnshareProject>,
406 ) -> Result<()> {
407 let project_id = request.payload.project_id;
408 let mut state = self.state_mut().await;
409 let project = state.unshare_project(project_id, request.sender_id)?;
410 broadcast(request.sender_id, project.connection_ids, |conn_id| {
411 self.peer
412 .send(conn_id, proto::UnshareProject { project_id })
413 });
414 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
415 Ok(())
416 }
417
418 fn join_project(
419 self: Arc<Server>,
420 state: &mut Store,
421 request: TypedEnvelope<proto::JoinProject>,
422 ) -> Result<proto::JoinProjectResponse> {
423 let project_id = request.payload.project_id;
424
425 let user_id = state.user_id_for_connection(request.sender_id)?;
426 let (response, connection_ids, contact_user_ids) = state
427 .join_project(request.sender_id, user_id, project_id)
428 .and_then(|joined| {
429 let share = joined.project.share()?;
430 let peer_count = share.guests.len();
431 let mut collaborators = Vec::with_capacity(peer_count);
432 collaborators.push(proto::Collaborator {
433 peer_id: joined.project.host_connection_id.0,
434 replica_id: 0,
435 user_id: joined.project.host_user_id.to_proto(),
436 });
437 let worktrees = share
438 .worktrees
439 .iter()
440 .filter_map(|(id, shared_worktree)| {
441 let worktree = joined.project.worktrees.get(&id)?;
442 Some(proto::Worktree {
443 id: *id,
444 root_name: worktree.root_name.clone(),
445 entries: shared_worktree.entries.values().cloned().collect(),
446 diagnostic_summaries: shared_worktree
447 .diagnostic_summaries
448 .values()
449 .cloned()
450 .collect(),
451 visible: worktree.visible,
452 scan_id: shared_worktree.scan_id,
453 })
454 })
455 .collect();
456 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
457 if *peer_conn_id != request.sender_id {
458 collaborators.push(proto::Collaborator {
459 peer_id: peer_conn_id.0,
460 replica_id: *peer_replica_id as u32,
461 user_id: peer_user_id.to_proto(),
462 });
463 }
464 }
465 let response = proto::JoinProjectResponse {
466 worktrees,
467 replica_id: joined.replica_id as u32,
468 collaborators,
469 language_servers: joined.project.language_servers.clone(),
470 };
471 let connection_ids = joined.project.connection_ids();
472 let contact_user_ids = joined.project.authorized_user_ids();
473 Ok((response, connection_ids, contact_user_ids))
474 })?;
475
476 broadcast(request.sender_id, connection_ids, |conn_id| {
477 self.peer.send(
478 conn_id,
479 proto::AddProjectCollaborator {
480 project_id,
481 collaborator: Some(proto::Collaborator {
482 peer_id: request.sender_id.0,
483 replica_id: response.replica_id,
484 user_id: user_id.to_proto(),
485 }),
486 },
487 )
488 });
489 self.update_contacts_for_users(state, &contact_user_ids);
490 Ok(response)
491 }
492
493 async fn leave_project(
494 self: Arc<Server>,
495 request: TypedEnvelope<proto::LeaveProject>,
496 ) -> Result<()> {
497 let sender_id = request.sender_id;
498 let project_id = request.payload.project_id;
499 let mut state = self.state_mut().await;
500 let worktree = state.leave_project(sender_id, project_id)?;
501 broadcast(sender_id, worktree.connection_ids, |conn_id| {
502 self.peer.send(
503 conn_id,
504 proto::RemoveProjectCollaborator {
505 project_id,
506 peer_id: sender_id.0,
507 },
508 )
509 });
510 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
511 Ok(())
512 }
513
514 async fn register_worktree(
515 self: Arc<Server>,
516 request: TypedEnvelope<proto::RegisterWorktree>,
517 ) -> Result<proto::Ack> {
518 let mut contact_user_ids = HashSet::default();
519 for github_login in &request.payload.authorized_logins {
520 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
521 contact_user_ids.insert(contact_user_id);
522 }
523
524 let mut state = self.state_mut().await;
525 let host_user_id = state.user_id_for_connection(request.sender_id)?;
526 contact_user_ids.insert(host_user_id);
527
528 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
529 let guest_connection_ids = state
530 .read_project(request.payload.project_id, request.sender_id)?
531 .guest_connection_ids();
532 state.register_worktree(
533 request.payload.project_id,
534 request.payload.worktree_id,
535 request.sender_id,
536 Worktree {
537 authorized_user_ids: contact_user_ids.clone(),
538 root_name: request.payload.root_name.clone(),
539 visible: request.payload.visible,
540 },
541 )?;
542
543 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
544 self.peer
545 .forward_send(request.sender_id, connection_id, request.payload.clone())
546 });
547 self.update_contacts_for_users(&*state, &contact_user_ids);
548 Ok(proto::Ack {})
549 }
550
551 async fn unregister_worktree(
552 self: Arc<Server>,
553 request: TypedEnvelope<proto::UnregisterWorktree>,
554 ) -> Result<()> {
555 let project_id = request.payload.project_id;
556 let worktree_id = request.payload.worktree_id;
557 let mut state = self.state_mut().await;
558 let (worktree, guest_connection_ids) =
559 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
560 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
561 self.peer.send(
562 conn_id,
563 proto::UnregisterWorktree {
564 project_id,
565 worktree_id,
566 },
567 )
568 });
569 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
570 Ok(())
571 }
572
573 async fn update_worktree(
574 self: Arc<Server>,
575 request: TypedEnvelope<proto::UpdateWorktree>,
576 ) -> Result<proto::Ack> {
577 let connection_ids = self.state_mut().await.update_worktree(
578 request.sender_id,
579 request.payload.project_id,
580 request.payload.worktree_id,
581 &request.payload.removed_entries,
582 &request.payload.updated_entries,
583 request.payload.scan_id,
584 )?;
585
586 broadcast(request.sender_id, connection_ids, |connection_id| {
587 self.peer
588 .forward_send(request.sender_id, connection_id, request.payload.clone())
589 });
590
591 Ok(proto::Ack {})
592 }
593
594 async fn update_diagnostic_summary(
595 self: Arc<Server>,
596 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
597 ) -> Result<()> {
598 let summary = request
599 .payload
600 .summary
601 .clone()
602 .ok_or_else(|| anyhow!("invalid summary"))?;
603 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
604 request.payload.project_id,
605 request.payload.worktree_id,
606 request.sender_id,
607 summary,
608 )?;
609
610 broadcast(request.sender_id, receiver_ids, |connection_id| {
611 self.peer
612 .forward_send(request.sender_id, connection_id, request.payload.clone())
613 });
614 Ok(())
615 }
616
617 async fn start_language_server(
618 self: Arc<Server>,
619 request: TypedEnvelope<proto::StartLanguageServer>,
620 ) -> Result<()> {
621 let receiver_ids = self.state_mut().await.start_language_server(
622 request.payload.project_id,
623 request.sender_id,
624 request
625 .payload
626 .server
627 .clone()
628 .ok_or_else(|| anyhow!("invalid language server"))?,
629 )?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 });
634 Ok(())
635 }
636
637 async fn update_language_server(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::UpdateLanguageServer>,
640 ) -> Result<()> {
641 let receiver_ids = self
642 .state()
643 .await
644 .project_connection_ids(request.payload.project_id, request.sender_id)?;
645 broadcast(request.sender_id, receiver_ids, |connection_id| {
646 self.peer
647 .forward_send(request.sender_id, connection_id, request.payload.clone())
648 });
649 Ok(())
650 }
651
652 async fn forward_project_request<T>(
653 self: Arc<Server>,
654 request: TypedEnvelope<T>,
655 ) -> Result<T::Response>
656 where
657 T: EntityMessage + RequestMessage,
658 {
659 let host_connection_id = self
660 .state()
661 .await
662 .read_project(request.payload.remote_entity_id(), request.sender_id)?
663 .host_connection_id;
664 Ok(self
665 .peer
666 .forward_request(request.sender_id, host_connection_id, request.payload)
667 .await?)
668 }
669
670 async fn save_buffer(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::SaveBuffer>,
673 ) -> Result<proto::BufferSaved> {
674 let host = self
675 .state()
676 .await
677 .read_project(request.payload.project_id, request.sender_id)?
678 .host_connection_id;
679 let response = self
680 .peer
681 .forward_request(request.sender_id, host, request.payload.clone())
682 .await?;
683
684 let mut guests = self
685 .state()
686 .await
687 .read_project(request.payload.project_id, request.sender_id)?
688 .connection_ids();
689 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
690 broadcast(host, guests, |conn_id| {
691 self.peer.forward_send(host, conn_id, response.clone())
692 });
693
694 Ok(response)
695 }
696
697 async fn update_buffer(
698 self: Arc<Server>,
699 request: TypedEnvelope<proto::UpdateBuffer>,
700 ) -> Result<proto::Ack> {
701 let receiver_ids = self
702 .state()
703 .await
704 .project_connection_ids(request.payload.project_id, request.sender_id)?;
705 broadcast(request.sender_id, receiver_ids, |connection_id| {
706 self.peer
707 .forward_send(request.sender_id, connection_id, request.payload.clone())
708 });
709 Ok(proto::Ack {})
710 }
711
712 async fn update_buffer_file(
713 self: Arc<Server>,
714 request: TypedEnvelope<proto::UpdateBufferFile>,
715 ) -> Result<()> {
716 let receiver_ids = self
717 .state()
718 .await
719 .project_connection_ids(request.payload.project_id, request.sender_id)?;
720 broadcast(request.sender_id, receiver_ids, |connection_id| {
721 self.peer
722 .forward_send(request.sender_id, connection_id, request.payload.clone())
723 });
724 Ok(())
725 }
726
727 async fn buffer_reloaded(
728 self: Arc<Server>,
729 request: TypedEnvelope<proto::BufferReloaded>,
730 ) -> Result<()> {
731 let receiver_ids = self
732 .state()
733 .await
734 .project_connection_ids(request.payload.project_id, request.sender_id)?;
735 broadcast(request.sender_id, receiver_ids, |connection_id| {
736 self.peer
737 .forward_send(request.sender_id, connection_id, request.payload.clone())
738 });
739 Ok(())
740 }
741
742 async fn buffer_saved(
743 self: Arc<Server>,
744 request: TypedEnvelope<proto::BufferSaved>,
745 ) -> Result<()> {
746 let receiver_ids = self
747 .state()
748 .await
749 .project_connection_ids(request.payload.project_id, request.sender_id)?;
750 broadcast(request.sender_id, receiver_ids, |connection_id| {
751 self.peer
752 .forward_send(request.sender_id, connection_id, request.payload.clone())
753 });
754 Ok(())
755 }
756
757 async fn follow(
758 self: Arc<Self>,
759 request: TypedEnvelope<proto::Follow>,
760 ) -> Result<proto::FollowResponse> {
761 let leader_id = ConnectionId(request.payload.leader_id);
762 let follower_id = request.sender_id;
763 if !self
764 .state()
765 .await
766 .project_connection_ids(request.payload.project_id, follower_id)?
767 .contains(&leader_id)
768 {
769 Err(anyhow!("no such peer"))?;
770 }
771 let mut response = self
772 .peer
773 .forward_request(request.sender_id, leader_id, request.payload)
774 .await?;
775 response
776 .views
777 .retain(|view| view.leader_id != Some(follower_id.0));
778 Ok(response)
779 }
780
781 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
782 let leader_id = ConnectionId(request.payload.leader_id);
783 if !self
784 .state()
785 .await
786 .project_connection_ids(request.payload.project_id, request.sender_id)?
787 .contains(&leader_id)
788 {
789 Err(anyhow!("no such peer"))?;
790 }
791 self.peer
792 .forward_send(request.sender_id, leader_id, request.payload)?;
793 Ok(())
794 }
795
796 async fn update_followers(
797 self: Arc<Self>,
798 request: TypedEnvelope<proto::UpdateFollowers>,
799 ) -> Result<()> {
800 let connection_ids = self
801 .state()
802 .await
803 .project_connection_ids(request.payload.project_id, request.sender_id)?;
804 let leader_id = request
805 .payload
806 .variant
807 .as_ref()
808 .and_then(|variant| match variant {
809 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
810 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
811 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
812 });
813 for follower_id in &request.payload.follower_ids {
814 let follower_id = ConnectionId(*follower_id);
815 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
816 self.peer
817 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
818 }
819 }
820 Ok(())
821 }
822
823 async fn get_channels(
824 self: Arc<Server>,
825 request: TypedEnvelope<proto::GetChannels>,
826 ) -> Result<proto::GetChannelsResponse> {
827 let user_id = self
828 .state()
829 .await
830 .user_id_for_connection(request.sender_id)?;
831 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
832 Ok(proto::GetChannelsResponse {
833 channels: channels
834 .into_iter()
835 .map(|chan| proto::Channel {
836 id: chan.id.to_proto(),
837 name: chan.name,
838 })
839 .collect(),
840 })
841 }
842
843 async fn get_users(
844 self: Arc<Server>,
845 request: TypedEnvelope<proto::GetUsers>,
846 ) -> Result<proto::UsersResponse> {
847 let user_ids = request
848 .payload
849 .user_ids
850 .into_iter()
851 .map(UserId::from_proto)
852 .collect();
853 let users = self
854 .app_state
855 .db
856 .get_users_by_ids(user_ids)
857 .await?
858 .into_iter()
859 .map(|user| proto::User {
860 id: user.id.to_proto(),
861 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
862 github_login: user.github_login,
863 })
864 .collect();
865 Ok(proto::UsersResponse { users })
866 }
867
868 async fn fuzzy_search_users(
869 self: Arc<Server>,
870 request: TypedEnvelope<proto::FuzzySearchUsers>,
871 ) -> Result<proto::UsersResponse> {
872 let query = request.payload.query;
873 let db = &self.app_state.db;
874 let users = match query.len() {
875 0 => vec![],
876 1 | 2 => db
877 .get_user_by_github_login(&query)
878 .await?
879 .into_iter()
880 .collect(),
881 _ => db.fuzzy_search_users(&query, 10).await?,
882 };
883 let users = users
884 .into_iter()
885 .map(|user| proto::User {
886 id: user.id.to_proto(),
887 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
888 github_login: user.github_login,
889 })
890 .collect();
891 Ok(proto::UsersResponse { users })
892 }
893
894 #[instrument(skip(self, state, user_ids))]
895 fn update_contacts_for_users<'a>(
896 self: &Arc<Self>,
897 state: &Store,
898 user_ids: impl IntoIterator<Item = &'a UserId>,
899 ) {
900 for user_id in user_ids {
901 let contacts = state.contacts_for_user(*user_id);
902 for connection_id in state.connection_ids_for_user(*user_id) {
903 self.peer
904 .send(
905 connection_id,
906 proto::UpdateContacts {
907 contacts: contacts.clone(),
908 },
909 )
910 .trace_err();
911 }
912 }
913 }
914
915 async fn join_channel(
916 self: Arc<Self>,
917 request: TypedEnvelope<proto::JoinChannel>,
918 ) -> Result<proto::JoinChannelResponse> {
919 let user_id = self
920 .state()
921 .await
922 .user_id_for_connection(request.sender_id)?;
923 let channel_id = ChannelId::from_proto(request.payload.channel_id);
924 if !self
925 .app_state
926 .db
927 .can_user_access_channel(user_id, channel_id)
928 .await?
929 {
930 Err(anyhow!("access denied"))?;
931 }
932
933 self.state_mut()
934 .await
935 .join_channel(request.sender_id, channel_id);
936 let messages = self
937 .app_state
938 .db
939 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
940 .await?
941 .into_iter()
942 .map(|msg| proto::ChannelMessage {
943 id: msg.id.to_proto(),
944 body: msg.body,
945 timestamp: msg.sent_at.unix_timestamp() as u64,
946 sender_id: msg.sender_id.to_proto(),
947 nonce: Some(msg.nonce.as_u128().into()),
948 })
949 .collect::<Vec<_>>();
950 Ok(proto::JoinChannelResponse {
951 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
952 messages,
953 })
954 }
955
956 async fn leave_channel(
957 self: Arc<Self>,
958 request: TypedEnvelope<proto::LeaveChannel>,
959 ) -> Result<()> {
960 let user_id = self
961 .state()
962 .await
963 .user_id_for_connection(request.sender_id)?;
964 let channel_id = ChannelId::from_proto(request.payload.channel_id);
965 if !self
966 .app_state
967 .db
968 .can_user_access_channel(user_id, channel_id)
969 .await?
970 {
971 Err(anyhow!("access denied"))?;
972 }
973
974 self.state_mut()
975 .await
976 .leave_channel(request.sender_id, channel_id);
977
978 Ok(())
979 }
980
981 async fn send_channel_message(
982 self: Arc<Self>,
983 request: TypedEnvelope<proto::SendChannelMessage>,
984 ) -> Result<proto::SendChannelMessageResponse> {
985 let channel_id = ChannelId::from_proto(request.payload.channel_id);
986 let user_id;
987 let connection_ids;
988 {
989 let state = self.state().await;
990 user_id = state.user_id_for_connection(request.sender_id)?;
991 connection_ids = state.channel_connection_ids(channel_id)?;
992 }
993
994 // Validate the message body.
995 let body = request.payload.body.trim().to_string();
996 if body.len() > MAX_MESSAGE_LEN {
997 return Err(anyhow!("message is too long"))?;
998 }
999 if body.is_empty() {
1000 return Err(anyhow!("message can't be blank"))?;
1001 }
1002
1003 let timestamp = OffsetDateTime::now_utc();
1004 let nonce = request
1005 .payload
1006 .nonce
1007 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1008
1009 let message_id = self
1010 .app_state
1011 .db
1012 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1013 .await?
1014 .to_proto();
1015 let message = proto::ChannelMessage {
1016 sender_id: user_id.to_proto(),
1017 id: message_id,
1018 body,
1019 timestamp: timestamp.unix_timestamp() as u64,
1020 nonce: Some(nonce),
1021 };
1022 broadcast(request.sender_id, connection_ids, |conn_id| {
1023 self.peer.send(
1024 conn_id,
1025 proto::ChannelMessageSent {
1026 channel_id: channel_id.to_proto(),
1027 message: Some(message.clone()),
1028 },
1029 )
1030 });
1031 Ok(proto::SendChannelMessageResponse {
1032 message: Some(message),
1033 })
1034 }
1035
1036 async fn get_channel_messages(
1037 self: Arc<Self>,
1038 request: TypedEnvelope<proto::GetChannelMessages>,
1039 ) -> Result<proto::GetChannelMessagesResponse> {
1040 let user_id = self
1041 .state()
1042 .await
1043 .user_id_for_connection(request.sender_id)?;
1044 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1045 if !self
1046 .app_state
1047 .db
1048 .can_user_access_channel(user_id, channel_id)
1049 .await?
1050 {
1051 Err(anyhow!("access denied"))?;
1052 }
1053
1054 let messages = self
1055 .app_state
1056 .db
1057 .get_channel_messages(
1058 channel_id,
1059 MESSAGE_COUNT_PER_PAGE,
1060 Some(MessageId::from_proto(request.payload.before_message_id)),
1061 )
1062 .await?
1063 .into_iter()
1064 .map(|msg| proto::ChannelMessage {
1065 id: msg.id.to_proto(),
1066 body: msg.body,
1067 timestamp: msg.sent_at.unix_timestamp() as u64,
1068 sender_id: msg.sender_id.to_proto(),
1069 nonce: Some(msg.nonce.as_u128().into()),
1070 })
1071 .collect::<Vec<_>>();
1072
1073 Ok(proto::GetChannelMessagesResponse {
1074 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1075 messages,
1076 })
1077 }
1078
1079 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1080 #[cfg(test)]
1081 tokio::task::yield_now().await;
1082 let guard = self.store.read().await;
1083 #[cfg(test)]
1084 tokio::task::yield_now().await;
1085 StoreReadGuard {
1086 guard,
1087 _not_send: PhantomData,
1088 }
1089 }
1090
1091 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1092 #[cfg(test)]
1093 tokio::task::yield_now().await;
1094 let guard = self.store.write().await;
1095 #[cfg(test)]
1096 tokio::task::yield_now().await;
1097 StoreWriteGuard {
1098 guard,
1099 _not_send: PhantomData,
1100 }
1101 }
1102}
1103
1104impl<'a> Deref for StoreReadGuard<'a> {
1105 type Target = Store;
1106
1107 fn deref(&self) -> &Self::Target {
1108 &*self.guard
1109 }
1110}
1111
1112impl<'a> Deref for StoreWriteGuard<'a> {
1113 type Target = Store;
1114
1115 fn deref(&self) -> &Self::Target {
1116 &*self.guard
1117 }
1118}
1119
1120impl<'a> DerefMut for StoreWriteGuard<'a> {
1121 fn deref_mut(&mut self) -> &mut Self::Target {
1122 &mut *self.guard
1123 }
1124}
1125
1126impl<'a> Drop for StoreWriteGuard<'a> {
1127 fn drop(&mut self) {
1128 #[cfg(test)]
1129 self.check_invariants();
1130
1131 let metrics = self.metrics();
1132 tracing::info!(
1133 connections = metrics.connections,
1134 registered_projects = metrics.registered_projects,
1135 shared_projects = metrics.shared_projects,
1136 "metrics"
1137 );
1138 }
1139}
1140
1141impl Executor for RealExecutor {
1142 type Sleep = Sleep;
1143
1144 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1145 tokio::task::spawn(future);
1146 }
1147
1148 fn sleep(&self, duration: Duration) -> Self::Sleep {
1149 tokio::time::sleep(duration)
1150 }
1151}
1152
1153#[instrument(skip(f))]
1154fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1155where
1156 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1157{
1158 for receiver_id in receiver_ids {
1159 if receiver_id != sender_id {
1160 f(receiver_id).trace_err();
1161 }
1162 }
1163}
1164
1165lazy_static! {
1166 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1167}
1168
1169pub struct ProtocolVersion(u32);
1170
1171impl Header for ProtocolVersion {
1172 fn name() -> &'static HeaderName {
1173 &ZED_PROTOCOL_VERSION
1174 }
1175
1176 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1177 where
1178 Self: Sized,
1179 I: Iterator<Item = &'i axum::http::HeaderValue>,
1180 {
1181 let version = values
1182 .next()
1183 .ok_or_else(|| axum::headers::Error::invalid())?
1184 .to_str()
1185 .map_err(|_| axum::headers::Error::invalid())?
1186 .parse()
1187 .map_err(|_| axum::headers::Error::invalid())?;
1188 Ok(Self(version))
1189 }
1190
1191 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1192 values.extend([self.0.to_string().parse().unwrap()]);
1193 }
1194}
1195
1196pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1197 let server = Server::new(app_state.clone(), None);
1198 Router::new()
1199 .route("/rpc", get(handle_websocket_request))
1200 .layer(
1201 ServiceBuilder::new()
1202 .layer(Extension(app_state))
1203 .layer(middleware::from_fn(auth::validate_header))
1204 .layer(Extension(server)),
1205 )
1206}
1207
1208pub async fn handle_websocket_request(
1209 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1210 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1211 Extension(server): Extension<Arc<Server>>,
1212 Extension(user_id): Extension<UserId>,
1213 ws: WebSocketUpgrade,
1214) -> Response {
1215 if protocol_version != rpc::PROTOCOL_VERSION {
1216 return (
1217 StatusCode::UPGRADE_REQUIRED,
1218 "client must be upgraded".to_string(),
1219 )
1220 .into_response();
1221 }
1222 let socket_address = socket_address.to_string();
1223 ws.on_upgrade(move |socket| {
1224 let socket = socket
1225 .map_ok(to_tungstenite_message)
1226 .err_into()
1227 .with(|message| async move { Ok(to_axum_message(message)) });
1228 let connection = Connection::new(Box::pin(socket));
1229 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1230 })
1231}
1232
1233fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1234 match message {
1235 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1236 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1237 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1238 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1239 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1240 code: frame.code.into(),
1241 reason: frame.reason,
1242 })),
1243 }
1244}
1245
1246fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1247 match message {
1248 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1249 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1250 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1251 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1252 AxumMessage::Close(frame) => {
1253 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1254 code: frame.code.into(),
1255 reason: frame.reason,
1256 }))
1257 }
1258 }
1259}
1260
1261pub trait ResultExt {
1262 type Ok;
1263
1264 fn trace_err(self) -> Option<Self::Ok>;
1265}
1266
1267impl<T, E> ResultExt for Result<T, E>
1268where
1269 E: std::fmt::Debug,
1270{
1271 type Ok = T;
1272
1273 fn trace_err(self) -> Option<T> {
1274 match self {
1275 Ok(value) => Some(value),
1276 Err(error) => {
1277 tracing::error!("{:?}", error);
1278 None
1279 }
1280 }
1281 }
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286 use super::*;
1287 use crate::{
1288 db::{tests::TestDb, UserId},
1289 AppState,
1290 };
1291 use ::rpc::Peer;
1292 use client::{
1293 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1294 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1295 };
1296 use collections::BTreeMap;
1297 use editor::{
1298 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1299 ToOffset, ToggleCodeActions, Undo,
1300 };
1301 use gpui::{
1302 executor::{self, Deterministic},
1303 geometry::vector::vec2f,
1304 ModelHandle, TestAppContext, ViewHandle,
1305 };
1306 use language::{
1307 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1308 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1309 };
1310 use lsp::{self, FakeLanguageServer};
1311 use parking_lot::Mutex;
1312 use project::{
1313 fs::{FakeFs, Fs as _},
1314 search::SearchQuery,
1315 worktree::WorktreeHandle,
1316 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1317 };
1318 use rand::prelude::*;
1319 use rpc::PeerId;
1320 use serde_json::json;
1321 use settings::Settings;
1322 use sqlx::types::time::OffsetDateTime;
1323 use std::{
1324 env,
1325 ops::Deref,
1326 path::{Path, PathBuf},
1327 rc::Rc,
1328 sync::{
1329 atomic::{AtomicBool, Ordering::SeqCst},
1330 Arc,
1331 },
1332 time::Duration,
1333 };
1334 use theme::ThemeRegistry;
1335 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1336
1337 #[cfg(test)]
1338 #[ctor::ctor]
1339 fn init_logger() {
1340 if std::env::var("RUST_LOG").is_ok() {
1341 env_logger::init();
1342 }
1343 }
1344
1345 #[gpui::test(iterations = 10)]
1346 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1347 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1348 let lang_registry = Arc::new(LanguageRegistry::test());
1349 let fs = FakeFs::new(cx_a.background());
1350 cx_a.foreground().forbid_parking();
1351
1352 // Connect to a server as 2 clients.
1353 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1354 let client_a = server.create_client(cx_a, "user_a").await;
1355 let client_b = server.create_client(cx_b, "user_b").await;
1356
1357 // Share a project as client A
1358 fs.insert_tree(
1359 "/a",
1360 json!({
1361 ".zed.toml": r#"collaborators = ["user_b"]"#,
1362 "a.txt": "a-contents",
1363 "b.txt": "b-contents",
1364 }),
1365 )
1366 .await;
1367 let project_a = cx_a.update(|cx| {
1368 Project::local(
1369 client_a.clone(),
1370 client_a.user_store.clone(),
1371 lang_registry.clone(),
1372 fs.clone(),
1373 cx,
1374 )
1375 });
1376 let (worktree_a, _) = project_a
1377 .update(cx_a, |p, cx| {
1378 p.find_or_create_local_worktree("/a", true, cx)
1379 })
1380 .await
1381 .unwrap();
1382 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1383 worktree_a
1384 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1385 .await;
1386 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1387 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1388
1389 // Join that project as client B
1390 let project_b = Project::remote(
1391 project_id,
1392 client_b.clone(),
1393 client_b.user_store.clone(),
1394 lang_registry.clone(),
1395 fs.clone(),
1396 &mut cx_b.to_async(),
1397 )
1398 .await
1399 .unwrap();
1400
1401 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1402 assert_eq!(
1403 project
1404 .collaborators()
1405 .get(&client_a.peer_id)
1406 .unwrap()
1407 .user
1408 .github_login,
1409 "user_a"
1410 );
1411 project.replica_id()
1412 });
1413 project_a
1414 .condition(&cx_a, |tree, _| {
1415 tree.collaborators()
1416 .get(&client_b.peer_id)
1417 .map_or(false, |collaborator| {
1418 collaborator.replica_id == replica_id_b
1419 && collaborator.user.github_login == "user_b"
1420 })
1421 })
1422 .await;
1423
1424 // Open the same file as client B and client A.
1425 let buffer_b = project_b
1426 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1427 .await
1428 .unwrap();
1429 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1430 project_a.read_with(cx_a, |project, cx| {
1431 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1432 });
1433 let buffer_a = project_a
1434 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1435 .await
1436 .unwrap();
1437
1438 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1439
1440 // TODO
1441 // // Create a selection set as client B and see that selection set as client A.
1442 // buffer_a
1443 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1444 // .await;
1445
1446 // Edit the buffer as client B and see that edit as client A.
1447 editor_b.update(cx_b, |editor, cx| {
1448 editor.handle_input(&Input("ok, ".into()), cx)
1449 });
1450 buffer_a
1451 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1452 .await;
1453
1454 // TODO
1455 // // Remove the selection set as client B, see those selections disappear as client A.
1456 cx_b.update(move |_| drop(editor_b));
1457 // buffer_a
1458 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1459 // .await;
1460
1461 // Dropping the client B's project removes client B from client A's collaborators.
1462 cx_b.update(move |_| drop(project_b));
1463 project_a
1464 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1465 .await;
1466 }
1467
1468 #[gpui::test(iterations = 10)]
1469 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1470 let lang_registry = Arc::new(LanguageRegistry::test());
1471 let fs = FakeFs::new(cx_a.background());
1472 cx_a.foreground().forbid_parking();
1473
1474 // Connect to a server as 2 clients.
1475 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1476 let client_a = server.create_client(cx_a, "user_a").await;
1477 let client_b = server.create_client(cx_b, "user_b").await;
1478
1479 // Share a project as client A
1480 fs.insert_tree(
1481 "/a",
1482 json!({
1483 ".zed.toml": r#"collaborators = ["user_b"]"#,
1484 "a.txt": "a-contents",
1485 "b.txt": "b-contents",
1486 }),
1487 )
1488 .await;
1489 let project_a = cx_a.update(|cx| {
1490 Project::local(
1491 client_a.clone(),
1492 client_a.user_store.clone(),
1493 lang_registry.clone(),
1494 fs.clone(),
1495 cx,
1496 )
1497 });
1498 let (worktree_a, _) = project_a
1499 .update(cx_a, |p, cx| {
1500 p.find_or_create_local_worktree("/a", true, cx)
1501 })
1502 .await
1503 .unwrap();
1504 worktree_a
1505 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1506 .await;
1507 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1508 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1509 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1510 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1511
1512 // Join that project as client B
1513 let project_b = Project::remote(
1514 project_id,
1515 client_b.clone(),
1516 client_b.user_store.clone(),
1517 lang_registry.clone(),
1518 fs.clone(),
1519 &mut cx_b.to_async(),
1520 )
1521 .await
1522 .unwrap();
1523 project_b
1524 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1525 .await
1526 .unwrap();
1527
1528 // Unshare the project as client A
1529 project_a.update(cx_a, |project, cx| project.unshare(cx));
1530 project_b
1531 .condition(cx_b, |project, _| project.is_read_only())
1532 .await;
1533 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1534 cx_b.update(|_| {
1535 drop(project_b);
1536 });
1537
1538 // Share the project again and ensure guests can still join.
1539 project_a
1540 .update(cx_a, |project, cx| project.share(cx))
1541 .await
1542 .unwrap();
1543 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1544
1545 let project_b2 = Project::remote(
1546 project_id,
1547 client_b.clone(),
1548 client_b.user_store.clone(),
1549 lang_registry.clone(),
1550 fs.clone(),
1551 &mut cx_b.to_async(),
1552 )
1553 .await
1554 .unwrap();
1555 project_b2
1556 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1557 .await
1558 .unwrap();
1559 }
1560
1561 #[gpui::test(iterations = 10)]
1562 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1563 let lang_registry = Arc::new(LanguageRegistry::test());
1564 let fs = FakeFs::new(cx_a.background());
1565 cx_a.foreground().forbid_parking();
1566
1567 // Connect to a server as 2 clients.
1568 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1569 let client_a = server.create_client(cx_a, "user_a").await;
1570 let client_b = server.create_client(cx_b, "user_b").await;
1571
1572 // Share a project as client A
1573 fs.insert_tree(
1574 "/a",
1575 json!({
1576 ".zed.toml": r#"collaborators = ["user_b"]"#,
1577 "a.txt": "a-contents",
1578 "b.txt": "b-contents",
1579 }),
1580 )
1581 .await;
1582 let project_a = cx_a.update(|cx| {
1583 Project::local(
1584 client_a.clone(),
1585 client_a.user_store.clone(),
1586 lang_registry.clone(),
1587 fs.clone(),
1588 cx,
1589 )
1590 });
1591 let (worktree_a, _) = project_a
1592 .update(cx_a, |p, cx| {
1593 p.find_or_create_local_worktree("/a", true, cx)
1594 })
1595 .await
1596 .unwrap();
1597 worktree_a
1598 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1599 .await;
1600 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1601 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1602 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1603 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1604
1605 // Join that project as client B
1606 let project_b = Project::remote(
1607 project_id,
1608 client_b.clone(),
1609 client_b.user_store.clone(),
1610 lang_registry.clone(),
1611 fs.clone(),
1612 &mut cx_b.to_async(),
1613 )
1614 .await
1615 .unwrap();
1616 project_b
1617 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1618 .await
1619 .unwrap();
1620
1621 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1622 server.disconnect_client(client_a.current_user_id(cx_a));
1623 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1624 project_a
1625 .condition(cx_a, |project, _| project.collaborators().is_empty())
1626 .await;
1627 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1628 project_b
1629 .condition(cx_b, |project, _| project.is_read_only())
1630 .await;
1631 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1632 cx_b.update(|_| {
1633 drop(project_b);
1634 });
1635
1636 // Await reconnection
1637 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1638
1639 // Share the project again and ensure guests can still join.
1640 project_a
1641 .update(cx_a, |project, cx| project.share(cx))
1642 .await
1643 .unwrap();
1644 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1645
1646 let project_b2 = Project::remote(
1647 project_id,
1648 client_b.clone(),
1649 client_b.user_store.clone(),
1650 lang_registry.clone(),
1651 fs.clone(),
1652 &mut cx_b.to_async(),
1653 )
1654 .await
1655 .unwrap();
1656 project_b2
1657 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1658 .await
1659 .unwrap();
1660 }
1661
1662 #[gpui::test(iterations = 10)]
1663 async fn test_propagate_saves_and_fs_changes(
1664 cx_a: &mut TestAppContext,
1665 cx_b: &mut TestAppContext,
1666 cx_c: &mut TestAppContext,
1667 ) {
1668 let lang_registry = Arc::new(LanguageRegistry::test());
1669 let fs = FakeFs::new(cx_a.background());
1670 cx_a.foreground().forbid_parking();
1671
1672 // Connect to a server as 3 clients.
1673 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1674 let client_a = server.create_client(cx_a, "user_a").await;
1675 let client_b = server.create_client(cx_b, "user_b").await;
1676 let client_c = server.create_client(cx_c, "user_c").await;
1677
1678 // Share a worktree as client A.
1679 fs.insert_tree(
1680 "/a",
1681 json!({
1682 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1683 "file1": "",
1684 "file2": ""
1685 }),
1686 )
1687 .await;
1688 let project_a = cx_a.update(|cx| {
1689 Project::local(
1690 client_a.clone(),
1691 client_a.user_store.clone(),
1692 lang_registry.clone(),
1693 fs.clone(),
1694 cx,
1695 )
1696 });
1697 let (worktree_a, _) = project_a
1698 .update(cx_a, |p, cx| {
1699 p.find_or_create_local_worktree("/a", true, cx)
1700 })
1701 .await
1702 .unwrap();
1703 worktree_a
1704 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1705 .await;
1706 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1707 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1708 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1709
1710 // Join that worktree as clients B and C.
1711 let project_b = Project::remote(
1712 project_id,
1713 client_b.clone(),
1714 client_b.user_store.clone(),
1715 lang_registry.clone(),
1716 fs.clone(),
1717 &mut cx_b.to_async(),
1718 )
1719 .await
1720 .unwrap();
1721 let project_c = Project::remote(
1722 project_id,
1723 client_c.clone(),
1724 client_c.user_store.clone(),
1725 lang_registry.clone(),
1726 fs.clone(),
1727 &mut cx_c.to_async(),
1728 )
1729 .await
1730 .unwrap();
1731 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1732 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1733
1734 // Open and edit a buffer as both guests B and C.
1735 let buffer_b = project_b
1736 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1737 .await
1738 .unwrap();
1739 let buffer_c = project_c
1740 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1741 .await
1742 .unwrap();
1743 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1744 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1745
1746 // Open and edit that buffer as the host.
1747 let buffer_a = project_a
1748 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1749 .await
1750 .unwrap();
1751
1752 buffer_a
1753 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1754 .await;
1755 buffer_a.update(cx_a, |buf, cx| {
1756 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1757 });
1758
1759 // Wait for edits to propagate
1760 buffer_a
1761 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1762 .await;
1763 buffer_b
1764 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1765 .await;
1766 buffer_c
1767 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1768 .await;
1769
1770 // Edit the buffer as the host and concurrently save as guest B.
1771 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1772 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1773 save_b.await.unwrap();
1774 assert_eq!(
1775 fs.load("/a/file1".as_ref()).await.unwrap(),
1776 "hi-a, i-am-c, i-am-b, i-am-a"
1777 );
1778 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1779 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1780 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1781
1782 worktree_a.flush_fs_events(cx_a).await;
1783
1784 // Make changes on host's file system, see those changes on guest worktrees.
1785 fs.rename(
1786 "/a/file1".as_ref(),
1787 "/a/file1-renamed".as_ref(),
1788 Default::default(),
1789 )
1790 .await
1791 .unwrap();
1792
1793 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1794 .await
1795 .unwrap();
1796 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1797
1798 worktree_a
1799 .condition(&cx_a, |tree, _| {
1800 tree.paths()
1801 .map(|p| p.to_string_lossy())
1802 .collect::<Vec<_>>()
1803 == [".zed.toml", "file1-renamed", "file3", "file4"]
1804 })
1805 .await;
1806 worktree_b
1807 .condition(&cx_b, |tree, _| {
1808 tree.paths()
1809 .map(|p| p.to_string_lossy())
1810 .collect::<Vec<_>>()
1811 == [".zed.toml", "file1-renamed", "file3", "file4"]
1812 })
1813 .await;
1814 worktree_c
1815 .condition(&cx_c, |tree, _| {
1816 tree.paths()
1817 .map(|p| p.to_string_lossy())
1818 .collect::<Vec<_>>()
1819 == [".zed.toml", "file1-renamed", "file3", "file4"]
1820 })
1821 .await;
1822
1823 // Ensure buffer files are updated as well.
1824 buffer_a
1825 .condition(&cx_a, |buf, _| {
1826 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1827 })
1828 .await;
1829 buffer_b
1830 .condition(&cx_b, |buf, _| {
1831 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1832 })
1833 .await;
1834 buffer_c
1835 .condition(&cx_c, |buf, _| {
1836 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1837 })
1838 .await;
1839 }
1840
1841 #[gpui::test(iterations = 10)]
1842 async fn test_fs_operations(
1843 executor: Arc<Deterministic>,
1844 cx_a: &mut TestAppContext,
1845 cx_b: &mut TestAppContext,
1846 ) {
1847 executor.forbid_parking();
1848 let fs = FakeFs::new(cx_a.background());
1849
1850 // Connect to a server as 2 clients.
1851 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1852 let mut client_a = server.create_client(cx_a, "user_a").await;
1853 let mut client_b = server.create_client(cx_b, "user_b").await;
1854
1855 // Share a project as client A
1856 fs.insert_tree(
1857 "/dir",
1858 json!({
1859 ".zed.toml": r#"collaborators = ["user_b"]"#,
1860 "a.txt": "a-contents",
1861 "b.txt": "b-contents",
1862 }),
1863 )
1864 .await;
1865
1866 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1867 let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1868 project_a
1869 .update(cx_a, |project, cx| project.share(cx))
1870 .await
1871 .unwrap();
1872
1873 let project_b = client_b.build_remote_project(project_id, cx_b).await;
1874
1875 let worktree_a =
1876 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1877 let worktree_b =
1878 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1879
1880 let entry = project_b
1881 .update(cx_b, |project, cx| {
1882 project
1883 .create_entry((worktree_id, "c.txt"), false, cx)
1884 .unwrap()
1885 })
1886 .await
1887 .unwrap();
1888 worktree_a.read_with(cx_a, |worktree, _| {
1889 assert_eq!(
1890 worktree
1891 .paths()
1892 .map(|p| p.to_string_lossy())
1893 .collect::<Vec<_>>(),
1894 [".zed.toml", "a.txt", "b.txt", "c.txt"]
1895 );
1896 });
1897 worktree_b.read_with(cx_b, |worktree, _| {
1898 assert_eq!(
1899 worktree
1900 .paths()
1901 .map(|p| p.to_string_lossy())
1902 .collect::<Vec<_>>(),
1903 [".zed.toml", "a.txt", "b.txt", "c.txt"]
1904 );
1905 });
1906
1907 project_b
1908 .update(cx_b, |project, cx| {
1909 project.rename_entry(entry.id, Path::new("d.txt"), cx)
1910 })
1911 .unwrap()
1912 .await
1913 .unwrap();
1914 worktree_a.read_with(cx_a, |worktree, _| {
1915 assert_eq!(
1916 worktree
1917 .paths()
1918 .map(|p| p.to_string_lossy())
1919 .collect::<Vec<_>>(),
1920 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1921 );
1922 });
1923 worktree_b.read_with(cx_b, |worktree, _| {
1924 assert_eq!(
1925 worktree
1926 .paths()
1927 .map(|p| p.to_string_lossy())
1928 .collect::<Vec<_>>(),
1929 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1930 );
1931 });
1932
1933 let dir_entry = project_b
1934 .update(cx_b, |project, cx| {
1935 project
1936 .create_entry((worktree_id, "DIR"), true, cx)
1937 .unwrap()
1938 })
1939 .await
1940 .unwrap();
1941 worktree_a.read_with(cx_a, |worktree, _| {
1942 assert_eq!(
1943 worktree
1944 .paths()
1945 .map(|p| p.to_string_lossy())
1946 .collect::<Vec<_>>(),
1947 [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1948 );
1949 });
1950 worktree_b.read_with(cx_b, |worktree, _| {
1951 assert_eq!(
1952 worktree
1953 .paths()
1954 .map(|p| p.to_string_lossy())
1955 .collect::<Vec<_>>(),
1956 [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1957 );
1958 });
1959
1960 project_b
1961 .update(cx_b, |project, cx| {
1962 project.delete_entry(dir_entry.id, cx).unwrap()
1963 })
1964 .await
1965 .unwrap();
1966 worktree_a.read_with(cx_a, |worktree, _| {
1967 assert_eq!(
1968 worktree
1969 .paths()
1970 .map(|p| p.to_string_lossy())
1971 .collect::<Vec<_>>(),
1972 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1973 );
1974 });
1975 worktree_b.read_with(cx_b, |worktree, _| {
1976 assert_eq!(
1977 worktree
1978 .paths()
1979 .map(|p| p.to_string_lossy())
1980 .collect::<Vec<_>>(),
1981 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1982 );
1983 });
1984
1985 project_b
1986 .update(cx_b, |project, cx| {
1987 project.delete_entry(entry.id, cx).unwrap()
1988 })
1989 .await
1990 .unwrap();
1991 worktree_a.read_with(cx_a, |worktree, _| {
1992 assert_eq!(
1993 worktree
1994 .paths()
1995 .map(|p| p.to_string_lossy())
1996 .collect::<Vec<_>>(),
1997 [".zed.toml", "a.txt", "b.txt"]
1998 );
1999 });
2000 worktree_b.read_with(cx_b, |worktree, _| {
2001 assert_eq!(
2002 worktree
2003 .paths()
2004 .map(|p| p.to_string_lossy())
2005 .collect::<Vec<_>>(),
2006 [".zed.toml", "a.txt", "b.txt"]
2007 );
2008 });
2009 }
2010
2011 #[gpui::test(iterations = 10)]
2012 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2013 cx_a.foreground().forbid_parking();
2014 let lang_registry = Arc::new(LanguageRegistry::test());
2015 let fs = FakeFs::new(cx_a.background());
2016
2017 // Connect to a server as 2 clients.
2018 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2019 let client_a = server.create_client(cx_a, "user_a").await;
2020 let client_b = server.create_client(cx_b, "user_b").await;
2021
2022 // Share a project as client A
2023 fs.insert_tree(
2024 "/dir",
2025 json!({
2026 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2027 "a.txt": "a-contents",
2028 }),
2029 )
2030 .await;
2031
2032 let project_a = cx_a.update(|cx| {
2033 Project::local(
2034 client_a.clone(),
2035 client_a.user_store.clone(),
2036 lang_registry.clone(),
2037 fs.clone(),
2038 cx,
2039 )
2040 });
2041 let (worktree_a, _) = project_a
2042 .update(cx_a, |p, cx| {
2043 p.find_or_create_local_worktree("/dir", true, cx)
2044 })
2045 .await
2046 .unwrap();
2047 worktree_a
2048 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2049 .await;
2050 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2051 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2052 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2053
2054 // Join that project as client B
2055 let project_b = Project::remote(
2056 project_id,
2057 client_b.clone(),
2058 client_b.user_store.clone(),
2059 lang_registry.clone(),
2060 fs.clone(),
2061 &mut cx_b.to_async(),
2062 )
2063 .await
2064 .unwrap();
2065
2066 // Open a buffer as client B
2067 let buffer_b = project_b
2068 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2069 .await
2070 .unwrap();
2071
2072 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2073 buffer_b.read_with(cx_b, |buf, _| {
2074 assert!(buf.is_dirty());
2075 assert!(!buf.has_conflict());
2076 });
2077
2078 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2079 buffer_b
2080 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2081 .await;
2082 buffer_b.read_with(cx_b, |buf, _| {
2083 assert!(!buf.has_conflict());
2084 });
2085
2086 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2087 buffer_b.read_with(cx_b, |buf, _| {
2088 assert!(buf.is_dirty());
2089 assert!(!buf.has_conflict());
2090 });
2091 }
2092
2093 #[gpui::test(iterations = 10)]
2094 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2095 cx_a.foreground().forbid_parking();
2096 let lang_registry = Arc::new(LanguageRegistry::test());
2097 let fs = FakeFs::new(cx_a.background());
2098
2099 // Connect to a server as 2 clients.
2100 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2101 let client_a = server.create_client(cx_a, "user_a").await;
2102 let client_b = server.create_client(cx_b, "user_b").await;
2103
2104 // Share a project as client A
2105 fs.insert_tree(
2106 "/dir",
2107 json!({
2108 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2109 "a.txt": "a-contents",
2110 }),
2111 )
2112 .await;
2113
2114 let project_a = cx_a.update(|cx| {
2115 Project::local(
2116 client_a.clone(),
2117 client_a.user_store.clone(),
2118 lang_registry.clone(),
2119 fs.clone(),
2120 cx,
2121 )
2122 });
2123 let (worktree_a, _) = project_a
2124 .update(cx_a, |p, cx| {
2125 p.find_or_create_local_worktree("/dir", true, cx)
2126 })
2127 .await
2128 .unwrap();
2129 worktree_a
2130 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2131 .await;
2132 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2133 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2134 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2135
2136 // Join that project as client B
2137 let project_b = Project::remote(
2138 project_id,
2139 client_b.clone(),
2140 client_b.user_store.clone(),
2141 lang_registry.clone(),
2142 fs.clone(),
2143 &mut cx_b.to_async(),
2144 )
2145 .await
2146 .unwrap();
2147 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2148
2149 // Open a buffer as client B
2150 let buffer_b = project_b
2151 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2152 .await
2153 .unwrap();
2154 buffer_b.read_with(cx_b, |buf, _| {
2155 assert!(!buf.is_dirty());
2156 assert!(!buf.has_conflict());
2157 });
2158
2159 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2160 .await
2161 .unwrap();
2162 buffer_b
2163 .condition(&cx_b, |buf, _| {
2164 buf.text() == "new contents" && !buf.is_dirty()
2165 })
2166 .await;
2167 buffer_b.read_with(cx_b, |buf, _| {
2168 assert!(!buf.has_conflict());
2169 });
2170 }
2171
2172 #[gpui::test(iterations = 10)]
2173 async fn test_editing_while_guest_opens_buffer(
2174 cx_a: &mut TestAppContext,
2175 cx_b: &mut TestAppContext,
2176 ) {
2177 cx_a.foreground().forbid_parking();
2178 let lang_registry = Arc::new(LanguageRegistry::test());
2179 let fs = FakeFs::new(cx_a.background());
2180
2181 // Connect to a server as 2 clients.
2182 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2183 let client_a = server.create_client(cx_a, "user_a").await;
2184 let client_b = server.create_client(cx_b, "user_b").await;
2185
2186 // Share a project as client A
2187 fs.insert_tree(
2188 "/dir",
2189 json!({
2190 ".zed.toml": r#"collaborators = ["user_b"]"#,
2191 "a.txt": "a-contents",
2192 }),
2193 )
2194 .await;
2195 let project_a = cx_a.update(|cx| {
2196 Project::local(
2197 client_a.clone(),
2198 client_a.user_store.clone(),
2199 lang_registry.clone(),
2200 fs.clone(),
2201 cx,
2202 )
2203 });
2204 let (worktree_a, _) = project_a
2205 .update(cx_a, |p, cx| {
2206 p.find_or_create_local_worktree("/dir", true, cx)
2207 })
2208 .await
2209 .unwrap();
2210 worktree_a
2211 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2212 .await;
2213 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2214 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2215 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2216
2217 // Join that project as client B
2218 let project_b = Project::remote(
2219 project_id,
2220 client_b.clone(),
2221 client_b.user_store.clone(),
2222 lang_registry.clone(),
2223 fs.clone(),
2224 &mut cx_b.to_async(),
2225 )
2226 .await
2227 .unwrap();
2228
2229 // Open a buffer as client A
2230 let buffer_a = project_a
2231 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2232 .await
2233 .unwrap();
2234
2235 // Start opening the same buffer as client B
2236 let buffer_b = cx_b
2237 .background()
2238 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2239
2240 // Edit the buffer as client A while client B is still opening it.
2241 cx_b.background().simulate_random_delay().await;
2242 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2243 cx_b.background().simulate_random_delay().await;
2244 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2245
2246 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2247 let buffer_b = buffer_b.await.unwrap();
2248 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2249 }
2250
2251 #[gpui::test(iterations = 10)]
2252 async fn test_leaving_worktree_while_opening_buffer(
2253 cx_a: &mut TestAppContext,
2254 cx_b: &mut TestAppContext,
2255 ) {
2256 cx_a.foreground().forbid_parking();
2257 let lang_registry = Arc::new(LanguageRegistry::test());
2258 let fs = FakeFs::new(cx_a.background());
2259
2260 // Connect to a server as 2 clients.
2261 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2262 let client_a = server.create_client(cx_a, "user_a").await;
2263 let client_b = server.create_client(cx_b, "user_b").await;
2264
2265 // Share a project as client A
2266 fs.insert_tree(
2267 "/dir",
2268 json!({
2269 ".zed.toml": r#"collaborators = ["user_b"]"#,
2270 "a.txt": "a-contents",
2271 }),
2272 )
2273 .await;
2274 let project_a = cx_a.update(|cx| {
2275 Project::local(
2276 client_a.clone(),
2277 client_a.user_store.clone(),
2278 lang_registry.clone(),
2279 fs.clone(),
2280 cx,
2281 )
2282 });
2283 let (worktree_a, _) = project_a
2284 .update(cx_a, |p, cx| {
2285 p.find_or_create_local_worktree("/dir", true, cx)
2286 })
2287 .await
2288 .unwrap();
2289 worktree_a
2290 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2291 .await;
2292 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2293 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2294 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2295
2296 // Join that project as client B
2297 let project_b = Project::remote(
2298 project_id,
2299 client_b.clone(),
2300 client_b.user_store.clone(),
2301 lang_registry.clone(),
2302 fs.clone(),
2303 &mut cx_b.to_async(),
2304 )
2305 .await
2306 .unwrap();
2307
2308 // See that a guest has joined as client A.
2309 project_a
2310 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2311 .await;
2312
2313 // Begin opening a buffer as client B, but leave the project before the open completes.
2314 let buffer_b = cx_b
2315 .background()
2316 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2317 cx_b.update(|_| drop(project_b));
2318 drop(buffer_b);
2319
2320 // See that the guest has left.
2321 project_a
2322 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2323 .await;
2324 }
2325
2326 #[gpui::test(iterations = 10)]
2327 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2328 cx_a.foreground().forbid_parking();
2329 let lang_registry = Arc::new(LanguageRegistry::test());
2330 let fs = FakeFs::new(cx_a.background());
2331
2332 // Connect to a server as 2 clients.
2333 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2334 let client_a = server.create_client(cx_a, "user_a").await;
2335 let client_b = server.create_client(cx_b, "user_b").await;
2336
2337 // Share a project as client A
2338 fs.insert_tree(
2339 "/a",
2340 json!({
2341 ".zed.toml": r#"collaborators = ["user_b"]"#,
2342 "a.txt": "a-contents",
2343 "b.txt": "b-contents",
2344 }),
2345 )
2346 .await;
2347 let project_a = cx_a.update(|cx| {
2348 Project::local(
2349 client_a.clone(),
2350 client_a.user_store.clone(),
2351 lang_registry.clone(),
2352 fs.clone(),
2353 cx,
2354 )
2355 });
2356 let (worktree_a, _) = project_a
2357 .update(cx_a, |p, cx| {
2358 p.find_or_create_local_worktree("/a", true, cx)
2359 })
2360 .await
2361 .unwrap();
2362 worktree_a
2363 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2364 .await;
2365 let project_id = project_a
2366 .update(cx_a, |project, _| project.next_remote_id())
2367 .await;
2368 project_a
2369 .update(cx_a, |project, cx| project.share(cx))
2370 .await
2371 .unwrap();
2372
2373 // Join that project as client B
2374 let _project_b = Project::remote(
2375 project_id,
2376 client_b.clone(),
2377 client_b.user_store.clone(),
2378 lang_registry.clone(),
2379 fs.clone(),
2380 &mut cx_b.to_async(),
2381 )
2382 .await
2383 .unwrap();
2384
2385 // Client A sees that a guest has joined.
2386 project_a
2387 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2388 .await;
2389
2390 // Drop client B's connection and ensure client A observes client B leaving the project.
2391 client_b.disconnect(&cx_b.to_async()).unwrap();
2392 project_a
2393 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2394 .await;
2395
2396 // Rejoin the project as client B
2397 let _project_b = Project::remote(
2398 project_id,
2399 client_b.clone(),
2400 client_b.user_store.clone(),
2401 lang_registry.clone(),
2402 fs.clone(),
2403 &mut cx_b.to_async(),
2404 )
2405 .await
2406 .unwrap();
2407
2408 // Client A sees that a guest has re-joined.
2409 project_a
2410 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2411 .await;
2412
2413 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2414 client_b.wait_for_current_user(cx_b).await;
2415 server.disconnect_client(client_b.current_user_id(cx_b));
2416 cx_a.foreground().advance_clock(Duration::from_secs(3));
2417 project_a
2418 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2419 .await;
2420 }
2421
2422 #[gpui::test(iterations = 10)]
2423 async fn test_collaborating_with_diagnostics(
2424 cx_a: &mut TestAppContext,
2425 cx_b: &mut TestAppContext,
2426 ) {
2427 cx_a.foreground().forbid_parking();
2428 let lang_registry = Arc::new(LanguageRegistry::test());
2429 let fs = FakeFs::new(cx_a.background());
2430
2431 // Set up a fake language server.
2432 let mut language = Language::new(
2433 LanguageConfig {
2434 name: "Rust".into(),
2435 path_suffixes: vec!["rs".to_string()],
2436 ..Default::default()
2437 },
2438 Some(tree_sitter_rust::language()),
2439 );
2440 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2441 lang_registry.add(Arc::new(language));
2442
2443 // Connect to a server as 2 clients.
2444 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2445 let client_a = server.create_client(cx_a, "user_a").await;
2446 let client_b = server.create_client(cx_b, "user_b").await;
2447
2448 // Share a project as client A
2449 fs.insert_tree(
2450 "/a",
2451 json!({
2452 ".zed.toml": r#"collaborators = ["user_b"]"#,
2453 "a.rs": "let one = two",
2454 "other.rs": "",
2455 }),
2456 )
2457 .await;
2458 let project_a = cx_a.update(|cx| {
2459 Project::local(
2460 client_a.clone(),
2461 client_a.user_store.clone(),
2462 lang_registry.clone(),
2463 fs.clone(),
2464 cx,
2465 )
2466 });
2467 let (worktree_a, _) = project_a
2468 .update(cx_a, |p, cx| {
2469 p.find_or_create_local_worktree("/a", true, cx)
2470 })
2471 .await
2472 .unwrap();
2473 worktree_a
2474 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2475 .await;
2476 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2477 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2478 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2479
2480 // Cause the language server to start.
2481 let _ = cx_a
2482 .background()
2483 .spawn(project_a.update(cx_a, |project, cx| {
2484 project.open_buffer(
2485 ProjectPath {
2486 worktree_id,
2487 path: Path::new("other.rs").into(),
2488 },
2489 cx,
2490 )
2491 }))
2492 .await
2493 .unwrap();
2494
2495 // Simulate a language server reporting errors for a file.
2496 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2497 fake_language_server
2498 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2499 .await;
2500 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2501 lsp::PublishDiagnosticsParams {
2502 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2503 version: None,
2504 diagnostics: vec![lsp::Diagnostic {
2505 severity: Some(lsp::DiagnosticSeverity::ERROR),
2506 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2507 message: "message 1".to_string(),
2508 ..Default::default()
2509 }],
2510 },
2511 );
2512
2513 // Wait for server to see the diagnostics update.
2514 server
2515 .condition(|store| {
2516 let worktree = store
2517 .project(project_id)
2518 .unwrap()
2519 .share
2520 .as_ref()
2521 .unwrap()
2522 .worktrees
2523 .get(&worktree_id.to_proto())
2524 .unwrap();
2525
2526 !worktree.diagnostic_summaries.is_empty()
2527 })
2528 .await;
2529
2530 // Join the worktree as client B.
2531 let project_b = Project::remote(
2532 project_id,
2533 client_b.clone(),
2534 client_b.user_store.clone(),
2535 lang_registry.clone(),
2536 fs.clone(),
2537 &mut cx_b.to_async(),
2538 )
2539 .await
2540 .unwrap();
2541
2542 project_b.read_with(cx_b, |project, cx| {
2543 assert_eq!(
2544 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2545 &[(
2546 ProjectPath {
2547 worktree_id,
2548 path: Arc::from(Path::new("a.rs")),
2549 },
2550 DiagnosticSummary {
2551 error_count: 1,
2552 warning_count: 0,
2553 ..Default::default()
2554 },
2555 )]
2556 )
2557 });
2558
2559 // Simulate a language server reporting more errors for a file.
2560 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2561 lsp::PublishDiagnosticsParams {
2562 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2563 version: None,
2564 diagnostics: vec![
2565 lsp::Diagnostic {
2566 severity: Some(lsp::DiagnosticSeverity::ERROR),
2567 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2568 message: "message 1".to_string(),
2569 ..Default::default()
2570 },
2571 lsp::Diagnostic {
2572 severity: Some(lsp::DiagnosticSeverity::WARNING),
2573 range: lsp::Range::new(
2574 lsp::Position::new(0, 10),
2575 lsp::Position::new(0, 13),
2576 ),
2577 message: "message 2".to_string(),
2578 ..Default::default()
2579 },
2580 ],
2581 },
2582 );
2583
2584 // Client b gets the updated summaries
2585 project_b
2586 .condition(&cx_b, |project, cx| {
2587 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2588 == &[(
2589 ProjectPath {
2590 worktree_id,
2591 path: Arc::from(Path::new("a.rs")),
2592 },
2593 DiagnosticSummary {
2594 error_count: 1,
2595 warning_count: 1,
2596 ..Default::default()
2597 },
2598 )]
2599 })
2600 .await;
2601
2602 // Open the file with the errors on client B. They should be present.
2603 let buffer_b = cx_b
2604 .background()
2605 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2606 .await
2607 .unwrap();
2608
2609 buffer_b.read_with(cx_b, |buffer, _| {
2610 assert_eq!(
2611 buffer
2612 .snapshot()
2613 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2614 .map(|entry| entry)
2615 .collect::<Vec<_>>(),
2616 &[
2617 DiagnosticEntry {
2618 range: Point::new(0, 4)..Point::new(0, 7),
2619 diagnostic: Diagnostic {
2620 group_id: 0,
2621 message: "message 1".to_string(),
2622 severity: lsp::DiagnosticSeverity::ERROR,
2623 is_primary: true,
2624 ..Default::default()
2625 }
2626 },
2627 DiagnosticEntry {
2628 range: Point::new(0, 10)..Point::new(0, 13),
2629 diagnostic: Diagnostic {
2630 group_id: 1,
2631 severity: lsp::DiagnosticSeverity::WARNING,
2632 message: "message 2".to_string(),
2633 is_primary: true,
2634 ..Default::default()
2635 }
2636 }
2637 ]
2638 );
2639 });
2640
2641 // Simulate a language server reporting no errors for a file.
2642 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2643 lsp::PublishDiagnosticsParams {
2644 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2645 version: None,
2646 diagnostics: vec![],
2647 },
2648 );
2649 project_a
2650 .condition(cx_a, |project, cx| {
2651 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2652 })
2653 .await;
2654 project_b
2655 .condition(cx_b, |project, cx| {
2656 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2657 })
2658 .await;
2659 }
2660
2661 #[gpui::test(iterations = 10)]
2662 async fn test_collaborating_with_completion(
2663 cx_a: &mut TestAppContext,
2664 cx_b: &mut TestAppContext,
2665 ) {
2666 cx_a.foreground().forbid_parking();
2667 let lang_registry = Arc::new(LanguageRegistry::test());
2668 let fs = FakeFs::new(cx_a.background());
2669
2670 // Set up a fake language server.
2671 let mut language = Language::new(
2672 LanguageConfig {
2673 name: "Rust".into(),
2674 path_suffixes: vec!["rs".to_string()],
2675 ..Default::default()
2676 },
2677 Some(tree_sitter_rust::language()),
2678 );
2679 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2680 capabilities: lsp::ServerCapabilities {
2681 completion_provider: Some(lsp::CompletionOptions {
2682 trigger_characters: Some(vec![".".to_string()]),
2683 ..Default::default()
2684 }),
2685 ..Default::default()
2686 },
2687 ..Default::default()
2688 });
2689 lang_registry.add(Arc::new(language));
2690
2691 // Connect to a server as 2 clients.
2692 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2693 let client_a = server.create_client(cx_a, "user_a").await;
2694 let client_b = server.create_client(cx_b, "user_b").await;
2695
2696 // Share a project as client A
2697 fs.insert_tree(
2698 "/a",
2699 json!({
2700 ".zed.toml": r#"collaborators = ["user_b"]"#,
2701 "main.rs": "fn main() { a }",
2702 "other.rs": "",
2703 }),
2704 )
2705 .await;
2706 let project_a = cx_a.update(|cx| {
2707 Project::local(
2708 client_a.clone(),
2709 client_a.user_store.clone(),
2710 lang_registry.clone(),
2711 fs.clone(),
2712 cx,
2713 )
2714 });
2715 let (worktree_a, _) = project_a
2716 .update(cx_a, |p, cx| {
2717 p.find_or_create_local_worktree("/a", true, cx)
2718 })
2719 .await
2720 .unwrap();
2721 worktree_a
2722 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2723 .await;
2724 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2725 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2726 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2727
2728 // Join the worktree as client B.
2729 let project_b = Project::remote(
2730 project_id,
2731 client_b.clone(),
2732 client_b.user_store.clone(),
2733 lang_registry.clone(),
2734 fs.clone(),
2735 &mut cx_b.to_async(),
2736 )
2737 .await
2738 .unwrap();
2739
2740 // Open a file in an editor as the guest.
2741 let buffer_b = project_b
2742 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2743 .await
2744 .unwrap();
2745 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2746 let editor_b = cx_b.add_view(window_b, |cx| {
2747 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2748 });
2749
2750 let fake_language_server = fake_language_servers.next().await.unwrap();
2751 buffer_b
2752 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2753 .await;
2754
2755 // Type a completion trigger character as the guest.
2756 editor_b.update(cx_b, |editor, cx| {
2757 editor.select_ranges([13..13], None, cx);
2758 editor.handle_input(&Input(".".into()), cx);
2759 cx.focus(&editor_b);
2760 });
2761
2762 // Receive a completion request as the host's language server.
2763 // Return some completions from the host's language server.
2764 cx_a.foreground().start_waiting();
2765 fake_language_server
2766 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2767 assert_eq!(
2768 params.text_document_position.text_document.uri,
2769 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2770 );
2771 assert_eq!(
2772 params.text_document_position.position,
2773 lsp::Position::new(0, 14),
2774 );
2775
2776 Ok(Some(lsp::CompletionResponse::Array(vec![
2777 lsp::CompletionItem {
2778 label: "first_method(…)".into(),
2779 detail: Some("fn(&mut self, B) -> C".into()),
2780 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2781 new_text: "first_method($1)".to_string(),
2782 range: lsp::Range::new(
2783 lsp::Position::new(0, 14),
2784 lsp::Position::new(0, 14),
2785 ),
2786 })),
2787 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2788 ..Default::default()
2789 },
2790 lsp::CompletionItem {
2791 label: "second_method(…)".into(),
2792 detail: Some("fn(&mut self, C) -> D<E>".into()),
2793 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2794 new_text: "second_method()".to_string(),
2795 range: lsp::Range::new(
2796 lsp::Position::new(0, 14),
2797 lsp::Position::new(0, 14),
2798 ),
2799 })),
2800 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2801 ..Default::default()
2802 },
2803 ])))
2804 })
2805 .next()
2806 .await
2807 .unwrap();
2808 cx_a.foreground().finish_waiting();
2809
2810 // Open the buffer on the host.
2811 let buffer_a = project_a
2812 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2813 .await
2814 .unwrap();
2815 buffer_a
2816 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2817 .await;
2818
2819 // Confirm a completion on the guest.
2820 editor_b
2821 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2822 .await;
2823 editor_b.update(cx_b, |editor, cx| {
2824 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2825 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2826 });
2827
2828 // Return a resolved completion from the host's language server.
2829 // The resolved completion has an additional text edit.
2830 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2831 |params, _| async move {
2832 assert_eq!(params.label, "first_method(…)");
2833 Ok(lsp::CompletionItem {
2834 label: "first_method(…)".into(),
2835 detail: Some("fn(&mut self, B) -> C".into()),
2836 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2837 new_text: "first_method($1)".to_string(),
2838 range: lsp::Range::new(
2839 lsp::Position::new(0, 14),
2840 lsp::Position::new(0, 14),
2841 ),
2842 })),
2843 additional_text_edits: Some(vec![lsp::TextEdit {
2844 new_text: "use d::SomeTrait;\n".to_string(),
2845 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2846 }]),
2847 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2848 ..Default::default()
2849 })
2850 },
2851 );
2852
2853 // The additional edit is applied.
2854 buffer_a
2855 .condition(&cx_a, |buffer, _| {
2856 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2857 })
2858 .await;
2859 buffer_b
2860 .condition(&cx_b, |buffer, _| {
2861 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2862 })
2863 .await;
2864 }
2865
2866 #[gpui::test(iterations = 10)]
2867 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2868 cx_a.foreground().forbid_parking();
2869 let lang_registry = Arc::new(LanguageRegistry::test());
2870 let fs = FakeFs::new(cx_a.background());
2871
2872 // Connect to a server as 2 clients.
2873 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2874 let client_a = server.create_client(cx_a, "user_a").await;
2875 let client_b = server.create_client(cx_b, "user_b").await;
2876
2877 // Share a project as client A
2878 fs.insert_tree(
2879 "/a",
2880 json!({
2881 ".zed.toml": r#"collaborators = ["user_b"]"#,
2882 "a.rs": "let one = 1;",
2883 }),
2884 )
2885 .await;
2886 let project_a = cx_a.update(|cx| {
2887 Project::local(
2888 client_a.clone(),
2889 client_a.user_store.clone(),
2890 lang_registry.clone(),
2891 fs.clone(),
2892 cx,
2893 )
2894 });
2895 let (worktree_a, _) = project_a
2896 .update(cx_a, |p, cx| {
2897 p.find_or_create_local_worktree("/a", true, cx)
2898 })
2899 .await
2900 .unwrap();
2901 worktree_a
2902 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2903 .await;
2904 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2905 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2906 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2907 let buffer_a = project_a
2908 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2909 .await
2910 .unwrap();
2911
2912 // Join the worktree as client B.
2913 let project_b = Project::remote(
2914 project_id,
2915 client_b.clone(),
2916 client_b.user_store.clone(),
2917 lang_registry.clone(),
2918 fs.clone(),
2919 &mut cx_b.to_async(),
2920 )
2921 .await
2922 .unwrap();
2923
2924 let buffer_b = cx_b
2925 .background()
2926 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2927 .await
2928 .unwrap();
2929 buffer_b.update(cx_b, |buffer, cx| {
2930 buffer.edit([(4..7, "six")], cx);
2931 buffer.edit([(10..11, "6")], cx);
2932 assert_eq!(buffer.text(), "let six = 6;");
2933 assert!(buffer.is_dirty());
2934 assert!(!buffer.has_conflict());
2935 });
2936 buffer_a
2937 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2938 .await;
2939
2940 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2941 .await
2942 .unwrap();
2943 buffer_a
2944 .condition(cx_a, |buffer, _| buffer.has_conflict())
2945 .await;
2946 buffer_b
2947 .condition(cx_b, |buffer, _| buffer.has_conflict())
2948 .await;
2949
2950 project_b
2951 .update(cx_b, |project, cx| {
2952 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2953 })
2954 .await
2955 .unwrap();
2956 buffer_a.read_with(cx_a, |buffer, _| {
2957 assert_eq!(buffer.text(), "let seven = 7;");
2958 assert!(!buffer.is_dirty());
2959 assert!(!buffer.has_conflict());
2960 });
2961 buffer_b.read_with(cx_b, |buffer, _| {
2962 assert_eq!(buffer.text(), "let seven = 7;");
2963 assert!(!buffer.is_dirty());
2964 assert!(!buffer.has_conflict());
2965 });
2966
2967 buffer_a.update(cx_a, |buffer, cx| {
2968 // Undoing on the host is a no-op when the reload was initiated by the guest.
2969 buffer.undo(cx);
2970 assert_eq!(buffer.text(), "let seven = 7;");
2971 assert!(!buffer.is_dirty());
2972 assert!(!buffer.has_conflict());
2973 });
2974 buffer_b.update(cx_b, |buffer, cx| {
2975 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2976 buffer.undo(cx);
2977 assert_eq!(buffer.text(), "let six = 6;");
2978 assert!(buffer.is_dirty());
2979 assert!(!buffer.has_conflict());
2980 });
2981 }
2982
2983 #[gpui::test(iterations = 10)]
2984 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2985 cx_a.foreground().forbid_parking();
2986 let lang_registry = Arc::new(LanguageRegistry::test());
2987 let fs = FakeFs::new(cx_a.background());
2988
2989 // Set up a fake language server.
2990 let mut language = Language::new(
2991 LanguageConfig {
2992 name: "Rust".into(),
2993 path_suffixes: vec!["rs".to_string()],
2994 ..Default::default()
2995 },
2996 Some(tree_sitter_rust::language()),
2997 );
2998 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2999 lang_registry.add(Arc::new(language));
3000
3001 // Connect to a server as 2 clients.
3002 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3003 let client_a = server.create_client(cx_a, "user_a").await;
3004 let client_b = server.create_client(cx_b, "user_b").await;
3005
3006 // Share a project as client A
3007 fs.insert_tree(
3008 "/a",
3009 json!({
3010 ".zed.toml": r#"collaborators = ["user_b"]"#,
3011 "a.rs": "let one = two",
3012 }),
3013 )
3014 .await;
3015 let project_a = cx_a.update(|cx| {
3016 Project::local(
3017 client_a.clone(),
3018 client_a.user_store.clone(),
3019 lang_registry.clone(),
3020 fs.clone(),
3021 cx,
3022 )
3023 });
3024 let (worktree_a, _) = project_a
3025 .update(cx_a, |p, cx| {
3026 p.find_or_create_local_worktree("/a", true, cx)
3027 })
3028 .await
3029 .unwrap();
3030 worktree_a
3031 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3032 .await;
3033 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3034 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3035 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3036
3037 // Join the worktree as client B.
3038 let project_b = Project::remote(
3039 project_id,
3040 client_b.clone(),
3041 client_b.user_store.clone(),
3042 lang_registry.clone(),
3043 fs.clone(),
3044 &mut cx_b.to_async(),
3045 )
3046 .await
3047 .unwrap();
3048
3049 let buffer_b = cx_b
3050 .background()
3051 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3052 .await
3053 .unwrap();
3054
3055 let fake_language_server = fake_language_servers.next().await.unwrap();
3056 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3057 Ok(Some(vec![
3058 lsp::TextEdit {
3059 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3060 new_text: "h".to_string(),
3061 },
3062 lsp::TextEdit {
3063 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3064 new_text: "y".to_string(),
3065 },
3066 ]))
3067 });
3068
3069 project_b
3070 .update(cx_b, |project, cx| {
3071 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3072 })
3073 .await
3074 .unwrap();
3075 assert_eq!(
3076 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3077 "let honey = two"
3078 );
3079 }
3080
3081 #[gpui::test(iterations = 10)]
3082 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3083 cx_a.foreground().forbid_parking();
3084 let lang_registry = Arc::new(LanguageRegistry::test());
3085 let fs = FakeFs::new(cx_a.background());
3086 fs.insert_tree(
3087 "/root-1",
3088 json!({
3089 ".zed.toml": r#"collaborators = ["user_b"]"#,
3090 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3091 }),
3092 )
3093 .await;
3094 fs.insert_tree(
3095 "/root-2",
3096 json!({
3097 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3098 }),
3099 )
3100 .await;
3101
3102 // Set up a fake language server.
3103 let mut language = Language::new(
3104 LanguageConfig {
3105 name: "Rust".into(),
3106 path_suffixes: vec!["rs".to_string()],
3107 ..Default::default()
3108 },
3109 Some(tree_sitter_rust::language()),
3110 );
3111 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3112 lang_registry.add(Arc::new(language));
3113
3114 // Connect to a server as 2 clients.
3115 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3116 let client_a = server.create_client(cx_a, "user_a").await;
3117 let client_b = server.create_client(cx_b, "user_b").await;
3118
3119 // Share a project as client A
3120 let project_a = cx_a.update(|cx| {
3121 Project::local(
3122 client_a.clone(),
3123 client_a.user_store.clone(),
3124 lang_registry.clone(),
3125 fs.clone(),
3126 cx,
3127 )
3128 });
3129 let (worktree_a, _) = project_a
3130 .update(cx_a, |p, cx| {
3131 p.find_or_create_local_worktree("/root-1", true, cx)
3132 })
3133 .await
3134 .unwrap();
3135 worktree_a
3136 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3137 .await;
3138 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3139 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3140 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3141
3142 // Join the worktree as client B.
3143 let project_b = Project::remote(
3144 project_id,
3145 client_b.clone(),
3146 client_b.user_store.clone(),
3147 lang_registry.clone(),
3148 fs.clone(),
3149 &mut cx_b.to_async(),
3150 )
3151 .await
3152 .unwrap();
3153
3154 // Open the file on client B.
3155 let buffer_b = cx_b
3156 .background()
3157 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3158 .await
3159 .unwrap();
3160
3161 // Request the definition of a symbol as the guest.
3162 let fake_language_server = fake_language_servers.next().await.unwrap();
3163 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3164 |_, _| async move {
3165 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3166 lsp::Location::new(
3167 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3168 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3169 ),
3170 )))
3171 },
3172 );
3173
3174 let definitions_1 = project_b
3175 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3176 .await
3177 .unwrap();
3178 cx_b.read(|cx| {
3179 assert_eq!(definitions_1.len(), 1);
3180 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3181 let target_buffer = definitions_1[0].buffer.read(cx);
3182 assert_eq!(
3183 target_buffer.text(),
3184 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3185 );
3186 assert_eq!(
3187 definitions_1[0].range.to_point(target_buffer),
3188 Point::new(0, 6)..Point::new(0, 9)
3189 );
3190 });
3191
3192 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3193 // the previous call to `definition`.
3194 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3195 |_, _| async move {
3196 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3197 lsp::Location::new(
3198 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3199 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3200 ),
3201 )))
3202 },
3203 );
3204
3205 let definitions_2 = project_b
3206 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3207 .await
3208 .unwrap();
3209 cx_b.read(|cx| {
3210 assert_eq!(definitions_2.len(), 1);
3211 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3212 let target_buffer = definitions_2[0].buffer.read(cx);
3213 assert_eq!(
3214 target_buffer.text(),
3215 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3216 );
3217 assert_eq!(
3218 definitions_2[0].range.to_point(target_buffer),
3219 Point::new(1, 6)..Point::new(1, 11)
3220 );
3221 });
3222 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3223 }
3224
3225 #[gpui::test(iterations = 10)]
3226 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3227 cx_a.foreground().forbid_parking();
3228 let lang_registry = Arc::new(LanguageRegistry::test());
3229 let fs = FakeFs::new(cx_a.background());
3230 fs.insert_tree(
3231 "/root-1",
3232 json!({
3233 ".zed.toml": r#"collaborators = ["user_b"]"#,
3234 "one.rs": "const ONE: usize = 1;",
3235 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3236 }),
3237 )
3238 .await;
3239 fs.insert_tree(
3240 "/root-2",
3241 json!({
3242 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3243 }),
3244 )
3245 .await;
3246
3247 // Set up a fake language server.
3248 let mut language = Language::new(
3249 LanguageConfig {
3250 name: "Rust".into(),
3251 path_suffixes: vec!["rs".to_string()],
3252 ..Default::default()
3253 },
3254 Some(tree_sitter_rust::language()),
3255 );
3256 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3257 lang_registry.add(Arc::new(language));
3258
3259 // Connect to a server as 2 clients.
3260 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3261 let client_a = server.create_client(cx_a, "user_a").await;
3262 let client_b = server.create_client(cx_b, "user_b").await;
3263
3264 // Share a project as client A
3265 let project_a = cx_a.update(|cx| {
3266 Project::local(
3267 client_a.clone(),
3268 client_a.user_store.clone(),
3269 lang_registry.clone(),
3270 fs.clone(),
3271 cx,
3272 )
3273 });
3274 let (worktree_a, _) = project_a
3275 .update(cx_a, |p, cx| {
3276 p.find_or_create_local_worktree("/root-1", true, cx)
3277 })
3278 .await
3279 .unwrap();
3280 worktree_a
3281 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3282 .await;
3283 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3284 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3285 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3286
3287 // Join the worktree as client B.
3288 let project_b = Project::remote(
3289 project_id,
3290 client_b.clone(),
3291 client_b.user_store.clone(),
3292 lang_registry.clone(),
3293 fs.clone(),
3294 &mut cx_b.to_async(),
3295 )
3296 .await
3297 .unwrap();
3298
3299 // Open the file on client B.
3300 let buffer_b = cx_b
3301 .background()
3302 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3303 .await
3304 .unwrap();
3305
3306 // Request references to a symbol as the guest.
3307 let fake_language_server = fake_language_servers.next().await.unwrap();
3308 fake_language_server.handle_request::<lsp::request::References, _, _>(
3309 |params, _| async move {
3310 assert_eq!(
3311 params.text_document_position.text_document.uri.as_str(),
3312 "file:///root-1/one.rs"
3313 );
3314 Ok(Some(vec![
3315 lsp::Location {
3316 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3317 range: lsp::Range::new(
3318 lsp::Position::new(0, 24),
3319 lsp::Position::new(0, 27),
3320 ),
3321 },
3322 lsp::Location {
3323 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3324 range: lsp::Range::new(
3325 lsp::Position::new(0, 35),
3326 lsp::Position::new(0, 38),
3327 ),
3328 },
3329 lsp::Location {
3330 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3331 range: lsp::Range::new(
3332 lsp::Position::new(0, 37),
3333 lsp::Position::new(0, 40),
3334 ),
3335 },
3336 ]))
3337 },
3338 );
3339
3340 let references = project_b
3341 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3342 .await
3343 .unwrap();
3344 cx_b.read(|cx| {
3345 assert_eq!(references.len(), 3);
3346 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3347
3348 let two_buffer = references[0].buffer.read(cx);
3349 let three_buffer = references[2].buffer.read(cx);
3350 assert_eq!(
3351 two_buffer.file().unwrap().path().as_ref(),
3352 Path::new("two.rs")
3353 );
3354 assert_eq!(references[1].buffer, references[0].buffer);
3355 assert_eq!(
3356 three_buffer.file().unwrap().full_path(cx),
3357 Path::new("three.rs")
3358 );
3359
3360 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3361 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3362 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3363 });
3364 }
3365
3366 #[gpui::test(iterations = 10)]
3367 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3368 cx_a.foreground().forbid_parking();
3369 let lang_registry = Arc::new(LanguageRegistry::test());
3370 let fs = FakeFs::new(cx_a.background());
3371 fs.insert_tree(
3372 "/root-1",
3373 json!({
3374 ".zed.toml": r#"collaborators = ["user_b"]"#,
3375 "a": "hello world",
3376 "b": "goodnight moon",
3377 "c": "a world of goo",
3378 "d": "world champion of clown world",
3379 }),
3380 )
3381 .await;
3382 fs.insert_tree(
3383 "/root-2",
3384 json!({
3385 "e": "disney world is fun",
3386 }),
3387 )
3388 .await;
3389
3390 // Connect to a server as 2 clients.
3391 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3392 let client_a = server.create_client(cx_a, "user_a").await;
3393 let client_b = server.create_client(cx_b, "user_b").await;
3394
3395 // Share a project as client A
3396 let project_a = cx_a.update(|cx| {
3397 Project::local(
3398 client_a.clone(),
3399 client_a.user_store.clone(),
3400 lang_registry.clone(),
3401 fs.clone(),
3402 cx,
3403 )
3404 });
3405 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3406
3407 let (worktree_1, _) = project_a
3408 .update(cx_a, |p, cx| {
3409 p.find_or_create_local_worktree("/root-1", true, cx)
3410 })
3411 .await
3412 .unwrap();
3413 worktree_1
3414 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3415 .await;
3416 let (worktree_2, _) = project_a
3417 .update(cx_a, |p, cx| {
3418 p.find_or_create_local_worktree("/root-2", true, cx)
3419 })
3420 .await
3421 .unwrap();
3422 worktree_2
3423 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3424 .await;
3425
3426 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3427
3428 // Join the worktree as client B.
3429 let project_b = Project::remote(
3430 project_id,
3431 client_b.clone(),
3432 client_b.user_store.clone(),
3433 lang_registry.clone(),
3434 fs.clone(),
3435 &mut cx_b.to_async(),
3436 )
3437 .await
3438 .unwrap();
3439
3440 let results = project_b
3441 .update(cx_b, |project, cx| {
3442 project.search(SearchQuery::text("world", false, false), cx)
3443 })
3444 .await
3445 .unwrap();
3446
3447 let mut ranges_by_path = results
3448 .into_iter()
3449 .map(|(buffer, ranges)| {
3450 buffer.read_with(cx_b, |buffer, cx| {
3451 let path = buffer.file().unwrap().full_path(cx);
3452 let offset_ranges = ranges
3453 .into_iter()
3454 .map(|range| range.to_offset(buffer))
3455 .collect::<Vec<_>>();
3456 (path, offset_ranges)
3457 })
3458 })
3459 .collect::<Vec<_>>();
3460 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3461
3462 assert_eq!(
3463 ranges_by_path,
3464 &[
3465 (PathBuf::from("root-1/a"), vec![6..11]),
3466 (PathBuf::from("root-1/c"), vec![2..7]),
3467 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3468 (PathBuf::from("root-2/e"), vec![7..12]),
3469 ]
3470 );
3471 }
3472
3473 #[gpui::test(iterations = 10)]
3474 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3475 cx_a.foreground().forbid_parking();
3476 let lang_registry = Arc::new(LanguageRegistry::test());
3477 let fs = FakeFs::new(cx_a.background());
3478 fs.insert_tree(
3479 "/root-1",
3480 json!({
3481 ".zed.toml": r#"collaborators = ["user_b"]"#,
3482 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3483 }),
3484 )
3485 .await;
3486
3487 // Set up a fake language server.
3488 let mut language = Language::new(
3489 LanguageConfig {
3490 name: "Rust".into(),
3491 path_suffixes: vec!["rs".to_string()],
3492 ..Default::default()
3493 },
3494 Some(tree_sitter_rust::language()),
3495 );
3496 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3497 lang_registry.add(Arc::new(language));
3498
3499 // Connect to a server as 2 clients.
3500 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3501 let client_a = server.create_client(cx_a, "user_a").await;
3502 let client_b = server.create_client(cx_b, "user_b").await;
3503
3504 // Share a project as client A
3505 let project_a = cx_a.update(|cx| {
3506 Project::local(
3507 client_a.clone(),
3508 client_a.user_store.clone(),
3509 lang_registry.clone(),
3510 fs.clone(),
3511 cx,
3512 )
3513 });
3514 let (worktree_a, _) = project_a
3515 .update(cx_a, |p, cx| {
3516 p.find_or_create_local_worktree("/root-1", true, cx)
3517 })
3518 .await
3519 .unwrap();
3520 worktree_a
3521 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3522 .await;
3523 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3524 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3525 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3526
3527 // Join the worktree as client B.
3528 let project_b = Project::remote(
3529 project_id,
3530 client_b.clone(),
3531 client_b.user_store.clone(),
3532 lang_registry.clone(),
3533 fs.clone(),
3534 &mut cx_b.to_async(),
3535 )
3536 .await
3537 .unwrap();
3538
3539 // Open the file on client B.
3540 let buffer_b = cx_b
3541 .background()
3542 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3543 .await
3544 .unwrap();
3545
3546 // Request document highlights as the guest.
3547 let fake_language_server = fake_language_servers.next().await.unwrap();
3548 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3549 |params, _| async move {
3550 assert_eq!(
3551 params
3552 .text_document_position_params
3553 .text_document
3554 .uri
3555 .as_str(),
3556 "file:///root-1/main.rs"
3557 );
3558 assert_eq!(
3559 params.text_document_position_params.position,
3560 lsp::Position::new(0, 34)
3561 );
3562 Ok(Some(vec![
3563 lsp::DocumentHighlight {
3564 kind: Some(lsp::DocumentHighlightKind::WRITE),
3565 range: lsp::Range::new(
3566 lsp::Position::new(0, 10),
3567 lsp::Position::new(0, 16),
3568 ),
3569 },
3570 lsp::DocumentHighlight {
3571 kind: Some(lsp::DocumentHighlightKind::READ),
3572 range: lsp::Range::new(
3573 lsp::Position::new(0, 32),
3574 lsp::Position::new(0, 38),
3575 ),
3576 },
3577 lsp::DocumentHighlight {
3578 kind: Some(lsp::DocumentHighlightKind::READ),
3579 range: lsp::Range::new(
3580 lsp::Position::new(0, 41),
3581 lsp::Position::new(0, 47),
3582 ),
3583 },
3584 ]))
3585 },
3586 );
3587
3588 let highlights = project_b
3589 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3590 .await
3591 .unwrap();
3592 buffer_b.read_with(cx_b, |buffer, _| {
3593 let snapshot = buffer.snapshot();
3594
3595 let highlights = highlights
3596 .into_iter()
3597 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3598 .collect::<Vec<_>>();
3599 assert_eq!(
3600 highlights,
3601 &[
3602 (lsp::DocumentHighlightKind::WRITE, 10..16),
3603 (lsp::DocumentHighlightKind::READ, 32..38),
3604 (lsp::DocumentHighlightKind::READ, 41..47)
3605 ]
3606 )
3607 });
3608 }
3609
3610 #[gpui::test(iterations = 10)]
3611 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3612 cx_a.foreground().forbid_parking();
3613 let lang_registry = Arc::new(LanguageRegistry::test());
3614 let fs = FakeFs::new(cx_a.background());
3615 fs.insert_tree(
3616 "/code",
3617 json!({
3618 "crate-1": {
3619 ".zed.toml": r#"collaborators = ["user_b"]"#,
3620 "one.rs": "const ONE: usize = 1;",
3621 },
3622 "crate-2": {
3623 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3624 },
3625 "private": {
3626 "passwords.txt": "the-password",
3627 }
3628 }),
3629 )
3630 .await;
3631
3632 // Set up a fake language server.
3633 let mut language = Language::new(
3634 LanguageConfig {
3635 name: "Rust".into(),
3636 path_suffixes: vec!["rs".to_string()],
3637 ..Default::default()
3638 },
3639 Some(tree_sitter_rust::language()),
3640 );
3641 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3642 lang_registry.add(Arc::new(language));
3643
3644 // Connect to a server as 2 clients.
3645 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3646 let client_a = server.create_client(cx_a, "user_a").await;
3647 let client_b = server.create_client(cx_b, "user_b").await;
3648
3649 // Share a project as client A
3650 let project_a = cx_a.update(|cx| {
3651 Project::local(
3652 client_a.clone(),
3653 client_a.user_store.clone(),
3654 lang_registry.clone(),
3655 fs.clone(),
3656 cx,
3657 )
3658 });
3659 let (worktree_a, _) = project_a
3660 .update(cx_a, |p, cx| {
3661 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3662 })
3663 .await
3664 .unwrap();
3665 worktree_a
3666 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3667 .await;
3668 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3669 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3670 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3671
3672 // Join the worktree as client B.
3673 let project_b = Project::remote(
3674 project_id,
3675 client_b.clone(),
3676 client_b.user_store.clone(),
3677 lang_registry.clone(),
3678 fs.clone(),
3679 &mut cx_b.to_async(),
3680 )
3681 .await
3682 .unwrap();
3683
3684 // Cause the language server to start.
3685 let _buffer = cx_b
3686 .background()
3687 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3688 .await
3689 .unwrap();
3690
3691 let fake_language_server = fake_language_servers.next().await.unwrap();
3692 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3693 |_, _| async move {
3694 #[allow(deprecated)]
3695 Ok(Some(vec![lsp::SymbolInformation {
3696 name: "TWO".into(),
3697 location: lsp::Location {
3698 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3699 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3700 },
3701 kind: lsp::SymbolKind::CONSTANT,
3702 tags: None,
3703 container_name: None,
3704 deprecated: None,
3705 }]))
3706 },
3707 );
3708
3709 // Request the definition of a symbol as the guest.
3710 let symbols = project_b
3711 .update(cx_b, |p, cx| p.symbols("two", cx))
3712 .await
3713 .unwrap();
3714 assert_eq!(symbols.len(), 1);
3715 assert_eq!(symbols[0].name, "TWO");
3716
3717 // Open one of the returned symbols.
3718 let buffer_b_2 = project_b
3719 .update(cx_b, |project, cx| {
3720 project.open_buffer_for_symbol(&symbols[0], cx)
3721 })
3722 .await
3723 .unwrap();
3724 buffer_b_2.read_with(cx_b, |buffer, _| {
3725 assert_eq!(
3726 buffer.file().unwrap().path().as_ref(),
3727 Path::new("../crate-2/two.rs")
3728 );
3729 });
3730
3731 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3732 let mut fake_symbol = symbols[0].clone();
3733 fake_symbol.path = Path::new("/code/secrets").into();
3734 let error = project_b
3735 .update(cx_b, |project, cx| {
3736 project.open_buffer_for_symbol(&fake_symbol, cx)
3737 })
3738 .await
3739 .unwrap_err();
3740 assert!(error.to_string().contains("invalid symbol signature"));
3741 }
3742
3743 #[gpui::test(iterations = 10)]
3744 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3745 cx_a: &mut TestAppContext,
3746 cx_b: &mut TestAppContext,
3747 mut rng: StdRng,
3748 ) {
3749 cx_a.foreground().forbid_parking();
3750 let lang_registry = Arc::new(LanguageRegistry::test());
3751 let fs = FakeFs::new(cx_a.background());
3752 fs.insert_tree(
3753 "/root",
3754 json!({
3755 ".zed.toml": r#"collaborators = ["user_b"]"#,
3756 "a.rs": "const ONE: usize = b::TWO;",
3757 "b.rs": "const TWO: usize = 2",
3758 }),
3759 )
3760 .await;
3761
3762 // Set up a fake language server.
3763 let mut language = Language::new(
3764 LanguageConfig {
3765 name: "Rust".into(),
3766 path_suffixes: vec!["rs".to_string()],
3767 ..Default::default()
3768 },
3769 Some(tree_sitter_rust::language()),
3770 );
3771 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3772 lang_registry.add(Arc::new(language));
3773
3774 // Connect to a server as 2 clients.
3775 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3776 let client_a = server.create_client(cx_a, "user_a").await;
3777 let client_b = server.create_client(cx_b, "user_b").await;
3778
3779 // Share a project as client A
3780 let project_a = cx_a.update(|cx| {
3781 Project::local(
3782 client_a.clone(),
3783 client_a.user_store.clone(),
3784 lang_registry.clone(),
3785 fs.clone(),
3786 cx,
3787 )
3788 });
3789
3790 let (worktree_a, _) = project_a
3791 .update(cx_a, |p, cx| {
3792 p.find_or_create_local_worktree("/root", true, cx)
3793 })
3794 .await
3795 .unwrap();
3796 worktree_a
3797 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3798 .await;
3799 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3800 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3801 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3802
3803 // Join the worktree as client B.
3804 let project_b = Project::remote(
3805 project_id,
3806 client_b.clone(),
3807 client_b.user_store.clone(),
3808 lang_registry.clone(),
3809 fs.clone(),
3810 &mut cx_b.to_async(),
3811 )
3812 .await
3813 .unwrap();
3814
3815 let buffer_b1 = cx_b
3816 .background()
3817 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3818 .await
3819 .unwrap();
3820
3821 let fake_language_server = fake_language_servers.next().await.unwrap();
3822 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3823 |_, _| async move {
3824 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3825 lsp::Location::new(
3826 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3827 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3828 ),
3829 )))
3830 },
3831 );
3832
3833 let definitions;
3834 let buffer_b2;
3835 if rng.gen() {
3836 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3837 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3838 } else {
3839 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3840 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3841 }
3842
3843 let buffer_b2 = buffer_b2.await.unwrap();
3844 let definitions = definitions.await.unwrap();
3845 assert_eq!(definitions.len(), 1);
3846 assert_eq!(definitions[0].buffer, buffer_b2);
3847 }
3848
3849 #[gpui::test(iterations = 10)]
3850 async fn test_collaborating_with_code_actions(
3851 cx_a: &mut TestAppContext,
3852 cx_b: &mut TestAppContext,
3853 ) {
3854 cx_a.foreground().forbid_parking();
3855 let lang_registry = Arc::new(LanguageRegistry::test());
3856 let fs = FakeFs::new(cx_a.background());
3857 cx_b.update(|cx| editor::init(cx));
3858
3859 // Set up a fake language server.
3860 let mut language = Language::new(
3861 LanguageConfig {
3862 name: "Rust".into(),
3863 path_suffixes: vec!["rs".to_string()],
3864 ..Default::default()
3865 },
3866 Some(tree_sitter_rust::language()),
3867 );
3868 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3869 lang_registry.add(Arc::new(language));
3870
3871 // Connect to a server as 2 clients.
3872 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3873 let client_a = server.create_client(cx_a, "user_a").await;
3874 let client_b = server.create_client(cx_b, "user_b").await;
3875
3876 // Share a project as client A
3877 fs.insert_tree(
3878 "/a",
3879 json!({
3880 ".zed.toml": r#"collaborators = ["user_b"]"#,
3881 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3882 "other.rs": "pub fn foo() -> usize { 4 }",
3883 }),
3884 )
3885 .await;
3886 let project_a = cx_a.update(|cx| {
3887 Project::local(
3888 client_a.clone(),
3889 client_a.user_store.clone(),
3890 lang_registry.clone(),
3891 fs.clone(),
3892 cx,
3893 )
3894 });
3895 let (worktree_a, _) = project_a
3896 .update(cx_a, |p, cx| {
3897 p.find_or_create_local_worktree("/a", true, cx)
3898 })
3899 .await
3900 .unwrap();
3901 worktree_a
3902 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3903 .await;
3904 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3905 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3906 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3907
3908 // Join the worktree as client B.
3909 let project_b = Project::remote(
3910 project_id,
3911 client_b.clone(),
3912 client_b.user_store.clone(),
3913 lang_registry.clone(),
3914 fs.clone(),
3915 &mut cx_b.to_async(),
3916 )
3917 .await
3918 .unwrap();
3919 let mut params = cx_b.update(WorkspaceParams::test);
3920 params.languages = lang_registry.clone();
3921 params.client = client_b.client.clone();
3922 params.user_store = client_b.user_store.clone();
3923 params.project = project_b;
3924
3925 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3926 let editor_b = workspace_b
3927 .update(cx_b, |workspace, cx| {
3928 workspace.open_path((worktree_id, "main.rs"), true, cx)
3929 })
3930 .await
3931 .unwrap()
3932 .downcast::<Editor>()
3933 .unwrap();
3934
3935 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3936 fake_language_server
3937 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3938 assert_eq!(
3939 params.text_document.uri,
3940 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3941 );
3942 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3943 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3944 Ok(None)
3945 })
3946 .next()
3947 .await;
3948
3949 // Move cursor to a location that contains code actions.
3950 editor_b.update(cx_b, |editor, cx| {
3951 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3952 cx.focus(&editor_b);
3953 });
3954
3955 fake_language_server
3956 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3957 assert_eq!(
3958 params.text_document.uri,
3959 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3960 );
3961 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3962 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3963
3964 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3965 lsp::CodeAction {
3966 title: "Inline into all callers".to_string(),
3967 edit: Some(lsp::WorkspaceEdit {
3968 changes: Some(
3969 [
3970 (
3971 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3972 vec![lsp::TextEdit::new(
3973 lsp::Range::new(
3974 lsp::Position::new(1, 22),
3975 lsp::Position::new(1, 34),
3976 ),
3977 "4".to_string(),
3978 )],
3979 ),
3980 (
3981 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3982 vec![lsp::TextEdit::new(
3983 lsp::Range::new(
3984 lsp::Position::new(0, 0),
3985 lsp::Position::new(0, 27),
3986 ),
3987 "".to_string(),
3988 )],
3989 ),
3990 ]
3991 .into_iter()
3992 .collect(),
3993 ),
3994 ..Default::default()
3995 }),
3996 data: Some(json!({
3997 "codeActionParams": {
3998 "range": {
3999 "start": {"line": 1, "column": 31},
4000 "end": {"line": 1, "column": 31},
4001 }
4002 }
4003 })),
4004 ..Default::default()
4005 },
4006 )]))
4007 })
4008 .next()
4009 .await;
4010
4011 // Toggle code actions and wait for them to display.
4012 editor_b.update(cx_b, |editor, cx| {
4013 editor.toggle_code_actions(
4014 &ToggleCodeActions {
4015 deployed_from_indicator: false,
4016 },
4017 cx,
4018 );
4019 });
4020 editor_b
4021 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4022 .await;
4023
4024 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4025
4026 // Confirming the code action will trigger a resolve request.
4027 let confirm_action = workspace_b
4028 .update(cx_b, |workspace, cx| {
4029 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4030 })
4031 .unwrap();
4032 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4033 |_, _| async move {
4034 Ok(lsp::CodeAction {
4035 title: "Inline into all callers".to_string(),
4036 edit: Some(lsp::WorkspaceEdit {
4037 changes: Some(
4038 [
4039 (
4040 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4041 vec![lsp::TextEdit::new(
4042 lsp::Range::new(
4043 lsp::Position::new(1, 22),
4044 lsp::Position::new(1, 34),
4045 ),
4046 "4".to_string(),
4047 )],
4048 ),
4049 (
4050 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4051 vec![lsp::TextEdit::new(
4052 lsp::Range::new(
4053 lsp::Position::new(0, 0),
4054 lsp::Position::new(0, 27),
4055 ),
4056 "".to_string(),
4057 )],
4058 ),
4059 ]
4060 .into_iter()
4061 .collect(),
4062 ),
4063 ..Default::default()
4064 }),
4065 ..Default::default()
4066 })
4067 },
4068 );
4069
4070 // After the action is confirmed, an editor containing both modified files is opened.
4071 confirm_action.await.unwrap();
4072 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4073 workspace
4074 .active_item(cx)
4075 .unwrap()
4076 .downcast::<Editor>()
4077 .unwrap()
4078 });
4079 code_action_editor.update(cx_b, |editor, cx| {
4080 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4081 editor.undo(&Undo, cx);
4082 assert_eq!(
4083 editor.text(cx),
4084 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4085 );
4086 editor.redo(&Redo, cx);
4087 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4088 });
4089 }
4090
4091 #[gpui::test(iterations = 10)]
4092 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4093 cx_a.foreground().forbid_parking();
4094 let lang_registry = Arc::new(LanguageRegistry::test());
4095 let fs = FakeFs::new(cx_a.background());
4096 cx_b.update(|cx| editor::init(cx));
4097
4098 // Set up a fake language server.
4099 let mut language = Language::new(
4100 LanguageConfig {
4101 name: "Rust".into(),
4102 path_suffixes: vec!["rs".to_string()],
4103 ..Default::default()
4104 },
4105 Some(tree_sitter_rust::language()),
4106 );
4107 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4108 capabilities: lsp::ServerCapabilities {
4109 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4110 prepare_provider: Some(true),
4111 work_done_progress_options: Default::default(),
4112 })),
4113 ..Default::default()
4114 },
4115 ..Default::default()
4116 });
4117 lang_registry.add(Arc::new(language));
4118
4119 // Connect to a server as 2 clients.
4120 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4121 let client_a = server.create_client(cx_a, "user_a").await;
4122 let client_b = server.create_client(cx_b, "user_b").await;
4123
4124 // Share a project as client A
4125 fs.insert_tree(
4126 "/dir",
4127 json!({
4128 ".zed.toml": r#"collaborators = ["user_b"]"#,
4129 "one.rs": "const ONE: usize = 1;",
4130 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4131 }),
4132 )
4133 .await;
4134 let project_a = cx_a.update(|cx| {
4135 Project::local(
4136 client_a.clone(),
4137 client_a.user_store.clone(),
4138 lang_registry.clone(),
4139 fs.clone(),
4140 cx,
4141 )
4142 });
4143 let (worktree_a, _) = project_a
4144 .update(cx_a, |p, cx| {
4145 p.find_or_create_local_worktree("/dir", true, cx)
4146 })
4147 .await
4148 .unwrap();
4149 worktree_a
4150 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4151 .await;
4152 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4153 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4154 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4155
4156 // Join the worktree as client B.
4157 let project_b = Project::remote(
4158 project_id,
4159 client_b.clone(),
4160 client_b.user_store.clone(),
4161 lang_registry.clone(),
4162 fs.clone(),
4163 &mut cx_b.to_async(),
4164 )
4165 .await
4166 .unwrap();
4167 let mut params = cx_b.update(WorkspaceParams::test);
4168 params.languages = lang_registry.clone();
4169 params.client = client_b.client.clone();
4170 params.user_store = client_b.user_store.clone();
4171 params.project = project_b;
4172
4173 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4174 let editor_b = workspace_b
4175 .update(cx_b, |workspace, cx| {
4176 workspace.open_path((worktree_id, "one.rs"), true, cx)
4177 })
4178 .await
4179 .unwrap()
4180 .downcast::<Editor>()
4181 .unwrap();
4182 let fake_language_server = fake_language_servers.next().await.unwrap();
4183
4184 // Move cursor to a location that can be renamed.
4185 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4186 editor.select_ranges([7..7], None, cx);
4187 editor.rename(&Rename, cx).unwrap()
4188 });
4189
4190 fake_language_server
4191 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4192 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4193 assert_eq!(params.position, lsp::Position::new(0, 7));
4194 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4195 lsp::Position::new(0, 6),
4196 lsp::Position::new(0, 9),
4197 ))))
4198 })
4199 .next()
4200 .await
4201 .unwrap();
4202 prepare_rename.await.unwrap();
4203 editor_b.update(cx_b, |editor, cx| {
4204 let rename = editor.pending_rename().unwrap();
4205 let buffer = editor.buffer().read(cx).snapshot(cx);
4206 assert_eq!(
4207 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4208 6..9
4209 );
4210 rename.editor.update(cx, |rename_editor, cx| {
4211 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4212 rename_buffer.edit([(0..3, "THREE")], cx);
4213 });
4214 });
4215 });
4216
4217 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4218 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4219 });
4220 fake_language_server
4221 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4222 assert_eq!(
4223 params.text_document_position.text_document.uri.as_str(),
4224 "file:///dir/one.rs"
4225 );
4226 assert_eq!(
4227 params.text_document_position.position,
4228 lsp::Position::new(0, 6)
4229 );
4230 assert_eq!(params.new_name, "THREE");
4231 Ok(Some(lsp::WorkspaceEdit {
4232 changes: Some(
4233 [
4234 (
4235 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4236 vec![lsp::TextEdit::new(
4237 lsp::Range::new(
4238 lsp::Position::new(0, 6),
4239 lsp::Position::new(0, 9),
4240 ),
4241 "THREE".to_string(),
4242 )],
4243 ),
4244 (
4245 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4246 vec![
4247 lsp::TextEdit::new(
4248 lsp::Range::new(
4249 lsp::Position::new(0, 24),
4250 lsp::Position::new(0, 27),
4251 ),
4252 "THREE".to_string(),
4253 ),
4254 lsp::TextEdit::new(
4255 lsp::Range::new(
4256 lsp::Position::new(0, 35),
4257 lsp::Position::new(0, 38),
4258 ),
4259 "THREE".to_string(),
4260 ),
4261 ],
4262 ),
4263 ]
4264 .into_iter()
4265 .collect(),
4266 ),
4267 ..Default::default()
4268 }))
4269 })
4270 .next()
4271 .await
4272 .unwrap();
4273 confirm_rename.await.unwrap();
4274
4275 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4276 workspace
4277 .active_item(cx)
4278 .unwrap()
4279 .downcast::<Editor>()
4280 .unwrap()
4281 });
4282 rename_editor.update(cx_b, |editor, cx| {
4283 assert_eq!(
4284 editor.text(cx),
4285 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4286 );
4287 editor.undo(&Undo, cx);
4288 assert_eq!(
4289 editor.text(cx),
4290 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4291 );
4292 editor.redo(&Redo, cx);
4293 assert_eq!(
4294 editor.text(cx),
4295 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4296 );
4297 });
4298
4299 // Ensure temporary rename edits cannot be undone/redone.
4300 editor_b.update(cx_b, |editor, cx| {
4301 editor.undo(&Undo, cx);
4302 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4303 editor.undo(&Undo, cx);
4304 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4305 editor.redo(&Redo, cx);
4306 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4307 })
4308 }
4309
4310 #[gpui::test(iterations = 10)]
4311 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4312 cx_a.foreground().forbid_parking();
4313
4314 // Connect to a server as 2 clients.
4315 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4316 let client_a = server.create_client(cx_a, "user_a").await;
4317 let client_b = server.create_client(cx_b, "user_b").await;
4318
4319 // Create an org that includes these 2 users.
4320 let db = &server.app_state.db;
4321 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4322 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4323 .await
4324 .unwrap();
4325 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4326 .await
4327 .unwrap();
4328
4329 // Create a channel that includes all the users.
4330 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4331 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4332 .await
4333 .unwrap();
4334 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4335 .await
4336 .unwrap();
4337 db.create_channel_message(
4338 channel_id,
4339 client_b.current_user_id(&cx_b),
4340 "hello A, it's B.",
4341 OffsetDateTime::now_utc(),
4342 1,
4343 )
4344 .await
4345 .unwrap();
4346
4347 let channels_a = cx_a
4348 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4349 channels_a
4350 .condition(cx_a, |list, _| list.available_channels().is_some())
4351 .await;
4352 channels_a.read_with(cx_a, |list, _| {
4353 assert_eq!(
4354 list.available_channels().unwrap(),
4355 &[ChannelDetails {
4356 id: channel_id.to_proto(),
4357 name: "test-channel".to_string()
4358 }]
4359 )
4360 });
4361 let channel_a = channels_a.update(cx_a, |this, cx| {
4362 this.get_channel(channel_id.to_proto(), cx).unwrap()
4363 });
4364 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4365 channel_a
4366 .condition(&cx_a, |channel, _| {
4367 channel_messages(channel)
4368 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4369 })
4370 .await;
4371
4372 let channels_b = cx_b
4373 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4374 channels_b
4375 .condition(cx_b, |list, _| list.available_channels().is_some())
4376 .await;
4377 channels_b.read_with(cx_b, |list, _| {
4378 assert_eq!(
4379 list.available_channels().unwrap(),
4380 &[ChannelDetails {
4381 id: channel_id.to_proto(),
4382 name: "test-channel".to_string()
4383 }]
4384 )
4385 });
4386
4387 let channel_b = channels_b.update(cx_b, |this, cx| {
4388 this.get_channel(channel_id.to_proto(), cx).unwrap()
4389 });
4390 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4391 channel_b
4392 .condition(&cx_b, |channel, _| {
4393 channel_messages(channel)
4394 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4395 })
4396 .await;
4397
4398 channel_a
4399 .update(cx_a, |channel, cx| {
4400 channel
4401 .send_message("oh, hi B.".to_string(), cx)
4402 .unwrap()
4403 .detach();
4404 let task = channel.send_message("sup".to_string(), cx).unwrap();
4405 assert_eq!(
4406 channel_messages(channel),
4407 &[
4408 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4409 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4410 ("user_a".to_string(), "sup".to_string(), true)
4411 ]
4412 );
4413 task
4414 })
4415 .await
4416 .unwrap();
4417
4418 channel_b
4419 .condition(&cx_b, |channel, _| {
4420 channel_messages(channel)
4421 == [
4422 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4423 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4424 ("user_a".to_string(), "sup".to_string(), false),
4425 ]
4426 })
4427 .await;
4428
4429 assert_eq!(
4430 server
4431 .state()
4432 .await
4433 .channel(channel_id)
4434 .unwrap()
4435 .connection_ids
4436 .len(),
4437 2
4438 );
4439 cx_b.update(|_| drop(channel_b));
4440 server
4441 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4442 .await;
4443
4444 cx_a.update(|_| drop(channel_a));
4445 server
4446 .condition(|state| state.channel(channel_id).is_none())
4447 .await;
4448 }
4449
4450 #[gpui::test(iterations = 10)]
4451 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4452 cx_a.foreground().forbid_parking();
4453
4454 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4455 let client_a = server.create_client(cx_a, "user_a").await;
4456
4457 let db = &server.app_state.db;
4458 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4459 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4460 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4461 .await
4462 .unwrap();
4463 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4464 .await
4465 .unwrap();
4466
4467 let channels_a = cx_a
4468 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4469 channels_a
4470 .condition(cx_a, |list, _| list.available_channels().is_some())
4471 .await;
4472 let channel_a = channels_a.update(cx_a, |this, cx| {
4473 this.get_channel(channel_id.to_proto(), cx).unwrap()
4474 });
4475
4476 // Messages aren't allowed to be too long.
4477 channel_a
4478 .update(cx_a, |channel, cx| {
4479 let long_body = "this is long.\n".repeat(1024);
4480 channel.send_message(long_body, cx).unwrap()
4481 })
4482 .await
4483 .unwrap_err();
4484
4485 // Messages aren't allowed to be blank.
4486 channel_a.update(cx_a, |channel, cx| {
4487 channel.send_message(String::new(), cx).unwrap_err()
4488 });
4489
4490 // Leading and trailing whitespace are trimmed.
4491 channel_a
4492 .update(cx_a, |channel, cx| {
4493 channel
4494 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4495 .unwrap()
4496 })
4497 .await
4498 .unwrap();
4499 assert_eq!(
4500 db.get_channel_messages(channel_id, 10, None)
4501 .await
4502 .unwrap()
4503 .iter()
4504 .map(|m| &m.body)
4505 .collect::<Vec<_>>(),
4506 &["surrounded by whitespace"]
4507 );
4508 }
4509
4510 #[gpui::test(iterations = 10)]
4511 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4512 cx_a.foreground().forbid_parking();
4513
4514 // Connect to a server as 2 clients.
4515 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4516 let client_a = server.create_client(cx_a, "user_a").await;
4517 let client_b = server.create_client(cx_b, "user_b").await;
4518 let mut status_b = client_b.status();
4519
4520 // Create an org that includes these 2 users.
4521 let db = &server.app_state.db;
4522 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4523 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4524 .await
4525 .unwrap();
4526 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4527 .await
4528 .unwrap();
4529
4530 // Create a channel that includes all the users.
4531 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4532 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4533 .await
4534 .unwrap();
4535 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4536 .await
4537 .unwrap();
4538 db.create_channel_message(
4539 channel_id,
4540 client_b.current_user_id(&cx_b),
4541 "hello A, it's B.",
4542 OffsetDateTime::now_utc(),
4543 2,
4544 )
4545 .await
4546 .unwrap();
4547
4548 let channels_a = cx_a
4549 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4550 channels_a
4551 .condition(cx_a, |list, _| list.available_channels().is_some())
4552 .await;
4553
4554 channels_a.read_with(cx_a, |list, _| {
4555 assert_eq!(
4556 list.available_channels().unwrap(),
4557 &[ChannelDetails {
4558 id: channel_id.to_proto(),
4559 name: "test-channel".to_string()
4560 }]
4561 )
4562 });
4563 let channel_a = channels_a.update(cx_a, |this, cx| {
4564 this.get_channel(channel_id.to_proto(), cx).unwrap()
4565 });
4566 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4567 channel_a
4568 .condition(&cx_a, |channel, _| {
4569 channel_messages(channel)
4570 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4571 })
4572 .await;
4573
4574 let channels_b = cx_b
4575 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4576 channels_b
4577 .condition(cx_b, |list, _| list.available_channels().is_some())
4578 .await;
4579 channels_b.read_with(cx_b, |list, _| {
4580 assert_eq!(
4581 list.available_channels().unwrap(),
4582 &[ChannelDetails {
4583 id: channel_id.to_proto(),
4584 name: "test-channel".to_string()
4585 }]
4586 )
4587 });
4588
4589 let channel_b = channels_b.update(cx_b, |this, cx| {
4590 this.get_channel(channel_id.to_proto(), cx).unwrap()
4591 });
4592 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4593 channel_b
4594 .condition(&cx_b, |channel, _| {
4595 channel_messages(channel)
4596 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4597 })
4598 .await;
4599
4600 // Disconnect client B, ensuring we can still access its cached channel data.
4601 server.forbid_connections();
4602 server.disconnect_client(client_b.current_user_id(&cx_b));
4603 cx_b.foreground().advance_clock(Duration::from_secs(3));
4604 while !matches!(
4605 status_b.next().await,
4606 Some(client::Status::ReconnectionError { .. })
4607 ) {}
4608
4609 channels_b.read_with(cx_b, |channels, _| {
4610 assert_eq!(
4611 channels.available_channels().unwrap(),
4612 [ChannelDetails {
4613 id: channel_id.to_proto(),
4614 name: "test-channel".to_string()
4615 }]
4616 )
4617 });
4618 channel_b.read_with(cx_b, |channel, _| {
4619 assert_eq!(
4620 channel_messages(channel),
4621 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4622 )
4623 });
4624
4625 // Send a message from client B while it is disconnected.
4626 channel_b
4627 .update(cx_b, |channel, cx| {
4628 let task = channel
4629 .send_message("can you see this?".to_string(), cx)
4630 .unwrap();
4631 assert_eq!(
4632 channel_messages(channel),
4633 &[
4634 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4635 ("user_b".to_string(), "can you see this?".to_string(), true)
4636 ]
4637 );
4638 task
4639 })
4640 .await
4641 .unwrap_err();
4642
4643 // Send a message from client A while B is disconnected.
4644 channel_a
4645 .update(cx_a, |channel, cx| {
4646 channel
4647 .send_message("oh, hi B.".to_string(), cx)
4648 .unwrap()
4649 .detach();
4650 let task = channel.send_message("sup".to_string(), cx).unwrap();
4651 assert_eq!(
4652 channel_messages(channel),
4653 &[
4654 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4655 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4656 ("user_a".to_string(), "sup".to_string(), true)
4657 ]
4658 );
4659 task
4660 })
4661 .await
4662 .unwrap();
4663
4664 // Give client B a chance to reconnect.
4665 server.allow_connections();
4666 cx_b.foreground().advance_clock(Duration::from_secs(10));
4667
4668 // Verify that B sees the new messages upon reconnection, as well as the message client B
4669 // sent while offline.
4670 channel_b
4671 .condition(&cx_b, |channel, _| {
4672 channel_messages(channel)
4673 == [
4674 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4675 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4676 ("user_a".to_string(), "sup".to_string(), false),
4677 ("user_b".to_string(), "can you see this?".to_string(), false),
4678 ]
4679 })
4680 .await;
4681
4682 // Ensure client A and B can communicate normally after reconnection.
4683 channel_a
4684 .update(cx_a, |channel, cx| {
4685 channel.send_message("you online?".to_string(), cx).unwrap()
4686 })
4687 .await
4688 .unwrap();
4689 channel_b
4690 .condition(&cx_b, |channel, _| {
4691 channel_messages(channel)
4692 == [
4693 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4694 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4695 ("user_a".to_string(), "sup".to_string(), false),
4696 ("user_b".to_string(), "can you see this?".to_string(), false),
4697 ("user_a".to_string(), "you online?".to_string(), false),
4698 ]
4699 })
4700 .await;
4701
4702 channel_b
4703 .update(cx_b, |channel, cx| {
4704 channel.send_message("yep".to_string(), cx).unwrap()
4705 })
4706 .await
4707 .unwrap();
4708 channel_a
4709 .condition(&cx_a, |channel, _| {
4710 channel_messages(channel)
4711 == [
4712 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4713 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4714 ("user_a".to_string(), "sup".to_string(), false),
4715 ("user_b".to_string(), "can you see this?".to_string(), false),
4716 ("user_a".to_string(), "you online?".to_string(), false),
4717 ("user_b".to_string(), "yep".to_string(), false),
4718 ]
4719 })
4720 .await;
4721 }
4722
4723 #[gpui::test(iterations = 10)]
4724 async fn test_contacts(
4725 cx_a: &mut TestAppContext,
4726 cx_b: &mut TestAppContext,
4727 cx_c: &mut TestAppContext,
4728 ) {
4729 cx_a.foreground().forbid_parking();
4730 let lang_registry = Arc::new(LanguageRegistry::test());
4731 let fs = FakeFs::new(cx_a.background());
4732
4733 // Connect to a server as 3 clients.
4734 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4735 let client_a = server.create_client(cx_a, "user_a").await;
4736 let client_b = server.create_client(cx_b, "user_b").await;
4737 let client_c = server.create_client(cx_c, "user_c").await;
4738
4739 // Share a worktree as client A.
4740 fs.insert_tree(
4741 "/a",
4742 json!({
4743 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4744 }),
4745 )
4746 .await;
4747
4748 let project_a = cx_a.update(|cx| {
4749 Project::local(
4750 client_a.clone(),
4751 client_a.user_store.clone(),
4752 lang_registry.clone(),
4753 fs.clone(),
4754 cx,
4755 )
4756 });
4757 let (worktree_a, _) = project_a
4758 .update(cx_a, |p, cx| {
4759 p.find_or_create_local_worktree("/a", true, cx)
4760 })
4761 .await
4762 .unwrap();
4763 worktree_a
4764 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4765 .await;
4766
4767 client_a
4768 .user_store
4769 .condition(&cx_a, |user_store, _| {
4770 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4771 })
4772 .await;
4773 client_b
4774 .user_store
4775 .condition(&cx_b, |user_store, _| {
4776 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4777 })
4778 .await;
4779 client_c
4780 .user_store
4781 .condition(&cx_c, |user_store, _| {
4782 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4783 })
4784 .await;
4785
4786 let project_id = project_a
4787 .update(cx_a, |project, _| project.next_remote_id())
4788 .await;
4789 project_a
4790 .update(cx_a, |project, cx| project.share(cx))
4791 .await
4792 .unwrap();
4793 client_a
4794 .user_store
4795 .condition(&cx_a, |user_store, _| {
4796 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4797 })
4798 .await;
4799 client_b
4800 .user_store
4801 .condition(&cx_b, |user_store, _| {
4802 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4803 })
4804 .await;
4805 client_c
4806 .user_store
4807 .condition(&cx_c, |user_store, _| {
4808 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4809 })
4810 .await;
4811
4812 let _project_b = Project::remote(
4813 project_id,
4814 client_b.clone(),
4815 client_b.user_store.clone(),
4816 lang_registry.clone(),
4817 fs.clone(),
4818 &mut cx_b.to_async(),
4819 )
4820 .await
4821 .unwrap();
4822
4823 client_a
4824 .user_store
4825 .condition(&cx_a, |user_store, _| {
4826 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4827 })
4828 .await;
4829 client_b
4830 .user_store
4831 .condition(&cx_b, |user_store, _| {
4832 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4833 })
4834 .await;
4835 client_c
4836 .user_store
4837 .condition(&cx_c, |user_store, _| {
4838 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4839 })
4840 .await;
4841
4842 project_a
4843 .condition(&cx_a, |project, _| {
4844 project.collaborators().contains_key(&client_b.peer_id)
4845 })
4846 .await;
4847
4848 cx_a.update(move |_| drop(project_a));
4849 client_a
4850 .user_store
4851 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4852 .await;
4853 client_b
4854 .user_store
4855 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4856 .await;
4857 client_c
4858 .user_store
4859 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4860 .await;
4861
4862 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4863 user_store
4864 .contacts()
4865 .iter()
4866 .map(|contact| {
4867 let worktrees = contact
4868 .projects
4869 .iter()
4870 .map(|p| {
4871 (
4872 p.worktree_root_names[0].as_str(),
4873 p.is_shared,
4874 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4875 )
4876 })
4877 .collect();
4878 (contact.user.github_login.as_str(), worktrees)
4879 })
4880 .collect()
4881 }
4882 }
4883
4884 #[gpui::test(iterations = 10)]
4885 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4886 cx_a.foreground().forbid_parking();
4887 let fs = FakeFs::new(cx_a.background());
4888
4889 // 2 clients connect to a server.
4890 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4891 let mut client_a = server.create_client(cx_a, "user_a").await;
4892 let mut client_b = server.create_client(cx_b, "user_b").await;
4893 cx_a.update(editor::init);
4894 cx_b.update(editor::init);
4895
4896 // Client A shares a project.
4897 fs.insert_tree(
4898 "/a",
4899 json!({
4900 ".zed.toml": r#"collaborators = ["user_b"]"#,
4901 "1.txt": "one",
4902 "2.txt": "two",
4903 "3.txt": "three",
4904 }),
4905 )
4906 .await;
4907 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4908 project_a
4909 .update(cx_a, |project, cx| project.share(cx))
4910 .await
4911 .unwrap();
4912
4913 // Client B joins the project.
4914 let project_b = client_b
4915 .build_remote_project(
4916 project_a
4917 .read_with(cx_a, |project, _| project.remote_id())
4918 .unwrap(),
4919 cx_b,
4920 )
4921 .await;
4922
4923 // Client A opens some editors.
4924 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4925 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4926 let editor_a1 = workspace_a
4927 .update(cx_a, |workspace, cx| {
4928 workspace.open_path((worktree_id, "1.txt"), true, cx)
4929 })
4930 .await
4931 .unwrap()
4932 .downcast::<Editor>()
4933 .unwrap();
4934 let editor_a2 = workspace_a
4935 .update(cx_a, |workspace, cx| {
4936 workspace.open_path((worktree_id, "2.txt"), true, cx)
4937 })
4938 .await
4939 .unwrap()
4940 .downcast::<Editor>()
4941 .unwrap();
4942
4943 // Client B opens an editor.
4944 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4945 let editor_b1 = workspace_b
4946 .update(cx_b, |workspace, cx| {
4947 workspace.open_path((worktree_id, "1.txt"), true, cx)
4948 })
4949 .await
4950 .unwrap()
4951 .downcast::<Editor>()
4952 .unwrap();
4953
4954 let client_a_id = project_b.read_with(cx_b, |project, _| {
4955 project.collaborators().values().next().unwrap().peer_id
4956 });
4957 let client_b_id = project_a.read_with(cx_a, |project, _| {
4958 project.collaborators().values().next().unwrap().peer_id
4959 });
4960
4961 // When client B starts following client A, all visible view states are replicated to client B.
4962 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4963 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4964 workspace_b
4965 .update(cx_b, |workspace, cx| {
4966 workspace
4967 .toggle_follow(&ToggleFollow(client_a_id), cx)
4968 .unwrap()
4969 })
4970 .await
4971 .unwrap();
4972 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4973 workspace
4974 .active_item(cx)
4975 .unwrap()
4976 .downcast::<Editor>()
4977 .unwrap()
4978 });
4979 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4980 assert_eq!(
4981 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4982 Some((worktree_id, "2.txt").into())
4983 );
4984 assert_eq!(
4985 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4986 vec![2..3]
4987 );
4988 assert_eq!(
4989 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4990 vec![0..1]
4991 );
4992
4993 // When client A activates a different editor, client B does so as well.
4994 workspace_a.update(cx_a, |workspace, cx| {
4995 workspace.activate_item(&editor_a1, cx)
4996 });
4997 workspace_b
4998 .condition(cx_b, |workspace, cx| {
4999 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5000 })
5001 .await;
5002
5003 // When client A navigates back and forth, client B does so as well.
5004 workspace_a
5005 .update(cx_a, |workspace, cx| {
5006 workspace::Pane::go_back(workspace, None, cx)
5007 })
5008 .await;
5009 workspace_b
5010 .condition(cx_b, |workspace, cx| {
5011 workspace.active_item(cx).unwrap().id() == editor_b2.id()
5012 })
5013 .await;
5014
5015 workspace_a
5016 .update(cx_a, |workspace, cx| {
5017 workspace::Pane::go_forward(workspace, None, cx)
5018 })
5019 .await;
5020 workspace_b
5021 .condition(cx_b, |workspace, cx| {
5022 workspace.active_item(cx).unwrap().id() == editor_b1.id()
5023 })
5024 .await;
5025
5026 // Changes to client A's editor are reflected on client B.
5027 editor_a1.update(cx_a, |editor, cx| {
5028 editor.select_ranges([1..1, 2..2], None, cx);
5029 });
5030 editor_b1
5031 .condition(cx_b, |editor, cx| {
5032 editor.selected_ranges(cx) == vec![1..1, 2..2]
5033 })
5034 .await;
5035
5036 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5037 editor_b1
5038 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5039 .await;
5040
5041 editor_a1.update(cx_a, |editor, cx| {
5042 editor.select_ranges([3..3], None, cx);
5043 editor.set_scroll_position(vec2f(0., 100.), cx);
5044 });
5045 editor_b1
5046 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5047 .await;
5048
5049 // After unfollowing, client B stops receiving updates from client A.
5050 workspace_b.update(cx_b, |workspace, cx| {
5051 workspace.unfollow(&workspace.active_pane().clone(), cx)
5052 });
5053 workspace_a.update(cx_a, |workspace, cx| {
5054 workspace.activate_item(&editor_a2, cx)
5055 });
5056 cx_a.foreground().run_until_parked();
5057 assert_eq!(
5058 workspace_b.read_with(cx_b, |workspace, cx| workspace
5059 .active_item(cx)
5060 .unwrap()
5061 .id()),
5062 editor_b1.id()
5063 );
5064
5065 // Client A starts following client B.
5066 workspace_a
5067 .update(cx_a, |workspace, cx| {
5068 workspace
5069 .toggle_follow(&ToggleFollow(client_b_id), cx)
5070 .unwrap()
5071 })
5072 .await
5073 .unwrap();
5074 assert_eq!(
5075 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5076 Some(client_b_id)
5077 );
5078 assert_eq!(
5079 workspace_a.read_with(cx_a, |workspace, cx| workspace
5080 .active_item(cx)
5081 .unwrap()
5082 .id()),
5083 editor_a1.id()
5084 );
5085
5086 // Following interrupts when client B disconnects.
5087 client_b.disconnect(&cx_b.to_async()).unwrap();
5088 cx_a.foreground().run_until_parked();
5089 assert_eq!(
5090 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5091 None
5092 );
5093 }
5094
5095 #[gpui::test(iterations = 10)]
5096 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5097 cx_a.foreground().forbid_parking();
5098 let fs = FakeFs::new(cx_a.background());
5099
5100 // 2 clients connect to a server.
5101 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5102 let mut client_a = server.create_client(cx_a, "user_a").await;
5103 let mut client_b = server.create_client(cx_b, "user_b").await;
5104 cx_a.update(editor::init);
5105 cx_b.update(editor::init);
5106
5107 // Client A shares a project.
5108 fs.insert_tree(
5109 "/a",
5110 json!({
5111 ".zed.toml": r#"collaborators = ["user_b"]"#,
5112 "1.txt": "one",
5113 "2.txt": "two",
5114 "3.txt": "three",
5115 "4.txt": "four",
5116 }),
5117 )
5118 .await;
5119 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5120 project_a
5121 .update(cx_a, |project, cx| project.share(cx))
5122 .await
5123 .unwrap();
5124
5125 // Client B joins the project.
5126 let project_b = client_b
5127 .build_remote_project(
5128 project_a
5129 .read_with(cx_a, |project, _| project.remote_id())
5130 .unwrap(),
5131 cx_b,
5132 )
5133 .await;
5134
5135 // Client A opens some editors.
5136 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5137 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5138 let _editor_a1 = workspace_a
5139 .update(cx_a, |workspace, cx| {
5140 workspace.open_path((worktree_id, "1.txt"), true, cx)
5141 })
5142 .await
5143 .unwrap()
5144 .downcast::<Editor>()
5145 .unwrap();
5146
5147 // Client B opens an editor.
5148 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5149 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5150 let _editor_b1 = workspace_b
5151 .update(cx_b, |workspace, cx| {
5152 workspace.open_path((worktree_id, "2.txt"), true, cx)
5153 })
5154 .await
5155 .unwrap()
5156 .downcast::<Editor>()
5157 .unwrap();
5158
5159 // Clients A and B follow each other in split panes
5160 workspace_a
5161 .update(cx_a, |workspace, cx| {
5162 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5163 assert_ne!(*workspace.active_pane(), pane_a1);
5164 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5165 workspace
5166 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5167 .unwrap()
5168 })
5169 .await
5170 .unwrap();
5171 workspace_b
5172 .update(cx_b, |workspace, cx| {
5173 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5174 assert_ne!(*workspace.active_pane(), pane_b1);
5175 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5176 workspace
5177 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5178 .unwrap()
5179 })
5180 .await
5181 .unwrap();
5182
5183 workspace_a
5184 .update(cx_a, |workspace, cx| {
5185 workspace.activate_next_pane(cx);
5186 assert_eq!(*workspace.active_pane(), pane_a1);
5187 workspace.open_path((worktree_id, "3.txt"), true, cx)
5188 })
5189 .await
5190 .unwrap();
5191 workspace_b
5192 .update(cx_b, |workspace, cx| {
5193 workspace.activate_next_pane(cx);
5194 assert_eq!(*workspace.active_pane(), pane_b1);
5195 workspace.open_path((worktree_id, "4.txt"), true, cx)
5196 })
5197 .await
5198 .unwrap();
5199 cx_a.foreground().run_until_parked();
5200
5201 // Ensure leader updates don't change the active pane of followers
5202 workspace_a.read_with(cx_a, |workspace, _| {
5203 assert_eq!(*workspace.active_pane(), pane_a1);
5204 });
5205 workspace_b.read_with(cx_b, |workspace, _| {
5206 assert_eq!(*workspace.active_pane(), pane_b1);
5207 });
5208
5209 // Ensure peers following each other doesn't cause an infinite loop.
5210 assert_eq!(
5211 workspace_a.read_with(cx_a, |workspace, cx| workspace
5212 .active_item(cx)
5213 .unwrap()
5214 .project_path(cx)),
5215 Some((worktree_id, "3.txt").into())
5216 );
5217 workspace_a.update(cx_a, |workspace, cx| {
5218 assert_eq!(
5219 workspace.active_item(cx).unwrap().project_path(cx),
5220 Some((worktree_id, "3.txt").into())
5221 );
5222 workspace.activate_next_pane(cx);
5223 assert_eq!(
5224 workspace.active_item(cx).unwrap().project_path(cx),
5225 Some((worktree_id, "4.txt").into())
5226 );
5227 });
5228 workspace_b.update(cx_b, |workspace, cx| {
5229 assert_eq!(
5230 workspace.active_item(cx).unwrap().project_path(cx),
5231 Some((worktree_id, "4.txt").into())
5232 );
5233 workspace.activate_next_pane(cx);
5234 assert_eq!(
5235 workspace.active_item(cx).unwrap().project_path(cx),
5236 Some((worktree_id, "3.txt").into())
5237 );
5238 });
5239 }
5240
5241 #[gpui::test(iterations = 10)]
5242 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5243 cx_a.foreground().forbid_parking();
5244 let fs = FakeFs::new(cx_a.background());
5245
5246 // 2 clients connect to a server.
5247 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5248 let mut client_a = server.create_client(cx_a, "user_a").await;
5249 let mut client_b = server.create_client(cx_b, "user_b").await;
5250 cx_a.update(editor::init);
5251 cx_b.update(editor::init);
5252
5253 // Client A shares a project.
5254 fs.insert_tree(
5255 "/a",
5256 json!({
5257 ".zed.toml": r#"collaborators = ["user_b"]"#,
5258 "1.txt": "one",
5259 "2.txt": "two",
5260 "3.txt": "three",
5261 }),
5262 )
5263 .await;
5264 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5265 project_a
5266 .update(cx_a, |project, cx| project.share(cx))
5267 .await
5268 .unwrap();
5269
5270 // Client B joins the project.
5271 let project_b = client_b
5272 .build_remote_project(
5273 project_a
5274 .read_with(cx_a, |project, _| project.remote_id())
5275 .unwrap(),
5276 cx_b,
5277 )
5278 .await;
5279
5280 // Client A opens some editors.
5281 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5282 let _editor_a1 = workspace_a
5283 .update(cx_a, |workspace, cx| {
5284 workspace.open_path((worktree_id, "1.txt"), true, cx)
5285 })
5286 .await
5287 .unwrap()
5288 .downcast::<Editor>()
5289 .unwrap();
5290
5291 // Client B starts following client A.
5292 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5293 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5294 let leader_id = project_b.read_with(cx_b, |project, _| {
5295 project.collaborators().values().next().unwrap().peer_id
5296 });
5297 workspace_b
5298 .update(cx_b, |workspace, cx| {
5299 workspace
5300 .toggle_follow(&ToggleFollow(leader_id), cx)
5301 .unwrap()
5302 })
5303 .await
5304 .unwrap();
5305 assert_eq!(
5306 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5307 Some(leader_id)
5308 );
5309 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5310 workspace
5311 .active_item(cx)
5312 .unwrap()
5313 .downcast::<Editor>()
5314 .unwrap()
5315 });
5316
5317 // When client B moves, it automatically stops following client A.
5318 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5319 assert_eq!(
5320 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5321 None
5322 );
5323
5324 workspace_b
5325 .update(cx_b, |workspace, cx| {
5326 workspace
5327 .toggle_follow(&ToggleFollow(leader_id), cx)
5328 .unwrap()
5329 })
5330 .await
5331 .unwrap();
5332 assert_eq!(
5333 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5334 Some(leader_id)
5335 );
5336
5337 // When client B edits, it automatically stops following client A.
5338 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5339 assert_eq!(
5340 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5341 None
5342 );
5343
5344 workspace_b
5345 .update(cx_b, |workspace, cx| {
5346 workspace
5347 .toggle_follow(&ToggleFollow(leader_id), cx)
5348 .unwrap()
5349 })
5350 .await
5351 .unwrap();
5352 assert_eq!(
5353 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5354 Some(leader_id)
5355 );
5356
5357 // When client B scrolls, it automatically stops following client A.
5358 editor_b2.update(cx_b, |editor, cx| {
5359 editor.set_scroll_position(vec2f(0., 3.), cx)
5360 });
5361 assert_eq!(
5362 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5363 None
5364 );
5365
5366 workspace_b
5367 .update(cx_b, |workspace, cx| {
5368 workspace
5369 .toggle_follow(&ToggleFollow(leader_id), cx)
5370 .unwrap()
5371 })
5372 .await
5373 .unwrap();
5374 assert_eq!(
5375 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5376 Some(leader_id)
5377 );
5378
5379 // When client B activates a different pane, it continues following client A in the original pane.
5380 workspace_b.update(cx_b, |workspace, cx| {
5381 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5382 });
5383 assert_eq!(
5384 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5385 Some(leader_id)
5386 );
5387
5388 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5389 assert_eq!(
5390 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5391 Some(leader_id)
5392 );
5393
5394 // When client B activates a different item in the original pane, it automatically stops following client A.
5395 workspace_b
5396 .update(cx_b, |workspace, cx| {
5397 workspace.open_path((worktree_id, "2.txt"), true, cx)
5398 })
5399 .await
5400 .unwrap();
5401 assert_eq!(
5402 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5403 None
5404 );
5405 }
5406
5407 #[gpui::test(iterations = 100)]
5408 async fn test_random_collaboration(
5409 cx: &mut TestAppContext,
5410 deterministic: Arc<Deterministic>,
5411 rng: StdRng,
5412 ) {
5413 cx.foreground().forbid_parking();
5414 let max_peers = env::var("MAX_PEERS")
5415 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5416 .unwrap_or(5);
5417 assert!(max_peers <= 5);
5418
5419 let max_operations = env::var("OPERATIONS")
5420 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5421 .unwrap_or(10);
5422
5423 let rng = Arc::new(Mutex::new(rng));
5424
5425 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5426 let host_language_registry = Arc::new(LanguageRegistry::test());
5427
5428 let fs = FakeFs::new(cx.background());
5429 fs.insert_tree(
5430 "/_collab",
5431 json!({
5432 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5433 }),
5434 )
5435 .await;
5436
5437 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5438 let mut clients = Vec::new();
5439 let mut user_ids = Vec::new();
5440 let mut op_start_signals = Vec::new();
5441 let files = Arc::new(Mutex::new(Vec::new()));
5442
5443 let mut next_entity_id = 100000;
5444 let mut host_cx = TestAppContext::new(
5445 cx.foreground_platform(),
5446 cx.platform(),
5447 deterministic.build_foreground(next_entity_id),
5448 deterministic.build_background(),
5449 cx.font_cache(),
5450 cx.leak_detector(),
5451 next_entity_id,
5452 );
5453 let host = server.create_client(&mut host_cx, "host").await;
5454 let host_project = host_cx.update(|cx| {
5455 Project::local(
5456 host.client.clone(),
5457 host.user_store.clone(),
5458 host_language_registry.clone(),
5459 fs.clone(),
5460 cx,
5461 )
5462 });
5463 let host_project_id = host_project
5464 .update(&mut host_cx, |p, _| p.next_remote_id())
5465 .await;
5466
5467 let (collab_worktree, _) = host_project
5468 .update(&mut host_cx, |project, cx| {
5469 project.find_or_create_local_worktree("/_collab", true, cx)
5470 })
5471 .await
5472 .unwrap();
5473 collab_worktree
5474 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5475 .await;
5476 host_project
5477 .update(&mut host_cx, |project, cx| project.share(cx))
5478 .await
5479 .unwrap();
5480
5481 // Set up fake language servers.
5482 let mut language = Language::new(
5483 LanguageConfig {
5484 name: "Rust".into(),
5485 path_suffixes: vec!["rs".to_string()],
5486 ..Default::default()
5487 },
5488 None,
5489 );
5490 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5491 name: "the-fake-language-server",
5492 capabilities: lsp::LanguageServer::full_capabilities(),
5493 initializer: Some(Box::new({
5494 let rng = rng.clone();
5495 let files = files.clone();
5496 let project = host_project.downgrade();
5497 move |fake_server: &mut FakeLanguageServer| {
5498 fake_server.handle_request::<lsp::request::Completion, _, _>(
5499 |_, _| async move {
5500 Ok(Some(lsp::CompletionResponse::Array(vec![
5501 lsp::CompletionItem {
5502 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5503 range: lsp::Range::new(
5504 lsp::Position::new(0, 0),
5505 lsp::Position::new(0, 0),
5506 ),
5507 new_text: "the-new-text".to_string(),
5508 })),
5509 ..Default::default()
5510 },
5511 ])))
5512 },
5513 );
5514
5515 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5516 |_, _| async move {
5517 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5518 lsp::CodeAction {
5519 title: "the-code-action".to_string(),
5520 ..Default::default()
5521 },
5522 )]))
5523 },
5524 );
5525
5526 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5527 |params, _| async move {
5528 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5529 params.position,
5530 params.position,
5531 ))))
5532 },
5533 );
5534
5535 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5536 let files = files.clone();
5537 let rng = rng.clone();
5538 move |_, _| {
5539 let files = files.clone();
5540 let rng = rng.clone();
5541 async move {
5542 let files = files.lock();
5543 let mut rng = rng.lock();
5544 let count = rng.gen_range::<usize, _>(1..3);
5545 let files = (0..count)
5546 .map(|_| files.choose(&mut *rng).unwrap())
5547 .collect::<Vec<_>>();
5548 log::info!("LSP: Returning definitions in files {:?}", &files);
5549 Ok(Some(lsp::GotoDefinitionResponse::Array(
5550 files
5551 .into_iter()
5552 .map(|file| lsp::Location {
5553 uri: lsp::Url::from_file_path(file).unwrap(),
5554 range: Default::default(),
5555 })
5556 .collect(),
5557 )))
5558 }
5559 }
5560 });
5561
5562 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5563 let rng = rng.clone();
5564 let project = project.clone();
5565 move |params, mut cx| {
5566 let highlights = if let Some(project) = project.upgrade(&cx) {
5567 project.update(&mut cx, |project, cx| {
5568 let path = params
5569 .text_document_position_params
5570 .text_document
5571 .uri
5572 .to_file_path()
5573 .unwrap();
5574 let (worktree, relative_path) =
5575 project.find_local_worktree(&path, cx)?;
5576 let project_path =
5577 ProjectPath::from((worktree.read(cx).id(), relative_path));
5578 let buffer =
5579 project.get_open_buffer(&project_path, cx)?.read(cx);
5580
5581 let mut highlights = Vec::new();
5582 let highlight_count = rng.lock().gen_range(1..=5);
5583 let mut prev_end = 0;
5584 for _ in 0..highlight_count {
5585 let range =
5586 buffer.random_byte_range(prev_end, &mut *rng.lock());
5587
5588 highlights.push(lsp::DocumentHighlight {
5589 range: range_to_lsp(range.to_point_utf16(buffer)),
5590 kind: Some(lsp::DocumentHighlightKind::READ),
5591 });
5592 prev_end = range.end;
5593 }
5594 Some(highlights)
5595 })
5596 } else {
5597 None
5598 };
5599 async move { Ok(highlights) }
5600 }
5601 });
5602 }
5603 })),
5604 ..Default::default()
5605 });
5606 host_language_registry.add(Arc::new(language));
5607
5608 let op_start_signal = futures::channel::mpsc::unbounded();
5609 user_ids.push(host.current_user_id(&host_cx));
5610 op_start_signals.push(op_start_signal.0);
5611 clients.push(host_cx.foreground().spawn(host.simulate_host(
5612 host_project,
5613 files,
5614 op_start_signal.1,
5615 rng.clone(),
5616 host_cx,
5617 )));
5618
5619 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5620 rng.lock().gen_range(0..max_operations)
5621 } else {
5622 max_operations
5623 };
5624 let mut available_guests = vec![
5625 "guest-1".to_string(),
5626 "guest-2".to_string(),
5627 "guest-3".to_string(),
5628 "guest-4".to_string(),
5629 ];
5630 let mut operations = 0;
5631 while operations < max_operations {
5632 if operations == disconnect_host_at {
5633 server.disconnect_client(user_ids[0]);
5634 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5635 drop(op_start_signals);
5636 let mut clients = futures::future::join_all(clients).await;
5637 cx.foreground().run_until_parked();
5638
5639 let (host, mut host_cx, host_err) = clients.remove(0);
5640 if let Some(host_err) = host_err {
5641 log::error!("host error - {}", host_err);
5642 }
5643 host.project
5644 .as_ref()
5645 .unwrap()
5646 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5647 for (guest, mut guest_cx, guest_err) in clients {
5648 if let Some(guest_err) = guest_err {
5649 log::error!("{} error - {}", guest.username, guest_err);
5650 }
5651 let contacts = server
5652 .store
5653 .read()
5654 .await
5655 .contacts_for_user(guest.current_user_id(&guest_cx));
5656 assert!(!contacts
5657 .iter()
5658 .flat_map(|contact| &contact.projects)
5659 .any(|project| project.id == host_project_id));
5660 guest
5661 .project
5662 .as_ref()
5663 .unwrap()
5664 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5665 guest_cx.update(|_| drop(guest));
5666 }
5667 host_cx.update(|_| drop(host));
5668
5669 return;
5670 }
5671
5672 let distribution = rng.lock().gen_range(0..100);
5673 match distribution {
5674 0..=19 if !available_guests.is_empty() => {
5675 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5676 let guest_username = available_guests.remove(guest_ix);
5677 log::info!("Adding new connection for {}", guest_username);
5678 next_entity_id += 100000;
5679 let mut guest_cx = TestAppContext::new(
5680 cx.foreground_platform(),
5681 cx.platform(),
5682 deterministic.build_foreground(next_entity_id),
5683 deterministic.build_background(),
5684 cx.font_cache(),
5685 cx.leak_detector(),
5686 next_entity_id,
5687 );
5688 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5689 let guest_project = Project::remote(
5690 host_project_id,
5691 guest.client.clone(),
5692 guest.user_store.clone(),
5693 guest_lang_registry.clone(),
5694 FakeFs::new(cx.background()),
5695 &mut guest_cx.to_async(),
5696 )
5697 .await
5698 .unwrap();
5699 let op_start_signal = futures::channel::mpsc::unbounded();
5700 user_ids.push(guest.current_user_id(&guest_cx));
5701 op_start_signals.push(op_start_signal.0);
5702 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5703 guest_username.clone(),
5704 guest_project,
5705 op_start_signal.1,
5706 rng.clone(),
5707 guest_cx,
5708 )));
5709
5710 log::info!("Added connection for {}", guest_username);
5711 operations += 1;
5712 }
5713 20..=29 if clients.len() > 1 => {
5714 log::info!("Removing guest");
5715 let guest_ix = rng.lock().gen_range(1..clients.len());
5716 let removed_guest_id = user_ids.remove(guest_ix);
5717 let guest = clients.remove(guest_ix);
5718 op_start_signals.remove(guest_ix);
5719 server.disconnect_client(removed_guest_id);
5720 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5721 let (guest, mut guest_cx, guest_err) = guest.await;
5722 if let Some(guest_err) = guest_err {
5723 log::error!("{} error - {}", guest.username, guest_err);
5724 }
5725 guest
5726 .project
5727 .as_ref()
5728 .unwrap()
5729 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5730 for user_id in &user_ids {
5731 for contact in server.store.read().await.contacts_for_user(*user_id) {
5732 assert_ne!(
5733 contact.user_id, removed_guest_id.0 as u64,
5734 "removed guest is still a contact of another peer"
5735 );
5736 for project in contact.projects {
5737 for project_guest_id in project.guests {
5738 assert_ne!(
5739 project_guest_id, removed_guest_id.0 as u64,
5740 "removed guest appears as still participating on a project"
5741 );
5742 }
5743 }
5744 }
5745 }
5746
5747 log::info!("{} removed", guest.username);
5748 available_guests.push(guest.username.clone());
5749 guest_cx.update(|_| drop(guest));
5750
5751 operations += 1;
5752 }
5753 _ => {
5754 while operations < max_operations && rng.lock().gen_bool(0.7) {
5755 op_start_signals
5756 .choose(&mut *rng.lock())
5757 .unwrap()
5758 .unbounded_send(())
5759 .unwrap();
5760 operations += 1;
5761 }
5762
5763 if rng.lock().gen_bool(0.8) {
5764 cx.foreground().run_until_parked();
5765 }
5766 }
5767 }
5768 }
5769
5770 drop(op_start_signals);
5771 let mut clients = futures::future::join_all(clients).await;
5772 cx.foreground().run_until_parked();
5773
5774 let (host_client, mut host_cx, host_err) = clients.remove(0);
5775 if let Some(host_err) = host_err {
5776 panic!("host error - {}", host_err);
5777 }
5778 let host_project = host_client.project.as_ref().unwrap();
5779 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5780 project
5781 .worktrees(cx)
5782 .map(|worktree| {
5783 let snapshot = worktree.read(cx).snapshot();
5784 (snapshot.id(), snapshot)
5785 })
5786 .collect::<BTreeMap<_, _>>()
5787 });
5788
5789 host_client
5790 .project
5791 .as_ref()
5792 .unwrap()
5793 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5794
5795 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5796 if let Some(guest_err) = guest_err {
5797 panic!("{} error - {}", guest_client.username, guest_err);
5798 }
5799 let worktree_snapshots =
5800 guest_client
5801 .project
5802 .as_ref()
5803 .unwrap()
5804 .read_with(&guest_cx, |project, cx| {
5805 project
5806 .worktrees(cx)
5807 .map(|worktree| {
5808 let worktree = worktree.read(cx);
5809 (worktree.id(), worktree.snapshot())
5810 })
5811 .collect::<BTreeMap<_, _>>()
5812 });
5813
5814 assert_eq!(
5815 worktree_snapshots.keys().collect::<Vec<_>>(),
5816 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5817 "{} has different worktrees than the host",
5818 guest_client.username
5819 );
5820 for (id, host_snapshot) in &host_worktree_snapshots {
5821 let guest_snapshot = &worktree_snapshots[id];
5822 assert_eq!(
5823 guest_snapshot.root_name(),
5824 host_snapshot.root_name(),
5825 "{} has different root name than the host for worktree {}",
5826 guest_client.username,
5827 id
5828 );
5829 assert_eq!(
5830 guest_snapshot.entries(false).collect::<Vec<_>>(),
5831 host_snapshot.entries(false).collect::<Vec<_>>(),
5832 "{} has different snapshot than the host for worktree {}",
5833 guest_client.username,
5834 id
5835 );
5836 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5837 }
5838
5839 guest_client
5840 .project
5841 .as_ref()
5842 .unwrap()
5843 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5844
5845 for guest_buffer in &guest_client.buffers {
5846 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5847 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5848 project.buffer_for_id(buffer_id, cx).expect(&format!(
5849 "host does not have buffer for guest:{}, peer:{}, id:{}",
5850 guest_client.username, guest_client.peer_id, buffer_id
5851 ))
5852 });
5853 let path = host_buffer
5854 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5855
5856 assert_eq!(
5857 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5858 0,
5859 "{}, buffer {}, path {:?} has deferred operations",
5860 guest_client.username,
5861 buffer_id,
5862 path,
5863 );
5864 assert_eq!(
5865 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5866 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5867 "{}, buffer {}, path {:?}, differs from the host's buffer",
5868 guest_client.username,
5869 buffer_id,
5870 path
5871 );
5872 }
5873
5874 guest_cx.update(|_| drop(guest_client));
5875 }
5876
5877 host_cx.update(|_| drop(host_client));
5878 }
5879
5880 struct TestServer {
5881 peer: Arc<Peer>,
5882 app_state: Arc<AppState>,
5883 server: Arc<Server>,
5884 foreground: Rc<executor::Foreground>,
5885 notifications: mpsc::UnboundedReceiver<()>,
5886 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5887 forbid_connections: Arc<AtomicBool>,
5888 _test_db: TestDb,
5889 }
5890
5891 impl TestServer {
5892 async fn start(
5893 foreground: Rc<executor::Foreground>,
5894 background: Arc<executor::Background>,
5895 ) -> Self {
5896 let test_db = TestDb::fake(background);
5897 let app_state = Self::build_app_state(&test_db).await;
5898 let peer = Peer::new();
5899 let notifications = mpsc::unbounded();
5900 let server = Server::new(app_state.clone(), Some(notifications.0));
5901 Self {
5902 peer,
5903 app_state,
5904 server,
5905 foreground,
5906 notifications: notifications.1,
5907 connection_killers: Default::default(),
5908 forbid_connections: Default::default(),
5909 _test_db: test_db,
5910 }
5911 }
5912
5913 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5914 cx.update(|cx| {
5915 let settings = Settings::test(cx);
5916 cx.set_global(settings);
5917 });
5918
5919 let http = FakeHttpClient::with_404_response();
5920 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5921 let client_name = name.to_string();
5922 let mut client = Client::new(http.clone());
5923 let server = self.server.clone();
5924 let connection_killers = self.connection_killers.clone();
5925 let forbid_connections = self.forbid_connections.clone();
5926 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5927
5928 Arc::get_mut(&mut client)
5929 .unwrap()
5930 .override_authenticate(move |cx| {
5931 cx.spawn(|_| async move {
5932 let access_token = "the-token".to_string();
5933 Ok(Credentials {
5934 user_id: user_id.0 as u64,
5935 access_token,
5936 })
5937 })
5938 })
5939 .override_establish_connection(move |credentials, cx| {
5940 assert_eq!(credentials.user_id, user_id.0 as u64);
5941 assert_eq!(credentials.access_token, "the-token");
5942
5943 let server = server.clone();
5944 let connection_killers = connection_killers.clone();
5945 let forbid_connections = forbid_connections.clone();
5946 let client_name = client_name.clone();
5947 let connection_id_tx = connection_id_tx.clone();
5948 cx.spawn(move |cx| async move {
5949 if forbid_connections.load(SeqCst) {
5950 Err(EstablishConnectionError::other(anyhow!(
5951 "server is forbidding connections"
5952 )))
5953 } else {
5954 let (client_conn, server_conn, killed) =
5955 Connection::in_memory(cx.background());
5956 connection_killers.lock().insert(user_id, killed);
5957 cx.background()
5958 .spawn(server.handle_connection(
5959 server_conn,
5960 client_name,
5961 user_id,
5962 Some(connection_id_tx),
5963 cx.background(),
5964 ))
5965 .detach();
5966 Ok(client_conn)
5967 }
5968 })
5969 });
5970
5971 client
5972 .authenticate_and_connect(false, &cx.to_async())
5973 .await
5974 .unwrap();
5975
5976 Channel::init(&client);
5977 Project::init(&client);
5978 cx.update(|cx| {
5979 workspace::init(&client, cx);
5980 });
5981
5982 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5983 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5984
5985 let client = TestClient {
5986 client,
5987 peer_id,
5988 username: name.to_string(),
5989 user_store,
5990 language_registry: Arc::new(LanguageRegistry::test()),
5991 project: Default::default(),
5992 buffers: Default::default(),
5993 };
5994 client.wait_for_current_user(cx).await;
5995 client
5996 }
5997
5998 fn disconnect_client(&self, user_id: UserId) {
5999 self.connection_killers
6000 .lock()
6001 .remove(&user_id)
6002 .unwrap()
6003 .store(true, SeqCst);
6004 }
6005
6006 fn forbid_connections(&self) {
6007 self.forbid_connections.store(true, SeqCst);
6008 }
6009
6010 fn allow_connections(&self) {
6011 self.forbid_connections.store(false, SeqCst);
6012 }
6013
6014 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6015 Arc::new(AppState {
6016 db: test_db.db().clone(),
6017 api_token: Default::default(),
6018 })
6019 }
6020
6021 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6022 self.server.store.read().await
6023 }
6024
6025 async fn condition<F>(&mut self, mut predicate: F)
6026 where
6027 F: FnMut(&Store) -> bool,
6028 {
6029 assert!(
6030 self.foreground.parking_forbidden(),
6031 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6032 );
6033 while !(predicate)(&*self.server.store.read().await) {
6034 self.foreground.start_waiting();
6035 self.notifications.next().await;
6036 self.foreground.finish_waiting();
6037 }
6038 }
6039 }
6040
6041 impl Deref for TestServer {
6042 type Target = Server;
6043
6044 fn deref(&self) -> &Self::Target {
6045 &self.server
6046 }
6047 }
6048
6049 impl Drop for TestServer {
6050 fn drop(&mut self) {
6051 self.peer.reset();
6052 }
6053 }
6054
6055 struct TestClient {
6056 client: Arc<Client>,
6057 username: String,
6058 pub peer_id: PeerId,
6059 pub user_store: ModelHandle<UserStore>,
6060 language_registry: Arc<LanguageRegistry>,
6061 project: Option<ModelHandle<Project>>,
6062 buffers: HashSet<ModelHandle<language::Buffer>>,
6063 }
6064
6065 impl Deref for TestClient {
6066 type Target = Arc<Client>;
6067
6068 fn deref(&self) -> &Self::Target {
6069 &self.client
6070 }
6071 }
6072
6073 impl TestClient {
6074 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6075 UserId::from_proto(
6076 self.user_store
6077 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6078 )
6079 }
6080
6081 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6082 let mut authed_user = self
6083 .user_store
6084 .read_with(cx, |user_store, _| user_store.watch_current_user());
6085 while authed_user.next().await.unwrap().is_none() {}
6086 }
6087
6088 async fn build_local_project(
6089 &mut self,
6090 fs: Arc<FakeFs>,
6091 root_path: impl AsRef<Path>,
6092 cx: &mut TestAppContext,
6093 ) -> (ModelHandle<Project>, WorktreeId) {
6094 let project = cx.update(|cx| {
6095 Project::local(
6096 self.client.clone(),
6097 self.user_store.clone(),
6098 self.language_registry.clone(),
6099 fs,
6100 cx,
6101 )
6102 });
6103 self.project = Some(project.clone());
6104 let (worktree, _) = project
6105 .update(cx, |p, cx| {
6106 p.find_or_create_local_worktree(root_path, true, cx)
6107 })
6108 .await
6109 .unwrap();
6110 worktree
6111 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6112 .await;
6113 project
6114 .update(cx, |project, _| project.next_remote_id())
6115 .await;
6116 (project, worktree.read_with(cx, |tree, _| tree.id()))
6117 }
6118
6119 async fn build_remote_project(
6120 &mut self,
6121 project_id: u64,
6122 cx: &mut TestAppContext,
6123 ) -> ModelHandle<Project> {
6124 let project = Project::remote(
6125 project_id,
6126 self.client.clone(),
6127 self.user_store.clone(),
6128 self.language_registry.clone(),
6129 FakeFs::new(cx.background()),
6130 &mut cx.to_async(),
6131 )
6132 .await
6133 .unwrap();
6134 self.project = Some(project.clone());
6135 project
6136 }
6137
6138 fn build_workspace(
6139 &self,
6140 project: &ModelHandle<Project>,
6141 cx: &mut TestAppContext,
6142 ) -> ViewHandle<Workspace> {
6143 let (window_id, _) = cx.add_window(|_| EmptyView);
6144 cx.add_view(window_id, |cx| {
6145 let fs = project.read(cx).fs().clone();
6146 Workspace::new(
6147 &WorkspaceParams {
6148 fs,
6149 project: project.clone(),
6150 user_store: self.user_store.clone(),
6151 languages: self.language_registry.clone(),
6152 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6153 channel_list: cx.add_model(|cx| {
6154 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6155 }),
6156 client: self.client.clone(),
6157 },
6158 cx,
6159 )
6160 })
6161 }
6162
6163 async fn simulate_host(
6164 mut self,
6165 project: ModelHandle<Project>,
6166 files: Arc<Mutex<Vec<PathBuf>>>,
6167 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6168 rng: Arc<Mutex<StdRng>>,
6169 mut cx: TestAppContext,
6170 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6171 async fn simulate_host_internal(
6172 client: &mut TestClient,
6173 project: ModelHandle<Project>,
6174 files: Arc<Mutex<Vec<PathBuf>>>,
6175 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6176 rng: Arc<Mutex<StdRng>>,
6177 cx: &mut TestAppContext,
6178 ) -> anyhow::Result<()> {
6179 let fs = project.read_with(cx, |project, _| project.fs().clone());
6180
6181 while op_start_signal.next().await.is_some() {
6182 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6183 match distribution {
6184 0..=20 if !files.lock().is_empty() => {
6185 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6186 let mut path = path.as_path();
6187 while let Some(parent_path) = path.parent() {
6188 path = parent_path;
6189 if rng.lock().gen() {
6190 break;
6191 }
6192 }
6193
6194 log::info!("Host: find/create local worktree {:?}", path);
6195 let find_or_create_worktree = project.update(cx, |project, cx| {
6196 project.find_or_create_local_worktree(path, true, cx)
6197 });
6198 if rng.lock().gen() {
6199 cx.background().spawn(find_or_create_worktree).detach();
6200 } else {
6201 find_or_create_worktree.await?;
6202 }
6203 }
6204 10..=80 if !files.lock().is_empty() => {
6205 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6206 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6207 let (worktree, path) = project
6208 .update(cx, |project, cx| {
6209 project.find_or_create_local_worktree(
6210 file.clone(),
6211 true,
6212 cx,
6213 )
6214 })
6215 .await?;
6216 let project_path =
6217 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6218 log::info!(
6219 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6220 file,
6221 project_path.0,
6222 project_path.1
6223 );
6224 let buffer = project
6225 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6226 .await
6227 .unwrap();
6228 client.buffers.insert(buffer.clone());
6229 buffer
6230 } else {
6231 client
6232 .buffers
6233 .iter()
6234 .choose(&mut *rng.lock())
6235 .unwrap()
6236 .clone()
6237 };
6238
6239 if rng.lock().gen_bool(0.1) {
6240 cx.update(|cx| {
6241 log::info!(
6242 "Host: dropping buffer {:?}",
6243 buffer.read(cx).file().unwrap().full_path(cx)
6244 );
6245 client.buffers.remove(&buffer);
6246 drop(buffer);
6247 });
6248 } else {
6249 buffer.update(cx, |buffer, cx| {
6250 log::info!(
6251 "Host: updating buffer {:?} ({})",
6252 buffer.file().unwrap().full_path(cx),
6253 buffer.remote_id()
6254 );
6255
6256 if rng.lock().gen_bool(0.7) {
6257 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6258 } else {
6259 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6260 }
6261 });
6262 }
6263 }
6264 _ => loop {
6265 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6266 let mut path = PathBuf::new();
6267 path.push("/");
6268 for _ in 0..path_component_count {
6269 let letter = rng.lock().gen_range(b'a'..=b'z');
6270 path.push(std::str::from_utf8(&[letter]).unwrap());
6271 }
6272 path.set_extension("rs");
6273 let parent_path = path.parent().unwrap();
6274
6275 log::info!("Host: creating file {:?}", path,);
6276
6277 if fs.create_dir(&parent_path).await.is_ok()
6278 && fs.create_file(&path, Default::default()).await.is_ok()
6279 {
6280 files.lock().push(path);
6281 break;
6282 } else {
6283 log::info!("Host: cannot create file");
6284 }
6285 },
6286 }
6287
6288 cx.background().simulate_random_delay().await;
6289 }
6290
6291 Ok(())
6292 }
6293
6294 let result = simulate_host_internal(
6295 &mut self,
6296 project.clone(),
6297 files,
6298 op_start_signal,
6299 rng,
6300 &mut cx,
6301 )
6302 .await;
6303 log::info!("Host done");
6304 self.project = Some(project);
6305 (self, cx, result.err())
6306 }
6307
6308 pub async fn simulate_guest(
6309 mut self,
6310 guest_username: String,
6311 project: ModelHandle<Project>,
6312 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6313 rng: Arc<Mutex<StdRng>>,
6314 mut cx: TestAppContext,
6315 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6316 async fn simulate_guest_internal(
6317 client: &mut TestClient,
6318 guest_username: &str,
6319 project: ModelHandle<Project>,
6320 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6321 rng: Arc<Mutex<StdRng>>,
6322 cx: &mut TestAppContext,
6323 ) -> anyhow::Result<()> {
6324 while op_start_signal.next().await.is_some() {
6325 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6326 let worktree = if let Some(worktree) =
6327 project.read_with(cx, |project, cx| {
6328 project
6329 .worktrees(&cx)
6330 .filter(|worktree| {
6331 let worktree = worktree.read(cx);
6332 worktree.is_visible()
6333 && worktree.entries(false).any(|e| e.is_file())
6334 })
6335 .choose(&mut *rng.lock())
6336 }) {
6337 worktree
6338 } else {
6339 cx.background().simulate_random_delay().await;
6340 continue;
6341 };
6342
6343 let (worktree_root_name, project_path) =
6344 worktree.read_with(cx, |worktree, _| {
6345 let entry = worktree
6346 .entries(false)
6347 .filter(|e| e.is_file())
6348 .choose(&mut *rng.lock())
6349 .unwrap();
6350 (
6351 worktree.root_name().to_string(),
6352 (worktree.id(), entry.path.clone()),
6353 )
6354 });
6355 log::info!(
6356 "{}: opening path {:?} in worktree {} ({})",
6357 guest_username,
6358 project_path.1,
6359 project_path.0,
6360 worktree_root_name,
6361 );
6362 let buffer = project
6363 .update(cx, |project, cx| {
6364 project.open_buffer(project_path.clone(), cx)
6365 })
6366 .await?;
6367 log::info!(
6368 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6369 guest_username,
6370 project_path.1,
6371 project_path.0,
6372 worktree_root_name,
6373 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6374 );
6375 client.buffers.insert(buffer.clone());
6376 buffer
6377 } else {
6378 client
6379 .buffers
6380 .iter()
6381 .choose(&mut *rng.lock())
6382 .unwrap()
6383 .clone()
6384 };
6385
6386 let choice = rng.lock().gen_range(0..100);
6387 match choice {
6388 0..=9 => {
6389 cx.update(|cx| {
6390 log::info!(
6391 "{}: dropping buffer {:?}",
6392 guest_username,
6393 buffer.read(cx).file().unwrap().full_path(cx)
6394 );
6395 client.buffers.remove(&buffer);
6396 drop(buffer);
6397 });
6398 }
6399 10..=19 => {
6400 let completions = project.update(cx, |project, cx| {
6401 log::info!(
6402 "{}: requesting completions for buffer {} ({:?})",
6403 guest_username,
6404 buffer.read(cx).remote_id(),
6405 buffer.read(cx).file().unwrap().full_path(cx)
6406 );
6407 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6408 project.completions(&buffer, offset, cx)
6409 });
6410 let completions = cx.background().spawn(async move {
6411 completions
6412 .await
6413 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6414 });
6415 if rng.lock().gen_bool(0.3) {
6416 log::info!("{}: detaching completions request", guest_username);
6417 cx.update(|cx| completions.detach_and_log_err(cx));
6418 } else {
6419 completions.await?;
6420 }
6421 }
6422 20..=29 => {
6423 let code_actions = project.update(cx, |project, cx| {
6424 log::info!(
6425 "{}: requesting code actions for buffer {} ({:?})",
6426 guest_username,
6427 buffer.read(cx).remote_id(),
6428 buffer.read(cx).file().unwrap().full_path(cx)
6429 );
6430 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6431 project.code_actions(&buffer, range, cx)
6432 });
6433 let code_actions = cx.background().spawn(async move {
6434 code_actions.await.map_err(|err| {
6435 anyhow!("code actions request failed: {:?}", err)
6436 })
6437 });
6438 if rng.lock().gen_bool(0.3) {
6439 log::info!("{}: detaching code actions request", guest_username);
6440 cx.update(|cx| code_actions.detach_and_log_err(cx));
6441 } else {
6442 code_actions.await?;
6443 }
6444 }
6445 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6446 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6447 log::info!(
6448 "{}: saving buffer {} ({:?})",
6449 guest_username,
6450 buffer.remote_id(),
6451 buffer.file().unwrap().full_path(cx)
6452 );
6453 (buffer.version(), buffer.save(cx))
6454 });
6455 let save = cx.background().spawn(async move {
6456 let (saved_version, _) = save
6457 .await
6458 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6459 assert!(saved_version.observed_all(&requested_version));
6460 Ok::<_, anyhow::Error>(())
6461 });
6462 if rng.lock().gen_bool(0.3) {
6463 log::info!("{}: detaching save request", guest_username);
6464 cx.update(|cx| save.detach_and_log_err(cx));
6465 } else {
6466 save.await?;
6467 }
6468 }
6469 40..=44 => {
6470 let prepare_rename = project.update(cx, |project, cx| {
6471 log::info!(
6472 "{}: preparing rename for buffer {} ({:?})",
6473 guest_username,
6474 buffer.read(cx).remote_id(),
6475 buffer.read(cx).file().unwrap().full_path(cx)
6476 );
6477 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6478 project.prepare_rename(buffer, offset, cx)
6479 });
6480 let prepare_rename = cx.background().spawn(async move {
6481 prepare_rename.await.map_err(|err| {
6482 anyhow!("prepare rename request failed: {:?}", err)
6483 })
6484 });
6485 if rng.lock().gen_bool(0.3) {
6486 log::info!("{}: detaching prepare rename request", guest_username);
6487 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6488 } else {
6489 prepare_rename.await?;
6490 }
6491 }
6492 45..=49 => {
6493 let definitions = project.update(cx, |project, cx| {
6494 log::info!(
6495 "{}: requesting definitions for buffer {} ({:?})",
6496 guest_username,
6497 buffer.read(cx).remote_id(),
6498 buffer.read(cx).file().unwrap().full_path(cx)
6499 );
6500 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6501 project.definition(&buffer, offset, cx)
6502 });
6503 let definitions = cx.background().spawn(async move {
6504 definitions
6505 .await
6506 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6507 });
6508 if rng.lock().gen_bool(0.3) {
6509 log::info!("{}: detaching definitions request", guest_username);
6510 cx.update(|cx| definitions.detach_and_log_err(cx));
6511 } else {
6512 client
6513 .buffers
6514 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6515 }
6516 }
6517 50..=54 => {
6518 let highlights = project.update(cx, |project, cx| {
6519 log::info!(
6520 "{}: requesting highlights for buffer {} ({:?})",
6521 guest_username,
6522 buffer.read(cx).remote_id(),
6523 buffer.read(cx).file().unwrap().full_path(cx)
6524 );
6525 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6526 project.document_highlights(&buffer, offset, cx)
6527 });
6528 let highlights = cx.background().spawn(async move {
6529 highlights
6530 .await
6531 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6532 });
6533 if rng.lock().gen_bool(0.3) {
6534 log::info!("{}: detaching highlights request", guest_username);
6535 cx.update(|cx| highlights.detach_and_log_err(cx));
6536 } else {
6537 highlights.await?;
6538 }
6539 }
6540 55..=59 => {
6541 let search = project.update(cx, |project, cx| {
6542 let query = rng.lock().gen_range('a'..='z');
6543 log::info!("{}: project-wide search {:?}", guest_username, query);
6544 project.search(SearchQuery::text(query, false, false), cx)
6545 });
6546 let search = cx.background().spawn(async move {
6547 search
6548 .await
6549 .map_err(|err| anyhow!("search request failed: {:?}", err))
6550 });
6551 if rng.lock().gen_bool(0.3) {
6552 log::info!("{}: detaching search request", guest_username);
6553 cx.update(|cx| search.detach_and_log_err(cx));
6554 } else {
6555 client.buffers.extend(search.await?.into_keys());
6556 }
6557 }
6558 60..=69 => {
6559 let worktree = project
6560 .read_with(cx, |project, cx| {
6561 project
6562 .worktrees(&cx)
6563 .filter(|worktree| {
6564 let worktree = worktree.read(cx);
6565 worktree.is_visible()
6566 && worktree.entries(false).any(|e| e.is_file())
6567 && worktree
6568 .root_entry()
6569 .map_or(false, |e| e.is_dir())
6570 })
6571 .choose(&mut *rng.lock())
6572 })
6573 .unwrap();
6574 let (worktree_id, worktree_root_name) = worktree
6575 .read_with(cx, |worktree, _| {
6576 (worktree.id(), worktree.root_name().to_string())
6577 });
6578
6579 let mut new_name = String::new();
6580 for _ in 0..10 {
6581 let letter = rng.lock().gen_range('a'..='z');
6582 new_name.push(letter);
6583 }
6584 let mut new_path = PathBuf::new();
6585 new_path.push(new_name);
6586 new_path.set_extension("rs");
6587 log::info!(
6588 "{}: creating {:?} in worktree {} ({})",
6589 guest_username,
6590 new_path,
6591 worktree_id,
6592 worktree_root_name,
6593 );
6594 project
6595 .update(cx, |project, cx| {
6596 project.create_entry((worktree_id, new_path), false, cx)
6597 })
6598 .unwrap()
6599 .await?;
6600 }
6601 _ => {
6602 buffer.update(cx, |buffer, cx| {
6603 log::info!(
6604 "{}: updating buffer {} ({:?})",
6605 guest_username,
6606 buffer.remote_id(),
6607 buffer.file().unwrap().full_path(cx)
6608 );
6609 if rng.lock().gen_bool(0.7) {
6610 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6611 } else {
6612 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6613 }
6614 });
6615 }
6616 }
6617 cx.background().simulate_random_delay().await;
6618 }
6619 Ok(())
6620 }
6621
6622 let result = simulate_guest_internal(
6623 &mut self,
6624 &guest_username,
6625 project.clone(),
6626 op_start_signal,
6627 rng,
6628 &mut cx,
6629 )
6630 .await;
6631 log::info!("{}: done", guest_username);
6632
6633 self.project = Some(project);
6634 (self, cx, result.err())
6635 }
6636 }
6637
6638 impl Drop for TestClient {
6639 fn drop(&mut self) {
6640 self.client.tear_down();
6641 }
6642 }
6643
6644 impl Executor for Arc<gpui::executor::Background> {
6645 type Sleep = gpui::executor::Timer;
6646
6647 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6648 self.spawn(future).detach();
6649 }
6650
6651 fn sleep(&self, duration: Duration) -> Self::Sleep {
6652 self.as_ref().timer(duration)
6653 }
6654 }
6655
6656 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6657 channel
6658 .messages()
6659 .cursor::<()>()
6660 .map(|m| {
6661 (
6662 m.sender.github_login.clone(),
6663 m.body.clone(),
6664 m.is_pending(),
6665 )
6666 })
6667 .collect()
6668 }
6669
6670 struct EmptyView;
6671
6672 impl gpui::Entity for EmptyView {
6673 type Event = ();
6674 }
6675
6676 impl gpui::View for EmptyView {
6677 fn ui_name() -> &'static str {
6678 "empty view"
6679 }
6680
6681 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6682 gpui::Element::boxed(gpui::elements::Empty)
6683 }
6684 }
6685}