1mod store;
2
3use crate::{
4 auth,
5 db::{ChannelId, MessageId, UserId},
6 AppState, Result,
7};
8use anyhow::anyhow;
9use async_tungstenite::tungstenite::{
10 protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
11};
12use axum::{
13 body::Body,
14 extract::{
15 ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
16 ConnectInfo, WebSocketUpgrade,
17 },
18 headers::{Header, HeaderName},
19 http::StatusCode,
20 middleware,
21 response::{IntoResponse, Response},
22 routing::get,
23 Extension, Router, TypedHeader,
24};
25use collections::{HashMap, HashSet};
26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
27use lazy_static::lazy_static;
28use rpc::{
29 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
30 Connection, ConnectionId, Peer, TypedEnvelope,
31};
32use std::{
33 any::TypeId,
34 future::Future,
35 marker::PhantomData,
36 net::SocketAddr,
37 ops::{Deref, DerefMut},
38 rc::Rc,
39 sync::Arc,
40 time::Duration,
41};
42use store::{Store, Worktree};
43use time::OffsetDateTime;
44use tokio::{
45 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
46 time::Sleep,
47};
48use tower::ServiceBuilder;
49use tracing::{info_span, instrument, Instrument};
50
51type MessageHandler =
52 Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
53
54pub struct Server {
55 peer: Arc<Peer>,
56 store: RwLock<Store>,
57 app_state: Arc<AppState>,
58 handlers: HashMap<TypeId, MessageHandler>,
59 notifications: Option<mpsc::UnboundedSender<()>>,
60}
61
62pub trait Executor: Send + Clone {
63 type Sleep: Send + Future;
64 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
65 fn sleep(&self, duration: Duration) -> Self::Sleep;
66}
67
68#[derive(Clone)]
69pub struct RealExecutor;
70
71const MESSAGE_COUNT_PER_PAGE: usize = 100;
72const MAX_MESSAGE_LEN: usize = 1024;
73
74struct StoreReadGuard<'a> {
75 guard: RwLockReadGuard<'a, Store>,
76 _not_send: PhantomData<Rc<()>>,
77}
78
79struct StoreWriteGuard<'a> {
80 guard: RwLockWriteGuard<'a, Store>,
81 _not_send: PhantomData<Rc<()>>,
82}
83
84impl Server {
85 pub fn new(
86 app_state: Arc<AppState>,
87 notifications: Option<mpsc::UnboundedSender<()>>,
88 ) -> Arc<Self> {
89 let mut server = Self {
90 peer: Peer::new(),
91 app_state,
92 store: Default::default(),
93 handlers: Default::default(),
94 notifications,
95 };
96
97 server
98 .add_request_handler(Server::ping)
99 .add_request_handler(Server::register_project)
100 .add_message_handler(Server::unregister_project)
101 .add_request_handler(Server::share_project)
102 .add_message_handler(Server::unshare_project)
103 .add_sync_request_handler(Server::join_project)
104 .add_message_handler(Server::leave_project)
105 .add_request_handler(Server::register_worktree)
106 .add_message_handler(Server::unregister_worktree)
107 .add_request_handler(Server::update_worktree)
108 .add_message_handler(Server::start_language_server)
109 .add_message_handler(Server::update_language_server)
110 .add_message_handler(Server::update_diagnostic_summary)
111 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
112 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
113 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
114 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
115 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
116 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
117 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
118 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
119 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
120 .add_request_handler(
121 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
122 )
123 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
124 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
125 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
126 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
127 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
128 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
129 .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
130 .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
131 .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
132 .add_request_handler(Server::update_buffer)
133 .add_message_handler(Server::update_buffer_file)
134 .add_message_handler(Server::buffer_reloaded)
135 .add_message_handler(Server::buffer_saved)
136 .add_request_handler(Server::save_buffer)
137 .add_request_handler(Server::get_channels)
138 .add_request_handler(Server::get_users)
139 .add_request_handler(Server::join_channel)
140 .add_message_handler(Server::leave_channel)
141 .add_request_handler(Server::send_channel_message)
142 .add_request_handler(Server::follow)
143 .add_message_handler(Server::unfollow)
144 .add_message_handler(Server::update_followers)
145 .add_request_handler(Server::get_channel_messages);
146
147 Arc::new(server)
148 }
149
150 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
151 where
152 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
153 Fut: 'static + Send + Future<Output = Result<()>>,
154 M: EnvelopedMessage,
155 {
156 let prev_handler = self.handlers.insert(
157 TypeId::of::<M>(),
158 Box::new(move |server, envelope| {
159 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
160 let span = info_span!(
161 "handle message",
162 payload_type = envelope.payload_type_name(),
163 payload = format!("{:?}", envelope.payload).as_str(),
164 );
165 let future = (handler)(server, *envelope);
166 async move {
167 if let Err(error) = future.await {
168 tracing::error!(%error, "error handling message");
169 }
170 }
171 .instrument(span)
172 .boxed()
173 }),
174 );
175 if prev_handler.is_some() {
176 panic!("registered a handler for the same message twice");
177 }
178 self
179 }
180
181 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
182 where
183 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
184 Fut: 'static + Send + Future<Output = Result<M::Response>>,
185 M: RequestMessage,
186 {
187 self.add_message_handler(move |server, envelope| {
188 let receipt = envelope.receipt();
189 let response = (handler)(server.clone(), envelope);
190 async move {
191 match response.await {
192 Ok(response) => {
193 server.peer.respond(receipt, response)?;
194 Ok(())
195 }
196 Err(error) => {
197 server.peer.respond_with_error(
198 receipt,
199 proto::Error {
200 message: error.to_string(),
201 },
202 )?;
203 Err(error)
204 }
205 }
206 }
207 })
208 }
209
210 /// Handle a request while holding a lock to the store. This is useful when we're registering
211 /// a connection but we want to respond on the connection before anybody else can send on it.
212 fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
213 where
214 F: 'static
215 + Send
216 + Sync
217 + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
218 M: RequestMessage,
219 {
220 let handler = Arc::new(handler);
221 self.add_message_handler(move |server, envelope| {
222 let receipt = envelope.receipt();
223 let handler = handler.clone();
224 async move {
225 let mut store = server.state_mut().await;
226 let response = (handler)(server.clone(), &mut *store, envelope);
227 match response {
228 Ok(response) => {
229 server.peer.respond(receipt, response)?;
230 Ok(())
231 }
232 Err(error) => {
233 server.peer.respond_with_error(
234 receipt,
235 proto::Error {
236 message: error.to_string(),
237 },
238 )?;
239 Err(error)
240 }
241 }
242 }
243 })
244 }
245
246 pub fn handle_connection<E: Executor>(
247 self: &Arc<Self>,
248 connection: Connection,
249 address: String,
250 user_id: UserId,
251 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
252 executor: E,
253 ) -> impl Future<Output = ()> {
254 let mut this = self.clone();
255 let span = info_span!("handle connection", %user_id, %address);
256 async move {
257 let (connection_id, handle_io, mut incoming_rx) = this
258 .peer
259 .add_connection(connection, {
260 let executor = executor.clone();
261 move |duration| {
262 let timer = executor.sleep(duration);
263 async move {
264 timer.await;
265 }
266 }
267 })
268 .await;
269
270 tracing::info!(%user_id, %connection_id, %address, "connection opened");
271
272 if let Some(send_connection_id) = send_connection_id.as_mut() {
273 let _ = send_connection_id.send(connection_id).await;
274 }
275
276 {
277 let mut state = this.state_mut().await;
278 state.add_connection(connection_id, user_id);
279 this.update_contacts_for_users(&*state, &[user_id]);
280 }
281
282 let handle_io = handle_io.fuse();
283 futures::pin_mut!(handle_io);
284 loop {
285 let next_message = incoming_rx.next().fuse();
286 futures::pin_mut!(next_message);
287 futures::select_biased! {
288 result = handle_io => {
289 if let Err(error) = result {
290 tracing::error!(%error, "error handling I/O");
291 }
292 break;
293 }
294 message = next_message => {
295 if let Some(message) = message {
296 let type_name = message.payload_type_name();
297 let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
298 async {
299 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
300 let notifications = this.notifications.clone();
301 let is_background = message.is_background();
302 let handle_message = (handler)(this.clone(), message);
303 let handle_message = async move {
304 handle_message.await;
305 if let Some(mut notifications) = notifications {
306 let _ = notifications.send(()).await;
307 }
308 };
309 if is_background {
310 executor.spawn_detached(handle_message);
311 } else {
312 handle_message.await;
313 }
314 } else {
315 tracing::error!("no message handler");
316 }
317 }.instrument(span).await;
318 } else {
319 tracing::info!(%user_id, %connection_id, %address, "connection closed");
320 break;
321 }
322 }
323 }
324 }
325
326 if let Err(error) = this.sign_out(connection_id).await {
327 tracing::error!(%error, "error signing out");
328 }
329 }.instrument(span)
330 }
331
332 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
333 self.peer.disconnect(connection_id);
334 let mut state = self.state_mut().await;
335 let removed_connection = state.remove_connection(connection_id)?;
336
337 for (project_id, project) in removed_connection.hosted_projects {
338 if let Some(share) = project.share {
339 broadcast(
340 connection_id,
341 share.guests.keys().copied().collect(),
342 |conn_id| {
343 self.peer
344 .send(conn_id, proto::UnshareProject { project_id })
345 },
346 );
347 }
348 }
349
350 for (project_id, peer_ids) in removed_connection.guest_project_ids {
351 broadcast(connection_id, peer_ids, |conn_id| {
352 self.peer.send(
353 conn_id,
354 proto::RemoveProjectCollaborator {
355 project_id,
356 peer_id: connection_id.0,
357 },
358 )
359 });
360 }
361
362 self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
363 Ok(())
364 }
365
366 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
367 Ok(proto::Ack {})
368 }
369
370 async fn register_project(
371 self: Arc<Server>,
372 request: TypedEnvelope<proto::RegisterProject>,
373 ) -> Result<proto::RegisterProjectResponse> {
374 let project_id = {
375 let mut state = self.state_mut().await;
376 let user_id = state.user_id_for_connection(request.sender_id)?;
377 state.register_project(request.sender_id, user_id)
378 };
379 Ok(proto::RegisterProjectResponse { project_id })
380 }
381
382 async fn unregister_project(
383 self: Arc<Server>,
384 request: TypedEnvelope<proto::UnregisterProject>,
385 ) -> Result<()> {
386 let mut state = self.state_mut().await;
387 let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
388 self.update_contacts_for_users(&*state, &project.authorized_user_ids());
389 Ok(())
390 }
391
392 async fn share_project(
393 self: Arc<Server>,
394 request: TypedEnvelope<proto::ShareProject>,
395 ) -> Result<proto::Ack> {
396 let mut state = self.state_mut().await;
397 let project = state.share_project(request.payload.project_id, request.sender_id)?;
398 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
399 Ok(proto::Ack {})
400 }
401
402 async fn unshare_project(
403 self: Arc<Server>,
404 request: TypedEnvelope<proto::UnshareProject>,
405 ) -> Result<()> {
406 let project_id = request.payload.project_id;
407 let mut state = self.state_mut().await;
408 let project = state.unshare_project(project_id, request.sender_id)?;
409 broadcast(request.sender_id, project.connection_ids, |conn_id| {
410 self.peer
411 .send(conn_id, proto::UnshareProject { project_id })
412 });
413 self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
414 Ok(())
415 }
416
417 fn join_project(
418 self: Arc<Server>,
419 state: &mut Store,
420 request: TypedEnvelope<proto::JoinProject>,
421 ) -> Result<proto::JoinProjectResponse> {
422 let project_id = request.payload.project_id;
423
424 let user_id = state.user_id_for_connection(request.sender_id)?;
425 let (response, connection_ids, contact_user_ids) = state
426 .join_project(request.sender_id, user_id, project_id)
427 .and_then(|joined| {
428 let share = joined.project.share()?;
429 let peer_count = share.guests.len();
430 let mut collaborators = Vec::with_capacity(peer_count);
431 collaborators.push(proto::Collaborator {
432 peer_id: joined.project.host_connection_id.0,
433 replica_id: 0,
434 user_id: joined.project.host_user_id.to_proto(),
435 });
436 let worktrees = share
437 .worktrees
438 .iter()
439 .filter_map(|(id, shared_worktree)| {
440 let worktree = joined.project.worktrees.get(&id)?;
441 Some(proto::Worktree {
442 id: *id,
443 root_name: worktree.root_name.clone(),
444 entries: shared_worktree.entries.values().cloned().collect(),
445 diagnostic_summaries: shared_worktree
446 .diagnostic_summaries
447 .values()
448 .cloned()
449 .collect(),
450 visible: worktree.visible,
451 scan_id: shared_worktree.scan_id,
452 })
453 })
454 .collect();
455 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
456 if *peer_conn_id != request.sender_id {
457 collaborators.push(proto::Collaborator {
458 peer_id: peer_conn_id.0,
459 replica_id: *peer_replica_id as u32,
460 user_id: peer_user_id.to_proto(),
461 });
462 }
463 }
464 let response = proto::JoinProjectResponse {
465 worktrees,
466 replica_id: joined.replica_id as u32,
467 collaborators,
468 language_servers: joined.project.language_servers.clone(),
469 };
470 let connection_ids = joined.project.connection_ids();
471 let contact_user_ids = joined.project.authorized_user_ids();
472 Ok((response, connection_ids, contact_user_ids))
473 })?;
474
475 broadcast(request.sender_id, connection_ids, |conn_id| {
476 self.peer.send(
477 conn_id,
478 proto::AddProjectCollaborator {
479 project_id,
480 collaborator: Some(proto::Collaborator {
481 peer_id: request.sender_id.0,
482 replica_id: response.replica_id,
483 user_id: user_id.to_proto(),
484 }),
485 },
486 )
487 });
488 self.update_contacts_for_users(state, &contact_user_ids);
489 Ok(response)
490 }
491
492 async fn leave_project(
493 self: Arc<Server>,
494 request: TypedEnvelope<proto::LeaveProject>,
495 ) -> Result<()> {
496 let sender_id = request.sender_id;
497 let project_id = request.payload.project_id;
498 let mut state = self.state_mut().await;
499 let worktree = state.leave_project(sender_id, project_id)?;
500 broadcast(sender_id, worktree.connection_ids, |conn_id| {
501 self.peer.send(
502 conn_id,
503 proto::RemoveProjectCollaborator {
504 project_id,
505 peer_id: sender_id.0,
506 },
507 )
508 });
509 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
510 Ok(())
511 }
512
513 async fn register_worktree(
514 self: Arc<Server>,
515 request: TypedEnvelope<proto::RegisterWorktree>,
516 ) -> Result<proto::Ack> {
517 let mut contact_user_ids = HashSet::default();
518 for github_login in &request.payload.authorized_logins {
519 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
520 contact_user_ids.insert(contact_user_id);
521 }
522
523 let mut state = self.state_mut().await;
524 let host_user_id = state.user_id_for_connection(request.sender_id)?;
525 contact_user_ids.insert(host_user_id);
526
527 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
528 let guest_connection_ids = state
529 .read_project(request.payload.project_id, request.sender_id)?
530 .guest_connection_ids();
531 state.register_worktree(
532 request.payload.project_id,
533 request.payload.worktree_id,
534 request.sender_id,
535 Worktree {
536 authorized_user_ids: contact_user_ids.clone(),
537 root_name: request.payload.root_name.clone(),
538 visible: request.payload.visible,
539 },
540 )?;
541
542 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
543 self.peer
544 .forward_send(request.sender_id, connection_id, request.payload.clone())
545 });
546 self.update_contacts_for_users(&*state, &contact_user_ids);
547 Ok(proto::Ack {})
548 }
549
550 async fn unregister_worktree(
551 self: Arc<Server>,
552 request: TypedEnvelope<proto::UnregisterWorktree>,
553 ) -> Result<()> {
554 let project_id = request.payload.project_id;
555 let worktree_id = request.payload.worktree_id;
556 let mut state = self.state_mut().await;
557 let (worktree, guest_connection_ids) =
558 state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
559 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
560 self.peer.send(
561 conn_id,
562 proto::UnregisterWorktree {
563 project_id,
564 worktree_id,
565 },
566 )
567 });
568 self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
569 Ok(())
570 }
571
572 async fn update_worktree(
573 self: Arc<Server>,
574 request: TypedEnvelope<proto::UpdateWorktree>,
575 ) -> Result<proto::Ack> {
576 let connection_ids = self.state_mut().await.update_worktree(
577 request.sender_id,
578 request.payload.project_id,
579 request.payload.worktree_id,
580 &request.payload.removed_entries,
581 &request.payload.updated_entries,
582 request.payload.scan_id,
583 )?;
584
585 broadcast(request.sender_id, connection_ids, |connection_id| {
586 self.peer
587 .forward_send(request.sender_id, connection_id, request.payload.clone())
588 });
589
590 Ok(proto::Ack {})
591 }
592
593 async fn update_diagnostic_summary(
594 self: Arc<Server>,
595 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
596 ) -> Result<()> {
597 let summary = request
598 .payload
599 .summary
600 .clone()
601 .ok_or_else(|| anyhow!("invalid summary"))?;
602 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
603 request.payload.project_id,
604 request.payload.worktree_id,
605 request.sender_id,
606 summary,
607 )?;
608
609 broadcast(request.sender_id, receiver_ids, |connection_id| {
610 self.peer
611 .forward_send(request.sender_id, connection_id, request.payload.clone())
612 });
613 Ok(())
614 }
615
616 async fn start_language_server(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::StartLanguageServer>,
619 ) -> Result<()> {
620 let receiver_ids = self.state_mut().await.start_language_server(
621 request.payload.project_id,
622 request.sender_id,
623 request
624 .payload
625 .server
626 .clone()
627 .ok_or_else(|| anyhow!("invalid language server"))?,
628 )?;
629 broadcast(request.sender_id, receiver_ids, |connection_id| {
630 self.peer
631 .forward_send(request.sender_id, connection_id, request.payload.clone())
632 });
633 Ok(())
634 }
635
636 async fn update_language_server(
637 self: Arc<Server>,
638 request: TypedEnvelope<proto::UpdateLanguageServer>,
639 ) -> Result<()> {
640 let receiver_ids = self
641 .state()
642 .await
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 });
648 Ok(())
649 }
650
651 async fn forward_project_request<T>(
652 self: Arc<Server>,
653 request: TypedEnvelope<T>,
654 ) -> Result<T::Response>
655 where
656 T: EntityMessage + RequestMessage,
657 {
658 let host_connection_id = self
659 .state()
660 .await
661 .read_project(request.payload.remote_entity_id(), request.sender_id)?
662 .host_connection_id;
663 Ok(self
664 .peer
665 .forward_request(request.sender_id, host_connection_id, request.payload)
666 .await?)
667 }
668
669 async fn save_buffer(
670 self: Arc<Server>,
671 request: TypedEnvelope<proto::SaveBuffer>,
672 ) -> Result<proto::BufferSaved> {
673 let host = self
674 .state()
675 .await
676 .read_project(request.payload.project_id, request.sender_id)?
677 .host_connection_id;
678 let response = self
679 .peer
680 .forward_request(request.sender_id, host, request.payload.clone())
681 .await?;
682
683 let mut guests = self
684 .state()
685 .await
686 .read_project(request.payload.project_id, request.sender_id)?
687 .connection_ids();
688 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
689 broadcast(host, guests, |conn_id| {
690 self.peer.forward_send(host, conn_id, response.clone())
691 });
692
693 Ok(response)
694 }
695
696 async fn update_buffer(
697 self: Arc<Server>,
698 request: TypedEnvelope<proto::UpdateBuffer>,
699 ) -> Result<proto::Ack> {
700 let receiver_ids = self
701 .state()
702 .await
703 .project_connection_ids(request.payload.project_id, request.sender_id)?;
704 broadcast(request.sender_id, receiver_ids, |connection_id| {
705 self.peer
706 .forward_send(request.sender_id, connection_id, request.payload.clone())
707 });
708 Ok(proto::Ack {})
709 }
710
711 async fn update_buffer_file(
712 self: Arc<Server>,
713 request: TypedEnvelope<proto::UpdateBufferFile>,
714 ) -> Result<()> {
715 let receiver_ids = self
716 .state()
717 .await
718 .project_connection_ids(request.payload.project_id, request.sender_id)?;
719 broadcast(request.sender_id, receiver_ids, |connection_id| {
720 self.peer
721 .forward_send(request.sender_id, connection_id, request.payload.clone())
722 });
723 Ok(())
724 }
725
726 async fn buffer_reloaded(
727 self: Arc<Server>,
728 request: TypedEnvelope<proto::BufferReloaded>,
729 ) -> Result<()> {
730 let receiver_ids = self
731 .state()
732 .await
733 .project_connection_ids(request.payload.project_id, request.sender_id)?;
734 broadcast(request.sender_id, receiver_ids, |connection_id| {
735 self.peer
736 .forward_send(request.sender_id, connection_id, request.payload.clone())
737 });
738 Ok(())
739 }
740
741 async fn buffer_saved(
742 self: Arc<Server>,
743 request: TypedEnvelope<proto::BufferSaved>,
744 ) -> Result<()> {
745 let receiver_ids = self
746 .state()
747 .await
748 .project_connection_ids(request.payload.project_id, request.sender_id)?;
749 broadcast(request.sender_id, receiver_ids, |connection_id| {
750 self.peer
751 .forward_send(request.sender_id, connection_id, request.payload.clone())
752 });
753 Ok(())
754 }
755
756 async fn follow(
757 self: Arc<Self>,
758 request: TypedEnvelope<proto::Follow>,
759 ) -> Result<proto::FollowResponse> {
760 let leader_id = ConnectionId(request.payload.leader_id);
761 let follower_id = request.sender_id;
762 if !self
763 .state()
764 .await
765 .project_connection_ids(request.payload.project_id, follower_id)?
766 .contains(&leader_id)
767 {
768 Err(anyhow!("no such peer"))?;
769 }
770 let mut response = self
771 .peer
772 .forward_request(request.sender_id, leader_id, request.payload)
773 .await?;
774 response
775 .views
776 .retain(|view| view.leader_id != Some(follower_id.0));
777 Ok(response)
778 }
779
780 async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
781 let leader_id = ConnectionId(request.payload.leader_id);
782 if !self
783 .state()
784 .await
785 .project_connection_ids(request.payload.project_id, request.sender_id)?
786 .contains(&leader_id)
787 {
788 Err(anyhow!("no such peer"))?;
789 }
790 self.peer
791 .forward_send(request.sender_id, leader_id, request.payload)?;
792 Ok(())
793 }
794
795 async fn update_followers(
796 self: Arc<Self>,
797 request: TypedEnvelope<proto::UpdateFollowers>,
798 ) -> Result<()> {
799 let connection_ids = self
800 .state()
801 .await
802 .project_connection_ids(request.payload.project_id, request.sender_id)?;
803 let leader_id = request
804 .payload
805 .variant
806 .as_ref()
807 .and_then(|variant| match variant {
808 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
809 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
810 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
811 });
812 for follower_id in &request.payload.follower_ids {
813 let follower_id = ConnectionId(*follower_id);
814 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
815 self.peer
816 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
817 }
818 }
819 Ok(())
820 }
821
822 async fn get_channels(
823 self: Arc<Server>,
824 request: TypedEnvelope<proto::GetChannels>,
825 ) -> Result<proto::GetChannelsResponse> {
826 let user_id = self
827 .state()
828 .await
829 .user_id_for_connection(request.sender_id)?;
830 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
831 Ok(proto::GetChannelsResponse {
832 channels: channels
833 .into_iter()
834 .map(|chan| proto::Channel {
835 id: chan.id.to_proto(),
836 name: chan.name,
837 })
838 .collect(),
839 })
840 }
841
842 async fn get_users(
843 self: Arc<Server>,
844 request: TypedEnvelope<proto::GetUsers>,
845 ) -> Result<proto::GetUsersResponse> {
846 let user_ids = request
847 .payload
848 .user_ids
849 .into_iter()
850 .map(UserId::from_proto)
851 .collect();
852 let users = self
853 .app_state
854 .db
855 .get_users_by_ids(user_ids)
856 .await?
857 .into_iter()
858 .map(|user| proto::User {
859 id: user.id.to_proto(),
860 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
861 github_login: user.github_login,
862 })
863 .collect();
864 Ok(proto::GetUsersResponse { users })
865 }
866
867 #[instrument(skip(self, state, user_ids))]
868 fn update_contacts_for_users<'a>(
869 self: &Arc<Self>,
870 state: &Store,
871 user_ids: impl IntoIterator<Item = &'a UserId>,
872 ) {
873 for user_id in user_ids {
874 let contacts = state.contacts_for_user(*user_id);
875 for connection_id in state.connection_ids_for_user(*user_id) {
876 self.peer
877 .send(
878 connection_id,
879 proto::UpdateContacts {
880 contacts: contacts.clone(),
881 },
882 )
883 .trace_err();
884 }
885 }
886 }
887
888 async fn join_channel(
889 self: Arc<Self>,
890 request: TypedEnvelope<proto::JoinChannel>,
891 ) -> Result<proto::JoinChannelResponse> {
892 let user_id = self
893 .state()
894 .await
895 .user_id_for_connection(request.sender_id)?;
896 let channel_id = ChannelId::from_proto(request.payload.channel_id);
897 if !self
898 .app_state
899 .db
900 .can_user_access_channel(user_id, channel_id)
901 .await?
902 {
903 Err(anyhow!("access denied"))?;
904 }
905
906 self.state_mut()
907 .await
908 .join_channel(request.sender_id, channel_id);
909 let messages = self
910 .app_state
911 .db
912 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
913 .await?
914 .into_iter()
915 .map(|msg| proto::ChannelMessage {
916 id: msg.id.to_proto(),
917 body: msg.body,
918 timestamp: msg.sent_at.unix_timestamp() as u64,
919 sender_id: msg.sender_id.to_proto(),
920 nonce: Some(msg.nonce.as_u128().into()),
921 })
922 .collect::<Vec<_>>();
923 Ok(proto::JoinChannelResponse {
924 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
925 messages,
926 })
927 }
928
929 async fn leave_channel(
930 self: Arc<Self>,
931 request: TypedEnvelope<proto::LeaveChannel>,
932 ) -> Result<()> {
933 let user_id = self
934 .state()
935 .await
936 .user_id_for_connection(request.sender_id)?;
937 let channel_id = ChannelId::from_proto(request.payload.channel_id);
938 if !self
939 .app_state
940 .db
941 .can_user_access_channel(user_id, channel_id)
942 .await?
943 {
944 Err(anyhow!("access denied"))?;
945 }
946
947 self.state_mut()
948 .await
949 .leave_channel(request.sender_id, channel_id);
950
951 Ok(())
952 }
953
954 async fn send_channel_message(
955 self: Arc<Self>,
956 request: TypedEnvelope<proto::SendChannelMessage>,
957 ) -> Result<proto::SendChannelMessageResponse> {
958 let channel_id = ChannelId::from_proto(request.payload.channel_id);
959 let user_id;
960 let connection_ids;
961 {
962 let state = self.state().await;
963 user_id = state.user_id_for_connection(request.sender_id)?;
964 connection_ids = state.channel_connection_ids(channel_id)?;
965 }
966
967 // Validate the message body.
968 let body = request.payload.body.trim().to_string();
969 if body.len() > MAX_MESSAGE_LEN {
970 return Err(anyhow!("message is too long"))?;
971 }
972 if body.is_empty() {
973 return Err(anyhow!("message can't be blank"))?;
974 }
975
976 let timestamp = OffsetDateTime::now_utc();
977 let nonce = request
978 .payload
979 .nonce
980 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
981
982 let message_id = self
983 .app_state
984 .db
985 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
986 .await?
987 .to_proto();
988 let message = proto::ChannelMessage {
989 sender_id: user_id.to_proto(),
990 id: message_id,
991 body,
992 timestamp: timestamp.unix_timestamp() as u64,
993 nonce: Some(nonce),
994 };
995 broadcast(request.sender_id, connection_ids, |conn_id| {
996 self.peer.send(
997 conn_id,
998 proto::ChannelMessageSent {
999 channel_id: channel_id.to_proto(),
1000 message: Some(message.clone()),
1001 },
1002 )
1003 });
1004 Ok(proto::SendChannelMessageResponse {
1005 message: Some(message),
1006 })
1007 }
1008
1009 async fn get_channel_messages(
1010 self: Arc<Self>,
1011 request: TypedEnvelope<proto::GetChannelMessages>,
1012 ) -> Result<proto::GetChannelMessagesResponse> {
1013 let user_id = self
1014 .state()
1015 .await
1016 .user_id_for_connection(request.sender_id)?;
1017 let channel_id = ChannelId::from_proto(request.payload.channel_id);
1018 if !self
1019 .app_state
1020 .db
1021 .can_user_access_channel(user_id, channel_id)
1022 .await?
1023 {
1024 Err(anyhow!("access denied"))?;
1025 }
1026
1027 let messages = self
1028 .app_state
1029 .db
1030 .get_channel_messages(
1031 channel_id,
1032 MESSAGE_COUNT_PER_PAGE,
1033 Some(MessageId::from_proto(request.payload.before_message_id)),
1034 )
1035 .await?
1036 .into_iter()
1037 .map(|msg| proto::ChannelMessage {
1038 id: msg.id.to_proto(),
1039 body: msg.body,
1040 timestamp: msg.sent_at.unix_timestamp() as u64,
1041 sender_id: msg.sender_id.to_proto(),
1042 nonce: Some(msg.nonce.as_u128().into()),
1043 })
1044 .collect::<Vec<_>>();
1045
1046 Ok(proto::GetChannelMessagesResponse {
1047 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048 messages,
1049 })
1050 }
1051
1052 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1053 #[cfg(test)]
1054 tokio::task::yield_now().await;
1055 let guard = self.store.read().await;
1056 #[cfg(test)]
1057 tokio::task::yield_now().await;
1058 StoreReadGuard {
1059 guard,
1060 _not_send: PhantomData,
1061 }
1062 }
1063
1064 async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1065 #[cfg(test)]
1066 tokio::task::yield_now().await;
1067 let guard = self.store.write().await;
1068 #[cfg(test)]
1069 tokio::task::yield_now().await;
1070 StoreWriteGuard {
1071 guard,
1072 _not_send: PhantomData,
1073 }
1074 }
1075}
1076
1077impl<'a> Deref for StoreReadGuard<'a> {
1078 type Target = Store;
1079
1080 fn deref(&self) -> &Self::Target {
1081 &*self.guard
1082 }
1083}
1084
1085impl<'a> Deref for StoreWriteGuard<'a> {
1086 type Target = Store;
1087
1088 fn deref(&self) -> &Self::Target {
1089 &*self.guard
1090 }
1091}
1092
1093impl<'a> DerefMut for StoreWriteGuard<'a> {
1094 fn deref_mut(&mut self) -> &mut Self::Target {
1095 &mut *self.guard
1096 }
1097}
1098
1099impl<'a> Drop for StoreWriteGuard<'a> {
1100 fn drop(&mut self) {
1101 #[cfg(test)]
1102 self.check_invariants();
1103
1104 let metrics = self.metrics();
1105 tracing::info!(
1106 connections = metrics.connections,
1107 registered_projects = metrics.registered_projects,
1108 shared_projects = metrics.shared_projects,
1109 "metrics"
1110 );
1111 }
1112}
1113
1114impl Executor for RealExecutor {
1115 type Sleep = Sleep;
1116
1117 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1118 tokio::task::spawn(future);
1119 }
1120
1121 fn sleep(&self, duration: Duration) -> Self::Sleep {
1122 tokio::time::sleep(duration)
1123 }
1124}
1125
1126#[instrument(skip(f))]
1127fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1128where
1129 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1130{
1131 for receiver_id in receiver_ids {
1132 if receiver_id != sender_id {
1133 f(receiver_id).trace_err();
1134 }
1135 }
1136}
1137
1138lazy_static! {
1139 static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1140}
1141
1142pub struct ProtocolVersion(u32);
1143
1144impl Header for ProtocolVersion {
1145 fn name() -> &'static HeaderName {
1146 &ZED_PROTOCOL_VERSION
1147 }
1148
1149 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1150 where
1151 Self: Sized,
1152 I: Iterator<Item = &'i axum::http::HeaderValue>,
1153 {
1154 let version = values
1155 .next()
1156 .ok_or_else(|| axum::headers::Error::invalid())?
1157 .to_str()
1158 .map_err(|_| axum::headers::Error::invalid())?
1159 .parse()
1160 .map_err(|_| axum::headers::Error::invalid())?;
1161 Ok(Self(version))
1162 }
1163
1164 fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1165 values.extend([self.0.to_string().parse().unwrap()]);
1166 }
1167}
1168
1169pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1170 let server = Server::new(app_state.clone(), None);
1171 Router::new()
1172 .route("/rpc", get(handle_websocket_request))
1173 .layer(
1174 ServiceBuilder::new()
1175 .layer(Extension(app_state))
1176 .layer(middleware::from_fn(auth::validate_header))
1177 .layer(Extension(server)),
1178 )
1179}
1180
1181pub async fn handle_websocket_request(
1182 TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1183 ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1184 Extension(server): Extension<Arc<Server>>,
1185 Extension(user_id): Extension<UserId>,
1186 ws: WebSocketUpgrade,
1187) -> Response {
1188 if protocol_version != rpc::PROTOCOL_VERSION {
1189 return (
1190 StatusCode::UPGRADE_REQUIRED,
1191 "client must be upgraded".to_string(),
1192 )
1193 .into_response();
1194 }
1195 let socket_address = socket_address.to_string();
1196 ws.on_upgrade(move |socket| {
1197 let socket = socket
1198 .map_ok(to_tungstenite_message)
1199 .err_into()
1200 .with(|message| async move { Ok(to_axum_message(message)) });
1201 let connection = Connection::new(Box::pin(socket));
1202 server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1203 })
1204}
1205
1206fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1207 match message {
1208 TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1209 TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1210 TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1211 TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1212 TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1213 code: frame.code.into(),
1214 reason: frame.reason,
1215 })),
1216 }
1217}
1218
1219fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1220 match message {
1221 AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1222 AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1223 AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1224 AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1225 AxumMessage::Close(frame) => {
1226 TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1227 code: frame.code.into(),
1228 reason: frame.reason,
1229 }))
1230 }
1231 }
1232}
1233
1234pub trait ResultExt {
1235 type Ok;
1236
1237 fn trace_err(self) -> Option<Self::Ok>;
1238}
1239
1240impl<T, E> ResultExt for Result<T, E>
1241where
1242 E: std::fmt::Debug,
1243{
1244 type Ok = T;
1245
1246 fn trace_err(self) -> Option<T> {
1247 match self {
1248 Ok(value) => Some(value),
1249 Err(error) => {
1250 tracing::error!("{:?}", error);
1251 None
1252 }
1253 }
1254 }
1255}
1256
1257#[cfg(test)]
1258mod tests {
1259 use super::*;
1260 use crate::{
1261 db::{tests::TestDb, UserId},
1262 AppState,
1263 };
1264 use ::rpc::Peer;
1265 use client::{
1266 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1267 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1268 };
1269 use collections::BTreeMap;
1270 use editor::{
1271 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1272 ToOffset, ToggleCodeActions, Undo,
1273 };
1274 use gpui::{
1275 executor::{self, Deterministic},
1276 geometry::vector::vec2f,
1277 ModelHandle, TestAppContext, ViewHandle,
1278 };
1279 use language::{
1280 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1281 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1282 };
1283 use lsp::{self, FakeLanguageServer};
1284 use parking_lot::Mutex;
1285 use project::{
1286 fs::{FakeFs, Fs as _},
1287 search::SearchQuery,
1288 worktree::WorktreeHandle,
1289 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1290 };
1291 use rand::prelude::*;
1292 use rpc::PeerId;
1293 use serde_json::json;
1294 use settings::Settings;
1295 use sqlx::types::time::OffsetDateTime;
1296 use std::{
1297 env,
1298 ops::Deref,
1299 path::{Path, PathBuf},
1300 rc::Rc,
1301 sync::{
1302 atomic::{AtomicBool, Ordering::SeqCst},
1303 Arc,
1304 },
1305 time::Duration,
1306 };
1307 use theme::ThemeRegistry;
1308 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1309
1310 #[cfg(test)]
1311 #[ctor::ctor]
1312 fn init_logger() {
1313 if std::env::var("RUST_LOG").is_ok() {
1314 env_logger::init();
1315 }
1316 }
1317
1318 #[gpui::test(iterations = 10)]
1319 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1320 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1321 let lang_registry = Arc::new(LanguageRegistry::test());
1322 let fs = FakeFs::new(cx_a.background());
1323 cx_a.foreground().forbid_parking();
1324
1325 // Connect to a server as 2 clients.
1326 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1327 let client_a = server.create_client(cx_a, "user_a").await;
1328 let client_b = server.create_client(cx_b, "user_b").await;
1329
1330 // Share a project as client A
1331 fs.insert_tree(
1332 "/a",
1333 json!({
1334 ".zed.toml": r#"collaborators = ["user_b"]"#,
1335 "a.txt": "a-contents",
1336 "b.txt": "b-contents",
1337 }),
1338 )
1339 .await;
1340 let project_a = cx_a.update(|cx| {
1341 Project::local(
1342 client_a.clone(),
1343 client_a.user_store.clone(),
1344 lang_registry.clone(),
1345 fs.clone(),
1346 cx,
1347 )
1348 });
1349 let (worktree_a, _) = project_a
1350 .update(cx_a, |p, cx| {
1351 p.find_or_create_local_worktree("/a", true, cx)
1352 })
1353 .await
1354 .unwrap();
1355 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1356 worktree_a
1357 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1358 .await;
1359 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1360 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1361
1362 // Join that project as client B
1363 let project_b = Project::remote(
1364 project_id,
1365 client_b.clone(),
1366 client_b.user_store.clone(),
1367 lang_registry.clone(),
1368 fs.clone(),
1369 &mut cx_b.to_async(),
1370 )
1371 .await
1372 .unwrap();
1373
1374 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1375 assert_eq!(
1376 project
1377 .collaborators()
1378 .get(&client_a.peer_id)
1379 .unwrap()
1380 .user
1381 .github_login,
1382 "user_a"
1383 );
1384 project.replica_id()
1385 });
1386 project_a
1387 .condition(&cx_a, |tree, _| {
1388 tree.collaborators()
1389 .get(&client_b.peer_id)
1390 .map_or(false, |collaborator| {
1391 collaborator.replica_id == replica_id_b
1392 && collaborator.user.github_login == "user_b"
1393 })
1394 })
1395 .await;
1396
1397 // Open the same file as client B and client A.
1398 let buffer_b = project_b
1399 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1400 .await
1401 .unwrap();
1402 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1403 project_a.read_with(cx_a, |project, cx| {
1404 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1405 });
1406 let buffer_a = project_a
1407 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1408 .await
1409 .unwrap();
1410
1411 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1412
1413 // TODO
1414 // // Create a selection set as client B and see that selection set as client A.
1415 // buffer_a
1416 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1417 // .await;
1418
1419 // Edit the buffer as client B and see that edit as client A.
1420 editor_b.update(cx_b, |editor, cx| {
1421 editor.handle_input(&Input("ok, ".into()), cx)
1422 });
1423 buffer_a
1424 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1425 .await;
1426
1427 // TODO
1428 // // Remove the selection set as client B, see those selections disappear as client A.
1429 cx_b.update(move |_| drop(editor_b));
1430 // buffer_a
1431 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1432 // .await;
1433
1434 // Dropping the client B's project removes client B from client A's collaborators.
1435 cx_b.update(move |_| drop(project_b));
1436 project_a
1437 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1438 .await;
1439 }
1440
1441 #[gpui::test(iterations = 10)]
1442 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1443 let lang_registry = Arc::new(LanguageRegistry::test());
1444 let fs = FakeFs::new(cx_a.background());
1445 cx_a.foreground().forbid_parking();
1446
1447 // Connect to a server as 2 clients.
1448 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1449 let client_a = server.create_client(cx_a, "user_a").await;
1450 let client_b = server.create_client(cx_b, "user_b").await;
1451
1452 // Share a project as client A
1453 fs.insert_tree(
1454 "/a",
1455 json!({
1456 ".zed.toml": r#"collaborators = ["user_b"]"#,
1457 "a.txt": "a-contents",
1458 "b.txt": "b-contents",
1459 }),
1460 )
1461 .await;
1462 let project_a = cx_a.update(|cx| {
1463 Project::local(
1464 client_a.clone(),
1465 client_a.user_store.clone(),
1466 lang_registry.clone(),
1467 fs.clone(),
1468 cx,
1469 )
1470 });
1471 let (worktree_a, _) = project_a
1472 .update(cx_a, |p, cx| {
1473 p.find_or_create_local_worktree("/a", true, cx)
1474 })
1475 .await
1476 .unwrap();
1477 worktree_a
1478 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1479 .await;
1480 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1481 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1482 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1483 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1484
1485 // Join that project as client B
1486 let project_b = Project::remote(
1487 project_id,
1488 client_b.clone(),
1489 client_b.user_store.clone(),
1490 lang_registry.clone(),
1491 fs.clone(),
1492 &mut cx_b.to_async(),
1493 )
1494 .await
1495 .unwrap();
1496 project_b
1497 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1498 .await
1499 .unwrap();
1500
1501 // Unshare the project as client A
1502 project_a.update(cx_a, |project, cx| project.unshare(cx));
1503 project_b
1504 .condition(cx_b, |project, _| project.is_read_only())
1505 .await;
1506 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1507 cx_b.update(|_| {
1508 drop(project_b);
1509 });
1510
1511 // Share the project again and ensure guests can still join.
1512 project_a
1513 .update(cx_a, |project, cx| project.share(cx))
1514 .await
1515 .unwrap();
1516 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1517
1518 let project_b2 = Project::remote(
1519 project_id,
1520 client_b.clone(),
1521 client_b.user_store.clone(),
1522 lang_registry.clone(),
1523 fs.clone(),
1524 &mut cx_b.to_async(),
1525 )
1526 .await
1527 .unwrap();
1528 project_b2
1529 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1530 .await
1531 .unwrap();
1532 }
1533
1534 #[gpui::test(iterations = 10)]
1535 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1536 let lang_registry = Arc::new(LanguageRegistry::test());
1537 let fs = FakeFs::new(cx_a.background());
1538 cx_a.foreground().forbid_parking();
1539
1540 // Connect to a server as 2 clients.
1541 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1542 let client_a = server.create_client(cx_a, "user_a").await;
1543 let client_b = server.create_client(cx_b, "user_b").await;
1544
1545 // Share a project as client A
1546 fs.insert_tree(
1547 "/a",
1548 json!({
1549 ".zed.toml": r#"collaborators = ["user_b"]"#,
1550 "a.txt": "a-contents",
1551 "b.txt": "b-contents",
1552 }),
1553 )
1554 .await;
1555 let project_a = cx_a.update(|cx| {
1556 Project::local(
1557 client_a.clone(),
1558 client_a.user_store.clone(),
1559 lang_registry.clone(),
1560 fs.clone(),
1561 cx,
1562 )
1563 });
1564 let (worktree_a, _) = project_a
1565 .update(cx_a, |p, cx| {
1566 p.find_or_create_local_worktree("/a", true, cx)
1567 })
1568 .await
1569 .unwrap();
1570 worktree_a
1571 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1572 .await;
1573 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1574 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1575 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1576 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1577
1578 // Join that project as client B
1579 let project_b = Project::remote(
1580 project_id,
1581 client_b.clone(),
1582 client_b.user_store.clone(),
1583 lang_registry.clone(),
1584 fs.clone(),
1585 &mut cx_b.to_async(),
1586 )
1587 .await
1588 .unwrap();
1589 project_b
1590 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1591 .await
1592 .unwrap();
1593
1594 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1595 server.disconnect_client(client_a.current_user_id(cx_a));
1596 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1597 project_a
1598 .condition(cx_a, |project, _| project.collaborators().is_empty())
1599 .await;
1600 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1601 project_b
1602 .condition(cx_b, |project, _| project.is_read_only())
1603 .await;
1604 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1605 cx_b.update(|_| {
1606 drop(project_b);
1607 });
1608
1609 // Await reconnection
1610 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1611
1612 // Share the project again and ensure guests can still join.
1613 project_a
1614 .update(cx_a, |project, cx| project.share(cx))
1615 .await
1616 .unwrap();
1617 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1618
1619 let project_b2 = Project::remote(
1620 project_id,
1621 client_b.clone(),
1622 client_b.user_store.clone(),
1623 lang_registry.clone(),
1624 fs.clone(),
1625 &mut cx_b.to_async(),
1626 )
1627 .await
1628 .unwrap();
1629 project_b2
1630 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1631 .await
1632 .unwrap();
1633 }
1634
1635 #[gpui::test(iterations = 10)]
1636 async fn test_propagate_saves_and_fs_changes(
1637 cx_a: &mut TestAppContext,
1638 cx_b: &mut TestAppContext,
1639 cx_c: &mut TestAppContext,
1640 ) {
1641 let lang_registry = Arc::new(LanguageRegistry::test());
1642 let fs = FakeFs::new(cx_a.background());
1643 cx_a.foreground().forbid_parking();
1644
1645 // Connect to a server as 3 clients.
1646 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1647 let client_a = server.create_client(cx_a, "user_a").await;
1648 let client_b = server.create_client(cx_b, "user_b").await;
1649 let client_c = server.create_client(cx_c, "user_c").await;
1650
1651 // Share a worktree as client A.
1652 fs.insert_tree(
1653 "/a",
1654 json!({
1655 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1656 "file1": "",
1657 "file2": ""
1658 }),
1659 )
1660 .await;
1661 let project_a = cx_a.update(|cx| {
1662 Project::local(
1663 client_a.clone(),
1664 client_a.user_store.clone(),
1665 lang_registry.clone(),
1666 fs.clone(),
1667 cx,
1668 )
1669 });
1670 let (worktree_a, _) = project_a
1671 .update(cx_a, |p, cx| {
1672 p.find_or_create_local_worktree("/a", true, cx)
1673 })
1674 .await
1675 .unwrap();
1676 worktree_a
1677 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1678 .await;
1679 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1680 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1681 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1682
1683 // Join that worktree as clients B and C.
1684 let project_b = Project::remote(
1685 project_id,
1686 client_b.clone(),
1687 client_b.user_store.clone(),
1688 lang_registry.clone(),
1689 fs.clone(),
1690 &mut cx_b.to_async(),
1691 )
1692 .await
1693 .unwrap();
1694 let project_c = Project::remote(
1695 project_id,
1696 client_c.clone(),
1697 client_c.user_store.clone(),
1698 lang_registry.clone(),
1699 fs.clone(),
1700 &mut cx_c.to_async(),
1701 )
1702 .await
1703 .unwrap();
1704 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1705 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1706
1707 // Open and edit a buffer as both guests B and C.
1708 let buffer_b = project_b
1709 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1710 .await
1711 .unwrap();
1712 let buffer_c = project_c
1713 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1714 .await
1715 .unwrap();
1716 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1717 buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1718
1719 // Open and edit that buffer as the host.
1720 let buffer_a = project_a
1721 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1722 .await
1723 .unwrap();
1724
1725 buffer_a
1726 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1727 .await;
1728 buffer_a.update(cx_a, |buf, cx| {
1729 buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1730 });
1731
1732 // Wait for edits to propagate
1733 buffer_a
1734 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1735 .await;
1736 buffer_b
1737 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1738 .await;
1739 buffer_c
1740 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1741 .await;
1742
1743 // Edit the buffer as the host and concurrently save as guest B.
1744 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1745 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1746 save_b.await.unwrap();
1747 assert_eq!(
1748 fs.load("/a/file1".as_ref()).await.unwrap(),
1749 "hi-a, i-am-c, i-am-b, i-am-a"
1750 );
1751 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1752 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1753 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1754
1755 worktree_a.flush_fs_events(cx_a).await;
1756
1757 // Make changes on host's file system, see those changes on guest worktrees.
1758 fs.rename(
1759 "/a/file1".as_ref(),
1760 "/a/file1-renamed".as_ref(),
1761 Default::default(),
1762 )
1763 .await
1764 .unwrap();
1765
1766 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1767 .await
1768 .unwrap();
1769 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1770
1771 worktree_a
1772 .condition(&cx_a, |tree, _| {
1773 tree.paths()
1774 .map(|p| p.to_string_lossy())
1775 .collect::<Vec<_>>()
1776 == [".zed.toml", "file1-renamed", "file3", "file4"]
1777 })
1778 .await;
1779 worktree_b
1780 .condition(&cx_b, |tree, _| {
1781 tree.paths()
1782 .map(|p| p.to_string_lossy())
1783 .collect::<Vec<_>>()
1784 == [".zed.toml", "file1-renamed", "file3", "file4"]
1785 })
1786 .await;
1787 worktree_c
1788 .condition(&cx_c, |tree, _| {
1789 tree.paths()
1790 .map(|p| p.to_string_lossy())
1791 .collect::<Vec<_>>()
1792 == [".zed.toml", "file1-renamed", "file3", "file4"]
1793 })
1794 .await;
1795
1796 // Ensure buffer files are updated as well.
1797 buffer_a
1798 .condition(&cx_a, |buf, _| {
1799 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1800 })
1801 .await;
1802 buffer_b
1803 .condition(&cx_b, |buf, _| {
1804 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1805 })
1806 .await;
1807 buffer_c
1808 .condition(&cx_c, |buf, _| {
1809 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1810 })
1811 .await;
1812 }
1813
1814 #[gpui::test(iterations = 10)]
1815 async fn test_fs_operations(
1816 executor: Arc<Deterministic>,
1817 cx_a: &mut TestAppContext,
1818 cx_b: &mut TestAppContext,
1819 ) {
1820 executor.forbid_parking();
1821 let fs = FakeFs::new(cx_a.background());
1822
1823 // Connect to a server as 2 clients.
1824 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1825 let mut client_a = server.create_client(cx_a, "user_a").await;
1826 let mut client_b = server.create_client(cx_b, "user_b").await;
1827
1828 // Share a project as client A
1829 fs.insert_tree(
1830 "/dir",
1831 json!({
1832 ".zed.toml": r#"collaborators = ["user_b"]"#,
1833 "a.txt": "a-contents",
1834 "b.txt": "b-contents",
1835 }),
1836 )
1837 .await;
1838
1839 let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1840 let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1841 project_a
1842 .update(cx_a, |project, cx| project.share(cx))
1843 .await
1844 .unwrap();
1845
1846 let project_b = client_b.build_remote_project(project_id, cx_b).await;
1847
1848 let worktree_a =
1849 project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1850 let worktree_b =
1851 project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1852
1853 let entry = project_b
1854 .update(cx_b, |project, cx| {
1855 project
1856 .create_entry((worktree_id, "c.txt"), false, cx)
1857 .unwrap()
1858 })
1859 .await
1860 .unwrap();
1861 worktree_a.read_with(cx_a, |worktree, _| {
1862 assert_eq!(
1863 worktree
1864 .paths()
1865 .map(|p| p.to_string_lossy())
1866 .collect::<Vec<_>>(),
1867 [".zed.toml", "a.txt", "b.txt", "c.txt"]
1868 );
1869 });
1870 worktree_b.read_with(cx_b, |worktree, _| {
1871 assert_eq!(
1872 worktree
1873 .paths()
1874 .map(|p| p.to_string_lossy())
1875 .collect::<Vec<_>>(),
1876 [".zed.toml", "a.txt", "b.txt", "c.txt"]
1877 );
1878 });
1879
1880 project_b
1881 .update(cx_b, |project, cx| {
1882 project.rename_entry(entry.id, Path::new("d.txt"), cx)
1883 })
1884 .unwrap()
1885 .await
1886 .unwrap();
1887 worktree_a.read_with(cx_a, |worktree, _| {
1888 assert_eq!(
1889 worktree
1890 .paths()
1891 .map(|p| p.to_string_lossy())
1892 .collect::<Vec<_>>(),
1893 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1894 );
1895 });
1896 worktree_b.read_with(cx_b, |worktree, _| {
1897 assert_eq!(
1898 worktree
1899 .paths()
1900 .map(|p| p.to_string_lossy())
1901 .collect::<Vec<_>>(),
1902 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1903 );
1904 });
1905
1906 let dir_entry = project_b
1907 .update(cx_b, |project, cx| {
1908 project
1909 .create_entry((worktree_id, "DIR"), true, cx)
1910 .unwrap()
1911 })
1912 .await
1913 .unwrap();
1914 worktree_a.read_with(cx_a, |worktree, _| {
1915 assert_eq!(
1916 worktree
1917 .paths()
1918 .map(|p| p.to_string_lossy())
1919 .collect::<Vec<_>>(),
1920 [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1921 );
1922 });
1923 worktree_b.read_with(cx_b, |worktree, _| {
1924 assert_eq!(
1925 worktree
1926 .paths()
1927 .map(|p| p.to_string_lossy())
1928 .collect::<Vec<_>>(),
1929 [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1930 );
1931 });
1932
1933 project_b
1934 .update(cx_b, |project, cx| {
1935 project.delete_entry(dir_entry.id, cx).unwrap()
1936 })
1937 .await
1938 .unwrap();
1939 worktree_a.read_with(cx_a, |worktree, _| {
1940 assert_eq!(
1941 worktree
1942 .paths()
1943 .map(|p| p.to_string_lossy())
1944 .collect::<Vec<_>>(),
1945 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1946 );
1947 });
1948 worktree_b.read_with(cx_b, |worktree, _| {
1949 assert_eq!(
1950 worktree
1951 .paths()
1952 .map(|p| p.to_string_lossy())
1953 .collect::<Vec<_>>(),
1954 [".zed.toml", "a.txt", "b.txt", "d.txt"]
1955 );
1956 });
1957
1958 project_b
1959 .update(cx_b, |project, cx| {
1960 project.delete_entry(entry.id, cx).unwrap()
1961 })
1962 .await
1963 .unwrap();
1964 worktree_a.read_with(cx_a, |worktree, _| {
1965 assert_eq!(
1966 worktree
1967 .paths()
1968 .map(|p| p.to_string_lossy())
1969 .collect::<Vec<_>>(),
1970 [".zed.toml", "a.txt", "b.txt"]
1971 );
1972 });
1973 worktree_b.read_with(cx_b, |worktree, _| {
1974 assert_eq!(
1975 worktree
1976 .paths()
1977 .map(|p| p.to_string_lossy())
1978 .collect::<Vec<_>>(),
1979 [".zed.toml", "a.txt", "b.txt"]
1980 );
1981 });
1982 }
1983
1984 #[gpui::test(iterations = 10)]
1985 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1986 cx_a.foreground().forbid_parking();
1987 let lang_registry = Arc::new(LanguageRegistry::test());
1988 let fs = FakeFs::new(cx_a.background());
1989
1990 // Connect to a server as 2 clients.
1991 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1992 let client_a = server.create_client(cx_a, "user_a").await;
1993 let client_b = server.create_client(cx_b, "user_b").await;
1994
1995 // Share a project as client A
1996 fs.insert_tree(
1997 "/dir",
1998 json!({
1999 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2000 "a.txt": "a-contents",
2001 }),
2002 )
2003 .await;
2004
2005 let project_a = cx_a.update(|cx| {
2006 Project::local(
2007 client_a.clone(),
2008 client_a.user_store.clone(),
2009 lang_registry.clone(),
2010 fs.clone(),
2011 cx,
2012 )
2013 });
2014 let (worktree_a, _) = project_a
2015 .update(cx_a, |p, cx| {
2016 p.find_or_create_local_worktree("/dir", true, cx)
2017 })
2018 .await
2019 .unwrap();
2020 worktree_a
2021 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2022 .await;
2023 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2024 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2025 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2026
2027 // Join that project as client B
2028 let project_b = Project::remote(
2029 project_id,
2030 client_b.clone(),
2031 client_b.user_store.clone(),
2032 lang_registry.clone(),
2033 fs.clone(),
2034 &mut cx_b.to_async(),
2035 )
2036 .await
2037 .unwrap();
2038
2039 // Open a buffer as client B
2040 let buffer_b = project_b
2041 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2042 .await
2043 .unwrap();
2044
2045 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2046 buffer_b.read_with(cx_b, |buf, _| {
2047 assert!(buf.is_dirty());
2048 assert!(!buf.has_conflict());
2049 });
2050
2051 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2052 buffer_b
2053 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2054 .await;
2055 buffer_b.read_with(cx_b, |buf, _| {
2056 assert!(!buf.has_conflict());
2057 });
2058
2059 buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2060 buffer_b.read_with(cx_b, |buf, _| {
2061 assert!(buf.is_dirty());
2062 assert!(!buf.has_conflict());
2063 });
2064 }
2065
2066 #[gpui::test(iterations = 10)]
2067 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2068 cx_a.foreground().forbid_parking();
2069 let lang_registry = Arc::new(LanguageRegistry::test());
2070 let fs = FakeFs::new(cx_a.background());
2071
2072 // Connect to a server as 2 clients.
2073 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2074 let client_a = server.create_client(cx_a, "user_a").await;
2075 let client_b = server.create_client(cx_b, "user_b").await;
2076
2077 // Share a project as client A
2078 fs.insert_tree(
2079 "/dir",
2080 json!({
2081 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2082 "a.txt": "a-contents",
2083 }),
2084 )
2085 .await;
2086
2087 let project_a = cx_a.update(|cx| {
2088 Project::local(
2089 client_a.clone(),
2090 client_a.user_store.clone(),
2091 lang_registry.clone(),
2092 fs.clone(),
2093 cx,
2094 )
2095 });
2096 let (worktree_a, _) = project_a
2097 .update(cx_a, |p, cx| {
2098 p.find_or_create_local_worktree("/dir", true, cx)
2099 })
2100 .await
2101 .unwrap();
2102 worktree_a
2103 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2104 .await;
2105 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2106 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2107 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2108
2109 // Join that project as client B
2110 let project_b = Project::remote(
2111 project_id,
2112 client_b.clone(),
2113 client_b.user_store.clone(),
2114 lang_registry.clone(),
2115 fs.clone(),
2116 &mut cx_b.to_async(),
2117 )
2118 .await
2119 .unwrap();
2120 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2121
2122 // Open a buffer as client B
2123 let buffer_b = project_b
2124 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2125 .await
2126 .unwrap();
2127 buffer_b.read_with(cx_b, |buf, _| {
2128 assert!(!buf.is_dirty());
2129 assert!(!buf.has_conflict());
2130 });
2131
2132 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2133 .await
2134 .unwrap();
2135 buffer_b
2136 .condition(&cx_b, |buf, _| {
2137 buf.text() == "new contents" && !buf.is_dirty()
2138 })
2139 .await;
2140 buffer_b.read_with(cx_b, |buf, _| {
2141 assert!(!buf.has_conflict());
2142 });
2143 }
2144
2145 #[gpui::test(iterations = 10)]
2146 async fn test_editing_while_guest_opens_buffer(
2147 cx_a: &mut TestAppContext,
2148 cx_b: &mut TestAppContext,
2149 ) {
2150 cx_a.foreground().forbid_parking();
2151 let lang_registry = Arc::new(LanguageRegistry::test());
2152 let fs = FakeFs::new(cx_a.background());
2153
2154 // Connect to a server as 2 clients.
2155 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2156 let client_a = server.create_client(cx_a, "user_a").await;
2157 let client_b = server.create_client(cx_b, "user_b").await;
2158
2159 // Share a project as client A
2160 fs.insert_tree(
2161 "/dir",
2162 json!({
2163 ".zed.toml": r#"collaborators = ["user_b"]"#,
2164 "a.txt": "a-contents",
2165 }),
2166 )
2167 .await;
2168 let project_a = cx_a.update(|cx| {
2169 Project::local(
2170 client_a.clone(),
2171 client_a.user_store.clone(),
2172 lang_registry.clone(),
2173 fs.clone(),
2174 cx,
2175 )
2176 });
2177 let (worktree_a, _) = project_a
2178 .update(cx_a, |p, cx| {
2179 p.find_or_create_local_worktree("/dir", true, cx)
2180 })
2181 .await
2182 .unwrap();
2183 worktree_a
2184 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2185 .await;
2186 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2187 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2188 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2189
2190 // Join that project as client B
2191 let project_b = Project::remote(
2192 project_id,
2193 client_b.clone(),
2194 client_b.user_store.clone(),
2195 lang_registry.clone(),
2196 fs.clone(),
2197 &mut cx_b.to_async(),
2198 )
2199 .await
2200 .unwrap();
2201
2202 // Open a buffer as client A
2203 let buffer_a = project_a
2204 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2205 .await
2206 .unwrap();
2207
2208 // Start opening the same buffer as client B
2209 let buffer_b = cx_b
2210 .background()
2211 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2212
2213 // Edit the buffer as client A while client B is still opening it.
2214 cx_b.background().simulate_random_delay().await;
2215 buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2216 cx_b.background().simulate_random_delay().await;
2217 buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2218
2219 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2220 let buffer_b = buffer_b.await.unwrap();
2221 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2222 }
2223
2224 #[gpui::test(iterations = 10)]
2225 async fn test_leaving_worktree_while_opening_buffer(
2226 cx_a: &mut TestAppContext,
2227 cx_b: &mut TestAppContext,
2228 ) {
2229 cx_a.foreground().forbid_parking();
2230 let lang_registry = Arc::new(LanguageRegistry::test());
2231 let fs = FakeFs::new(cx_a.background());
2232
2233 // Connect to a server as 2 clients.
2234 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2235 let client_a = server.create_client(cx_a, "user_a").await;
2236 let client_b = server.create_client(cx_b, "user_b").await;
2237
2238 // Share a project as client A
2239 fs.insert_tree(
2240 "/dir",
2241 json!({
2242 ".zed.toml": r#"collaborators = ["user_b"]"#,
2243 "a.txt": "a-contents",
2244 }),
2245 )
2246 .await;
2247 let project_a = cx_a.update(|cx| {
2248 Project::local(
2249 client_a.clone(),
2250 client_a.user_store.clone(),
2251 lang_registry.clone(),
2252 fs.clone(),
2253 cx,
2254 )
2255 });
2256 let (worktree_a, _) = project_a
2257 .update(cx_a, |p, cx| {
2258 p.find_or_create_local_worktree("/dir", true, cx)
2259 })
2260 .await
2261 .unwrap();
2262 worktree_a
2263 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2264 .await;
2265 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2266 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2267 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2268
2269 // Join that project as client B
2270 let project_b = Project::remote(
2271 project_id,
2272 client_b.clone(),
2273 client_b.user_store.clone(),
2274 lang_registry.clone(),
2275 fs.clone(),
2276 &mut cx_b.to_async(),
2277 )
2278 .await
2279 .unwrap();
2280
2281 // See that a guest has joined as client A.
2282 project_a
2283 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2284 .await;
2285
2286 // Begin opening a buffer as client B, but leave the project before the open completes.
2287 let buffer_b = cx_b
2288 .background()
2289 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2290 cx_b.update(|_| drop(project_b));
2291 drop(buffer_b);
2292
2293 // See that the guest has left.
2294 project_a
2295 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2296 .await;
2297 }
2298
2299 #[gpui::test(iterations = 10)]
2300 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2301 cx_a.foreground().forbid_parking();
2302 let lang_registry = Arc::new(LanguageRegistry::test());
2303 let fs = FakeFs::new(cx_a.background());
2304
2305 // Connect to a server as 2 clients.
2306 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2307 let client_a = server.create_client(cx_a, "user_a").await;
2308 let client_b = server.create_client(cx_b, "user_b").await;
2309
2310 // Share a project as client A
2311 fs.insert_tree(
2312 "/a",
2313 json!({
2314 ".zed.toml": r#"collaborators = ["user_b"]"#,
2315 "a.txt": "a-contents",
2316 "b.txt": "b-contents",
2317 }),
2318 )
2319 .await;
2320 let project_a = cx_a.update(|cx| {
2321 Project::local(
2322 client_a.clone(),
2323 client_a.user_store.clone(),
2324 lang_registry.clone(),
2325 fs.clone(),
2326 cx,
2327 )
2328 });
2329 let (worktree_a, _) = project_a
2330 .update(cx_a, |p, cx| {
2331 p.find_or_create_local_worktree("/a", true, cx)
2332 })
2333 .await
2334 .unwrap();
2335 worktree_a
2336 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2337 .await;
2338 let project_id = project_a
2339 .update(cx_a, |project, _| project.next_remote_id())
2340 .await;
2341 project_a
2342 .update(cx_a, |project, cx| project.share(cx))
2343 .await
2344 .unwrap();
2345
2346 // Join that project as client B
2347 let _project_b = Project::remote(
2348 project_id,
2349 client_b.clone(),
2350 client_b.user_store.clone(),
2351 lang_registry.clone(),
2352 fs.clone(),
2353 &mut cx_b.to_async(),
2354 )
2355 .await
2356 .unwrap();
2357
2358 // Client A sees that a guest has joined.
2359 project_a
2360 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2361 .await;
2362
2363 // Drop client B's connection and ensure client A observes client B leaving the project.
2364 client_b.disconnect(&cx_b.to_async()).unwrap();
2365 project_a
2366 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2367 .await;
2368
2369 // Rejoin the project as client B
2370 let _project_b = Project::remote(
2371 project_id,
2372 client_b.clone(),
2373 client_b.user_store.clone(),
2374 lang_registry.clone(),
2375 fs.clone(),
2376 &mut cx_b.to_async(),
2377 )
2378 .await
2379 .unwrap();
2380
2381 // Client A sees that a guest has re-joined.
2382 project_a
2383 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2384 .await;
2385
2386 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2387 client_b.wait_for_current_user(cx_b).await;
2388 server.disconnect_client(client_b.current_user_id(cx_b));
2389 cx_a.foreground().advance_clock(Duration::from_secs(3));
2390 project_a
2391 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2392 .await;
2393 }
2394
2395 #[gpui::test(iterations = 10)]
2396 async fn test_collaborating_with_diagnostics(
2397 cx_a: &mut TestAppContext,
2398 cx_b: &mut TestAppContext,
2399 ) {
2400 cx_a.foreground().forbid_parking();
2401 let lang_registry = Arc::new(LanguageRegistry::test());
2402 let fs = FakeFs::new(cx_a.background());
2403
2404 // Set up a fake language server.
2405 let mut language = Language::new(
2406 LanguageConfig {
2407 name: "Rust".into(),
2408 path_suffixes: vec!["rs".to_string()],
2409 ..Default::default()
2410 },
2411 Some(tree_sitter_rust::language()),
2412 );
2413 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2414 lang_registry.add(Arc::new(language));
2415
2416 // Connect to a server as 2 clients.
2417 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2418 let client_a = server.create_client(cx_a, "user_a").await;
2419 let client_b = server.create_client(cx_b, "user_b").await;
2420
2421 // Share a project as client A
2422 fs.insert_tree(
2423 "/a",
2424 json!({
2425 ".zed.toml": r#"collaborators = ["user_b"]"#,
2426 "a.rs": "let one = two",
2427 "other.rs": "",
2428 }),
2429 )
2430 .await;
2431 let project_a = cx_a.update(|cx| {
2432 Project::local(
2433 client_a.clone(),
2434 client_a.user_store.clone(),
2435 lang_registry.clone(),
2436 fs.clone(),
2437 cx,
2438 )
2439 });
2440 let (worktree_a, _) = project_a
2441 .update(cx_a, |p, cx| {
2442 p.find_or_create_local_worktree("/a", true, cx)
2443 })
2444 .await
2445 .unwrap();
2446 worktree_a
2447 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2448 .await;
2449 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2450 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2451 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2452
2453 // Cause the language server to start.
2454 let _ = cx_a
2455 .background()
2456 .spawn(project_a.update(cx_a, |project, cx| {
2457 project.open_buffer(
2458 ProjectPath {
2459 worktree_id,
2460 path: Path::new("other.rs").into(),
2461 },
2462 cx,
2463 )
2464 }))
2465 .await
2466 .unwrap();
2467
2468 // Simulate a language server reporting errors for a file.
2469 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2470 fake_language_server
2471 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2472 .await;
2473 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2474 lsp::PublishDiagnosticsParams {
2475 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2476 version: None,
2477 diagnostics: vec![lsp::Diagnostic {
2478 severity: Some(lsp::DiagnosticSeverity::ERROR),
2479 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2480 message: "message 1".to_string(),
2481 ..Default::default()
2482 }],
2483 },
2484 );
2485
2486 // Wait for server to see the diagnostics update.
2487 server
2488 .condition(|store| {
2489 let worktree = store
2490 .project(project_id)
2491 .unwrap()
2492 .share
2493 .as_ref()
2494 .unwrap()
2495 .worktrees
2496 .get(&worktree_id.to_proto())
2497 .unwrap();
2498
2499 !worktree.diagnostic_summaries.is_empty()
2500 })
2501 .await;
2502
2503 // Join the worktree as client B.
2504 let project_b = Project::remote(
2505 project_id,
2506 client_b.clone(),
2507 client_b.user_store.clone(),
2508 lang_registry.clone(),
2509 fs.clone(),
2510 &mut cx_b.to_async(),
2511 )
2512 .await
2513 .unwrap();
2514
2515 project_b.read_with(cx_b, |project, cx| {
2516 assert_eq!(
2517 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2518 &[(
2519 ProjectPath {
2520 worktree_id,
2521 path: Arc::from(Path::new("a.rs")),
2522 },
2523 DiagnosticSummary {
2524 error_count: 1,
2525 warning_count: 0,
2526 ..Default::default()
2527 },
2528 )]
2529 )
2530 });
2531
2532 // Simulate a language server reporting more errors for a file.
2533 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2534 lsp::PublishDiagnosticsParams {
2535 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2536 version: None,
2537 diagnostics: vec![
2538 lsp::Diagnostic {
2539 severity: Some(lsp::DiagnosticSeverity::ERROR),
2540 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2541 message: "message 1".to_string(),
2542 ..Default::default()
2543 },
2544 lsp::Diagnostic {
2545 severity: Some(lsp::DiagnosticSeverity::WARNING),
2546 range: lsp::Range::new(
2547 lsp::Position::new(0, 10),
2548 lsp::Position::new(0, 13),
2549 ),
2550 message: "message 2".to_string(),
2551 ..Default::default()
2552 },
2553 ],
2554 },
2555 );
2556
2557 // Client b gets the updated summaries
2558 project_b
2559 .condition(&cx_b, |project, cx| {
2560 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2561 == &[(
2562 ProjectPath {
2563 worktree_id,
2564 path: Arc::from(Path::new("a.rs")),
2565 },
2566 DiagnosticSummary {
2567 error_count: 1,
2568 warning_count: 1,
2569 ..Default::default()
2570 },
2571 )]
2572 })
2573 .await;
2574
2575 // Open the file with the errors on client B. They should be present.
2576 let buffer_b = cx_b
2577 .background()
2578 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2579 .await
2580 .unwrap();
2581
2582 buffer_b.read_with(cx_b, |buffer, _| {
2583 assert_eq!(
2584 buffer
2585 .snapshot()
2586 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2587 .map(|entry| entry)
2588 .collect::<Vec<_>>(),
2589 &[
2590 DiagnosticEntry {
2591 range: Point::new(0, 4)..Point::new(0, 7),
2592 diagnostic: Diagnostic {
2593 group_id: 0,
2594 message: "message 1".to_string(),
2595 severity: lsp::DiagnosticSeverity::ERROR,
2596 is_primary: true,
2597 ..Default::default()
2598 }
2599 },
2600 DiagnosticEntry {
2601 range: Point::new(0, 10)..Point::new(0, 13),
2602 diagnostic: Diagnostic {
2603 group_id: 1,
2604 severity: lsp::DiagnosticSeverity::WARNING,
2605 message: "message 2".to_string(),
2606 is_primary: true,
2607 ..Default::default()
2608 }
2609 }
2610 ]
2611 );
2612 });
2613
2614 // Simulate a language server reporting no errors for a file.
2615 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2616 lsp::PublishDiagnosticsParams {
2617 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2618 version: None,
2619 diagnostics: vec![],
2620 },
2621 );
2622 project_a
2623 .condition(cx_a, |project, cx| {
2624 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2625 })
2626 .await;
2627 project_b
2628 .condition(cx_b, |project, cx| {
2629 project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2630 })
2631 .await;
2632 }
2633
2634 #[gpui::test(iterations = 10)]
2635 async fn test_collaborating_with_completion(
2636 cx_a: &mut TestAppContext,
2637 cx_b: &mut TestAppContext,
2638 ) {
2639 cx_a.foreground().forbid_parking();
2640 let lang_registry = Arc::new(LanguageRegistry::test());
2641 let fs = FakeFs::new(cx_a.background());
2642
2643 // Set up a fake language server.
2644 let mut language = Language::new(
2645 LanguageConfig {
2646 name: "Rust".into(),
2647 path_suffixes: vec!["rs".to_string()],
2648 ..Default::default()
2649 },
2650 Some(tree_sitter_rust::language()),
2651 );
2652 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2653 capabilities: lsp::ServerCapabilities {
2654 completion_provider: Some(lsp::CompletionOptions {
2655 trigger_characters: Some(vec![".".to_string()]),
2656 ..Default::default()
2657 }),
2658 ..Default::default()
2659 },
2660 ..Default::default()
2661 });
2662 lang_registry.add(Arc::new(language));
2663
2664 // Connect to a server as 2 clients.
2665 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2666 let client_a = server.create_client(cx_a, "user_a").await;
2667 let client_b = server.create_client(cx_b, "user_b").await;
2668
2669 // Share a project as client A
2670 fs.insert_tree(
2671 "/a",
2672 json!({
2673 ".zed.toml": r#"collaborators = ["user_b"]"#,
2674 "main.rs": "fn main() { a }",
2675 "other.rs": "",
2676 }),
2677 )
2678 .await;
2679 let project_a = cx_a.update(|cx| {
2680 Project::local(
2681 client_a.clone(),
2682 client_a.user_store.clone(),
2683 lang_registry.clone(),
2684 fs.clone(),
2685 cx,
2686 )
2687 });
2688 let (worktree_a, _) = project_a
2689 .update(cx_a, |p, cx| {
2690 p.find_or_create_local_worktree("/a", true, cx)
2691 })
2692 .await
2693 .unwrap();
2694 worktree_a
2695 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2696 .await;
2697 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2698 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2699 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2700
2701 // Join the worktree as client B.
2702 let project_b = Project::remote(
2703 project_id,
2704 client_b.clone(),
2705 client_b.user_store.clone(),
2706 lang_registry.clone(),
2707 fs.clone(),
2708 &mut cx_b.to_async(),
2709 )
2710 .await
2711 .unwrap();
2712
2713 // Open a file in an editor as the guest.
2714 let buffer_b = project_b
2715 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2716 .await
2717 .unwrap();
2718 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2719 let editor_b = cx_b.add_view(window_b, |cx| {
2720 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2721 });
2722
2723 let fake_language_server = fake_language_servers.next().await.unwrap();
2724 buffer_b
2725 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2726 .await;
2727
2728 // Type a completion trigger character as the guest.
2729 editor_b.update(cx_b, |editor, cx| {
2730 editor.select_ranges([13..13], None, cx);
2731 editor.handle_input(&Input(".".into()), cx);
2732 cx.focus(&editor_b);
2733 });
2734
2735 // Receive a completion request as the host's language server.
2736 // Return some completions from the host's language server.
2737 cx_a.foreground().start_waiting();
2738 fake_language_server
2739 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2740 assert_eq!(
2741 params.text_document_position.text_document.uri,
2742 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2743 );
2744 assert_eq!(
2745 params.text_document_position.position,
2746 lsp::Position::new(0, 14),
2747 );
2748
2749 Ok(Some(lsp::CompletionResponse::Array(vec![
2750 lsp::CompletionItem {
2751 label: "first_method(…)".into(),
2752 detail: Some("fn(&mut self, B) -> C".into()),
2753 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2754 new_text: "first_method($1)".to_string(),
2755 range: lsp::Range::new(
2756 lsp::Position::new(0, 14),
2757 lsp::Position::new(0, 14),
2758 ),
2759 })),
2760 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2761 ..Default::default()
2762 },
2763 lsp::CompletionItem {
2764 label: "second_method(…)".into(),
2765 detail: Some("fn(&mut self, C) -> D<E>".into()),
2766 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2767 new_text: "second_method()".to_string(),
2768 range: lsp::Range::new(
2769 lsp::Position::new(0, 14),
2770 lsp::Position::new(0, 14),
2771 ),
2772 })),
2773 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2774 ..Default::default()
2775 },
2776 ])))
2777 })
2778 .next()
2779 .await
2780 .unwrap();
2781 cx_a.foreground().finish_waiting();
2782
2783 // Open the buffer on the host.
2784 let buffer_a = project_a
2785 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2786 .await
2787 .unwrap();
2788 buffer_a
2789 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2790 .await;
2791
2792 // Confirm a completion on the guest.
2793 editor_b
2794 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2795 .await;
2796 editor_b.update(cx_b, |editor, cx| {
2797 editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2798 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2799 });
2800
2801 // Return a resolved completion from the host's language server.
2802 // The resolved completion has an additional text edit.
2803 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2804 |params, _| async move {
2805 assert_eq!(params.label, "first_method(…)");
2806 Ok(lsp::CompletionItem {
2807 label: "first_method(…)".into(),
2808 detail: Some("fn(&mut self, B) -> C".into()),
2809 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2810 new_text: "first_method($1)".to_string(),
2811 range: lsp::Range::new(
2812 lsp::Position::new(0, 14),
2813 lsp::Position::new(0, 14),
2814 ),
2815 })),
2816 additional_text_edits: Some(vec![lsp::TextEdit {
2817 new_text: "use d::SomeTrait;\n".to_string(),
2818 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2819 }]),
2820 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2821 ..Default::default()
2822 })
2823 },
2824 );
2825
2826 // The additional edit is applied.
2827 buffer_a
2828 .condition(&cx_a, |buffer, _| {
2829 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2830 })
2831 .await;
2832 buffer_b
2833 .condition(&cx_b, |buffer, _| {
2834 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2835 })
2836 .await;
2837 }
2838
2839 #[gpui::test(iterations = 10)]
2840 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2841 cx_a.foreground().forbid_parking();
2842 let lang_registry = Arc::new(LanguageRegistry::test());
2843 let fs = FakeFs::new(cx_a.background());
2844
2845 // Connect to a server as 2 clients.
2846 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2847 let client_a = server.create_client(cx_a, "user_a").await;
2848 let client_b = server.create_client(cx_b, "user_b").await;
2849
2850 // Share a project as client A
2851 fs.insert_tree(
2852 "/a",
2853 json!({
2854 ".zed.toml": r#"collaborators = ["user_b"]"#,
2855 "a.rs": "let one = 1;",
2856 }),
2857 )
2858 .await;
2859 let project_a = cx_a.update(|cx| {
2860 Project::local(
2861 client_a.clone(),
2862 client_a.user_store.clone(),
2863 lang_registry.clone(),
2864 fs.clone(),
2865 cx,
2866 )
2867 });
2868 let (worktree_a, _) = project_a
2869 .update(cx_a, |p, cx| {
2870 p.find_or_create_local_worktree("/a", true, cx)
2871 })
2872 .await
2873 .unwrap();
2874 worktree_a
2875 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2876 .await;
2877 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2878 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2879 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2880 let buffer_a = project_a
2881 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2882 .await
2883 .unwrap();
2884
2885 // Join the worktree as client B.
2886 let project_b = Project::remote(
2887 project_id,
2888 client_b.clone(),
2889 client_b.user_store.clone(),
2890 lang_registry.clone(),
2891 fs.clone(),
2892 &mut cx_b.to_async(),
2893 )
2894 .await
2895 .unwrap();
2896
2897 let buffer_b = cx_b
2898 .background()
2899 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2900 .await
2901 .unwrap();
2902 buffer_b.update(cx_b, |buffer, cx| {
2903 buffer.edit([(4..7, "six")], cx);
2904 buffer.edit([(10..11, "6")], cx);
2905 assert_eq!(buffer.text(), "let six = 6;");
2906 assert!(buffer.is_dirty());
2907 assert!(!buffer.has_conflict());
2908 });
2909 buffer_a
2910 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2911 .await;
2912
2913 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2914 .await
2915 .unwrap();
2916 buffer_a
2917 .condition(cx_a, |buffer, _| buffer.has_conflict())
2918 .await;
2919 buffer_b
2920 .condition(cx_b, |buffer, _| buffer.has_conflict())
2921 .await;
2922
2923 project_b
2924 .update(cx_b, |project, cx| {
2925 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2926 })
2927 .await
2928 .unwrap();
2929 buffer_a.read_with(cx_a, |buffer, _| {
2930 assert_eq!(buffer.text(), "let seven = 7;");
2931 assert!(!buffer.is_dirty());
2932 assert!(!buffer.has_conflict());
2933 });
2934 buffer_b.read_with(cx_b, |buffer, _| {
2935 assert_eq!(buffer.text(), "let seven = 7;");
2936 assert!(!buffer.is_dirty());
2937 assert!(!buffer.has_conflict());
2938 });
2939
2940 buffer_a.update(cx_a, |buffer, cx| {
2941 // Undoing on the host is a no-op when the reload was initiated by the guest.
2942 buffer.undo(cx);
2943 assert_eq!(buffer.text(), "let seven = 7;");
2944 assert!(!buffer.is_dirty());
2945 assert!(!buffer.has_conflict());
2946 });
2947 buffer_b.update(cx_b, |buffer, cx| {
2948 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2949 buffer.undo(cx);
2950 assert_eq!(buffer.text(), "let six = 6;");
2951 assert!(buffer.is_dirty());
2952 assert!(!buffer.has_conflict());
2953 });
2954 }
2955
2956 #[gpui::test(iterations = 10)]
2957 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2958 cx_a.foreground().forbid_parking();
2959 let lang_registry = Arc::new(LanguageRegistry::test());
2960 let fs = FakeFs::new(cx_a.background());
2961
2962 // Set up a fake language server.
2963 let mut language = Language::new(
2964 LanguageConfig {
2965 name: "Rust".into(),
2966 path_suffixes: vec!["rs".to_string()],
2967 ..Default::default()
2968 },
2969 Some(tree_sitter_rust::language()),
2970 );
2971 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2972 lang_registry.add(Arc::new(language));
2973
2974 // Connect to a server as 2 clients.
2975 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2976 let client_a = server.create_client(cx_a, "user_a").await;
2977 let client_b = server.create_client(cx_b, "user_b").await;
2978
2979 // Share a project as client A
2980 fs.insert_tree(
2981 "/a",
2982 json!({
2983 ".zed.toml": r#"collaborators = ["user_b"]"#,
2984 "a.rs": "let one = two",
2985 }),
2986 )
2987 .await;
2988 let project_a = cx_a.update(|cx| {
2989 Project::local(
2990 client_a.clone(),
2991 client_a.user_store.clone(),
2992 lang_registry.clone(),
2993 fs.clone(),
2994 cx,
2995 )
2996 });
2997 let (worktree_a, _) = project_a
2998 .update(cx_a, |p, cx| {
2999 p.find_or_create_local_worktree("/a", true, cx)
3000 })
3001 .await
3002 .unwrap();
3003 worktree_a
3004 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3005 .await;
3006 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3007 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3008 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3009
3010 // Join the worktree as client B.
3011 let project_b = Project::remote(
3012 project_id,
3013 client_b.clone(),
3014 client_b.user_store.clone(),
3015 lang_registry.clone(),
3016 fs.clone(),
3017 &mut cx_b.to_async(),
3018 )
3019 .await
3020 .unwrap();
3021
3022 let buffer_b = cx_b
3023 .background()
3024 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3025 .await
3026 .unwrap();
3027
3028 let fake_language_server = fake_language_servers.next().await.unwrap();
3029 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3030 Ok(Some(vec![
3031 lsp::TextEdit {
3032 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3033 new_text: "h".to_string(),
3034 },
3035 lsp::TextEdit {
3036 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3037 new_text: "y".to_string(),
3038 },
3039 ]))
3040 });
3041
3042 project_b
3043 .update(cx_b, |project, cx| {
3044 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3045 })
3046 .await
3047 .unwrap();
3048 assert_eq!(
3049 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3050 "let honey = two"
3051 );
3052 }
3053
3054 #[gpui::test(iterations = 10)]
3055 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3056 cx_a.foreground().forbid_parking();
3057 let lang_registry = Arc::new(LanguageRegistry::test());
3058 let fs = FakeFs::new(cx_a.background());
3059 fs.insert_tree(
3060 "/root-1",
3061 json!({
3062 ".zed.toml": r#"collaborators = ["user_b"]"#,
3063 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3064 }),
3065 )
3066 .await;
3067 fs.insert_tree(
3068 "/root-2",
3069 json!({
3070 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3071 }),
3072 )
3073 .await;
3074
3075 // Set up a fake language server.
3076 let mut language = Language::new(
3077 LanguageConfig {
3078 name: "Rust".into(),
3079 path_suffixes: vec!["rs".to_string()],
3080 ..Default::default()
3081 },
3082 Some(tree_sitter_rust::language()),
3083 );
3084 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3085 lang_registry.add(Arc::new(language));
3086
3087 // Connect to a server as 2 clients.
3088 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3089 let client_a = server.create_client(cx_a, "user_a").await;
3090 let client_b = server.create_client(cx_b, "user_b").await;
3091
3092 // Share a project as client A
3093 let project_a = cx_a.update(|cx| {
3094 Project::local(
3095 client_a.clone(),
3096 client_a.user_store.clone(),
3097 lang_registry.clone(),
3098 fs.clone(),
3099 cx,
3100 )
3101 });
3102 let (worktree_a, _) = project_a
3103 .update(cx_a, |p, cx| {
3104 p.find_or_create_local_worktree("/root-1", true, cx)
3105 })
3106 .await
3107 .unwrap();
3108 worktree_a
3109 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3110 .await;
3111 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3112 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3113 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3114
3115 // Join the worktree as client B.
3116 let project_b = Project::remote(
3117 project_id,
3118 client_b.clone(),
3119 client_b.user_store.clone(),
3120 lang_registry.clone(),
3121 fs.clone(),
3122 &mut cx_b.to_async(),
3123 )
3124 .await
3125 .unwrap();
3126
3127 // Open the file on client B.
3128 let buffer_b = cx_b
3129 .background()
3130 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3131 .await
3132 .unwrap();
3133
3134 // Request the definition of a symbol as the guest.
3135 let fake_language_server = fake_language_servers.next().await.unwrap();
3136 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3137 |_, _| async move {
3138 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3139 lsp::Location::new(
3140 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3141 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3142 ),
3143 )))
3144 },
3145 );
3146
3147 let definitions_1 = project_b
3148 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3149 .await
3150 .unwrap();
3151 cx_b.read(|cx| {
3152 assert_eq!(definitions_1.len(), 1);
3153 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3154 let target_buffer = definitions_1[0].buffer.read(cx);
3155 assert_eq!(
3156 target_buffer.text(),
3157 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3158 );
3159 assert_eq!(
3160 definitions_1[0].range.to_point(target_buffer),
3161 Point::new(0, 6)..Point::new(0, 9)
3162 );
3163 });
3164
3165 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3166 // the previous call to `definition`.
3167 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3168 |_, _| async move {
3169 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3170 lsp::Location::new(
3171 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3172 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3173 ),
3174 )))
3175 },
3176 );
3177
3178 let definitions_2 = project_b
3179 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3180 .await
3181 .unwrap();
3182 cx_b.read(|cx| {
3183 assert_eq!(definitions_2.len(), 1);
3184 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3185 let target_buffer = definitions_2[0].buffer.read(cx);
3186 assert_eq!(
3187 target_buffer.text(),
3188 "const TWO: usize = 2;\nconst THREE: usize = 3;"
3189 );
3190 assert_eq!(
3191 definitions_2[0].range.to_point(target_buffer),
3192 Point::new(1, 6)..Point::new(1, 11)
3193 );
3194 });
3195 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3196 }
3197
3198 #[gpui::test(iterations = 10)]
3199 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3200 cx_a.foreground().forbid_parking();
3201 let lang_registry = Arc::new(LanguageRegistry::test());
3202 let fs = FakeFs::new(cx_a.background());
3203 fs.insert_tree(
3204 "/root-1",
3205 json!({
3206 ".zed.toml": r#"collaborators = ["user_b"]"#,
3207 "one.rs": "const ONE: usize = 1;",
3208 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3209 }),
3210 )
3211 .await;
3212 fs.insert_tree(
3213 "/root-2",
3214 json!({
3215 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3216 }),
3217 )
3218 .await;
3219
3220 // Set up a fake language server.
3221 let mut language = Language::new(
3222 LanguageConfig {
3223 name: "Rust".into(),
3224 path_suffixes: vec!["rs".to_string()],
3225 ..Default::default()
3226 },
3227 Some(tree_sitter_rust::language()),
3228 );
3229 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3230 lang_registry.add(Arc::new(language));
3231
3232 // Connect to a server as 2 clients.
3233 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3234 let client_a = server.create_client(cx_a, "user_a").await;
3235 let client_b = server.create_client(cx_b, "user_b").await;
3236
3237 // Share a project as client A
3238 let project_a = cx_a.update(|cx| {
3239 Project::local(
3240 client_a.clone(),
3241 client_a.user_store.clone(),
3242 lang_registry.clone(),
3243 fs.clone(),
3244 cx,
3245 )
3246 });
3247 let (worktree_a, _) = project_a
3248 .update(cx_a, |p, cx| {
3249 p.find_or_create_local_worktree("/root-1", true, cx)
3250 })
3251 .await
3252 .unwrap();
3253 worktree_a
3254 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3255 .await;
3256 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3257 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3258 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3259
3260 // Join the worktree as client B.
3261 let project_b = Project::remote(
3262 project_id,
3263 client_b.clone(),
3264 client_b.user_store.clone(),
3265 lang_registry.clone(),
3266 fs.clone(),
3267 &mut cx_b.to_async(),
3268 )
3269 .await
3270 .unwrap();
3271
3272 // Open the file on client B.
3273 let buffer_b = cx_b
3274 .background()
3275 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3276 .await
3277 .unwrap();
3278
3279 // Request references to a symbol as the guest.
3280 let fake_language_server = fake_language_servers.next().await.unwrap();
3281 fake_language_server.handle_request::<lsp::request::References, _, _>(
3282 |params, _| async move {
3283 assert_eq!(
3284 params.text_document_position.text_document.uri.as_str(),
3285 "file:///root-1/one.rs"
3286 );
3287 Ok(Some(vec![
3288 lsp::Location {
3289 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3290 range: lsp::Range::new(
3291 lsp::Position::new(0, 24),
3292 lsp::Position::new(0, 27),
3293 ),
3294 },
3295 lsp::Location {
3296 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3297 range: lsp::Range::new(
3298 lsp::Position::new(0, 35),
3299 lsp::Position::new(0, 38),
3300 ),
3301 },
3302 lsp::Location {
3303 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3304 range: lsp::Range::new(
3305 lsp::Position::new(0, 37),
3306 lsp::Position::new(0, 40),
3307 ),
3308 },
3309 ]))
3310 },
3311 );
3312
3313 let references = project_b
3314 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3315 .await
3316 .unwrap();
3317 cx_b.read(|cx| {
3318 assert_eq!(references.len(), 3);
3319 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3320
3321 let two_buffer = references[0].buffer.read(cx);
3322 let three_buffer = references[2].buffer.read(cx);
3323 assert_eq!(
3324 two_buffer.file().unwrap().path().as_ref(),
3325 Path::new("two.rs")
3326 );
3327 assert_eq!(references[1].buffer, references[0].buffer);
3328 assert_eq!(
3329 three_buffer.file().unwrap().full_path(cx),
3330 Path::new("three.rs")
3331 );
3332
3333 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3334 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3335 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3336 });
3337 }
3338
3339 #[gpui::test(iterations = 10)]
3340 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3341 cx_a.foreground().forbid_parking();
3342 let lang_registry = Arc::new(LanguageRegistry::test());
3343 let fs = FakeFs::new(cx_a.background());
3344 fs.insert_tree(
3345 "/root-1",
3346 json!({
3347 ".zed.toml": r#"collaborators = ["user_b"]"#,
3348 "a": "hello world",
3349 "b": "goodnight moon",
3350 "c": "a world of goo",
3351 "d": "world champion of clown world",
3352 }),
3353 )
3354 .await;
3355 fs.insert_tree(
3356 "/root-2",
3357 json!({
3358 "e": "disney world is fun",
3359 }),
3360 )
3361 .await;
3362
3363 // Connect to a server as 2 clients.
3364 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3365 let client_a = server.create_client(cx_a, "user_a").await;
3366 let client_b = server.create_client(cx_b, "user_b").await;
3367
3368 // Share a project as client A
3369 let project_a = cx_a.update(|cx| {
3370 Project::local(
3371 client_a.clone(),
3372 client_a.user_store.clone(),
3373 lang_registry.clone(),
3374 fs.clone(),
3375 cx,
3376 )
3377 });
3378 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3379
3380 let (worktree_1, _) = project_a
3381 .update(cx_a, |p, cx| {
3382 p.find_or_create_local_worktree("/root-1", true, cx)
3383 })
3384 .await
3385 .unwrap();
3386 worktree_1
3387 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3388 .await;
3389 let (worktree_2, _) = project_a
3390 .update(cx_a, |p, cx| {
3391 p.find_or_create_local_worktree("/root-2", true, cx)
3392 })
3393 .await
3394 .unwrap();
3395 worktree_2
3396 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3397 .await;
3398
3399 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3400
3401 // Join the worktree as client B.
3402 let project_b = Project::remote(
3403 project_id,
3404 client_b.clone(),
3405 client_b.user_store.clone(),
3406 lang_registry.clone(),
3407 fs.clone(),
3408 &mut cx_b.to_async(),
3409 )
3410 .await
3411 .unwrap();
3412
3413 let results = project_b
3414 .update(cx_b, |project, cx| {
3415 project.search(SearchQuery::text("world", false, false), cx)
3416 })
3417 .await
3418 .unwrap();
3419
3420 let mut ranges_by_path = results
3421 .into_iter()
3422 .map(|(buffer, ranges)| {
3423 buffer.read_with(cx_b, |buffer, cx| {
3424 let path = buffer.file().unwrap().full_path(cx);
3425 let offset_ranges = ranges
3426 .into_iter()
3427 .map(|range| range.to_offset(buffer))
3428 .collect::<Vec<_>>();
3429 (path, offset_ranges)
3430 })
3431 })
3432 .collect::<Vec<_>>();
3433 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3434
3435 assert_eq!(
3436 ranges_by_path,
3437 &[
3438 (PathBuf::from("root-1/a"), vec![6..11]),
3439 (PathBuf::from("root-1/c"), vec![2..7]),
3440 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3441 (PathBuf::from("root-2/e"), vec![7..12]),
3442 ]
3443 );
3444 }
3445
3446 #[gpui::test(iterations = 10)]
3447 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3448 cx_a.foreground().forbid_parking();
3449 let lang_registry = Arc::new(LanguageRegistry::test());
3450 let fs = FakeFs::new(cx_a.background());
3451 fs.insert_tree(
3452 "/root-1",
3453 json!({
3454 ".zed.toml": r#"collaborators = ["user_b"]"#,
3455 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3456 }),
3457 )
3458 .await;
3459
3460 // Set up a fake language server.
3461 let mut language = Language::new(
3462 LanguageConfig {
3463 name: "Rust".into(),
3464 path_suffixes: vec!["rs".to_string()],
3465 ..Default::default()
3466 },
3467 Some(tree_sitter_rust::language()),
3468 );
3469 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3470 lang_registry.add(Arc::new(language));
3471
3472 // Connect to a server as 2 clients.
3473 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3474 let client_a = server.create_client(cx_a, "user_a").await;
3475 let client_b = server.create_client(cx_b, "user_b").await;
3476
3477 // Share a project as client A
3478 let project_a = cx_a.update(|cx| {
3479 Project::local(
3480 client_a.clone(),
3481 client_a.user_store.clone(),
3482 lang_registry.clone(),
3483 fs.clone(),
3484 cx,
3485 )
3486 });
3487 let (worktree_a, _) = project_a
3488 .update(cx_a, |p, cx| {
3489 p.find_or_create_local_worktree("/root-1", true, cx)
3490 })
3491 .await
3492 .unwrap();
3493 worktree_a
3494 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3495 .await;
3496 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3497 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3498 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3499
3500 // Join the worktree as client B.
3501 let project_b = Project::remote(
3502 project_id,
3503 client_b.clone(),
3504 client_b.user_store.clone(),
3505 lang_registry.clone(),
3506 fs.clone(),
3507 &mut cx_b.to_async(),
3508 )
3509 .await
3510 .unwrap();
3511
3512 // Open the file on client B.
3513 let buffer_b = cx_b
3514 .background()
3515 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3516 .await
3517 .unwrap();
3518
3519 // Request document highlights as the guest.
3520 let fake_language_server = fake_language_servers.next().await.unwrap();
3521 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3522 |params, _| async move {
3523 assert_eq!(
3524 params
3525 .text_document_position_params
3526 .text_document
3527 .uri
3528 .as_str(),
3529 "file:///root-1/main.rs"
3530 );
3531 assert_eq!(
3532 params.text_document_position_params.position,
3533 lsp::Position::new(0, 34)
3534 );
3535 Ok(Some(vec![
3536 lsp::DocumentHighlight {
3537 kind: Some(lsp::DocumentHighlightKind::WRITE),
3538 range: lsp::Range::new(
3539 lsp::Position::new(0, 10),
3540 lsp::Position::new(0, 16),
3541 ),
3542 },
3543 lsp::DocumentHighlight {
3544 kind: Some(lsp::DocumentHighlightKind::READ),
3545 range: lsp::Range::new(
3546 lsp::Position::new(0, 32),
3547 lsp::Position::new(0, 38),
3548 ),
3549 },
3550 lsp::DocumentHighlight {
3551 kind: Some(lsp::DocumentHighlightKind::READ),
3552 range: lsp::Range::new(
3553 lsp::Position::new(0, 41),
3554 lsp::Position::new(0, 47),
3555 ),
3556 },
3557 ]))
3558 },
3559 );
3560
3561 let highlights = project_b
3562 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3563 .await
3564 .unwrap();
3565 buffer_b.read_with(cx_b, |buffer, _| {
3566 let snapshot = buffer.snapshot();
3567
3568 let highlights = highlights
3569 .into_iter()
3570 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3571 .collect::<Vec<_>>();
3572 assert_eq!(
3573 highlights,
3574 &[
3575 (lsp::DocumentHighlightKind::WRITE, 10..16),
3576 (lsp::DocumentHighlightKind::READ, 32..38),
3577 (lsp::DocumentHighlightKind::READ, 41..47)
3578 ]
3579 )
3580 });
3581 }
3582
3583 #[gpui::test(iterations = 10)]
3584 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3585 cx_a.foreground().forbid_parking();
3586 let lang_registry = Arc::new(LanguageRegistry::test());
3587 let fs = FakeFs::new(cx_a.background());
3588 fs.insert_tree(
3589 "/code",
3590 json!({
3591 "crate-1": {
3592 ".zed.toml": r#"collaborators = ["user_b"]"#,
3593 "one.rs": "const ONE: usize = 1;",
3594 },
3595 "crate-2": {
3596 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3597 },
3598 "private": {
3599 "passwords.txt": "the-password",
3600 }
3601 }),
3602 )
3603 .await;
3604
3605 // Set up a fake language server.
3606 let mut language = Language::new(
3607 LanguageConfig {
3608 name: "Rust".into(),
3609 path_suffixes: vec!["rs".to_string()],
3610 ..Default::default()
3611 },
3612 Some(tree_sitter_rust::language()),
3613 );
3614 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3615 lang_registry.add(Arc::new(language));
3616
3617 // Connect to a server as 2 clients.
3618 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3619 let client_a = server.create_client(cx_a, "user_a").await;
3620 let client_b = server.create_client(cx_b, "user_b").await;
3621
3622 // Share a project as client A
3623 let project_a = cx_a.update(|cx| {
3624 Project::local(
3625 client_a.clone(),
3626 client_a.user_store.clone(),
3627 lang_registry.clone(),
3628 fs.clone(),
3629 cx,
3630 )
3631 });
3632 let (worktree_a, _) = project_a
3633 .update(cx_a, |p, cx| {
3634 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3635 })
3636 .await
3637 .unwrap();
3638 worktree_a
3639 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3640 .await;
3641 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3642 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3643 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3644
3645 // Join the worktree as client B.
3646 let project_b = Project::remote(
3647 project_id,
3648 client_b.clone(),
3649 client_b.user_store.clone(),
3650 lang_registry.clone(),
3651 fs.clone(),
3652 &mut cx_b.to_async(),
3653 )
3654 .await
3655 .unwrap();
3656
3657 // Cause the language server to start.
3658 let _buffer = cx_b
3659 .background()
3660 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3661 .await
3662 .unwrap();
3663
3664 let fake_language_server = fake_language_servers.next().await.unwrap();
3665 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3666 |_, _| async move {
3667 #[allow(deprecated)]
3668 Ok(Some(vec![lsp::SymbolInformation {
3669 name: "TWO".into(),
3670 location: lsp::Location {
3671 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3672 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3673 },
3674 kind: lsp::SymbolKind::CONSTANT,
3675 tags: None,
3676 container_name: None,
3677 deprecated: None,
3678 }]))
3679 },
3680 );
3681
3682 // Request the definition of a symbol as the guest.
3683 let symbols = project_b
3684 .update(cx_b, |p, cx| p.symbols("two", cx))
3685 .await
3686 .unwrap();
3687 assert_eq!(symbols.len(), 1);
3688 assert_eq!(symbols[0].name, "TWO");
3689
3690 // Open one of the returned symbols.
3691 let buffer_b_2 = project_b
3692 .update(cx_b, |project, cx| {
3693 project.open_buffer_for_symbol(&symbols[0], cx)
3694 })
3695 .await
3696 .unwrap();
3697 buffer_b_2.read_with(cx_b, |buffer, _| {
3698 assert_eq!(
3699 buffer.file().unwrap().path().as_ref(),
3700 Path::new("../crate-2/two.rs")
3701 );
3702 });
3703
3704 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3705 let mut fake_symbol = symbols[0].clone();
3706 fake_symbol.path = Path::new("/code/secrets").into();
3707 let error = project_b
3708 .update(cx_b, |project, cx| {
3709 project.open_buffer_for_symbol(&fake_symbol, cx)
3710 })
3711 .await
3712 .unwrap_err();
3713 assert!(error.to_string().contains("invalid symbol signature"));
3714 }
3715
3716 #[gpui::test(iterations = 10)]
3717 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3718 cx_a: &mut TestAppContext,
3719 cx_b: &mut TestAppContext,
3720 mut rng: StdRng,
3721 ) {
3722 cx_a.foreground().forbid_parking();
3723 let lang_registry = Arc::new(LanguageRegistry::test());
3724 let fs = FakeFs::new(cx_a.background());
3725 fs.insert_tree(
3726 "/root",
3727 json!({
3728 ".zed.toml": r#"collaborators = ["user_b"]"#,
3729 "a.rs": "const ONE: usize = b::TWO;",
3730 "b.rs": "const TWO: usize = 2",
3731 }),
3732 )
3733 .await;
3734
3735 // Set up a fake language server.
3736 let mut language = Language::new(
3737 LanguageConfig {
3738 name: "Rust".into(),
3739 path_suffixes: vec!["rs".to_string()],
3740 ..Default::default()
3741 },
3742 Some(tree_sitter_rust::language()),
3743 );
3744 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3745 lang_registry.add(Arc::new(language));
3746
3747 // Connect to a server as 2 clients.
3748 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3749 let client_a = server.create_client(cx_a, "user_a").await;
3750 let client_b = server.create_client(cx_b, "user_b").await;
3751
3752 // Share a project as client A
3753 let project_a = cx_a.update(|cx| {
3754 Project::local(
3755 client_a.clone(),
3756 client_a.user_store.clone(),
3757 lang_registry.clone(),
3758 fs.clone(),
3759 cx,
3760 )
3761 });
3762
3763 let (worktree_a, _) = project_a
3764 .update(cx_a, |p, cx| {
3765 p.find_or_create_local_worktree("/root", true, cx)
3766 })
3767 .await
3768 .unwrap();
3769 worktree_a
3770 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3771 .await;
3772 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3773 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3774 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3775
3776 // Join the worktree as client B.
3777 let project_b = Project::remote(
3778 project_id,
3779 client_b.clone(),
3780 client_b.user_store.clone(),
3781 lang_registry.clone(),
3782 fs.clone(),
3783 &mut cx_b.to_async(),
3784 )
3785 .await
3786 .unwrap();
3787
3788 let buffer_b1 = cx_b
3789 .background()
3790 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3791 .await
3792 .unwrap();
3793
3794 let fake_language_server = fake_language_servers.next().await.unwrap();
3795 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3796 |_, _| async move {
3797 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3798 lsp::Location::new(
3799 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3800 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3801 ),
3802 )))
3803 },
3804 );
3805
3806 let definitions;
3807 let buffer_b2;
3808 if rng.gen() {
3809 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3810 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3811 } else {
3812 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3813 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3814 }
3815
3816 let buffer_b2 = buffer_b2.await.unwrap();
3817 let definitions = definitions.await.unwrap();
3818 assert_eq!(definitions.len(), 1);
3819 assert_eq!(definitions[0].buffer, buffer_b2);
3820 }
3821
3822 #[gpui::test(iterations = 10)]
3823 async fn test_collaborating_with_code_actions(
3824 cx_a: &mut TestAppContext,
3825 cx_b: &mut TestAppContext,
3826 ) {
3827 cx_a.foreground().forbid_parking();
3828 let lang_registry = Arc::new(LanguageRegistry::test());
3829 let fs = FakeFs::new(cx_a.background());
3830 cx_b.update(|cx| editor::init(cx));
3831
3832 // Set up a fake language server.
3833 let mut language = Language::new(
3834 LanguageConfig {
3835 name: "Rust".into(),
3836 path_suffixes: vec!["rs".to_string()],
3837 ..Default::default()
3838 },
3839 Some(tree_sitter_rust::language()),
3840 );
3841 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3842 lang_registry.add(Arc::new(language));
3843
3844 // Connect to a server as 2 clients.
3845 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3846 let client_a = server.create_client(cx_a, "user_a").await;
3847 let client_b = server.create_client(cx_b, "user_b").await;
3848
3849 // Share a project as client A
3850 fs.insert_tree(
3851 "/a",
3852 json!({
3853 ".zed.toml": r#"collaborators = ["user_b"]"#,
3854 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3855 "other.rs": "pub fn foo() -> usize { 4 }",
3856 }),
3857 )
3858 .await;
3859 let project_a = cx_a.update(|cx| {
3860 Project::local(
3861 client_a.clone(),
3862 client_a.user_store.clone(),
3863 lang_registry.clone(),
3864 fs.clone(),
3865 cx,
3866 )
3867 });
3868 let (worktree_a, _) = project_a
3869 .update(cx_a, |p, cx| {
3870 p.find_or_create_local_worktree("/a", true, cx)
3871 })
3872 .await
3873 .unwrap();
3874 worktree_a
3875 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3876 .await;
3877 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3878 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3879 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3880
3881 // Join the worktree as client B.
3882 let project_b = Project::remote(
3883 project_id,
3884 client_b.clone(),
3885 client_b.user_store.clone(),
3886 lang_registry.clone(),
3887 fs.clone(),
3888 &mut cx_b.to_async(),
3889 )
3890 .await
3891 .unwrap();
3892 let mut params = cx_b.update(WorkspaceParams::test);
3893 params.languages = lang_registry.clone();
3894 params.client = client_b.client.clone();
3895 params.user_store = client_b.user_store.clone();
3896 params.project = project_b;
3897
3898 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3899 let editor_b = workspace_b
3900 .update(cx_b, |workspace, cx| {
3901 workspace.open_path((worktree_id, "main.rs"), true, cx)
3902 })
3903 .await
3904 .unwrap()
3905 .downcast::<Editor>()
3906 .unwrap();
3907
3908 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3909 fake_language_server
3910 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3911 assert_eq!(
3912 params.text_document.uri,
3913 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3914 );
3915 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3916 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3917 Ok(None)
3918 })
3919 .next()
3920 .await;
3921
3922 // Move cursor to a location that contains code actions.
3923 editor_b.update(cx_b, |editor, cx| {
3924 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3925 cx.focus(&editor_b);
3926 });
3927
3928 fake_language_server
3929 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3930 assert_eq!(
3931 params.text_document.uri,
3932 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3933 );
3934 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3935 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3936
3937 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3938 lsp::CodeAction {
3939 title: "Inline into all callers".to_string(),
3940 edit: Some(lsp::WorkspaceEdit {
3941 changes: Some(
3942 [
3943 (
3944 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3945 vec![lsp::TextEdit::new(
3946 lsp::Range::new(
3947 lsp::Position::new(1, 22),
3948 lsp::Position::new(1, 34),
3949 ),
3950 "4".to_string(),
3951 )],
3952 ),
3953 (
3954 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3955 vec![lsp::TextEdit::new(
3956 lsp::Range::new(
3957 lsp::Position::new(0, 0),
3958 lsp::Position::new(0, 27),
3959 ),
3960 "".to_string(),
3961 )],
3962 ),
3963 ]
3964 .into_iter()
3965 .collect(),
3966 ),
3967 ..Default::default()
3968 }),
3969 data: Some(json!({
3970 "codeActionParams": {
3971 "range": {
3972 "start": {"line": 1, "column": 31},
3973 "end": {"line": 1, "column": 31},
3974 }
3975 }
3976 })),
3977 ..Default::default()
3978 },
3979 )]))
3980 })
3981 .next()
3982 .await;
3983
3984 // Toggle code actions and wait for them to display.
3985 editor_b.update(cx_b, |editor, cx| {
3986 editor.toggle_code_actions(
3987 &ToggleCodeActions {
3988 deployed_from_indicator: false,
3989 },
3990 cx,
3991 );
3992 });
3993 editor_b
3994 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3995 .await;
3996
3997 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3998
3999 // Confirming the code action will trigger a resolve request.
4000 let confirm_action = workspace_b
4001 .update(cx_b, |workspace, cx| {
4002 Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4003 })
4004 .unwrap();
4005 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4006 |_, _| async move {
4007 Ok(lsp::CodeAction {
4008 title: "Inline into all callers".to_string(),
4009 edit: Some(lsp::WorkspaceEdit {
4010 changes: Some(
4011 [
4012 (
4013 lsp::Url::from_file_path("/a/main.rs").unwrap(),
4014 vec![lsp::TextEdit::new(
4015 lsp::Range::new(
4016 lsp::Position::new(1, 22),
4017 lsp::Position::new(1, 34),
4018 ),
4019 "4".to_string(),
4020 )],
4021 ),
4022 (
4023 lsp::Url::from_file_path("/a/other.rs").unwrap(),
4024 vec![lsp::TextEdit::new(
4025 lsp::Range::new(
4026 lsp::Position::new(0, 0),
4027 lsp::Position::new(0, 27),
4028 ),
4029 "".to_string(),
4030 )],
4031 ),
4032 ]
4033 .into_iter()
4034 .collect(),
4035 ),
4036 ..Default::default()
4037 }),
4038 ..Default::default()
4039 })
4040 },
4041 );
4042
4043 // After the action is confirmed, an editor containing both modified files is opened.
4044 confirm_action.await.unwrap();
4045 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4046 workspace
4047 .active_item(cx)
4048 .unwrap()
4049 .downcast::<Editor>()
4050 .unwrap()
4051 });
4052 code_action_editor.update(cx_b, |editor, cx| {
4053 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4054 editor.undo(&Undo, cx);
4055 assert_eq!(
4056 editor.text(cx),
4057 "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4058 );
4059 editor.redo(&Redo, cx);
4060 assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4061 });
4062 }
4063
4064 #[gpui::test(iterations = 10)]
4065 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4066 cx_a.foreground().forbid_parking();
4067 let lang_registry = Arc::new(LanguageRegistry::test());
4068 let fs = FakeFs::new(cx_a.background());
4069 cx_b.update(|cx| editor::init(cx));
4070
4071 // Set up a fake language server.
4072 let mut language = Language::new(
4073 LanguageConfig {
4074 name: "Rust".into(),
4075 path_suffixes: vec!["rs".to_string()],
4076 ..Default::default()
4077 },
4078 Some(tree_sitter_rust::language()),
4079 );
4080 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4081 capabilities: lsp::ServerCapabilities {
4082 rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4083 prepare_provider: Some(true),
4084 work_done_progress_options: Default::default(),
4085 })),
4086 ..Default::default()
4087 },
4088 ..Default::default()
4089 });
4090 lang_registry.add(Arc::new(language));
4091
4092 // Connect to a server as 2 clients.
4093 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4094 let client_a = server.create_client(cx_a, "user_a").await;
4095 let client_b = server.create_client(cx_b, "user_b").await;
4096
4097 // Share a project as client A
4098 fs.insert_tree(
4099 "/dir",
4100 json!({
4101 ".zed.toml": r#"collaborators = ["user_b"]"#,
4102 "one.rs": "const ONE: usize = 1;",
4103 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4104 }),
4105 )
4106 .await;
4107 let project_a = cx_a.update(|cx| {
4108 Project::local(
4109 client_a.clone(),
4110 client_a.user_store.clone(),
4111 lang_registry.clone(),
4112 fs.clone(),
4113 cx,
4114 )
4115 });
4116 let (worktree_a, _) = project_a
4117 .update(cx_a, |p, cx| {
4118 p.find_or_create_local_worktree("/dir", true, cx)
4119 })
4120 .await
4121 .unwrap();
4122 worktree_a
4123 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4124 .await;
4125 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4126 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4127 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4128
4129 // Join the worktree as client B.
4130 let project_b = Project::remote(
4131 project_id,
4132 client_b.clone(),
4133 client_b.user_store.clone(),
4134 lang_registry.clone(),
4135 fs.clone(),
4136 &mut cx_b.to_async(),
4137 )
4138 .await
4139 .unwrap();
4140 let mut params = cx_b.update(WorkspaceParams::test);
4141 params.languages = lang_registry.clone();
4142 params.client = client_b.client.clone();
4143 params.user_store = client_b.user_store.clone();
4144 params.project = project_b;
4145
4146 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
4147 let editor_b = workspace_b
4148 .update(cx_b, |workspace, cx| {
4149 workspace.open_path((worktree_id, "one.rs"), true, cx)
4150 })
4151 .await
4152 .unwrap()
4153 .downcast::<Editor>()
4154 .unwrap();
4155 let fake_language_server = fake_language_servers.next().await.unwrap();
4156
4157 // Move cursor to a location that can be renamed.
4158 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4159 editor.select_ranges([7..7], None, cx);
4160 editor.rename(&Rename, cx).unwrap()
4161 });
4162
4163 fake_language_server
4164 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4165 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4166 assert_eq!(params.position, lsp::Position::new(0, 7));
4167 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4168 lsp::Position::new(0, 6),
4169 lsp::Position::new(0, 9),
4170 ))))
4171 })
4172 .next()
4173 .await
4174 .unwrap();
4175 prepare_rename.await.unwrap();
4176 editor_b.update(cx_b, |editor, cx| {
4177 let rename = editor.pending_rename().unwrap();
4178 let buffer = editor.buffer().read(cx).snapshot(cx);
4179 assert_eq!(
4180 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4181 6..9
4182 );
4183 rename.editor.update(cx, |rename_editor, cx| {
4184 rename_editor.buffer().update(cx, |rename_buffer, cx| {
4185 rename_buffer.edit([(0..3, "THREE")], cx);
4186 });
4187 });
4188 });
4189
4190 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4191 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4192 });
4193 fake_language_server
4194 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4195 assert_eq!(
4196 params.text_document_position.text_document.uri.as_str(),
4197 "file:///dir/one.rs"
4198 );
4199 assert_eq!(
4200 params.text_document_position.position,
4201 lsp::Position::new(0, 6)
4202 );
4203 assert_eq!(params.new_name, "THREE");
4204 Ok(Some(lsp::WorkspaceEdit {
4205 changes: Some(
4206 [
4207 (
4208 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4209 vec![lsp::TextEdit::new(
4210 lsp::Range::new(
4211 lsp::Position::new(0, 6),
4212 lsp::Position::new(0, 9),
4213 ),
4214 "THREE".to_string(),
4215 )],
4216 ),
4217 (
4218 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4219 vec![
4220 lsp::TextEdit::new(
4221 lsp::Range::new(
4222 lsp::Position::new(0, 24),
4223 lsp::Position::new(0, 27),
4224 ),
4225 "THREE".to_string(),
4226 ),
4227 lsp::TextEdit::new(
4228 lsp::Range::new(
4229 lsp::Position::new(0, 35),
4230 lsp::Position::new(0, 38),
4231 ),
4232 "THREE".to_string(),
4233 ),
4234 ],
4235 ),
4236 ]
4237 .into_iter()
4238 .collect(),
4239 ),
4240 ..Default::default()
4241 }))
4242 })
4243 .next()
4244 .await
4245 .unwrap();
4246 confirm_rename.await.unwrap();
4247
4248 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4249 workspace
4250 .active_item(cx)
4251 .unwrap()
4252 .downcast::<Editor>()
4253 .unwrap()
4254 });
4255 rename_editor.update(cx_b, |editor, cx| {
4256 assert_eq!(
4257 editor.text(cx),
4258 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4259 );
4260 editor.undo(&Undo, cx);
4261 assert_eq!(
4262 editor.text(cx),
4263 "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4264 );
4265 editor.redo(&Redo, cx);
4266 assert_eq!(
4267 editor.text(cx),
4268 "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4269 );
4270 });
4271
4272 // Ensure temporary rename edits cannot be undone/redone.
4273 editor_b.update(cx_b, |editor, cx| {
4274 editor.undo(&Undo, cx);
4275 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4276 editor.undo(&Undo, cx);
4277 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4278 editor.redo(&Redo, cx);
4279 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4280 })
4281 }
4282
4283 #[gpui::test(iterations = 10)]
4284 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4285 cx_a.foreground().forbid_parking();
4286
4287 // Connect to a server as 2 clients.
4288 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4289 let client_a = server.create_client(cx_a, "user_a").await;
4290 let client_b = server.create_client(cx_b, "user_b").await;
4291
4292 // Create an org that includes these 2 users.
4293 let db = &server.app_state.db;
4294 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4295 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4296 .await
4297 .unwrap();
4298 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4299 .await
4300 .unwrap();
4301
4302 // Create a channel that includes all the users.
4303 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4304 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4305 .await
4306 .unwrap();
4307 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4308 .await
4309 .unwrap();
4310 db.create_channel_message(
4311 channel_id,
4312 client_b.current_user_id(&cx_b),
4313 "hello A, it's B.",
4314 OffsetDateTime::now_utc(),
4315 1,
4316 )
4317 .await
4318 .unwrap();
4319
4320 let channels_a = cx_a
4321 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4322 channels_a
4323 .condition(cx_a, |list, _| list.available_channels().is_some())
4324 .await;
4325 channels_a.read_with(cx_a, |list, _| {
4326 assert_eq!(
4327 list.available_channels().unwrap(),
4328 &[ChannelDetails {
4329 id: channel_id.to_proto(),
4330 name: "test-channel".to_string()
4331 }]
4332 )
4333 });
4334 let channel_a = channels_a.update(cx_a, |this, cx| {
4335 this.get_channel(channel_id.to_proto(), cx).unwrap()
4336 });
4337 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4338 channel_a
4339 .condition(&cx_a, |channel, _| {
4340 channel_messages(channel)
4341 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4342 })
4343 .await;
4344
4345 let channels_b = cx_b
4346 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4347 channels_b
4348 .condition(cx_b, |list, _| list.available_channels().is_some())
4349 .await;
4350 channels_b.read_with(cx_b, |list, _| {
4351 assert_eq!(
4352 list.available_channels().unwrap(),
4353 &[ChannelDetails {
4354 id: channel_id.to_proto(),
4355 name: "test-channel".to_string()
4356 }]
4357 )
4358 });
4359
4360 let channel_b = channels_b.update(cx_b, |this, cx| {
4361 this.get_channel(channel_id.to_proto(), cx).unwrap()
4362 });
4363 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4364 channel_b
4365 .condition(&cx_b, |channel, _| {
4366 channel_messages(channel)
4367 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4368 })
4369 .await;
4370
4371 channel_a
4372 .update(cx_a, |channel, cx| {
4373 channel
4374 .send_message("oh, hi B.".to_string(), cx)
4375 .unwrap()
4376 .detach();
4377 let task = channel.send_message("sup".to_string(), cx).unwrap();
4378 assert_eq!(
4379 channel_messages(channel),
4380 &[
4381 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4382 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4383 ("user_a".to_string(), "sup".to_string(), true)
4384 ]
4385 );
4386 task
4387 })
4388 .await
4389 .unwrap();
4390
4391 channel_b
4392 .condition(&cx_b, |channel, _| {
4393 channel_messages(channel)
4394 == [
4395 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4396 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4397 ("user_a".to_string(), "sup".to_string(), false),
4398 ]
4399 })
4400 .await;
4401
4402 assert_eq!(
4403 server
4404 .state()
4405 .await
4406 .channel(channel_id)
4407 .unwrap()
4408 .connection_ids
4409 .len(),
4410 2
4411 );
4412 cx_b.update(|_| drop(channel_b));
4413 server
4414 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4415 .await;
4416
4417 cx_a.update(|_| drop(channel_a));
4418 server
4419 .condition(|state| state.channel(channel_id).is_none())
4420 .await;
4421 }
4422
4423 #[gpui::test(iterations = 10)]
4424 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4425 cx_a.foreground().forbid_parking();
4426
4427 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4428 let client_a = server.create_client(cx_a, "user_a").await;
4429
4430 let db = &server.app_state.db;
4431 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4432 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4433 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4434 .await
4435 .unwrap();
4436 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4437 .await
4438 .unwrap();
4439
4440 let channels_a = cx_a
4441 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4442 channels_a
4443 .condition(cx_a, |list, _| list.available_channels().is_some())
4444 .await;
4445 let channel_a = channels_a.update(cx_a, |this, cx| {
4446 this.get_channel(channel_id.to_proto(), cx).unwrap()
4447 });
4448
4449 // Messages aren't allowed to be too long.
4450 channel_a
4451 .update(cx_a, |channel, cx| {
4452 let long_body = "this is long.\n".repeat(1024);
4453 channel.send_message(long_body, cx).unwrap()
4454 })
4455 .await
4456 .unwrap_err();
4457
4458 // Messages aren't allowed to be blank.
4459 channel_a.update(cx_a, |channel, cx| {
4460 channel.send_message(String::new(), cx).unwrap_err()
4461 });
4462
4463 // Leading and trailing whitespace are trimmed.
4464 channel_a
4465 .update(cx_a, |channel, cx| {
4466 channel
4467 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4468 .unwrap()
4469 })
4470 .await
4471 .unwrap();
4472 assert_eq!(
4473 db.get_channel_messages(channel_id, 10, None)
4474 .await
4475 .unwrap()
4476 .iter()
4477 .map(|m| &m.body)
4478 .collect::<Vec<_>>(),
4479 &["surrounded by whitespace"]
4480 );
4481 }
4482
4483 #[gpui::test(iterations = 10)]
4484 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4485 cx_a.foreground().forbid_parking();
4486
4487 // Connect to a server as 2 clients.
4488 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4489 let client_a = server.create_client(cx_a, "user_a").await;
4490 let client_b = server.create_client(cx_b, "user_b").await;
4491 let mut status_b = client_b.status();
4492
4493 // Create an org that includes these 2 users.
4494 let db = &server.app_state.db;
4495 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4496 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4497 .await
4498 .unwrap();
4499 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4500 .await
4501 .unwrap();
4502
4503 // Create a channel that includes all the users.
4504 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4505 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4506 .await
4507 .unwrap();
4508 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4509 .await
4510 .unwrap();
4511 db.create_channel_message(
4512 channel_id,
4513 client_b.current_user_id(&cx_b),
4514 "hello A, it's B.",
4515 OffsetDateTime::now_utc(),
4516 2,
4517 )
4518 .await
4519 .unwrap();
4520
4521 let channels_a = cx_a
4522 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4523 channels_a
4524 .condition(cx_a, |list, _| list.available_channels().is_some())
4525 .await;
4526
4527 channels_a.read_with(cx_a, |list, _| {
4528 assert_eq!(
4529 list.available_channels().unwrap(),
4530 &[ChannelDetails {
4531 id: channel_id.to_proto(),
4532 name: "test-channel".to_string()
4533 }]
4534 )
4535 });
4536 let channel_a = channels_a.update(cx_a, |this, cx| {
4537 this.get_channel(channel_id.to_proto(), cx).unwrap()
4538 });
4539 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4540 channel_a
4541 .condition(&cx_a, |channel, _| {
4542 channel_messages(channel)
4543 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4544 })
4545 .await;
4546
4547 let channels_b = cx_b
4548 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4549 channels_b
4550 .condition(cx_b, |list, _| list.available_channels().is_some())
4551 .await;
4552 channels_b.read_with(cx_b, |list, _| {
4553 assert_eq!(
4554 list.available_channels().unwrap(),
4555 &[ChannelDetails {
4556 id: channel_id.to_proto(),
4557 name: "test-channel".to_string()
4558 }]
4559 )
4560 });
4561
4562 let channel_b = channels_b.update(cx_b, |this, cx| {
4563 this.get_channel(channel_id.to_proto(), cx).unwrap()
4564 });
4565 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4566 channel_b
4567 .condition(&cx_b, |channel, _| {
4568 channel_messages(channel)
4569 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4570 })
4571 .await;
4572
4573 // Disconnect client B, ensuring we can still access its cached channel data.
4574 server.forbid_connections();
4575 server.disconnect_client(client_b.current_user_id(&cx_b));
4576 cx_b.foreground().advance_clock(Duration::from_secs(3));
4577 while !matches!(
4578 status_b.next().await,
4579 Some(client::Status::ReconnectionError { .. })
4580 ) {}
4581
4582 channels_b.read_with(cx_b, |channels, _| {
4583 assert_eq!(
4584 channels.available_channels().unwrap(),
4585 [ChannelDetails {
4586 id: channel_id.to_proto(),
4587 name: "test-channel".to_string()
4588 }]
4589 )
4590 });
4591 channel_b.read_with(cx_b, |channel, _| {
4592 assert_eq!(
4593 channel_messages(channel),
4594 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4595 )
4596 });
4597
4598 // Send a message from client B while it is disconnected.
4599 channel_b
4600 .update(cx_b, |channel, cx| {
4601 let task = channel
4602 .send_message("can you see this?".to_string(), cx)
4603 .unwrap();
4604 assert_eq!(
4605 channel_messages(channel),
4606 &[
4607 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4608 ("user_b".to_string(), "can you see this?".to_string(), true)
4609 ]
4610 );
4611 task
4612 })
4613 .await
4614 .unwrap_err();
4615
4616 // Send a message from client A while B is disconnected.
4617 channel_a
4618 .update(cx_a, |channel, cx| {
4619 channel
4620 .send_message("oh, hi B.".to_string(), cx)
4621 .unwrap()
4622 .detach();
4623 let task = channel.send_message("sup".to_string(), cx).unwrap();
4624 assert_eq!(
4625 channel_messages(channel),
4626 &[
4627 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4628 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4629 ("user_a".to_string(), "sup".to_string(), true)
4630 ]
4631 );
4632 task
4633 })
4634 .await
4635 .unwrap();
4636
4637 // Give client B a chance to reconnect.
4638 server.allow_connections();
4639 cx_b.foreground().advance_clock(Duration::from_secs(10));
4640
4641 // Verify that B sees the new messages upon reconnection, as well as the message client B
4642 // sent while offline.
4643 channel_b
4644 .condition(&cx_b, |channel, _| {
4645 channel_messages(channel)
4646 == [
4647 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4648 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4649 ("user_a".to_string(), "sup".to_string(), false),
4650 ("user_b".to_string(), "can you see this?".to_string(), false),
4651 ]
4652 })
4653 .await;
4654
4655 // Ensure client A and B can communicate normally after reconnection.
4656 channel_a
4657 .update(cx_a, |channel, cx| {
4658 channel.send_message("you online?".to_string(), cx).unwrap()
4659 })
4660 .await
4661 .unwrap();
4662 channel_b
4663 .condition(&cx_b, |channel, _| {
4664 channel_messages(channel)
4665 == [
4666 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4667 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4668 ("user_a".to_string(), "sup".to_string(), false),
4669 ("user_b".to_string(), "can you see this?".to_string(), false),
4670 ("user_a".to_string(), "you online?".to_string(), false),
4671 ]
4672 })
4673 .await;
4674
4675 channel_b
4676 .update(cx_b, |channel, cx| {
4677 channel.send_message("yep".to_string(), cx).unwrap()
4678 })
4679 .await
4680 .unwrap();
4681 channel_a
4682 .condition(&cx_a, |channel, _| {
4683 channel_messages(channel)
4684 == [
4685 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4686 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4687 ("user_a".to_string(), "sup".to_string(), false),
4688 ("user_b".to_string(), "can you see this?".to_string(), false),
4689 ("user_a".to_string(), "you online?".to_string(), false),
4690 ("user_b".to_string(), "yep".to_string(), false),
4691 ]
4692 })
4693 .await;
4694 }
4695
4696 #[gpui::test(iterations = 10)]
4697 async fn test_contacts(
4698 cx_a: &mut TestAppContext,
4699 cx_b: &mut TestAppContext,
4700 cx_c: &mut TestAppContext,
4701 ) {
4702 cx_a.foreground().forbid_parking();
4703 let lang_registry = Arc::new(LanguageRegistry::test());
4704 let fs = FakeFs::new(cx_a.background());
4705
4706 // Connect to a server as 3 clients.
4707 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4708 let client_a = server.create_client(cx_a, "user_a").await;
4709 let client_b = server.create_client(cx_b, "user_b").await;
4710 let client_c = server.create_client(cx_c, "user_c").await;
4711
4712 // Share a worktree as client A.
4713 fs.insert_tree(
4714 "/a",
4715 json!({
4716 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4717 }),
4718 )
4719 .await;
4720
4721 let project_a = cx_a.update(|cx| {
4722 Project::local(
4723 client_a.clone(),
4724 client_a.user_store.clone(),
4725 lang_registry.clone(),
4726 fs.clone(),
4727 cx,
4728 )
4729 });
4730 let (worktree_a, _) = project_a
4731 .update(cx_a, |p, cx| {
4732 p.find_or_create_local_worktree("/a", true, cx)
4733 })
4734 .await
4735 .unwrap();
4736 worktree_a
4737 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4738 .await;
4739
4740 client_a
4741 .user_store
4742 .condition(&cx_a, |user_store, _| {
4743 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4744 })
4745 .await;
4746 client_b
4747 .user_store
4748 .condition(&cx_b, |user_store, _| {
4749 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4750 })
4751 .await;
4752 client_c
4753 .user_store
4754 .condition(&cx_c, |user_store, _| {
4755 contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4756 })
4757 .await;
4758
4759 let project_id = project_a
4760 .update(cx_a, |project, _| project.next_remote_id())
4761 .await;
4762 project_a
4763 .update(cx_a, |project, cx| project.share(cx))
4764 .await
4765 .unwrap();
4766 client_a
4767 .user_store
4768 .condition(&cx_a, |user_store, _| {
4769 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4770 })
4771 .await;
4772 client_b
4773 .user_store
4774 .condition(&cx_b, |user_store, _| {
4775 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4776 })
4777 .await;
4778 client_c
4779 .user_store
4780 .condition(&cx_c, |user_store, _| {
4781 contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4782 })
4783 .await;
4784
4785 let _project_b = Project::remote(
4786 project_id,
4787 client_b.clone(),
4788 client_b.user_store.clone(),
4789 lang_registry.clone(),
4790 fs.clone(),
4791 &mut cx_b.to_async(),
4792 )
4793 .await
4794 .unwrap();
4795
4796 client_a
4797 .user_store
4798 .condition(&cx_a, |user_store, _| {
4799 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4800 })
4801 .await;
4802 client_b
4803 .user_store
4804 .condition(&cx_b, |user_store, _| {
4805 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4806 })
4807 .await;
4808 client_c
4809 .user_store
4810 .condition(&cx_c, |user_store, _| {
4811 contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4812 })
4813 .await;
4814
4815 project_a
4816 .condition(&cx_a, |project, _| {
4817 project.collaborators().contains_key(&client_b.peer_id)
4818 })
4819 .await;
4820
4821 cx_a.update(move |_| drop(project_a));
4822 client_a
4823 .user_store
4824 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4825 .await;
4826 client_b
4827 .user_store
4828 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4829 .await;
4830 client_c
4831 .user_store
4832 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4833 .await;
4834
4835 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4836 user_store
4837 .contacts()
4838 .iter()
4839 .map(|contact| {
4840 let worktrees = contact
4841 .projects
4842 .iter()
4843 .map(|p| {
4844 (
4845 p.worktree_root_names[0].as_str(),
4846 p.is_shared,
4847 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4848 )
4849 })
4850 .collect();
4851 (contact.user.github_login.as_str(), worktrees)
4852 })
4853 .collect()
4854 }
4855 }
4856
4857 #[gpui::test(iterations = 10)]
4858 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4859 cx_a.foreground().forbid_parking();
4860 let fs = FakeFs::new(cx_a.background());
4861
4862 // 2 clients connect to a server.
4863 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4864 let mut client_a = server.create_client(cx_a, "user_a").await;
4865 let mut client_b = server.create_client(cx_b, "user_b").await;
4866 cx_a.update(editor::init);
4867 cx_b.update(editor::init);
4868
4869 // Client A shares a project.
4870 fs.insert_tree(
4871 "/a",
4872 json!({
4873 ".zed.toml": r#"collaborators = ["user_b"]"#,
4874 "1.txt": "one",
4875 "2.txt": "two",
4876 "3.txt": "three",
4877 }),
4878 )
4879 .await;
4880 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4881 project_a
4882 .update(cx_a, |project, cx| project.share(cx))
4883 .await
4884 .unwrap();
4885
4886 // Client B joins the project.
4887 let project_b = client_b
4888 .build_remote_project(
4889 project_a
4890 .read_with(cx_a, |project, _| project.remote_id())
4891 .unwrap(),
4892 cx_b,
4893 )
4894 .await;
4895
4896 // Client A opens some editors.
4897 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4898 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4899 let editor_a1 = workspace_a
4900 .update(cx_a, |workspace, cx| {
4901 workspace.open_path((worktree_id, "1.txt"), true, cx)
4902 })
4903 .await
4904 .unwrap()
4905 .downcast::<Editor>()
4906 .unwrap();
4907 let editor_a2 = workspace_a
4908 .update(cx_a, |workspace, cx| {
4909 workspace.open_path((worktree_id, "2.txt"), true, cx)
4910 })
4911 .await
4912 .unwrap()
4913 .downcast::<Editor>()
4914 .unwrap();
4915
4916 // Client B opens an editor.
4917 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4918 let editor_b1 = workspace_b
4919 .update(cx_b, |workspace, cx| {
4920 workspace.open_path((worktree_id, "1.txt"), true, cx)
4921 })
4922 .await
4923 .unwrap()
4924 .downcast::<Editor>()
4925 .unwrap();
4926
4927 let client_a_id = project_b.read_with(cx_b, |project, _| {
4928 project.collaborators().values().next().unwrap().peer_id
4929 });
4930 let client_b_id = project_a.read_with(cx_a, |project, _| {
4931 project.collaborators().values().next().unwrap().peer_id
4932 });
4933
4934 // When client B starts following client A, all visible view states are replicated to client B.
4935 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4936 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4937 workspace_b
4938 .update(cx_b, |workspace, cx| {
4939 workspace
4940 .toggle_follow(&ToggleFollow(client_a_id), cx)
4941 .unwrap()
4942 })
4943 .await
4944 .unwrap();
4945 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4946 workspace
4947 .active_item(cx)
4948 .unwrap()
4949 .downcast::<Editor>()
4950 .unwrap()
4951 });
4952 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4953 assert_eq!(
4954 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4955 Some((worktree_id, "2.txt").into())
4956 );
4957 assert_eq!(
4958 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4959 vec![2..3]
4960 );
4961 assert_eq!(
4962 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4963 vec![0..1]
4964 );
4965
4966 // When client A activates a different editor, client B does so as well.
4967 workspace_a.update(cx_a, |workspace, cx| {
4968 workspace.activate_item(&editor_a1, cx)
4969 });
4970 workspace_b
4971 .condition(cx_b, |workspace, cx| {
4972 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4973 })
4974 .await;
4975
4976 // When client A navigates back and forth, client B does so as well.
4977 workspace_a
4978 .update(cx_a, |workspace, cx| {
4979 workspace::Pane::go_back(workspace, None, cx)
4980 })
4981 .await;
4982 workspace_b
4983 .condition(cx_b, |workspace, cx| {
4984 workspace.active_item(cx).unwrap().id() == editor_b2.id()
4985 })
4986 .await;
4987
4988 workspace_a
4989 .update(cx_a, |workspace, cx| {
4990 workspace::Pane::go_forward(workspace, None, cx)
4991 })
4992 .await;
4993 workspace_b
4994 .condition(cx_b, |workspace, cx| {
4995 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4996 })
4997 .await;
4998
4999 // Changes to client A's editor are reflected on client B.
5000 editor_a1.update(cx_a, |editor, cx| {
5001 editor.select_ranges([1..1, 2..2], None, cx);
5002 });
5003 editor_b1
5004 .condition(cx_b, |editor, cx| {
5005 editor.selected_ranges(cx) == vec![1..1, 2..2]
5006 })
5007 .await;
5008
5009 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5010 editor_b1
5011 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5012 .await;
5013
5014 editor_a1.update(cx_a, |editor, cx| {
5015 editor.select_ranges([3..3], None, cx);
5016 editor.set_scroll_position(vec2f(0., 100.), cx);
5017 });
5018 editor_b1
5019 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5020 .await;
5021
5022 // After unfollowing, client B stops receiving updates from client A.
5023 workspace_b.update(cx_b, |workspace, cx| {
5024 workspace.unfollow(&workspace.active_pane().clone(), cx)
5025 });
5026 workspace_a.update(cx_a, |workspace, cx| {
5027 workspace.activate_item(&editor_a2, cx)
5028 });
5029 cx_a.foreground().run_until_parked();
5030 assert_eq!(
5031 workspace_b.read_with(cx_b, |workspace, cx| workspace
5032 .active_item(cx)
5033 .unwrap()
5034 .id()),
5035 editor_b1.id()
5036 );
5037
5038 // Client A starts following client B.
5039 workspace_a
5040 .update(cx_a, |workspace, cx| {
5041 workspace
5042 .toggle_follow(&ToggleFollow(client_b_id), cx)
5043 .unwrap()
5044 })
5045 .await
5046 .unwrap();
5047 assert_eq!(
5048 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5049 Some(client_b_id)
5050 );
5051 assert_eq!(
5052 workspace_a.read_with(cx_a, |workspace, cx| workspace
5053 .active_item(cx)
5054 .unwrap()
5055 .id()),
5056 editor_a1.id()
5057 );
5058
5059 // Following interrupts when client B disconnects.
5060 client_b.disconnect(&cx_b.to_async()).unwrap();
5061 cx_a.foreground().run_until_parked();
5062 assert_eq!(
5063 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5064 None
5065 );
5066 }
5067
5068 #[gpui::test(iterations = 10)]
5069 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5070 cx_a.foreground().forbid_parking();
5071 let fs = FakeFs::new(cx_a.background());
5072
5073 // 2 clients connect to a server.
5074 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5075 let mut client_a = server.create_client(cx_a, "user_a").await;
5076 let mut client_b = server.create_client(cx_b, "user_b").await;
5077 cx_a.update(editor::init);
5078 cx_b.update(editor::init);
5079
5080 // Client A shares a project.
5081 fs.insert_tree(
5082 "/a",
5083 json!({
5084 ".zed.toml": r#"collaborators = ["user_b"]"#,
5085 "1.txt": "one",
5086 "2.txt": "two",
5087 "3.txt": "three",
5088 "4.txt": "four",
5089 }),
5090 )
5091 .await;
5092 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5093 project_a
5094 .update(cx_a, |project, cx| project.share(cx))
5095 .await
5096 .unwrap();
5097
5098 // Client B joins the project.
5099 let project_b = client_b
5100 .build_remote_project(
5101 project_a
5102 .read_with(cx_a, |project, _| project.remote_id())
5103 .unwrap(),
5104 cx_b,
5105 )
5106 .await;
5107
5108 // Client A opens some editors.
5109 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5110 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5111 let _editor_a1 = workspace_a
5112 .update(cx_a, |workspace, cx| {
5113 workspace.open_path((worktree_id, "1.txt"), true, cx)
5114 })
5115 .await
5116 .unwrap()
5117 .downcast::<Editor>()
5118 .unwrap();
5119
5120 // Client B opens an editor.
5121 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5122 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5123 let _editor_b1 = workspace_b
5124 .update(cx_b, |workspace, cx| {
5125 workspace.open_path((worktree_id, "2.txt"), true, cx)
5126 })
5127 .await
5128 .unwrap()
5129 .downcast::<Editor>()
5130 .unwrap();
5131
5132 // Clients A and B follow each other in split panes
5133 workspace_a
5134 .update(cx_a, |workspace, cx| {
5135 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5136 assert_ne!(*workspace.active_pane(), pane_a1);
5137 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5138 workspace
5139 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5140 .unwrap()
5141 })
5142 .await
5143 .unwrap();
5144 workspace_b
5145 .update(cx_b, |workspace, cx| {
5146 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5147 assert_ne!(*workspace.active_pane(), pane_b1);
5148 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5149 workspace
5150 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5151 .unwrap()
5152 })
5153 .await
5154 .unwrap();
5155
5156 workspace_a
5157 .update(cx_a, |workspace, cx| {
5158 workspace.activate_next_pane(cx);
5159 assert_eq!(*workspace.active_pane(), pane_a1);
5160 workspace.open_path((worktree_id, "3.txt"), true, cx)
5161 })
5162 .await
5163 .unwrap();
5164 workspace_b
5165 .update(cx_b, |workspace, cx| {
5166 workspace.activate_next_pane(cx);
5167 assert_eq!(*workspace.active_pane(), pane_b1);
5168 workspace.open_path((worktree_id, "4.txt"), true, cx)
5169 })
5170 .await
5171 .unwrap();
5172 cx_a.foreground().run_until_parked();
5173
5174 // Ensure leader updates don't change the active pane of followers
5175 workspace_a.read_with(cx_a, |workspace, _| {
5176 assert_eq!(*workspace.active_pane(), pane_a1);
5177 });
5178 workspace_b.read_with(cx_b, |workspace, _| {
5179 assert_eq!(*workspace.active_pane(), pane_b1);
5180 });
5181
5182 // Ensure peers following each other doesn't cause an infinite loop.
5183 assert_eq!(
5184 workspace_a.read_with(cx_a, |workspace, cx| workspace
5185 .active_item(cx)
5186 .unwrap()
5187 .project_path(cx)),
5188 Some((worktree_id, "3.txt").into())
5189 );
5190 workspace_a.update(cx_a, |workspace, cx| {
5191 assert_eq!(
5192 workspace.active_item(cx).unwrap().project_path(cx),
5193 Some((worktree_id, "3.txt").into())
5194 );
5195 workspace.activate_next_pane(cx);
5196 assert_eq!(
5197 workspace.active_item(cx).unwrap().project_path(cx),
5198 Some((worktree_id, "4.txt").into())
5199 );
5200 });
5201 workspace_b.update(cx_b, |workspace, cx| {
5202 assert_eq!(
5203 workspace.active_item(cx).unwrap().project_path(cx),
5204 Some((worktree_id, "4.txt").into())
5205 );
5206 workspace.activate_next_pane(cx);
5207 assert_eq!(
5208 workspace.active_item(cx).unwrap().project_path(cx),
5209 Some((worktree_id, "3.txt").into())
5210 );
5211 });
5212 }
5213
5214 #[gpui::test(iterations = 10)]
5215 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5216 cx_a.foreground().forbid_parking();
5217 let fs = FakeFs::new(cx_a.background());
5218
5219 // 2 clients connect to a server.
5220 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5221 let mut client_a = server.create_client(cx_a, "user_a").await;
5222 let mut client_b = server.create_client(cx_b, "user_b").await;
5223 cx_a.update(editor::init);
5224 cx_b.update(editor::init);
5225
5226 // Client A shares a project.
5227 fs.insert_tree(
5228 "/a",
5229 json!({
5230 ".zed.toml": r#"collaborators = ["user_b"]"#,
5231 "1.txt": "one",
5232 "2.txt": "two",
5233 "3.txt": "three",
5234 }),
5235 )
5236 .await;
5237 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5238 project_a
5239 .update(cx_a, |project, cx| project.share(cx))
5240 .await
5241 .unwrap();
5242
5243 // Client B joins the project.
5244 let project_b = client_b
5245 .build_remote_project(
5246 project_a
5247 .read_with(cx_a, |project, _| project.remote_id())
5248 .unwrap(),
5249 cx_b,
5250 )
5251 .await;
5252
5253 // Client A opens some editors.
5254 let workspace_a = client_a.build_workspace(&project_a, cx_a);
5255 let _editor_a1 = workspace_a
5256 .update(cx_a, |workspace, cx| {
5257 workspace.open_path((worktree_id, "1.txt"), true, cx)
5258 })
5259 .await
5260 .unwrap()
5261 .downcast::<Editor>()
5262 .unwrap();
5263
5264 // Client B starts following client A.
5265 let workspace_b = client_b.build_workspace(&project_b, cx_b);
5266 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5267 let leader_id = project_b.read_with(cx_b, |project, _| {
5268 project.collaborators().values().next().unwrap().peer_id
5269 });
5270 workspace_b
5271 .update(cx_b, |workspace, cx| {
5272 workspace
5273 .toggle_follow(&ToggleFollow(leader_id), cx)
5274 .unwrap()
5275 })
5276 .await
5277 .unwrap();
5278 assert_eq!(
5279 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5280 Some(leader_id)
5281 );
5282 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5283 workspace
5284 .active_item(cx)
5285 .unwrap()
5286 .downcast::<Editor>()
5287 .unwrap()
5288 });
5289
5290 // When client B moves, it automatically stops following client A.
5291 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5292 assert_eq!(
5293 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5294 None
5295 );
5296
5297 workspace_b
5298 .update(cx_b, |workspace, cx| {
5299 workspace
5300 .toggle_follow(&ToggleFollow(leader_id), cx)
5301 .unwrap()
5302 })
5303 .await
5304 .unwrap();
5305 assert_eq!(
5306 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5307 Some(leader_id)
5308 );
5309
5310 // When client B edits, it automatically stops following client A.
5311 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5312 assert_eq!(
5313 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5314 None
5315 );
5316
5317 workspace_b
5318 .update(cx_b, |workspace, cx| {
5319 workspace
5320 .toggle_follow(&ToggleFollow(leader_id), cx)
5321 .unwrap()
5322 })
5323 .await
5324 .unwrap();
5325 assert_eq!(
5326 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5327 Some(leader_id)
5328 );
5329
5330 // When client B scrolls, it automatically stops following client A.
5331 editor_b2.update(cx_b, |editor, cx| {
5332 editor.set_scroll_position(vec2f(0., 3.), cx)
5333 });
5334 assert_eq!(
5335 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5336 None
5337 );
5338
5339 workspace_b
5340 .update(cx_b, |workspace, cx| {
5341 workspace
5342 .toggle_follow(&ToggleFollow(leader_id), cx)
5343 .unwrap()
5344 })
5345 .await
5346 .unwrap();
5347 assert_eq!(
5348 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5349 Some(leader_id)
5350 );
5351
5352 // When client B activates a different pane, it continues following client A in the original pane.
5353 workspace_b.update(cx_b, |workspace, cx| {
5354 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5355 });
5356 assert_eq!(
5357 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5358 Some(leader_id)
5359 );
5360
5361 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5362 assert_eq!(
5363 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5364 Some(leader_id)
5365 );
5366
5367 // When client B activates a different item in the original pane, it automatically stops following client A.
5368 workspace_b
5369 .update(cx_b, |workspace, cx| {
5370 workspace.open_path((worktree_id, "2.txt"), true, cx)
5371 })
5372 .await
5373 .unwrap();
5374 assert_eq!(
5375 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5376 None
5377 );
5378 }
5379
5380 #[gpui::test(iterations = 100)]
5381 async fn test_random_collaboration(
5382 cx: &mut TestAppContext,
5383 deterministic: Arc<Deterministic>,
5384 rng: StdRng,
5385 ) {
5386 cx.foreground().forbid_parking();
5387 let max_peers = env::var("MAX_PEERS")
5388 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5389 .unwrap_or(5);
5390 assert!(max_peers <= 5);
5391
5392 let max_operations = env::var("OPERATIONS")
5393 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5394 .unwrap_or(10);
5395
5396 let rng = Arc::new(Mutex::new(rng));
5397
5398 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5399 let host_language_registry = Arc::new(LanguageRegistry::test());
5400
5401 let fs = FakeFs::new(cx.background());
5402 fs.insert_tree(
5403 "/_collab",
5404 json!({
5405 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5406 }),
5407 )
5408 .await;
5409
5410 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5411 let mut clients = Vec::new();
5412 let mut user_ids = Vec::new();
5413 let mut op_start_signals = Vec::new();
5414 let files = Arc::new(Mutex::new(Vec::new()));
5415
5416 let mut next_entity_id = 100000;
5417 let mut host_cx = TestAppContext::new(
5418 cx.foreground_platform(),
5419 cx.platform(),
5420 deterministic.build_foreground(next_entity_id),
5421 deterministic.build_background(),
5422 cx.font_cache(),
5423 cx.leak_detector(),
5424 next_entity_id,
5425 );
5426 let host = server.create_client(&mut host_cx, "host").await;
5427 let host_project = host_cx.update(|cx| {
5428 Project::local(
5429 host.client.clone(),
5430 host.user_store.clone(),
5431 host_language_registry.clone(),
5432 fs.clone(),
5433 cx,
5434 )
5435 });
5436 let host_project_id = host_project
5437 .update(&mut host_cx, |p, _| p.next_remote_id())
5438 .await;
5439
5440 let (collab_worktree, _) = host_project
5441 .update(&mut host_cx, |project, cx| {
5442 project.find_or_create_local_worktree("/_collab", true, cx)
5443 })
5444 .await
5445 .unwrap();
5446 collab_worktree
5447 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5448 .await;
5449 host_project
5450 .update(&mut host_cx, |project, cx| project.share(cx))
5451 .await
5452 .unwrap();
5453
5454 // Set up fake language servers.
5455 let mut language = Language::new(
5456 LanguageConfig {
5457 name: "Rust".into(),
5458 path_suffixes: vec!["rs".to_string()],
5459 ..Default::default()
5460 },
5461 None,
5462 );
5463 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5464 name: "the-fake-language-server",
5465 capabilities: lsp::LanguageServer::full_capabilities(),
5466 initializer: Some(Box::new({
5467 let rng = rng.clone();
5468 let files = files.clone();
5469 let project = host_project.downgrade();
5470 move |fake_server: &mut FakeLanguageServer| {
5471 fake_server.handle_request::<lsp::request::Completion, _, _>(
5472 |_, _| async move {
5473 Ok(Some(lsp::CompletionResponse::Array(vec![
5474 lsp::CompletionItem {
5475 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5476 range: lsp::Range::new(
5477 lsp::Position::new(0, 0),
5478 lsp::Position::new(0, 0),
5479 ),
5480 new_text: "the-new-text".to_string(),
5481 })),
5482 ..Default::default()
5483 },
5484 ])))
5485 },
5486 );
5487
5488 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5489 |_, _| async move {
5490 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5491 lsp::CodeAction {
5492 title: "the-code-action".to_string(),
5493 ..Default::default()
5494 },
5495 )]))
5496 },
5497 );
5498
5499 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5500 |params, _| async move {
5501 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5502 params.position,
5503 params.position,
5504 ))))
5505 },
5506 );
5507
5508 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5509 let files = files.clone();
5510 let rng = rng.clone();
5511 move |_, _| {
5512 let files = files.clone();
5513 let rng = rng.clone();
5514 async move {
5515 let files = files.lock();
5516 let mut rng = rng.lock();
5517 let count = rng.gen_range::<usize, _>(1..3);
5518 let files = (0..count)
5519 .map(|_| files.choose(&mut *rng).unwrap())
5520 .collect::<Vec<_>>();
5521 log::info!("LSP: Returning definitions in files {:?}", &files);
5522 Ok(Some(lsp::GotoDefinitionResponse::Array(
5523 files
5524 .into_iter()
5525 .map(|file| lsp::Location {
5526 uri: lsp::Url::from_file_path(file).unwrap(),
5527 range: Default::default(),
5528 })
5529 .collect(),
5530 )))
5531 }
5532 }
5533 });
5534
5535 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5536 let rng = rng.clone();
5537 let project = project.clone();
5538 move |params, mut cx| {
5539 let highlights = if let Some(project) = project.upgrade(&cx) {
5540 project.update(&mut cx, |project, cx| {
5541 let path = params
5542 .text_document_position_params
5543 .text_document
5544 .uri
5545 .to_file_path()
5546 .unwrap();
5547 let (worktree, relative_path) =
5548 project.find_local_worktree(&path, cx)?;
5549 let project_path =
5550 ProjectPath::from((worktree.read(cx).id(), relative_path));
5551 let buffer =
5552 project.get_open_buffer(&project_path, cx)?.read(cx);
5553
5554 let mut highlights = Vec::new();
5555 let highlight_count = rng.lock().gen_range(1..=5);
5556 let mut prev_end = 0;
5557 for _ in 0..highlight_count {
5558 let range =
5559 buffer.random_byte_range(prev_end, &mut *rng.lock());
5560
5561 highlights.push(lsp::DocumentHighlight {
5562 range: range_to_lsp(range.to_point_utf16(buffer)),
5563 kind: Some(lsp::DocumentHighlightKind::READ),
5564 });
5565 prev_end = range.end;
5566 }
5567 Some(highlights)
5568 })
5569 } else {
5570 None
5571 };
5572 async move { Ok(highlights) }
5573 }
5574 });
5575 }
5576 })),
5577 ..Default::default()
5578 });
5579 host_language_registry.add(Arc::new(language));
5580
5581 let op_start_signal = futures::channel::mpsc::unbounded();
5582 user_ids.push(host.current_user_id(&host_cx));
5583 op_start_signals.push(op_start_signal.0);
5584 clients.push(host_cx.foreground().spawn(host.simulate_host(
5585 host_project,
5586 files,
5587 op_start_signal.1,
5588 rng.clone(),
5589 host_cx,
5590 )));
5591
5592 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5593 rng.lock().gen_range(0..max_operations)
5594 } else {
5595 max_operations
5596 };
5597 let mut available_guests = vec![
5598 "guest-1".to_string(),
5599 "guest-2".to_string(),
5600 "guest-3".to_string(),
5601 "guest-4".to_string(),
5602 ];
5603 let mut operations = 0;
5604 while operations < max_operations {
5605 if operations == disconnect_host_at {
5606 server.disconnect_client(user_ids[0]);
5607 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5608 drop(op_start_signals);
5609 let mut clients = futures::future::join_all(clients).await;
5610 cx.foreground().run_until_parked();
5611
5612 let (host, mut host_cx, host_err) = clients.remove(0);
5613 if let Some(host_err) = host_err {
5614 log::error!("host error - {}", host_err);
5615 }
5616 host.project
5617 .as_ref()
5618 .unwrap()
5619 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5620 for (guest, mut guest_cx, guest_err) in clients {
5621 if let Some(guest_err) = guest_err {
5622 log::error!("{} error - {}", guest.username, guest_err);
5623 }
5624 let contacts = server
5625 .store
5626 .read()
5627 .await
5628 .contacts_for_user(guest.current_user_id(&guest_cx));
5629 assert!(!contacts
5630 .iter()
5631 .flat_map(|contact| &contact.projects)
5632 .any(|project| project.id == host_project_id));
5633 guest
5634 .project
5635 .as_ref()
5636 .unwrap()
5637 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5638 guest_cx.update(|_| drop(guest));
5639 }
5640 host_cx.update(|_| drop(host));
5641
5642 return;
5643 }
5644
5645 let distribution = rng.lock().gen_range(0..100);
5646 match distribution {
5647 0..=19 if !available_guests.is_empty() => {
5648 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5649 let guest_username = available_guests.remove(guest_ix);
5650 log::info!("Adding new connection for {}", guest_username);
5651 next_entity_id += 100000;
5652 let mut guest_cx = TestAppContext::new(
5653 cx.foreground_platform(),
5654 cx.platform(),
5655 deterministic.build_foreground(next_entity_id),
5656 deterministic.build_background(),
5657 cx.font_cache(),
5658 cx.leak_detector(),
5659 next_entity_id,
5660 );
5661 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5662 let guest_project = Project::remote(
5663 host_project_id,
5664 guest.client.clone(),
5665 guest.user_store.clone(),
5666 guest_lang_registry.clone(),
5667 FakeFs::new(cx.background()),
5668 &mut guest_cx.to_async(),
5669 )
5670 .await
5671 .unwrap();
5672 let op_start_signal = futures::channel::mpsc::unbounded();
5673 user_ids.push(guest.current_user_id(&guest_cx));
5674 op_start_signals.push(op_start_signal.0);
5675 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5676 guest_username.clone(),
5677 guest_project,
5678 op_start_signal.1,
5679 rng.clone(),
5680 guest_cx,
5681 )));
5682
5683 log::info!("Added connection for {}", guest_username);
5684 operations += 1;
5685 }
5686 20..=29 if clients.len() > 1 => {
5687 log::info!("Removing guest");
5688 let guest_ix = rng.lock().gen_range(1..clients.len());
5689 let removed_guest_id = user_ids.remove(guest_ix);
5690 let guest = clients.remove(guest_ix);
5691 op_start_signals.remove(guest_ix);
5692 server.disconnect_client(removed_guest_id);
5693 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5694 let (guest, mut guest_cx, guest_err) = guest.await;
5695 if let Some(guest_err) = guest_err {
5696 log::error!("{} error - {}", guest.username, guest_err);
5697 }
5698 guest
5699 .project
5700 .as_ref()
5701 .unwrap()
5702 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5703 for user_id in &user_ids {
5704 for contact in server.store.read().await.contacts_for_user(*user_id) {
5705 assert_ne!(
5706 contact.user_id, removed_guest_id.0 as u64,
5707 "removed guest is still a contact of another peer"
5708 );
5709 for project in contact.projects {
5710 for project_guest_id in project.guests {
5711 assert_ne!(
5712 project_guest_id, removed_guest_id.0 as u64,
5713 "removed guest appears as still participating on a project"
5714 );
5715 }
5716 }
5717 }
5718 }
5719
5720 log::info!("{} removed", guest.username);
5721 available_guests.push(guest.username.clone());
5722 guest_cx.update(|_| drop(guest));
5723
5724 operations += 1;
5725 }
5726 _ => {
5727 while operations < max_operations && rng.lock().gen_bool(0.7) {
5728 op_start_signals
5729 .choose(&mut *rng.lock())
5730 .unwrap()
5731 .unbounded_send(())
5732 .unwrap();
5733 operations += 1;
5734 }
5735
5736 if rng.lock().gen_bool(0.8) {
5737 cx.foreground().run_until_parked();
5738 }
5739 }
5740 }
5741 }
5742
5743 drop(op_start_signals);
5744 let mut clients = futures::future::join_all(clients).await;
5745 cx.foreground().run_until_parked();
5746
5747 let (host_client, mut host_cx, host_err) = clients.remove(0);
5748 if let Some(host_err) = host_err {
5749 panic!("host error - {}", host_err);
5750 }
5751 let host_project = host_client.project.as_ref().unwrap();
5752 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5753 project
5754 .worktrees(cx)
5755 .map(|worktree| {
5756 let snapshot = worktree.read(cx).snapshot();
5757 (snapshot.id(), snapshot)
5758 })
5759 .collect::<BTreeMap<_, _>>()
5760 });
5761
5762 host_client
5763 .project
5764 .as_ref()
5765 .unwrap()
5766 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5767
5768 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5769 if let Some(guest_err) = guest_err {
5770 panic!("{} error - {}", guest_client.username, guest_err);
5771 }
5772 let worktree_snapshots =
5773 guest_client
5774 .project
5775 .as_ref()
5776 .unwrap()
5777 .read_with(&guest_cx, |project, cx| {
5778 project
5779 .worktrees(cx)
5780 .map(|worktree| {
5781 let worktree = worktree.read(cx);
5782 (worktree.id(), worktree.snapshot())
5783 })
5784 .collect::<BTreeMap<_, _>>()
5785 });
5786
5787 assert_eq!(
5788 worktree_snapshots.keys().collect::<Vec<_>>(),
5789 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5790 "{} has different worktrees than the host",
5791 guest_client.username
5792 );
5793 for (id, host_snapshot) in &host_worktree_snapshots {
5794 let guest_snapshot = &worktree_snapshots[id];
5795 assert_eq!(
5796 guest_snapshot.root_name(),
5797 host_snapshot.root_name(),
5798 "{} has different root name than the host for worktree {}",
5799 guest_client.username,
5800 id
5801 );
5802 assert_eq!(
5803 guest_snapshot.entries(false).collect::<Vec<_>>(),
5804 host_snapshot.entries(false).collect::<Vec<_>>(),
5805 "{} has different snapshot than the host for worktree {}",
5806 guest_client.username,
5807 id
5808 );
5809 assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5810 }
5811
5812 guest_client
5813 .project
5814 .as_ref()
5815 .unwrap()
5816 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5817
5818 for guest_buffer in &guest_client.buffers {
5819 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5820 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5821 project.buffer_for_id(buffer_id, cx).expect(&format!(
5822 "host does not have buffer for guest:{}, peer:{}, id:{}",
5823 guest_client.username, guest_client.peer_id, buffer_id
5824 ))
5825 });
5826 let path = host_buffer
5827 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5828
5829 assert_eq!(
5830 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5831 0,
5832 "{}, buffer {}, path {:?} has deferred operations",
5833 guest_client.username,
5834 buffer_id,
5835 path,
5836 );
5837 assert_eq!(
5838 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5839 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5840 "{}, buffer {}, path {:?}, differs from the host's buffer",
5841 guest_client.username,
5842 buffer_id,
5843 path
5844 );
5845 }
5846
5847 guest_cx.update(|_| drop(guest_client));
5848 }
5849
5850 host_cx.update(|_| drop(host_client));
5851 }
5852
5853 struct TestServer {
5854 peer: Arc<Peer>,
5855 app_state: Arc<AppState>,
5856 server: Arc<Server>,
5857 foreground: Rc<executor::Foreground>,
5858 notifications: mpsc::UnboundedReceiver<()>,
5859 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5860 forbid_connections: Arc<AtomicBool>,
5861 _test_db: TestDb,
5862 }
5863
5864 impl TestServer {
5865 async fn start(
5866 foreground: Rc<executor::Foreground>,
5867 background: Arc<executor::Background>,
5868 ) -> Self {
5869 let test_db = TestDb::fake(background);
5870 let app_state = Self::build_app_state(&test_db).await;
5871 let peer = Peer::new();
5872 let notifications = mpsc::unbounded();
5873 let server = Server::new(app_state.clone(), Some(notifications.0));
5874 Self {
5875 peer,
5876 app_state,
5877 server,
5878 foreground,
5879 notifications: notifications.1,
5880 connection_killers: Default::default(),
5881 forbid_connections: Default::default(),
5882 _test_db: test_db,
5883 }
5884 }
5885
5886 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5887 cx.update(|cx| {
5888 let settings = Settings::test(cx);
5889 cx.set_global(settings);
5890 });
5891
5892 let http = FakeHttpClient::with_404_response();
5893 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5894 let client_name = name.to_string();
5895 let mut client = Client::new(http.clone());
5896 let server = self.server.clone();
5897 let connection_killers = self.connection_killers.clone();
5898 let forbid_connections = self.forbid_connections.clone();
5899 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5900
5901 Arc::get_mut(&mut client)
5902 .unwrap()
5903 .override_authenticate(move |cx| {
5904 cx.spawn(|_| async move {
5905 let access_token = "the-token".to_string();
5906 Ok(Credentials {
5907 user_id: user_id.0 as u64,
5908 access_token,
5909 })
5910 })
5911 })
5912 .override_establish_connection(move |credentials, cx| {
5913 assert_eq!(credentials.user_id, user_id.0 as u64);
5914 assert_eq!(credentials.access_token, "the-token");
5915
5916 let server = server.clone();
5917 let connection_killers = connection_killers.clone();
5918 let forbid_connections = forbid_connections.clone();
5919 let client_name = client_name.clone();
5920 let connection_id_tx = connection_id_tx.clone();
5921 cx.spawn(move |cx| async move {
5922 if forbid_connections.load(SeqCst) {
5923 Err(EstablishConnectionError::other(anyhow!(
5924 "server is forbidding connections"
5925 )))
5926 } else {
5927 let (client_conn, server_conn, killed) =
5928 Connection::in_memory(cx.background());
5929 connection_killers.lock().insert(user_id, killed);
5930 cx.background()
5931 .spawn(server.handle_connection(
5932 server_conn,
5933 client_name,
5934 user_id,
5935 Some(connection_id_tx),
5936 cx.background(),
5937 ))
5938 .detach();
5939 Ok(client_conn)
5940 }
5941 })
5942 });
5943
5944 client
5945 .authenticate_and_connect(false, &cx.to_async())
5946 .await
5947 .unwrap();
5948
5949 Channel::init(&client);
5950 Project::init(&client);
5951 cx.update(|cx| {
5952 workspace::init(&client, cx);
5953 });
5954
5955 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5956 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5957
5958 let client = TestClient {
5959 client,
5960 peer_id,
5961 username: name.to_string(),
5962 user_store,
5963 language_registry: Arc::new(LanguageRegistry::test()),
5964 project: Default::default(),
5965 buffers: Default::default(),
5966 };
5967 client.wait_for_current_user(cx).await;
5968 client
5969 }
5970
5971 fn disconnect_client(&self, user_id: UserId) {
5972 self.connection_killers
5973 .lock()
5974 .remove(&user_id)
5975 .unwrap()
5976 .store(true, SeqCst);
5977 }
5978
5979 fn forbid_connections(&self) {
5980 self.forbid_connections.store(true, SeqCst);
5981 }
5982
5983 fn allow_connections(&self) {
5984 self.forbid_connections.store(false, SeqCst);
5985 }
5986
5987 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5988 Arc::new(AppState {
5989 db: test_db.db().clone(),
5990 api_token: Default::default(),
5991 })
5992 }
5993
5994 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5995 self.server.store.read().await
5996 }
5997
5998 async fn condition<F>(&mut self, mut predicate: F)
5999 where
6000 F: FnMut(&Store) -> bool,
6001 {
6002 assert!(
6003 self.foreground.parking_forbidden(),
6004 "you must call forbid_parking to use server conditions so we don't block indefinitely"
6005 );
6006 while !(predicate)(&*self.server.store.read().await) {
6007 self.foreground.start_waiting();
6008 self.notifications.next().await;
6009 self.foreground.finish_waiting();
6010 }
6011 }
6012 }
6013
6014 impl Deref for TestServer {
6015 type Target = Server;
6016
6017 fn deref(&self) -> &Self::Target {
6018 &self.server
6019 }
6020 }
6021
6022 impl Drop for TestServer {
6023 fn drop(&mut self) {
6024 self.peer.reset();
6025 }
6026 }
6027
6028 struct TestClient {
6029 client: Arc<Client>,
6030 username: String,
6031 pub peer_id: PeerId,
6032 pub user_store: ModelHandle<UserStore>,
6033 language_registry: Arc<LanguageRegistry>,
6034 project: Option<ModelHandle<Project>>,
6035 buffers: HashSet<ModelHandle<language::Buffer>>,
6036 }
6037
6038 impl Deref for TestClient {
6039 type Target = Arc<Client>;
6040
6041 fn deref(&self) -> &Self::Target {
6042 &self.client
6043 }
6044 }
6045
6046 impl TestClient {
6047 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6048 UserId::from_proto(
6049 self.user_store
6050 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6051 )
6052 }
6053
6054 async fn wait_for_current_user(&self, cx: &TestAppContext) {
6055 let mut authed_user = self
6056 .user_store
6057 .read_with(cx, |user_store, _| user_store.watch_current_user());
6058 while authed_user.next().await.unwrap().is_none() {}
6059 }
6060
6061 async fn build_local_project(
6062 &mut self,
6063 fs: Arc<FakeFs>,
6064 root_path: impl AsRef<Path>,
6065 cx: &mut TestAppContext,
6066 ) -> (ModelHandle<Project>, WorktreeId) {
6067 let project = cx.update(|cx| {
6068 Project::local(
6069 self.client.clone(),
6070 self.user_store.clone(),
6071 self.language_registry.clone(),
6072 fs,
6073 cx,
6074 )
6075 });
6076 self.project = Some(project.clone());
6077 let (worktree, _) = project
6078 .update(cx, |p, cx| {
6079 p.find_or_create_local_worktree(root_path, true, cx)
6080 })
6081 .await
6082 .unwrap();
6083 worktree
6084 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6085 .await;
6086 project
6087 .update(cx, |project, _| project.next_remote_id())
6088 .await;
6089 (project, worktree.read_with(cx, |tree, _| tree.id()))
6090 }
6091
6092 async fn build_remote_project(
6093 &mut self,
6094 project_id: u64,
6095 cx: &mut TestAppContext,
6096 ) -> ModelHandle<Project> {
6097 let project = Project::remote(
6098 project_id,
6099 self.client.clone(),
6100 self.user_store.clone(),
6101 self.language_registry.clone(),
6102 FakeFs::new(cx.background()),
6103 &mut cx.to_async(),
6104 )
6105 .await
6106 .unwrap();
6107 self.project = Some(project.clone());
6108 project
6109 }
6110
6111 fn build_workspace(
6112 &self,
6113 project: &ModelHandle<Project>,
6114 cx: &mut TestAppContext,
6115 ) -> ViewHandle<Workspace> {
6116 let (window_id, _) = cx.add_window(|_| EmptyView);
6117 cx.add_view(window_id, |cx| {
6118 let fs = project.read(cx).fs().clone();
6119 Workspace::new(
6120 &WorkspaceParams {
6121 fs,
6122 project: project.clone(),
6123 user_store: self.user_store.clone(),
6124 languages: self.language_registry.clone(),
6125 themes: ThemeRegistry::new((), cx.font_cache().clone()),
6126 channel_list: cx.add_model(|cx| {
6127 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6128 }),
6129 client: self.client.clone(),
6130 },
6131 cx,
6132 )
6133 })
6134 }
6135
6136 async fn simulate_host(
6137 mut self,
6138 project: ModelHandle<Project>,
6139 files: Arc<Mutex<Vec<PathBuf>>>,
6140 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6141 rng: Arc<Mutex<StdRng>>,
6142 mut cx: TestAppContext,
6143 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6144 async fn simulate_host_internal(
6145 client: &mut TestClient,
6146 project: ModelHandle<Project>,
6147 files: Arc<Mutex<Vec<PathBuf>>>,
6148 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6149 rng: Arc<Mutex<StdRng>>,
6150 cx: &mut TestAppContext,
6151 ) -> anyhow::Result<()> {
6152 let fs = project.read_with(cx, |project, _| project.fs().clone());
6153
6154 while op_start_signal.next().await.is_some() {
6155 let distribution = rng.lock().gen_range::<usize, _>(0..100);
6156 match distribution {
6157 0..=20 if !files.lock().is_empty() => {
6158 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6159 let mut path = path.as_path();
6160 while let Some(parent_path) = path.parent() {
6161 path = parent_path;
6162 if rng.lock().gen() {
6163 break;
6164 }
6165 }
6166
6167 log::info!("Host: find/create local worktree {:?}", path);
6168 let find_or_create_worktree = project.update(cx, |project, cx| {
6169 project.find_or_create_local_worktree(path, true, cx)
6170 });
6171 if rng.lock().gen() {
6172 cx.background().spawn(find_or_create_worktree).detach();
6173 } else {
6174 find_or_create_worktree.await?;
6175 }
6176 }
6177 10..=80 if !files.lock().is_empty() => {
6178 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6179 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6180 let (worktree, path) = project
6181 .update(cx, |project, cx| {
6182 project.find_or_create_local_worktree(
6183 file.clone(),
6184 true,
6185 cx,
6186 )
6187 })
6188 .await?;
6189 let project_path =
6190 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6191 log::info!(
6192 "Host: opening path {:?}, worktree {}, relative_path {:?}",
6193 file,
6194 project_path.0,
6195 project_path.1
6196 );
6197 let buffer = project
6198 .update(cx, |project, cx| project.open_buffer(project_path, cx))
6199 .await
6200 .unwrap();
6201 client.buffers.insert(buffer.clone());
6202 buffer
6203 } else {
6204 client
6205 .buffers
6206 .iter()
6207 .choose(&mut *rng.lock())
6208 .unwrap()
6209 .clone()
6210 };
6211
6212 if rng.lock().gen_bool(0.1) {
6213 cx.update(|cx| {
6214 log::info!(
6215 "Host: dropping buffer {:?}",
6216 buffer.read(cx).file().unwrap().full_path(cx)
6217 );
6218 client.buffers.remove(&buffer);
6219 drop(buffer);
6220 });
6221 } else {
6222 buffer.update(cx, |buffer, cx| {
6223 log::info!(
6224 "Host: updating buffer {:?} ({})",
6225 buffer.file().unwrap().full_path(cx),
6226 buffer.remote_id()
6227 );
6228
6229 if rng.lock().gen_bool(0.7) {
6230 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6231 } else {
6232 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6233 }
6234 });
6235 }
6236 }
6237 _ => loop {
6238 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6239 let mut path = PathBuf::new();
6240 path.push("/");
6241 for _ in 0..path_component_count {
6242 let letter = rng.lock().gen_range(b'a'..=b'z');
6243 path.push(std::str::from_utf8(&[letter]).unwrap());
6244 }
6245 path.set_extension("rs");
6246 let parent_path = path.parent().unwrap();
6247
6248 log::info!("Host: creating file {:?}", path,);
6249
6250 if fs.create_dir(&parent_path).await.is_ok()
6251 && fs.create_file(&path, Default::default()).await.is_ok()
6252 {
6253 files.lock().push(path);
6254 break;
6255 } else {
6256 log::info!("Host: cannot create file");
6257 }
6258 },
6259 }
6260
6261 cx.background().simulate_random_delay().await;
6262 }
6263
6264 Ok(())
6265 }
6266
6267 let result = simulate_host_internal(
6268 &mut self,
6269 project.clone(),
6270 files,
6271 op_start_signal,
6272 rng,
6273 &mut cx,
6274 )
6275 .await;
6276 log::info!("Host done");
6277 self.project = Some(project);
6278 (self, cx, result.err())
6279 }
6280
6281 pub async fn simulate_guest(
6282 mut self,
6283 guest_username: String,
6284 project: ModelHandle<Project>,
6285 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6286 rng: Arc<Mutex<StdRng>>,
6287 mut cx: TestAppContext,
6288 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6289 async fn simulate_guest_internal(
6290 client: &mut TestClient,
6291 guest_username: &str,
6292 project: ModelHandle<Project>,
6293 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6294 rng: Arc<Mutex<StdRng>>,
6295 cx: &mut TestAppContext,
6296 ) -> anyhow::Result<()> {
6297 while op_start_signal.next().await.is_some() {
6298 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6299 let worktree = if let Some(worktree) =
6300 project.read_with(cx, |project, cx| {
6301 project
6302 .worktrees(&cx)
6303 .filter(|worktree| {
6304 let worktree = worktree.read(cx);
6305 worktree.is_visible()
6306 && worktree.entries(false).any(|e| e.is_file())
6307 })
6308 .choose(&mut *rng.lock())
6309 }) {
6310 worktree
6311 } else {
6312 cx.background().simulate_random_delay().await;
6313 continue;
6314 };
6315
6316 let (worktree_root_name, project_path) =
6317 worktree.read_with(cx, |worktree, _| {
6318 let entry = worktree
6319 .entries(false)
6320 .filter(|e| e.is_file())
6321 .choose(&mut *rng.lock())
6322 .unwrap();
6323 (
6324 worktree.root_name().to_string(),
6325 (worktree.id(), entry.path.clone()),
6326 )
6327 });
6328 log::info!(
6329 "{}: opening path {:?} in worktree {} ({})",
6330 guest_username,
6331 project_path.1,
6332 project_path.0,
6333 worktree_root_name,
6334 );
6335 let buffer = project
6336 .update(cx, |project, cx| {
6337 project.open_buffer(project_path.clone(), cx)
6338 })
6339 .await?;
6340 log::info!(
6341 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6342 guest_username,
6343 project_path.1,
6344 project_path.0,
6345 worktree_root_name,
6346 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6347 );
6348 client.buffers.insert(buffer.clone());
6349 buffer
6350 } else {
6351 client
6352 .buffers
6353 .iter()
6354 .choose(&mut *rng.lock())
6355 .unwrap()
6356 .clone()
6357 };
6358
6359 let choice = rng.lock().gen_range(0..100);
6360 match choice {
6361 0..=9 => {
6362 cx.update(|cx| {
6363 log::info!(
6364 "{}: dropping buffer {:?}",
6365 guest_username,
6366 buffer.read(cx).file().unwrap().full_path(cx)
6367 );
6368 client.buffers.remove(&buffer);
6369 drop(buffer);
6370 });
6371 }
6372 10..=19 => {
6373 let completions = project.update(cx, |project, cx| {
6374 log::info!(
6375 "{}: requesting completions for buffer {} ({:?})",
6376 guest_username,
6377 buffer.read(cx).remote_id(),
6378 buffer.read(cx).file().unwrap().full_path(cx)
6379 );
6380 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6381 project.completions(&buffer, offset, cx)
6382 });
6383 let completions = cx.background().spawn(async move {
6384 completions
6385 .await
6386 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6387 });
6388 if rng.lock().gen_bool(0.3) {
6389 log::info!("{}: detaching completions request", guest_username);
6390 cx.update(|cx| completions.detach_and_log_err(cx));
6391 } else {
6392 completions.await?;
6393 }
6394 }
6395 20..=29 => {
6396 let code_actions = project.update(cx, |project, cx| {
6397 log::info!(
6398 "{}: requesting code actions for buffer {} ({:?})",
6399 guest_username,
6400 buffer.read(cx).remote_id(),
6401 buffer.read(cx).file().unwrap().full_path(cx)
6402 );
6403 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6404 project.code_actions(&buffer, range, cx)
6405 });
6406 let code_actions = cx.background().spawn(async move {
6407 code_actions.await.map_err(|err| {
6408 anyhow!("code actions request failed: {:?}", err)
6409 })
6410 });
6411 if rng.lock().gen_bool(0.3) {
6412 log::info!("{}: detaching code actions request", guest_username);
6413 cx.update(|cx| code_actions.detach_and_log_err(cx));
6414 } else {
6415 code_actions.await?;
6416 }
6417 }
6418 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6419 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6420 log::info!(
6421 "{}: saving buffer {} ({:?})",
6422 guest_username,
6423 buffer.remote_id(),
6424 buffer.file().unwrap().full_path(cx)
6425 );
6426 (buffer.version(), buffer.save(cx))
6427 });
6428 let save = cx.background().spawn(async move {
6429 let (saved_version, _) = save
6430 .await
6431 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6432 assert!(saved_version.observed_all(&requested_version));
6433 Ok::<_, anyhow::Error>(())
6434 });
6435 if rng.lock().gen_bool(0.3) {
6436 log::info!("{}: detaching save request", guest_username);
6437 cx.update(|cx| save.detach_and_log_err(cx));
6438 } else {
6439 save.await?;
6440 }
6441 }
6442 40..=44 => {
6443 let prepare_rename = project.update(cx, |project, cx| {
6444 log::info!(
6445 "{}: preparing rename for buffer {} ({:?})",
6446 guest_username,
6447 buffer.read(cx).remote_id(),
6448 buffer.read(cx).file().unwrap().full_path(cx)
6449 );
6450 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6451 project.prepare_rename(buffer, offset, cx)
6452 });
6453 let prepare_rename = cx.background().spawn(async move {
6454 prepare_rename.await.map_err(|err| {
6455 anyhow!("prepare rename request failed: {:?}", err)
6456 })
6457 });
6458 if rng.lock().gen_bool(0.3) {
6459 log::info!("{}: detaching prepare rename request", guest_username);
6460 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6461 } else {
6462 prepare_rename.await?;
6463 }
6464 }
6465 45..=49 => {
6466 let definitions = project.update(cx, |project, cx| {
6467 log::info!(
6468 "{}: requesting definitions for buffer {} ({:?})",
6469 guest_username,
6470 buffer.read(cx).remote_id(),
6471 buffer.read(cx).file().unwrap().full_path(cx)
6472 );
6473 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6474 project.definition(&buffer, offset, cx)
6475 });
6476 let definitions = cx.background().spawn(async move {
6477 definitions
6478 .await
6479 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6480 });
6481 if rng.lock().gen_bool(0.3) {
6482 log::info!("{}: detaching definitions request", guest_username);
6483 cx.update(|cx| definitions.detach_and_log_err(cx));
6484 } else {
6485 client
6486 .buffers
6487 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6488 }
6489 }
6490 50..=54 => {
6491 let highlights = project.update(cx, |project, cx| {
6492 log::info!(
6493 "{}: requesting highlights for buffer {} ({:?})",
6494 guest_username,
6495 buffer.read(cx).remote_id(),
6496 buffer.read(cx).file().unwrap().full_path(cx)
6497 );
6498 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6499 project.document_highlights(&buffer, offset, cx)
6500 });
6501 let highlights = cx.background().spawn(async move {
6502 highlights
6503 .await
6504 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6505 });
6506 if rng.lock().gen_bool(0.3) {
6507 log::info!("{}: detaching highlights request", guest_username);
6508 cx.update(|cx| highlights.detach_and_log_err(cx));
6509 } else {
6510 highlights.await?;
6511 }
6512 }
6513 55..=59 => {
6514 let search = project.update(cx, |project, cx| {
6515 let query = rng.lock().gen_range('a'..='z');
6516 log::info!("{}: project-wide search {:?}", guest_username, query);
6517 project.search(SearchQuery::text(query, false, false), cx)
6518 });
6519 let search = cx.background().spawn(async move {
6520 search
6521 .await
6522 .map_err(|err| anyhow!("search request failed: {:?}", err))
6523 });
6524 if rng.lock().gen_bool(0.3) {
6525 log::info!("{}: detaching search request", guest_username);
6526 cx.update(|cx| search.detach_and_log_err(cx));
6527 } else {
6528 client.buffers.extend(search.await?.into_keys());
6529 }
6530 }
6531 60..=69 => {
6532 let worktree = project
6533 .read_with(cx, |project, cx| {
6534 project
6535 .worktrees(&cx)
6536 .filter(|worktree| {
6537 let worktree = worktree.read(cx);
6538 worktree.is_visible()
6539 && worktree.entries(false).any(|e| e.is_file())
6540 && worktree
6541 .root_entry()
6542 .map_or(false, |e| e.is_dir())
6543 })
6544 .choose(&mut *rng.lock())
6545 })
6546 .unwrap();
6547 let (worktree_id, worktree_root_name) = worktree
6548 .read_with(cx, |worktree, _| {
6549 (worktree.id(), worktree.root_name().to_string())
6550 });
6551
6552 let mut new_name = String::new();
6553 for _ in 0..10 {
6554 let letter = rng.lock().gen_range('a'..='z');
6555 new_name.push(letter);
6556 }
6557 let mut new_path = PathBuf::new();
6558 new_path.push(new_name);
6559 new_path.set_extension("rs");
6560 log::info!(
6561 "{}: creating {:?} in worktree {} ({})",
6562 guest_username,
6563 new_path,
6564 worktree_id,
6565 worktree_root_name,
6566 );
6567 project
6568 .update(cx, |project, cx| {
6569 project.create_entry((worktree_id, new_path), false, cx)
6570 })
6571 .unwrap()
6572 .await?;
6573 }
6574 _ => {
6575 buffer.update(cx, |buffer, cx| {
6576 log::info!(
6577 "{}: updating buffer {} ({:?})",
6578 guest_username,
6579 buffer.remote_id(),
6580 buffer.file().unwrap().full_path(cx)
6581 );
6582 if rng.lock().gen_bool(0.7) {
6583 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6584 } else {
6585 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6586 }
6587 });
6588 }
6589 }
6590 cx.background().simulate_random_delay().await;
6591 }
6592 Ok(())
6593 }
6594
6595 let result = simulate_guest_internal(
6596 &mut self,
6597 &guest_username,
6598 project.clone(),
6599 op_start_signal,
6600 rng,
6601 &mut cx,
6602 )
6603 .await;
6604 log::info!("{}: done", guest_username);
6605
6606 self.project = Some(project);
6607 (self, cx, result.err())
6608 }
6609 }
6610
6611 impl Drop for TestClient {
6612 fn drop(&mut self) {
6613 self.client.tear_down();
6614 }
6615 }
6616
6617 impl Executor for Arc<gpui::executor::Background> {
6618 type Sleep = gpui::executor::Timer;
6619
6620 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6621 self.spawn(future).detach();
6622 }
6623
6624 fn sleep(&self, duration: Duration) -> Self::Sleep {
6625 self.as_ref().timer(duration)
6626 }
6627 }
6628
6629 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6630 channel
6631 .messages()
6632 .cursor::<()>()
6633 .map(|m| {
6634 (
6635 m.sender.github_login.clone(),
6636 m.body.clone(),
6637 m.is_pending(),
6638 )
6639 })
6640 .collect()
6641 }
6642
6643 struct EmptyView;
6644
6645 impl gpui::Entity for EmptyView {
6646 type Event = ();
6647 }
6648
6649 impl gpui::View for EmptyView {
6650 fn ui_name() -> &'static str {
6651 "empty view"
6652 }
6653
6654 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6655 gpui::Element::boxed(gpui::elements::Empty)
6656 }
6657 }
6658}