1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use log::{as_debug, as_display};
29use rpc::{
30 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
31 Connection, ConnectionId, Peer, TypedEnvelope,
32};
33use std::{
34 any::TypeId,
35 future::Future,
36 marker::PhantomData,
37 net::SocketAddr,
38 ops::{Deref, DerefMut},
39 rc::Rc,
40 sync::Arc,
41 time::{Duration, Instant},
42};
43use store::{Store, Worktree};
44use time::OffsetDateTime;
45use tokio::{
46 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
47 time::Sleep,
48};
49use tower::ServiceBuilder;
50use tracing::{info_span, Instrument};
51use util::ResultExt;
52
/// Type-erased handler for one kind of incoming RPC message.
///
/// It receives the shared server and the dynamically typed envelope and returns a boxed future
/// resolving to the handler's result. Handlers are stored in `Server::handlers`, keyed by the
/// `TypeId` of the message type they accept.
type MessageHandler = Box<
    dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
>;
56
/// RPC server state shared by every client connection.
pub struct Server {
    /// RPC peer used to send, respond to and forward messages on client connections.
    peer: Arc<Peer>,
    /// Connection, project and channel bookkeeping; accessed through `state()` / `state_mut()`.
    store: RwLock<Store>,
    /// Application state; handlers use its `db` for user and channel queries.
    app_state: Arc<AppState>,
    /// Type-erased message handlers keyed by the `TypeId` of the message they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
64
/// Abstraction over the async runtime used by `Server::handle_connection`.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` to completion without the caller waiting for it; used for messages that
    /// report themselves as background work.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future that completes once `duration` has elapsed; used to build the timers
    /// handed to the peer when a connection is added.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
70
/// [`Executor`] backed by the tokio runtime (`tokio::task::spawn` and `tokio::time::sleep`).
#[derive(Clone)]
pub struct RealExecutor;
73
/// Maximum number of channel messages requested from the database per page. A page shorter than
/// this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum length, in bytes, of a chat message body after trimming surrounding whitespace.
const MAX_MESSAGE_LEN: usize = 1024;
76
/// Shared (read) access to the server's [`Store`], returned by `Server::state`.
struct StoreReadGuard<'a> {
    /// The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    /// NOTE(review): presumably this is meant to stop the guard from being held across an
    /// `.await` inside a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
81
/// Exclusive (write) access to the server's [`Store`], returned by `Server::state_mut`.
///
/// In test builds, dropping the guard runs the store's invariant checks.
struct StoreWriteGuard<'a> {
    /// The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is `!Send`, so this marker makes the whole guard `!Send`.
    /// NOTE(review): presumably this is meant to stop the guard from being held across an
    /// `.await` inside a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
86
87impl Server {
88 pub fn new(
89 app_state: Arc<AppState>,
90 notifications: Option<mpsc::UnboundedSender<()>>,
91 ) -> Arc<Self> {
92 let mut server = Self {
93 peer: Peer::new(),
94 app_state,
95 store: Default::default(),
96 handlers: Default::default(),
97 notifications,
98 };
99
100 server
101 .add_request_handler(Server::ping)
102 .add_request_handler(Server::register_project)
103 .add_message_handler(Server::unregister_project)
104 .add_request_handler(Server::share_project)
105 .add_message_handler(Server::unshare_project)
106 .add_sync_request_handler(Server::join_project)
107 .add_message_handler(Server::leave_project)
108 .add_request_handler(Server::register_worktree)
109 .add_message_handler(Server::unregister_worktree)
110 .add_request_handler(Server::update_worktree)
111 .add_message_handler(Server::start_language_server)
112 .add_message_handler(Server::update_language_server)
113 .add_message_handler(Server::update_diagnostic_summary)
114 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
115 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
116 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
117 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
118 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
119 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
120 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
121 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
122 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
123 .add_request_handler(
124 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
125 )
126 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
127 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
128 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
129 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
130 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
131 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
132 .add_request_handler(Server::update_buffer)
133 .add_message_handler(Server::update_buffer_file)
134 .add_message_handler(Server::buffer_reloaded)
135 .add_message_handler(Server::buffer_saved)
136 .add_request_handler(Server::save_buffer)
137 .add_request_handler(Server::get_channels)
138 .add_request_handler(Server::get_users)
139 .add_request_handler(Server::join_channel)
140 .add_message_handler(Server::leave_channel)
141 .add_request_handler(Server::send_channel_message)
142 .add_request_handler(Server::follow)
143 .add_message_handler(Server::unfollow)
144 .add_message_handler(Server::update_followers)
145 .add_request_handler(Server::get_channel_messages);
146
147 Arc::new(server)
148 }
149
150 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
151 where
152 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
153 Fut: 'static + Send + Future<Output = Result<()>>,
154 M: EnvelopedMessage,
155 {
156 let prev_handler = self.handlers.insert(
157 TypeId::of::<M>(),
158 Box::new(move |server, envelope| {
159 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
160 let span = info_span!(
161 "handle message",
162 payload_type = envelope.payload_type_name()
163 );
164 (handler)(server, *envelope).instrument(span).boxed()
165 }),
166 );
167 if prev_handler.is_some() {
168 panic!("registered a handler for the same message twice");
169 }
170 self
171 }
172
173 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
174 where
175 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
176 Fut: 'static + Send + Future<Output = Result<M::Response>>,
177 M: RequestMessage,
178 {
179 self.add_message_handler(move |server, envelope| {
180 let receipt = envelope.receipt();
181 let response = (handler)(server.clone(), envelope);
182 async move {
183 match response.await {
184 Ok(response) => {
185 server.peer.respond(receipt, response)?;
186 Ok(())
187 }
188 Err(error) => {
189 server.peer.respond_with_error(
190 receipt,
191 proto::Error {
192 message: error.to_string(),
193 },
194 )?;
195 Err(error)
196 }
197 }
198 }
199 })
200 }
201
202 /// Handle a request while holding a lock to the store. This is useful when we're registering
203 /// a connection but we want to respond on the connection before anybody else can send on it.
204 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
205 where
206 F: 'static
207 + Send
208 + Sync
209 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
210 M: RequestMessage,
211 {
212 let handler = Arc::new(handler);
213 self.add_message_handler(move |server, envelope| {
214 let receipt = envelope.receipt();
215 let handler = handler.clone();
216 async move {
217 let mut store = server.store.write().await;
218 let response = (handler)(server.clone(), &mut *store, envelope);
219 match response {
220 Ok(response) => {
221 server.peer.respond(receipt, response)?;
222 Ok(())
223 }
224 Err(error) => {
225 server.peer.respond_with_error(
226 receipt,
227 proto::Error {
228 message: error.to_string(),
229 },
230 )?;
231 Err(error)
232 }
233 }
234 }
235 })
236 }
237
238 pub fn handle_connection<E: Executor>(
239 self: &Arc<Self>,
240 connection: Connection,
241 addr: String,
242 user_id: UserId,
243 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
244 executor: E,
245 ) -> impl Future<Output = ()> {
246 let mut this = self.clone();
247 async move {
248 let (connection_id, handle_io, mut incoming_rx) = this
249 .peer
250 .add_connection(connection, {
251 let executor = executor.clone();
252 move |duration| {
253 let timer = executor.sleep(duration);
254 async move {
255 timer.await;
256 }
257 }
258 })
259 .await;
260
261 if let Some(send_connection_id) = send_connection_id.as_mut() {
262 let _ = send_connection_id.send(connection_id).await;
263 }
264
265 {
266 let mut state = this.state_mut().await;
267 state.add_connection(connection_id, user_id);
268 this.update_contacts_for_users(&*state, &[user_id]);
269 }
270
271 let handle_io = handle_io.fuse();
272 futures::pin_mut!(handle_io);
273 loop {
274 let next_message = incoming_rx.next().fuse();
275 futures::pin_mut!(next_message);
276 futures::select_biased! {
277 result = handle_io => {
278 if let Err(err) = result {
279 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
280 }
281 break;
282 }
283 message = next_message => {
284 if let Some(message) = message {
285 let start_time = Instant::now();
286 let type_name = message.payload_type_name();
287 log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
288 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
289 let notifications = this.notifications.clone();
290 let is_background = message.is_background();
291 let handle_message = (handler)(this.clone(), message);
292 let handle_message = async move {
293 if let Err(err) = handle_message.await {
294 log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
295 } else {
296 log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
297 }
298 if let Some(mut notifications) = notifications {
299 let _ = notifications.send(()).await;
300 }
301 };
302 if is_background {
303 executor.spawn_detached(handle_message);
304 } else {
305 handle_message.await;
306 }
307 } else {
308 log::warn!("unhandled message: {}", type_name);
309 }
310 } else {
311 log::info!(address = as_debug!(addr); "rpc connection closed");
312 break;
313 }
314 }
315 }
316 }
317
318 if let Err(err) = this.sign_out(connection_id).await {
319 log::error!("error signing out connection {:?} - {:?}", addr, err);
320 }
321 }
322 }
323
324 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
325 self.peer.disconnect(connection_id);
326 let mut state = self.state_mut().await;
327 let removed_connection = state.remove_connection(connection_id)?;
328
329 for (project_id, project) in removed_connection.hosted_projects {
330 if let Some(share) = project.share {
331 broadcast(
332 connection_id,
333 share.guests.keys().copied().collect(),
334 |conn_id| {
335 self.peer
336 .send(conn_id, proto::UnshareProject { project_id })
337 },
338 );
339 }
340 }
341
342 for (project_id, peer_ids) in removed_connection.guest_project_ids {
343 broadcast(connection_id, peer_ids, |conn_id| {
344 self.peer.send(
345 conn_id,
346 proto::RemoveProjectCollaborator {
347 project_id,
348 peer_id: connection_id.0,
349 },
350 )
351 });
352 }
353
354 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
355 Ok(())
356 }
357
358 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
359 Ok(proto::Ack {})
360 }
361
362 async fn register_project(
363 self: Arc<Server>,
364 request: TypedEnvelope<proto::RegisterProject>,
365 ) -> Result<proto::RegisterProjectResponse> {
366 let project_id = {
367 let mut state = self.state_mut().await;
368 let user_id = state.user_id_for_connection(request.sender_id)?;
369 state.register_project(request.sender_id, user_id)
370 };
371 Ok(proto::RegisterProjectResponse { project_id })
372 }
373
374 async fn unregister_project(
375 self: Arc<Server>,
376 request: TypedEnvelope<proto::UnregisterProject>,
377 ) -> Result<()> {
378 let mut state = self.state_mut().await;
379 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
380 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
381 Ok(())
382 }
383
384 async fn share_project(
385 self: Arc<Server>,
386 request: TypedEnvelope<proto::ShareProject>,
387 ) -> Result<proto::Ack> {
388 let mut state = self.state_mut().await;
389 let project = state.share_project(request.payload.project_id, request.sender_id)?;
390 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
391 Ok(proto::Ack {})
392 }
393
394 async fn unshare_project(
395 self: Arc<Server>,
396 request: TypedEnvelope<proto::UnshareProject>,
397 ) -> Result<()> {
398 let project_id = request.payload.project_id;
399 let mut state = self.state_mut().await;
400 let project = state.unshare_project(project_id, request.sender_id)?;
401 broadcast(request.sender_id, project.connection_ids, |conn_id| {
402 self.peer
403 .send(conn_id, proto::UnshareProject { project_id })
404 });
405 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
406 Ok(())
407 }
408
409 fn join_project(
410 self: Arc<Server>,
411 state: &mut Store,
412 request: TypedEnvelope<proto::JoinProject>,
413 ) -> Result<proto::JoinProjectResponse> {
414 let project_id = request.payload.project_id;
415
416 let user_id = state.user_id_for_connection(request.sender_id)?;
417 let (response, connection_ids, contact_user_ids) = state
418 .join_project(request.sender_id, user_id, project_id)
419 .and_then(|joined| {
420 let share = joined.project.share()?;
421 let peer_count = share.guests.len();
422 let mut collaborators = Vec::with_capacity(peer_count);
423 collaborators.push(proto::Collaborator {
424 peer_id: joined.project.host_connection_id.0,
425 replica_id: 0,
426 user_id: joined.project.host_user_id.to_proto(),
427 });
428 let worktrees = share
429 .worktrees
430 .iter()
431 .filter_map(|(id, shared_worktree)| {
432 let worktree = joined.project.worktrees.get(&id)?;
433 Some(proto::Worktree {
434 id: *id,
435 root_name: worktree.root_name.clone(),
436 entries: shared_worktree.entries.values().cloned().collect(),
437 diagnostic_summaries: shared_worktree
438 .diagnostic_summaries
439 .values()
440 .cloned()
441 .collect(),
442 visible: worktree.visible,
443 })
444 })
445 .collect();
446 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
447 if *peer_conn_id != request.sender_id {
448 collaborators.push(proto::Collaborator {
449 peer_id: peer_conn_id.0,
450 replica_id: *peer_replica_id as u32,
451 user_id: peer_user_id.to_proto(),
452 });
453 }
454 }
455 let response = proto::JoinProjectResponse {
456 worktrees,
457 replica_id: joined.replica_id as u32,
458 collaborators,
459 language_servers: joined.project.language_servers.clone(),
460 };
461 let connection_ids = joined.project.connection_ids();
462 let contact_user_ids = joined.project.authorized_user_ids();
463 Ok((response, connection_ids, contact_user_ids))
464 })?;
465
466 broadcast(request.sender_id, connection_ids, |conn_id| {
467 self.peer.send(
468 conn_id,
469 proto::AddProjectCollaborator {
470 project_id,
471 collaborator: Some(proto::Collaborator {
472 peer_id: request.sender_id.0,
473 replica_id: response.replica_id,
474 user_id: user_id.to_proto(),
475 }),
476 },
477 )
478 });
479 self.update_contacts_for_users(state, &contact_user_ids);
480 Ok(response)
481 }
482
483 async fn leave_project(
484 self: Arc<Server>,
485 request: TypedEnvelope<proto::LeaveProject>,
486 ) -> Result<()> {
487 let sender_id = request.sender_id;
488 let project_id = request.payload.project_id;
489 let mut state = self.state_mut().await;
490 let worktree = state.leave_project(sender_id, project_id)?;
491 broadcast(sender_id, worktree.connection_ids, |conn_id| {
492 self.peer.send(
493 conn_id,
494 proto::RemoveProjectCollaborator {
495 project_id,
496 peer_id: sender_id.0,
497 },
498 )
499 });
500 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
501 Ok(())
502 }
503
504 async fn register_worktree(
505 self: Arc<Server>,
506 request: TypedEnvelope<proto::RegisterWorktree>,
507 ) -> Result<proto::Ack> {
508 let mut contact_user_ids = HashSet::default();
509 for github_login in &request.payload.authorized_logins {
510 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
511 contact_user_ids.insert(contact_user_id);
512 }
513
514 let mut state = self.state_mut().await;
515 let host_user_id = state.user_id_for_connection(request.sender_id)?;
516 contact_user_ids.insert(host_user_id);
517
518 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
519 let guest_connection_ids = state
520 .read_project(request.payload.project_id, request.sender_id)?
521 .guest_connection_ids();
522 state.register_worktree(
523 request.payload.project_id,
524 request.payload.worktree_id,
525 request.sender_id,
526 Worktree {
527 authorized_user_ids: contact_user_ids.clone(),
528 root_name: request.payload.root_name.clone(),
529 visible: request.payload.visible,
530 },
531 )?;
532
533 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
534 self.peer
535 .forward_send(request.sender_id, connection_id, request.payload.clone())
536 });
537 self.update_contacts_for_users(&*state, &contact_user_ids);
538 Ok(proto::Ack {})
539 }
540
541 async fn unregister_worktree(
542 self: Arc<Server>,
543 request: TypedEnvelope<proto::UnregisterWorktree>,
544 ) -> Result<()> {
545 let project_id = request.payload.project_id;
546 let worktree_id = request.payload.worktree_id;
547 let mut state = self.state_mut().await;
548 let (worktree, guest_connection_ids) =
549 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
550 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
551 self.peer.send(
552 conn_id,
553 proto::UnregisterWorktree {
554 project_id,
555 worktree_id,
556 },
557 )
558 });
559 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
560 Ok(())
561 }
562
563 async fn update_worktree(
564 self: Arc<Server>,
565 request: TypedEnvelope<proto::UpdateWorktree>,
566 ) -> Result<proto::Ack> {
567 let connection_ids = self.state_mut().await.update_worktree(
568 request.sender_id,
569 request.payload.project_id,
570 request.payload.worktree_id,
571 &request.payload.removed_entries,
572 &request.payload.updated_entries,
573 )?;
574
575 broadcast(request.sender_id, connection_ids, |connection_id| {
576 self.peer
577 .forward_send(request.sender_id, connection_id, request.payload.clone())
578 });
579
580 Ok(proto::Ack {})
581 }
582
583 async fn update_diagnostic_summary(
584 self: Arc<Server>,
585 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
586 ) -> Result<()> {
587 let summary = request
588 .payload
589 .summary
590 .clone()
591 .ok_or_else(|| anyhow!("invalid summary"))?;
592 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
593 request.payload.project_id,
594 request.payload.worktree_id,
595 request.sender_id,
596 summary,
597 )?;
598
599 broadcast(request.sender_id, receiver_ids, |connection_id| {
600 self.peer
601 .forward_send(request.sender_id, connection_id, request.payload.clone())
602 });
603 Ok(())
604 }
605
606 async fn start_language_server(
607 self: Arc<Server>,
608 request: TypedEnvelope<proto::StartLanguageServer>,
609 ) -> Result<()> {
610 let receiver_ids = self.state_mut().await.start_language_server(
611 request.payload.project_id,
612 request.sender_id,
613 request
614 .payload
615 .server
616 .clone()
617 .ok_or_else(|| anyhow!("invalid language server"))?,
618 )?;
619 broadcast(request.sender_id, receiver_ids, |connection_id| {
620 self.peer
621 .forward_send(request.sender_id, connection_id, request.payload.clone())
622 });
623 Ok(())
624 }
625
626 async fn update_language_server(
627 self: Arc<Server>,
628 request: TypedEnvelope<proto::UpdateLanguageServer>,
629 ) -> Result<()> {
630 let receiver_ids = self
631 .state()
632 .await
633 .project_connection_ids(request.payload.project_id, request.sender_id)?;
634 broadcast(request.sender_id, receiver_ids, |connection_id| {
635 self.peer
636 .forward_send(request.sender_id, connection_id, request.payload.clone())
637 });
638 Ok(())
639 }
640
641 async fn forward_project_request<T>(
642 self: Arc<Server>,
643 request: TypedEnvelope<T>,
644 ) -> Result<T::Response>
645 where
646 T: EntityMessage + RequestMessage,
647 {
648 let host_connection_id = self
649 .state()
650 .await
651 .read_project(request.payload.remote_entity_id(), request.sender_id)?
652 .host_connection_id;
653 Ok(self
654 .peer
655 .forward_request(request.sender_id, host_connection_id, request.payload)
656 .await?)
657 }
658
659 async fn save_buffer(
660 self: Arc<Server>,
661 request: TypedEnvelope<proto::SaveBuffer>,
662 ) -> Result<proto::BufferSaved> {
663 let host = self
664 .state()
665 .await
666 .read_project(request.payload.project_id, request.sender_id)?
667 .host_connection_id;
668 let response = self
669 .peer
670 .forward_request(request.sender_id, host, request.payload.clone())
671 .await?;
672
673 let mut guests = self
674 .state()
675 .await
676 .read_project(request.payload.project_id, request.sender_id)?
677 .connection_ids();
678 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
679 broadcast(host, guests, |conn_id| {
680 self.peer.forward_send(host, conn_id, response.clone())
681 });
682
683 Ok(response)
684 }
685
686 async fn update_buffer(
687 self: Arc<Server>,
688 request: TypedEnvelope<proto::UpdateBuffer>,
689 ) -> Result<proto::Ack> {
690 let receiver_ids = self
691 .state()
692 .await
693 .project_connection_ids(request.payload.project_id, request.sender_id)?;
694 broadcast(request.sender_id, receiver_ids, |connection_id| {
695 self.peer
696 .forward_send(request.sender_id, connection_id, request.payload.clone())
697 });
698 Ok(proto::Ack {})
699 }
700
701 async fn update_buffer_file(
702 self: Arc<Server>,
703 request: TypedEnvelope<proto::UpdateBufferFile>,
704 ) -> Result<()> {
705 let receiver_ids = self
706 .state()
707 .await
708 .project_connection_ids(request.payload.project_id, request.sender_id)?;
709 broadcast(request.sender_id, receiver_ids, |connection_id| {
710 self.peer
711 .forward_send(request.sender_id, connection_id, request.payload.clone())
712 });
713 Ok(())
714 }
715
716 async fn buffer_reloaded(
717 self: Arc<Server>,
718 request: TypedEnvelope<proto::BufferReloaded>,
719 ) -> Result<()> {
720 let receiver_ids = self
721 .state()
722 .await
723 .project_connection_ids(request.payload.project_id, request.sender_id)?;
724 broadcast(request.sender_id, receiver_ids, |connection_id| {
725 self.peer
726 .forward_send(request.sender_id, connection_id, request.payload.clone())
727 });
728 Ok(())
729 }
730
731 async fn buffer_saved(
732 self: Arc<Server>,
733 request: TypedEnvelope<proto::BufferSaved>,
734 ) -> Result<()> {
735 let receiver_ids = self
736 .state()
737 .await
738 .project_connection_ids(request.payload.project_id, request.sender_id)?;
739 broadcast(request.sender_id, receiver_ids, |connection_id| {
740 self.peer
741 .forward_send(request.sender_id, connection_id, request.payload.clone())
742 });
743 Ok(())
744 }
745
746 async fn follow(
747 self: Arc<Self>,
748 request: TypedEnvelope<proto::Follow>,
749 ) -> Result<proto::FollowResponse> {
750 let leader_id = ConnectionId(request.payload.leader_id);
751 let follower_id = request.sender_id;
752 if !self
753 .state()
754 .await
755 .project_connection_ids(request.payload.project_id, follower_id)?
756 .contains(&leader_id)
757 {
758 Err(anyhow!("no such peer"))?;
759 }
760 let mut response = self
761 .peer
762 .forward_request(request.sender_id, leader_id, request.payload)
763 .await?;
764 response
765 .views
766 .retain(|view| view.leader_id != Some(follower_id.0));
767 Ok(response)
768 }
769
770 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
771 let leader_id = ConnectionId(request.payload.leader_id);
772 if !self
773 .state()
774 .await
775 .project_connection_ids(request.payload.project_id, request.sender_id)?
776 .contains(&leader_id)
777 {
778 Err(anyhow!("no such peer"))?;
779 }
780 self.peer
781 .forward_send(request.sender_id, leader_id, request.payload)?;
782 Ok(())
783 }
784
785 async fn update_followers(
786 self: Arc<Self>,
787 request: TypedEnvelope<proto::UpdateFollowers>,
788 ) -> Result<()> {
789 let connection_ids = self
790 .state()
791 .await
792 .project_connection_ids(request.payload.project_id, request.sender_id)?;
793 let leader_id = request
794 .payload
795 .variant
796 .as_ref()
797 .and_then(|variant| match variant {
798 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
799 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
800 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
801 });
802 for follower_id in &request.payload.follower_ids {
803 let follower_id = ConnectionId(*follower_id);
804 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
805 self.peer
806 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
807 }
808 }
809 Ok(())
810 }
811
812 async fn get_channels(
813 self: Arc<Server>,
814 request: TypedEnvelope<proto::GetChannels>,
815 ) -> Result<proto::GetChannelsResponse> {
816 let user_id = self
817 .state()
818 .await
819 .user_id_for_connection(request.sender_id)?;
820 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
821 Ok(proto::GetChannelsResponse {
822 channels: channels
823 .into_iter()
824 .map(|chan| proto::Channel {
825 id: chan.id.to_proto(),
826 name: chan.name,
827 })
828 .collect(),
829 })
830 }
831
832 async fn get_users(
833 self: Arc<Server>,
834 request: TypedEnvelope<proto::GetUsers>,
835 ) -> Result<proto::GetUsersResponse> {
836 let user_ids = request
837 .payload
838 .user_ids
839 .into_iter()
840 .map(UserId::from_proto)
841 .collect();
842 let users = self
843 .app_state
844 .db
845 .get_users_by_ids(user_ids)
846 .await?
847 .into_iter()
848 .map(|user| proto::User {
849 id: user.id.to_proto(),
850 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
851 github_login: user.github_login,
852 })
853 .collect();
854 Ok(proto::GetUsersResponse { users })
855 }
856
857 fn update_contacts_for_users<'a>(
858 self: &Arc<Self>,
859 state: &Store,
860 user_ids: impl IntoIterator<Item = &'a UserId>,
861 ) {
862 for user_id in user_ids {
863 let contacts = state.contacts_for_user(*user_id);
864 for connection_id in state.connection_ids_for_user(*user_id) {
865 self.peer
866 .send(
867 connection_id,
868 proto::UpdateContacts {
869 contacts: contacts.clone(),
870 },
871 )
872 .log_err();
873 }
874 }
875 }
876
877 async fn join_channel(
878 self: Arc<Self>,
879 request: TypedEnvelope<proto::JoinChannel>,
880 ) -> Result<proto::JoinChannelResponse> {
881 let user_id = self
882 .state()
883 .await
884 .user_id_for_connection(request.sender_id)?;
885 let channel_id = ChannelId::from_proto(request.payload.channel_id);
886 if !self
887 .app_state
888 .db
889 .can_user_access_channel(user_id, channel_id)
890 .await?
891 {
892 Err(anyhow!("access denied"))?;
893 }
894
895 self.state_mut()
896 .await
897 .join_channel(request.sender_id, channel_id);
898 let messages = self
899 .app_state
900 .db
901 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
902 .await?
903 .into_iter()
904 .map(|msg| proto::ChannelMessage {
905 id: msg.id.to_proto(),
906 body: msg.body,
907 timestamp: msg.sent_at.unix_timestamp() as u64,
908 sender_id: msg.sender_id.to_proto(),
909 nonce: Some(msg.nonce.as_u128().into()),
910 })
911 .collect::<Vec<_>>();
912 Ok(proto::JoinChannelResponse {
913 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
914 messages,
915 })
916 }
917
918 async fn leave_channel(
919 self: Arc<Self>,
920 request: TypedEnvelope<proto::LeaveChannel>,
921 ) -> Result<()> {
922 let user_id = self
923 .state()
924 .await
925 .user_id_for_connection(request.sender_id)?;
926 let channel_id = ChannelId::from_proto(request.payload.channel_id);
927 if !self
928 .app_state
929 .db
930 .can_user_access_channel(user_id, channel_id)
931 .await?
932 {
933 Err(anyhow!("access denied"))?;
934 }
935
936 self.state_mut()
937 .await
938 .leave_channel(request.sender_id, channel_id);
939
940 Ok(())
941 }
942
943 async fn send_channel_message(
944 self: Arc<Self>,
945 request: TypedEnvelope<proto::SendChannelMessage>,
946 ) -> Result<proto::SendChannelMessageResponse> {
947 let channel_id = ChannelId::from_proto(request.payload.channel_id);
948 let user_id;
949 let connection_ids;
950 {
951 let state = self.state().await;
952 user_id = state.user_id_for_connection(request.sender_id)?;
953 connection_ids = state.channel_connection_ids(channel_id)?;
954 }
955
956 // Validate the message body.
957 let body = request.payload.body.trim().to_string();
958 if body.len() > MAX_MESSAGE_LEN {
959 return Err(anyhow!("message is too long"))?;
960 }
961 if body.is_empty() {
962 return Err(anyhow!("message can't be blank"))?;
963 }
964
965 let timestamp = OffsetDateTime::now_utc();
966 let nonce = request
967 .payload
968 .nonce
969 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
970
971 let message_id = self
972 .app_state
973 .db
974 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
975 .await?
976 .to_proto();
977 let message = proto::ChannelMessage {
978 sender_id: user_id.to_proto(),
979 id: message_id,
980 body,
981 timestamp: timestamp.unix_timestamp() as u64,
982 nonce: Some(nonce),
983 };
984 broadcast(request.sender_id, connection_ids, |conn_id| {
985 self.peer.send(
986 conn_id,
987 proto::ChannelMessageSent {
988 channel_id: channel_id.to_proto(),
989 message: Some(message.clone()),
990 },
991 )
992 });
993 Ok(proto::SendChannelMessageResponse {
994 message: Some(message),
995 })
996 }
997
998 async fn get_channel_messages(
999 self: Arc<Self>,
1000 request: TypedEnvelope<proto::GetChannelMessages>,
1001 ) -> Result<proto::GetChannelMessagesResponse> {
1002 let user_id = self
1003 .state()
1004 .await
1005 .user_id_for_connection(request.sender_id)?;
1006 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1007 if !self
1008 .app_state
1009 .db
1010 .can_user_access_channel(user_id, channel_id)
1011 .await?
1012 {
1013 Err(anyhow!("access denied"))?;
1014 }
1015
1016 let messages = self
1017 .app_state
1018 .db
1019 .get_channel_messages(
1020 channel_id,
1021 MESSAGE_COUNT_PER_PAGE,
1022 Some(MessageId::from_proto(request.payload.before_message_id)),
1023 )
1024 .await?
1025 .into_iter()
1026 .map(|msg| proto::ChannelMessage {
1027 id: msg.id.to_proto(),
1028 body: msg.body,
1029 timestamp: msg.sent_at.unix_timestamp() as u64,
1030 sender_id: msg.sender_id.to_proto(),
1031 nonce: Some(msg.nonce.as_u128().into()),
1032 })
1033 .collect::<Vec<_>>();
1034
1035 Ok(proto::GetChannelMessagesResponse {
1036 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1037 messages,
1038 })
1039 }
1040
1041 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1042 #[cfg(test)]
1043 tokio::task::yield_now().await;
1044 let guard = self.store.read().await;
1045 #[cfg(test)]
1046 tokio::task::yield_now().await;
1047 StoreReadGuard {
1048 guard,
1049 _not_send: PhantomData,
1050 }
1051 }
1052
1053 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1054 #[cfg(test)]
1055 tokio::task::yield_now().await;
1056 let guard = self.store.write().await;
1057 #[cfg(test)]
1058 tokio::task::yield_now().await;
1059 StoreWriteGuard {
1060 guard,
1061 _not_send: PhantomData,
1062 }
1063 }
1064}
1065
1066impl<'a> Deref for StoreReadGuard<'a> {
1067 type Target = Store;
1068
1069 fn deref(&self) -> &Self::Target {
1070 &*self.guard
1071 }
1072}
1073
1074impl<'a> Deref for StoreWriteGuard<'a> {
1075 type Target = Store;
1076
1077 fn deref(&self) -> &Self::Target {
1078 &*self.guard
1079 }
1080}
1081
1082impl<'a> DerefMut for StoreWriteGuard<'a> {
1083 fn deref_mut(&mut self) -> &mut Self::Target {
1084 &mut *self.guard
1085 }
1086}
1087
/// Hook that runs every time a `StoreWriteGuard` is released.
impl<'a> Drop for StoreWriteGuard<'a> {
    fn drop(&mut self) {
        // Test builds only: run `check_invariants` after each period of write
        // access. In non-test builds this body compiles to nothing.
        #[cfg(test)]
        self.check_invariants();
    }
}
1094
1095impl Executor for RealExecutor {
1096 type Sleep = Sleep;
1097
1098 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1099 tokio::task::spawn(future);
1100 }
1101
1102 fn sleep(&self, duration: Duration) -> Self::Sleep {
1103 tokio::time::sleep(duration)
1104 }
1105}
1106
1107fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1108where
1109 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1110{
1111 for receiver_id in receiver_ids {
1112 if receiver_id != sender_id {
1113 f(receiver_id).log_err();
1114 }
1115 }
1116}
1117
// Name of the HTTP header used by `ProtocolVersion`; `lazy_static!` builds the
// `HeaderName` on first use so it can be handed out as a `&'static` reference.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1121
/// Typed form of the `x-zed-protocol-version` request header: the protocol
/// version a client announces, compared against `rpc::PROTOCOL_VERSION` when a
/// WebSocket connection is requested.
pub struct ProtocolVersion(u32);
1123
1124impl Header for ProtocolVersion {
1125 fn name() -> &'static HeaderName {
1126 &ZED_PROTOCOL_VERSION
1127 }
1128
1129 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1130 where
1131 Self: Sized,
1132 I: Iterator<Item = &'i axum::http::HeaderValue>,
1133 {
1134 let version = values
1135 .next()
1136 .ok_or_else(|| axum::headers::Error::invalid())?
1137 .to_str()
1138 .map_err(|_| axum::headers::Error::invalid())?
1139 .parse()
1140 .map_err(|_| axum::headers::Error::invalid())?;
1141 Ok(Self(version))
1142 }
1143
1144 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1145 values.extend([self.0.to_string().parse().unwrap()]);
1146 }
1147}
1148
1149pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1150 let server = Server::new(app_state.clone(), None);
1151 Router::new()
1152 .route("/rpc", get(handle_websocket_request))
1153 .layer(
1154 ServiceBuilder::new()
1155 .layer(Extension(app_state))
1156 .layer(middleware::from_fn(auth::validate_header))
1157 .layer(Extension(server)),
1158 )
1159}
1160
1161pub async fn handle_websocket_request(
1162 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1163 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1164 Extension(server): Extension<Arc<Server>>,
1165 Extension(user_id): Extension<UserId>,
1166 ws: WebSocketUpgrade,
1167) -> Response {
1168 if protocol_version != rpc::PROTOCOL_VERSION {
1169 return (
1170 StatusCode::UPGRADE_REQUIRED,
1171 "client must be upgraded".to_string(),
1172 )
1173 .into_response();
1174 }
1175 let socket_address = socket_address.to_string();
1176 ws.on_upgrade(move |socket| {
1177 let socket = socket
1178 .map_ok(to_tungstenite_message)
1179 .err_into()
1180 .with(|message| async move { Ok(to_axum_message(message)) });
1181 let connection = Connection::new(Box::pin(socket));
1182 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1183 })
1184}
1185
1186fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1187 match message {
1188 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1189 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1190 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1191 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1192 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1193 code: frame.code.into(),
1194 reason: frame.reason,
1195 })),
1196 }
1197}
1198
1199fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1200 match message {
1201 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1202 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1203 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1204 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1205 AxumMessage::Close(frame) => {
1206 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1207 code: frame.code.into(),
1208 reason: frame.reason,
1209 }))
1210 }
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use super::*;
1217 use crate::{
1218 db::{tests::TestDb, UserId},
1219 AppState,
1220 };
1221 use ::rpc::Peer;
1222 use client::{
1223 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1224 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1225 };
1226 use collections::BTreeMap;
1227 use editor::{
1228 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1229 ToOffset, ToggleCodeActions, Undo,
1230 };
1231 use gpui::{
1232 executor::{self, Deterministic},
1233 geometry::vector::vec2f,
1234 ModelHandle, TestAppContext, ViewHandle,
1235 };
1236 use language::{
1237 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1238 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1239 };
1240 use lsp::{self, FakeLanguageServer};
1241 use parking_lot::Mutex;
1242 use project::{
1243 fs::{FakeFs, Fs as _},
1244 search::SearchQuery,
1245 worktree::WorktreeHandle,
1246 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1247 };
1248 use rand::prelude::*;
1249 use rpc::PeerId;
1250 use serde_json::json;
1251 use settings::Settings;
1252 use sqlx::types::time::OffsetDateTime;
1253 use std::{
1254 env,
1255 ops::Deref,
1256 path::{Path, PathBuf},
1257 rc::Rc,
1258 sync::{
1259 atomic::{AtomicBool, Ordering::SeqCst},
1260 Arc,
1261 },
1262 time::Duration,
1263 };
1264 use theme::ThemeRegistry;
1265 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1266
1267 #[cfg(test)]
1268 #[ctor::ctor]
1269 fn init_logger() {
1270 if std::env::var("RUST_LOG").is_ok() {
1271 env_logger::init();
1272 }
1273 }
1274
    // End-to-end check of the basic sharing flow: client A shares a local project,
    // client B joins it remotely, each side sees the other as a collaborator, an
    // edit typed by B reaches A's copy of the buffer, and dropping B's project
    // removes B from A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id so that A's view
        // of B can be checked against it below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // A eventually sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // After B opened the file, the host's project must already report the
        // buffer as open, before A opens it explicitly.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1397
    // Checks that unsharing a project turns the guest's copy read-only and clears
    // the host worktree's shared flag, and that the same project can afterwards be
    // shared again and joined by a guest under the same project id.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer must succeed while the project is shared.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1490
    // Checks what happens when the host's connection is dropped by the server:
    // the host loses its collaborators and its project/worktree stop being shared,
    // the guest's project becomes read-only, and once the host has a remote id
    // again the project can be re-shared and joined by a guest.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the RPC receive timeout (presumably so the
        // dropped connection is noticed — TODO confirm).
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1591
    // Three-client scenario (host A, guests B and C) covering:
    //   * concurrent edits from both guests and the host converging to one text,
    //   * a guest-initiated save racing with a host edit, after which the file on
    //     disk contains the host edit and no replica is left dirty,
    //   * renames and a new file on the host's file system showing up in every
    //     replica's worktree and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until the host has received both guests' insertions before
        // appending its own text at the end of the buffer.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // Every replica's worktree must converge on the same list of paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1770
    // Checks a guest's dirty/conflict flags around a save: an edit makes the buffer
    // dirty without a conflict, saving clears the dirty flag, and a further edit
    // after the save makes it dirty again — still without reporting a conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: dirty, but no conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as the guest and wait for the dirty flag to clear.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Editing after the save must not be reported as a conflict either.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1852
    // Checks that when a file changes on the host's file system, a guest's clean
    // buffer for that file picks up the new contents and stays clean and
    // conflict-free.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // The handle is bound but not used again in this test.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file through the fake file system and wait for the guest
        // buffer to show the new text without becoming dirty.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1931
    // Checks that edits made by the host while a guest is still in the middle of
    // opening the same buffer are not lost: once the open completes, the guest's
    // buffer converges on the host's text.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // (spawned in the background so it is not awaited yet).
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Snapshot the host's text, then wait for the guest's open to finish and
        // for its buffer to reach that same text.
        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
