1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::IntoResponse,
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::{
40 atomic::{AtomicBool, Ordering::SeqCst},
41 Arc,
42 },
43 time::Duration,
44};
45use store::{Store, Worktree};
46use time::OffsetDateTime;
47use tokio::{
48 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
49 time::Sleep,
50};
51use tower::ServiceBuilder;
52use tracing::{info_span, instrument, Instrument};
53
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It receives the server and a dynamically-typed envelope, and returns a boxed future
/// that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
56
/// Handle passed to request handlers so they can reply to one request of type `R`.
struct Response<R> {
    /// Server whose peer is used to send the reply.
    server: Arc<Server>,
    /// Receipt identifying the request being answered.
    receipt: Receipt<R>,
    /// Set to `true` by `send`; `add_request_handler` reads it to detect handlers that
    /// returned successfully without replying.
    responded: Arc<AtomicBool>,
}
62
63impl<R: RequestMessage> Response<R> {
64 fn send(self, payload: R::Response) -> Result<()> {
65 self.responded.store(true, SeqCst);
66 self.server.peer.respond(self.receipt, payload)?;
67 Ok(())
68 }
69}
70
/// RPC server state, shared between connections behind an `Arc`.
pub struct Server {
    /// RPC peer used to send, respond to and forward messages.
    peer: Arc<Peer>,
    /// Store guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
78
/// Abstraction over the async runtime facilities `handle_connection` needs.
pub trait Executor: Send + Clone {
    /// Future returned by `sleep`.
    type Sleep: Send + Future;
    /// Runs `future` without the caller awaiting it; used for messages flagged as background.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future that is awaited as a timer of length `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
84
/// Unit type naming an executor.
// NOTE(review): presumably the production `Executor` implementation; the impl is not
// visible in this part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
87
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length of a trimmed channel message body, compared against `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
90
/// Read guard over the `Store`.
struct StoreReadGuard<'a> {
    /// The underlying lock guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
95
/// Write guard over the `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying lock guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    _not_send: PhantomData<Rc<()>>,
}
100
101impl Server {
    /// Creates a server with a fresh peer and default store, and registers every message
    /// and request handler.
    ///
    /// `notifications`, when provided, is sent a `()` after each incoming message has been
    /// handled (see `handle_connection`).
    ///
    /// # Panics
    ///
    /// Panics if two handlers are registered for the same message type, because
    /// `add_message_handler` rejects duplicates.
    pub fn new(
        app_state: Arc<AppState>,
        notifications: Option<mpsc::UnboundedSender<()>>,
    ) -> Arc<Self> {
        let mut server = Self {
            peer: Peer::new(),
            app_state,
            store: Default::default(),
            handlers: Default::default(),
            notifications,
        };

        // `add_request_handler` is used for messages that expect a reply,
        // `add_message_handler` for those that do not.
        server
            .add_request_handler(Server::ping)
            .add_request_handler(Server::register_project)
            .add_message_handler(Server::unregister_project)
            .add_request_handler(Server::share_project)
            .add_message_handler(Server::unshare_project)
            .add_request_handler(Server::join_project)
            .add_message_handler(Server::leave_project)
            .add_request_handler(Server::register_worktree)
            .add_message_handler(Server::unregister_worktree)
            .add_request_handler(Server::update_worktree)
            .add_message_handler(Server::start_language_server)
            .add_message_handler(Server::update_language_server)
            .add_message_handler(Server::update_diagnostic_summary)
            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
            .add_request_handler(
                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
            )
            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(Server::update_buffer)
            .add_message_handler(Server::update_buffer_file)
            .add_message_handler(Server::buffer_reloaded)
            .add_message_handler(Server::buffer_saved)
            .add_request_handler(Server::save_buffer)
            .add_request_handler(Server::get_channels)
            .add_request_handler(Server::get_users)
            .add_request_handler(Server::fuzzy_search_users)
            .add_request_handler(Server::request_contact)
            .add_request_handler(Server::respond_to_contact_request)
            .add_request_handler(Server::join_channel)
            .add_message_handler(Server::leave_channel)
            .add_request_handler(Server::send_channel_message)
            .add_request_handler(Server::follow)
            .add_message_handler(Server::unfollow)
            .add_message_handler(Server::update_followers)
            .add_request_handler(Server::get_channel_messages);

        Arc::new(server)
    }
169
170 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
171 where
172 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
173 Fut: 'static + Send + Future<Output = Result<()>>,
174 M: EnvelopedMessage,
175 {
176 let prev_handler = self.handlers.insert(
177 TypeId::of::<M>(),
178 Box::new(move |server, envelope| {
179 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
180 let span = info_span!(
181 "handle message",
182 payload_type = envelope.payload_type_name(),
183 payload = format!("{:?}", envelope.payload).as_str(),
184 );
185 let future = (handler)(server, *envelope);
186 async move {
187 if let Err(error) = future.await {
188 tracing::error!(%error, "error handling message");
189 }
190 }
191 .instrument(span)
192 .boxed()
193 }),
194 );
195 if prev_handler.is_some() {
196 panic!("registered a handler for the same message twice");
197 }
198 self
199 }
200
201 /// Handle a request while holding a lock to the store. This is useful when we're registering
202 /// a connection but we want to respond on the connection before anybody else can send on it.
203 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
204 where
205 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
206 Fut: Send + Future<Output = Result<()>>,
207 M: RequestMessage,
208 {
209 let handler = Arc::new(handler);
210 self.add_message_handler(move |server, envelope| {
211 let receipt = envelope.receipt();
212 let handler = handler.clone();
213 async move {
214 let responded = Arc::new(AtomicBool::default());
215 let response = Response {
216 server: server.clone(),
217 responded: responded.clone(),
218 receipt: envelope.receipt(),
219 };
220 match (handler)(server.clone(), envelope, response).await {
221 Ok(()) => {
222 if responded.load(std::sync::atomic::Ordering::SeqCst) {
223 Ok(())
224 } else {
225 Err(anyhow!("handler did not send a response"))?
226 }
227 }
228 Err(error) => {
229 server.peer.respond_with_error(
230 receipt,
231 proto::Error {
232 message: error.to_string(),
233 },
234 )?;
235 Err(error)
236 }
237 }
238 }
239 })
240 }
241
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future:
    /// 1. adds `connection` to the peer, using `executor.sleep` as the timer source;
    /// 2. sends the new connection id through `send_connection_id`, if one was given;
    /// 3. records the connection for `user_id` in the store and pushes contact updates;
    /// 4. loops, dispatching each incoming message to its registered handler, until the
    ///    I/O future finishes or the incoming stream ends;
    /// 5. signs the connection out.
    ///
    /// The whole future runs inside a "handle connection" tracing span.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // Mutable because `sign_out` takes `&mut Arc<Self>`.
        let mut this = self.clone();
        let span = info_span!("handle connection", %user_id, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %connection_id, %address, "connection opened");

            // A failure to deliver the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            // Scoped so the store write guard is released before the message loop starts.
            {
                let mut state = this.state_mut().await;
                state.add_connection(connection_id, user_id);
                this.update_contacts_for_users(&*state, &[user_id]);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so I/O completion
                // is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(%error, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Notify the optional listener once the handler has finished.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages are spawned and not awaited here;
                                    // all others are awaited before the next loop iteration.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!("no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            // The incoming stream ended.
                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%error, "error signing out");
            }
        }.instrument(span)
    }
327
328 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
329 self.peer.disconnect(connection_id);
330 let mut state = self.state_mut().await;
331 let removed_connection = state.remove_connection(connection_id)?;
332
333 for (project_id, project) in removed_connection.hosted_projects {
334 if let Some(share) = project.share {
335 broadcast(
336 connection_id,
337 share.guests.keys().copied().collect(),
338 |conn_id| {
339 self.peer
340 .send(conn_id, proto::UnshareProject { project_id })
341 },
342 );
343 }
344 }
345
346 for (project_id, peer_ids) in removed_connection.guest_project_ids {
347 broadcast(connection_id, peer_ids, |conn_id| {
348 self.peer.send(
349 conn_id,
350 proto::RemoveProjectCollaborator {
351 project_id,
352 peer_id: connection_id.0,
353 },
354 )
355 });
356 }
357
358 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
359 Ok(())
360 }
361
362 async fn ping(
363 self: Arc<Server>,
364 _: TypedEnvelope<proto::Ping>,
365 response: Response<proto::Ping>,
366 ) -> Result<()> {
367 response.send(proto::Ack {})?;
368 Ok(())
369 }
370
371 async fn register_project(
372 self: Arc<Server>,
373 request: TypedEnvelope<proto::RegisterProject>,
374 response: Response<proto::RegisterProject>,
375 ) -> Result<()> {
376 let project_id = {
377 let mut state = self.state_mut().await;
378 let user_id = state.user_id_for_connection(request.sender_id)?;
379 state.register_project(request.sender_id, user_id)
380 };
381 response.send(proto::RegisterProjectResponse { project_id })?;
382 Ok(())
383 }
384
385 async fn unregister_project(
386 self: Arc<Server>,
387 request: TypedEnvelope<proto::UnregisterProject>,
388 ) -> Result<()> {
389 let mut state = self.state_mut().await;
390 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
391 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
392 Ok(())
393 }
394
395 async fn share_project(
396 self: Arc<Server>,
397 request: TypedEnvelope<proto::ShareProject>,
398 response: Response<proto::ShareProject>,
399 ) -> Result<()> {
400 let mut state = self.state_mut().await;
401 let project = state.share_project(request.payload.project_id, request.sender_id)?;
402 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
403 response.send(proto::Ack {})?;
404 Ok(())
405 }
406
407 async fn unshare_project(
408 self: Arc<Server>,
409 request: TypedEnvelope<proto::UnshareProject>,
410 ) -> Result<()> {
411 let project_id = request.payload.project_id;
412 let mut state = self.state_mut().await;
413 let project = state.unshare_project(project_id, request.sender_id)?;
414 broadcast(request.sender_id, project.connection_ids, |conn_id| {
415 self.peer
416 .send(conn_id, proto::UnshareProject { project_id })
417 });
418 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
419 Ok(())
420 }
421
    /// Adds the sender to a shared project as a guest.
    ///
    /// On success the existing project connections are sent `AddProjectCollaborator`, the
    /// contact lists of the project's authorized users are refreshed, and the sender
    /// receives a `JoinProjectResponse` containing its replica id, the current
    /// collaborators, the shared worktrees and the project's language servers.
    ///
    /// Fails if the sender's user cannot be resolved, if the store rejects the join, or if
    /// the project has no share state.
    async fn join_project(
        self: Arc<Server>,
        request: TypedEnvelope<proto::JoinProject>,
        response: Response<proto::JoinProject>,
    ) -> Result<()> {
        let project_id = request.payload.project_id;

        let state = &mut *self.state_mut().await;
        let user_id = state.user_id_for_connection(request.sender_id)?;
        let (response_payload, connection_ids, contact_user_ids) = state
            .join_project(request.sender_id, user_id, project_id)
            .and_then(|joined| {
                let share = joined.project.share()?;
                let peer_count = share.guests.len();
                let mut collaborators = Vec::with_capacity(peer_count);
                // The host is always listed first, with replica id 0.
                collaborators.push(proto::Collaborator {
                    peer_id: joined.project.host_connection_id.0,
                    replica_id: 0,
                    user_id: joined.project.host_user_id.to_proto(),
                });
                // Shared worktrees that have no matching entry in the project's own
                // worktree map are skipped.
                let worktrees = share
                    .worktrees
                    .iter()
                    .filter_map(|(id, shared_worktree)| {
                        let worktree = joined.project.worktrees.get(&id)?;
                        Some(proto::Worktree {
                            id: *id,
                            root_name: worktree.root_name.clone(),
                            entries: shared_worktree.entries.values().cloned().collect(),
                            diagnostic_summaries: shared_worktree
                                .diagnostic_summaries
                                .values()
                                .cloned()
                                .collect(),
                            visible: worktree.visible,
                            scan_id: shared_worktree.scan_id,
                        })
                    })
                    .collect();
                // Every guest except the sender itself is reported as a collaborator.
                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
                    if *peer_conn_id != request.sender_id {
                        collaborators.push(proto::Collaborator {
                            peer_id: peer_conn_id.0,
                            replica_id: *peer_replica_id as u32,
                            user_id: peer_user_id.to_proto(),
                        });
                    }
                }
                let response = proto::JoinProjectResponse {
                    worktrees,
                    replica_id: joined.replica_id as u32,
                    collaborators,
                    language_servers: joined.project.language_servers.clone(),
                };
                let connection_ids = joined.project.connection_ids();
                let contact_user_ids = joined.project.authorized_user_ids();
                Ok((response, connection_ids, contact_user_ids))
            })?;

        // Announce the new collaborator to the project's connections.
        broadcast(request.sender_id, connection_ids, |conn_id| {
            self.peer.send(
                conn_id,
                proto::AddProjectCollaborator {
                    project_id,
                    collaborator: Some(proto::Collaborator {
                        peer_id: request.sender_id.0,
                        replica_id: response_payload.replica_id,
                        user_id: user_id.to_proto(),
                    }),
                },
            )
        });
        self.update_contacts_for_users(state, &contact_user_ids);
        response.send(response_payload)?;
        Ok(())
    }
498
499 async fn leave_project(
500 self: Arc<Server>,
501 request: TypedEnvelope<proto::LeaveProject>,
502 ) -> Result<()> {
503 let sender_id = request.sender_id;
504 let project_id = request.payload.project_id;
505 let mut state = self.state_mut().await;
506 let worktree = state.leave_project(sender_id, project_id)?;
507 broadcast(sender_id, worktree.connection_ids, |conn_id| {
508 self.peer.send(
509 conn_id,
510 proto::RemoveProjectCollaborator {
511 project_id,
512 peer_id: sender_id.0,
513 },
514 )
515 });
516 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
517 Ok(())
518 }
519
520 async fn register_worktree(
521 self: Arc<Server>,
522 request: TypedEnvelope<proto::RegisterWorktree>,
523 response: Response<proto::RegisterWorktree>,
524 ) -> Result<()> {
525 let mut contact_user_ids = HashSet::default();
526 for github_login in &request.payload.authorized_logins {
527 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
528 contact_user_ids.insert(contact_user_id);
529 }
530
531 let mut state = self.state_mut().await;
532 let host_user_id = state.user_id_for_connection(request.sender_id)?;
533 contact_user_ids.insert(host_user_id);
534
535 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
536 let guest_connection_ids = state
537 .read_project(request.payload.project_id, request.sender_id)?
538 .guest_connection_ids();
539 state.register_worktree(
540 request.payload.project_id,
541 request.payload.worktree_id,
542 request.sender_id,
543 Worktree {
544 authorized_user_ids: contact_user_ids.clone(),
545 root_name: request.payload.root_name.clone(),
546 visible: request.payload.visible,
547 },
548 )?;
549
550 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
551 self.peer
552 .forward_send(request.sender_id, connection_id, request.payload.clone())
553 });
554 self.update_contacts_for_users(&*state, &contact_user_ids);
555 response.send(proto::Ack {})?;
556 Ok(())
557 }
558
559 async fn unregister_worktree(
560 self: Arc<Server>,
561 request: TypedEnvelope<proto::UnregisterWorktree>,
562 ) -> Result<()> {
563 let project_id = request.payload.project_id;
564 let worktree_id = request.payload.worktree_id;
565 let mut state = self.state_mut().await;
566 let (worktree, guest_connection_ids) =
567 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
568 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
569 self.peer.send(
570 conn_id,
571 proto::UnregisterWorktree {
572 project_id,
573 worktree_id,
574 },
575 )
576 });
577 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
578 Ok(())
579 }
580
581 async fn update_worktree(
582 self: Arc<Server>,
583 request: TypedEnvelope<proto::UpdateWorktree>,
584 response: Response<proto::UpdateWorktree>,
585 ) -> Result<()> {
586 let connection_ids = self.state_mut().await.update_worktree(
587 request.sender_id,
588 request.payload.project_id,
589 request.payload.worktree_id,
590 &request.payload.removed_entries,
591 &request.payload.updated_entries,
592 request.payload.scan_id,
593 )?;
594
595 broadcast(request.sender_id, connection_ids, |connection_id| {
596 self.peer
597 .forward_send(request.sender_id, connection_id, request.payload.clone())
598 });
599 response.send(proto::Ack {})?;
600 Ok(())
601 }
602
603 async fn update_diagnostic_summary(
604 self: Arc<Server>,
605 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
606 ) -> Result<()> {
607 let summary = request
608 .payload
609 .summary
610 .clone()
611 .ok_or_else(|| anyhow!("invalid summary"))?;
612 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
613 request.payload.project_id,
614 request.payload.worktree_id,
615 request.sender_id,
616 summary,
617 )?;
618
619 broadcast(request.sender_id, receiver_ids, |connection_id| {
620 self.peer
621 .forward_send(request.sender_id, connection_id, request.payload.clone())
622 });
623 Ok(())
624 }
625
626 async fn start_language_server(
627 self: Arc<Server>,
628 request: TypedEnvelope<proto::StartLanguageServer>,
629 ) -> Result<()> {
630 let receiver_ids = self.state_mut().await.start_language_server(
631 request.payload.project_id,
632 request.sender_id,
633 request
634 .payload
635 .server
636 .clone()
637 .ok_or_else(|| anyhow!("invalid language server"))?,
638 )?;
639 broadcast(request.sender_id, receiver_ids, |connection_id| {
640 self.peer
641 .forward_send(request.sender_id, connection_id, request.payload.clone())
642 });
643 Ok(())
644 }
645
646 async fn update_language_server(
647 self: Arc<Server>,
648 request: TypedEnvelope<proto::UpdateLanguageServer>,
649 ) -> Result<()> {
650 let receiver_ids = self
651 .state()
652 .await
653 .project_connection_ids(request.payload.project_id, request.sender_id)?;
654 broadcast(request.sender_id, receiver_ids, |connection_id| {
655 self.peer
656 .forward_send(request.sender_id, connection_id, request.payload.clone())
657 });
658 Ok(())
659 }
660
661 async fn forward_project_request<T>(
662 self: Arc<Server>,
663 request: TypedEnvelope<T>,
664 response: Response<T>,
665 ) -> Result<()>
666 where
667 T: EntityMessage + RequestMessage,
668 {
669 let host_connection_id = self
670 .state()
671 .await
672 .read_project(request.payload.remote_entity_id(), request.sender_id)?
673 .host_connection_id;
674
675 response.send(
676 self.peer
677 .forward_request(request.sender_id, host_connection_id, request.payload)
678 .await?,
679 )?;
680 Ok(())
681 }
682
683 async fn save_buffer(
684 self: Arc<Server>,
685 request: TypedEnvelope<proto::SaveBuffer>,
686 response: Response<proto::SaveBuffer>,
687 ) -> Result<()> {
688 let host = self
689 .state()
690 .await
691 .read_project(request.payload.project_id, request.sender_id)?
692 .host_connection_id;
693 let response_payload = self
694 .peer
695 .forward_request(request.sender_id, host, request.payload.clone())
696 .await?;
697
698 let mut guests = self
699 .state()
700 .await
701 .read_project(request.payload.project_id, request.sender_id)?
702 .connection_ids();
703 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
704 broadcast(host, guests, |conn_id| {
705 self.peer
706 .forward_send(host, conn_id, response_payload.clone())
707 });
708 response.send(response_payload)?;
709 Ok(())
710 }
711
712 async fn update_buffer(
713 self: Arc<Server>,
714 request: TypedEnvelope<proto::UpdateBuffer>,
715 response: Response<proto::UpdateBuffer>,
716 ) -> Result<()> {
717 let receiver_ids = self
718 .state()
719 .await
720 .project_connection_ids(request.payload.project_id, request.sender_id)?;
721 broadcast(request.sender_id, receiver_ids, |connection_id| {
722 self.peer
723 .forward_send(request.sender_id, connection_id, request.payload.clone())
724 });
725 response.send(proto::Ack {})?;
726 Ok(())
727 }
728
729 async fn update_buffer_file(
730 self: Arc<Server>,
731 request: TypedEnvelope<proto::UpdateBufferFile>,
732 ) -> Result<()> {
733 let receiver_ids = self
734 .state()
735 .await
736 .project_connection_ids(request.payload.project_id, request.sender_id)?;
737 broadcast(request.sender_id, receiver_ids, |connection_id| {
738 self.peer
739 .forward_send(request.sender_id, connection_id, request.payload.clone())
740 });
741 Ok(())
742 }
743
744 async fn buffer_reloaded(
745 self: Arc<Server>,
746 request: TypedEnvelope<proto::BufferReloaded>,
747 ) -> Result<()> {
748 let receiver_ids = self
749 .state()
750 .await
751 .project_connection_ids(request.payload.project_id, request.sender_id)?;
752 broadcast(request.sender_id, receiver_ids, |connection_id| {
753 self.peer
754 .forward_send(request.sender_id, connection_id, request.payload.clone())
755 });
756 Ok(())
757 }
758
759 async fn buffer_saved(
760 self: Arc<Server>,
761 request: TypedEnvelope<proto::BufferSaved>,
762 ) -> Result<()> {
763 let receiver_ids = self
764 .state()
765 .await
766 .project_connection_ids(request.payload.project_id, request.sender_id)?;
767 broadcast(request.sender_id, receiver_ids, |connection_id| {
768 self.peer
769 .forward_send(request.sender_id, connection_id, request.payload.clone())
770 });
771 Ok(())
772 }
773
774 async fn follow(
775 self: Arc<Self>,
776 request: TypedEnvelope<proto::Follow>,
777 response: Response<proto::Follow>,
778 ) -> Result<()> {
779 let leader_id = ConnectionId(request.payload.leader_id);
780 let follower_id = request.sender_id;
781 if !self
782 .state()
783 .await
784 .project_connection_ids(request.payload.project_id, follower_id)?
785 .contains(&leader_id)
786 {
787 Err(anyhow!("no such peer"))?;
788 }
789 let mut response_payload = self
790 .peer
791 .forward_request(request.sender_id, leader_id, request.payload)
792 .await?;
793 response_payload
794 .views
795 .retain(|view| view.leader_id != Some(follower_id.0));
796 response.send(response_payload)?;
797 Ok(())
798 }
799
800 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
801 let leader_id = ConnectionId(request.payload.leader_id);
802 if !self
803 .state()
804 .await
805 .project_connection_ids(request.payload.project_id, request.sender_id)?
806 .contains(&leader_id)
807 {
808 Err(anyhow!("no such peer"))?;
809 }
810 self.peer
811 .forward_send(request.sender_id, leader_id, request.payload)?;
812 Ok(())
813 }
814
815 async fn update_followers(
816 self: Arc<Self>,
817 request: TypedEnvelope<proto::UpdateFollowers>,
818 ) -> Result<()> {
819 let connection_ids = self
820 .state()
821 .await
822 .project_connection_ids(request.payload.project_id, request.sender_id)?;
823 let leader_id = request
824 .payload
825 .variant
826 .as_ref()
827 .and_then(|variant| match variant {
828 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
829 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
830 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
831 });
832 for follower_id in &request.payload.follower_ids {
833 let follower_id = ConnectionId(*follower_id);
834 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
835 self.peer
836 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
837 }
838 }
839 Ok(())
840 }
841
842 async fn get_channels(
843 self: Arc<Server>,
844 request: TypedEnvelope<proto::GetChannels>,
845 response: Response<proto::GetChannels>,
846 ) -> Result<()> {
847 let user_id = self
848 .state()
849 .await
850 .user_id_for_connection(request.sender_id)?;
851 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
852 response.send(proto::GetChannelsResponse {
853 channels: channels
854 .into_iter()
855 .map(|chan| proto::Channel {
856 id: chan.id.to_proto(),
857 name: chan.name,
858 })
859 .collect(),
860 })?;
861 Ok(())
862 }
863
864 async fn get_users(
865 self: Arc<Server>,
866 request: TypedEnvelope<proto::GetUsers>,
867 response: Response<proto::GetUsers>,
868 ) -> Result<()> {
869 let user_ids = request
870 .payload
871 .user_ids
872 .into_iter()
873 .map(UserId::from_proto)
874 .collect();
875 let users = self
876 .app_state
877 .db
878 .get_users_by_ids(user_ids)
879 .await?
880 .into_iter()
881 .map(|user| proto::User {
882 id: user.id.to_proto(),
883 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
884 github_login: user.github_login,
885 })
886 .collect();
887 response.send(proto::UsersResponse { users })?;
888 Ok(())
889 }
890
891 async fn fuzzy_search_users(
892 self: Arc<Server>,
893 request: TypedEnvelope<proto::FuzzySearchUsers>,
894 response: Response<proto::FuzzySearchUsers>,
895 ) -> Result<()> {
896 let query = request.payload.query;
897 let db = &self.app_state.db;
898 let users = match query.len() {
899 0 => vec![],
900 1 | 2 => db
901 .get_user_by_github_login(&query)
902 .await?
903 .into_iter()
904 .collect(),
905 _ => db.fuzzy_search_users(&query, 10).await?,
906 };
907 let users = users
908 .into_iter()
909 .map(|user| proto::User {
910 id: user.id.to_proto(),
911 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
912 github_login: user.github_login,
913 })
914 .collect();
915 response.send(proto::UsersResponse { users })?;
916 Ok(())
917 }
918
919 async fn request_contact(
920 self: Arc<Server>,
921 request: TypedEnvelope<proto::RequestContact>,
922 response: Response<proto::RequestContact>,
923 ) -> Result<()> {
924 let requester_id = self
925 .store
926 .read()
927 .await
928 .user_id_for_connection(request.sender_id)?;
929 let responder_id = UserId::from_proto(request.payload.to_user_id);
930 self.app_state
931 .db
932 .send_contact_request(requester_id, responder_id)
933 .await?;
934 response.send(proto::Ack {})?;
935 Ok(())
936 }
937
938 async fn respond_to_contact_request(
939 self: Arc<Server>,
940 request: TypedEnvelope<proto::RespondToContactRequest>,
941 response: Response<proto::RespondToContactRequest>,
942 ) -> Result<()> {
943 let responder_id = self
944 .store
945 .read()
946 .await
947 .user_id_for_connection(request.sender_id)?;
948 let requester_id = UserId::from_proto(request.payload.requesting_user_id);
949 self.app_state
950 .db
951 .respond_to_contact_request(
952 responder_id,
953 requester_id,
954 request.payload.response == proto::ContactRequestResponse::Accept as i32,
955 )
956 .await?;
957 response.send(proto::Ack {})?;
958 Ok(())
959 }
960
961 #[instrument(skip(self, state, user_ids))]
962 fn update_contacts_for_users<'a>(
963 self: &Arc<Self>,
964 state: &Store,
965 user_ids: impl IntoIterator<Item = &'a UserId>,
966 ) {
967 for user_id in user_ids {
968 let contacts = state.contacts_for_user(*user_id);
969 for connection_id in state.connection_ids_for_user(*user_id) {
970 self.peer
971 .send(
972 connection_id,
973 proto::UpdateContacts {
974 contacts: contacts.clone(),
975 pending_requests_from_user_ids: Default::default(),
976 pending_requests_to_user_ids: Default::default(),
977 },
978 )
979 .trace_err();
980 }
981 }
982 }
983
984 async fn join_channel(
985 self: Arc<Self>,
986 request: TypedEnvelope<proto::JoinChannel>,
987 response: Response<proto::JoinChannel>,
988 ) -> Result<()> {
989 let user_id = self
990 .state()
991 .await
992 .user_id_for_connection(request.sender_id)?;
993 let channel_id = ChannelId::from_proto(request.payload.channel_id);
994 if !self
995 .app_state
996 .db
997 .can_user_access_channel(user_id, channel_id)
998 .await?
999 {
1000 Err(anyhow!("access denied"))?;
1001 }
1002
1003 self.state_mut()
1004 .await
1005 .join_channel(request.sender_id, channel_id);
1006 let messages = self
1007 .app_state
1008 .db
1009 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1010 .await?
1011 .into_iter()
1012 .map(|msg| proto::ChannelMessage {
1013 id: msg.id.to_proto(),
1014 body: msg.body,
1015 timestamp: msg.sent_at.unix_timestamp() as u64,
1016 sender_id: msg.sender_id.to_proto(),
1017 nonce: Some(msg.nonce.as_u128().into()),
1018 })
1019 .collect::<Vec<_>>();
1020 response.send(proto::JoinChannelResponse {
1021 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1022 messages,
1023 })?;
1024 Ok(())
1025 }
1026
1027 async fn leave_channel(
1028 self: Arc<Self>,
1029 request: TypedEnvelope<proto::LeaveChannel>,
1030 ) -> Result<()> {
1031 let user_id = self
1032 .state()
1033 .await
1034 .user_id_for_connection(request.sender_id)?;
1035 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1036 if !self
1037 .app_state
1038 .db
1039 .can_user_access_channel(user_id, channel_id)
1040 .await?
1041 {
1042 Err(anyhow!("access denied"))?;
1043 }
1044
1045 self.state_mut()
1046 .await
1047 .leave_channel(request.sender_id, channel_id);
1048
1049 Ok(())
1050 }
1051
1052 async fn send_channel_message(
1053 self: Arc<Self>,
1054 request: TypedEnvelope<proto::SendChannelMessage>,
1055 response: Response<proto::SendChannelMessage>,
1056 ) -> Result<()> {
1057 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1058 let user_id;
1059 let connection_ids;
1060 {
1061 let state = self.state().await;
1062 user_id = state.user_id_for_connection(request.sender_id)?;
1063 connection_ids = state.channel_connection_ids(channel_id)?;
1064 }
1065
1066 // Validate the message body.
1067 let body = request.payload.body.trim().to_string();
1068 if body.len() > MAX_MESSAGE_LEN {
1069 return Err(anyhow!("message is too long"))?;
1070 }
1071 if body.is_empty() {
1072 return Err(anyhow!("message can't be blank"))?;
1073 }
1074
1075 let timestamp = OffsetDateTime::now_utc();
1076 let nonce = request
1077 .payload
1078 .nonce
1079 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1080
1081 let message_id = self
1082 .app_state
1083 .db
1084 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1085 .await?
1086 .to_proto();
1087 let message = proto::ChannelMessage {
1088 sender_id: user_id.to_proto(),
1089 id: message_id,
1090 body,
1091 timestamp: timestamp.unix_timestamp() as u64,
1092 nonce: Some(nonce),
1093 };
1094 broadcast(request.sender_id, connection_ids, |conn_id| {
1095 self.peer.send(
1096 conn_id,
1097 proto::ChannelMessageSent {
1098 channel_id: channel_id.to_proto(),
1099 message: Some(message.clone()),
1100 },
1101 )
1102 });
1103 response.send(proto::SendChannelMessageResponse {
1104 message: Some(message),
1105 })?;
1106 Ok(())
1107 }
1108
1109 async fn get_channel_messages(
1110 self: Arc<Self>,
1111 request: TypedEnvelope<proto::GetChannelMessages>,
1112 response: Response<proto::GetChannelMessages>,
1113 ) -> Result<()> {
1114 let user_id = self
1115 .state()
1116 .await
1117 .user_id_for_connection(request.sender_id)?;
1118 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1119 if !self
1120 .app_state
1121 .db
1122 .can_user_access_channel(user_id, channel_id)
1123 .await?
1124 {
1125 Err(anyhow!("access denied"))?;
1126 }
1127
1128 let messages = self
1129 .app_state
1130 .db
1131 .get_channel_messages(
1132 channel_id,
1133 MESSAGE_COUNT_PER_PAGE,
1134 Some(MessageId::from_proto(request.payload.before_message_id)),
1135 )
1136 .await?
1137 .into_iter()
1138 .map(|msg| proto::ChannelMessage {
1139 id: msg.id.to_proto(),
1140 body: msg.body,
1141 timestamp: msg.sent_at.unix_timestamp() as u64,
1142 sender_id: msg.sender_id.to_proto(),
1143 nonce: Some(msg.nonce.as_u128().into()),
1144 })
1145 .collect::<Vec<_>>();
1146 response.send(proto::GetChannelMessagesResponse {
1147 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1148 messages,
1149 })?;
1150 Ok(())
1151 }
1152
1153 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1154 #[cfg(test)]
1155 tokio::task::yield_now().await;
1156 let guard = self.store.read().await;
1157 #[cfg(test)]
1158 tokio::task::yield_now().await;
1159 StoreReadGuard {
1160 guard,
1161 _not_send: PhantomData,
1162 }
1163 }
1164
1165 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1166 #[cfg(test)]
1167 tokio::task::yield_now().await;
1168 let guard = self.store.write().await;
1169 #[cfg(test)]
1170 tokio::task::yield_now().await;
1171 StoreWriteGuard {
1172 guard,
1173 _not_send: PhantomData,
1174 }
1175 }
1176}
1177
1178impl<'a> Deref for StoreReadGuard<'a> {
1179 type Target = Store;
1180
1181 fn deref(&self) -> &Self::Target {
1182 &*self.guard
1183 }
1184}
1185
1186impl<'a> Deref for StoreWriteGuard<'a> {
1187 type Target = Store;
1188
1189 fn deref(&self) -> &Self::Target {
1190 &*self.guard
1191 }
1192}
1193
1194impl<'a> DerefMut for StoreWriteGuard<'a> {
1195 fn deref_mut(&mut self) -> &mut Self::Target {
1196 &mut *self.guard
1197 }
1198}
1199
1200impl<'a> Drop for StoreWriteGuard<'a> {
1201 fn drop(&mut self) {
1202 #[cfg(test)]
1203 self.check_invariants();
1204
1205 let metrics = self.metrics();
1206 tracing::info!(
1207 connections = metrics.connections,
1208 registered_projects = metrics.registered_projects,
1209 shared_projects = metrics.shared_projects,
1210 "metrics"
1211 );
1212 }
1213}
1214
1215impl Executor for RealExecutor {
1216 type Sleep = Sleep;
1217
1218 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1219 tokio::task::spawn(future);
1220 }
1221
1222 fn sleep(&self, duration: Duration) -> Self::Sleep {
1223 tokio::time::sleep(duration)
1224 }
1225}
1226
1227#[instrument(skip(f))]
1228fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1229where
1230 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1231{
1232 for receiver_id in receiver_ids {
1233 if receiver_id != sender_id {
1234 f(receiver_id).trace_err();
1235 }
1236 }
1237}
1238
// Lazily-initialized name of the HTTP header that `ProtocolVersion` is decoded from
// and encoded to.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1242
/// Typed wrapper for the numeric value of the `x-zed-protocol-version` header.
pub struct ProtocolVersion(u32);
1244
1245impl Header for ProtocolVersion {
1246 fn name() -> &'static HeaderName {
1247 &ZED_PROTOCOL_VERSION
1248 }
1249
1250 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1251 where
1252 Self: Sized,
1253 I: Iterator<Item = &'i axum::http::HeaderValue>,
1254 {
1255 let version = values
1256 .next()
1257 .ok_or_else(|| axum::headers::Error::invalid())?
1258 .to_str()
1259 .map_err(|_| axum::headers::Error::invalid())?
1260 .parse()
1261 .map_err(|_| axum::headers::Error::invalid())?;
1262 Ok(Self(version))
1263 }
1264
1265 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1266 values.extend([self.0.to_string().parse().unwrap()]);
1267 }
1268}
1269
1270pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1271 let server = Server::new(app_state.clone(), None);
1272 Router::new()
1273 .route("/rpc", get(handle_websocket_request))
1274 .layer(
1275 ServiceBuilder::new()
1276 .layer(Extension(app_state))
1277 .layer(middleware::from_fn(auth::validate_header))
1278 .layer(Extension(server)),
1279 )
1280}
1281
1282pub async fn handle_websocket_request(
1283 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1284 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1285 Extension(server): Extension<Arc<Server>>,
1286 Extension(user_id): Extension<UserId>,
1287 ws: WebSocketUpgrade,
1288) -> axum::response::Response {
1289 if protocol_version != rpc::PROTOCOL_VERSION {
1290 return (
1291 StatusCode::UPGRADE_REQUIRED,
1292 "client must be upgraded".to_string(),
1293 )
1294 .into_response();
1295 }
1296 let socket_address = socket_address.to_string();
1297 ws.on_upgrade(move |socket| {
1298 let socket = socket
1299 .map_ok(to_tungstenite_message)
1300 .err_into()
1301 .with(|message| async move { Ok(to_axum_message(message)) });
1302 let connection = Connection::new(Box::pin(socket));
1303 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1304 })
1305}
1306
1307fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1308 match message {
1309 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1310 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1311 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1312 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1313 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1314 code: frame.code.into(),
1315 reason: frame.reason,
1316 })),
1317 }
1318}
1319
1320fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1321 match message {
1322 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1323 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1324 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1325 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1326 AxumMessage::Close(frame) => {
1327 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1328 code: frame.code.into(),
1329 reason: frame.reason,
1330 }))
1331 }
1332 }
1333}
1334
/// Extension trait for result-like values whose error should be logged rather
/// than propagated.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Consumes `self` and returns the success value as an `Option`.
    ///
    /// The `Result` implementation in this file logs the error with
    /// `tracing::error!` and returns `None` on failure.
    fn trace_err(self) -> Option<Self::Ok>;
}
1340
1341impl<T, E> ResultExt for Result<T, E>
1342where
1343 E: std::fmt::Debug,
1344{
1345 type Ok = T;
1346
1347 fn trace_err(self) -> Option<T> {
1348 match self {
1349 Ok(value) => Some(value),
1350 Err(error) => {
1351 tracing::error!("{:?}", error);
1352 None
1353 }
1354 }
1355 }
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360 use super::*;
1361 use crate::{
1362 db::{tests::TestDb, UserId},
1363 AppState,
1364 };
1365 use ::rpc::Peer;
1366 use client::{
1367 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1368 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1369 };
1370 use collections::BTreeMap;
1371 use editor::{
1372 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1373 ToOffset, ToggleCodeActions, Undo,
1374 };
1375 use gpui::{
1376 executor::{self, Deterministic},
1377 geometry::vector::vec2f,
1378 ModelHandle, TestAppContext, ViewHandle,
1379 };
1380 use language::{
1381 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1382 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1383 };
1384 use lsp::{self, FakeLanguageServer};
1385 use parking_lot::Mutex;
1386 use project::{
1387 fs::{FakeFs, Fs as _},
1388 search::SearchQuery,
1389 worktree::WorktreeHandle,
1390 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1391 };
1392 use rand::prelude::*;
1393 use rpc::PeerId;
1394 use serde_json::json;
1395 use settings::Settings;
1396 use sqlx::types::time::OffsetDateTime;
1397 use std::{
1398 env,
1399 ops::Deref,
1400 path::{Path, PathBuf},
1401 rc::Rc,
1402 sync::{
1403 atomic::{AtomicBool, Ordering::SeqCst},
1404 Arc,
1405 },
1406 time::Duration,
1407 };
1408 use theme::ThemeRegistry;
1409 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1410
1411 #[cfg(test)]
1412 #[ctor::ctor]
1413 fn init_logger() {
1414 if std::env::var("RUST_LOG").is_ok() {
1415 env_logger::init();
1416 }
1417 }
1418
    // Basic sharing flow between a host (client A) and a guest (client B):
    // the guest joins the shared project, each side sees the other as a
    // collaborator, an edit typed by the guest reaches the host's buffer, and
    // dropping the guest's project empties the host's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // The guest must already see the host as a collaborator; remember the
        // guest's replica id to check it on the host side.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host sees the guest with the same replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // Opening the buffer as the guest must have opened it on the host too.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1541
    // Unsharing a project makes the guest's copy read-only and marks the host's
    // worktree as no longer shared; sharing the project again lets a guest join
    // and open buffers once more.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        // The guest's project turns read-only once the unshare has propagated.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1634
    // When the host's connection is dropped, the host loses its collaborators and
    // shared state and the guest's project becomes read-only. After the host gets
    // a new remote id, the project can be shared and joined again.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the RPC receive timeout before checking the result.
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1735
    // With one host (A) and two guests (B, C): concurrent edits converge to the
    // same text on all three replicas, a save issued by a guest writes the host's
    // file, and renames/creations on the host's file system show up in every
    // worktree and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for both guests' insertions to arrive on the host before the host
        // appends its own text.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
        save_b.await.unwrap();
        // The saved file is expected to contain the host's concurrent edit as well.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1914
    // Creates, renames and deletes entries through the guest's project and, after
    // each operation, asserts that the host's and the guest's worktrees list the
    // same set of paths.
    #[gpui::test(iterations = 10)]
    async fn test_fs_operations(
        executor: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        executor.forbid_parking();
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let mut client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;

        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        let project_b = client_b.build_remote_project(project_id, cx_b).await;

        let worktree_a =
            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
        let worktree_b =
            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());

        // Create a file ("c.txt") as the guest.
        let entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "c.txt"), false, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "c.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "c.txt"]
            );
        });

        // Rename that file to "d.txt" as the guest.
        project_b
            .update(cx_b, |project, cx| {
                project.rename_entry(entry.id, Path::new("d.txt"), cx)
            })
            .unwrap()
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Create a directory ("DIR") as the guest.
        let dir_entry = project_b
            .update(cx_b, |project, cx| {
                project
                    .create_entry((worktree_id, "DIR"), true, cx)
                    .unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Delete the directory as the guest.
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(dir_entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt", "d.txt"]
            );
        });

        // Delete the renamed file as the guest, using the id of the entry created earlier.
        project_b
            .update(cx_b, |project, cx| {
                project.delete_entry(entry.id, cx).unwrap()
            })
            .await
            .unwrap();
        worktree_a.read_with(cx_a, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt"]
            );
        });
        worktree_b.read_with(cx_b, |worktree, _| {
            assert_eq!(
                worktree
                    .paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                [".zed.toml", "a.txt", "b.txt"]
            );
        });
    }