2010
    // Checks that a guest who drops its project while a buffer open is still in
    // flight is nevertheless removed from the host's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_leaving_worktree_while_opening_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        cx_b.update(|_| drop(project_b));
        // The spawned open is dropped without ever being awaited.
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
2085
    // Checks that the host notices a guest leaving in two different ways: when the
    // guest client disconnects explicitly, and when the server drops the guest's
    // connection. In between, the guest rejoins the same project.
    #[gpui::test(iterations = 10)]
    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client A sees that a guest has joined.
        project_a
            .condition(cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Drop client B's connection and ensure client A observes client B leaving the project.
        client_b.disconnect(&cx_b.to_async()).unwrap();
        project_a
            .condition(cx_a, |p, _| p.collaborators().len() == 0)
            .await;

        // Rejoin the project as client B
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client A sees that a guest has re-joined.
        project_a
            .condition(cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
        client_b.wait_for_current_user(cx_b).await;
        server.disconnect_client(client_b.current_user_id(cx_b));
        // Advance the fake clock by three seconds (presumably long enough for the
        // lost connection to be detected — TODO confirm against the timeout used).
        cx_a.foreground().advance_clock(Duration::from_secs(3));
        project_a
            .condition(cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
2181
2182 #[gpui::test(iterations = 10)]
2183 async fn test_collaborating_with_diagnostics(
2184 cx_a: &mut TestAppContext,
2185 cx_b: &mut TestAppContext,
2186 ) {
2187 cx_a.foreground().forbid_parking();
2188 let lang_registry = Arc::new(LanguageRegistry::test());
2189 let fs = FakeFs::new(cx_a.background());
2190
2191 // Set up a fake language server.
2192 let mut language = Language::new(
2193 LanguageConfig {
2194 name: "Rust".into(),
2195 path_suffixes: vec!["rs".to_string()],
2196 ..Default::default()
2197 },
2198 Some(tree_sitter_rust::language()),
2199 );
2200 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2201 lang_registry.add(Arc::new(language));
2202
2203 // Connect to a server as 2 clients.
2204 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2205 let client_a = server.create_client(cx_a, "user_a").await;
2206 let client_b = server.create_client(cx_b, "user_b").await;
2207
2208 // Share a project as client A
2209 fs.insert_tree(
2210 "/a",
2211 json!({
2212 ".zed.toml": r#"collaborators = ["user_b"]"#,
2213 "a.rs": "let one = two",
2214 "other.rs": "",
2215 }),
2216 )
2217 .await;
2218 let project_a = cx_a.update(|cx| {
2219 Project::local(
2220 client_a.clone(),
2221 client_a.user_store.clone(),
2222 lang_registry.clone(),
2223 fs.clone(),
2224 cx,
2225 )
2226 });
2227 let (worktree_a, _) = project_a
2228 .update(cx_a, |p, cx| {
2229 p.find_or_create_local_worktree("/a", true, cx)
2230 })
2231 .await
2232 .unwrap();
2233 worktree_a
2234 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2235 .await;
2236 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2237 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2238 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2239
2240 // Cause the language server to start.
2241 let _ = cx_a
2242 .background()
2243 .spawn(project_a.update(cx_a, |project, cx| {
2244 project.open_buffer(
2245 ProjectPath {
2246 worktree_id,
2247 path: Path::new("other.rs").into(),
2248 },
2249 cx,
2250 )
2251 }))
2252 .await
2253 .unwrap();
2254
2255 // Simulate a language server reporting errors for a file.
2256 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2257 fake_language_server
2258 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2259 .await;
2260 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2261 lsp::PublishDiagnosticsParams {
2262 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2263 version: None,
2264 diagnostics: vec![lsp::Diagnostic {
2265 severity: Some(lsp::DiagnosticSeverity::ERROR),
2266 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2267 message: "message 1".to_string(),
2268 ..Default::default()
2269 }],
2270 },
2271 );
2272
2273 // Wait for server to see the diagnostics update.
2274 server
2275 .condition(|store| {
2276 let worktree = store
2277 .project(project_id)
2278 .unwrap()
2279 .share
2280 .as_ref()
2281 .unwrap()
2282 .worktrees
2283 .get(&worktree_id.to_proto())
2284 .unwrap();
2285
2286 !worktree.diagnostic_summaries.is_empty()
2287 })
2288 .await;
2289
2290 // Join the worktree as client B.
2291 let project_b = Project::remote(
2292 project_id,
2293 client_b.clone(),
2294 client_b.user_store.clone(),
2295 lang_registry.clone(),
2296 fs.clone(),
2297 &mut cx_b.to_async(),
2298 )
2299 .await
2300 .unwrap();
2301
2302 project_b.read_with(cx_b, |project, cx| {
2303 assert_eq!(
2304 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2305 &[(
2306 ProjectPath {
2307 worktree_id,
2308 path: Arc::from(Path::new("a.rs")),
2309 },
2310 DiagnosticSummary {
2311 error_count: 1,
2312 warning_count: 0,
2313 ..Default::default()
2314 },
2315 )]
2316 )
2317 });
2318
2319 // Simulate a language server reporting more errors for a file.
2320 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2321 lsp::PublishDiagnosticsParams {
2322 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2323 version: None,
2324 diagnostics: vec![
2325 lsp::Diagnostic {
2326 severity: Some(lsp::DiagnosticSeverity::ERROR),
2327 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2328 message: "message 1".to_string(),
2329 ..Default::default()
2330 },
2331 lsp::Diagnostic {
2332 severity: Some(lsp::DiagnosticSeverity::WARNING),
2333 range: lsp::Range::new(
2334 lsp::Position::new(0, 10),
2335 lsp::Position::new(0, 13),
2336 ),
2337 message: "message 2".to_string(),
2338 ..Default::default()
2339 },
2340 ],
2341 },
2342 );
2343
2344 // Client b gets the updated summaries
2345 project_b
2346 .condition(&cx_b, |project, cx| {
2347 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2348 == &[(
2349 ProjectPath {
2350 worktree_id,
2351 path: Arc::from(Path::new("a.rs")),
2352 },
2353 DiagnosticSummary {
2354 error_count: 1,
2355 warning_count: 1,
2356 ..Default::default()
2357 },
2358 )]
2359 })
2360 .await;
2361
2362 // Open the file with the errors on client B. They should be present.
2363 let buffer_b = cx_b
2364 .background()
2365 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2366 .await
2367 .unwrap();
2368
2369 buffer_b.read_with(cx_b, |buffer, _| {
2370 assert_eq!(
2371 buffer
2372 .snapshot()
2373 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2374 .map(|entry| entry)
2375 .collect::<Vec<_>>(),
2376 &[
2377 DiagnosticEntry {
2378 range: Point::new(0, 4)..Point::new(0, 7),
2379 diagnostic: Diagnostic {
2380 group_id: 0,
2381 message: "message 1".to_string(),
2382 severity: lsp::DiagnosticSeverity::ERROR,
2383 is_primary: true,
2384 ..Default::default()
2385 }
2386 },
2387 DiagnosticEntry {
2388 range: Point::new(0, 10)..Point::new(0, 13),
2389 diagnostic: Diagnostic {
2390 group_id: 1,
2391 severity: lsp::DiagnosticSeverity::WARNING,
2392 message: "message 2".to_string(),
2393 is_primary: true,
2394 ..Default::default()
2395 }
2396 }
2397 ]
2398 );
2399 });
2400
2401 // Simulate a language server reporting no errors for a file.
2402 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2403 lsp::PublishDiagnosticsParams {
2404 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2405 version: None,
2406 diagnostics: vec![],
2407 },
2408 );
2409 project_a
2410 .condition(cx_a, |project, cx| {
2411 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2412 })
2413 .await;
2414 project_b
2415 .condition(cx_b, |project, cx| {
2416 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2417 })
2418 .await;
2419 }
2420
2421 #[gpui::test(iterations = 10)]
2422 async fn test_collaborating_with_completion(
2423 cx_a: &mut TestAppContext,
2424 cx_b: &mut TestAppContext,
2425 ) {
2426 cx_a.foreground().forbid_parking();
2427 let lang_registry = Arc::new(LanguageRegistry::test());
2428 let fs = FakeFs::new(cx_a.background());
2429
2430 // Set up a fake language server.
2431 let mut language = Language::new(
2432 LanguageConfig {
2433 name: "Rust".into(),
2434 path_suffixes: vec!["rs".to_string()],
2435 ..Default::default()
2436 },
2437 Some(tree_sitter_rust::language()),
2438 );
2439 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2440 capabilities: lsp::ServerCapabilities {
2441 completion_provider: Some(lsp::CompletionOptions {
2442 trigger_characters: Some(vec![".".to_string()]),
2443 ..Default::default()
2444 }),
2445 ..Default::default()
2446 },
2447 ..Default::default()
2448 });
2449 lang_registry.add(Arc::new(language));
2450
2451 // Connect to a server as 2 clients.
2452 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2453 let client_a = server.create_client(cx_a, "user_a").await;
2454 let client_b = server.create_client(cx_b, "user_b").await;
2455
2456 // Share a project as client A
2457 fs.insert_tree(
2458 "/a",
2459 json!({
2460 ".zed.toml": r#"collaborators = ["user_b"]"#,
2461 "main.rs": "fn main() { a }",
2462 "other.rs": "",
2463 }),
2464 )
2465 .await;
2466 let project_a = cx_a.update(|cx| {
2467 Project::local(
2468 client_a.clone(),
2469 client_a.user_store.clone(),
2470 lang_registry.clone(),
2471 fs.clone(),
2472 cx,
2473 )
2474 });
2475 let (worktree_a, _) = project_a
2476 .update(cx_a, |p, cx| {
2477 p.find_or_create_local_worktree("/a", true, cx)
2478 })
2479 .await
2480 .unwrap();
2481 worktree_a
2482 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2483 .await;
2484 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2485 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2486 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2487
2488 // Join the worktree as client B.
2489 let project_b = Project::remote(
2490 project_id,
2491 client_b.clone(),
2492 client_b.user_store.clone(),
2493 lang_registry.clone(),
2494 fs.clone(),
2495 &mut cx_b.to_async(),
2496 )
2497 .await
2498 .unwrap();
2499
2500 // Open a file in an editor as the guest.
2501 let buffer_b = project_b
2502 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2503 .await
2504 .unwrap();
2505 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2506 let editor_b = cx_b.add_view(window_b, |cx| {
2507 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2508 });
2509
2510 let fake_language_server = fake_language_servers.next().await.unwrap();
2511 buffer_b
2512 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2513 .await;
2514
2515 // Type a completion trigger character as the guest.
2516 editor_b.update(cx_b, |editor, cx| {
2517 editor.select_ranges([13..13], None, cx);
2518 editor.handle_input(&Input(".".into()), cx);
2519 cx.focus(&editor_b);
2520 });
2521
2522 // Receive a completion request as the host's language server.
2523 // Return some completions from the host's language server.
2524 cx_a.foreground().start_waiting();
2525 fake_language_server
2526 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2527 assert_eq!(
2528 params.text_document_position.text_document.uri,
2529 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2530 );
2531 assert_eq!(
2532 params.text_document_position.position,
2533 lsp::Position::new(0, 14),
2534 );
2535
2536 Ok(Some(lsp::CompletionResponse::Array(vec![
2537 lsp::CompletionItem {
2538 label: "first_method(…)".into(),
2539 detail: Some("fn(&mut self, B) -> C".into()),
2540 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2541 new_text: "first_method($1)".to_string(),
2542 range: lsp::Range::new(
2543 lsp::Position::new(0, 14),
2544 lsp::Position::new(0, 14),
2545 ),
2546 })),
2547 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2548 ..Default::default()
2549 },
2550 lsp::CompletionItem {
2551 label: "second_method(…)".into(),
2552 detail: Some("fn(&mut self, C) -> D<E>".into()),
2553 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2554 new_text: "second_method()".to_string(),
2555 range: lsp::Range::new(
2556 lsp::Position::new(0, 14),
2557 lsp::Position::new(0, 14),
2558 ),
2559 })),
2560 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2561 ..Default::default()
2562 },
2563 ])))
2564 })
2565 .next()
2566 .await
2567 .unwrap();
2568 cx_a.foreground().finish_waiting();
2569
2570 // Open the buffer on the host.
2571 let buffer_a = project_a
2572 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2573 .await
2574 .unwrap();
2575 buffer_a
2576 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2577 .await;
2578
2579 // Confirm a completion on the guest.
2580 editor_b
2581 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2582 .await;
2583 editor_b.update(cx_b, |editor, cx| {
2584 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2585 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2586 });
2587
2588 // Return a resolved completion from the host's language server.
2589 // The resolved completion has an additional text edit.
2590 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2591 |params, _| async move {
2592 assert_eq!(params.label, "first_method(…)");
2593 Ok(lsp::CompletionItem {
2594 label: "first_method(…)".into(),
2595 detail: Some("fn(&mut self, B) -> C".into()),
2596 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2597 new_text: "first_method($1)".to_string(),
2598 range: lsp::Range::new(
2599 lsp::Position::new(0, 14),
2600 lsp::Position::new(0, 14),
2601 ),
2602 })),
2603 additional_text_edits: Some(vec![lsp::TextEdit {
2604 new_text: "use d::SomeTrait;\n".to_string(),
2605 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2606 }]),
2607 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2608 ..Default::default()
2609 })
2610 },
2611 );
2612
2613 // The additional edit is applied.
2614 buffer_a
2615 .condition(&cx_a, |buffer, _| {
2616 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2617 })
2618 .await;
2619 buffer_b
2620 .condition(&cx_b, |buffer, _| {
2621 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2622 })
2623 .await;
2624 }
2625
2626 #[gpui::test(iterations = 10)]
2627 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2628 cx_a.foreground().forbid_parking();
2629 let lang_registry = Arc::new(LanguageRegistry::test());
2630 let fs = FakeFs::new(cx_a.background());
2631
2632 // Connect to a server as 2 clients.
2633 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2634 let client_a = server.create_client(cx_a, "user_a").await;
2635 let client_b = server.create_client(cx_b, "user_b").await;
2636
2637 // Share a project as client A
2638 fs.insert_tree(
2639 "/a",
2640 json!({
2641 ".zed.toml": r#"collaborators = ["user_b"]"#,
2642 "a.rs": "let one = 1;",
2643 }),
2644 )
2645 .await;
2646 let project_a = cx_a.update(|cx| {
2647 Project::local(
2648 client_a.clone(),
2649 client_a.user_store.clone(),
2650 lang_registry.clone(),
2651 fs.clone(),
2652 cx,
2653 )
2654 });
2655 let (worktree_a, _) = project_a
2656 .update(cx_a, |p, cx| {
2657 p.find_or_create_local_worktree("/a", true, cx)
2658 })
2659 .await
2660 .unwrap();
2661 worktree_a
2662 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2663 .await;
2664 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2665 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2666 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2667 let buffer_a = project_a
2668 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2669 .await
2670 .unwrap();
2671
2672 // Join the worktree as client B.
2673 let project_b = Project::remote(
2674 project_id,
2675 client_b.clone(),
2676 client_b.user_store.clone(),
2677 lang_registry.clone(),
2678 fs.clone(),
2679 &mut cx_b.to_async(),
2680 )
2681 .await
2682 .unwrap();
2683
2684 let buffer_b = cx_b
2685 .background()
2686 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2687 .await
2688 .unwrap();
2689 buffer_b.update(cx_b, |buffer, cx| {
2690 buffer.edit([4..7], "six", cx);
2691 buffer.edit([10..11], "6", cx);
2692 assert_eq!(buffer.text(), "let six = 6;");
2693 assert!(buffer.is_dirty());
2694 assert!(!buffer.has_conflict());
2695 });
2696 buffer_a
2697 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2698 .await;
2699
2700 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2701 .await
2702 .unwrap();
2703 buffer_a
2704 .condition(cx_a, |buffer, _| buffer.has_conflict())
2705 .await;
2706 buffer_b
2707 .condition(cx_b, |buffer, _| buffer.has_conflict())
2708 .await;
2709
2710 project_b
2711 .update(cx_b, |project, cx| {
2712 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2713 })
2714 .await
2715 .unwrap();
2716 buffer_a.read_with(cx_a, |buffer, _| {
2717 assert_eq!(buffer.text(), "let seven = 7;");
2718 assert!(!buffer.is_dirty());
2719 assert!(!buffer.has_conflict());
2720 });
2721 buffer_b.read_with(cx_b, |buffer, _| {
2722 assert_eq!(buffer.text(), "let seven = 7;");
2723 assert!(!buffer.is_dirty());
2724 assert!(!buffer.has_conflict());
2725 });
2726
2727 buffer_a.update(cx_a, |buffer, cx| {
2728 // Undoing on the host is a no-op when the reload was initiated by the guest.
2729 buffer.undo(cx);
2730 assert_eq!(buffer.text(), "let seven = 7;");
2731 assert!(!buffer.is_dirty());
2732 assert!(!buffer.has_conflict());
2733 });
2734 buffer_b.update(cx_b, |buffer, cx| {
2735 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2736 buffer.undo(cx);
2737 assert_eq!(buffer.text(), "let six = 6;");
2738 assert!(buffer.is_dirty());
2739 assert!(!buffer.has_conflict());
2740 });
2741 }
2742
2743 #[gpui::test(iterations = 10)]
2744 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2745 cx_a.foreground().forbid_parking();
2746 let lang_registry = Arc::new(LanguageRegistry::test());
2747 let fs = FakeFs::new(cx_a.background());
2748
2749 // Set up a fake language server.
2750 let mut language = Language::new(
2751 LanguageConfig {
2752 name: "Rust".into(),
2753 path_suffixes: vec!["rs".to_string()],
2754 ..Default::default()
2755 },
2756 Some(tree_sitter_rust::language()),
2757 );
2758 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2759 lang_registry.add(Arc::new(language));
2760
2761 // Connect to a server as 2 clients.
2762 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763 let client_a = server.create_client(cx_a, "user_a").await;
2764 let client_b = server.create_client(cx_b, "user_b").await;
2765
2766 // Share a project as client A
2767 fs.insert_tree(
2768 "/a",
2769 json!({
2770 ".zed.toml": r#"collaborators = ["user_b"]"#,
2771 "a.rs": "let one = two",
2772 }),
2773 )
2774 .await;
2775 let project_a = cx_a.update(|cx| {
2776 Project::local(
2777 client_a.clone(),
2778 client_a.user_store.clone(),
2779 lang_registry.clone(),
2780 fs.clone(),
2781 cx,
2782 )
2783 });
2784 let (worktree_a, _) = project_a
2785 .update(cx_a, |p, cx| {
2786 p.find_or_create_local_worktree("/a", true, cx)
2787 })
2788 .await
2789 .unwrap();
2790 worktree_a
2791 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2792 .await;
2793 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2794 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2795 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2796
2797 // Join the worktree as client B.
2798 let project_b = Project::remote(
2799 project_id,
2800 client_b.clone(),
2801 client_b.user_store.clone(),
2802 lang_registry.clone(),
2803 fs.clone(),
2804 &mut cx_b.to_async(),
2805 )
2806 .await
2807 .unwrap();
2808
2809 let buffer_b = cx_b
2810 .background()
2811 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2812 .await
2813 .unwrap();
2814
2815 let fake_language_server = fake_language_servers.next().await.unwrap();
2816 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2817 Ok(Some(vec![
2818 lsp::TextEdit {
2819 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2820 new_text: "h".to_string(),
2821 },
2822 lsp::TextEdit {
2823 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2824 new_text: "y".to_string(),
2825 },
2826 ]))
2827 });
2828
2829 project_b
2830 .update(cx_b, |project, cx| {
2831 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2832 })
2833 .await
2834 .unwrap();
2835 assert_eq!(
2836 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2837 "let honey = two"
2838 );
2839 }
2840
2841 #[gpui::test(iterations = 10)]
2842 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2843 cx_a.foreground().forbid_parking();
2844 let lang_registry = Arc::new(LanguageRegistry::test());
2845 let fs = FakeFs::new(cx_a.background());
2846 fs.insert_tree(
2847 "/root-1",
2848 json!({
2849 ".zed.toml": r#"collaborators = ["user_b"]"#,
2850 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2851 }),
2852 )
2853 .await;
2854 fs.insert_tree(
2855 "/root-2",
2856 json!({
2857 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2858 }),
2859 )
2860 .await;
2861
2862 // Set up a fake language server.
2863 let mut language = Language::new(
2864 LanguageConfig {
2865 name: "Rust".into(),
2866 path_suffixes: vec!["rs".to_string()],
2867 ..Default::default()
2868 },
2869 Some(tree_sitter_rust::language()),
2870 );
2871 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2872 lang_registry.add(Arc::new(language));
2873
2874 // Connect to a server as 2 clients.
2875 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2876 let client_a = server.create_client(cx_a, "user_a").await;
2877 let client_b = server.create_client(cx_b, "user_b").await;
2878
2879 // Share a project as client A
2880 let project_a = cx_a.update(|cx| {
2881 Project::local(
2882 client_a.clone(),
2883 client_a.user_store.clone(),
2884 lang_registry.clone(),
2885 fs.clone(),
2886 cx,
2887 )
2888 });
2889 let (worktree_a, _) = project_a
2890 .update(cx_a, |p, cx| {
2891 p.find_or_create_local_worktree("/root-1", true, cx)
2892 })
2893 .await
2894 .unwrap();
2895 worktree_a
2896 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2897 .await;
2898 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2899 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2900 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2901
2902 // Join the worktree as client B.
2903 let project_b = Project::remote(
2904 project_id,
2905 client_b.clone(),
2906 client_b.user_store.clone(),
2907 lang_registry.clone(),
2908 fs.clone(),
2909 &mut cx_b.to_async(),
2910 )
2911 .await
2912 .unwrap();
2913
2914 // Open the file on client B.
2915 let buffer_b = cx_b
2916 .background()
2917 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2918 .await
2919 .unwrap();
2920
2921 // Request the definition of a symbol as the guest.
2922 let fake_language_server = fake_language_servers.next().await.unwrap();
2923 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2924 |_, _| async move {
2925 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2926 lsp::Location::new(
2927 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2928 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2929 ),
2930 )))
2931 },
2932 );
2933
2934 let definitions_1 = project_b
2935 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2936 .await
2937 .unwrap();
2938 cx_b.read(|cx| {
2939 assert_eq!(definitions_1.len(), 1);
2940 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2941 let target_buffer = definitions_1[0].buffer.read(cx);
2942 assert_eq!(
2943 target_buffer.text(),
2944 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2945 );
2946 assert_eq!(
2947 definitions_1[0].range.to_point(target_buffer),
2948 Point::new(0, 6)..Point::new(0, 9)
2949 );
2950 });
2951
2952 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2953 // the previous call to `definition`.
2954 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2955 |_, _| async move {
2956 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2957 lsp::Location::new(
2958 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2959 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2960 ),
2961 )))
2962 },
2963 );
2964
2965 let definitions_2 = project_b
2966 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2967 .await
2968 .unwrap();
2969 cx_b.read(|cx| {
2970 assert_eq!(definitions_2.len(), 1);
2971 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2972 let target_buffer = definitions_2[0].buffer.read(cx);
2973 assert_eq!(
2974 target_buffer.text(),
2975 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2976 );
2977 assert_eq!(
2978 definitions_2[0].range.to_point(target_buffer),
2979 Point::new(1, 6)..Point::new(1, 11)
2980 );
2981 });
2982 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2983 }
2984
2985 #[gpui::test(iterations = 10)]
2986 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2987 cx_a.foreground().forbid_parking();
2988 let lang_registry = Arc::new(LanguageRegistry::test());
2989 let fs = FakeFs::new(cx_a.background());
2990 fs.insert_tree(
2991 "/root-1",
2992 json!({
2993 ".zed.toml": r#"collaborators = ["user_b"]"#,
2994 "one.rs": "const ONE: usize = 1;",
2995 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2996 }),
2997 )
2998 .await;
2999 fs.insert_tree(
3000 "/root-2",
3001 json!({
3002 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3003 }),
3004 )
3005 .await;
3006
3007 // Set up a fake language server.
3008 let mut language = Language::new(
3009 LanguageConfig {
3010 name: "Rust".into(),
3011 path_suffixes: vec!["rs".to_string()],
3012 ..Default::default()
3013 },
3014 Some(tree_sitter_rust::language()),
3015 );
3016 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3017 lang_registry.add(Arc::new(language));
3018
3019 // Connect to a server as 2 clients.
3020 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3021 let client_a = server.create_client(cx_a, "user_a").await;
3022 let client_b = server.create_client(cx_b, "user_b").await;
3023
3024 // Share a project as client A
3025 let project_a = cx_a.update(|cx| {
3026 Project::local(
3027 client_a.clone(),
3028 client_a.user_store.clone(),
3029 lang_registry.clone(),
3030 fs.clone(),
3031 cx,
3032 )
3033 });
3034 let (worktree_a, _) = project_a
3035 .update(cx_a, |p, cx| {
3036 p.find_or_create_local_worktree("/root-1", true, cx)
3037 })
3038 .await
3039 .unwrap();
3040 worktree_a
3041 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3042 .await;
3043 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3044 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3045 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3046
3047 // Join the worktree as client B.
3048 let project_b = Project::remote(
3049 project_id,
3050 client_b.clone(),
3051 client_b.user_store.clone(),
3052 lang_registry.clone(),
3053 fs.clone(),
3054 &mut cx_b.to_async(),
3055 )
3056 .await
3057 .unwrap();
3058
3059 // Open the file on client B.
3060 let buffer_b = cx_b
3061 .background()
3062 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3063 .await
3064 .unwrap();
3065
3066 // Request references to a symbol as the guest.
3067 let fake_language_server = fake_language_servers.next().await.unwrap();
3068 fake_language_server.handle_request::<lsp::request::References, _, _>(
3069 |params, _| async move {
3070 assert_eq!(
3071 params.text_document_position.text_document.uri.as_str(),
3072 "file:///root-1/one.rs"
3073 );
3074 Ok(Some(vec![
3075 lsp::Location {
3076 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3077 range: lsp::Range::new(
3078 lsp::Position::new(0, 24),
3079 lsp::Position::new(0, 27),
3080 ),
3081 },
3082 lsp::Location {
3083 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3084 range: lsp::Range::new(
3085 lsp::Position::new(0, 35),
3086 lsp::Position::new(0, 38),
3087 ),
3088 },
3089 lsp::Location {
3090 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3091 range: lsp::Range::new(
3092 lsp::Position::new(0, 37),
3093 lsp::Position::new(0, 40),
3094 ),
3095 },
3096 ]))
3097 },
3098 );
3099
3100 let references = project_b
3101 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3102 .await
3103 .unwrap();
3104 cx_b.read(|cx| {
3105 assert_eq!(references.len(), 3);
3106 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3107
3108 let two_buffer = references[0].buffer.read(cx);
3109 let three_buffer = references[2].buffer.read(cx);
3110 assert_eq!(
3111 two_buffer.file().unwrap().path().as_ref(),
3112 Path::new("two.rs")
3113 );
3114 assert_eq!(references[1].buffer, references[0].buffer);
3115 assert_eq!(
3116 three_buffer.file().unwrap().full_path(cx),
3117 Path::new("three.rs")
3118 );
3119
3120 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3121 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3122 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3123 });
3124 }
3125
3126 #[gpui::test(iterations = 10)]
3127 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3128 cx_a.foreground().forbid_parking();
3129 let lang_registry = Arc::new(LanguageRegistry::test());
3130 let fs = FakeFs::new(cx_a.background());
3131 fs.insert_tree(
3132 "/root-1",
3133 json!({
3134 ".zed.toml": r#"collaborators = ["user_b"]"#,
3135 "a": "hello world",
3136 "b": "goodnight moon",
3137 "c": "a world of goo",
3138 "d": "world champion of clown world",
3139 }),
3140 )
3141 .await;
3142 fs.insert_tree(
3143 "/root-2",
3144 json!({
3145 "e": "disney world is fun",
3146 }),
3147 )
3148 .await;
3149
3150 // Connect to a server as 2 clients.
3151 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3152 let client_a = server.create_client(cx_a, "user_a").await;
3153 let client_b = server.create_client(cx_b, "user_b").await;
3154
3155 // Share a project as client A
3156 let project_a = cx_a.update(|cx| {
3157 Project::local(
3158 client_a.clone(),
3159 client_a.user_store.clone(),
3160 lang_registry.clone(),
3161 fs.clone(),
3162 cx,
3163 )
3164 });
3165 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3166
3167 let (worktree_1, _) = project_a
3168 .update(cx_a, |p, cx| {
3169 p.find_or_create_local_worktree("/root-1", true, cx)
3170 })
3171 .await
3172 .unwrap();
3173 worktree_1
3174 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3175 .await;
3176 let (worktree_2, _) = project_a
3177 .update(cx_a, |p, cx| {
3178 p.find_or_create_local_worktree("/root-2", true, cx)
3179 })
3180 .await
3181 .unwrap();
3182 worktree_2
3183 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3184 .await;
3185
3186 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3187
3188 // Join the worktree as client B.
3189 let project_b = Project::remote(
3190 project_id,
3191 client_b.clone(),
3192 client_b.user_store.clone(),
3193 lang_registry.clone(),
3194 fs.clone(),
3195 &mut cx_b.to_async(),
3196 )
3197 .await
3198 .unwrap();
3199
3200 let results = project_b
3201 .update(cx_b, |project, cx| {
3202 project.search(SearchQuery::text("world", false, false), cx)
3203 })
3204 .await
3205 .unwrap();
3206
3207 let mut ranges_by_path = results
3208 .into_iter()
3209 .map(|(buffer, ranges)| {
3210 buffer.read_with(cx_b, |buffer, cx| {
3211 let path = buffer.file().unwrap().full_path(cx);
3212 let offset_ranges = ranges
3213 .into_iter()
3214 .map(|range| range.to_offset(buffer))
3215 .collect::<Vec<_>>();
3216 (path, offset_ranges)
3217 })
3218 })
3219 .collect::<Vec<_>>();
3220 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3221
3222 assert_eq!(
3223 ranges_by_path,
3224 &[
3225 (PathBuf::from("root-1/a"), vec![6..11]),
3226 (PathBuf::from("root-1/c"), vec![2..7]),
3227 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3228 (PathBuf::from("root-2/e"), vec![7..12]),
3229 ]
3230 );
3231 }
3232
3233 #[gpui::test(iterations = 10)]
3234 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3235 cx_a.foreground().forbid_parking();
3236 let lang_registry = Arc::new(LanguageRegistry::test());
3237 let fs = FakeFs::new(cx_a.background());
3238 fs.insert_tree(
3239 "/root-1",
3240 json!({
3241 ".zed.toml": r#"collaborators = ["user_b"]"#,
3242 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3243 }),
3244 )
3245 .await;
3246
3247 // Set up a fake language server.
3248 let mut language = Language::new(
3249 LanguageConfig {
3250 name: "Rust".into(),
3251 path_suffixes: vec!["rs".to_string()],
3252 ..Default::default()
3253 },
3254 Some(tree_sitter_rust::language()),
3255 );
3256 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3257 lang_registry.add(Arc::new(language));
3258
3259 // Connect to a server as 2 clients.
3260 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3261 let client_a = server.create_client(cx_a, "user_a").await;
3262 let client_b = server.create_client(cx_b, "user_b").await;
3263
3264 // Share a project as client A
3265 let project_a = cx_a.update(|cx| {
3266 Project::local(
3267 client_a.clone(),
3268 client_a.user_store.clone(),
3269 lang_registry.clone(),
3270 fs.clone(),
3271 cx,
3272 )
3273 });
3274 let (worktree_a, _) = project_a
3275 .update(cx_a, |p, cx| {
3276 p.find_or_create_local_worktree("/root-1", true, cx)
3277 })
3278 .await
3279 .unwrap();
3280 worktree_a
3281 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3282 .await;
3283 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3284 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3285 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3286
3287 // Join the worktree as client B.
3288 let project_b = Project::remote(
3289 project_id,
3290 client_b.clone(),
3291 client_b.user_store.clone(),
3292 lang_registry.clone(),
3293 fs.clone(),
3294 &mut cx_b.to_async(),
3295 )
3296 .await
3297 .unwrap();
3298
3299 // Open the file on client B.
3300 let buffer_b = cx_b
3301 .background()
3302 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3303 .await
3304 .unwrap();
3305
3306 // Request document highlights as the guest.
3307 let fake_language_server = fake_language_servers.next().await.unwrap();
3308 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3309 |params, _| async move {
3310 assert_eq!(
3311 params
3312 .text_document_position_params
3313 .text_document
3314 .uri
3315 .as_str(),
3316 "file:///root-1/main.rs"
3317 );
3318 assert_eq!(
3319 params.text_document_position_params.position,
3320 lsp::Position::new(0, 34)
3321 );
3322 Ok(Some(vec![
3323 lsp::DocumentHighlight {
3324 kind: Some(lsp::DocumentHighlightKind::WRITE),
3325 range: lsp::Range::new(
3326 lsp::Position::new(0, 10),
3327 lsp::Position::new(0, 16),
3328 ),
3329 },
3330 lsp::DocumentHighlight {
3331 kind: Some(lsp::DocumentHighlightKind::READ),
3332 range: lsp::Range::new(
3333 lsp::Position::new(0, 32),
3334 lsp::Position::new(0, 38),
3335 ),
3336 },
3337 lsp::DocumentHighlight {
3338 kind: Some(lsp::DocumentHighlightKind::READ),
3339 range: lsp::Range::new(
3340 lsp::Position::new(0, 41),
3341 lsp::Position::new(0, 47),
3342 ),
3343 },
3344 ]))
3345 },
3346 );
3347
3348 let highlights = project_b
3349 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3350 .await
3351 .unwrap();
3352 buffer_b.read_with(cx_b, |buffer, _| {
3353 let snapshot = buffer.snapshot();
3354
3355 let highlights = highlights
3356 .into_iter()
3357 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3358 .collect::<Vec<_>>();
3359 assert_eq!(
3360 highlights,
3361 &[
3362 (lsp::DocumentHighlightKind::WRITE, 10..16),
3363 (lsp::DocumentHighlightKind::READ, 32..38),
3364 (lsp::DocumentHighlightKind::READ, 41..47)
3365 ]
3366 )
3367 });
3368 }
3369
3370 #[gpui::test(iterations = 10)]
3371 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3372 cx_a.foreground().forbid_parking();
3373 let lang_registry = Arc::new(LanguageRegistry::test());
3374 let fs = FakeFs::new(cx_a.background());
3375 fs.insert_tree(
3376 "/code",
3377 json!({
3378 "crate-1": {
3379 ".zed.toml": r#"collaborators = ["user_b"]"#,
3380 "one.rs": "const ONE: usize = 1;",
3381 },
3382 "crate-2": {
3383 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3384 },
3385 "private": {
3386 "passwords.txt": "the-password",
3387 }
3388 }),
3389 )
3390 .await;
3391
3392 // Set up a fake language server.
3393 let mut language = Language::new(
3394 LanguageConfig {
3395 name: "Rust".into(),
3396 path_suffixes: vec!["rs".to_string()],
3397 ..Default::default()
3398 },
3399 Some(tree_sitter_rust::language()),
3400 );
3401 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3402 lang_registry.add(Arc::new(language));
3403
3404 // Connect to a server as 2 clients.
3405 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3406 let client_a = server.create_client(cx_a, "user_a").await;
3407 let client_b = server.create_client(cx_b, "user_b").await;
3408
3409 // Share a project as client A
3410 let project_a = cx_a.update(|cx| {
3411 Project::local(
3412 client_a.clone(),
3413 client_a.user_store.clone(),
3414 lang_registry.clone(),
3415 fs.clone(),
3416 cx,
3417 )
3418 });
3419 let (worktree_a, _) = project_a
3420 .update(cx_a, |p, cx| {
3421 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3422 })
3423 .await
3424 .unwrap();
3425 worktree_a
3426 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3427 .await;
3428 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3429 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3430 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3431
3432 // Join the worktree as client B.
3433 let project_b = Project::remote(
3434 project_id,
3435 client_b.clone(),
3436 client_b.user_store.clone(),
3437 lang_registry.clone(),
3438 fs.clone(),
3439 &mut cx_b.to_async(),
3440 )
3441 .await
3442 .unwrap();
3443
3444 // Cause the language server to start.
3445 let _buffer = cx_b
3446 .background()
3447 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3448 .await
3449 .unwrap();
3450
3451 let fake_language_server = fake_language_servers.next().await.unwrap();
3452 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3453 |_, _| async move {
3454 #[allow(deprecated)]
3455 Ok(Some(vec![lsp::SymbolInformation {
3456 name: "TWO".into(),
3457 location: lsp::Location {
3458 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3459 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3460 },
3461 kind: lsp::SymbolKind::CONSTANT,
3462 tags: None,
3463 container_name: None,
3464 deprecated: None,
3465 }]))
3466 },
3467 );
3468
3469 // Request the definition of a symbol as the guest.
3470 let symbols = project_b
3471 .update(cx_b, |p, cx| p.symbols("two", cx))
3472 .await
3473 .unwrap();
3474 assert_eq!(symbols.len(), 1);
3475 assert_eq!(symbols[0].name, "TWO");
3476
3477 // Open one of the returned symbols.
3478 let buffer_b_2 = project_b
3479 .update(cx_b, |project, cx| {
3480 project.open_buffer_for_symbol(&symbols[0], cx)
3481 })
3482 .await
3483 .unwrap();
3484 buffer_b_2.read_with(cx_b, |buffer, _| {
3485 assert_eq!(
3486 buffer.file().unwrap().path().as_ref(),
3487 Path::new("../crate-2/two.rs")
3488 );
3489 });
3490
3491 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3492 let mut fake_symbol = symbols[0].clone();
3493 fake_symbol.path = Path::new("/code/secrets").into();
3494 let error = project_b
3495 .update(cx_b, |project, cx| {
3496 project.open_buffer_for_symbol(&fake_symbol, cx)
3497 })
3498 .await
3499 .unwrap_err();
3500 assert!(error.to_string().contains("invalid symbol signature"));
3501 }
3502
3503 #[gpui::test(iterations = 10)]
3504 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3505 cx_a: &mut TestAppContext,
3506 cx_b: &mut TestAppContext,
3507 mut rng: StdRng,
3508 ) {
3509 cx_a.foreground().forbid_parking();
3510 let lang_registry = Arc::new(LanguageRegistry::test());
3511 let fs = FakeFs::new(cx_a.background());
3512 fs.insert_tree(
3513 "/root",
3514 json!({
3515 ".zed.toml": r#"collaborators = ["user_b"]"#,
3516 "a.rs": "const ONE: usize = b::TWO;",
3517 "b.rs": "const TWO: usize = 2",
3518 }),
3519 )
3520 .await;
3521
3522 // Set up a fake language server.
3523 let mut language = Language::new(
3524 LanguageConfig {
3525 name: "Rust".into(),
3526 path_suffixes: vec!["rs".to_string()],
3527 ..Default::default()
3528 },
3529 Some(tree_sitter_rust::language()),
3530 );
3531 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3532 lang_registry.add(Arc::new(language));
3533
3534 // Connect to a server as 2 clients.
3535 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3536 let client_a = server.create_client(cx_a, "user_a").await;
3537 let client_b = server.create_client(cx_b, "user_b").await;
3538
3539 // Share a project as client A
3540 let project_a = cx_a.update(|cx| {
3541 Project::local(
3542 client_a.clone(),
3543 client_a.user_store.clone(),
3544 lang_registry.clone(),
3545 fs.clone(),
3546 cx,
3547 )
3548 });
3549
3550 let (worktree_a, _) = project_a
3551 .update(cx_a, |p, cx| {
3552 p.find_or_create_local_worktree("/root", true, cx)
3553 })
3554 .await
3555 .unwrap();
3556 worktree_a
3557 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3558 .await;
3559 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3560 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3561 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3562
3563 // Join the worktree as client B.
3564 let project_b = Project::remote(
3565 project_id,
3566 client_b.clone(),
3567 client_b.user_store.clone(),
3568 lang_registry.clone(),
3569 fs.clone(),
3570 &mut cx_b.to_async(),
3571 )
3572 .await
3573 .unwrap();
3574
3575 let buffer_b1 = cx_b
3576 .background()
3577 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3578 .await
3579 .unwrap();
3580
3581 let fake_language_server = fake_language_servers.next().await.unwrap();
3582 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3583 |_, _| async move {
3584 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3585 lsp::Location::new(
3586 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3587 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3588 ),
3589 )))
3590 },
3591 );
3592
3593 let definitions;
3594 let buffer_b2;
3595 if rng.gen() {
3596 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3597 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3598 } else {
3599 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3600 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3601 }
3602
3603 let buffer_b2 = buffer_b2.await.unwrap();
3604 let definitions = definitions.await.unwrap();
3605 assert_eq!(definitions.len(), 1);
3606 assert_eq!(definitions[0].buffer, buffer_b2);
3607 }
3608
3609 #[gpui::test(iterations = 10)]
3610 async fn test_collaborating_with_code_actions(
3611 cx_a: &mut TestAppContext,
3612 cx_b: &mut TestAppContext,
3613 ) {
3614 cx_a.foreground().forbid_parking();
3615 let lang_registry = Arc::new(LanguageRegistry::test());
3616 let fs = FakeFs::new(cx_a.background());
3617 cx_b.update(|cx| editor::init(cx));
3618
3619 // Set up a fake language server.
3620 let mut language = Language::new(
3621 LanguageConfig {
3622 name: "Rust".into(),
3623 path_suffixes: vec!["rs".to_string()],
3624 ..Default::default()
3625 },
3626 Some(tree_sitter_rust::language()),
3627 );
3628 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3629 lang_registry.add(Arc::new(language));
3630
3631 // Connect to a server as 2 clients.
3632 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3633 let client_a = server.create_client(cx_a, "user_a").await;
3634 let client_b = server.create_client(cx_b, "user_b").await;
3635
3636 // Share a project as client A
3637 fs.insert_tree(
3638 "/a",
3639 json!({
3640 ".zed.toml": r#"collaborators = ["user_b"]"#,
3641 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3642 "other.rs": "pub fn foo() -> usize { 4 }",
3643 }),
3644 )
3645 .await;
3646 let project_a = cx_a.update(|cx| {
3647 Project::local(
3648 client_a.clone(),
3649 client_a.user_store.clone(),
3650 lang_registry.clone(),
3651 fs.clone(),
3652 cx,
3653 )
3654 });
3655 let (worktree_a, _) = project_a
3656 .update(cx_a, |p, cx| {
3657 p.find_or_create_local_worktree("/a", true, cx)
3658 })
3659 .await
3660 .unwrap();
3661 worktree_a
3662 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3663 .await;
3664 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3665 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3666 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3667
3668 // Join the worktree as client B.
3669 let project_b = Project::remote(
3670 project_id,
3671 client_b.clone(),
3672 client_b.user_store.clone(),
3673 lang_registry.clone(),
3674 fs.clone(),
3675 &mut cx_b.to_async(),
3676 )
3677 .await
3678 .unwrap();
3679 let mut params = cx_b.update(WorkspaceParams::test);
3680 params.languages = lang_registry.clone();
3681 params.client = client_b.client.clone();
3682 params.user_store = client_b.user_store.clone();
3683 params.project = project_b;
3684
3685 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3686 let editor_b = workspace_b
3687 .update(cx_b, |workspace, cx| {
3688 workspace.open_path((worktree_id, "main.rs"), cx)
3689 })
3690 .await
3691 .unwrap()
3692 .downcast::<Editor>()
3693 .unwrap();
3694
3695 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3696 fake_language_server
3697 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3698 assert_eq!(
3699 params.text_document.uri,
3700 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3701 );
3702 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3703 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3704 Ok(None)
3705 })
3706 .next()
3707 .await;
3708
3709 // Move cursor to a location that contains code actions.
3710 editor_b.update(cx_b, |editor, cx| {
3711 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3712 cx.focus(&editor_b);
3713 });
3714
3715 fake_language_server
3716 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3717 assert_eq!(
3718 params.text_document.uri,
3719 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3720 );
3721 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3722 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3723
3724 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3725 lsp::CodeAction {
3726 title: "Inline into all callers".to_string(),
3727 edit: Some(lsp::WorkspaceEdit {
3728 changes: Some(
3729 [
3730 (
3731 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3732 vec![lsp::TextEdit::new(
3733 lsp::Range::new(
3734 lsp::Position::new(1, 22),
3735 lsp::Position::new(1, 34),
3736 ),
3737 "4".to_string(),
3738 )],
3739 ),
3740 (
3741 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3742 vec![lsp::TextEdit::new(
3743 lsp::Range::new(
3744 lsp::Position::new(0, 0),
3745 lsp::Position::new(0, 27),
3746 ),
3747 "".to_string(),
3748 )],
3749 ),
3750 ]
3751 .into_iter()
3752 .collect(),
3753 ),
3754 ..Default::default()
3755 }),
3756 data: Some(json!({
3757 "codeActionParams": {
3758 "range": {
3759 "start": {"line": 1, "column": 31},
3760 "end": {"line": 1, "column": 31},
3761 }
3762 }
3763 })),
3764 ..Default::default()
3765 },
3766 )]))
3767 })
3768 .next()
3769 .await;
3770
3771 // Toggle code actions and wait for them to display.
3772 editor_b.update(cx_b, |editor, cx| {
3773 editor.toggle_code_actions(
3774 &ToggleCodeActions {
3775 deployed_from_indicator: false,
3776 },
3777 cx,
3778 );
3779 });
3780 editor_b
3781 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3782 .await;
3783
3784 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3785
3786 // Confirming the code action will trigger a resolve request.
3787 let confirm_action = workspace_b
3788 .update(cx_b, |workspace, cx| {
3789 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3790 })
3791 .unwrap();
3792 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3793 |_, _| async move {
3794 Ok(lsp::CodeAction {
3795 title: "Inline into all callers".to_string(),
3796 edit: Some(lsp::WorkspaceEdit {
3797 changes: Some(
3798 [
3799 (
3800 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3801 vec![lsp::TextEdit::new(
3802 lsp::Range::new(
3803 lsp::Position::new(1, 22),
3804 lsp::Position::new(1, 34),
3805 ),
3806 "4".to_string(),
3807 )],
3808 ),
3809 (
3810 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3811 vec![lsp::TextEdit::new(
3812 lsp::Range::new(
3813 lsp::Position::new(0, 0),
3814 lsp::Position::new(0, 27),
3815 ),
3816 "".to_string(),
3817 )],
3818 ),
3819 ]
3820 .into_iter()
3821 .collect(),
3822 ),
3823 ..Default::default()
3824 }),
3825 ..Default::default()
3826 })
3827 },
3828 );
3829
3830 // After the action is confirmed, an editor containing both modified files is opened.
3831 confirm_action.await.unwrap();
3832 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3833 workspace
3834 .active_item(cx)
3835 .unwrap()
3836 .downcast::<Editor>()
3837 .unwrap()
3838 });
3839 code_action_editor.update(cx_b, |editor, cx| {
3840 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3841 editor.undo(&Undo, cx);
3842 assert_eq!(
3843 editor.text(cx),
3844 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3845 );
3846 editor.redo(&Redo, cx);
3847 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3848 });
3849 }
3850
3851 #[gpui::test(iterations = 10)]
3852 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3853 cx_a.foreground().forbid_parking();
3854 let lang_registry = Arc::new(LanguageRegistry::test());
3855 let fs = FakeFs::new(cx_a.background());
3856 cx_b.update(|cx| editor::init(cx));
3857
3858 // Set up a fake language server.
3859 let mut language = Language::new(
3860 LanguageConfig {
3861 name: "Rust".into(),
3862 path_suffixes: vec!["rs".to_string()],
3863 ..Default::default()
3864 },
3865 Some(tree_sitter_rust::language()),
3866 );
3867 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3868 capabilities: lsp::ServerCapabilities {
3869 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3870 prepare_provider: Some(true),
3871 work_done_progress_options: Default::default(),
3872 })),
3873 ..Default::default()
3874 },
3875 ..Default::default()
3876 });
3877 lang_registry.add(Arc::new(language));
3878
3879 // Connect to a server as 2 clients.
3880 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3881 let client_a = server.create_client(cx_a, "user_a").await;
3882 let client_b = server.create_client(cx_b, "user_b").await;
3883
3884 // Share a project as client A
3885 fs.insert_tree(
3886 "/dir",
3887 json!({
3888 ".zed.toml": r#"collaborators = ["user_b"]"#,
3889 "one.rs": "const ONE: usize = 1;",
3890 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3891 }),
3892 )
3893 .await;
3894 let project_a = cx_a.update(|cx| {
3895 Project::local(
3896 client_a.clone(),
3897 client_a.user_store.clone(),
3898 lang_registry.clone(),
3899 fs.clone(),
3900 cx,
3901 )
3902 });
3903 let (worktree_a, _) = project_a
3904 .update(cx_a, |p, cx| {
3905 p.find_or_create_local_worktree("/dir", true, cx)
3906 })
3907 .await
3908 .unwrap();
3909 worktree_a
3910 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3911 .await;
3912 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3913 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3914 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3915
3916 // Join the worktree as client B.
3917 let project_b = Project::remote(
3918 project_id,
3919 client_b.clone(),
3920 client_b.user_store.clone(),
3921 lang_registry.clone(),
3922 fs.clone(),
3923 &mut cx_b.to_async(),
3924 )
3925 .await
3926 .unwrap();
3927 let mut params = cx_b.update(WorkspaceParams::test);
3928 params.languages = lang_registry.clone();
3929 params.client = client_b.client.clone();
3930 params.user_store = client_b.user_store.clone();
3931 params.project = project_b;
3932
3933 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3934 let editor_b = workspace_b
3935 .update(cx_b, |workspace, cx| {
3936 workspace.open_path((worktree_id, "one.rs"), cx)
3937 })
3938 .await
3939 .unwrap()
3940 .downcast::<Editor>()
3941 .unwrap();
3942 let fake_language_server = fake_language_servers.next().await.unwrap();
3943
3944 // Move cursor to a location that can be renamed.
3945 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3946 editor.select_ranges([7..7], None, cx);
3947 editor.rename(&Rename, cx).unwrap()
3948 });
3949
3950 fake_language_server
3951 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3952 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3953 assert_eq!(params.position, lsp::Position::new(0, 7));
3954 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3955 lsp::Position::new(0, 6),
3956 lsp::Position::new(0, 9),
3957 ))))
3958 })
3959 .next()
3960 .await
3961 .unwrap();
3962 prepare_rename.await.unwrap();
3963 editor_b.update(cx_b, |editor, cx| {
3964 let rename = editor.pending_rename().unwrap();
3965 let buffer = editor.buffer().read(cx).snapshot(cx);
3966 assert_eq!(
3967 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3968 6..9
3969 );
3970 rename.editor.update(cx, |rename_editor, cx| {
3971 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3972 rename_buffer.edit([0..3], "THREE", cx);
3973 });
3974 });
3975 });
3976
3977 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3978 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3979 });
3980 fake_language_server
3981 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3982 assert_eq!(
3983 params.text_document_position.text_document.uri.as_str(),
3984 "file:///dir/one.rs"
3985 );
3986 assert_eq!(
3987 params.text_document_position.position,
3988 lsp::Position::new(0, 6)
3989 );
3990 assert_eq!(params.new_name, "THREE");
3991 Ok(Some(lsp::WorkspaceEdit {
3992 changes: Some(
3993 [
3994 (
3995 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3996 vec![lsp::TextEdit::new(
3997 lsp::Range::new(
3998 lsp::Position::new(0, 6),
3999 lsp::Position::new(0, 9),
4000 ),
4001 "THREE".to_string(),
4002 )],
4003 ),
4004 (
4005 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4006 vec![
4007 lsp::TextEdit::new(
4008 lsp::Range::new(
4009 lsp::Position::new(0, 24),
4010 lsp::Position::new(0, 27),
4011 ),
4012 "THREE".to_string(),
4013 ),
4014 lsp::TextEdit::new(
4015 lsp::Range::new(
4016 lsp::Position::new(0, 35),
4017 lsp::Position::new(0, 38),
4018 ),
4019 "THREE".to_string(),
4020 ),
4021 ],
4022 ),
4023 ]
4024 .into_iter()
4025 .collect(),
4026 ),
4027 ..Default::default()
4028 }))
4029 })
4030 .next()
4031 .await
4032 .unwrap();
4033 confirm_rename.await.unwrap();
4034
4035 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4036 workspace
4037 .active_item(cx)
4038 .unwrap()
4039 .downcast::<Editor>()
4040 .unwrap()
4041 });
4042 rename_editor.update(cx_b, |editor, cx| {
4043 assert_eq!(
4044 editor.text(cx),
4045 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4046 );
4047 editor.undo(&Undo, cx);
4048 assert_eq!(
4049 editor.text(cx),
4050 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4051 );
4052 editor.redo(&Redo, cx);
4053 assert_eq!(
4054 editor.text(cx),
4055 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4056 );
4057 });
4058
4059 // Ensure temporary rename edits cannot be undone/redone.
4060 editor_b.update(cx_b, |editor, cx| {
4061 editor.undo(&Undo, cx);
4062 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4063 editor.undo(&Undo, cx);
4064 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4065 editor.redo(&Redo, cx);
4066 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4067 })
4068 }
4069
4070 #[gpui::test(iterations = 10)]
4071 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4072 cx_a.foreground().forbid_parking();
4073
4074 // Connect to a server as 2 clients.
4075 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4076 let client_a = server.create_client(cx_a, "user_a").await;
4077 let client_b = server.create_client(cx_b, "user_b").await;
4078
4079 // Create an org that includes these 2 users.
4080 let db = &server.app_state.db;
4081 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4082 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4083 .await
4084 .unwrap();
4085 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4086 .await
4087 .unwrap();
4088
4089 // Create a channel that includes all the users.
4090 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4091 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4092 .await
4093 .unwrap();
4094 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4095 .await
4096 .unwrap();
4097 db.create_channel_message(
4098 channel_id,
4099 client_b.current_user_id(&cx_b),
4100 "hello A, it's B.",
4101 OffsetDateTime::now_utc(),
4102 1,
4103 )
4104 .await
4105 .unwrap();
4106
4107 let channels_a = cx_a
4108 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4109 channels_a
4110 .condition(cx_a, |list, _| list.available_channels().is_some())
4111 .await;
4112 channels_a.read_with(cx_a, |list, _| {
4113 assert_eq!(
4114 list.available_channels().unwrap(),
4115 &[ChannelDetails {
4116 id: channel_id.to_proto(),
4117 name: "test-channel".to_string()
4118 }]
4119 )
4120 });
4121 let channel_a = channels_a.update(cx_a, |this, cx| {
4122 this.get_channel(channel_id.to_proto(), cx).unwrap()
4123 });
4124 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4125 channel_a
4126 .condition(&cx_a, |channel, _| {
4127 channel_messages(channel)
4128 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4129 })
4130 .await;
4131
4132 let channels_b = cx_b
4133 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4134 channels_b
4135 .condition(cx_b, |list, _| list.available_channels().is_some())
4136 .await;
4137 channels_b.read_with(cx_b, |list, _| {
4138 assert_eq!(
4139 list.available_channels().unwrap(),
4140 &[ChannelDetails {
4141 id: channel_id.to_proto(),
4142 name: "test-channel".to_string()
4143 }]
4144 )
4145 });
4146
4147 let channel_b = channels_b.update(cx_b, |this, cx| {
4148 this.get_channel(channel_id.to_proto(), cx).unwrap()
4149 });
4150 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4151 channel_b
4152 .condition(&cx_b, |channel, _| {
4153 channel_messages(channel)
4154 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4155 })
4156 .await;
4157
4158 channel_a
4159 .update(cx_a, |channel, cx| {
4160 channel
4161 .send_message("oh, hi B.".to_string(), cx)
4162 .unwrap()
4163 .detach();
4164 let task = channel.send_message("sup".to_string(), cx).unwrap();
4165 assert_eq!(
4166 channel_messages(channel),
4167 &[
4168 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4169 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4170 ("user_a".to_string(), "sup".to_string(), true)
4171 ]
4172 );
4173 task
4174 })
4175 .await
4176 .unwrap();
4177
4178 channel_b
4179 .condition(&cx_b, |channel, _| {
4180 channel_messages(channel)
4181 == [
4182 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4183 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4184 ("user_a".to_string(), "sup".to_string(), false),
4185 ]
4186 })
4187 .await;
4188
4189 assert_eq!(
4190 server
4191 .state()
4192 .await
4193 .channel(channel_id)
4194 .unwrap()
4195 .connection_ids
4196 .len(),
4197 2
4198 );
4199 cx_b.update(|_| drop(channel_b));
4200 server
4201 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4202 .await;
4203
4204 cx_a.update(|_| drop(channel_a));
4205 server
4206 .condition(|state| state.channel(channel_id).is_none())
4207 .await;
4208 }
4209
4210 #[gpui::test(iterations = 10)]
4211 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4212 cx_a.foreground().forbid_parking();
4213
4214 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4215 let client_a = server.create_client(cx_a, "user_a").await;
4216
4217 let db = &server.app_state.db;
4218 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4219 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4220 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4221 .await
4222 .unwrap();
4223 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4224 .await
4225 .unwrap();
4226
4227 let channels_a = cx_a
4228 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4229 channels_a
4230 .condition(cx_a, |list, _| list.available_channels().is_some())
4231 .await;
4232 let channel_a = channels_a.update(cx_a, |this, cx| {
4233 this.get_channel(channel_id.to_proto(), cx).unwrap()
4234 });
4235
4236 // Messages aren't allowed to be too long.
4237 channel_a
4238 .update(cx_a, |channel, cx| {
4239 let long_body = "this is long.\n".repeat(1024);
4240 channel.send_message(long_body, cx).unwrap()
4241 })
4242 .await
4243 .unwrap_err();
4244
4245 // Messages aren't allowed to be blank.
4246 channel_a.update(cx_a, |channel, cx| {
4247 channel.send_message(String::new(), cx).unwrap_err()
4248 });
4249
4250 // Leading and trailing whitespace are trimmed.
4251 channel_a
4252 .update(cx_a, |channel, cx| {
4253 channel
4254 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4255 .unwrap()
4256 })
4257 .await
4258 .unwrap();
4259 assert_eq!(
4260 db.get_channel_messages(channel_id, 10, None)
4261 .await
4262 .unwrap()
4263 .iter()
4264 .map(|m| &m.body)
4265 .collect::<Vec<_>>(),
4266 &["surrounded by whitespace"]
4267 );
4268 }
4269
4270 #[gpui::test(iterations = 10)]
4271 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4272 cx_a.foreground().forbid_parking();
4273
4274 // Connect to a server as 2 clients.
4275 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4276 let client_a = server.create_client(cx_a, "user_a").await;
4277 let client_b = server.create_client(cx_b, "user_b").await;
4278 let mut status_b = client_b.status();
4279
4280 // Create an org that includes these 2 users.
4281 let db = &server.app_state.db;
4282 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4283 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4284 .await
4285 .unwrap();
4286 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4287 .await
4288 .unwrap();
4289
4290 // Create a channel that includes all the users.
4291 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4292 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4293 .await
4294 .unwrap();
4295 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4296 .await
4297 .unwrap();
4298 db.create_channel_message(
4299 channel_id,
4300 client_b.current_user_id(&cx_b),
4301 "hello A, it's B.",
4302 OffsetDateTime::now_utc(),
4303 2,
4304 )
4305 .await
4306 .unwrap();
4307
4308 let channels_a = cx_a
4309 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4310 channels_a
4311 .condition(cx_a, |list, _| list.available_channels().is_some())
4312 .await;
4313
4314 channels_a.read_with(cx_a, |list, _| {
4315 assert_eq!(
4316 list.available_channels().unwrap(),
4317 &[ChannelDetails {
4318 id: channel_id.to_proto(),
4319 name: "test-channel".to_string()
4320 }]
4321 )
4322 });
4323 let channel_a = channels_a.update(cx_a, |this, cx| {
4324 this.get_channel(channel_id.to_proto(), cx).unwrap()
4325 });
4326 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4327 channel_a
4328 .condition(&cx_a, |channel, _| {
4329 channel_messages(channel)
4330 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4331 })
4332 .await;
4333
4334 let channels_b = cx_b
4335 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4336 channels_b
4337 .condition(cx_b, |list, _| list.available_channels().is_some())
4338 .await;
4339 channels_b.read_with(cx_b, |list, _| {
4340 assert_eq!(
4341 list.available_channels().unwrap(),
4342 &[ChannelDetails {
4343 id: channel_id.to_proto(),
4344 name: "test-channel".to_string()
4345 }]
4346 )
4347 });
4348
4349 let channel_b = channels_b.update(cx_b, |this, cx| {
4350 this.get_channel(channel_id.to_proto(), cx).unwrap()
4351 });
4352 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4353 channel_b
4354 .condition(&cx_b, |channel, _| {
4355 channel_messages(channel)
4356 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4357 })
4358 .await;
4359
4360 // Disconnect client B, ensuring we can still access its cached channel data.
4361 server.forbid_connections();
4362 server.disconnect_client(client_b.current_user_id(&cx_b));
4363 cx_b.foreground().advance_clock(Duration::from_secs(3));
4364 while !matches!(
4365 status_b.next().await,
4366 Some(client::Status::ReconnectionError { .. })
4367 ) {}
4368
4369 channels_b.read_with(cx_b, |channels, _| {
4370 assert_eq!(
4371 channels.available_channels().unwrap(),
4372 [ChannelDetails {
4373 id: channel_id.to_proto(),
4374 name: "test-channel".to_string()
4375 }]
4376 )
4377 });
4378 channel_b.read_with(cx_b, |channel, _| {
4379 assert_eq!(
4380 channel_messages(channel),
4381 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4382 )
4383 });
4384
4385 // Send a message from client B while it is disconnected.
4386 channel_b
4387 .update(cx_b, |channel, cx| {
4388 let task = channel
4389 .send_message("can you see this?".to_string(), cx)
4390 .unwrap();
4391 assert_eq!(
4392 channel_messages(channel),
4393 &[
4394 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4395 ("user_b".to_string(), "can you see this?".to_string(), true)
4396 ]
4397 );
4398 task
4399 })
4400 .await
4401 .unwrap_err();
4402
4403 // Send a message from client A while B is disconnected.
4404 channel_a
4405 .update(cx_a, |channel, cx| {
4406 channel
4407 .send_message("oh, hi B.".to_string(), cx)
4408 .unwrap()
4409 .detach();
4410 let task = channel.send_message("sup".to_string(), cx).unwrap();
4411 assert_eq!(
4412 channel_messages(channel),
4413 &[
4414 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4415 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4416 ("user_a".to_string(), "sup".to_string(), true)
4417 ]
4418 );
4419 task
4420 })
4421 .await
4422 .unwrap();
4423
4424 // Give client B a chance to reconnect.
4425 server.allow_connections();
4426 cx_b.foreground().advance_clock(Duration::from_secs(10));
4427
4428 // Verify that B sees the new messages upon reconnection, as well as the message client B
4429 // sent while offline.
4430 channel_b
4431 .condition(&cx_b, |channel, _| {
4432 channel_messages(channel)
4433 == [
4434 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4435 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4436 ("user_a".to_string(), "sup".to_string(), false),
4437 ("user_b".to_string(), "can you see this?".to_string(), false),
4438 ]
4439 })
4440 .await;
4441
4442 // Ensure client A and B can communicate normally after reconnection.
4443 channel_a
4444 .update(cx_a, |channel, cx| {
4445 channel.send_message("you online?".to_string(), cx).unwrap()
4446 })
4447 .await
4448 .unwrap();
4449 channel_b
4450 .condition(&cx_b, |channel, _| {
4451 channel_messages(channel)
4452 == [
4453 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4454 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4455 ("user_a".to_string(), "sup".to_string(), false),
4456 ("user_b".to_string(), "can you see this?".to_string(), false),
4457 ("user_a".to_string(), "you online?".to_string(), false),
4458 ]
4459 })
4460 .await;
4461
4462 channel_b
4463 .update(cx_b, |channel, cx| {
4464 channel.send_message("yep".to_string(), cx).unwrap()
4465 })
4466 .await
4467 .unwrap();
4468 channel_a
4469 .condition(&cx_a, |channel, _| {
4470 channel_messages(channel)
4471 == [
4472 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4473 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4474 ("user_a".to_string(), "sup".to_string(), false),
4475 ("user_b".to_string(), "can you see this?".to_string(), false),
4476 ("user_a".to_string(), "you online?".to_string(), false),
4477 ("user_b".to_string(), "yep".to_string(), false),
4478 ]
4479 })
4480 .await;
4481 }
4482
4483 #[gpui::test(iterations = 10)]
4484 async fn test_contacts(
4485 cx_a: &mut TestAppContext,
4486 cx_b: &mut TestAppContext,
4487 cx_c: &mut TestAppContext,
4488 ) {
4489 cx_a.foreground().forbid_parking();
4490 let lang_registry = Arc::new(LanguageRegistry::test());
4491 let fs = FakeFs::new(cx_a.background());
4492
4493 // Connect to a server as 3 clients.
4494 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4495 let client_a = server.create_client(cx_a, "user_a").await;
4496 let client_b = server.create_client(cx_b, "user_b").await;
4497 let client_c = server.create_client(cx_c, "user_c").await;
4498
4499 // Share a worktree as client A.
4500 fs.insert_tree(
4501 "/a",
4502 json!({
4503 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4504 }),
4505 )
4506 .await;
4507
4508 let project_a = cx_a.update(|cx| {
4509 Project::local(
4510 client_a.clone(),
4511 client_a.user_store.clone(),
4512 lang_registry.clone(),
4513 fs.clone(),
4514 cx,
4515 )
4516 });
4517 let (worktree_a, _) = project_a
4518 .update(cx_a, |p, cx| {
4519 p.find_or_create_local_worktree("/a", true, cx)
4520 })
4521 .await
4522 .unwrap();
4523 worktree_a
4524 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4525 .await;
4526
4527 client_a
4528 .user_store
4529 .condition(&cx_a, |user_store, _| {
4530 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4531 })
4532 .await;
4533 client_b
4534 .user_store
4535 .condition(&cx_b, |user_store, _| {
4536 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4537 })
4538 .await;
4539 client_c
4540 .user_store
4541 .condition(&cx_c, |user_store, _| {
4542 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4543 })
4544 .await;
4545
4546 let project_id = project_a
4547 .update(cx_a, |project, _| project.next_remote_id())
4548 .await;
4549 project_a
4550 .update(cx_a, |project, cx| project.share(cx))
4551 .await
4552 .unwrap();
4553 client_a
4554 .user_store
4555 .condition(&cx_a, |user_store, _| {
4556 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4557 })
4558 .await;
4559 client_b
4560 .user_store
4561 .condition(&cx_b, |user_store, _| {
4562 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4563 })
4564 .await;
4565 client_c
4566 .user_store
4567 .condition(&cx_c, |user_store, _| {
4568 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4569 })
4570 .await;
4571
4572 let _project_b = Project::remote(
4573 project_id,
4574 client_b.clone(),
4575 client_b.user_store.clone(),
4576 lang_registry.clone(),
4577 fs.clone(),
4578 &mut cx_b.to_async(),
4579 )
4580 .await
4581 .unwrap();
4582
4583 client_a
4584 .user_store
4585 .condition(&cx_a, |user_store, _| {
4586 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4587 })
4588 .await;
4589 client_b
4590 .user_store
4591 .condition(&cx_b, |user_store, _| {
4592 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4593 })
4594 .await;
4595 client_c
4596 .user_store
4597 .condition(&cx_c, |user_store, _| {
4598 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4599 })
4600 .await;
4601
4602 project_a
4603 .condition(&cx_a, |project, _| {
4604 project.collaborators().contains_key(&client_b.peer_id)
4605 })
4606 .await;
4607
4608 cx_a.update(move |_| drop(project_a));
4609 client_a
4610 .user_store
4611 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4612 .await;
4613 client_b
4614 .user_store
4615 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4616 .await;
4617 client_c
4618 .user_store
4619 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4620 .await;
4621
4622 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4623 user_store
4624 .contacts()
4625 .iter()
4626 .map(|contact| {
4627 let worktrees = contact
4628 .projects
4629 .iter()
4630 .map(|p| {
4631 (
4632 p.worktree_root_names[0].as_str(),
4633 p.is_shared,
4634 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4635 )
4636 })
4637 .collect();
4638 (contact.user.github_login.as_str(), worktrees)
4639 })
4640 .collect()
4641 }
4642 }
4643
// Verifies the "follow" feature between two peers sharing a project: when B follows A, B's
// workspace mirrors A's active item, navigation, selections and text; after B unfollows, A's
// changes no longer affect B; and when A follows B, the follow state is cleared once B
// disconnects.
#[gpui::test(iterations = 10)]
async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors. `2.txt` is opened last, so it is A's active item.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
    let editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();
    let editor_a2 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B opens an editor.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let editor_b1 = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Look up each peer's id from the other side's collaborator list. Only two peers are in
    // the project, so the first collaborator is taken to be the other client.
    let client_a_id = project_b.read_with(cx_b, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });
    let client_b_id = project_a.read_with(cx_a, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });

    // When client B starts following client A, all visible view states are replicated to client B.
    editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
    editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(client_a_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    // B's active item after following is expected to be an editor for `2.txt` (A's active item),
    // focused, and carrying A's selection. B's pre-existing `1.txt` editor picks up the
    // selection made in A's `1.txt` editor as well.
    let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });
    assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
    assert_eq!(
        editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
        Some((worktree_id, "2.txt").into())
    );
    assert_eq!(
        editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
        vec![2..3]
    );
    assert_eq!(
        editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
        vec![0..1]
    );

    // When client A activates a different editor, client B does so as well.
    workspace_a.update(cx_a, |workspace, cx| {
        workspace.activate_item(&editor_a1, cx)
    });
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b1.id()
        })
        .await;

    // When client A navigates back and forth, client B does so as well.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace::Pane::go_back(workspace, None, cx)
        })
        .await;
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b2.id()
        })
        .await;

    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace::Pane::go_forward(workspace, None, cx)
        })
        .await;
    workspace_b
        .condition(cx_b, |workspace, cx| {
            workspace.active_item(cx).unwrap().id() == editor_b1.id()
        })
        .await;

    // Changes to client A's editor are reflected on client B.
    editor_a1.update(cx_a, |editor, cx| {
        editor.select_ranges([1..1, 2..2], None, cx);
    });
    editor_b1
        .condition(cx_b, |editor, cx| {
            editor.selected_ranges(cx) == vec![1..1, 2..2]
        })
        .await;

    // Text edits on A's side propagate to B's copy of the same file.
    editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
    editor_b1
        .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
        .await;

    // A changes both selection and scroll position; only the selection is asserted on B here.
    editor_a1.update(cx_a, |editor, cx| {
        editor.select_ranges([3..3], None, cx);
        editor.set_scroll_position(vec2f(0., 100.), cx);
    });
    editor_b1
        .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
        .await;

    // After unfollowing, client B stops receiving updates from client A.
    workspace_b.update(cx_b, |workspace, cx| {
        workspace.unfollow(&workspace.active_pane().clone(), cx)
    });
    workspace_a.update(cx_a, |workspace, cx| {
        workspace.activate_item(&editor_a2, cx)
    });
    // Let all pending work settle so a (wrongly) delivered update would have been applied.
    cx_a.foreground().run_until_parked();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .id()),
        editor_b1.id()
    );

    // Client A starts following client B.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(client_b_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
        Some(client_b_id)
    );
    // A's active item switches to its `1.txt` editor, matching B's active item.
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .id()),
        editor_a1.id()
    );

    // Following interrupts when client B disconnects.
    client_b.disconnect(&cx_b.to_async()).unwrap();
    cx_a.foreground().run_until_parked();
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
        None
    );
}
4854
// Verifies that two peers can follow each other at the same time, each in a split pane,
// without the leader's updates changing the follower's active pane and without the mutual
// following causing an infinite update loop.
#[gpui::test(iterations = 10)]
async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
            "4.txt": "four",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
    let _editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B opens an editor.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
    let _editor_b1 = workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Clients A and B follow each other in split panes. Each side splits its active pane to
    // the right, checks that the new pane became active, and then follows the first (and, with
    // two peers, presumably only) collaborator from that new pane.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
            assert_ne!(*workspace.active_pane(), pane_a1);
            let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
            workspace
                .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
            assert_ne!(*workspace.active_pane(), pane_b1);
            let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
            workspace
                .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();

    // Each client switches back to its original pane and opens a new file there:
    // A opens `3.txt`, B opens `4.txt`.
    workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.activate_next_pane(cx);
            assert_eq!(*workspace.active_pane(), pane_a1);
            workspace.open_path((worktree_id, "3.txt"), cx)
        })
        .await
        .unwrap();
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.activate_next_pane(cx);
            assert_eq!(*workspace.active_pane(), pane_b1);
            workspace.open_path((worktree_id, "4.txt"), cx)
        })
        .await
        .unwrap();
    // Let all follow updates propagate in both directions before asserting.
    cx_a.foreground().run_until_parked();

    // Ensure leader updates don't change the active pane of followers
    workspace_a.read_with(cx_a, |workspace, _| {
        assert_eq!(*workspace.active_pane(), pane_a1);
    });
    workspace_b.read_with(cx_b, |workspace, _| {
        assert_eq!(*workspace.active_pane(), pane_b1);
    });

    // Ensure peers following each other doesn't cause an infinite loop.
    // Each client's own pane still shows the file it opened, while its other (following)
    // pane shows the file the peer opened.
    assert_eq!(
        workspace_a.read_with(cx_a, |workspace, cx| workspace
            .active_item(cx)
            .unwrap()
            .project_path(cx)),
        Some((worktree_id, "3.txt").into())
    );
    workspace_a.update(cx_a, |workspace, cx| {
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "3.txt").into())
        );
        workspace.activate_next_pane(cx);
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "4.txt").into())
        );
    });
    workspace_b.update(cx_b, |workspace, cx| {
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "4.txt").into())
        );
        workspace.activate_next_pane(cx);
        assert_eq!(
            workspace.active_item(cx).unwrap().project_path(cx),
            Some((worktree_id, "3.txt").into())
        );
    });
}
5000
// Verifies which local actions make a follower stop following its leader automatically:
// moving the cursor, editing, scrolling, and activating a different item in the following
// pane all clear the pane's leader, whereas splitting the pane or activating another pane
// does not.
#[gpui::test(iterations = 10)]
async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
    cx_a.foreground().forbid_parking();
    let fs = FakeFs::new(cx_a.background());

    // 2 clients connect to a server.
    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
    let mut client_a = server.create_client(cx_a, "user_a").await;
    let mut client_b = server.create_client(cx_b, "user_b").await;
    cx_a.update(editor::init);
    cx_b.update(editor::init);

    // Client A shares a project.
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "1.txt": "one",
            "2.txt": "two",
            "3.txt": "three",
        }),
    )
    .await;
    let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
    project_a
        .update(cx_a, |project, cx| project.share(cx))
        .await
        .unwrap();

    // Client B joins the project.
    let project_b = client_b
        .build_remote_project(
            project_a
                .read_with(cx_a, |project, _| project.remote_id())
                .unwrap(),
            cx_b,
        )
        .await;

    // Client A opens some editors.
    let workspace_a = client_a.build_workspace(&project_a, cx_a);
    let _editor_a1 = workspace_a
        .update(cx_a, |workspace, cx| {
            workspace.open_path((worktree_id, "1.txt"), cx)
        })
        .await
        .unwrap()
        .downcast::<Editor>()
        .unwrap();

    // Client B starts following client A.
    let workspace_b = client_b.build_workspace(&project_b, cx_b);
    let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
    // With two peers in the project, the first collaborator B sees is taken to be client A.
    let leader_id = project_b.read_with(cx_b, |project, _| {
        project.collaborators().values().next().unwrap().peer_id
    });
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );
    // The editor that became active in B's workspace as a result of following.
    let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
        workspace
            .active_item(cx)
            .unwrap()
            .downcast::<Editor>()
            .unwrap()
    });

    // When client B moves, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Re-establish following before the next scenario.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B edits, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Re-establish following before the next scenario.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B scrolls, it automatically stops following client A.
    editor_b2.update(cx_b, |editor, cx| {
        editor.set_scroll_position(vec2f(0., 3.), cx)
    });
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );

    // Re-establish following before the next scenario.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace
                .toggle_follow(&ToggleFollow(leader_id), cx)
                .unwrap()
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B activates a different pane, it continues following client A in the original pane.
    workspace_b.update(cx_b, |workspace, cx| {
        workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
    });
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        Some(leader_id)
    );

    // When client B activates a different item in the original pane, it automatically stops following client A.
    workspace_b
        .update(cx_b, |workspace, cx| {
            workspace.open_path((worktree_id, "2.txt"), cx)
        })
        .await
        .unwrap();
    assert_eq!(
        workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
        None
    );
}
5166
5167 #[gpui::test(iterations = 100)]
5168 async fn test_random_collaboration(
5169 cx: &mut TestAppContext,
5170 deterministic: Arc<Deterministic>,
5171 rng: StdRng,
5172 ) {
5173 cx.foreground().forbid_parking();
5174 let max_peers = env::var("MAX_PEERS")
5175 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5176 .unwrap_or(5);
5177 assert!(max_peers <= 5);
5178
5179 let max_operations = env::var("OPERATIONS")
5180 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5181 .unwrap_or(10);
5182
5183 let rng = Arc::new(Mutex::new(rng));
5184
5185 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5186 let host_language_registry = Arc::new(LanguageRegistry::test());
5187
5188 let fs = FakeFs::new(cx.background());
5189 fs.insert_tree(
5190 "/_collab",
5191 json!({
5192 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5193 }),
5194 )
5195 .await;
5196
5197 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5198 let mut clients = Vec::new();
5199 let mut user_ids = Vec::new();
5200 let mut op_start_signals = Vec::new();
5201 let files = Arc::new(Mutex::new(Vec::new()));
5202
5203 let mut next_entity_id = 100000;
5204 let mut host_cx = TestAppContext::new(
5205 cx.foreground_platform(),
5206 cx.platform(),
5207 deterministic.build_foreground(next_entity_id),
5208 deterministic.build_background(),
5209 cx.font_cache(),
5210 cx.leak_detector(),
5211 next_entity_id,
5212 );
5213 let host = server.create_client(&mut host_cx, "host").await;
5214 let host_project = host_cx.update(|cx| {
5215 Project::local(
5216 host.client.clone(),
5217 host.user_store.clone(),
5218 host_language_registry.clone(),
5219 fs.clone(),
5220 cx,
5221 )
5222 });
5223 let host_project_id = host_project
5224 .update(&mut host_cx, |p, _| p.next_remote_id())
5225 .await;
5226
5227 let (collab_worktree, _) = host_project
5228 .update(&mut host_cx, |project, cx| {
5229 project.find_or_create_local_worktree("/_collab", true, cx)
5230 })
5231 .await
5232 .unwrap();
5233 collab_worktree
5234 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5235 .await;
5236 host_project
5237 .update(&mut host_cx, |project, cx| project.share(cx))
5238 .await
5239 .unwrap();
5240
5241 // Set up fake language servers.
5242 let mut language = Language::new(
5243 LanguageConfig {
5244 name: "Rust".into(),
5245 path_suffixes: vec!["rs".to_string()],
5246 ..Default::default()
5247 },
5248 None,
5249 );
5250 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5251 name: "the-fake-language-server",
5252 capabilities: lsp::LanguageServer::full_capabilities(),
5253 initializer: Some(Box::new({
5254 let rng = rng.clone();
5255 let files = files.clone();
5256 let project = host_project.downgrade();
5257 move |fake_server: &mut FakeLanguageServer| {
5258 fake_server.handle_request::<lsp::request::Completion, _, _>(
5259 |_, _| async move {
5260 Ok(Some(lsp::CompletionResponse::Array(vec![
5261 lsp::CompletionItem {
5262 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5263 range: lsp::Range::new(
5264 lsp::Position::new(0, 0),
5265 lsp::Position::new(0, 0),
5266 ),
5267 new_text: "the-new-text".to_string(),
5268 })),
5269 ..Default::default()
5270 },
5271 ])))
5272 },
5273 );
5274
5275 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5276 |_, _| async move {
5277 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5278 lsp::CodeAction {
5279 title: "the-code-action".to_string(),
5280 ..Default::default()
5281 },
5282 )]))
5283 },
5284 );
5285
5286 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5287 |params, _| async move {
5288 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5289 params.position,
5290 params.position,
5291 ))))
5292 },
5293 );
5294
5295 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5296 let files = files.clone();
5297 let rng = rng.clone();
5298 move |_, _| {
5299 let files = files.clone();
5300 let rng = rng.clone();
5301 async move {
5302 let files = files.lock();
5303 let mut rng = rng.lock();
5304 let count = rng.gen_range::<usize, _>(1..3);
5305 let files = (0..count)
5306 .map(|_| files.choose(&mut *rng).unwrap())
5307 .collect::<Vec<_>>();
5308 log::info!("LSP: Returning definitions in files {:?}", &files);
5309 Ok(Some(lsp::GotoDefinitionResponse::Array(
5310 files
5311 .into_iter()
5312 .map(|file| lsp::Location {
5313 uri: lsp::Url::from_file_path(file).unwrap(),
5314 range: Default::default(),
5315 })
5316 .collect(),
5317 )))
5318 }
5319 }
5320 });
5321
5322 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5323 let rng = rng.clone();
5324 let project = project.clone();
5325 move |params, mut cx| {
5326 let highlights = if let Some(project) = project.upgrade(&cx) {
5327 project.update(&mut cx, |project, cx| {
5328 let path = params
5329 .text_document_position_params
5330 .text_document
5331 .uri
5332 .to_file_path()
5333 .unwrap();
5334 let (worktree, relative_path) =
5335 project.find_local_worktree(&path, cx)?;
5336 let project_path =
5337 ProjectPath::from((worktree.read(cx).id(), relative_path));
5338 let buffer =
5339 project.get_open_buffer(&project_path, cx)?.read(cx);
5340
5341 let mut highlights = Vec::new();
5342 let highlight_count = rng.lock().gen_range(1..=5);
5343 let mut prev_end = 0;
5344 for _ in 0..highlight_count {
5345 let range =
5346 buffer.random_byte_range(prev_end, &mut *rng.lock());
5347
5348 highlights.push(lsp::DocumentHighlight {
5349 range: range_to_lsp(range.to_point_utf16(buffer)),
5350 kind: Some(lsp::DocumentHighlightKind::READ),
5351 });
5352 prev_end = range.end;
5353 }
5354 Some(highlights)
5355 })
5356 } else {
5357 None
5358 };
5359 async move { Ok(highlights) }
5360 }
5361 });
5362 }
5363 })),
5364 ..Default::default()
5365 });
5366 host_language_registry.add(Arc::new(language));
5367
5368 let op_start_signal = futures::channel::mpsc::unbounded();
5369 user_ids.push(host.current_user_id(&host_cx));
5370 op_start_signals.push(op_start_signal.0);
5371 clients.push(host_cx.foreground().spawn(host.simulate_host(
5372 host_project,
5373 files,
5374 op_start_signal.1,
5375 rng.clone(),
5376 host_cx,
5377 )));
5378
5379 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5380 rng.lock().gen_range(0..max_operations)
5381 } else {
5382 max_operations
5383 };
5384 let mut available_guests = vec![
5385 "guest-1".to_string(),
5386 "guest-2".to_string(),
5387 "guest-3".to_string(),
5388 "guest-4".to_string(),
5389 ];
5390 let mut operations = 0;
5391 while operations < max_operations {
5392 if operations == disconnect_host_at {
5393 server.disconnect_client(user_ids[0]);
5394 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5395 drop(op_start_signals);
5396 let mut clients = futures::future::join_all(clients).await;
5397 cx.foreground().run_until_parked();
5398
5399 let (host, mut host_cx, host_err) = clients.remove(0);
5400 if let Some(host_err) = host_err {
5401 log::error!("host error - {}", host_err);
5402 }
5403 host.project
5404 .as_ref()
5405 .unwrap()
5406 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5407 for (guest, mut guest_cx, guest_err) in clients {
5408 if let Some(guest_err) = guest_err {
5409 log::error!("{} error - {}", guest.username, guest_err);
5410 }
5411 let contacts = server
5412 .store
5413 .read()
5414 .await
5415 .contacts_for_user(guest.current_user_id(&guest_cx));
5416 assert!(!contacts
5417 .iter()
5418 .flat_map(|contact| &contact.projects)
5419 .any(|project| project.id == host_project_id));
5420 guest
5421 .project
5422 .as_ref()
5423 .unwrap()
5424 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5425 guest_cx.update(|_| drop(guest));
5426 }
5427 host_cx.update(|_| drop(host));
5428
5429 return;
5430 }
5431
5432 let distribution = rng.lock().gen_range(0..100);
5433 match distribution {
5434 0..=19 if !available_guests.is_empty() => {
5435 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5436 let guest_username = available_guests.remove(guest_ix);
5437 log::info!("Adding new connection for {}", guest_username);
5438 next_entity_id += 100000;
5439 let mut guest_cx = TestAppContext::new(
5440 cx.foreground_platform(),
5441 cx.platform(),
5442 deterministic.build_foreground(next_entity_id),
5443 deterministic.build_background(),
5444 cx.font_cache(),
5445 cx.leak_detector(),
5446 next_entity_id,
5447 );
5448 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5449 let guest_project = Project::remote(
5450 host_project_id,
5451 guest.client.clone(),
5452 guest.user_store.clone(),
5453 guest_lang_registry.clone(),
5454 FakeFs::new(cx.background()),
5455 &mut guest_cx.to_async(),
5456 )
5457 .await
5458 .unwrap();
5459 let op_start_signal = futures::channel::mpsc::unbounded();
5460 user_ids.push(guest.current_user_id(&guest_cx));
5461 op_start_signals.push(op_start_signal.0);
5462 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5463 guest_username.clone(),
5464 guest_project,
5465 op_start_signal.1,
5466 rng.clone(),
5467 guest_cx,
5468 )));
5469
5470 log::info!("Added connection for {}", guest_username);
5471 operations += 1;
5472 }
5473 20..=29 if clients.len() > 1 => {
5474 log::info!("Removing guest");
5475 let guest_ix = rng.lock().gen_range(1..clients.len());
5476 let removed_guest_id = user_ids.remove(guest_ix);
5477 let guest = clients.remove(guest_ix);
5478 op_start_signals.remove(guest_ix);
5479 server.disconnect_client(removed_guest_id);
5480 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5481 let (guest, mut guest_cx, guest_err) = guest.await;
5482 if let Some(guest_err) = guest_err {
5483 log::error!("{} error - {}", guest.username, guest_err);
5484 }
5485 guest
5486 .project
5487 .as_ref()
5488 .unwrap()
5489 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5490 for user_id in &user_ids {
5491 for contact in server.store.read().await.contacts_for_user(*user_id) {
5492 assert_ne!(
5493 contact.user_id, removed_guest_id.0 as u64,
5494 "removed guest is still a contact of another peer"
5495 );
5496 for project in contact.projects {
5497 for project_guest_id in project.guests {
5498 assert_ne!(
5499 project_guest_id, removed_guest_id.0 as u64,
5500 "removed guest appears as still participating on a project"
5501 );
5502 }
5503 }
5504 }
5505 }
5506
5507 log::info!("{} removed", guest.username);
5508 available_guests.push(guest.username.clone());
5509 guest_cx.update(|_| drop(guest));
5510
5511 operations += 1;
5512 }
5513 _ => {
5514 while operations < max_operations && rng.lock().gen_bool(0.7) {
5515 op_start_signals
5516 .choose(&mut *rng.lock())
5517 .unwrap()
5518 .unbounded_send(())
5519 .unwrap();
5520 operations += 1;
5521 }
5522
5523 if rng.lock().gen_bool(0.8) {
5524 cx.foreground().run_until_parked();
5525 }
5526 }
5527 }
5528 }
5529
5530 drop(op_start_signals);
5531 let mut clients = futures::future::join_all(clients).await;
5532 cx.foreground().run_until_parked();
5533
5534 let (host_client, mut host_cx, host_err) = clients.remove(0);
5535 if let Some(host_err) = host_err {
5536 panic!("host error - {}", host_err);
5537 }
5538 let host_project = host_client.project.as_ref().unwrap();
5539 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5540 project
5541 .worktrees(cx)
5542 .map(|worktree| {
5543 let snapshot = worktree.read(cx).snapshot();
5544 (snapshot.id(), snapshot)
5545 })
5546 .collect::<BTreeMap<_, _>>()
5547 });
5548
5549 host_client
5550 .project
5551 .as_ref()
5552 .unwrap()
5553 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5554
5555 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5556 if let Some(guest_err) = guest_err {
5557 panic!("{} error - {}", guest_client.username, guest_err);
5558 }
5559 let worktree_snapshots =
5560 guest_client
5561 .project
5562 .as_ref()
5563 .unwrap()
5564 .read_with(&guest_cx, |project, cx| {
5565 project
5566 .worktrees(cx)
5567 .map(|worktree| {
5568 let worktree = worktree.read(cx);
5569 (worktree.id(), worktree.snapshot())
5570 })
5571 .collect::<BTreeMap<_, _>>()
5572 });
5573
5574 assert_eq!(
5575 worktree_snapshots.keys().collect::<Vec<_>>(),
5576 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5577 "{} has different worktrees than the host",
5578 guest_client.username
5579 );
5580 for (id, host_snapshot) in &host_worktree_snapshots {
5581 let guest_snapshot = &worktree_snapshots[id];
5582 assert_eq!(
5583 guest_snapshot.root_name(),
5584 host_snapshot.root_name(),
5585 "{} has different root name than the host for worktree {}",
5586 guest_client.username,
5587 id
5588 );
5589 assert_eq!(
5590 guest_snapshot.entries(false).collect::<Vec<_>>(),
5591 host_snapshot.entries(false).collect::<Vec<_>>(),
5592 "{} has different snapshot than the host for worktree {}",
5593 guest_client.username,
5594 id
5595 );
5596 }
5597
5598 guest_client
5599 .project
5600 .as_ref()
5601 .unwrap()
5602 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5603
5604 for guest_buffer in &guest_client.buffers {
5605 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5606 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5607 project.buffer_for_id(buffer_id, cx).expect(&format!(
5608 "host does not have buffer for guest:{}, peer:{}, id:{}",
5609 guest_client.username, guest_client.peer_id, buffer_id
5610 ))
5611 });
5612 let path = host_buffer
5613 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5614
5615 assert_eq!(
5616 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5617 0,
5618 "{}, buffer {}, path {:?} has deferred operations",
5619 guest_client.username,
5620 buffer_id,
5621 path,
5622 );
5623 assert_eq!(
5624 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5625 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5626 "{}, buffer {}, path {:?}, differs from the host's buffer",
5627 guest_client.username,
5628 buffer_id,
5629 path
5630 );
5631 }
5632
5633 guest_cx.update(|_| drop(guest_client));
5634 }
5635
5636 host_cx.update(|_| drop(host_client));
5637 }
5638
/// In-process server harness used by the integration tests in this module.
///
/// Dereferences to the wrapped [`Server`], so server methods and fields can be
/// reached directly through a `TestServer`.
struct TestServer {
    /// Created with `Peer::new()` in `start`; `reset()` is called on it when the
    /// `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state (database handle and API token) shared with `server`.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` uses it to check that parking is
    /// forbidden and to bracket its waits.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// One kill flag per connected user, inserted when a connection is
    /// established and set to `true` by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
    /// While `true`, attempts to establish a connection fail with an error.
    forbid_connections: Arc<AtomicBool>,
    /// Never read after construction — presumably held so the fake database
    /// outlives the server (TODO confirm against `TestDb`).
    _test_db: TestDb,
}
5649
5650 impl TestServer {
5651 async fn start(
5652 foreground: Rc<executor::Foreground>,
5653 background: Arc<executor::Background>,
5654 ) -> Self {
5655 let test_db = TestDb::fake(background);
5656 let app_state = Self::build_app_state(&test_db).await;
5657 let peer = Peer::new();
5658 let notifications = mpsc::unbounded();
5659 let server = Server::new(app_state.clone(), Some(notifications.0));
5660 Self {
5661 peer,
5662 app_state,
5663 server,
5664 foreground,
5665 notifications: notifications.1,
5666 connection_killers: Default::default(),
5667 forbid_connections: Default::default(),
5668 _test_db: test_db,
5669 }
5670 }
5671
5672 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5673 cx.update(|cx| {
5674 let settings = Settings::test(cx);
5675 cx.set_global(settings);
5676 });
5677
5678 let http = FakeHttpClient::with_404_response();
5679 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5680 let client_name = name.to_string();
5681 let mut client = Client::new(http.clone());
5682 let server = self.server.clone();
5683 let connection_killers = self.connection_killers.clone();
5684 let forbid_connections = self.forbid_connections.clone();
5685 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5686
5687 Arc::get_mut(&mut client)
5688 .unwrap()
5689 .override_authenticate(move |cx| {
5690 cx.spawn(|_| async move {
5691 let access_token = "the-token".to_string();
5692 Ok(Credentials {
5693 user_id: user_id.0 as u64,
5694 access_token,
5695 })
5696 })
5697 })
5698 .override_establish_connection(move |credentials, cx| {
5699 assert_eq!(credentials.user_id, user_id.0 as u64);
5700 assert_eq!(credentials.access_token, "the-token");
5701
5702 let server = server.clone();
5703 let connection_killers = connection_killers.clone();
5704 let forbid_connections = forbid_connections.clone();
5705 let client_name = client_name.clone();
5706 let connection_id_tx = connection_id_tx.clone();
5707 cx.spawn(move |cx| async move {
5708 if forbid_connections.load(SeqCst) {
5709 Err(EstablishConnectionError::other(anyhow!(
5710 "server is forbidding connections"
5711 )))
5712 } else {
5713 let (client_conn, server_conn, killed) =
5714 Connection::in_memory(cx.background());
5715 connection_killers.lock().insert(user_id, killed);
5716 cx.background()
5717 .spawn(server.handle_connection(
5718 server_conn,
5719 client_name,
5720 user_id,
5721 Some(connection_id_tx),
5722 cx.background(),
5723 ))
5724 .detach();
5725 Ok(client_conn)
5726 }
5727 })
5728 });
5729
5730 client
5731 .authenticate_and_connect(false, &cx.to_async())
5732 .await
5733 .unwrap();
5734
5735 Channel::init(&client);
5736 Project::init(&client);
5737 cx.update(|cx| {
5738 workspace::init(&client, cx);
5739 });
5740
5741 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5742 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5743
5744 let client = TestClient {
5745 client,
5746 peer_id,
5747 username: name.to_string(),
5748 user_store,
5749 language_registry: Arc::new(LanguageRegistry::test()),
5750 project: Default::default(),
5751 buffers: Default::default(),
5752 };
5753 client.wait_for_current_user(cx).await;
5754 client
5755 }
5756
5757 fn disconnect_client(&self, user_id: UserId) {
5758 self.connection_killers
5759 .lock()
5760 .remove(&user_id)
5761 .unwrap()
5762 .store(true, SeqCst);
5763 }
5764
5765 fn forbid_connections(&self) {
5766 self.forbid_connections.store(true, SeqCst);
5767 }
5768
5769 fn allow_connections(&self) {
5770 self.forbid_connections.store(false, SeqCst);
5771 }
5772
5773 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5774 Arc::new(AppState {
5775 db: test_db.db().clone(),
5776 api_token: Default::default(),
5777 })
5778 }
5779
5780 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5781 self.server.store.read().await
5782 }
5783
5784 async fn condition<F>(&mut self, mut predicate: F)
5785 where
5786 F: FnMut(&Store) -> bool,
5787 {
5788 assert!(
5789 self.foreground.parking_forbidden(),
5790 "you must call forbid_parking to use server conditions so we don't block indefinitely"
5791 );
5792 while !(predicate)(&*self.server.store.read().await) {
5793 self.foreground.start_waiting();
5794 self.notifications.next().await;
5795 self.foreground.finish_waiting();
5796 }
5797 }
5798 }
5799
5800 impl Deref for TestServer {
5801 type Target = Server;
5802
5803 fn deref(&self) -> &Self::Target {
5804 &self.server
5805 }
5806 }
5807
5808 impl Drop for TestServer {
5809 fn drop(&mut self) {
5810 self.peer.reset();
5811 }
5812 }
5813
/// A connected client together with the per-client state the tests track.
///
/// Dereferences to the wrapped `Arc<Client>`; the client is torn down when the
/// `TestClient` is dropped.
struct TestClient {
    /// The underlying client connection.
    client: Arc<Client>,
    /// The name this client's user was created with.
    username: String,
    /// Built in `create_client` from the connection id the server reported.
    pub peer_id: PeerId,
    /// User store created for this client in `create_client`.
    pub user_store: ModelHandle<UserStore>,
    /// Language registry passed to the projects this client builds.
    language_registry: Arc<LanguageRegistry>,
    /// The most recent project built or simulated by this client, if any.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulation routines currently keep open.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
5823
5824 impl Deref for TestClient {
5825 type Target = Arc<Client>;
5826
5827 fn deref(&self) -> &Self::Target {
5828 &self.client
5829 }
5830 }
5831
5832 impl TestClient {
5833 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5834 UserId::from_proto(
5835 self.user_store
5836 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5837 )
5838 }
5839
5840 async fn wait_for_current_user(&self, cx: &TestAppContext) {
5841 let mut authed_user = self
5842 .user_store
5843 .read_with(cx, |user_store, _| user_store.watch_current_user());
5844 while authed_user.next().await.unwrap().is_none() {}
5845 }
5846
5847 async fn build_local_project(
5848 &mut self,
5849 fs: Arc<FakeFs>,
5850 root_path: impl AsRef<Path>,
5851 cx: &mut TestAppContext,
5852 ) -> (ModelHandle<Project>, WorktreeId) {
5853 let project = cx.update(|cx| {
5854 Project::local(
5855 self.client.clone(),
5856 self.user_store.clone(),
5857 self.language_registry.clone(),
5858 fs,
5859 cx,
5860 )
5861 });
5862 self.project = Some(project.clone());
5863 let (worktree, _) = project
5864 .update(cx, |p, cx| {
5865 p.find_or_create_local_worktree(root_path, true, cx)
5866 })
5867 .await
5868 .unwrap();
5869 worktree
5870 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5871 .await;
5872 project
5873 .update(cx, |project, _| project.next_remote_id())
5874 .await;
5875 (project, worktree.read_with(cx, |tree, _| tree.id()))
5876 }
5877
5878 async fn build_remote_project(
5879 &mut self,
5880 project_id: u64,
5881 cx: &mut TestAppContext,
5882 ) -> ModelHandle<Project> {
5883 let project = Project::remote(
5884 project_id,
5885 self.client.clone(),
5886 self.user_store.clone(),
5887 self.language_registry.clone(),
5888 FakeFs::new(cx.background()),
5889 &mut cx.to_async(),
5890 )
5891 .await
5892 .unwrap();
5893 self.project = Some(project.clone());
5894 project
5895 }
5896
5897 fn build_workspace(
5898 &self,
5899 project: &ModelHandle<Project>,
5900 cx: &mut TestAppContext,
5901 ) -> ViewHandle<Workspace> {
5902 let (window_id, _) = cx.add_window(|_| EmptyView);
5903 cx.add_view(window_id, |cx| {
5904 let fs = project.read(cx).fs().clone();
5905 Workspace::new(
5906 &WorkspaceParams {
5907 fs,
5908 project: project.clone(),
5909 user_store: self.user_store.clone(),
5910 languages: self.language_registry.clone(),
5911 themes: ThemeRegistry::new((), cx.font_cache().clone()),
5912 channel_list: cx.add_model(|cx| {
5913 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5914 }),
5915 client: self.client.clone(),
5916 },
5917 cx,
5918 )
5919 })
5920 }
5921
5922 async fn simulate_host(
5923 mut self,
5924 project: ModelHandle<Project>,
5925 files: Arc<Mutex<Vec<PathBuf>>>,
5926 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5927 rng: Arc<Mutex<StdRng>>,
5928 mut cx: TestAppContext,
5929 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5930 async fn simulate_host_internal(
5931 client: &mut TestClient,
5932 project: ModelHandle<Project>,
5933 files: Arc<Mutex<Vec<PathBuf>>>,
5934 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5935 rng: Arc<Mutex<StdRng>>,
5936 cx: &mut TestAppContext,
5937 ) -> anyhow::Result<()> {
5938 let fs = project.read_with(cx, |project, _| project.fs().clone());
5939
5940 while op_start_signal.next().await.is_some() {
5941 let distribution = rng.lock().gen_range::<usize, _>(0..100);
5942 match distribution {
5943 0..=20 if !files.lock().is_empty() => {
5944 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5945 let mut path = path.as_path();
5946 while let Some(parent_path) = path.parent() {
5947 path = parent_path;
5948 if rng.lock().gen() {
5949 break;
5950 }
5951 }
5952
5953 log::info!("Host: find/create local worktree {:?}", path);
5954 let find_or_create_worktree = project.update(cx, |project, cx| {
5955 project.find_or_create_local_worktree(path, true, cx)
5956 });
5957 if rng.lock().gen() {
5958 cx.background().spawn(find_or_create_worktree).detach();
5959 } else {
5960 find_or_create_worktree.await?;
5961 }
5962 }
5963 10..=80 if !files.lock().is_empty() => {
5964 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5965 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5966 let (worktree, path) = project
5967 .update(cx, |project, cx| {
5968 project.find_or_create_local_worktree(
5969 file.clone(),
5970 true,
5971 cx,
5972 )
5973 })
5974 .await?;
5975 let project_path =
5976 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5977 log::info!(
5978 "Host: opening path {:?}, worktree {}, relative_path {:?}",
5979 file,
5980 project_path.0,
5981 project_path.1
5982 );
5983 let buffer = project
5984 .update(cx, |project, cx| project.open_buffer(project_path, cx))
5985 .await
5986 .unwrap();
5987 client.buffers.insert(buffer.clone());
5988 buffer
5989 } else {
5990 client
5991 .buffers
5992 .iter()
5993 .choose(&mut *rng.lock())
5994 .unwrap()
5995 .clone()
5996 };
5997
5998 if rng.lock().gen_bool(0.1) {
5999 cx.update(|cx| {
6000 log::info!(
6001 "Host: dropping buffer {:?}",
6002 buffer.read(cx).file().unwrap().full_path(cx)
6003 );
6004 client.buffers.remove(&buffer);
6005 drop(buffer);
6006 });
6007 } else {
6008 buffer.update(cx, |buffer, cx| {
6009 log::info!(
6010 "Host: updating buffer {:?} ({})",
6011 buffer.file().unwrap().full_path(cx),
6012 buffer.remote_id()
6013 );
6014
6015 if rng.lock().gen_bool(0.7) {
6016 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6017 } else {
6018 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6019 }
6020 });
6021 }
6022 }
6023 _ => loop {
6024 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6025 let mut path = PathBuf::new();
6026 path.push("/");
6027 for _ in 0..path_component_count {
6028 let letter = rng.lock().gen_range(b'a'..=b'z');
6029 path.push(std::str::from_utf8(&[letter]).unwrap());
6030 }
6031 path.set_extension("rs");
6032 let parent_path = path.parent().unwrap();
6033
6034 log::info!("Host: creating file {:?}", path,);
6035
6036 if fs.create_dir(&parent_path).await.is_ok()
6037 && fs.create_file(&path, Default::default()).await.is_ok()
6038 {
6039 files.lock().push(path);
6040 break;
6041 } else {
6042 log::info!("Host: cannot create file");
6043 }
6044 },
6045 }
6046
6047 cx.background().simulate_random_delay().await;
6048 }
6049
6050 Ok(())
6051 }
6052
6053 let result = simulate_host_internal(
6054 &mut self,
6055 project.clone(),
6056 files,
6057 op_start_signal,
6058 rng,
6059 &mut cx,
6060 )
6061 .await;
6062 log::info!("Host done");
6063 self.project = Some(project);
6064 (self, cx, result.err())
6065 }
6066
/// Drives a randomized guest session on `project`.
///
/// One random operation is performed for every unit received on
/// `op_start_signal`; the session ends when that channel is closed or an
/// awaited operation fails. `guest_username` is only used to label log lines.
///
/// Returns the client (with `project` stored in it), the context, and the
/// error that ended the session, if any.
pub async fn simulate_guest(
    mut self,
    guest_username: String,
    project: ModelHandle<Project>,
    op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext, Option<anyhow::Error>) {
    // The fallible part lives in an inner function so `?` can be used while
    // the outer function still hands back the client and context.
    async fn simulate_guest_internal(
        client: &mut TestClient,
        guest_username: &str,
        project: ModelHandle<Project>,
        mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
        rng: Arc<Mutex<StdRng>>,
        cx: &mut TestAppContext,
    ) -> anyhow::Result<()> {
        while op_start_signal.next().await.is_some() {
            // Pick the buffer to operate on: open a new one when none are
            // open (or on a coin flip), otherwise reuse a random open buffer.
            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                // Only visible worktrees that contain at least one file qualify.
                let worktree = if let Some(worktree) =
                    project.read_with(cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                    worktree
                } else {
                    // Nothing to open yet; this operation is spent on a delay.
                    cx.background().simulate_random_delay().await;
                    continue;
                };

                let (worktree_root_name, project_path) =
                    worktree.read_with(cx, |worktree, _| {
                        let entry = worktree
                            .entries(false)
                            .filter(|e| e.is_file())
                            .choose(&mut *rng.lock())
                            .unwrap();
                        (
                            worktree.root_name().to_string(),
                            (worktree.id(), entry.path.clone()),
                        )
                    });
                log::info!(
                    "{}: opening path {:?} in worktree {} ({})",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                );
                let buffer = project
                    .update(cx, |project, cx| {
                        project.open_buffer(project_path.clone(), cx)
                    })
                    .await?;
                log::info!(
                    "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
                    guest_username,
                    project_path.1,
                    project_path.0,
                    worktree_root_name,
                    buffer.read_with(cx, |buffer, _| buffer.remote_id())
                );
                client.buffers.insert(buffer.clone());
                buffer
            } else {
                client
                    .buffers
                    .iter()
                    .choose(&mut *rng.lock())
                    .unwrap()
                    .clone()
            };

            // Choose the operation. Every request below is spawned on the
            // background executor and then, with probability 0.3, detached;
            // otherwise it is awaited and its error propagated.
            let choice = rng.lock().gen_range(0..100);
            match choice {
                // Drop the chosen buffer.
                0..=9 => {
                    cx.update(|cx| {
                        log::info!(
                            "{}: dropping buffer {:?}",
                            guest_username,
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        client.buffers.remove(&buffer);
                        drop(buffer);
                    });
                }
                // Request completions at a random offset.
                10..=19 => {
                    let completions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting completions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.completions(&buffer, offset, cx)
                    });
                    let completions = cx.background().spawn(async move {
                        completions
                            .await
                            .map_err(|err| anyhow!("completions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching completions request", guest_username);
                        cx.update(|cx| completions.detach_and_log_err(cx));
                    } else {
                        completions.await?;
                    }
                }
                // Request code actions for a random byte range.
                20..=29 => {
                    let code_actions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting code actions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                        project.code_actions(&buffer, range, cx)
                    });
                    let code_actions = cx.background().spawn(async move {
                        code_actions.await.map_err(|err| {
                            anyhow!("code actions request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching code actions request", guest_username);
                        cx.update(|cx| code_actions.detach_and_log_err(cx));
                    } else {
                        code_actions.await?;
                    }
                }
                // Save the buffer, but only if it is dirty; a clean buffer
                // falls through to the edit arm at the bottom.
                30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
                    let (requested_version, save) = buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: saving buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        (buffer.version(), buffer.save(cx))
                    });
                    let save = cx.background().spawn(async move {
                        let (saved_version, _) = save
                            .await
                            .map_err(|err| anyhow!("save request failed: {:?}", err))?;
                        // The saved version must include everything that was
                        // in the buffer when the save was requested.
                        assert!(saved_version.observed_all(&requested_version));
                        Ok::<_, anyhow::Error>(())
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching save request", guest_username);
                        cx.update(|cx| save.detach_and_log_err(cx));
                    } else {
                        save.await?;
                    }
                }
                // Prepare a rename at a random offset.
                40..=44 => {
                    let prepare_rename = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: preparing rename for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.prepare_rename(buffer, offset, cx)
                    });
                    let prepare_rename = cx.background().spawn(async move {
                        prepare_rename.await.map_err(|err| {
                            anyhow!("prepare rename request failed: {:?}", err)
                        })
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching prepare rename request", guest_username);
                        cx.update(|cx| prepare_rename.detach_and_log_err(cx));
                    } else {
                        prepare_rename.await?;
                    }
                }
                // Go to definition; when awaited, the buffers of the returned
                // locations are added to the open set.
                45..=49 => {
                    let definitions = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting definitions for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.definition(&buffer, offset, cx)
                    });
                    let definitions = cx.background().spawn(async move {
                        definitions
                            .await
                            .map_err(|err| anyhow!("definitions request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching definitions request", guest_username);
                        cx.update(|cx| definitions.detach_and_log_err(cx));
                    } else {
                        client
                            .buffers
                            .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
                    }
                }
                // Request document highlights at a random offset.
                50..=54 => {
                    let highlights = project.update(cx, |project, cx| {
                        log::info!(
                            "{}: requesting highlights for buffer {} ({:?})",
                            guest_username,
                            buffer.read(cx).remote_id(),
                            buffer.read(cx).file().unwrap().full_path(cx)
                        );
                        let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                        project.document_highlights(&buffer, offset, cx)
                    });
                    let highlights = cx.background().spawn(async move {
                        highlights
                            .await
                            .map_err(|err| anyhow!("highlights request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching highlights request", guest_username);
                        cx.update(|cx| highlights.detach_and_log_err(cx));
                    } else {
                        highlights.await?;
                    }
                }
                // Project-wide search for a random lowercase letter; when
                // awaited, the matching buffers are added to the open set.
                55..=59 => {
                    let search = project.update(cx, |project, cx| {
                        let query = rng.lock().gen_range('a'..='z');
                        log::info!("{}: project-wide search {:?}", guest_username, query);
                        project.search(SearchQuery::text(query, false, false), cx)
                    });
                    let search = cx.background().spawn(async move {
                        search
                            .await
                            .map_err(|err| anyhow!("search request failed: {:?}", err))
                    });
                    if rng.lock().gen_bool(0.3) {
                        log::info!("{}: detaching search request", guest_username);
                        cx.update(|cx| search.detach_and_log_err(cx));
                    } else {
                        client.buffers.extend(search.await?.into_keys());
                    }
                }
                // Everything else: randomly edit, or undo/redo, the buffer.
                _ => {
                    buffer.update(cx, |buffer, cx| {
                        log::info!(
                            "{}: updating buffer {} ({:?})",
                            guest_username,
                            buffer.remote_id(),
                            buffer.file().unwrap().full_path(cx)
                        );
                        if rng.lock().gen_bool(0.7) {
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                        } else {
                            buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                        }
                    });
                }
            }
            cx.background().simulate_random_delay().await;
        }
        Ok(())
    }

    let result = simulate_guest_internal(
        &mut self,
        &guest_username,
        project.clone(),
        op_start_signal,
        rng,
        &mut cx,
    )
    .await;
    log::info!("{}: done", guest_username);

    self.project = Some(project);
    (self, cx, result.err())
}
6352 }
6353
6354 impl Drop for TestClient {
6355 fn drop(&mut self) {
6356 self.client.tear_down();
6357 }
6358 }
6359
6360 impl Executor for Arc<gpui::executor::Background> {
6361 type Sleep = gpui::executor::Timer;
6362
6363 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6364 self.spawn(future).detach();
6365 }
6366
6367 fn sleep(&self, duration: Duration) -> Self::Sleep {
6368 self.as_ref().timer(duration)
6369 }
6370 }
6371
6372 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6373 channel
6374 .messages()
6375 .cursor::<()>()
6376 .map(|m| {
6377 (
6378 m.sender.github_login.clone(),
6379 m.body.clone(),
6380 m.is_pending(),
6381 )
6382 })
6383 .collect()
6384 }
6385
/// Placeholder root view; `build_workspace` uses it as the content of the
/// window it opens.
struct EmptyView;
6387
/// `EmptyView` carries no event payload, so its event type is the unit type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
6391
6392 impl gpui::View for EmptyView {
6393 fn ui_name() -> &'static str {
6394 "empty view"
6395 }
6396
6397 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6398 gpui::Element::boxed(gpui::elements::Empty)
6399 }
6400 }
6401}