2084
    // A guest edits and saves a buffer: the buffer must become clean without
    // reporting a conflict, and a further edit must make it dirty again, still
    // without a conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: the buffer is dirty but not in conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait for the buffer to be reported clean.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Second edit after the save: dirty again, still no conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
2166
2167 #[gpui::test(iterations = 10)]
2168 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2169 cx_a.foreground().forbid_parking();
2170 let lang_registry = Arc::new(LanguageRegistry::test());
2171 let fs = FakeFs::new(cx_a.background());
2172
2173 // Connect to a server as 2 clients.
2174 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2175 let client_a = server.create_client(cx_a, "user_a").await;
2176 let client_b = server.create_client(cx_b, "user_b").await;
2177
2178 // Share a project as client A
2179 fs.insert_tree(
2180 "/dir",
2181 json!({
2182 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2183 "a.txt": "a-contents",
2184 }),
2185 )
2186 .await;
2187
2188 let project_a = cx_a.update(|cx| {
2189 Project::local(
2190 client_a.clone(),
2191 client_a.user_store.clone(),
2192 lang_registry.clone(),
2193 fs.clone(),
2194 cx,
2195 )
2196 });
2197 let (worktree_a, _) = project_a
2198 .update(cx_a, |p, cx| {
2199 p.find_or_create_local_worktree("/dir", true, cx)
2200 })
2201 .await
2202 .unwrap();
2203 worktree_a
2204 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2205 .await;
2206 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2207 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2208 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2209
2210 // Join that project as client B
2211 let project_b = Project::remote(
2212 project_id,
2213 client_b.clone(),
2214 client_b.user_store.clone(),
2215 lang_registry.clone(),
2216 fs.clone(),
2217 &mut cx_b.to_async(),
2218 )
2219 .await
2220 .unwrap();
2221 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2222
2223 // Open a buffer as client B
2224 let buffer_b = project_b
2225 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2226 .await
2227 .unwrap();
2228 buffer_b.read_with(cx_b, |buf, _| {
2229 assert!(!buf.is_dirty());
2230 assert!(!buf.has_conflict());
2231 });
2232
2233 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2234 .await
2235 .unwrap();
2236 buffer_b
2237 .condition(&cx_b, |buf, _| {
2238 buf.text() == "new contents" && !buf.is_dirty()
2239 })
2240 .await;
2241 buffer_b.read_with(cx_b, |buf, _| {
2242 assert!(!buf.has_conflict());
2243 });
2244 }
2245
2246 #[gpui::test(iterations = 10)]
2247 async fn test_editing_while_guest_opens_buffer(
2248 cx_a: &mut TestAppContext,
2249 cx_b: &mut TestAppContext,
2250 ) {
2251 cx_a.foreground().forbid_parking();
2252 let lang_registry = Arc::new(LanguageRegistry::test());
2253 let fs = FakeFs::new(cx_a.background());
2254
2255 // Connect to a server as 2 clients.
2256 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2257 let client_a = server.create_client(cx_a, "user_a").await;
2258 let client_b = server.create_client(cx_b, "user_b").await;
2259
2260 // Share a project as client A
2261 fs.insert_tree(
2262 "/dir",
2263 json!({
2264 ".zed.toml": r#"collaborators = ["user_b"]"#,
2265 "a.txt": "a-contents",
2266 }),
2267 )
2268 .await;
2269 let project_a = cx_a.update(|cx| {
2270 Project::local(
2271 client_a.clone(),
2272 client_a.user_store.clone(),
2273 lang_registry.clone(),
2274 fs.clone(),
2275 cx,
2276 )
2277 });
2278 let (worktree_a, _) = project_a
2279 .update(cx_a, |p, cx| {
2280 p.find_or_create_local_worktree("/dir", true, cx)
2281 })
2282 .await
2283 .unwrap();
2284 worktree_a
2285 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2286 .await;
2287 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2288 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2289 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2290
2291 // Join that project as client B
2292 let project_b = Project::remote(
2293 project_id,
2294 client_b.clone(),
2295 client_b.user_store.clone(),
2296 lang_registry.clone(),
2297 fs.clone(),
2298 &mut cx_b.to_async(),
2299 )
2300 .await
2301 .unwrap();
2302
2303 // Open a buffer as client A
2304 let buffer_a = project_a
2305 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2306 .await
2307 .unwrap();
2308
2309 // Start opening the same buffer as client B
2310 let buffer_b = cx_b
2311 .background()
2312 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2313
2314 // Edit the buffer as client A while client B is still opening it.
2315 cx_b.background().simulate_random_delay().await;
2316 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2317 cx_b.background().simulate_random_delay().await;
2318 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2319
2320 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2321 let buffer_b = buffer_b.await.unwrap();
2322 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2323 }
2324
2325 #[gpui::test(iterations = 10)]
2326 async fn test_leaving_worktree_while_opening_buffer(
2327 cx_a: &mut TestAppContext,
2328 cx_b: &mut TestAppContext,
2329 ) {
2330 cx_a.foreground().forbid_parking();
2331 let lang_registry = Arc::new(LanguageRegistry::test());
2332 let fs = FakeFs::new(cx_a.background());
2333
2334 // Connect to a server as 2 clients.
2335 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336 let client_a = server.create_client(cx_a, "user_a").await;
2337 let client_b = server.create_client(cx_b, "user_b").await;
2338
2339 // Share a project as client A
2340 fs.insert_tree(
2341 "/dir",
2342 json!({
2343 ".zed.toml": r#"collaborators = ["user_b"]"#,
2344 "a.txt": "a-contents",
2345 }),
2346 )
2347 .await;
2348 let project_a = cx_a.update(|cx| {
2349 Project::local(
2350 client_a.clone(),
2351 client_a.user_store.clone(),
2352 lang_registry.clone(),
2353 fs.clone(),
2354 cx,
2355 )
2356 });
2357 let (worktree_a, _) = project_a
2358 .update(cx_a, |p, cx| {
2359 p.find_or_create_local_worktree("/dir", true, cx)
2360 })
2361 .await
2362 .unwrap();
2363 worktree_a
2364 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2365 .await;
2366 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2367 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2368 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2369
2370 // Join that project as client B
2371 let project_b = Project::remote(
2372 project_id,
2373 client_b.clone(),
2374 client_b.user_store.clone(),
2375 lang_registry.clone(),
2376 fs.clone(),
2377 &mut cx_b.to_async(),
2378 )
2379 .await
2380 .unwrap();
2381
2382 // See that a guest has joined as client A.
2383 project_a
2384 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2385 .await;
2386
2387 // Begin opening a buffer as client B, but leave the project before the open completes.
2388 let buffer_b = cx_b
2389 .background()
2390 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2391 cx_b.update(|_| drop(project_b));
2392 drop(buffer_b);
2393
2394 // See that the guest has left.
2395 project_a
2396 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2397 .await;
2398 }
2399
2400 #[gpui::test(iterations = 10)]
2401 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2402 cx_a.foreground().forbid_parking();
2403 let lang_registry = Arc::new(LanguageRegistry::test());
2404 let fs = FakeFs::new(cx_a.background());
2405
2406 // Connect to a server as 2 clients.
2407 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2408 let client_a = server.create_client(cx_a, "user_a").await;
2409 let client_b = server.create_client(cx_b, "user_b").await;
2410
2411 // Share a project as client A
2412 fs.insert_tree(
2413 "/a",
2414 json!({
2415 ".zed.toml": r#"collaborators = ["user_b"]"#,
2416 "a.txt": "a-contents",
2417 "b.txt": "b-contents",
2418 }),
2419 )
2420 .await;
2421 let project_a = cx_a.update(|cx| {
2422 Project::local(
2423 client_a.clone(),
2424 client_a.user_store.clone(),
2425 lang_registry.clone(),
2426 fs.clone(),
2427 cx,
2428 )
2429 });
2430 let (worktree_a, _) = project_a
2431 .update(cx_a, |p, cx| {
2432 p.find_or_create_local_worktree("/a", true, cx)
2433 })
2434 .await
2435 .unwrap();
2436 worktree_a
2437 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438 .await;
2439 let project_id = project_a
2440 .update(cx_a, |project, _| project.next_remote_id())
2441 .await;
2442 project_a
2443 .update(cx_a, |project, cx| project.share(cx))
2444 .await
2445 .unwrap();
2446
2447 // Join that project as client B
2448 let _project_b = Project::remote(
2449 project_id,
2450 client_b.clone(),
2451 client_b.user_store.clone(),
2452 lang_registry.clone(),
2453 fs.clone(),
2454 &mut cx_b.to_async(),
2455 )
2456 .await
2457 .unwrap();
2458
2459 // Client A sees that a guest has joined.
2460 project_a
2461 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2462 .await;
2463
2464 // Drop client B's connection and ensure client A observes client B leaving the project.
2465 client_b.disconnect(&cx_b.to_async()).unwrap();
2466 project_a
2467 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2468 .await;
2469
2470 // Rejoin the project as client B
2471 let _project_b = Project::remote(
2472 project_id,
2473 client_b.clone(),
2474 client_b.user_store.clone(),
2475 lang_registry.clone(),
2476 fs.clone(),
2477 &mut cx_b.to_async(),
2478 )
2479 .await
2480 .unwrap();
2481
2482 // Client A sees that a guest has re-joined.
2483 project_a
2484 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2485 .await;
2486
2487 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2488 client_b.wait_for_current_user(cx_b).await;
2489 server.disconnect_client(client_b.current_user_id(cx_b));
2490 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2491 project_a
2492 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2493 .await;
2494 }
2495
2496 #[gpui::test(iterations = 10)]
2497 async fn test_collaborating_with_diagnostics(
2498 cx_a: &mut TestAppContext,
2499 cx_b: &mut TestAppContext,
2500 ) {
2501 cx_a.foreground().forbid_parking();
2502 let lang_registry = Arc::new(LanguageRegistry::test());
2503 let fs = FakeFs::new(cx_a.background());
2504
2505 // Set up a fake language server.
2506 let mut language = Language::new(
2507 LanguageConfig {
2508 name: "Rust".into(),
2509 path_suffixes: vec!["rs".to_string()],
2510 ..Default::default()
2511 },
2512 Some(tree_sitter_rust::language()),
2513 );
2514 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2515 lang_registry.add(Arc::new(language));
2516
2517 // Connect to a server as 2 clients.
2518 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2519 let client_a = server.create_client(cx_a, "user_a").await;
2520 let client_b = server.create_client(cx_b, "user_b").await;
2521
2522 // Share a project as client A
2523 fs.insert_tree(
2524 "/a",
2525 json!({
2526 ".zed.toml": r#"collaborators = ["user_b"]"#,
2527 "a.rs": "let one = two",
2528 "other.rs": "",
2529 }),
2530 )
2531 .await;
2532 let project_a = cx_a.update(|cx| {
2533 Project::local(
2534 client_a.clone(),
2535 client_a.user_store.clone(),
2536 lang_registry.clone(),
2537 fs.clone(),
2538 cx,
2539 )
2540 });
2541 let (worktree_a, _) = project_a
2542 .update(cx_a, |p, cx| {
2543 p.find_or_create_local_worktree("/a", true, cx)
2544 })
2545 .await
2546 .unwrap();
2547 worktree_a
2548 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2549 .await;
2550 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2551 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2552 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2553
2554 // Cause the language server to start.
2555 let _ = cx_a
2556 .background()
2557 .spawn(project_a.update(cx_a, |project, cx| {
2558 project.open_buffer(
2559 ProjectPath {
2560 worktree_id,
2561 path: Path::new("other.rs").into(),
2562 },
2563 cx,
2564 )
2565 }))
2566 .await
2567 .unwrap();
2568
2569 // Simulate a language server reporting errors for a file.
2570 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2571 fake_language_server
2572 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2573 .await;
2574 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2575 lsp::PublishDiagnosticsParams {
2576 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2577 version: None,
2578 diagnostics: vec![lsp::Diagnostic {
2579 severity: Some(lsp::DiagnosticSeverity::ERROR),
2580 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2581 message: "message 1".to_string(),
2582 ..Default::default()
2583 }],
2584 },
2585 );
2586
2587 // Wait for server to see the diagnostics update.
2588 server
2589 .condition(|store| {
2590 let worktree = store
2591 .project(project_id)
2592 .unwrap()
2593 .share
2594 .as_ref()
2595 .unwrap()
2596 .worktrees
2597 .get(&worktree_id.to_proto())
2598 .unwrap();
2599
2600 !worktree.diagnostic_summaries.is_empty()
2601 })
2602 .await;
2603
2604 // Join the worktree as client B.
2605 let project_b = Project::remote(
2606 project_id,
2607 client_b.clone(),
2608 client_b.user_store.clone(),
2609 lang_registry.clone(),
2610 fs.clone(),
2611 &mut cx_b.to_async(),
2612 )
2613 .await
2614 .unwrap();
2615
2616 project_b.read_with(cx_b, |project, cx| {
2617 assert_eq!(
2618 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2619 &[(
2620 ProjectPath {
2621 worktree_id,
2622 path: Arc::from(Path::new("a.rs")),
2623 },
2624 DiagnosticSummary {
2625 error_count: 1,
2626 warning_count: 0,
2627 ..Default::default()
2628 },
2629 )]
2630 )
2631 });
2632
2633 // Simulate a language server reporting more errors for a file.
2634 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2635 lsp::PublishDiagnosticsParams {
2636 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2637 version: None,
2638 diagnostics: vec![
2639 lsp::Diagnostic {
2640 severity: Some(lsp::DiagnosticSeverity::ERROR),
2641 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2642 message: "message 1".to_string(),
2643 ..Default::default()
2644 },
2645 lsp::Diagnostic {
2646 severity: Some(lsp::DiagnosticSeverity::WARNING),
2647 range: lsp::Range::new(
2648 lsp::Position::new(0, 10),
2649 lsp::Position::new(0, 13),
2650 ),
2651 message: "message 2".to_string(),
2652 ..Default::default()
2653 },
2654 ],
2655 },
2656 );
2657
2658 // Client b gets the updated summaries
2659 project_b
2660 .condition(&cx_b, |project, cx| {
2661 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2662 == &[(
2663 ProjectPath {
2664 worktree_id,
2665 path: Arc::from(Path::new("a.rs")),
2666 },
2667 DiagnosticSummary {
2668 error_count: 1,
2669 warning_count: 1,
2670 ..Default::default()
2671 },
2672 )]
2673 })
2674 .await;
2675
2676 // Open the file with the errors on client B. They should be present.
2677 let buffer_b = cx_b
2678 .background()
2679 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2680 .await
2681 .unwrap();
2682
2683 buffer_b.read_with(cx_b, |buffer, _| {
2684 assert_eq!(
2685 buffer
2686 .snapshot()
2687 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2688 .map(|entry| entry)
2689 .collect::<Vec<_>>(),
2690 &[
2691 DiagnosticEntry {
2692 range: Point::new(0, 4)..Point::new(0, 7),
2693 diagnostic: Diagnostic {
2694 group_id: 0,
2695 message: "message 1".to_string(),
2696 severity: lsp::DiagnosticSeverity::ERROR,
2697 is_primary: true,
2698 ..Default::default()
2699 }
2700 },
2701 DiagnosticEntry {
2702 range: Point::new(0, 10)..Point::new(0, 13),
2703 diagnostic: Diagnostic {
2704 group_id: 1,
2705 severity: lsp::DiagnosticSeverity::WARNING,
2706 message: "message 2".to_string(),
2707 is_primary: true,
2708 ..Default::default()
2709 }
2710 }
2711 ]
2712 );
2713 });
2714
2715 // Simulate a language server reporting no errors for a file.
2716 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2717 lsp::PublishDiagnosticsParams {
2718 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2719 version: None,
2720 diagnostics: vec![],
2721 },
2722 );
2723 project_a
2724 .condition(cx_a, |project, cx| {
2725 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2726 })
2727 .await;
2728 project_b
2729 .condition(cx_b, |project, cx| {
2730 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2731 })
2732 .await;
2733 }
2734
2735 #[gpui::test(iterations = 10)]
2736 async fn test_collaborating_with_completion(
2737 cx_a: &mut TestAppContext,
2738 cx_b: &mut TestAppContext,
2739 ) {
2740 cx_a.foreground().forbid_parking();
2741 let lang_registry = Arc::new(LanguageRegistry::test());
2742 let fs = FakeFs::new(cx_a.background());
2743
2744 // Set up a fake language server.
2745 let mut language = Language::new(
2746 LanguageConfig {
2747 name: "Rust".into(),
2748 path_suffixes: vec!["rs".to_string()],
2749 ..Default::default()
2750 },
2751 Some(tree_sitter_rust::language()),
2752 );
2753 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2754 capabilities: lsp::ServerCapabilities {
2755 completion_provider: Some(lsp::CompletionOptions {
2756 trigger_characters: Some(vec![".".to_string()]),
2757 ..Default::default()
2758 }),
2759 ..Default::default()
2760 },
2761 ..Default::default()
2762 });
2763 lang_registry.add(Arc::new(language));
2764
2765 // Connect to a server as 2 clients.
2766 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2767 let client_a = server.create_client(cx_a, "user_a").await;
2768 let client_b = server.create_client(cx_b, "user_b").await;
2769
2770 // Share a project as client A
2771 fs.insert_tree(
2772 "/a",
2773 json!({
2774 ".zed.toml": r#"collaborators = ["user_b"]"#,
2775 "main.rs": "fn main() { a }",
2776 "other.rs": "",
2777 }),
2778 )
2779 .await;
2780 let project_a = cx_a.update(|cx| {
2781 Project::local(
2782 client_a.clone(),
2783 client_a.user_store.clone(),
2784 lang_registry.clone(),
2785 fs.clone(),
2786 cx,
2787 )
2788 });
2789 let (worktree_a, _) = project_a
2790 .update(cx_a, |p, cx| {
2791 p.find_or_create_local_worktree("/a", true, cx)
2792 })
2793 .await
2794 .unwrap();
2795 worktree_a
2796 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2797 .await;
2798 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2799 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2800 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2801
2802 // Join the worktree as client B.
2803 let project_b = Project::remote(
2804 project_id,
2805 client_b.clone(),
2806 client_b.user_store.clone(),
2807 lang_registry.clone(),
2808 fs.clone(),
2809 &mut cx_b.to_async(),
2810 )
2811 .await
2812 .unwrap();
2813
2814 // Open a file in an editor as the guest.
2815 let buffer_b = project_b
2816 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2817 .await
2818 .unwrap();
2819 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2820 let editor_b = cx_b.add_view(window_b, |cx| {
2821 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2822 });
2823
2824 let fake_language_server = fake_language_servers.next().await.unwrap();
2825 buffer_b
2826 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2827 .await;
2828
2829 // Type a completion trigger character as the guest.
2830 editor_b.update(cx_b, |editor, cx| {
2831 editor.select_ranges([13..13], None, cx);
2832 editor.handle_input(&Input(".".into()), cx);
2833 cx.focus(&editor_b);
2834 });
2835
2836 // Receive a completion request as the host's language server.
2837 // Return some completions from the host's language server.
2838 cx_a.foreground().start_waiting();
2839 fake_language_server
2840 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2841 assert_eq!(
2842 params.text_document_position.text_document.uri,
2843 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2844 );
2845 assert_eq!(
2846 params.text_document_position.position,
2847 lsp::Position::new(0, 14),
2848 );
2849
2850 Ok(Some(lsp::CompletionResponse::Array(vec![
2851 lsp::CompletionItem {
2852 label: "first_method(…)".into(),
2853 detail: Some("fn(&mut self, B) -> C".into()),
2854 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2855 new_text: "first_method($1)".to_string(),
2856 range: lsp::Range::new(
2857 lsp::Position::new(0, 14),
2858 lsp::Position::new(0, 14),
2859 ),
2860 })),
2861 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2862 ..Default::default()
2863 },
2864 lsp::CompletionItem {
2865 label: "second_method(…)".into(),
2866 detail: Some("fn(&mut self, C) -> D<E>".into()),
2867 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2868 new_text: "second_method()".to_string(),
2869 range: lsp::Range::new(
2870 lsp::Position::new(0, 14),
2871 lsp::Position::new(0, 14),
2872 ),
2873 })),
2874 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2875 ..Default::default()
2876 },
2877 ])))
2878 })
2879 .next()
2880 .await
2881 .unwrap();
2882 cx_a.foreground().finish_waiting();
2883
2884 // Open the buffer on the host.
2885 let buffer_a = project_a
2886 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2887 .await
2888 .unwrap();
2889 buffer_a
2890 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2891 .await;
2892
2893 // Confirm a completion on the guest.
2894 editor_b
2895 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2896 .await;
2897 editor_b.update(cx_b, |editor, cx| {
2898 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2899 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2900 });
2901
2902 // Return a resolved completion from the host's language server.
2903 // The resolved completion has an additional text edit.
2904 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2905 |params, _| async move {
2906 assert_eq!(params.label, "first_method(…)");
2907 Ok(lsp::CompletionItem {
2908 label: "first_method(…)".into(),
2909 detail: Some("fn(&mut self, B) -> C".into()),
2910 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2911 new_text: "first_method($1)".to_string(),
2912 range: lsp::Range::new(
2913 lsp::Position::new(0, 14),
2914 lsp::Position::new(0, 14),
2915 ),
2916 })),
2917 additional_text_edits: Some(vec![lsp::TextEdit {
2918 new_text: "use d::SomeTrait;\n".to_string(),
2919 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2920 }]),
2921 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2922 ..Default::default()
2923 })
2924 },
2925 );
2926
2927 // The additional edit is applied.
2928 buffer_a
2929 .condition(&cx_a, |buffer, _| {
2930 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2931 })
2932 .await;
2933 buffer_b
2934 .condition(&cx_b, |buffer, _| {
2935 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2936 })
2937 .await;
2938 }
2939
2940 #[gpui::test(iterations = 10)]
2941 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2942 cx_a.foreground().forbid_parking();
2943 let lang_registry = Arc::new(LanguageRegistry::test());
2944 let fs = FakeFs::new(cx_a.background());
2945
2946 // Connect to a server as 2 clients.
2947 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2948 let client_a = server.create_client(cx_a, "user_a").await;
2949 let client_b = server.create_client(cx_b, "user_b").await;
2950
2951 // Share a project as client A
2952 fs.insert_tree(
2953 "/a",
2954 json!({
2955 ".zed.toml": r#"collaborators = ["user_b"]"#,
2956 "a.rs": "let one = 1;",
2957 }),
2958 )
2959 .await;
2960 let project_a = cx_a.update(|cx| {
2961 Project::local(
2962 client_a.clone(),
2963 client_a.user_store.clone(),
2964 lang_registry.clone(),
2965 fs.clone(),
2966 cx,
2967 )
2968 });
2969 let (worktree_a, _) = project_a
2970 .update(cx_a, |p, cx| {
2971 p.find_or_create_local_worktree("/a", true, cx)
2972 })
2973 .await
2974 .unwrap();
2975 worktree_a
2976 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2977 .await;
2978 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2979 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2980 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2981 let buffer_a = project_a
2982 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2983 .await
2984 .unwrap();
2985
2986 // Join the worktree as client B.
2987 let project_b = Project::remote(
2988 project_id,
2989 client_b.clone(),
2990 client_b.user_store.clone(),
2991 lang_registry.clone(),
2992 fs.clone(),
2993 &mut cx_b.to_async(),
2994 )
2995 .await
2996 .unwrap();
2997
2998 let buffer_b = cx_b
2999 .background()
3000 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3001 .await
3002 .unwrap();
3003 buffer_b.update(cx_b, |buffer, cx| {
3004 buffer.edit([(4..7, "six")], cx);
3005 buffer.edit([(10..11, "6")], cx);
3006 assert_eq!(buffer.text(), "let six = 6;");
3007 assert!(buffer.is_dirty());
3008 assert!(!buffer.has_conflict());
3009 });
3010 buffer_a
3011 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3012 .await;
3013
3014 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3015 .await
3016 .unwrap();
3017 buffer_a
3018 .condition(cx_a, |buffer, _| buffer.has_conflict())
3019 .await;
3020 buffer_b
3021 .condition(cx_b, |buffer, _| buffer.has_conflict())
3022 .await;
3023
3024 project_b
3025 .update(cx_b, |project, cx| {
3026 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3027 })
3028 .await
3029 .unwrap();
3030 buffer_a.read_with(cx_a, |buffer, _| {
3031 assert_eq!(buffer.text(), "let seven = 7;");
3032 assert!(!buffer.is_dirty());
3033 assert!(!buffer.has_conflict());
3034 });
3035 buffer_b.read_with(cx_b, |buffer, _| {
3036 assert_eq!(buffer.text(), "let seven = 7;");
3037 assert!(!buffer.is_dirty());
3038 assert!(!buffer.has_conflict());
3039 });
3040
3041 buffer_a.update(cx_a, |buffer, cx| {
3042 // Undoing on the host is a no-op when the reload was initiated by the guest.
3043 buffer.undo(cx);
3044 assert_eq!(buffer.text(), "let seven = 7;");
3045 assert!(!buffer.is_dirty());
3046 assert!(!buffer.has_conflict());
3047 });
3048 buffer_b.update(cx_b, |buffer, cx| {
3049 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3050 buffer.undo(cx);
3051 assert_eq!(buffer.text(), "let six = 6;");
3052 assert!(buffer.is_dirty());
3053 assert!(!buffer.has_conflict());
3054 });
3055 }
3056
3057 #[gpui::test(iterations = 10)]
3058 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3059 cx_a.foreground().forbid_parking();
3060 let lang_registry = Arc::new(LanguageRegistry::test());
3061 let fs = FakeFs::new(cx_a.background());
3062
3063 // Set up a fake language server.
3064 let mut language = Language::new(
3065 LanguageConfig {
3066 name: "Rust".into(),
3067 path_suffixes: vec!["rs".to_string()],
3068 ..Default::default()
3069 },
3070 Some(tree_sitter_rust::language()),
3071 );
3072 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3073 lang_registry.add(Arc::new(language));
3074
3075 // Connect to a server as 2 clients.
3076 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3077 let client_a = server.create_client(cx_a, "user_a").await;
3078 let client_b = server.create_client(cx_b, "user_b").await;
3079
3080 // Share a project as client A
3081 fs.insert_tree(
3082 "/a",
3083 json!({
3084 ".zed.toml": r#"collaborators = ["user_b"]"#,
3085 "a.rs": "let one = two",
3086 }),
3087 )
3088 .await;
3089 let project_a = cx_a.update(|cx| {
3090 Project::local(
3091 client_a.clone(),
3092 client_a.user_store.clone(),
3093 lang_registry.clone(),
3094 fs.clone(),
3095 cx,
3096 )
3097 });
3098 let (worktree_a, _) = project_a
3099 .update(cx_a, |p, cx| {
3100 p.find_or_create_local_worktree("/a", true, cx)
3101 })
3102 .await
3103 .unwrap();
3104 worktree_a
3105 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3106 .await;
3107 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3108 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3109 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3110
3111 // Join the worktree as client B.
3112 let project_b = Project::remote(
3113 project_id,
3114 client_b.clone(),
3115 client_b.user_store.clone(),
3116 lang_registry.clone(),
3117 fs.clone(),
3118 &mut cx_b.to_async(),
3119 )
3120 .await
3121 .unwrap();
3122
3123 let buffer_b = cx_b
3124 .background()
3125 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3126 .await
3127 .unwrap();
3128
3129 let fake_language_server = fake_language_servers.next().await.unwrap();
3130 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3131 Ok(Some(vec![
3132 lsp::TextEdit {
3133 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3134 new_text: "h".to_string(),
3135 },
3136 lsp::TextEdit {
3137 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3138 new_text: "y".to_string(),
3139 },
3140 ]))
3141 });
3142
3143 project_b
3144 .update(cx_b, |project, cx| {
3145 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3146 })
3147 .await
3148 .unwrap();
3149 assert_eq!(
3150 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3151 "let honey = two"
3152 );
3153 }
3154
3155 #[gpui::test(iterations = 10)]
3156 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3157 cx_a.foreground().forbid_parking();
3158 let lang_registry = Arc::new(LanguageRegistry::test());
3159 let fs = FakeFs::new(cx_a.background());
3160 fs.insert_tree(
3161 "/root-1",
3162 json!({
3163 ".zed.toml": r#"collaborators = ["user_b"]"#,
3164 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3165 }),
3166 )
3167 .await;
3168 fs.insert_tree(
3169 "/root-2",
3170 json!({
3171 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3172 }),
3173 )
3174 .await;
3175
3176 // Set up a fake language server.
3177 let mut language = Language::new(
3178 LanguageConfig {
3179 name: "Rust".into(),
3180 path_suffixes: vec!["rs".to_string()],
3181 ..Default::default()
3182 },
3183 Some(tree_sitter_rust::language()),
3184 );
3185 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3186 lang_registry.add(Arc::new(language));
3187
3188 // Connect to a server as 2 clients.
3189 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3190 let client_a = server.create_client(cx_a, "user_a").await;
3191 let client_b = server.create_client(cx_b, "user_b").await;
3192
3193 // Share a project as client A
3194 let project_a = cx_a.update(|cx| {
3195 Project::local(
3196 client_a.clone(),
3197 client_a.user_store.clone(),
3198 lang_registry.clone(),
3199 fs.clone(),
3200 cx,
3201 )
3202 });
3203 let (worktree_a, _) = project_a
3204 .update(cx_a, |p, cx| {
3205 p.find_or_create_local_worktree("/root-1", true, cx)
3206 })
3207 .await
3208 .unwrap();
3209 worktree_a
3210 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211 .await;
3212 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3213 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3214 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3215
3216 // Join the worktree as client B.
3217 let project_b = Project::remote(
3218 project_id,
3219 client_b.clone(),
3220 client_b.user_store.clone(),
3221 lang_registry.clone(),
3222 fs.clone(),
3223 &mut cx_b.to_async(),
3224 )
3225 .await
3226 .unwrap();
3227
3228 // Open the file on client B.
3229 let buffer_b = cx_b
3230 .background()
3231 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3232 .await
3233 .unwrap();
3234
3235 // Request the definition of a symbol as the guest.
3236 let fake_language_server = fake_language_servers.next().await.unwrap();
3237 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3238 |_, _| async move {
3239 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3240 lsp::Location::new(
3241 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3242 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3243 ),
3244 )))
3245 },
3246 );
3247
3248 let definitions_1 = project_b
3249 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3250 .await
3251 .unwrap();
3252 cx_b.read(|cx| {
3253 assert_eq!(definitions_1.len(), 1);
3254 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3255 let target_buffer = definitions_1[0].buffer.read(cx);
3256 assert_eq!(
3257 target_buffer.text(),
3258 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3259 );
3260 assert_eq!(
3261 definitions_1[0].range.to_point(target_buffer),
3262 Point::new(0, 6)..Point::new(0, 9)
3263 );
3264 });
3265
3266 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3267 // the previous call to `definition`.
3268 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3269 |_, _| async move {
3270 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3271 lsp::Location::new(
3272 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3273 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3274 ),
3275 )))
3276 },
3277 );
3278
3279 let definitions_2 = project_b
3280 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3281 .await
3282 .unwrap();
3283 cx_b.read(|cx| {
3284 assert_eq!(definitions_2.len(), 1);
3285 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3286 let target_buffer = definitions_2[0].buffer.read(cx);
3287 assert_eq!(
3288 target_buffer.text(),
3289 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3290 );
3291 assert_eq!(
3292 definitions_2[0].range.to_point(target_buffer),
3293 Point::new(1, 6)..Point::new(1, 11)
3294 );
3295 });
3296 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3297 }
3298
3299 #[gpui::test(iterations = 10)]
3300 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3301 cx_a.foreground().forbid_parking();
3302 let lang_registry = Arc::new(LanguageRegistry::test());
3303 let fs = FakeFs::new(cx_a.background());
3304 fs.insert_tree(
3305 "/root-1",
3306 json!({
3307 ".zed.toml": r#"collaborators = ["user_b"]"#,
3308 "one.rs": "const ONE: usize = 1;",
3309 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3310 }),
3311 )
3312 .await;
3313 fs.insert_tree(
3314 "/root-2",
3315 json!({
3316 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3317 }),
3318 )
3319 .await;
3320
3321 // Set up a fake language server.
3322 let mut language = Language::new(
3323 LanguageConfig {
3324 name: "Rust".into(),
3325 path_suffixes: vec!["rs".to_string()],
3326 ..Default::default()
3327 },
3328 Some(tree_sitter_rust::language()),
3329 );
3330 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3331 lang_registry.add(Arc::new(language));
3332
3333 // Connect to a server as 2 clients.
3334 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3335 let client_a = server.create_client(cx_a, "user_a").await;
3336 let client_b = server.create_client(cx_b, "user_b").await;
3337
3338 // Share a project as client A
3339 let project_a = cx_a.update(|cx| {
3340 Project::local(
3341 client_a.clone(),
3342 client_a.user_store.clone(),
3343 lang_registry.clone(),
3344 fs.clone(),
3345 cx,
3346 )
3347 });
3348 let (worktree_a, _) = project_a
3349 .update(cx_a, |p, cx| {
3350 p.find_or_create_local_worktree("/root-1", true, cx)
3351 })
3352 .await
3353 .unwrap();
3354 worktree_a
3355 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3356 .await;
3357 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3358 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3359 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3360
3361 // Join the worktree as client B.
3362 let project_b = Project::remote(
3363 project_id,
3364 client_b.clone(),
3365 client_b.user_store.clone(),
3366 lang_registry.clone(),
3367 fs.clone(),
3368 &mut cx_b.to_async(),
3369 )
3370 .await
3371 .unwrap();
3372
3373 // Open the file on client B.
3374 let buffer_b = cx_b
3375 .background()
3376 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3377 .await
3378 .unwrap();
3379
3380 // Request references to a symbol as the guest.
3381 let fake_language_server = fake_language_servers.next().await.unwrap();
3382 fake_language_server.handle_request::<lsp::request::References, _, _>(
3383 |params, _| async move {
3384 assert_eq!(
3385 params.text_document_position.text_document.uri.as_str(),
3386 "file:///root-1/one.rs"
3387 );
3388 Ok(Some(vec![
3389 lsp::Location {
3390 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3391 range: lsp::Range::new(
3392 lsp::Position::new(0, 24),
3393 lsp::Position::new(0, 27),
3394 ),
3395 },
3396 lsp::Location {
3397 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3398 range: lsp::Range::new(
3399 lsp::Position::new(0, 35),
3400 lsp::Position::new(0, 38),
3401 ),
3402 },
3403 lsp::Location {
3404 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3405 range: lsp::Range::new(
3406 lsp::Position::new(0, 37),
3407 lsp::Position::new(0, 40),
3408 ),
3409 },
3410 ]))
3411 },
3412 );
3413
3414 let references = project_b
3415 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3416 .await
3417 .unwrap();
3418 cx_b.read(|cx| {
3419 assert_eq!(references.len(), 3);
3420 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3421
3422 let two_buffer = references[0].buffer.read(cx);
3423 let three_buffer = references[2].buffer.read(cx);
3424 assert_eq!(
3425 two_buffer.file().unwrap().path().as_ref(),
3426 Path::new("two.rs")
3427 );
3428 assert_eq!(references[1].buffer, references[0].buffer);
3429 assert_eq!(
3430 three_buffer.file().unwrap().full_path(cx),
3431 Path::new("three.rs")
3432 );
3433
3434 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3435 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3436 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3437 });
3438 }
3439
3440 #[gpui::test(iterations = 10)]
3441 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3442 cx_a.foreground().forbid_parking();
3443 let lang_registry = Arc::new(LanguageRegistry::test());
3444 let fs = FakeFs::new(cx_a.background());
3445 fs.insert_tree(
3446 "/root-1",
3447 json!({
3448 ".zed.toml": r#"collaborators = ["user_b"]"#,
3449 "a": "hello world",
3450 "b": "goodnight moon",
3451 "c": "a world of goo",
3452 "d": "world champion of clown world",
3453 }),
3454 )
3455 .await;
3456 fs.insert_tree(
3457 "/root-2",
3458 json!({
3459 "e": "disney world is fun",
3460 }),
3461 )
3462 .await;
3463
3464 // Connect to a server as 2 clients.
3465 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3466 let client_a = server.create_client(cx_a, "user_a").await;
3467 let client_b = server.create_client(cx_b, "user_b").await;
3468
3469 // Share a project as client A
3470 let project_a = cx_a.update(|cx| {
3471 Project::local(
3472 client_a.clone(),
3473 client_a.user_store.clone(),
3474 lang_registry.clone(),
3475 fs.clone(),
3476 cx,
3477 )
3478 });
3479 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3480
3481 let (worktree_1, _) = project_a
3482 .update(cx_a, |p, cx| {
3483 p.find_or_create_local_worktree("/root-1", true, cx)
3484 })
3485 .await
3486 .unwrap();
3487 worktree_1
3488 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3489 .await;
3490 let (worktree_2, _) = project_a
3491 .update(cx_a, |p, cx| {
3492 p.find_or_create_local_worktree("/root-2", true, cx)
3493 })
3494 .await
3495 .unwrap();
3496 worktree_2
3497 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3498 .await;
3499
3500 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3501
3502 // Join the worktree as client B.
3503 let project_b = Project::remote(
3504 project_id,
3505 client_b.clone(),
3506 client_b.user_store.clone(),
3507 lang_registry.clone(),
3508 fs.clone(),
3509 &mut cx_b.to_async(),
3510 )
3511 .await
3512 .unwrap();
3513
3514 let results = project_b
3515 .update(cx_b, |project, cx| {
3516 project.search(SearchQuery::text("world", false, false), cx)
3517 })
3518 .await
3519 .unwrap();
3520
3521 let mut ranges_by_path = results
3522 .into_iter()
3523 .map(|(buffer, ranges)| {
3524 buffer.read_with(cx_b, |buffer, cx| {
3525 let path = buffer.file().unwrap().full_path(cx);
3526 let offset_ranges = ranges
3527 .into_iter()
3528 .map(|range| range.to_offset(buffer))
3529 .collect::<Vec<_>>();
3530 (path, offset_ranges)
3531 })
3532 })
3533 .collect::<Vec<_>>();
3534 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3535
3536 assert_eq!(
3537 ranges_by_path,
3538 &[
3539 (PathBuf::from("root-1/a"), vec![6..11]),
3540 (PathBuf::from("root-1/c"), vec![2..7]),
3541 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3542 (PathBuf::from("root-2/e"), vec![7..12]),
3543 ]
3544 );
3545 }
3546
3547 #[gpui::test(iterations = 10)]
3548 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3549 cx_a.foreground().forbid_parking();
3550 let lang_registry = Arc::new(LanguageRegistry::test());
3551 let fs = FakeFs::new(cx_a.background());
3552 fs.insert_tree(
3553 "/root-1",
3554 json!({
3555 ".zed.toml": r#"collaborators = ["user_b"]"#,
3556 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3557 }),
3558 )
3559 .await;
3560
3561 // Set up a fake language server.
3562 let mut language = Language::new(
3563 LanguageConfig {
3564 name: "Rust".into(),
3565 path_suffixes: vec!["rs".to_string()],
3566 ..Default::default()
3567 },
3568 Some(tree_sitter_rust::language()),
3569 );
3570 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3571 lang_registry.add(Arc::new(language));
3572
3573 // Connect to a server as 2 clients.
3574 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3575 let client_a = server.create_client(cx_a, "user_a").await;
3576 let client_b = server.create_client(cx_b, "user_b").await;
3577
3578 // Share a project as client A
3579 let project_a = cx_a.update(|cx| {
3580 Project::local(
3581 client_a.clone(),
3582 client_a.user_store.clone(),
3583 lang_registry.clone(),
3584 fs.clone(),
3585 cx,
3586 )
3587 });
3588 let (worktree_a, _) = project_a
3589 .update(cx_a, |p, cx| {
3590 p.find_or_create_local_worktree("/root-1", true, cx)
3591 })
3592 .await
3593 .unwrap();
3594 worktree_a
3595 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596 .await;
3597 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601 // Join the worktree as client B.
3602 let project_b = Project::remote(
3603 project_id,
3604 client_b.clone(),
3605 client_b.user_store.clone(),
3606 lang_registry.clone(),
3607 fs.clone(),
3608 &mut cx_b.to_async(),
3609 )
3610 .await
3611 .unwrap();
3612
3613 // Open the file on client B.
3614 let buffer_b = cx_b
3615 .background()
3616 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3617 .await
3618 .unwrap();
3619
3620 // Request document highlights as the guest.
3621 let fake_language_server = fake_language_servers.next().await.unwrap();
3622 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3623 |params, _| async move {
3624 assert_eq!(
3625 params
3626 .text_document_position_params
3627 .text_document
3628 .uri
3629 .as_str(),
3630 "file:///root-1/main.rs"
3631 );
3632 assert_eq!(
3633 params.text_document_position_params.position,
3634 lsp::Position::new(0, 34)
3635 );
3636 Ok(Some(vec![
3637 lsp::DocumentHighlight {
3638 kind: Some(lsp::DocumentHighlightKind::WRITE),
3639 range: lsp::Range::new(
3640 lsp::Position::new(0, 10),
3641 lsp::Position::new(0, 16),
3642 ),
3643 },
3644 lsp::DocumentHighlight {
3645 kind: Some(lsp::DocumentHighlightKind::READ),
3646 range: lsp::Range::new(
3647 lsp::Position::new(0, 32),
3648 lsp::Position::new(0, 38),
3649 ),
3650 },
3651 lsp::DocumentHighlight {
3652 kind: Some(lsp::DocumentHighlightKind::READ),
3653 range: lsp::Range::new(
3654 lsp::Position::new(0, 41),
3655 lsp::Position::new(0, 47),
3656 ),
3657 },
3658 ]))
3659 },
3660 );
3661
3662 let highlights = project_b
3663 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3664 .await
3665 .unwrap();
3666 buffer_b.read_with(cx_b, |buffer, _| {
3667 let snapshot = buffer.snapshot();
3668
3669 let highlights = highlights
3670 .into_iter()
3671 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3672 .collect::<Vec<_>>();
3673 assert_eq!(
3674 highlights,
3675 &[
3676 (lsp::DocumentHighlightKind::WRITE, 10..16),
3677 (lsp::DocumentHighlightKind::READ, 32..38),
3678 (lsp::DocumentHighlightKind::READ, 41..47)
3679 ]
3680 )
3681 });
3682 }
3683
3684 #[gpui::test(iterations = 10)]
3685 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3686 cx_a.foreground().forbid_parking();
3687 let lang_registry = Arc::new(LanguageRegistry::test());
3688 let fs = FakeFs::new(cx_a.background());
3689 fs.insert_tree(
3690 "/code",
3691 json!({
3692 "crate-1": {
3693 ".zed.toml": r#"collaborators = ["user_b"]"#,
3694 "one.rs": "const ONE: usize = 1;",
3695 },
3696 "crate-2": {
3697 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3698 },
3699 "private": {
3700 "passwords.txt": "the-password",
3701 }
3702 }),
3703 )
3704 .await;
3705
3706 // Set up a fake language server.
3707 let mut language = Language::new(
3708 LanguageConfig {
3709 name: "Rust".into(),
3710 path_suffixes: vec!["rs".to_string()],
3711 ..Default::default()
3712 },
3713 Some(tree_sitter_rust::language()),
3714 );
3715 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3716 lang_registry.add(Arc::new(language));
3717
3718 // Connect to a server as 2 clients.
3719 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3720 let client_a = server.create_client(cx_a, "user_a").await;
3721 let client_b = server.create_client(cx_b, "user_b").await;
3722
3723 // Share a project as client A
3724 let project_a = cx_a.update(|cx| {
3725 Project::local(
3726 client_a.clone(),
3727 client_a.user_store.clone(),
3728 lang_registry.clone(),
3729 fs.clone(),
3730 cx,
3731 )
3732 });
3733 let (worktree_a, _) = project_a
3734 .update(cx_a, |p, cx| {
3735 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3736 })
3737 .await
3738 .unwrap();
3739 worktree_a
3740 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3741 .await;
3742 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3743 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3744 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3745
3746 // Join the worktree as client B.
3747 let project_b = Project::remote(
3748 project_id,
3749 client_b.clone(),
3750 client_b.user_store.clone(),
3751 lang_registry.clone(),
3752 fs.clone(),
3753 &mut cx_b.to_async(),
3754 )
3755 .await
3756 .unwrap();
3757
3758 // Cause the language server to start.
3759 let _buffer = cx_b
3760 .background()
3761 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3762 .await
3763 .unwrap();
3764
3765 let fake_language_server = fake_language_servers.next().await.unwrap();
3766 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3767 |_, _| async move {
3768 #[allow(deprecated)]
3769 Ok(Some(vec![lsp::SymbolInformation {
3770 name: "TWO".into(),
3771 location: lsp::Location {
3772 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3773 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3774 },
3775 kind: lsp::SymbolKind::CONSTANT,
3776 tags: None,
3777 container_name: None,
3778 deprecated: None,
3779 }]))
3780 },
3781 );
3782
3783 // Request the definition of a symbol as the guest.
3784 let symbols = project_b
3785 .update(cx_b, |p, cx| p.symbols("two", cx))
3786 .await
3787 .unwrap();
3788 assert_eq!(symbols.len(), 1);
3789 assert_eq!(symbols[0].name, "TWO");
3790
3791 // Open one of the returned symbols.
3792 let buffer_b_2 = project_b
3793 .update(cx_b, |project, cx| {
3794 project.open_buffer_for_symbol(&symbols[0], cx)
3795 })
3796 .await
3797 .unwrap();
3798 buffer_b_2.read_with(cx_b, |buffer, _| {
3799 assert_eq!(
3800 buffer.file().unwrap().path().as_ref(),
3801 Path::new("../crate-2/two.rs")
3802 );
3803 });
3804
3805 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3806 let mut fake_symbol = symbols[0].clone();
3807 fake_symbol.path = Path::new("/code/secrets").into();
3808 let error = project_b
3809 .update(cx_b, |project, cx| {
3810 project.open_buffer_for_symbol(&fake_symbol, cx)
3811 })
3812 .await
3813 .unwrap_err();
3814 assert!(error.to_string().contains("invalid symbol signature"));
3815 }
3816
3817 #[gpui::test(iterations = 10)]
3818 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3819 cx_a: &mut TestAppContext,
3820 cx_b: &mut TestAppContext,
3821 mut rng: StdRng,
3822 ) {
3823 cx_a.foreground().forbid_parking();
3824 let lang_registry = Arc::new(LanguageRegistry::test());
3825 let fs = FakeFs::new(cx_a.background());
3826 fs.insert_tree(
3827 "/root",
3828 json!({
3829 ".zed.toml": r#"collaborators = ["user_b"]"#,
3830 "a.rs": "const ONE: usize = b::TWO;",
3831 "b.rs": "const TWO: usize = 2",
3832 }),
3833 )
3834 .await;
3835
3836 // Set up a fake language server.
3837 let mut language = Language::new(
3838 LanguageConfig {
3839 name: "Rust".into(),
3840 path_suffixes: vec!["rs".to_string()],
3841 ..Default::default()
3842 },
3843 Some(tree_sitter_rust::language()),
3844 );
3845 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3846 lang_registry.add(Arc::new(language));
3847
3848 // Connect to a server as 2 clients.
3849 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3850 let client_a = server.create_client(cx_a, "user_a").await;
3851 let client_b = server.create_client(cx_b, "user_b").await;
3852
3853 // Share a project as client A
3854 let project_a = cx_a.update(|cx| {
3855 Project::local(
3856 client_a.clone(),
3857 client_a.user_store.clone(),
3858 lang_registry.clone(),
3859 fs.clone(),
3860 cx,
3861 )
3862 });
3863
3864 let (worktree_a, _) = project_a
3865 .update(cx_a, |p, cx| {
3866 p.find_or_create_local_worktree("/root", true, cx)
3867 })
3868 .await
3869 .unwrap();
3870 worktree_a
3871 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3872 .await;
3873 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3874 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3875 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3876
3877 // Join the worktree as client B.
3878 let project_b = Project::remote(
3879 project_id,
3880 client_b.clone(),
3881 client_b.user_store.clone(),
3882 lang_registry.clone(),
3883 fs.clone(),
3884 &mut cx_b.to_async(),
3885 )
3886 .await
3887 .unwrap();
3888
3889 let buffer_b1 = cx_b
3890 .background()
3891 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3892 .await
3893 .unwrap();
3894
3895 let fake_language_server = fake_language_servers.next().await.unwrap();
3896 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3897 |_, _| async move {
3898 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3899 lsp::Location::new(
3900 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3901 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3902 ),
3903 )))
3904 },
3905 );
3906
3907 let definitions;
3908 let buffer_b2;
3909 if rng.gen() {
3910 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3911 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3912 } else {
3913 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3914 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3915 }
3916
3917 let buffer_b2 = buffer_b2.await.unwrap();
3918 let definitions = definitions.await.unwrap();
3919 assert_eq!(definitions.len(), 1);
3920 assert_eq!(definitions[0].buffer, buffer_b2);
3921 }
3922
3923 #[gpui::test(iterations = 10)]
3924 async fn test_collaborating_with_code_actions(
3925 cx_a: &mut TestAppContext,
3926 cx_b: &mut TestAppContext,
3927 ) {
3928 cx_a.foreground().forbid_parking();
3929 let lang_registry = Arc::new(LanguageRegistry::test());
3930 let fs = FakeFs::new(cx_a.background());
3931 cx_b.update(|cx| editor::init(cx));
3932
3933 // Set up a fake language server.
3934 let mut language = Language::new(
3935 LanguageConfig {
3936 name: "Rust".into(),
3937 path_suffixes: vec!["rs".to_string()],
3938 ..Default::default()
3939 },
3940 Some(tree_sitter_rust::language()),
3941 );
3942 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3943 lang_registry.add(Arc::new(language));
3944
3945 // Connect to a server as 2 clients.
3946 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3947 let client_a = server.create_client(cx_a, "user_a").await;
3948 let client_b = server.create_client(cx_b, "user_b").await;
3949
3950 // Share a project as client A
3951 fs.insert_tree(
3952 "/a",
3953 json!({
3954 ".zed.toml": r#"collaborators = ["user_b"]"#,
3955 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3956 "other.rs": "pub fn foo() -> usize { 4 }",
3957 }),
3958 )
3959 .await;
3960 let project_a = cx_a.update(|cx| {
3961 Project::local(
3962 client_a.clone(),
3963 client_a.user_store.clone(),
3964 lang_registry.clone(),
3965 fs.clone(),
3966 cx,
3967 )
3968 });
3969 let (worktree_a, _) = project_a
3970 .update(cx_a, |p, cx| {
3971 p.find_or_create_local_worktree("/a", true, cx)
3972 })
3973 .await
3974 .unwrap();
3975 worktree_a
3976 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3977 .await;
3978 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3979 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3980 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3981
3982 // Join the worktree as client B.
3983 let project_b = Project::remote(
3984 project_id,
3985 client_b.clone(),
3986 client_b.user_store.clone(),
3987 lang_registry.clone(),
3988 fs.clone(),
3989 &mut cx_b.to_async(),
3990 )
3991 .await
3992 .unwrap();
3993 let mut params = cx_b.update(WorkspaceParams::test);
3994 params.languages = lang_registry.clone();
3995 params.client = client_b.client.clone();
3996 params.user_store = client_b.user_store.clone();
3997 params.project = project_b;
3998
3999 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4000 let editor_b = workspace_b
4001 .update(cx_b, |workspace, cx| {
4002 workspace.open_path((worktree_id, "main.rs"), true, cx)
4003 })
4004 .await
4005 .unwrap()
4006 .downcast::<Editor>()
4007 .unwrap();
4008
4009 let mut fake_language_server = fake_language_servers.next().await.unwrap();
4010 fake_language_server
4011 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4012 assert_eq!(
4013 params.text_document.uri,
4014 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4015 );
4016 assert_eq!(params.range.start, lsp::Position::new(0, 0));
4017 assert_eq!(params.range.end, lsp::Position::new(0, 0));
4018 Ok(None)
4019 })
4020 .next()
4021 .await;
4022
4023 // Move cursor to a location that contains code actions.
4024 editor_b.update(cx_b, |editor, cx| {
4025 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4026 cx.focus(&editor_b);
4027 });
4028
4029 fake_language_server
4030 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4031 assert_eq!(
4032 params.text_document.uri,
4033 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4034 );
4035 assert_eq!(params.range.start, lsp::Position::new(1, 31));
4036 assert_eq!(params.range.end, lsp::Position::new(1, 31));
4037
4038 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4039 lsp::CodeAction {
4040 title: "Inline into all callers".to_string(),
4041 edit: Some(lsp::WorkspaceEdit {
4042 changes: Some(
4043 [
4044 (
4045 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4046 vec![lsp::TextEdit::new(
4047 lsp::Range::new(
4048 lsp::Position::new(1, 22),
4049 lsp::Position::new(1, 34),
4050 ),
4051 "4".to_string(),
4052 )],
4053 ),
4054 (
4055 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4056 vec![lsp::TextEdit::new(
4057 lsp::Range::new(
4058 lsp::Position::new(0, 0),
4059 lsp::Position::new(0, 27),
4060 ),
4061 "".to_string(),
4062 )],
4063 ),
4064 ]
4065 .into_iter()
4066 .collect(),
4067 ),
4068 ..Default::default()
4069 }),
4070 data: Some(json!({
4071 "codeActionParams": {
4072 "range": {
4073 "start": {"line": 1, "column": 31},
4074 "end": {"line": 1, "column": 31},
4075 }
4076 }
4077 })),
4078 ..Default::default()
4079 },
4080 )]))
4081 })
4082 .next()
4083 .await;
4084
4085 // Toggle code actions and wait for them to display.
4086 editor_b.update(cx_b, |editor, cx| {
4087 editor.toggle_code_actions(
4088 &ToggleCodeActions {
4089 deployed_from_indicator: false,
4090 },
4091 cx,
4092 );
4093 });
4094 editor_b
4095 .condition(&cx_b, |editor, _| editor.context_menu_visible())
4096 .await;
4097
4098 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4099
4100 // Confirming the code action will trigger a resolve request.
4101 let confirm_action = workspace_b
4102 .update(cx_b, |workspace, cx| {
4103 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4104 })
4105 .unwrap();
4106 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4107 |_, _| async move {
4108 Ok(lsp::CodeAction {
4109 title: "Inline into all callers".to_string(),
4110 edit: Some(lsp::WorkspaceEdit {
4111 changes: Some(
4112 [
4113 (
4114 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4115 vec![lsp::TextEdit::new(
4116 lsp::Range::new(
4117 lsp::Position::new(1, 22),
4118 lsp::Position::new(1, 34),
4119 ),
4120 "4".to_string(),
4121 )],
4122 ),
4123 (
4124 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4125 vec![lsp::TextEdit::new(
4126 lsp::Range::new(
4127 lsp::Position::new(0, 0),
4128 lsp::Position::new(0, 27),
4129 ),
4130 "".to_string(),
4131 )],
4132 ),
4133 ]
4134 .into_iter()
4135 .collect(),
4136 ),
4137 ..Default::default()
4138 }),
4139 ..Default::default()
4140 })
4141 },
4142 );
4143
4144 // After the action is confirmed, an editor containing both modified files is opened.
4145 confirm_action.await.unwrap();
4146 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4147 workspace
4148 .active_item(cx)
4149 .unwrap()
4150 .downcast::<Editor>()
4151 .unwrap()
4152 });
4153 code_action_editor.update(cx_b, |editor, cx| {
4154 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4155 editor.undo(&Undo, cx);
4156 assert_eq!(
4157 editor.text(cx),
4158 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4159 );
4160 editor.redo(&Redo, cx);
4161 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4162 });
4163 }
4164
4165 #[gpui::test(iterations = 10)]
4166 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4167 cx_a.foreground().forbid_parking();
4168 let lang_registry = Arc::new(LanguageRegistry::test());
4169 let fs = FakeFs::new(cx_a.background());
4170 cx_b.update(|cx| editor::init(cx));
4171
4172 // Set up a fake language server.
4173 let mut language = Language::new(
4174 LanguageConfig {
4175 name: "Rust".into(),
4176 path_suffixes: vec!["rs".to_string()],
4177 ..Default::default()
4178 },
4179 Some(tree_sitter_rust::language()),
4180 );
4181 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4182 capabilities: lsp::ServerCapabilities {
4183 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4184 prepare_provider: Some(true),
4185 work_done_progress_options: Default::default(),
4186 })),
4187 ..Default::default()
4188 },
4189 ..Default::default()
4190 });
4191 lang_registry.add(Arc::new(language));
4192
4193 // Connect to a server as 2 clients.
4194 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4195 let client_a = server.create_client(cx_a, "user_a").await;
4196 let client_b = server.create_client(cx_b, "user_b").await;
4197
4198 // Share a project as client A
4199 fs.insert_tree(
4200 "/dir",
4201 json!({
4202 ".zed.toml": r#"collaborators = ["user_b"]"#,
4203 "one.rs": "const ONE: usize = 1;",
4204 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4205 }),
4206 )
4207 .await;
4208 let project_a = cx_a.update(|cx| {
4209 Project::local(
4210 client_a.clone(),
4211 client_a.user_store.clone(),
4212 lang_registry.clone(),
4213 fs.clone(),
4214 cx,
4215 )
4216 });
4217 let (worktree_a, _) = project_a
4218 .update(cx_a, |p, cx| {
4219 p.find_or_create_local_worktree("/dir", true, cx)
4220 })
4221 .await
4222 .unwrap();
4223 worktree_a
4224 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4225 .await;
4226 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4227 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4228 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4229
4230 // Join the worktree as client B.
4231 let project_b = Project::remote(
4232 project_id,
4233 client_b.clone(),
4234 client_b.user_store.clone(),
4235 lang_registry.clone(),
4236 fs.clone(),
4237 &mut cx_b.to_async(),
4238 )
4239 .await
4240 .unwrap();
4241 let mut params = cx_b.update(WorkspaceParams::test);
4242 params.languages = lang_registry.clone();
4243 params.client = client_b.client.clone();
4244 params.user_store = client_b.user_store.clone();
4245 params.project = project_b;
4246
4247 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4248 let editor_b = workspace_b
4249 .update(cx_b, |workspace, cx| {
4250 workspace.open_path((worktree_id, "one.rs"), true, cx)
4251 })
4252 .await
4253 .unwrap()
4254 .downcast::<Editor>()
4255 .unwrap();
4256 let fake_language_server = fake_language_servers.next().await.unwrap();
4257
4258 // Move cursor to a location that can be renamed.
4259 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4260 editor.select_ranges([7..7], None, cx);
4261 editor.rename(&Rename, cx).unwrap()
4262 });
4263
4264 fake_language_server
4265 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4266 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4267 assert_eq!(params.position, lsp::Position::new(0, 7));
4268 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4269 lsp::Position::new(0, 6),
4270 lsp::Position::new(0, 9),
4271 ))))
4272 })
4273 .next()
4274 .await
4275 .unwrap();
4276 prepare_rename.await.unwrap();
4277 editor_b.update(cx_b, |editor, cx| {
4278 let rename = editor.pending_rename().unwrap();
4279 let buffer = editor.buffer().read(cx).snapshot(cx);
4280 assert_eq!(
4281 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4282 6..9
4283 );
4284 rename.editor.update(cx, |rename_editor, cx| {
4285 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4286 rename_buffer.edit([(0..3, "THREE")], cx);
4287 });
4288 });
4289 });
4290
4291 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4292 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4293 });
4294 fake_language_server
4295 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4296 assert_eq!(
4297 params.text_document_position.text_document.uri.as_str(),
4298 "file:///dir/one.rs"
4299 );
4300 assert_eq!(
4301 params.text_document_position.position,
4302 lsp::Position::new(0, 6)
4303 );
4304 assert_eq!(params.new_name, "THREE");
4305 Ok(Some(lsp::WorkspaceEdit {
4306 changes: Some(
4307 [
4308 (
4309 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4310 vec![lsp::TextEdit::new(
4311 lsp::Range::new(
4312 lsp::Position::new(0, 6),
4313 lsp::Position::new(0, 9),
4314 ),
4315 "THREE".to_string(),
4316 )],
4317 ),
4318 (
4319 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4320 vec![
4321 lsp::TextEdit::new(
4322 lsp::Range::new(
4323 lsp::Position::new(0, 24),
4324 lsp::Position::new(0, 27),
4325 ),
4326 "THREE".to_string(),
4327 ),
4328 lsp::TextEdit::new(
4329 lsp::Range::new(
4330 lsp::Position::new(0, 35),
4331 lsp::Position::new(0, 38),
4332 ),
4333 "THREE".to_string(),
4334 ),
4335 ],
4336 ),
4337 ]
4338 .into_iter()
4339 .collect(),
4340 ),
4341 ..Default::default()
4342 }))
4343 })
4344 .next()
4345 .await
4346 .unwrap();
4347 confirm_rename.await.unwrap();
4348
4349 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4350 workspace
4351 .active_item(cx)
4352 .unwrap()
4353 .downcast::<Editor>()
4354 .unwrap()
4355 });
4356 rename_editor.update(cx_b, |editor, cx| {
4357 assert_eq!(
4358 editor.text(cx),
4359 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4360 );
4361 editor.undo(&Undo, cx);
4362 assert_eq!(
4363 editor.text(cx),
4364 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4365 );
4366 editor.redo(&Redo, cx);
4367 assert_eq!(
4368 editor.text(cx),
4369 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4370 );
4371 });
4372
4373 // Ensure temporary rename edits cannot be undone/redone.
4374 editor_b.update(cx_b, |editor, cx| {
4375 editor.undo(&Undo, cx);
4376 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4377 editor.undo(&Undo, cx);
4378 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4379 editor.redo(&Redo, cx);
4380 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4381 })
4382 }
4383
4384 #[gpui::test(iterations = 10)]
4385 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4386 cx_a.foreground().forbid_parking();
4387
4388 // Connect to a server as 2 clients.
4389 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4390 let client_a = server.create_client(cx_a, "user_a").await;
4391 let client_b = server.create_client(cx_b, "user_b").await;
4392
4393 // Create an org that includes these 2 users.
4394 let db = &server.app_state.db;
4395 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4396 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4397 .await
4398 .unwrap();
4399 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4400 .await
4401 .unwrap();
4402
4403 // Create a channel that includes all the users.
4404 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4405 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4406 .await
4407 .unwrap();
4408 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4409 .await
4410 .unwrap();
4411 db.create_channel_message(
4412 channel_id,
4413 client_b.current_user_id(&cx_b),
4414 "hello A, it's B.",
4415 OffsetDateTime::now_utc(),
4416 1,
4417 )
4418 .await
4419 .unwrap();
4420
4421 let channels_a = cx_a
4422 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4423 channels_a
4424 .condition(cx_a, |list, _| list.available_channels().is_some())
4425 .await;
4426 channels_a.read_with(cx_a, |list, _| {
4427 assert_eq!(
4428 list.available_channels().unwrap(),
4429 &[ChannelDetails {
4430 id: channel_id.to_proto(),
4431 name: "test-channel".to_string()
4432 }]
4433 )
4434 });
4435 let channel_a = channels_a.update(cx_a, |this, cx| {
4436 this.get_channel(channel_id.to_proto(), cx).unwrap()
4437 });
4438 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4439 channel_a
4440 .condition(&cx_a, |channel, _| {
4441 channel_messages(channel)
4442 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4443 })
4444 .await;
4445
4446 let channels_b = cx_b
4447 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4448 channels_b
4449 .condition(cx_b, |list, _| list.available_channels().is_some())
4450 .await;
4451 channels_b.read_with(cx_b, |list, _| {
4452 assert_eq!(
4453 list.available_channels().unwrap(),
4454 &[ChannelDetails {
4455 id: channel_id.to_proto(),
4456 name: "test-channel".to_string()
4457 }]
4458 )
4459 });
4460
4461 let channel_b = channels_b.update(cx_b, |this, cx| {
4462 this.get_channel(channel_id.to_proto(), cx).unwrap()
4463 });
4464 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4465 channel_b
4466 .condition(&cx_b, |channel, _| {
4467 channel_messages(channel)
4468 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4469 })
4470 .await;
4471
4472 channel_a
4473 .update(cx_a, |channel, cx| {
4474 channel
4475 .send_message("oh, hi B.".to_string(), cx)
4476 .unwrap()
4477 .detach();
4478 let task = channel.send_message("sup".to_string(), cx).unwrap();
4479 assert_eq!(
4480 channel_messages(channel),
4481 &[
4482 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4483 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4484 ("user_a".to_string(), "sup".to_string(), true)
4485 ]
4486 );
4487 task
4488 })
4489 .await
4490 .unwrap();
4491
4492 channel_b
4493 .condition(&cx_b, |channel, _| {
4494 channel_messages(channel)
4495 == [
4496 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4497 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4498 ("user_a".to_string(), "sup".to_string(), false),
4499 ]
4500 })
4501 .await;
4502
4503 assert_eq!(
4504 server
4505 .state()
4506 .await
4507 .channel(channel_id)
4508 .unwrap()
4509 .connection_ids
4510 .len(),
4511 2
4512 );
4513 cx_b.update(|_| drop(channel_b));
4514 server
4515 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4516 .await;
4517
4518 cx_a.update(|_| drop(channel_a));
4519 server
4520 .condition(|state| state.channel(channel_id).is_none())
4521 .await;
4522 }
4523
4524 #[gpui::test(iterations = 10)]
4525 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4526 cx_a.foreground().forbid_parking();
4527
4528 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4529 let client_a = server.create_client(cx_a, "user_a").await;
4530
4531 let db = &server.app_state.db;
4532 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4533 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4534 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4535 .await
4536 .unwrap();
4537 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4538 .await
4539 .unwrap();
4540
4541 let channels_a = cx_a
4542 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4543 channels_a
4544 .condition(cx_a, |list, _| list.available_channels().is_some())
4545 .await;
4546 let channel_a = channels_a.update(cx_a, |this, cx| {
4547 this.get_channel(channel_id.to_proto(), cx).unwrap()
4548 });
4549
4550 // Messages aren't allowed to be too long.
4551 channel_a
4552 .update(cx_a, |channel, cx| {
4553 let long_body = "this is long.\n".repeat(1024);
4554 channel.send_message(long_body, cx).unwrap()
4555 })
4556 .await
4557 .unwrap_err();
4558
4559 // Messages aren't allowed to be blank.
4560 channel_a.update(cx_a, |channel, cx| {
4561 channel.send_message(String::new(), cx).unwrap_err()
4562 });
4563
4564 // Leading and trailing whitespace are trimmed.
4565 channel_a
4566 .update(cx_a, |channel, cx| {
4567 channel
4568 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4569 .unwrap()
4570 })
4571 .await
4572 .unwrap();
4573 assert_eq!(
4574 db.get_channel_messages(channel_id, 10, None)
4575 .await
4576 .unwrap()
4577 .iter()
4578 .map(|m| &m.body)
4579 .collect::<Vec<_>>(),
4580 &["surrounded by whitespace"]
4581 );
4582 }
4583
4584 #[gpui::test(iterations = 10)]
4585 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4586 cx_a.foreground().forbid_parking();
4587
4588 // Connect to a server as 2 clients.
4589 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4590 let client_a = server.create_client(cx_a, "user_a").await;
4591 let client_b = server.create_client(cx_b, "user_b").await;
4592 let mut status_b = client_b.status();
4593
4594 // Create an org that includes these 2 users.
4595 let db = &server.app_state.db;
4596 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4597 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4598 .await
4599 .unwrap();
4600 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4601 .await
4602 .unwrap();
4603
4604 // Create a channel that includes all the users.
4605 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4606 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4607 .await
4608 .unwrap();
4609 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4610 .await
4611 .unwrap();
4612 db.create_channel_message(
4613 channel_id,
4614 client_b.current_user_id(&cx_b),
4615 "hello A, it's B.",
4616 OffsetDateTime::now_utc(),
4617 2,
4618 )
4619 .await
4620 .unwrap();
4621
4622 let channels_a = cx_a
4623 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4624 channels_a
4625 .condition(cx_a, |list, _| list.available_channels().is_some())
4626 .await;
4627
4628 channels_a.read_with(cx_a, |list, _| {
4629 assert_eq!(
4630 list.available_channels().unwrap(),
4631 &[ChannelDetails {
4632 id: channel_id.to_proto(),
4633 name: "test-channel".to_string()
4634 }]
4635 )
4636 });
4637 let channel_a = channels_a.update(cx_a, |this, cx| {
4638 this.get_channel(channel_id.to_proto(), cx).unwrap()
4639 });
4640 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4641 channel_a
4642 .condition(&cx_a, |channel, _| {
4643 channel_messages(channel)
4644 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4645 })
4646 .await;
4647
4648 let channels_b = cx_b
4649 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4650 channels_b
4651 .condition(cx_b, |list, _| list.available_channels().is_some())
4652 .await;
4653 channels_b.read_with(cx_b, |list, _| {
4654 assert_eq!(
4655 list.available_channels().unwrap(),
4656 &[ChannelDetails {
4657 id: channel_id.to_proto(),
4658 name: "test-channel".to_string()
4659 }]
4660 )
4661 });
4662
4663 let channel_b = channels_b.update(cx_b, |this, cx| {
4664 this.get_channel(channel_id.to_proto(), cx).unwrap()
4665 });
4666 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4667 channel_b
4668 .condition(&cx_b, |channel, _| {
4669 channel_messages(channel)
4670 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4671 })
4672 .await;
4673
4674 // Disconnect client B, ensuring we can still access its cached channel data.
4675 server.forbid_connections();
4676 server.disconnect_client(client_b.current_user_id(&cx_b));
4677 cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4678 while !matches!(
4679 status_b.next().await,
4680 Some(client::Status::ReconnectionError { .. })
4681 ) {}
4682
4683 channels_b.read_with(cx_b, |channels, _| {
4684 assert_eq!(
4685 channels.available_channels().unwrap(),
4686 [ChannelDetails {
4687 id: channel_id.to_proto(),
4688 name: "test-channel".to_string()
4689 }]
4690 )
4691 });
4692 channel_b.read_with(cx_b, |channel, _| {
4693 assert_eq!(
4694 channel_messages(channel),
4695 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4696 )
4697 });
4698
4699 // Send a message from client B while it is disconnected.
4700 channel_b
4701 .update(cx_b, |channel, cx| {
4702 let task = channel
4703 .send_message("can you see this?".to_string(), cx)
4704 .unwrap();
4705 assert_eq!(
4706 channel_messages(channel),
4707 &[
4708 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4709 ("user_b".to_string(), "can you see this?".to_string(), true)
4710 ]
4711 );
4712 task
4713 })
4714 .await
4715 .unwrap_err();
4716
4717 // Send a message from client A while B is disconnected.
4718 channel_a
4719 .update(cx_a, |channel, cx| {
4720 channel
4721 .send_message("oh, hi B.".to_string(), cx)
4722 .unwrap()
4723 .detach();
4724 let task = channel.send_message("sup".to_string(), cx).unwrap();
4725 assert_eq!(
4726 channel_messages(channel),
4727 &[
4728 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4729 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4730 ("user_a".to_string(), "sup".to_string(), true)
4731 ]
4732 );
4733 task
4734 })
4735 .await
4736 .unwrap();
4737
4738 // Give client B a chance to reconnect.
4739 server.allow_connections();
4740 cx_b.foreground().advance_clock(Duration::from_secs(10));
4741
4742 // Verify that B sees the new messages upon reconnection, as well as the message client B
4743 // sent while offline.
4744 channel_b
4745 .condition(&cx_b, |channel, _| {
4746 channel_messages(channel)
4747 == [
4748 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4749 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4750 ("user_a".to_string(), "sup".to_string(), false),
4751 ("user_b".to_string(), "can you see this?".to_string(), false),
4752 ]
4753 })
4754 .await;
4755
4756 // Ensure client A and B can communicate normally after reconnection.
4757 channel_a
4758 .update(cx_a, |channel, cx| {
4759 channel.send_message("you online?".to_string(), cx).unwrap()
4760 })
4761 .await
4762 .unwrap();
4763 channel_b
4764 .condition(&cx_b, |channel, _| {
4765 channel_messages(channel)
4766 == [
4767 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4768 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4769 ("user_a".to_string(), "sup".to_string(), false),
4770 ("user_b".to_string(), "can you see this?".to_string(), false),
4771 ("user_a".to_string(), "you online?".to_string(), false),
4772 ]
4773 })
4774 .await;
4775
4776 channel_b
4777 .update(cx_b, |channel, cx| {
4778 channel.send_message("yep".to_string(), cx).unwrap()
4779 })
4780 .await
4781 .unwrap();
4782 channel_a
4783 .condition(&cx_a, |channel, _| {
4784 channel_messages(channel)
4785 == [
4786 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4787 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4788 ("user_a".to_string(), "sup".to_string(), false),
4789 ("user_b".to_string(), "can you see this?".to_string(), false),
4790 ("user_a".to_string(), "you online?".to_string(), false),
4791 ("user_b".to_string(), "yep".to_string(), false),
4792 ]
4793 })
4794 .await;
4795 }
4796
4797 #[gpui::test(iterations = 10)]
4798 async fn test_contacts(
4799 cx_a: &mut TestAppContext,
4800 cx_b: &mut TestAppContext,
4801 cx_c: &mut TestAppContext,
4802 ) {
4803 cx_a.foreground().forbid_parking();
4804 let lang_registry = Arc::new(LanguageRegistry::test());
4805 let fs = FakeFs::new(cx_a.background());
4806
4807 // Connect to a server as 3 clients.
4808 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4809 let client_a = server.create_client(cx_a, "user_a").await;
4810 let client_b = server.create_client(cx_b, "user_b").await;
4811 let client_c = server.create_client(cx_c, "user_c").await;
4812
4813 // Share a worktree as client A.
4814 fs.insert_tree(
4815 "/a",
4816 json!({
4817 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4818 }),
4819 )
4820 .await;
4821
4822 let project_a = cx_a.update(|cx| {
4823 Project::local(
4824 client_a.clone(),
4825 client_a.user_store.clone(),
4826 lang_registry.clone(),
4827 fs.clone(),
4828 cx,
4829 )
4830 });
4831 let (worktree_a, _) = project_a
4832 .update(cx_a, |p, cx| {
4833 p.find_or_create_local_worktree("/a", true, cx)
4834 })
4835 .await
4836 .unwrap();
4837 worktree_a
4838 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4839 .await;
4840
4841 client_a
4842 .user_store
4843 .condition(&cx_a, |user_store, _| {
4844 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4845 })
4846 .await;
4847 client_b
4848 .user_store
4849 .condition(&cx_b, |user_store, _| {
4850 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4851 })
4852 .await;
4853 client_c
4854 .user_store
4855 .condition(&cx_c, |user_store, _| {
4856 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4857 })
4858 .await;
4859
4860 let project_id = project_a
4861 .update(cx_a, |project, _| project.next_remote_id())
4862 .await;
4863 project_a
4864 .update(cx_a, |project, cx| project.share(cx))
4865 .await
4866 .unwrap();
4867 client_a
4868 .user_store
4869 .condition(&cx_a, |user_store, _| {
4870 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4871 })
4872 .await;
4873 client_b
4874 .user_store
4875 .condition(&cx_b, |user_store, _| {
4876 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4877 })
4878 .await;
4879 client_c
4880 .user_store
4881 .condition(&cx_c, |user_store, _| {
4882 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4883 })
4884 .await;
4885
4886 let _project_b = Project::remote(
4887 project_id,
4888 client_b.clone(),
4889 client_b.user_store.clone(),
4890 lang_registry.clone(),
4891 fs.clone(),
4892 &mut cx_b.to_async(),
4893 )
4894 .await
4895 .unwrap();
4896
4897 client_a
4898 .user_store
4899 .condition(&cx_a, |user_store, _| {
4900 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4901 })
4902 .await;
4903 client_b
4904 .user_store
4905 .condition(&cx_b, |user_store, _| {
4906 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4907 })
4908 .await;
4909 client_c
4910 .user_store
4911 .condition(&cx_c, |user_store, _| {
4912 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4913 })
4914 .await;
4915
4916 project_a
4917 .condition(&cx_a, |project, _| {
4918 project.collaborators().contains_key(&client_b.peer_id)
4919 })
4920 .await;
4921
4922 cx_a.update(move |_| drop(project_a));
4923 client_a
4924 .user_store
4925 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4926 .await;
4927 client_b
4928 .user_store
4929 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4930 .await;
4931 client_c
4932 .user_store
4933 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4934 .await;
4935
4936 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4937 user_store
4938 .contacts()
4939 .iter()
4940 .map(|contact| {
4941 let worktrees = contact
4942 .projects
4943 .iter()
4944 .map(|p| {
4945 (
4946 p.worktree_root_names[0].as_str(),
4947 p.is_shared,
4948 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4949 )
4950 })
4951 .collect();
4952 (contact.user.github_login.as_str(), worktrees)
4953 })
4954 .collect()
4955 }
4956 }
4957
4958 #[gpui::test(iterations = 10)]
4959 async fn test_contacts_requests(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4960 cx_a.foreground().forbid_parking();
4961
4962 // Connect to a server as 3 clients.
4963 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4964 let client_a = server.create_client(cx_a, "user_a").await;
4965 let client_b = server.create_client(cx_b, "user_b").await;
4966
4967 client_a
4968 .user_store
4969 .read_with(cx_a, |store, _| {
4970 store.request_contact(client_b.user_id().unwrap())
4971 })
4972 .await
4973 .unwrap();
4974
4975 client_a.user_store.read_with(cx_a, |store, _| {
4976 let contacts = store
4977 .contacts()
4978 .iter()
4979 .map(|contact| contact.user.github_login.clone())
4980 .collect::<Vec<_>>();
4981 assert_eq!(contacts, &["user_b"])
4982 });
4983 }
4984
/// Exercises the "follow" feature between two clients sharing a project.
///
/// Client B follows client A and is expected to mirror A's active item, selections, text
/// edits and back/forward navigation. After B unfollows, A's changes must no longer affect B.
/// Finally A follows B, and that follow relationship must end when B disconnects.
#[gpui::test(iterations = 10)]
async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
    let editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();
    let editor_a2 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B opens an editor.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let editor_b1 = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Each project has exactly one collaborator here, so the first entry is the other client.
    let client_a_id = project_b.read_with(cx_b, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });
    let client_b_id = project_a.read_with(cx_a, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });

    // When client B starts following client A, all visible view states are replicated to client B.
    editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
    editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(client_a_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    // B's active item is now a new editor for "2.txt", the file A opened last.
    let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });
    assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
    assert_eq!(
        editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
        Some((worktree_id, "2.txt").into())
    );
    assert_eq!(
        editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
        vec![2..3]
    );
    // B's pre-existing editor for "1.txt" also picked up A's selection in that file.
    assert_eq!(
        editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
        vec![0..1]
    );

    // When client A activates a different editor, client B does so as well.
    workspace_a.update(cx_a, |workspace, cx| {
        workspace.activate_item(&editor_a1, cx)
    });
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b1.id()
        })
        .await;

    // When client A navigates back and forth, client B does so as well.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace::Pane::go_back(workspace, None, cx)
        })
        .await;
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b2.id()
        })
        .await;

    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace::Pane::go_forward(workspace, None, cx)
        })
        .await;
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b1.id()
        })
        .await;

    // Changes to client A's editor are reflected on client B.
    editor_a1.update(cx_a, |editor, cx| {
        editor.select_ranges([1..1, 2..2], None, cx);
    });
    editor_b1
        .condition(cx_b, |editor, cx| {
            editor.selected_ranges(cx) == vec![1..1, 2..2]
        })
        .await;

    editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
    editor_b1
        .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
        .await;

    // Only the selection is asserted below; the scroll position set here is not checked.
    editor_a1.update(cx_a, |editor, cx| {
        editor.select_ranges([3..3], None, cx);
        editor.set_scroll_position(vec2f(0., 100.), cx);
    });
    editor_b1
        .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
        .await;

    // After unfollowing, client B stops receiving updates from client A.
    workspace_b.update(cx_b, |workspace, cx| {
        workspace.unfollow(&workspace.active_pane().clone(), cx)
    });
    workspace_a.update(cx_a, |workspace, cx| {
        workspace.activate_item(&editor_a2, cx)
    });
    // Let all pending work settle before asserting that B's active item did not change.
    cx_a.foreground().run_until_parked();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .id()),
        editor_b1.id()
    );

    // Client A starts following client B.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(client_b_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
        Some(client_b_id)
    );
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .id()),
        editor_a1.id()
    );

    // Following interrupts when client B disconnects.
    client_b.disconnect(&cx_b.to_async()).unwrap();
    cx_a.foreground().run_until_parked();
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
        None
    );
}
5195
/// Checks that two clients can follow each other at the same time.
///
/// Each client splits its workspace, follows the other from the new pane, then switches back
/// to its original pane and opens a different file. The test asserts that the active panes
/// are left untouched by leader updates and that each client's follower pane ends up showing
/// the file the other client opened.
#[gpui::test(iterations = 10)]
async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
            "4.txt": "four",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
    let _editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B opens an editor.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
    let _editor_b1 = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Clients A and B follow each other in split panes
    // After the split, the newly created pane is the active one (asserted below), so the
    // follow is started from that new pane.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
            assert_ne!(*workspace.active_pane(), pane_a1);
            let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
            workspace
                .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
            assert_ne!(*workspace.active_pane(), pane_b1);
            let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
            workspace
                .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();

    // Each client switches back to its original pane and opens a file of its own there.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.activate_next_pane(cx);
            assert_eq!(*workspace.active_pane(), pane_a1);
            workspace.open_path((worktree_id, "3.txt"), true, cx)
        })
        .await
        .unwrap();
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.activate_next_pane(cx);
            assert_eq!(*workspace.active_pane(), pane_b1);
            workspace.open_path((worktree_id, "4.txt"), true, cx)
        })
        .await
        .unwrap();
    // Let all pending work settle before inspecting the resulting state.
    cx_a.foreground().run_until_parked();

    // Ensure leader updates don't change the active pane of followers
    workspace_a.read_with(cx_a, |workspace, _| {
        assert_eq!(*workspace.active_pane(), pane_a1);
    });
    workspace_b.read_with(cx_b, |workspace, _| {
        assert_eq!(*workspace.active_pane(), pane_b1);
    });

    // Ensure peers following each other doesn't cause an infinite loop.
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .project_path(cx)),
        Some((worktree_id, "3.txt").into())
    );
    // A's original pane shows its own file ("3.txt"); its other pane shows the file that B
    // opened ("4.txt").
    workspace_a.update(cx_a, |workspace, cx| {
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "3.txt").into())
        );
        workspace.activate_next_pane(cx);
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "4.txt").into())
        );
    });
    // Symmetrically, B's original pane shows "4.txt" and its other pane shows "3.txt".
    workspace_b.update(cx_b, |workspace, cx| {
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "4.txt").into())
        );
        workspace.activate_next_pane(cx);
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "3.txt").into())
        );
    });
}
5341
/// Checks when a follower automatically stops following its leader.
///
/// Client B follows client A in `pane_b`. Moving the cursor, editing, scrolling, or opening a
/// different item in that pane must clear `leader_for_pane(&pane_b)`, whereas splitting the pane
/// or activating another pane must leave it untouched.
#[gpui::test(iterations = 10)]
async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let _editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), true, cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B starts following client A.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
    // The first collaborator listed on B's side of the project is used as the leader.
    let leader_id = project_b.read_with(cx_b, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );
    // The editor that became active in B's workspace after following.
    let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });

    // When client B moves, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Follow again so the next scenario starts from a following state.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B edits, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Follow again so the next scenario starts from a following state.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B scrolls, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| {
        editor.set_scroll_position(vec2f(0., 3.), cx)
    });
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Follow again so the next scenario starts from a following state.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B activates a different pane, it continues following client A in the original pane.
    workspace_b.update(cx_b, |workspace, cx| {
        workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
    });
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B activates a different item in the original pane, it automatically stops following client A.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), true, cx)
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );
}
5507
5508 #[gpui::test(iterations = 100)]
5509 async fn test_random_collaboration(
5510 cx: &mut TestAppContext,
5511 deterministic: Arc<Deterministic>,
5512 rng: StdRng,
5513 ) {
5514 cx.foreground().forbid_parking();
5515 let max_peers = env::var("MAX_PEERS")
5516 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5517 .unwrap_or(5);
5518 assert!(max_peers <= 5);
5519
5520 let max_operations = env::var("OPERATIONS")
5521 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5522 .unwrap_or(10);
5523
5524 let rng = Arc::new(Mutex::new(rng));
5525
5526 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5527 let host_language_registry = Arc::new(LanguageRegistry::test());
5528
5529 let fs = FakeFs::new(cx.background());
5530 fs.insert_tree(
5531 "/_collab",
5532 json!({
5533 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5534 }),
5535 )
5536 .await;
5537
5538 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5539 let mut clients = Vec::new();
5540 let mut user_ids = Vec::new();
5541 let mut op_start_signals = Vec::new();
5542 let files = Arc::new(Mutex::new(Vec::new()));
5543
5544 let mut next_entity_id = 100000;
5545 let mut host_cx = TestAppContext::new(
5546 cx.foreground_platform(),
5547 cx.platform(),
5548 deterministic.build_foreground(next_entity_id),
5549 deterministic.build_background(),
5550 cx.font_cache(),
5551 cx.leak_detector(),
5552 next_entity_id,
5553 );
5554 let host = server.create_client(&mut host_cx, "host").await;
5555 let host_project = host_cx.update(|cx| {
5556 Project::local(
5557 host.client.clone(),
5558 host.user_store.clone(),
5559 host_language_registry.clone(),
5560 fs.clone(),
5561 cx,
5562 )
5563 });
5564 let host_project_id = host_project
5565 .update(&mut host_cx, |p, _| p.next_remote_id())
5566 .await;
5567
5568 let (collab_worktree, _) = host_project
5569 .update(&mut host_cx, |project, cx| {
5570 project.find_or_create_local_worktree("/_collab", true, cx)
5571 })
5572 .await
5573 .unwrap();
5574 collab_worktree
5575 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5576 .await;
5577 host_project
5578 .update(&mut host_cx, |project, cx| project.share(cx))
5579 .await
5580 .unwrap();
5581
5582 // Set up fake language servers.
5583 let mut language = Language::new(
5584 LanguageConfig {
5585 name: "Rust".into(),
5586 path_suffixes: vec!["rs".to_string()],
5587 ..Default::default()
5588 },
5589 None,
5590 );
5591 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5592 name: "the-fake-language-server",
5593 capabilities: lsp::LanguageServer::full_capabilities(),
5594 initializer: Some(Box::new({
5595 let rng = rng.clone();
5596 let files = files.clone();
5597 let project = host_project.downgrade();
5598 move |fake_server: &mut FakeLanguageServer| {
5599 fake_server.handle_request::<lsp::request::Completion, _, _>(
5600 |_, _| async move {
5601 Ok(Some(lsp::CompletionResponse::Array(vec![
5602 lsp::CompletionItem {
5603 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5604 range: lsp::Range::new(
5605 lsp::Position::new(0, 0),
5606 lsp::Position::new(0, 0),
5607 ),
5608 new_text: "the-new-text".to_string(),
5609 })),
5610 ..Default::default()
5611 },
5612 ])))
5613 },
5614 );
5615
5616 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5617 |_, _| async move {
5618 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5619 lsp::CodeAction {
5620 title: "the-code-action".to_string(),
5621 ..Default::default()
5622 },
5623 )]))
5624 },
5625 );
5626
5627 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5628 |params, _| async move {
5629 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5630 params.position,
5631 params.position,
5632 ))))
5633 },
5634 );
5635
5636 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5637 let files = files.clone();
5638 let rng = rng.clone();
5639 move |_, _| {
5640 let files = files.clone();
5641 let rng = rng.clone();
5642 async move {
5643 let files = files.lock();
5644 let mut rng = rng.lock();
5645 let count = rng.gen_range::<usize, _>(1..3);
5646 let files = (0..count)
5647 .map(|_| files.choose(&mut *rng).unwrap())
5648 .collect::<Vec<_>>();
5649 log::info!("LSP: Returning definitions in files {:?}", &files);
5650 Ok(Some(lsp::GotoDefinitionResponse::Array(
5651 files
5652 .into_iter()
5653 .map(|file| lsp::Location {
5654 uri: lsp::Url::from_file_path(file).unwrap(),
5655 range: Default::default(),
5656 })
5657 .collect(),
5658 )))
5659 }
5660 }
5661 });
5662
5663 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5664 let rng = rng.clone();
5665 let project = project.clone();
5666 move |params, mut cx| {
5667 let highlights = if let Some(project) = project.upgrade(&cx) {
5668 project.update(&mut cx, |project, cx| {
5669 let path = params
5670 .text_document_position_params
5671 .text_document
5672 .uri
5673 .to_file_path()
5674 .unwrap();
5675 let (worktree, relative_path) =
5676 project.find_local_worktree(&path, cx)?;
5677 let project_path =
5678 ProjectPath::from((worktree.read(cx).id(), relative_path));
5679 let buffer =
5680 project.get_open_buffer(&project_path, cx)?.read(cx);
5681
5682 let mut highlights = Vec::new();
5683 let highlight_count = rng.lock().gen_range(1..=5);
5684 let mut prev_end = 0;
5685 for _ in 0..highlight_count {
5686 let range =
5687 buffer.random_byte_range(prev_end, &mut *rng.lock());
5688
5689 highlights.push(lsp::DocumentHighlight {
5690 range: range_to_lsp(range.to_point_utf16(buffer)),
5691 kind: Some(lsp::DocumentHighlightKind::READ),
5692 });
5693 prev_end = range.end;
5694 }
5695 Some(highlights)
5696 })
5697 } else {
5698 None
5699 };
5700 async move { Ok(highlights) }
5701 }
5702 });
5703 }
5704 })),
5705 ..Default::default()
5706 });
5707 host_language_registry.add(Arc::new(language));
5708
5709 let op_start_signal = futures::channel::mpsc::unbounded();
5710 user_ids.push(host.current_user_id(&host_cx));
5711 op_start_signals.push(op_start_signal.0);
5712 clients.push(host_cx.foreground().spawn(host.simulate_host(
5713 host_project,
5714 files,
5715 op_start_signal.1,
5716 rng.clone(),
5717 host_cx,
5718 )));
5719
5720 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5721 rng.lock().gen_range(0..max_operations)
5722 } else {
5723 max_operations
5724 };
5725 let mut available_guests = vec![
5726 "guest-1".to_string(),
5727 "guest-2".to_string(),
5728 "guest-3".to_string(),
5729 "guest-4".to_string(),
5730 ];
5731 let mut operations = 0;
5732 while operations < max_operations {
5733 if operations == disconnect_host_at {
5734 server.disconnect_client(user_ids[0]);
5735 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5736 drop(op_start_signals);
5737 let mut clients = futures::future::join_all(clients).await;
5738 cx.foreground().run_until_parked();
5739
5740 let (host, mut host_cx, host_err) = clients.remove(0);
5741 if let Some(host_err) = host_err {
5742 log::error!("host error - {}", host_err);
5743 }
5744 host.project
5745 .as_ref()
5746 .unwrap()
5747 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5748 for (guest, mut guest_cx, guest_err) in clients {
5749 if let Some(guest_err) = guest_err {
5750 log::error!("{} error - {}", guest.username, guest_err);
5751 }
5752 let contacts = server
5753 .store
5754 .read()
5755 .await
5756 .contacts_for_user(guest.current_user_id(&guest_cx));
5757 assert!(!contacts
5758 .iter()
5759 .flat_map(|contact| &contact.projects)
5760 .any(|project| project.id == host_project_id));
5761 guest
5762 .project
5763 .as_ref()
5764 .unwrap()
5765 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5766 guest_cx.update(|_| drop(guest));
5767 }
5768 host_cx.update(|_| drop(host));
5769
5770 return;
5771 }
5772
5773 let distribution = rng.lock().gen_range(0..100);
5774 match distribution {
5775 0..=19 if !available_guests.is_empty() => {
5776 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5777 let guest_username = available_guests.remove(guest_ix);
5778 log::info!("Adding new connection for {}", guest_username);
5779 next_entity_id += 100000;
5780 let mut guest_cx = TestAppContext::new(
5781 cx.foreground_platform(),
5782 cx.platform(),
5783 deterministic.build_foreground(next_entity_id),
5784 deterministic.build_background(),
5785 cx.font_cache(),
5786 cx.leak_detector(),
5787 next_entity_id,
5788 );
5789 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5790 let guest_project = Project::remote(
5791 host_project_id,
5792 guest.client.clone(),
5793 guest.user_store.clone(),
5794 guest_lang_registry.clone(),
5795 FakeFs::new(cx.background()),
5796 &mut guest_cx.to_async(),
5797 )
5798 .await
5799 .unwrap();
5800 let op_start_signal = futures::channel::mpsc::unbounded();
5801 user_ids.push(guest.current_user_id(&guest_cx));
5802 op_start_signals.push(op_start_signal.0);
5803 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5804 guest_username.clone(),
5805 guest_project,
5806 op_start_signal.1,
5807 rng.clone(),
5808 guest_cx,
5809 )));
5810
5811 log::info!("Added connection for {}", guest_username);
5812 operations += 1;
5813 }
5814 20..=29 if clients.len() > 1 => {
5815 log::info!("Removing guest");
5816 let guest_ix = rng.lock().gen_range(1..clients.len());
5817 let removed_guest_id = user_ids.remove(guest_ix);
5818 let guest = clients.remove(guest_ix);
5819 op_start_signals.remove(guest_ix);
5820 server.disconnect_client(removed_guest_id);
5821 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5822 let (guest, mut guest_cx, guest_err) = guest.await;
5823 if let Some(guest_err) = guest_err {
5824 log::error!("{} error - {}", guest.username, guest_err);
5825 }
5826 guest
5827 .project
5828 .as_ref()
5829 .unwrap()
5830 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5831 for user_id in &user_ids {
5832 for contact in server.store.read().await.contacts_for_user(*user_id) {
5833 assert_ne!(
5834 contact.user_id, removed_guest_id.0 as u64,
5835 "removed guest is still a contact of another peer"
5836 );
5837 for project in contact.projects {
5838 for project_guest_id in project.guests {
5839 assert_ne!(
5840 project_guest_id, removed_guest_id.0 as u64,
5841 "removed guest appears as still participating on a project"
5842 );
5843 }
5844 }
5845 }
5846 }
5847
5848 log::info!("{} removed", guest.username);
5849 available_guests.push(guest.username.clone());
5850 guest_cx.update(|_| drop(guest));
5851
5852 operations += 1;
5853 }
5854 _ => {
5855 while operations < max_operations && rng.lock().gen_bool(0.7) {
5856 op_start_signals
5857 .choose(&mut *rng.lock())
5858 .unwrap()
5859 .unbounded_send(())
5860 .unwrap();
5861 operations += 1;
5862 }
5863
5864 if rng.lock().gen_bool(0.8) {
5865 cx.foreground().run_until_parked();
5866 }
5867 }
5868 }
5869 }
5870
5871 drop(op_start_signals);
5872 let mut clients = futures::future::join_all(clients).await;
5873 cx.foreground().run_until_parked();
5874
5875 let (host_client, mut host_cx, host_err) = clients.remove(0);
5876 if let Some(host_err) = host_err {
5877 panic!("host error - {}", host_err);
5878 }
5879 let host_project = host_client.project.as_ref().unwrap();
5880 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5881 project
5882 .worktrees(cx)
5883 .map(|worktree| {
5884 let snapshot = worktree.read(cx).snapshot();
5885 (snapshot.id(), snapshot)
5886 })
5887 .collect::<BTreeMap<_, _>>()
5888 });
5889
5890 host_client
5891 .project
5892 .as_ref()
5893 .unwrap()
5894 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5895
5896 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5897 if let Some(guest_err) = guest_err {
5898 panic!("{} error - {}", guest_client.username, guest_err);
5899 }
5900 let worktree_snapshots =
5901 guest_client
5902 .project
5903 .as_ref()
5904 .unwrap()
5905 .read_with(&guest_cx, |project, cx| {
5906 project
5907 .worktrees(cx)
5908 .map(|worktree| {
5909 let worktree = worktree.read(cx);
5910 (worktree.id(), worktree.snapshot())
5911 })
5912 .collect::<BTreeMap<_, _>>()
5913 });
5914
5915 assert_eq!(
5916 worktree_snapshots.keys().collect::<Vec<_>>(),
5917 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5918 "{} has different worktrees than the host",
5919 guest_client.username
5920 );
5921 for (id, host_snapshot) in &host_worktree_snapshots {
5922 let guest_snapshot = &worktree_snapshots[id];
5923 assert_eq!(
5924 guest_snapshot.root_name(),
5925 host_snapshot.root_name(),
5926 "{} has different root name than the host for worktree {}",
5927 guest_client.username,
5928 id
5929 );
5930 assert_eq!(
5931 guest_snapshot.entries(false).collect::<Vec<_>>(),
5932 host_snapshot.entries(false).collect::<Vec<_>>(),
5933 "{} has different snapshot than the host for worktree {}",
5934 guest_client.username,
5935 id
5936 );
5937 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5938 }
5939
5940 guest_client
5941 .project
5942 .as_ref()
5943 .unwrap()
5944 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5945
5946 for guest_buffer in &guest_client.buffers {
5947 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5948 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5949 project.buffer_for_id(buffer_id, cx).expect(&format!(
5950 "host does not have buffer for guest:{}, peer:{}, id:{}",
5951 guest_client.username, guest_client.peer_id, buffer_id
5952 ))
5953 });
5954 let path = host_buffer
5955 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5956
5957 assert_eq!(
5958 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5959 0,
5960 "{}, buffer {}, path {:?} has deferred operations",
5961 guest_client.username,
5962 buffer_id,
5963 path,
5964 );
5965 assert_eq!(
5966 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5967 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5968 "{}, buffer {}, path {:?}, differs from the host's buffer",
5969 guest_client.username,
5970 buffer_id,
5971 path
5972 );
5973 }
5974
5975 guest_cx.update(|_| drop(guest_client));
5976 }
5977
5978 host_cx.update(|_| drop(host_client));
5979 }
5980
/// Test harness that owns an in-process `Server` plus the state the tests use to drive it.
struct TestServer {
    /// Peer created in `start`; it is reset when the `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state (database handle and API token) the server was built with.
    app_state: Arc<AppState>,
    /// The server under test; also reachable through `Deref`.
    server: Arc<Server>,
    /// Foreground executor, used by `condition` to mark the periods in which it is waiting.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Per-user flags returned by `Connection::in_memory`; `disconnect_client` sets them to true.
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    /// While true, clients created by `create_client` fail to establish new connections.
    forbid_connections: Arc<AtomicBool>,
    /// Held only so the test database lives as long as the server.
    _test_db: TestDb,
}
5991
5992 impl TestServer {
5993 async fn start(
5994 foreground: Rc<executor::Foreground>,
5995 background: Arc<executor::Background>,
5996 ) -> Self {
5997 let test_db = TestDb::fake(background);
5998 let app_state = Self::build_app_state(&test_db).await;
5999 let peer = Peer::new();
6000 let notifications = mpsc::unbounded();
6001 let server = Server::new(app_state.clone(), Some(notifications.0));
6002 Self {
6003 peer,
6004 app_state,
6005 server,
6006 foreground,
6007 notifications: notifications.1,
6008 connection_killers: Default::default(),
6009 forbid_connections: Default::default(),
6010 _test_db: test_db,
6011 }
6012 }
6013
6014 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6015 cx.update(|cx| {
6016 let settings = Settings::test(cx);
6017 cx.set_global(settings);
6018 });
6019
6020 let http = FakeHttpClient::with_404_response();
6021 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
6022 let client_name = name.to_string();
6023 let mut client = Client::new(http.clone());
6024 let server = self.server.clone();
6025 let connection_killers = self.connection_killers.clone();
6026 let forbid_connections = self.forbid_connections.clone();
6027 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6028
6029 Arc::get_mut(&mut client)
6030 .unwrap()
6031 .override_authenticate(move |cx| {
6032 cx.spawn(|_| async move {
6033 let access_token = "the-token".to_string();
6034 Ok(Credentials {
6035 user_id: user_id.0 as u64,
6036 access_token,
6037 })
6038 })
6039 })
6040 .override_establish_connection(move |credentials, cx| {
6041 assert_eq!(credentials.user_id, user_id.0 as u64);
6042 assert_eq!(credentials.access_token, "the-token");
6043
6044 let server = server.clone();
6045 let connection_killers = connection_killers.clone();
6046 let forbid_connections = forbid_connections.clone();
6047 let client_name = client_name.clone();
6048 let connection_id_tx = connection_id_tx.clone();
6049 cx.spawn(move |cx| async move {
6050 if forbid_connections.load(SeqCst) {
6051 Err(EstablishConnectionError::other(anyhow!(
6052 "server is forbidding connections"
6053 )))
6054 } else {
6055 let (client_conn, server_conn, killed) =
6056 Connection::in_memory(cx.background());
6057 connection_killers.lock().insert(user_id, killed);
6058 cx.background()
6059 .spawn(server.handle_connection(
6060 server_conn,
6061 client_name,
6062 user_id,
6063 Some(connection_id_tx),
6064 cx.background(),
6065 ))
6066 .detach();
6067 Ok(client_conn)
6068 }
6069 })
6070 });
6071
6072 client
6073 .authenticate_and_connect(false, &cx.to_async())
6074 .await
6075 .unwrap();
6076
6077 Channel::init(&client);
6078 Project::init(&client);
6079 cx.update(|cx| {
6080 workspace::init(&client, cx);
6081 });
6082
6083 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6084 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6085
6086 let client = TestClient {
6087 client,
6088 peer_id,
6089 username: name.to_string(),
6090 user_store,
6091 language_registry: Arc::new(LanguageRegistry::test()),
6092 project: Default::default(),
6093 buffers: Default::default(),
6094 };
6095 client.wait_for_current_user(cx).await;
6096 client
6097 }
6098
6099 fn disconnect_client(&self, user_id: UserId) {
6100 self.connection_killers
6101 .lock()
6102 .remove(&user_id)
6103 .unwrap()
6104 .store(true, SeqCst);
6105 }
6106
6107 fn forbid_connections(&self) {
6108 self.forbid_connections.store(true, SeqCst);
6109 }
6110
6111 fn allow_connections(&self) {
6112 self.forbid_connections.store(false, SeqCst);
6113 }
6114
6115 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6116 Arc::new(AppState {
6117 db: test_db.db().clone(),
6118 api_token: Default::default(),
6119 })
6120 }
6121
6122 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6123 self.server.store.read().await
6124 }
6125
6126 async fn condition<F>(&mut self, mut predicate: F)
6127 where
6128 F: FnMut(&Store) -> bool,
6129 {
6130 assert!(
6131 self.foreground.parking_forbidden(),
6132 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6133 );
6134 while !(predicate)(&*self.server.store.read().await) {
6135 self.foreground.start_waiting();
6136 self.notifications.next().await;
6137 self.foreground.finish_waiting();
6138 }
6139 }
6140 }
6141
6142 impl Deref for TestServer {
6143 type Target = Server;
6144
6145 fn deref(&self) -> &Self::Target {
6146 &self.server
6147 }
6148 }
6149
6150 impl Drop for TestServer {
6151 fn drop(&mut self) {
6152 self.peer.reset();
6153 }
6154 }
6155
/// A client connected to a `TestServer`, plus the per-client state the tests inspect.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Name the user was created with.
    username: String,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store model created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry passed to the projects and workspaces built by this client.
    language_registry: Arc<LanguageRegistry>,
    /// Project most recently built or simulated by this client, if any.
    project: Option<ModelHandle<Project>>,
    /// Buffers the client currently keeps open during a simulation.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
6165
6166 impl Deref for TestClient {
6167 type Target = Arc<Client>;
6168
6169 fn deref(&self) -> &Self::Target {
6170 &self.client
6171 }
6172 }
6173
6174 impl TestClient {
6175 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6176 UserId::from_proto(
6177 self.user_store
6178 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6179 )
6180 }
6181
6182 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6183 let mut authed_user = self
6184 .user_store
6185 .read_with(cx, |user_store, _| user_store.watch_current_user());
6186 while authed_user.next().await.unwrap().is_none() {}
6187 }
6188
6189 async fn build_local_project(
6190 &mut self,
6191 fs: Arc<FakeFs>,
6192 root_path: impl AsRef<Path>,
6193 cx: &mut TestAppContext,
6194 ) -> (ModelHandle<Project>, WorktreeId) {
6195 let project = cx.update(|cx| {
6196 Project::local(
6197 self.client.clone(),
6198 self.user_store.clone(),
6199 self.language_registry.clone(),
6200 fs,
6201 cx,
6202 )
6203 });
6204 self.project = Some(project.clone());
6205 let (worktree, _) = project
6206 .update(cx, |p, cx| {
6207 p.find_or_create_local_worktree(root_path, true, cx)
6208 })
6209 .await
6210 .unwrap();
6211 worktree
6212 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6213 .await;
6214 project
6215 .update(cx, |project, _| project.next_remote_id())
6216 .await;
6217 (project, worktree.read_with(cx, |tree, _| tree.id()))
6218 }
6219
6220 async fn build_remote_project(
6221 &mut self,
6222 project_id: u64,
6223 cx: &mut TestAppContext,
6224 ) -> ModelHandle<Project> {
6225 let project = Project::remote(
6226 project_id,
6227 self.client.clone(),
6228 self.user_store.clone(),
6229 self.language_registry.clone(),
6230 FakeFs::new(cx.background()),
6231 &mut cx.to_async(),
6232 )
6233 .await
6234 .unwrap();
6235 self.project = Some(project.clone());
6236 project
6237 }
6238
6239 fn build_workspace(
6240 &self,
6241 project: &ModelHandle<Project>,
6242 cx: &mut TestAppContext,
6243 ) -> ViewHandle<Workspace> {
6244 let (window_id, _) = cx.add_window(|_| EmptyView);
6245 cx.add_view(window_id, |cx| {
6246 let fs = project.read(cx).fs().clone();
6247 Workspace::new(
6248 &WorkspaceParams {
6249 fs,
6250 project: project.clone(),
6251 user_store: self.user_store.clone(),
6252 languages: self.language_registry.clone(),
6253 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6254 channel_list: cx.add_model(|cx| {
6255 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6256 }),
6257 client: self.client.clone(),
6258 },
6259 cx,
6260 )
6261 })
6262 }
6263
6264 async fn simulate_host(
6265 mut self,
6266 project: ModelHandle<Project>,
6267 files: Arc<Mutex<Vec<PathBuf>>>,
6268 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6269 rng: Arc<Mutex<StdRng>>,
6270 mut cx: TestAppContext,
6271 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6272 async fn simulate_host_internal(
6273 client: &mut TestClient,
6274 project: ModelHandle<Project>,
6275 files: Arc<Mutex<Vec<PathBuf>>>,
6276 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6277 rng: Arc<Mutex<StdRng>>,
6278 cx: &mut TestAppContext,
6279 ) -> anyhow::Result<()> {
6280 let fs = project.read_with(cx, |project, _| project.fs().clone());
6281
6282 while op_start_signal.next().await.is_some() {
6283 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6284 match distribution {
6285 0..=20 if !files.lock().is_empty() => {
6286 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6287 let mut path = path.as_path();
6288 while let Some(parent_path) = path.parent() {
6289 path = parent_path;
6290 if rng.lock().gen() {
6291 break;
6292 }
6293 }
6294
6295 log::info!("Host: find/create local worktree {:?}", path);
6296 let find_or_create_worktree = project.update(cx, |project, cx| {
6297 project.find_or_create_local_worktree(path, true, cx)
6298 });
6299 if rng.lock().gen() {
6300 cx.background().spawn(find_or_create_worktree).detach();
6301 } else {
6302 find_or_create_worktree.await?;
6303 }
6304 }
6305 10..=80 if !files.lock().is_empty() => {
6306 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6307 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6308 let (worktree, path) = project
6309 .update(cx, |project, cx| {
6310 project.find_or_create_local_worktree(
6311 file.clone(),
6312 true,
6313 cx,
6314 )
6315 })
6316 .await?;
6317 let project_path =
6318 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6319 log::info!(
6320 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6321 file,
6322 project_path.0,
6323 project_path.1
6324 );
6325 let buffer = project
6326 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6327 .await
6328 .unwrap();
6329 client.buffers.insert(buffer.clone());
6330 buffer
6331 } else {
6332 client
6333 .buffers
6334 .iter()
6335 .choose(&mut *rng.lock())
6336 .unwrap()
6337 .clone()
6338 };
6339
6340 if rng.lock().gen_bool(0.1) {
6341 cx.update(|cx| {
6342 log::info!(
6343 "Host: dropping buffer {:?}",
6344 buffer.read(cx).file().unwrap().full_path(cx)
6345 );
6346 client.buffers.remove(&buffer);
6347 drop(buffer);
6348 });
6349 } else {
6350 buffer.update(cx, |buffer, cx| {
6351 log::info!(
6352 "Host: updating buffer {:?} ({})",
6353 buffer.file().unwrap().full_path(cx),
6354 buffer.remote_id()
6355 );
6356
6357 if rng.lock().gen_bool(0.7) {
6358 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6359 } else {
6360 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6361 }
6362 });
6363 }
6364 }
6365 _ => loop {
6366 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6367 let mut path = PathBuf::new();
6368 path.push("/");
6369 for _ in 0..path_component_count {
6370 let letter = rng.lock().gen_range(b'a'..=b'z');
6371 path.push(std::str::from_utf8(&[letter]).unwrap());
6372 }
6373 path.set_extension("rs");
6374 let parent_path = path.parent().unwrap();
6375
6376 log::info!("Host: creating file {:?}", path,);
6377
6378 if fs.create_dir(&parent_path).await.is_ok()
6379 && fs.create_file(&path, Default::default()).await.is_ok()
6380 {
6381 files.lock().push(path);
6382 break;
6383 } else {
6384 log::info!("Host: cannot create file");
6385 }
6386 },
6387 }
6388
6389 cx.background().simulate_random_delay().await;
6390 }
6391
6392 Ok(())
6393 }
6394
6395 let result = simulate_host_internal(
6396 &mut self,
6397 project.clone(),
6398 files,
6399 op_start_signal,
6400 rng,
6401 &mut cx,
6402 )
6403 .await;
6404 log::info!("Host done");
6405 self.project = Some(project);
6406 (self, cx, result.err())
6407 }
6408
6409 pub async fn simulate_guest(
6410 mut self,
6411 guest_username: String,
6412 project: ModelHandle<Project>,
6413 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6414 rng: Arc<Mutex<StdRng>>,
6415 mut cx: TestAppContext,
6416 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6417 async fn simulate_guest_internal(
6418 client: &mut TestClient,
6419 guest_username: &str,
6420 project: ModelHandle<Project>,
6421 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6422 rng: Arc<Mutex<StdRng>>,
6423 cx: &mut TestAppContext,
6424 ) -> anyhow::Result<()> {
6425 while op_start_signal.next().await.is_some() {
6426 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6427 let worktree = if let Some(worktree) =
6428 project.read_with(cx, |project, cx| {
6429 project
6430 .worktrees(&cx)
6431 .filter(|worktree| {
6432 let worktree = worktree.read(cx);
6433 worktree.is_visible()
6434 && worktree.entries(false).any(|e| e.is_file())
6435 })
6436 .choose(&mut *rng.lock())
6437 }) {
6438 worktree
6439 } else {
6440 cx.background().simulate_random_delay().await;
6441 continue;
6442 };
6443
6444 let (worktree_root_name, project_path) =
6445 worktree.read_with(cx, |worktree, _| {
6446 let entry = worktree
6447 .entries(false)
6448 .filter(|e| e.is_file())
6449 .choose(&mut *rng.lock())
6450 .unwrap();
6451 (
6452 worktree.root_name().to_string(),
6453 (worktree.id(), entry.path.clone()),
6454 )
6455 });
6456 log::info!(
6457 "{}: opening path {:?} in worktree {} ({})",
6458 guest_username,
6459 project_path.1,
6460 project_path.0,
6461 worktree_root_name,
6462 );
6463 let buffer = project
6464 .update(cx, |project, cx| {
6465 project.open_buffer(project_path.clone(), cx)
6466 })
6467 .await?;
6468 log::info!(
6469 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6470 guest_username,
6471 project_path.1,
6472 project_path.0,
6473 worktree_root_name,
6474 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6475 );
6476 client.buffers.insert(buffer.clone());
6477 buffer
6478 } else {
6479 client
6480 .buffers
6481 .iter()
6482 .choose(&mut *rng.lock())
6483 .unwrap()
6484 .clone()
6485 };
6486
6487 let choice = rng.lock().gen_range(0..100);
6488 match choice {
6489 0..=9 => {
6490 cx.update(|cx| {
6491 log::info!(
6492 "{}: dropping buffer {:?}",
6493 guest_username,
6494 buffer.read(cx).file().unwrap().full_path(cx)
6495 );
6496 client.buffers.remove(&buffer);
6497 drop(buffer);
6498 });
6499 }
6500 10..=19 => {
6501 let completions = project.update(cx, |project, cx| {
6502 log::info!(
6503 "{}: requesting completions for buffer {} ({:?})",
6504 guest_username,
6505 buffer.read(cx).remote_id(),
6506 buffer.read(cx).file().unwrap().full_path(cx)
6507 );
6508 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6509 project.completions(&buffer, offset, cx)
6510 });
6511 let completions = cx.background().spawn(async move {
6512 completions
6513 .await
6514 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6515 });
6516 if rng.lock().gen_bool(0.3) {
6517 log::info!("{}: detaching completions request", guest_username);
6518 cx.update(|cx| completions.detach_and_log_err(cx));
6519 } else {
6520 completions.await?;
6521 }
6522 }
6523 20..=29 => {
6524 let code_actions = project.update(cx, |project, cx| {
6525 log::info!(
6526 "{}: requesting code actions for buffer {} ({:?})",
6527 guest_username,
6528 buffer.read(cx).remote_id(),
6529 buffer.read(cx).file().unwrap().full_path(cx)
6530 );
6531 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6532 project.code_actions(&buffer, range, cx)
6533 });
6534 let code_actions = cx.background().spawn(async move {
6535 code_actions.await.map_err(|err| {
6536 anyhow!("code actions request failed: {:?}", err)
6537 })
6538 });
6539 if rng.lock().gen_bool(0.3) {
6540 log::info!("{}: detaching code actions request", guest_username);
6541 cx.update(|cx| code_actions.detach_and_log_err(cx));
6542 } else {
6543 code_actions.await?;
6544 }
6545 }
6546 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6547 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6548 log::info!(
6549 "{}: saving buffer {} ({:?})",
6550 guest_username,
6551 buffer.remote_id(),
6552 buffer.file().unwrap().full_path(cx)
6553 );
6554 (buffer.version(), buffer.save(cx))
6555 });
6556 let save = cx.background().spawn(async move {
6557 let (saved_version, _) = save
6558 .await
6559 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6560 assert!(saved_version.observed_all(&requested_version));
6561 Ok::<_, anyhow::Error>(())
6562 });
6563 if rng.lock().gen_bool(0.3) {
6564 log::info!("{}: detaching save request", guest_username);
6565 cx.update(|cx| save.detach_and_log_err(cx));
6566 } else {
6567 save.await?;
6568 }
6569 }
6570 40..=44 => {
6571 let prepare_rename = project.update(cx, |project, cx| {
6572 log::info!(
6573 "{}: preparing rename for buffer {} ({:?})",
6574 guest_username,
6575 buffer.read(cx).remote_id(),
6576 buffer.read(cx).file().unwrap().full_path(cx)
6577 );
6578 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6579 project.prepare_rename(buffer, offset, cx)
6580 });
6581 let prepare_rename = cx.background().spawn(async move {
6582 prepare_rename.await.map_err(|err| {
6583 anyhow!("prepare rename request failed: {:?}", err)
6584 })
6585 });
6586 if rng.lock().gen_bool(0.3) {
6587 log::info!("{}: detaching prepare rename request", guest_username);
6588 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6589 } else {
6590 prepare_rename.await?;
6591 }
6592 }
6593 45..=49 => {
6594 let definitions = project.update(cx, |project, cx| {
6595 log::info!(
6596 "{}: requesting definitions for buffer {} ({:?})",
6597 guest_username,
6598 buffer.read(cx).remote_id(),
6599 buffer.read(cx).file().unwrap().full_path(cx)
6600 );
6601 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6602 project.definition(&buffer, offset, cx)
6603 });
6604 let definitions = cx.background().spawn(async move {
6605 definitions
6606 .await
6607 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6608 });
6609 if rng.lock().gen_bool(0.3) {
6610 log::info!("{}: detaching definitions request", guest_username);
6611 cx.update(|cx| definitions.detach_and_log_err(cx));
6612 } else {
6613 client
6614 .buffers
6615 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6616 }
6617 }
6618 50..=54 => {
6619 let highlights = project.update(cx, |project, cx| {
6620 log::info!(
6621 "{}: requesting highlights for buffer {} ({:?})",
6622 guest_username,
6623 buffer.read(cx).remote_id(),
6624 buffer.read(cx).file().unwrap().full_path(cx)
6625 );
6626 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6627 project.document_highlights(&buffer, offset, cx)
6628 });
6629 let highlights = cx.background().spawn(async move {
6630 highlights
6631 .await
6632 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6633 });
6634 if rng.lock().gen_bool(0.3) {
6635 log::info!("{}: detaching highlights request", guest_username);
6636 cx.update(|cx| highlights.detach_and_log_err(cx));
6637 } else {
6638 highlights.await?;
6639 }
6640 }
6641 55..=59 => {
6642 let search = project.update(cx, |project, cx| {
6643 let query = rng.lock().gen_range('a'..='z');
6644 log::info!("{}: project-wide search {:?}", guest_username, query);
6645 project.search(SearchQuery::text(query, false, false), cx)
6646 });
6647 let search = cx.background().spawn(async move {
6648 search
6649 .await
6650 .map_err(|err| anyhow!("search request failed: {:?}", err))
6651 });
6652 if rng.lock().gen_bool(0.3) {
6653 log::info!("{}: detaching search request", guest_username);
6654 cx.update(|cx| search.detach_and_log_err(cx));
6655 } else {
6656 client.buffers.extend(search.await?.into_keys());
6657 }
6658 }
6659 60..=69 => {
6660 let worktree = project
6661 .read_with(cx, |project, cx| {
6662 project
6663 .worktrees(&cx)
6664 .filter(|worktree| {
6665 let worktree = worktree.read(cx);
6666 worktree.is_visible()
6667 && worktree.entries(false).any(|e| e.is_file())
6668 && worktree
6669 .root_entry()
6670 .map_or(false, |e| e.is_dir())
6671 })
6672 .choose(&mut *rng.lock())
6673 })
6674 .unwrap();
6675 let (worktree_id, worktree_root_name) = worktree
6676 .read_with(cx, |worktree, _| {
6677 (worktree.id(), worktree.root_name().to_string())
6678 });
6679
6680 let mut new_name = String::new();
6681 for _ in 0..10 {
6682 let letter = rng.lock().gen_range('a'..='z');
6683 new_name.push(letter);
6684 }
6685 let mut new_path = PathBuf::new();
6686 new_path.push(new_name);
6687 new_path.set_extension("rs");
6688 log::info!(
6689 "{}: creating {:?} in worktree {} ({})",
6690 guest_username,
6691 new_path,
6692 worktree_id,
6693 worktree_root_name,
6694 );
6695 project
6696 .update(cx, |project, cx| {
6697 project.create_entry((worktree_id, new_path), false, cx)
6698 })
6699 .unwrap()
6700 .await?;
6701 }
6702 _ => {
6703 buffer.update(cx, |buffer, cx| {
6704 log::info!(
6705 "{}: updating buffer {} ({:?})",
6706 guest_username,
6707 buffer.remote_id(),
6708 buffer.file().unwrap().full_path(cx)
6709 );
6710 if rng.lock().gen_bool(0.7) {
6711 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6712 } else {
6713 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6714 }
6715 });
6716 }
6717 }
6718 cx.background().simulate_random_delay().await;
6719 }
6720 Ok(())
6721 }
6722
6723 let result = simulate_guest_internal(
6724 &mut self,
6725 &guest_username,
6726 project.clone(),
6727 op_start_signal,
6728 rng,
6729 &mut cx,
6730 )
6731 .await;
6732 log::info!("{}: done", guest_username);
6733
6734 self.project = Some(project);
6735 (self, cx, result.err())
6736 }
6737 }
6738
6739 impl Drop for TestClient {
6740 fn drop(&mut self) {
6741 self.client.tear_down();
6742 }
6743 }
6744
6745 impl Executor for Arc<gpui::executor::Background> {
6746 type Sleep = gpui::executor::Timer;
6747
6748 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6749 self.spawn(future).detach();
6750 }
6751
6752 fn sleep(&self, duration: Duration) -> Self::Sleep {
6753 self.as_ref().timer(duration)
6754 }
6755 }
6756
6757 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6758 channel
6759 .messages()
6760 .cursor::<()>()
6761 .map(|m| {
6762 (
6763 m.sender.github_login.clone(),
6764 m.body.clone(),
6765 m.is_pending(),
6766 )
6767 })
6768 .collect()
6769 }
6770
6771 struct EmptyView;
6772
6773 impl gpui::Entity for EmptyView {
6774 type Event = ();
6775 }
6776
6777 impl gpui::View for EmptyView {
6778 fn ui_name() -> &'static str {
6779 "empty view"
6780 }
6781
6782 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6783 gpui::Element::boxed(gpui::elements::Empty)
6784 }
6785 }
6786}