1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::{
11 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
12 task,
13};
14use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
15use collections::{HashMap, HashSet};
16use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
17use log::{as_debug, as_display};
18use rpc::{
19 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
20 Connection, ConnectionId, Peer, TypedEnvelope,
21};
22use sha1::{Digest as _, Sha1};
23use std::{
24 any::TypeId,
25 future::Future,
26 marker::PhantomData,
27 ops::{Deref, DerefMut},
28 rc::Rc,
29 sync::Arc,
30 time::{Duration, Instant},
31};
32use store::{Store, Worktree};
33use surf::StatusCode;
34use tide::{
35 http::headers::{HeaderName, CONNECTION, UPGRADE},
36 Request, Response,
37};
38use time::OffsetDateTime;
39use util::ResultExt;
40
41type MessageHandler = Box<
42 dyn Send
43 + Sync
44 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
45>;
46
47pub struct Server {
48 peer: Arc<Peer>,
49 store: RwLock<Store>,
50 app_state: Arc<AppState>,
51 handlers: HashMap<TypeId, MessageHandler>,
52 notifications: Option<mpsc::UnboundedSender<()>>,
53}
54
55pub trait Executor: Send + Clone {
56 type Timer: Send + Future;
57 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
58 fn timer(&self, duration: Duration) -> Self::Timer;
59}
60
61#[derive(Clone)]
62pub struct RealExecutor;
63
64const MESSAGE_COUNT_PER_PAGE: usize = 100;
65const MAX_MESSAGE_LEN: usize = 1024;
66
67struct StoreReadGuard<'a> {
68 guard: RwLockReadGuard<'a, Store>,
69 _not_send: PhantomData<Rc<()>>,
70}
71
72struct StoreWriteGuard<'a> {
73 guard: RwLockWriteGuard<'a, Store>,
74 _not_send: PhantomData<Rc<()>>,
75}
76
77impl Server {
78 pub fn new(
79 app_state: Arc<AppState>,
80 peer: Arc<Peer>,
81 notifications: Option<mpsc::UnboundedSender<()>>,
82 ) -> Arc<Self> {
83 let mut server = Self {
84 peer,
85 app_state,
86 store: Default::default(),
87 handlers: Default::default(),
88 notifications,
89 };
90
91 server
92 .add_request_handler(Server::ping)
93 .add_request_handler(Server::register_project)
94 .add_message_handler(Server::unregister_project)
95 .add_request_handler(Server::share_project)
96 .add_message_handler(Server::unshare_project)
97 .add_request_handler(Server::join_project)
98 .add_message_handler(Server::leave_project)
99 .add_request_handler(Server::register_worktree)
100 .add_message_handler(Server::unregister_worktree)
101 .add_request_handler(Server::update_worktree)
102 .add_message_handler(Server::start_language_server)
103 .add_message_handler(Server::update_language_server)
104 .add_message_handler(Server::update_diagnostic_summary)
105 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
106 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
107 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
108 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
109 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
110 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
111 .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
112 .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
113 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
114 .add_request_handler(
115 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
116 )
117 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
118 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
119 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
120 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
121 .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
122 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
123 .add_request_handler(Server::update_buffer)
124 .add_message_handler(Server::update_buffer_file)
125 .add_message_handler(Server::buffer_reloaded)
126 .add_message_handler(Server::buffer_saved)
127 .add_request_handler(Server::save_buffer)
128 .add_request_handler(Server::get_channels)
129 .add_request_handler(Server::get_users)
130 .add_request_handler(Server::join_channel)
131 .add_message_handler(Server::leave_channel)
132 .add_request_handler(Server::send_channel_message)
133 .add_request_handler(Server::follow)
134 .add_message_handler(Server::unfollow)
135 .add_message_handler(Server::update_followers)
136 .add_request_handler(Server::get_channel_messages);
137
138 Arc::new(server)
139 }
140
141 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
142 where
143 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
144 Fut: 'static + Send + Future<Output = tide::Result<()>>,
145 M: EnvelopedMessage,
146 {
147 let prev_handler = self.handlers.insert(
148 TypeId::of::<M>(),
149 Box::new(move |server, envelope| {
150 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
151 (handler)(server, *envelope).boxed()
152 }),
153 );
154 if prev_handler.is_some() {
155 panic!("registered a handler for the same message twice");
156 }
157 self
158 }
159
160 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
161 where
162 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
163 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
164 M: RequestMessage,
165 {
166 self.add_message_handler(move |server, envelope| {
167 let receipt = envelope.receipt();
168 let response = (handler)(server.clone(), envelope);
169 async move {
170 match response.await {
171 Ok(response) => {
172 server.peer.respond(receipt, response)?;
173 Ok(())
174 }
175 Err(error) => {
176 server.peer.respond_with_error(
177 receipt,
178 proto::Error {
179 message: error.to_string(),
180 },
181 )?;
182 Err(error)
183 }
184 }
185 }
186 })
187 }
188
189 pub fn handle_connection<E: Executor>(
190 self: &Arc<Self>,
191 connection: Connection,
192 addr: String,
193 user_id: UserId,
194 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
195 executor: E,
196 ) -> impl Future<Output = ()> {
197 let mut this = self.clone();
198 async move {
199 let (connection_id, handle_io, mut incoming_rx) = this
200 .peer
201 .add_connection(connection, {
202 let executor = executor.clone();
203 move |duration| {
204 let timer = executor.timer(duration);
205 async move {
206 timer.await;
207 }
208 }
209 })
210 .await;
211
212 if let Some(send_connection_id) = send_connection_id.as_mut() {
213 let _ = send_connection_id.send(connection_id).await;
214 }
215
216 this.state_mut()
217 .await
218 .add_connection(connection_id, user_id);
219 this.update_contacts_for_users(&[user_id]).await;
220
221 let handle_io = handle_io.fuse();
222 futures::pin_mut!(handle_io);
223 loop {
224 let next_message = incoming_rx.next().fuse();
225 futures::pin_mut!(next_message);
226 futures::select_biased! {
227 result = handle_io => {
228 if let Err(err) = result {
229 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
230 }
231 break;
232 }
233 message = next_message => {
234 if let Some(message) = message {
235 let start_time = Instant::now();
236 let type_name = message.payload_type_name();
237 log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
238 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
239 let notifications = this.notifications.clone();
240 let is_background = message.is_background();
241 let handle_message = (handler)(this.clone(), message);
242 let handle_message = async move {
243 if let Err(err) = handle_message.await {
244 log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
245 } else {
246 log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
247 }
248 if let Some(mut notifications) = notifications {
249 let _ = notifications.send(()).await;
250 }
251 };
252 if is_background {
253 executor.spawn_detached(handle_message);
254 } else {
255 handle_message.await;
256 }
257 } else {
258 log::warn!("unhandled message: {}", type_name);
259 }
260 } else {
261 log::info!(address = as_debug!(addr); "rpc connection closed");
262 break;
263 }
264 }
265 }
266 }
267
268 if let Err(err) = this.sign_out(connection_id).await {
269 log::error!("error signing out connection {:?} - {:?}", addr, err);
270 }
271 }
272 }
273
274 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
275 self.peer.disconnect(connection_id);
276 let removed_connection = self.state_mut().await.remove_connection(connection_id)?;
277
278 for (project_id, project) in removed_connection.hosted_projects {
279 if let Some(share) = project.share {
280 broadcast(
281 connection_id,
282 share.guests.keys().copied().collect(),
283 |conn_id| {
284 self.peer
285 .send(conn_id, proto::UnshareProject { project_id })
286 },
287 );
288 }
289 }
290
291 for (project_id, peer_ids) in removed_connection.guest_project_ids {
292 broadcast(connection_id, peer_ids, |conn_id| {
293 self.peer.send(
294 conn_id,
295 proto::RemoveProjectCollaborator {
296 project_id,
297 peer_id: connection_id.0,
298 },
299 )
300 });
301 }
302
303 self.update_contacts_for_users(removed_connection.contact_ids.iter())
304 .await;
305 Ok(())
306 }
307
308 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
309 Ok(proto::Ack {})
310 }
311
312 async fn register_project(
313 mut self: Arc<Server>,
314 request: TypedEnvelope<proto::RegisterProject>,
315 ) -> tide::Result<proto::RegisterProjectResponse> {
316 let project_id = {
317 let mut state = self.state_mut().await;
318 let user_id = state.user_id_for_connection(request.sender_id)?;
319 state.register_project(request.sender_id, user_id)
320 };
321 Ok(proto::RegisterProjectResponse { project_id })
322 }
323
324 async fn unregister_project(
325 mut self: Arc<Server>,
326 request: TypedEnvelope<proto::UnregisterProject>,
327 ) -> tide::Result<()> {
328 let project = self
329 .state_mut()
330 .await
331 .unregister_project(request.payload.project_id, request.sender_id)?;
332 self.update_contacts_for_users(project.authorized_user_ids().iter())
333 .await;
334 Ok(())
335 }
336
337 async fn share_project(
338 mut self: Arc<Server>,
339 request: TypedEnvelope<proto::ShareProject>,
340 ) -> tide::Result<proto::Ack> {
341 self.state_mut()
342 .await
343 .share_project(request.payload.project_id, request.sender_id);
344 Ok(proto::Ack {})
345 }
346
347 async fn unshare_project(
348 mut self: Arc<Server>,
349 request: TypedEnvelope<proto::UnshareProject>,
350 ) -> tide::Result<()> {
351 let project_id = request.payload.project_id;
352 let project = self
353 .state_mut()
354 .await
355 .unshare_project(project_id, request.sender_id)?;
356
357 broadcast(request.sender_id, project.connection_ids, |conn_id| {
358 self.peer
359 .send(conn_id, proto::UnshareProject { project_id })
360 });
361 self.update_contacts_for_users(&project.authorized_user_ids)
362 .await;
363 Ok(())
364 }
365
366 async fn join_project(
367 mut self: Arc<Server>,
368 request: TypedEnvelope<proto::JoinProject>,
369 ) -> tide::Result<proto::JoinProjectResponse> {
370 let project_id = request.payload.project_id;
371
372 let user_id = self
373 .state()
374 .await
375 .user_id_for_connection(request.sender_id)?;
376 let (response, connection_ids, contact_user_ids) = self
377 .state_mut()
378 .await
379 .join_project(request.sender_id, user_id, project_id)
380 .and_then(|joined| {
381 let share = joined.project.share()?;
382 let peer_count = share.guests.len();
383 let mut collaborators = Vec::with_capacity(peer_count);
384 collaborators.push(proto::Collaborator {
385 peer_id: joined.project.host_connection_id.0,
386 replica_id: 0,
387 user_id: joined.project.host_user_id.to_proto(),
388 });
389 let worktrees = share
390 .worktrees
391 .iter()
392 .filter_map(|(id, shared_worktree)| {
393 let worktree = joined.project.worktrees.get(&id)?;
394 Some(proto::Worktree {
395 id: *id,
396 root_name: worktree.root_name.clone(),
397 entries: shared_worktree.entries.values().cloned().collect(),
398 diagnostic_summaries: shared_worktree
399 .diagnostic_summaries
400 .values()
401 .cloned()
402 .collect(),
403 visible: worktree.visible,
404 })
405 })
406 .collect();
407 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
408 if *peer_conn_id != request.sender_id {
409 collaborators.push(proto::Collaborator {
410 peer_id: peer_conn_id.0,
411 replica_id: *peer_replica_id as u32,
412 user_id: peer_user_id.to_proto(),
413 });
414 }
415 }
416 let response = proto::JoinProjectResponse {
417 worktrees,
418 replica_id: joined.replica_id as u32,
419 collaborators,
420 language_servers: joined.project.language_servers.clone(),
421 };
422 let connection_ids = joined.project.connection_ids();
423 let contact_user_ids = joined.project.authorized_user_ids();
424 Ok((response, connection_ids, contact_user_ids))
425 })?;
426
427 broadcast(request.sender_id, connection_ids, |conn_id| {
428 self.peer.send(
429 conn_id,
430 proto::AddProjectCollaborator {
431 project_id,
432 collaborator: Some(proto::Collaborator {
433 peer_id: request.sender_id.0,
434 replica_id: response.replica_id,
435 user_id: user_id.to_proto(),
436 }),
437 },
438 )
439 });
440 self.update_contacts_for_users(&contact_user_ids).await;
441 Ok(response)
442 }
443
444 async fn leave_project(
445 mut self: Arc<Server>,
446 request: TypedEnvelope<proto::LeaveProject>,
447 ) -> tide::Result<()> {
448 let sender_id = request.sender_id;
449 let project_id = request.payload.project_id;
450 let worktree = self
451 .state_mut()
452 .await
453 .leave_project(sender_id, project_id)?;
454
455 broadcast(sender_id, worktree.connection_ids, |conn_id| {
456 self.peer.send(
457 conn_id,
458 proto::RemoveProjectCollaborator {
459 project_id,
460 peer_id: sender_id.0,
461 },
462 )
463 });
464 self.update_contacts_for_users(&worktree.authorized_user_ids)
465 .await;
466
467 Ok(())
468 }
469
470 async fn register_worktree(
471 mut self: Arc<Server>,
472 request: TypedEnvelope<proto::RegisterWorktree>,
473 ) -> tide::Result<proto::Ack> {
474 let host_user_id = self
475 .state()
476 .await
477 .user_id_for_connection(request.sender_id)?;
478
479 let mut contact_user_ids = HashSet::default();
480 contact_user_ids.insert(host_user_id);
481 for github_login in &request.payload.authorized_logins {
482 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
483 contact_user_ids.insert(contact_user_id);
484 }
485
486 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
487 let guest_connection_ids;
488 {
489 let mut state = self.state_mut().await;
490 guest_connection_ids = state
491 .read_project(request.payload.project_id, request.sender_id)?
492 .guest_connection_ids();
493 state.register_worktree(
494 request.payload.project_id,
495 request.payload.worktree_id,
496 request.sender_id,
497 Worktree {
498 authorized_user_ids: contact_user_ids.clone(),
499 root_name: request.payload.root_name.clone(),
500 visible: request.payload.visible,
501 },
502 )?;
503 }
504 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
505 self.peer
506 .forward_send(request.sender_id, connection_id, request.payload.clone())
507 });
508 self.update_contacts_for_users(&contact_user_ids).await;
509 Ok(proto::Ack {})
510 }
511
512 async fn unregister_worktree(
513 mut self: Arc<Server>,
514 request: TypedEnvelope<proto::UnregisterWorktree>,
515 ) -> tide::Result<()> {
516 let project_id = request.payload.project_id;
517 let worktree_id = request.payload.worktree_id;
518 let (worktree, guest_connection_ids) = self.state_mut().await.unregister_worktree(
519 project_id,
520 worktree_id,
521 request.sender_id,
522 )?;
523 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
524 self.peer.send(
525 conn_id,
526 proto::UnregisterWorktree {
527 project_id,
528 worktree_id,
529 },
530 )
531 });
532 self.update_contacts_for_users(&worktree.authorized_user_ids)
533 .await;
534 Ok(())
535 }
536
537 async fn update_worktree(
538 mut self: Arc<Server>,
539 request: TypedEnvelope<proto::UpdateWorktree>,
540 ) -> tide::Result<proto::Ack> {
541 let connection_ids = self.state_mut().await.update_worktree(
542 request.sender_id,
543 request.payload.project_id,
544 request.payload.worktree_id,
545 &request.payload.removed_entries,
546 &request.payload.updated_entries,
547 )?;
548
549 broadcast(request.sender_id, connection_ids, |connection_id| {
550 self.peer
551 .forward_send(request.sender_id, connection_id, request.payload.clone())
552 });
553
554 Ok(proto::Ack {})
555 }
556
557 async fn update_diagnostic_summary(
558 mut self: Arc<Server>,
559 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
560 ) -> tide::Result<()> {
561 let summary = request
562 .payload
563 .summary
564 .clone()
565 .ok_or_else(|| anyhow!("invalid summary"))?;
566 let receiver_ids = self.state_mut().await.update_diagnostic_summary(
567 request.payload.project_id,
568 request.payload.worktree_id,
569 request.sender_id,
570 summary,
571 )?;
572
573 broadcast(request.sender_id, receiver_ids, |connection_id| {
574 self.peer
575 .forward_send(request.sender_id, connection_id, request.payload.clone())
576 });
577 Ok(())
578 }
579
580 async fn start_language_server(
581 mut self: Arc<Server>,
582 request: TypedEnvelope<proto::StartLanguageServer>,
583 ) -> tide::Result<()> {
584 let receiver_ids = self.state_mut().await.start_language_server(
585 request.payload.project_id,
586 request.sender_id,
587 request
588 .payload
589 .server
590 .clone()
591 .ok_or_else(|| anyhow!("invalid language server"))?,
592 )?;
593 broadcast(request.sender_id, receiver_ids, |connection_id| {
594 self.peer
595 .forward_send(request.sender_id, connection_id, request.payload.clone())
596 });
597 Ok(())
598 }
599
600 async fn update_language_server(
601 self: Arc<Server>,
602 request: TypedEnvelope<proto::UpdateLanguageServer>,
603 ) -> tide::Result<()> {
604 let receiver_ids = self
605 .state()
606 .await
607 .project_connection_ids(request.payload.project_id, request.sender_id)?;
608 broadcast(request.sender_id, receiver_ids, |connection_id| {
609 self.peer
610 .forward_send(request.sender_id, connection_id, request.payload.clone())
611 });
612 Ok(())
613 }
614
615 async fn forward_project_request<T>(
616 self: Arc<Server>,
617 request: TypedEnvelope<T>,
618 ) -> tide::Result<T::Response>
619 where
620 T: EntityMessage + RequestMessage,
621 {
622 let host_connection_id = self
623 .state()
624 .await
625 .read_project(request.payload.remote_entity_id(), request.sender_id)?
626 .host_connection_id;
627 Ok(self
628 .peer
629 .forward_request(request.sender_id, host_connection_id, request.payload)
630 .await?)
631 }
632
633 async fn save_buffer(
634 self: Arc<Server>,
635 request: TypedEnvelope<proto::SaveBuffer>,
636 ) -> tide::Result<proto::BufferSaved> {
637 let host = self
638 .state()
639 .await
640 .read_project(request.payload.project_id, request.sender_id)?
641 .host_connection_id;
642 let response = self
643 .peer
644 .forward_request(request.sender_id, host, request.payload.clone())
645 .await?;
646
647 let mut guests = self
648 .state()
649 .await
650 .read_project(request.payload.project_id, request.sender_id)?
651 .connection_ids();
652 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
653 broadcast(host, guests, |conn_id| {
654 self.peer.forward_send(host, conn_id, response.clone())
655 });
656
657 Ok(response)
658 }
659
660 async fn update_buffer(
661 self: Arc<Server>,
662 request: TypedEnvelope<proto::UpdateBuffer>,
663 ) -> tide::Result<proto::Ack> {
664 let receiver_ids = self
665 .state()
666 .await
667 .project_connection_ids(request.payload.project_id, request.sender_id)?;
668 broadcast(request.sender_id, receiver_ids, |connection_id| {
669 self.peer
670 .forward_send(request.sender_id, connection_id, request.payload.clone())
671 });
672 Ok(proto::Ack {})
673 }
674
675 async fn update_buffer_file(
676 self: Arc<Server>,
677 request: TypedEnvelope<proto::UpdateBufferFile>,
678 ) -> tide::Result<()> {
679 let receiver_ids = self
680 .state()
681 .await
682 .project_connection_ids(request.payload.project_id, request.sender_id)?;
683 broadcast(request.sender_id, receiver_ids, |connection_id| {
684 self.peer
685 .forward_send(request.sender_id, connection_id, request.payload.clone())
686 });
687 Ok(())
688 }
689
690 async fn buffer_reloaded(
691 self: Arc<Server>,
692 request: TypedEnvelope<proto::BufferReloaded>,
693 ) -> tide::Result<()> {
694 let receiver_ids = self
695 .state()
696 .await
697 .project_connection_ids(request.payload.project_id, request.sender_id)?;
698 broadcast(request.sender_id, receiver_ids, |connection_id| {
699 self.peer
700 .forward_send(request.sender_id, connection_id, request.payload.clone())
701 });
702 Ok(())
703 }
704
705 async fn buffer_saved(
706 self: Arc<Server>,
707 request: TypedEnvelope<proto::BufferSaved>,
708 ) -> tide::Result<()> {
709 let receiver_ids = self
710 .state()
711 .await
712 .project_connection_ids(request.payload.project_id, request.sender_id)?;
713 broadcast(request.sender_id, receiver_ids, |connection_id| {
714 self.peer
715 .forward_send(request.sender_id, connection_id, request.payload.clone())
716 });
717 Ok(())
718 }
719
720 async fn follow(
721 self: Arc<Self>,
722 request: TypedEnvelope<proto::Follow>,
723 ) -> tide::Result<proto::FollowResponse> {
724 let leader_id = ConnectionId(request.payload.leader_id);
725 let follower_id = request.sender_id;
726 if !self
727 .state()
728 .await
729 .project_connection_ids(request.payload.project_id, follower_id)?
730 .contains(&leader_id)
731 {
732 Err(anyhow!("no such peer"))?;
733 }
734 let mut response = self
735 .peer
736 .forward_request(request.sender_id, leader_id, request.payload)
737 .await?;
738 response
739 .views
740 .retain(|view| view.leader_id != Some(follower_id.0));
741 Ok(response)
742 }
743
744 async fn unfollow(
745 self: Arc<Self>,
746 request: TypedEnvelope<proto::Unfollow>,
747 ) -> tide::Result<()> {
748 let leader_id = ConnectionId(request.payload.leader_id);
749 if !self
750 .state()
751 .await
752 .project_connection_ids(request.payload.project_id, request.sender_id)?
753 .contains(&leader_id)
754 {
755 Err(anyhow!("no such peer"))?;
756 }
757 self.peer
758 .forward_send(request.sender_id, leader_id, request.payload)?;
759 Ok(())
760 }
761
762 async fn update_followers(
763 self: Arc<Self>,
764 request: TypedEnvelope<proto::UpdateFollowers>,
765 ) -> tide::Result<()> {
766 let connection_ids = self
767 .state()
768 .await
769 .project_connection_ids(request.payload.project_id, request.sender_id)?;
770 let leader_id = request
771 .payload
772 .variant
773 .as_ref()
774 .and_then(|variant| match variant {
775 proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
776 proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
777 proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
778 });
779 for follower_id in &request.payload.follower_ids {
780 let follower_id = ConnectionId(*follower_id);
781 if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
782 self.peer
783 .forward_send(request.sender_id, follower_id, request.payload.clone())?;
784 }
785 }
786 Ok(())
787 }
788
789 async fn get_channels(
790 self: Arc<Server>,
791 request: TypedEnvelope<proto::GetChannels>,
792 ) -> tide::Result<proto::GetChannelsResponse> {
793 let user_id = self
794 .state()
795 .await
796 .user_id_for_connection(request.sender_id)?;
797 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
798 Ok(proto::GetChannelsResponse {
799 channels: channels
800 .into_iter()
801 .map(|chan| proto::Channel {
802 id: chan.id.to_proto(),
803 name: chan.name,
804 })
805 .collect(),
806 })
807 }
808
809 async fn get_users(
810 self: Arc<Server>,
811 request: TypedEnvelope<proto::GetUsers>,
812 ) -> tide::Result<proto::GetUsersResponse> {
813 let user_ids = request
814 .payload
815 .user_ids
816 .into_iter()
817 .map(UserId::from_proto)
818 .collect();
819 let users = self
820 .app_state
821 .db
822 .get_users_by_ids(user_ids)
823 .await?
824 .into_iter()
825 .map(|user| proto::User {
826 id: user.id.to_proto(),
827 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
828 github_login: user.github_login,
829 })
830 .collect();
831 Ok(proto::GetUsersResponse { users })
832 }
833
834 async fn update_contacts_for_users<'a>(
835 self: &Arc<Server>,
836 user_ids: impl IntoIterator<Item = &'a UserId>,
837 ) {
838 let state = self.state().await;
839 for user_id in user_ids {
840 let contacts = state.contacts_for_user(*user_id);
841 for connection_id in state.connection_ids_for_user(*user_id) {
842 self.peer
843 .send(
844 connection_id,
845 proto::UpdateContacts {
846 contacts: contacts.clone(),
847 },
848 )
849 .log_err();
850 }
851 }
852 }
853
854 async fn join_channel(
855 mut self: Arc<Self>,
856 request: TypedEnvelope<proto::JoinChannel>,
857 ) -> tide::Result<proto::JoinChannelResponse> {
858 let user_id = self
859 .state()
860 .await
861 .user_id_for_connection(request.sender_id)?;
862 let channel_id = ChannelId::from_proto(request.payload.channel_id);
863 if !self
864 .app_state
865 .db
866 .can_user_access_channel(user_id, channel_id)
867 .await?
868 {
869 Err(anyhow!("access denied"))?;
870 }
871
872 self.state_mut()
873 .await
874 .join_channel(request.sender_id, channel_id);
875 let messages = self
876 .app_state
877 .db
878 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
879 .await?
880 .into_iter()
881 .map(|msg| proto::ChannelMessage {
882 id: msg.id.to_proto(),
883 body: msg.body,
884 timestamp: msg.sent_at.unix_timestamp() as u64,
885 sender_id: msg.sender_id.to_proto(),
886 nonce: Some(msg.nonce.as_u128().into()),
887 })
888 .collect::<Vec<_>>();
889 Ok(proto::JoinChannelResponse {
890 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
891 messages,
892 })
893 }
894
895 async fn leave_channel(
896 mut self: Arc<Self>,
897 request: TypedEnvelope<proto::LeaveChannel>,
898 ) -> tide::Result<()> {
899 let user_id = self
900 .state()
901 .await
902 .user_id_for_connection(request.sender_id)?;
903 let channel_id = ChannelId::from_proto(request.payload.channel_id);
904 if !self
905 .app_state
906 .db
907 .can_user_access_channel(user_id, channel_id)
908 .await?
909 {
910 Err(anyhow!("access denied"))?;
911 }
912
913 self.state_mut()
914 .await
915 .leave_channel(request.sender_id, channel_id);
916
917 Ok(())
918 }
919
920 async fn send_channel_message(
921 self: Arc<Self>,
922 request: TypedEnvelope<proto::SendChannelMessage>,
923 ) -> tide::Result<proto::SendChannelMessageResponse> {
924 let channel_id = ChannelId::from_proto(request.payload.channel_id);
925 let user_id;
926 let connection_ids;
927 {
928 let state = self.state().await;
929 user_id = state.user_id_for_connection(request.sender_id)?;
930 connection_ids = state.channel_connection_ids(channel_id)?;
931 }
932
933 // Validate the message body.
934 let body = request.payload.body.trim().to_string();
935 if body.len() > MAX_MESSAGE_LEN {
936 return Err(anyhow!("message is too long"))?;
937 }
938 if body.is_empty() {
939 return Err(anyhow!("message can't be blank"))?;
940 }
941
942 let timestamp = OffsetDateTime::now_utc();
943 let nonce = request
944 .payload
945 .nonce
946 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
947
948 let message_id = self
949 .app_state
950 .db
951 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
952 .await?
953 .to_proto();
954 let message = proto::ChannelMessage {
955 sender_id: user_id.to_proto(),
956 id: message_id,
957 body,
958 timestamp: timestamp.unix_timestamp() as u64,
959 nonce: Some(nonce),
960 };
961 broadcast(request.sender_id, connection_ids, |conn_id| {
962 self.peer.send(
963 conn_id,
964 proto::ChannelMessageSent {
965 channel_id: channel_id.to_proto(),
966 message: Some(message.clone()),
967 },
968 )
969 });
970 Ok(proto::SendChannelMessageResponse {
971 message: Some(message),
972 })
973 }
974
975 async fn get_channel_messages(
976 self: Arc<Self>,
977 request: TypedEnvelope<proto::GetChannelMessages>,
978 ) -> tide::Result<proto::GetChannelMessagesResponse> {
979 let user_id = self
980 .state()
981 .await
982 .user_id_for_connection(request.sender_id)?;
983 let channel_id = ChannelId::from_proto(request.payload.channel_id);
984 if !self
985 .app_state
986 .db
987 .can_user_access_channel(user_id, channel_id)
988 .await?
989 {
990 Err(anyhow!("access denied"))?;
991 }
992
993 let messages = self
994 .app_state
995 .db
996 .get_channel_messages(
997 channel_id,
998 MESSAGE_COUNT_PER_PAGE,
999 Some(MessageId::from_proto(request.payload.before_message_id)),
1000 )
1001 .await?
1002 .into_iter()
1003 .map(|msg| proto::ChannelMessage {
1004 id: msg.id.to_proto(),
1005 body: msg.body,
1006 timestamp: msg.sent_at.unix_timestamp() as u64,
1007 sender_id: msg.sender_id.to_proto(),
1008 nonce: Some(msg.nonce.as_u128().into()),
1009 })
1010 .collect::<Vec<_>>();
1011
1012 Ok(proto::GetChannelMessagesResponse {
1013 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1014 messages,
1015 })
1016 }
1017
1018 async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1019 #[cfg(test)]
1020 async_std::task::yield_now().await;
1021 let guard = self.store.read().await;
1022 #[cfg(test)]
1023 async_std::task::yield_now().await;
1024 StoreReadGuard {
1025 guard,
1026 _not_send: PhantomData,
1027 }
1028 }
1029
1030 async fn state_mut<'a>(self: &'a mut Arc<Self>) -> StoreWriteGuard<'a> {
1031 #[cfg(test)]
1032 async_std::task::yield_now().await;
1033 let guard = self.store.write().await;
1034 #[cfg(test)]
1035 async_std::task::yield_now().await;
1036 StoreWriteGuard {
1037 guard,
1038 _not_send: PhantomData,
1039 }
1040 }
1041}
1042
1043impl<'a> Deref for StoreReadGuard<'a> {
1044 type Target = Store;
1045
1046 fn deref(&self) -> &Self::Target {
1047 &*self.guard
1048 }
1049}
1050
1051impl<'a> Deref for StoreWriteGuard<'a> {
1052 type Target = Store;
1053
1054 fn deref(&self) -> &Self::Target {
1055 &*self.guard
1056 }
1057}
1058
1059impl<'a> DerefMut for StoreWriteGuard<'a> {
1060 fn deref_mut(&mut self) -> &mut Self::Target {
1061 &mut *self.guard
1062 }
1063}
1064
1065impl<'a> Drop for StoreWriteGuard<'a> {
1066 fn drop(&mut self) {
1067 #[cfg(test)]
1068 self.check_invariants();
1069 }
1070}
1071
1072impl Executor for RealExecutor {
1073 type Timer = Timer;
1074
1075 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1076 task::spawn(future);
1077 }
1078
1079 fn timer(&self, duration: Duration) -> Self::Timer {
1080 Timer::after(duration)
1081 }
1082}
1083
1084fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1085where
1086 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1087{
1088 for receiver_id in receiver_ids {
1089 if receiver_id != sender_id {
1090 f(receiver_id).log_err();
1091 }
1092 }
1093}
1094
1095pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1096 let server = Server::new(app.state().clone(), rpc.clone(), None);
1097 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1098 let server = server.clone();
1099 async move {
1100 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1101
1102 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1103 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1104 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1105 let client_protocol_version: Option<u32> = request
1106 .header("X-Zed-Protocol-Version")
1107 .and_then(|v| v.as_str().parse().ok());
1108
1109 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1110 return Ok(Response::new(StatusCode::UpgradeRequired));
1111 }
1112
1113 let header = match request.header("Sec-Websocket-Key") {
1114 Some(h) => h.as_str(),
1115 None => return Err(anyhow!("expected sec-websocket-key"))?,
1116 };
1117
1118 let user_id = process_auth_header(&request).await?;
1119
1120 let mut response = Response::new(StatusCode::SwitchingProtocols);
1121 response.insert_header(UPGRADE, "websocket");
1122 response.insert_header(CONNECTION, "Upgrade");
1123 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1124 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1125 response.insert_header("Sec-Websocket-Version", "13");
1126
1127 let http_res: &mut tide::http::Response = response.as_mut();
1128 let upgrade_receiver = http_res.recv_upgrade().await;
1129 let addr = request.remote().unwrap_or("unknown").to_string();
1130 task::spawn(async move {
1131 if let Some(stream) = upgrade_receiver.await {
1132 server
1133 .handle_connection(
1134 Connection::new(
1135 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1136 ),
1137 addr,
1138 user_id,
1139 None,
1140 RealExecutor,
1141 )
1142 .await;
1143 }
1144 });
1145
1146 Ok(response)
1147 }
1148 });
1149}
1150
1151fn header_contains_ignore_case<T>(
1152 request: &tide::Request<T>,
1153 header_name: HeaderName,
1154 value: &str,
1155) -> bool {
1156 request
1157 .header(header_name)
1158 .map(|h| {
1159 h.as_str()
1160 .split(',')
1161 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1162 })
1163 .unwrap_or(false)
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use crate::{
1170 auth,
1171 db::{tests::TestDb, UserId},
1172 github, AppState, Config,
1173 };
1174 use ::rpc::Peer;
1175 use client::{
1176 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1177 EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1178 };
1179 use collections::BTreeMap;
1180 use editor::{
1181 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1182 ToOffset, ToggleCodeActions, Undo,
1183 };
1184 use gpui::{
1185 executor::{self, Deterministic},
1186 geometry::vector::vec2f,
1187 ModelHandle, TestAppContext, ViewHandle,
1188 };
1189 use language::{
1190 range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1191 LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1192 };
1193 use lsp::{self, FakeLanguageServer};
1194 use parking_lot::Mutex;
1195 use project::{
1196 fs::{FakeFs, Fs as _},
1197 search::SearchQuery,
1198 worktree::WorktreeHandle,
1199 DiagnosticSummary, Project, ProjectPath, WorktreeId,
1200 };
1201 use rand::prelude::*;
1202 use rpc::PeerId;
1203 use serde_json::json;
1204 use settings::Settings;
1205 use sqlx::types::time::OffsetDateTime;
1206 use std::{
1207 env,
1208 ops::Deref,
1209 path::{Path, PathBuf},
1210 rc::Rc,
1211 sync::{
1212 atomic::{AtomicBool, Ordering::SeqCst},
1213 Arc,
1214 },
1215 time::Duration,
1216 };
1217 use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1218
1219 #[cfg(test)]
1220 #[ctor::ctor]
1221 fn init_logger() {
1222 if std::env::var("RUST_LOG").is_ok() {
1223 env_logger::init();
1224 }
1225 }
1226
1227 #[gpui::test(iterations = 10)]
1228 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1229 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1230 let lang_registry = Arc::new(LanguageRegistry::test());
1231 let fs = FakeFs::new(cx_a.background());
1232 cx_a.foreground().forbid_parking();
1233
1234 // Connect to a server as 2 clients.
1235 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1236 let client_a = server.create_client(cx_a, "user_a").await;
1237 let client_b = server.create_client(cx_b, "user_b").await;
1238
1239 // Share a project as client A
1240 fs.insert_tree(
1241 "/a",
1242 json!({
1243 ".zed.toml": r#"collaborators = ["user_b"]"#,
1244 "a.txt": "a-contents",
1245 "b.txt": "b-contents",
1246 }),
1247 )
1248 .await;
1249 let project_a = cx_a.update(|cx| {
1250 Project::local(
1251 client_a.clone(),
1252 client_a.user_store.clone(),
1253 lang_registry.clone(),
1254 fs.clone(),
1255 cx,
1256 )
1257 });
1258 let (worktree_a, _) = project_a
1259 .update(cx_a, |p, cx| {
1260 p.find_or_create_local_worktree("/a", true, cx)
1261 })
1262 .await
1263 .unwrap();
1264 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1265 worktree_a
1266 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1267 .await;
1268 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1269 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1270
1271 // Join that project as client B
1272 let project_b = Project::remote(
1273 project_id,
1274 client_b.clone(),
1275 client_b.user_store.clone(),
1276 lang_registry.clone(),
1277 fs.clone(),
1278 &mut cx_b.to_async(),
1279 )
1280 .await
1281 .unwrap();
1282
1283 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1284 assert_eq!(
1285 project
1286 .collaborators()
1287 .get(&client_a.peer_id)
1288 .unwrap()
1289 .user
1290 .github_login,
1291 "user_a"
1292 );
1293 project.replica_id()
1294 });
1295 project_a
1296 .condition(&cx_a, |tree, _| {
1297 tree.collaborators()
1298 .get(&client_b.peer_id)
1299 .map_or(false, |collaborator| {
1300 collaborator.replica_id == replica_id_b
1301 && collaborator.user.github_login == "user_b"
1302 })
1303 })
1304 .await;
1305
1306 // Open the same file as client B and client A.
1307 let buffer_b = project_b
1308 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1309 .await
1310 .unwrap();
1311 buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1312 project_a.read_with(cx_a, |project, cx| {
1313 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1314 });
1315 let buffer_a = project_a
1316 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1317 .await
1318 .unwrap();
1319
1320 let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1321
1322 // TODO
1323 // // Create a selection set as client B and see that selection set as client A.
1324 // buffer_a
1325 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1326 // .await;
1327
1328 // Edit the buffer as client B and see that edit as client A.
1329 editor_b.update(cx_b, |editor, cx| {
1330 editor.handle_input(&Input("ok, ".into()), cx)
1331 });
1332 buffer_a
1333 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1334 .await;
1335
1336 // TODO
1337 // // Remove the selection set as client B, see those selections disappear as client A.
1338 cx_b.update(move |_| drop(editor_b));
1339 // buffer_a
1340 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1341 // .await;
1342
1343 // Dropping the client B's project removes client B from client A's collaborators.
1344 cx_b.update(move |_| drop(project_b));
1345 project_a
1346 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1347 .await;
1348 }
1349
1350 #[gpui::test(iterations = 10)]
1351 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1352 let lang_registry = Arc::new(LanguageRegistry::test());
1353 let fs = FakeFs::new(cx_a.background());
1354 cx_a.foreground().forbid_parking();
1355
1356 // Connect to a server as 2 clients.
1357 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1358 let client_a = server.create_client(cx_a, "user_a").await;
1359 let client_b = server.create_client(cx_b, "user_b").await;
1360
1361 // Share a project as client A
1362 fs.insert_tree(
1363 "/a",
1364 json!({
1365 ".zed.toml": r#"collaborators = ["user_b"]"#,
1366 "a.txt": "a-contents",
1367 "b.txt": "b-contents",
1368 }),
1369 )
1370 .await;
1371 let project_a = cx_a.update(|cx| {
1372 Project::local(
1373 client_a.clone(),
1374 client_a.user_store.clone(),
1375 lang_registry.clone(),
1376 fs.clone(),
1377 cx,
1378 )
1379 });
1380 let (worktree_a, _) = project_a
1381 .update(cx_a, |p, cx| {
1382 p.find_or_create_local_worktree("/a", true, cx)
1383 })
1384 .await
1385 .unwrap();
1386 worktree_a
1387 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1388 .await;
1389 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1390 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1391 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1392 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1393
1394 // Join that project as client B
1395 let project_b = Project::remote(
1396 project_id,
1397 client_b.clone(),
1398 client_b.user_store.clone(),
1399 lang_registry.clone(),
1400 fs.clone(),
1401 &mut cx_b.to_async(),
1402 )
1403 .await
1404 .unwrap();
1405 project_b
1406 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1407 .await
1408 .unwrap();
1409
1410 // Unshare the project as client A
1411 project_a.update(cx_a, |project, cx| project.unshare(cx));
1412 project_b
1413 .condition(cx_b, |project, _| project.is_read_only())
1414 .await;
1415 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1416 cx_b.update(|_| {
1417 drop(project_b);
1418 });
1419
1420 // Share the project again and ensure guests can still join.
1421 project_a
1422 .update(cx_a, |project, cx| project.share(cx))
1423 .await
1424 .unwrap();
1425 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1426
1427 let project_b2 = Project::remote(
1428 project_id,
1429 client_b.clone(),
1430 client_b.user_store.clone(),
1431 lang_registry.clone(),
1432 fs.clone(),
1433 &mut cx_b.to_async(),
1434 )
1435 .await
1436 .unwrap();
1437 project_b2
1438 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1439 .await
1440 .unwrap();
1441 }
1442
1443 #[gpui::test(iterations = 10)]
1444 async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1445 let lang_registry = Arc::new(LanguageRegistry::test());
1446 let fs = FakeFs::new(cx_a.background());
1447 cx_a.foreground().forbid_parking();
1448
1449 // Connect to a server as 2 clients.
1450 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1451 let client_a = server.create_client(cx_a, "user_a").await;
1452 let client_b = server.create_client(cx_b, "user_b").await;
1453
1454 // Share a project as client A
1455 fs.insert_tree(
1456 "/a",
1457 json!({
1458 ".zed.toml": r#"collaborators = ["user_b"]"#,
1459 "a.txt": "a-contents",
1460 "b.txt": "b-contents",
1461 }),
1462 )
1463 .await;
1464 let project_a = cx_a.update(|cx| {
1465 Project::local(
1466 client_a.clone(),
1467 client_a.user_store.clone(),
1468 lang_registry.clone(),
1469 fs.clone(),
1470 cx,
1471 )
1472 });
1473 let (worktree_a, _) = project_a
1474 .update(cx_a, |p, cx| {
1475 p.find_or_create_local_worktree("/a", true, cx)
1476 })
1477 .await
1478 .unwrap();
1479 worktree_a
1480 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1481 .await;
1482 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1483 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1484 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1485 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1486
1487 // Join that project as client B
1488 let project_b = Project::remote(
1489 project_id,
1490 client_b.clone(),
1491 client_b.user_store.clone(),
1492 lang_registry.clone(),
1493 fs.clone(),
1494 &mut cx_b.to_async(),
1495 )
1496 .await
1497 .unwrap();
1498 project_b
1499 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1500 .await
1501 .unwrap();
1502
1503 // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1504 server.disconnect_client(client_a.current_user_id(cx_a));
1505 cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1506 project_a
1507 .condition(cx_a, |project, _| project.collaborators().is_empty())
1508 .await;
1509 project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1510 project_b
1511 .condition(cx_b, |project, _| project.is_read_only())
1512 .await;
1513 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1514 cx_b.update(|_| {
1515 drop(project_b);
1516 });
1517
1518 // Await reconnection
1519 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1520
1521 // Share the project again and ensure guests can still join.
1522 project_a
1523 .update(cx_a, |project, cx| project.share(cx))
1524 .await
1525 .unwrap();
1526 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1527
1528 let project_b2 = Project::remote(
1529 project_id,
1530 client_b.clone(),
1531 client_b.user_store.clone(),
1532 lang_registry.clone(),
1533 fs.clone(),
1534 &mut cx_b.to_async(),
1535 )
1536 .await
1537 .unwrap();
1538 project_b2
1539 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1540 .await
1541 .unwrap();
1542 }
1543
1544 #[gpui::test(iterations = 10)]
1545 async fn test_propagate_saves_and_fs_changes(
1546 cx_a: &mut TestAppContext,
1547 cx_b: &mut TestAppContext,
1548 cx_c: &mut TestAppContext,
1549 ) {
1550 let lang_registry = Arc::new(LanguageRegistry::test());
1551 let fs = FakeFs::new(cx_a.background());
1552 cx_a.foreground().forbid_parking();
1553
1554 // Connect to a server as 3 clients.
1555 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1556 let client_a = server.create_client(cx_a, "user_a").await;
1557 let client_b = server.create_client(cx_b, "user_b").await;
1558 let client_c = server.create_client(cx_c, "user_c").await;
1559
1560 // Share a worktree as client A.
1561 fs.insert_tree(
1562 "/a",
1563 json!({
1564 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1565 "file1": "",
1566 "file2": ""
1567 }),
1568 )
1569 .await;
1570 let project_a = cx_a.update(|cx| {
1571 Project::local(
1572 client_a.clone(),
1573 client_a.user_store.clone(),
1574 lang_registry.clone(),
1575 fs.clone(),
1576 cx,
1577 )
1578 });
1579 let (worktree_a, _) = project_a
1580 .update(cx_a, |p, cx| {
1581 p.find_or_create_local_worktree("/a", true, cx)
1582 })
1583 .await
1584 .unwrap();
1585 worktree_a
1586 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1587 .await;
1588 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1589 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1590 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1591
1592 // Join that worktree as clients B and C.
1593 let project_b = Project::remote(
1594 project_id,
1595 client_b.clone(),
1596 client_b.user_store.clone(),
1597 lang_registry.clone(),
1598 fs.clone(),
1599 &mut cx_b.to_async(),
1600 )
1601 .await
1602 .unwrap();
1603 let project_c = Project::remote(
1604 project_id,
1605 client_c.clone(),
1606 client_c.user_store.clone(),
1607 lang_registry.clone(),
1608 fs.clone(),
1609 &mut cx_c.to_async(),
1610 )
1611 .await
1612 .unwrap();
1613 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1614 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1615
1616 // Open and edit a buffer as both guests B and C.
1617 let buffer_b = project_b
1618 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1619 .await
1620 .unwrap();
1621 let buffer_c = project_c
1622 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1623 .await
1624 .unwrap();
1625 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1626 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1627
1628 // Open and edit that buffer as the host.
1629 let buffer_a = project_a
1630 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1631 .await
1632 .unwrap();
1633
1634 buffer_a
1635 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1636 .await;
1637 buffer_a.update(cx_a, |buf, cx| {
1638 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1639 });
1640
1641 // Wait for edits to propagate
1642 buffer_a
1643 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1644 .await;
1645 buffer_b
1646 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1647 .await;
1648 buffer_c
1649 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1650 .await;
1651
1652 // Edit the buffer as the host and concurrently save as guest B.
1653 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1654 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1655 save_b.await.unwrap();
1656 assert_eq!(
1657 fs.load("/a/file1".as_ref()).await.unwrap(),
1658 "hi-a, i-am-c, i-am-b, i-am-a"
1659 );
1660 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1661 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1662 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1663
1664 worktree_a.flush_fs_events(cx_a).await;
1665
1666 // Make changes on host's file system, see those changes on guest worktrees.
1667 fs.rename(
1668 "/a/file1".as_ref(),
1669 "/a/file1-renamed".as_ref(),
1670 Default::default(),
1671 )
1672 .await
1673 .unwrap();
1674
1675 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1676 .await
1677 .unwrap();
1678 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1679
1680 worktree_a
1681 .condition(&cx_a, |tree, _| {
1682 tree.paths()
1683 .map(|p| p.to_string_lossy())
1684 .collect::<Vec<_>>()
1685 == [".zed.toml", "file1-renamed", "file3", "file4"]
1686 })
1687 .await;
1688 worktree_b
1689 .condition(&cx_b, |tree, _| {
1690 tree.paths()
1691 .map(|p| p.to_string_lossy())
1692 .collect::<Vec<_>>()
1693 == [".zed.toml", "file1-renamed", "file3", "file4"]
1694 })
1695 .await;
1696 worktree_c
1697 .condition(&cx_c, |tree, _| {
1698 tree.paths()
1699 .map(|p| p.to_string_lossy())
1700 .collect::<Vec<_>>()
1701 == [".zed.toml", "file1-renamed", "file3", "file4"]
1702 })
1703 .await;
1704
1705 // Ensure buffer files are updated as well.
1706 buffer_a
1707 .condition(&cx_a, |buf, _| {
1708 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1709 })
1710 .await;
1711 buffer_b
1712 .condition(&cx_b, |buf, _| {
1713 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1714 })
1715 .await;
1716 buffer_c
1717 .condition(&cx_c, |buf, _| {
1718 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1719 })
1720 .await;
1721 }
1722
1723 #[gpui::test(iterations = 10)]
1724 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1725 cx_a.foreground().forbid_parking();
1726 let lang_registry = Arc::new(LanguageRegistry::test());
1727 let fs = FakeFs::new(cx_a.background());
1728
1729 // Connect to a server as 2 clients.
1730 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1731 let client_a = server.create_client(cx_a, "user_a").await;
1732 let client_b = server.create_client(cx_b, "user_b").await;
1733
1734 // Share a project as client A
1735 fs.insert_tree(
1736 "/dir",
1737 json!({
1738 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1739 "a.txt": "a-contents",
1740 }),
1741 )
1742 .await;
1743
1744 let project_a = cx_a.update(|cx| {
1745 Project::local(
1746 client_a.clone(),
1747 client_a.user_store.clone(),
1748 lang_registry.clone(),
1749 fs.clone(),
1750 cx,
1751 )
1752 });
1753 let (worktree_a, _) = project_a
1754 .update(cx_a, |p, cx| {
1755 p.find_or_create_local_worktree("/dir", true, cx)
1756 })
1757 .await
1758 .unwrap();
1759 worktree_a
1760 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1761 .await;
1762 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1763 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1764 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1765
1766 // Join that project as client B
1767 let project_b = Project::remote(
1768 project_id,
1769 client_b.clone(),
1770 client_b.user_store.clone(),
1771 lang_registry.clone(),
1772 fs.clone(),
1773 &mut cx_b.to_async(),
1774 )
1775 .await
1776 .unwrap();
1777
1778 // Open a buffer as client B
1779 let buffer_b = project_b
1780 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1781 .await
1782 .unwrap();
1783
1784 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1785 buffer_b.read_with(cx_b, |buf, _| {
1786 assert!(buf.is_dirty());
1787 assert!(!buf.has_conflict());
1788 });
1789
1790 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1791 buffer_b
1792 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1793 .await;
1794 buffer_b.read_with(cx_b, |buf, _| {
1795 assert!(!buf.has_conflict());
1796 });
1797
1798 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1799 buffer_b.read_with(cx_b, |buf, _| {
1800 assert!(buf.is_dirty());
1801 assert!(!buf.has_conflict());
1802 });
1803 }
1804
1805 #[gpui::test(iterations = 10)]
1806 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1807 cx_a.foreground().forbid_parking();
1808 let lang_registry = Arc::new(LanguageRegistry::test());
1809 let fs = FakeFs::new(cx_a.background());
1810
1811 // Connect to a server as 2 clients.
1812 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1813 let client_a = server.create_client(cx_a, "user_a").await;
1814 let client_b = server.create_client(cx_b, "user_b").await;
1815
1816 // Share a project as client A
1817 fs.insert_tree(
1818 "/dir",
1819 json!({
1820 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1821 "a.txt": "a-contents",
1822 }),
1823 )
1824 .await;
1825
1826 let project_a = cx_a.update(|cx| {
1827 Project::local(
1828 client_a.clone(),
1829 client_a.user_store.clone(),
1830 lang_registry.clone(),
1831 fs.clone(),
1832 cx,
1833 )
1834 });
1835 let (worktree_a, _) = project_a
1836 .update(cx_a, |p, cx| {
1837 p.find_or_create_local_worktree("/dir", true, cx)
1838 })
1839 .await
1840 .unwrap();
1841 worktree_a
1842 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1843 .await;
1844 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1845 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1846 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1847
1848 // Join that project as client B
1849 let project_b = Project::remote(
1850 project_id,
1851 client_b.clone(),
1852 client_b.user_store.clone(),
1853 lang_registry.clone(),
1854 fs.clone(),
1855 &mut cx_b.to_async(),
1856 )
1857 .await
1858 .unwrap();
1859 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1860
1861 // Open a buffer as client B
1862 let buffer_b = project_b
1863 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1864 .await
1865 .unwrap();
1866 buffer_b.read_with(cx_b, |buf, _| {
1867 assert!(!buf.is_dirty());
1868 assert!(!buf.has_conflict());
1869 });
1870
1871 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1872 .await
1873 .unwrap();
1874 buffer_b
1875 .condition(&cx_b, |buf, _| {
1876 buf.text() == "new contents" && !buf.is_dirty()
1877 })
1878 .await;
1879 buffer_b.read_with(cx_b, |buf, _| {
1880 assert!(!buf.has_conflict());
1881 });
1882 }
1883
1884 #[gpui::test(iterations = 10)]
1885 async fn test_editing_while_guest_opens_buffer(
1886 cx_a: &mut TestAppContext,
1887 cx_b: &mut TestAppContext,
1888 ) {
1889 cx_a.foreground().forbid_parking();
1890 let lang_registry = Arc::new(LanguageRegistry::test());
1891 let fs = FakeFs::new(cx_a.background());
1892
1893 // Connect to a server as 2 clients.
1894 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1895 let client_a = server.create_client(cx_a, "user_a").await;
1896 let client_b = server.create_client(cx_b, "user_b").await;
1897
1898 // Share a project as client A
1899 fs.insert_tree(
1900 "/dir",
1901 json!({
1902 ".zed.toml": r#"collaborators = ["user_b"]"#,
1903 "a.txt": "a-contents",
1904 }),
1905 )
1906 .await;
1907 let project_a = cx_a.update(|cx| {
1908 Project::local(
1909 client_a.clone(),
1910 client_a.user_store.clone(),
1911 lang_registry.clone(),
1912 fs.clone(),
1913 cx,
1914 )
1915 });
1916 let (worktree_a, _) = project_a
1917 .update(cx_a, |p, cx| {
1918 p.find_or_create_local_worktree("/dir", true, cx)
1919 })
1920 .await
1921 .unwrap();
1922 worktree_a
1923 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924 .await;
1925 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1926 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1927 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1928
1929 // Join that project as client B
1930 let project_b = Project::remote(
1931 project_id,
1932 client_b.clone(),
1933 client_b.user_store.clone(),
1934 lang_registry.clone(),
1935 fs.clone(),
1936 &mut cx_b.to_async(),
1937 )
1938 .await
1939 .unwrap();
1940
1941 // Open a buffer as client A
1942 let buffer_a = project_a
1943 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1944 .await
1945 .unwrap();
1946
1947 // Start opening the same buffer as client B
1948 let buffer_b = cx_b
1949 .background()
1950 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1951
1952 // Edit the buffer as client A while client B is still opening it.
1953 cx_b.background().simulate_random_delay().await;
1954 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1955 cx_b.background().simulate_random_delay().await;
1956 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1957
1958 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1959 let buffer_b = buffer_b.await.unwrap();
1960 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1961 }
1962
1963 #[gpui::test(iterations = 10)]
1964 async fn test_leaving_worktree_while_opening_buffer(
1965 cx_a: &mut TestAppContext,
1966 cx_b: &mut TestAppContext,
1967 ) {
1968 cx_a.foreground().forbid_parking();
1969 let lang_registry = Arc::new(LanguageRegistry::test());
1970 let fs = FakeFs::new(cx_a.background());
1971
1972 // Connect to a server as 2 clients.
1973 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1974 let client_a = server.create_client(cx_a, "user_a").await;
1975 let client_b = server.create_client(cx_b, "user_b").await;
1976
1977 // Share a project as client A
1978 fs.insert_tree(
1979 "/dir",
1980 json!({
1981 ".zed.toml": r#"collaborators = ["user_b"]"#,
1982 "a.txt": "a-contents",
1983 }),
1984 )
1985 .await;
1986 let project_a = cx_a.update(|cx| {
1987 Project::local(
1988 client_a.clone(),
1989 client_a.user_store.clone(),
1990 lang_registry.clone(),
1991 fs.clone(),
1992 cx,
1993 )
1994 });
1995 let (worktree_a, _) = project_a
1996 .update(cx_a, |p, cx| {
1997 p.find_or_create_local_worktree("/dir", true, cx)
1998 })
1999 .await
2000 .unwrap();
2001 worktree_a
2002 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2003 .await;
2004 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2005 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2006 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2007
2008 // Join that project as client B
2009 let project_b = Project::remote(
2010 project_id,
2011 client_b.clone(),
2012 client_b.user_store.clone(),
2013 lang_registry.clone(),
2014 fs.clone(),
2015 &mut cx_b.to_async(),
2016 )
2017 .await
2018 .unwrap();
2019
2020 // See that a guest has joined as client A.
2021 project_a
2022 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2023 .await;
2024
2025 // Begin opening a buffer as client B, but leave the project before the open completes.
2026 let buffer_b = cx_b
2027 .background()
2028 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2029 cx_b.update(|_| drop(project_b));
2030 drop(buffer_b);
2031
2032 // See that the guest has left.
2033 project_a
2034 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035 .await;
2036 }
2037
2038 #[gpui::test(iterations = 10)]
2039 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2040 cx_a.foreground().forbid_parking();
2041 let lang_registry = Arc::new(LanguageRegistry::test());
2042 let fs = FakeFs::new(cx_a.background());
2043
2044 // Connect to a server as 2 clients.
2045 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2046 let client_a = server.create_client(cx_a, "user_a").await;
2047 let client_b = server.create_client(cx_b, "user_b").await;
2048
2049 // Share a project as client A
2050 fs.insert_tree(
2051 "/a",
2052 json!({
2053 ".zed.toml": r#"collaborators = ["user_b"]"#,
2054 "a.txt": "a-contents",
2055 "b.txt": "b-contents",
2056 }),
2057 )
2058 .await;
2059 let project_a = cx_a.update(|cx| {
2060 Project::local(
2061 client_a.clone(),
2062 client_a.user_store.clone(),
2063 lang_registry.clone(),
2064 fs.clone(),
2065 cx,
2066 )
2067 });
2068 let (worktree_a, _) = project_a
2069 .update(cx_a, |p, cx| {
2070 p.find_or_create_local_worktree("/a", true, cx)
2071 })
2072 .await
2073 .unwrap();
2074 worktree_a
2075 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2076 .await;
2077 let project_id = project_a
2078 .update(cx_a, |project, _| project.next_remote_id())
2079 .await;
2080 project_a
2081 .update(cx_a, |project, cx| project.share(cx))
2082 .await
2083 .unwrap();
2084
2085 // Join that project as client B
2086 let _project_b = Project::remote(
2087 project_id,
2088 client_b.clone(),
2089 client_b.user_store.clone(),
2090 lang_registry.clone(),
2091 fs.clone(),
2092 &mut cx_b.to_async(),
2093 )
2094 .await
2095 .unwrap();
2096
2097 // Client A sees that a guest has joined.
2098 project_a
2099 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2100 .await;
2101
2102 // Drop client B's connection and ensure client A observes client B leaving the project.
2103 client_b.disconnect(&cx_b.to_async()).unwrap();
2104 project_a
2105 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2106 .await;
2107
2108 // Rejoin the project as client B
2109 let _project_b = Project::remote(
2110 project_id,
2111 client_b.clone(),
2112 client_b.user_store.clone(),
2113 lang_registry.clone(),
2114 fs.clone(),
2115 &mut cx_b.to_async(),
2116 )
2117 .await
2118 .unwrap();
2119
2120 // Client A sees that a guest has re-joined.
2121 project_a
2122 .condition(cx_a, |p, _| p.collaborators().len() == 1)
2123 .await;
2124
2125 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2126 client_b.wait_for_current_user(cx_b).await;
2127 server.disconnect_client(client_b.current_user_id(cx_b));
2128 cx_a.foreground().advance_clock(Duration::from_secs(3));
2129 project_a
2130 .condition(cx_a, |p, _| p.collaborators().len() == 0)
2131 .await;
2132 }
2133
2134 #[gpui::test(iterations = 10)]
2135 async fn test_collaborating_with_diagnostics(
2136 cx_a: &mut TestAppContext,
2137 cx_b: &mut TestAppContext,
2138 ) {
2139 cx_a.foreground().forbid_parking();
2140 let lang_registry = Arc::new(LanguageRegistry::test());
2141 let fs = FakeFs::new(cx_a.background());
2142
2143 // Set up a fake language server.
2144 let mut language = Language::new(
2145 LanguageConfig {
2146 name: "Rust".into(),
2147 path_suffixes: vec!["rs".to_string()],
2148 ..Default::default()
2149 },
2150 Some(tree_sitter_rust::language()),
2151 );
2152 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2153 lang_registry.add(Arc::new(language));
2154
2155 // Connect to a server as 2 clients.
2156 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2157 let client_a = server.create_client(cx_a, "user_a").await;
2158 let client_b = server.create_client(cx_b, "user_b").await;
2159
2160 // Share a project as client A
2161 fs.insert_tree(
2162 "/a",
2163 json!({
2164 ".zed.toml": r#"collaborators = ["user_b"]"#,
2165 "a.rs": "let one = two",
2166 "other.rs": "",
2167 }),
2168 )
2169 .await;
2170 let project_a = cx_a.update(|cx| {
2171 Project::local(
2172 client_a.clone(),
2173 client_a.user_store.clone(),
2174 lang_registry.clone(),
2175 fs.clone(),
2176 cx,
2177 )
2178 });
2179 let (worktree_a, _) = project_a
2180 .update(cx_a, |p, cx| {
2181 p.find_or_create_local_worktree("/a", true, cx)
2182 })
2183 .await
2184 .unwrap();
2185 worktree_a
2186 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2187 .await;
2188 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2189 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2190 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2191
2192 // Cause the language server to start.
2193 let _ = cx_a
2194 .background()
2195 .spawn(project_a.update(cx_a, |project, cx| {
2196 project.open_buffer(
2197 ProjectPath {
2198 worktree_id,
2199 path: Path::new("other.rs").into(),
2200 },
2201 cx,
2202 )
2203 }))
2204 .await
2205 .unwrap();
2206
2207 // Simulate a language server reporting errors for a file.
2208 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2209 fake_language_server
2210 .receive_notification::<lsp::notification::DidOpenTextDocument>()
2211 .await;
2212 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2213 lsp::PublishDiagnosticsParams {
2214 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2215 version: None,
2216 diagnostics: vec![lsp::Diagnostic {
2217 severity: Some(lsp::DiagnosticSeverity::ERROR),
2218 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2219 message: "message 1".to_string(),
2220 ..Default::default()
2221 }],
2222 },
2223 );
2224
2225 // Wait for server to see the diagnostics update.
2226 server
2227 .condition(|store| {
2228 let worktree = store
2229 .project(project_id)
2230 .unwrap()
2231 .share
2232 .as_ref()
2233 .unwrap()
2234 .worktrees
2235 .get(&worktree_id.to_proto())
2236 .unwrap();
2237
2238 !worktree.diagnostic_summaries.is_empty()
2239 })
2240 .await;
2241
2242 // Join the worktree as client B.
2243 let project_b = Project::remote(
2244 project_id,
2245 client_b.clone(),
2246 client_b.user_store.clone(),
2247 lang_registry.clone(),
2248 fs.clone(),
2249 &mut cx_b.to_async(),
2250 )
2251 .await
2252 .unwrap();
2253
2254 project_b.read_with(cx_b, |project, cx| {
2255 assert_eq!(
2256 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2257 &[(
2258 ProjectPath {
2259 worktree_id,
2260 path: Arc::from(Path::new("a.rs")),
2261 },
2262 DiagnosticSummary {
2263 error_count: 1,
2264 warning_count: 0,
2265 ..Default::default()
2266 },
2267 )]
2268 )
2269 });
2270
2271 // Simulate a language server reporting more errors for a file.
2272 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2273 lsp::PublishDiagnosticsParams {
2274 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2275 version: None,
2276 diagnostics: vec![
2277 lsp::Diagnostic {
2278 severity: Some(lsp::DiagnosticSeverity::ERROR),
2279 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2280 message: "message 1".to_string(),
2281 ..Default::default()
2282 },
2283 lsp::Diagnostic {
2284 severity: Some(lsp::DiagnosticSeverity::WARNING),
2285 range: lsp::Range::new(
2286 lsp::Position::new(0, 10),
2287 lsp::Position::new(0, 13),
2288 ),
2289 message: "message 2".to_string(),
2290 ..Default::default()
2291 },
2292 ],
2293 },
2294 );
2295
2296 // Client b gets the updated summaries
2297 project_b
2298 .condition(&cx_b, |project, cx| {
2299 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2300 == &[(
2301 ProjectPath {
2302 worktree_id,
2303 path: Arc::from(Path::new("a.rs")),
2304 },
2305 DiagnosticSummary {
2306 error_count: 1,
2307 warning_count: 1,
2308 ..Default::default()
2309 },
2310 )]
2311 })
2312 .await;
2313
2314 // Open the file with the errors on client B. They should be present.
2315 let buffer_b = cx_b
2316 .background()
2317 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2318 .await
2319 .unwrap();
2320
2321 buffer_b.read_with(cx_b, |buffer, _| {
2322 assert_eq!(
2323 buffer
2324 .snapshot()
2325 .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2326 .map(|entry| entry)
2327 .collect::<Vec<_>>(),
2328 &[
2329 DiagnosticEntry {
2330 range: Point::new(0, 4)..Point::new(0, 7),
2331 diagnostic: Diagnostic {
2332 group_id: 0,
2333 message: "message 1".to_string(),
2334 severity: lsp::DiagnosticSeverity::ERROR,
2335 is_primary: true,
2336 ..Default::default()
2337 }
2338 },
2339 DiagnosticEntry {
2340 range: Point::new(0, 10)..Point::new(0, 13),
2341 diagnostic: Diagnostic {
2342 group_id: 1,
2343 severity: lsp::DiagnosticSeverity::WARNING,
2344 message: "message 2".to_string(),
2345 is_primary: true,
2346 ..Default::default()
2347 }
2348 }
2349 ]
2350 );
2351 });
2352 }
2353
2354 #[gpui::test(iterations = 10)]
2355 async fn test_collaborating_with_completion(
2356 cx_a: &mut TestAppContext,
2357 cx_b: &mut TestAppContext,
2358 ) {
2359 cx_a.foreground().forbid_parking();
2360 let lang_registry = Arc::new(LanguageRegistry::test());
2361 let fs = FakeFs::new(cx_a.background());
2362
2363 // Set up a fake language server.
2364 let mut language = Language::new(
2365 LanguageConfig {
2366 name: "Rust".into(),
2367 path_suffixes: vec!["rs".to_string()],
2368 ..Default::default()
2369 },
2370 Some(tree_sitter_rust::language()),
2371 );
2372 let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2373 capabilities: lsp::ServerCapabilities {
2374 completion_provider: Some(lsp::CompletionOptions {
2375 trigger_characters: Some(vec![".".to_string()]),
2376 ..Default::default()
2377 }),
2378 ..Default::default()
2379 },
2380 ..Default::default()
2381 });
2382 lang_registry.add(Arc::new(language));
2383
2384 // Connect to a server as 2 clients.
2385 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2386 let client_a = server.create_client(cx_a, "user_a").await;
2387 let client_b = server.create_client(cx_b, "user_b").await;
2388
2389 // Share a project as client A
2390 fs.insert_tree(
2391 "/a",
2392 json!({
2393 ".zed.toml": r#"collaborators = ["user_b"]"#,
2394 "main.rs": "fn main() { a }",
2395 "other.rs": "",
2396 }),
2397 )
2398 .await;
2399 let project_a = cx_a.update(|cx| {
2400 Project::local(
2401 client_a.clone(),
2402 client_a.user_store.clone(),
2403 lang_registry.clone(),
2404 fs.clone(),
2405 cx,
2406 )
2407 });
2408 let (worktree_a, _) = project_a
2409 .update(cx_a, |p, cx| {
2410 p.find_or_create_local_worktree("/a", true, cx)
2411 })
2412 .await
2413 .unwrap();
2414 worktree_a
2415 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2416 .await;
2417 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2418 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2419 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2420
2421 // Join the worktree as client B.
2422 let project_b = Project::remote(
2423 project_id,
2424 client_b.clone(),
2425 client_b.user_store.clone(),
2426 lang_registry.clone(),
2427 fs.clone(),
2428 &mut cx_b.to_async(),
2429 )
2430 .await
2431 .unwrap();
2432
2433 // Open a file in an editor as the guest.
2434 let buffer_b = project_b
2435 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2436 .await
2437 .unwrap();
2438 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2439 let editor_b = cx_b.add_view(window_b, |cx| {
2440 Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2441 });
2442
2443 let fake_language_server = fake_language_servers.next().await.unwrap();
2444 buffer_b
2445 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2446 .await;
2447
2448 // Type a completion trigger character as the guest.
2449 editor_b.update(cx_b, |editor, cx| {
2450 editor.select_ranges([13..13], None, cx);
2451 editor.handle_input(&Input(".".into()), cx);
2452 cx.focus(&editor_b);
2453 });
2454
2455 // Receive a completion request as the host's language server.
2456 // Return some completions from the host's language server.
2457 cx_a.foreground().start_waiting();
2458 fake_language_server
2459 .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2460 assert_eq!(
2461 params.text_document_position.text_document.uri,
2462 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2463 );
2464 assert_eq!(
2465 params.text_document_position.position,
2466 lsp::Position::new(0, 14),
2467 );
2468
2469 Ok(Some(lsp::CompletionResponse::Array(vec![
2470 lsp::CompletionItem {
2471 label: "first_method(…)".into(),
2472 detail: Some("fn(&mut self, B) -> C".into()),
2473 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2474 new_text: "first_method($1)".to_string(),
2475 range: lsp::Range::new(
2476 lsp::Position::new(0, 14),
2477 lsp::Position::new(0, 14),
2478 ),
2479 })),
2480 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2481 ..Default::default()
2482 },
2483 lsp::CompletionItem {
2484 label: "second_method(…)".into(),
2485 detail: Some("fn(&mut self, C) -> D<E>".into()),
2486 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2487 new_text: "second_method()".to_string(),
2488 range: lsp::Range::new(
2489 lsp::Position::new(0, 14),
2490 lsp::Position::new(0, 14),
2491 ),
2492 })),
2493 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2494 ..Default::default()
2495 },
2496 ])))
2497 })
2498 .next()
2499 .await
2500 .unwrap();
2501 cx_a.foreground().finish_waiting();
2502
2503 // Open the buffer on the host.
2504 let buffer_a = project_a
2505 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2506 .await
2507 .unwrap();
2508 buffer_a
2509 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2510 .await;
2511
2512 // Confirm a completion on the guest.
2513 editor_b
2514 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2515 .await;
2516 editor_b.update(cx_b, |editor, cx| {
2517 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2518 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2519 });
2520
2521 // Return a resolved completion from the host's language server.
2522 // The resolved completion has an additional text edit.
2523 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2524 |params, _| async move {
2525 assert_eq!(params.label, "first_method(…)");
2526 Ok(lsp::CompletionItem {
2527 label: "first_method(…)".into(),
2528 detail: Some("fn(&mut self, B) -> C".into()),
2529 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2530 new_text: "first_method($1)".to_string(),
2531 range: lsp::Range::new(
2532 lsp::Position::new(0, 14),
2533 lsp::Position::new(0, 14),
2534 ),
2535 })),
2536 additional_text_edits: Some(vec![lsp::TextEdit {
2537 new_text: "use d::SomeTrait;\n".to_string(),
2538 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2539 }]),
2540 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2541 ..Default::default()
2542 })
2543 },
2544 );
2545
2546 // The additional edit is applied.
2547 buffer_a
2548 .condition(&cx_a, |buffer, _| {
2549 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2550 })
2551 .await;
2552 buffer_b
2553 .condition(&cx_b, |buffer, _| {
2554 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2555 })
2556 .await;
2557 }
2558
2559 #[gpui::test(iterations = 10)]
2560 async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2561 cx_a.foreground().forbid_parking();
2562 let lang_registry = Arc::new(LanguageRegistry::test());
2563 let fs = FakeFs::new(cx_a.background());
2564
2565 // Connect to a server as 2 clients.
2566 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2567 let client_a = server.create_client(cx_a, "user_a").await;
2568 let client_b = server.create_client(cx_b, "user_b").await;
2569
2570 // Share a project as client A
2571 fs.insert_tree(
2572 "/a",
2573 json!({
2574 ".zed.toml": r#"collaborators = ["user_b"]"#,
2575 "a.rs": "let one = 1;",
2576 }),
2577 )
2578 .await;
2579 let project_a = cx_a.update(|cx| {
2580 Project::local(
2581 client_a.clone(),
2582 client_a.user_store.clone(),
2583 lang_registry.clone(),
2584 fs.clone(),
2585 cx,
2586 )
2587 });
2588 let (worktree_a, _) = project_a
2589 .update(cx_a, |p, cx| {
2590 p.find_or_create_local_worktree("/a", true, cx)
2591 })
2592 .await
2593 .unwrap();
2594 worktree_a
2595 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2596 .await;
2597 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2598 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2599 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2600 let buffer_a = project_a
2601 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2602 .await
2603 .unwrap();
2604
2605 // Join the worktree as client B.
2606 let project_b = Project::remote(
2607 project_id,
2608 client_b.clone(),
2609 client_b.user_store.clone(),
2610 lang_registry.clone(),
2611 fs.clone(),
2612 &mut cx_b.to_async(),
2613 )
2614 .await
2615 .unwrap();
2616
2617 let buffer_b = cx_b
2618 .background()
2619 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2620 .await
2621 .unwrap();
2622 buffer_b.update(cx_b, |buffer, cx| {
2623 buffer.edit([4..7], "six", cx);
2624 buffer.edit([10..11], "6", cx);
2625 assert_eq!(buffer.text(), "let six = 6;");
2626 assert!(buffer.is_dirty());
2627 assert!(!buffer.has_conflict());
2628 });
2629 buffer_a
2630 .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2631 .await;
2632
2633 fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2634 .await
2635 .unwrap();
2636 buffer_a
2637 .condition(cx_a, |buffer, _| buffer.has_conflict())
2638 .await;
2639 buffer_b
2640 .condition(cx_b, |buffer, _| buffer.has_conflict())
2641 .await;
2642
2643 project_b
2644 .update(cx_b, |project, cx| {
2645 project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2646 })
2647 .await
2648 .unwrap();
2649 buffer_a.read_with(cx_a, |buffer, _| {
2650 assert_eq!(buffer.text(), "let seven = 7;");
2651 assert!(!buffer.is_dirty());
2652 assert!(!buffer.has_conflict());
2653 });
2654 buffer_b.read_with(cx_b, |buffer, _| {
2655 assert_eq!(buffer.text(), "let seven = 7;");
2656 assert!(!buffer.is_dirty());
2657 assert!(!buffer.has_conflict());
2658 });
2659
2660 buffer_a.update(cx_a, |buffer, cx| {
2661 // Undoing on the host is a no-op when the reload was initiated by the guest.
2662 buffer.undo(cx);
2663 assert_eq!(buffer.text(), "let seven = 7;");
2664 assert!(!buffer.is_dirty());
2665 assert!(!buffer.has_conflict());
2666 });
2667 buffer_b.update(cx_b, |buffer, cx| {
2668 // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2669 buffer.undo(cx);
2670 assert_eq!(buffer.text(), "let six = 6;");
2671 assert!(buffer.is_dirty());
2672 assert!(!buffer.has_conflict());
2673 });
2674 }
2675
2676 #[gpui::test(iterations = 10)]
2677 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2678 cx_a.foreground().forbid_parking();
2679 let lang_registry = Arc::new(LanguageRegistry::test());
2680 let fs = FakeFs::new(cx_a.background());
2681
2682 // Set up a fake language server.
2683 let mut language = Language::new(
2684 LanguageConfig {
2685 name: "Rust".into(),
2686 path_suffixes: vec!["rs".to_string()],
2687 ..Default::default()
2688 },
2689 Some(tree_sitter_rust::language()),
2690 );
2691 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2692 lang_registry.add(Arc::new(language));
2693
2694 // Connect to a server as 2 clients.
2695 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2696 let client_a = server.create_client(cx_a, "user_a").await;
2697 let client_b = server.create_client(cx_b, "user_b").await;
2698
2699 // Share a project as client A
2700 fs.insert_tree(
2701 "/a",
2702 json!({
2703 ".zed.toml": r#"collaborators = ["user_b"]"#,
2704 "a.rs": "let one = two",
2705 }),
2706 )
2707 .await;
2708 let project_a = cx_a.update(|cx| {
2709 Project::local(
2710 client_a.clone(),
2711 client_a.user_store.clone(),
2712 lang_registry.clone(),
2713 fs.clone(),
2714 cx,
2715 )
2716 });
2717 let (worktree_a, _) = project_a
2718 .update(cx_a, |p, cx| {
2719 p.find_or_create_local_worktree("/a", true, cx)
2720 })
2721 .await
2722 .unwrap();
2723 worktree_a
2724 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2725 .await;
2726 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2727 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2728 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2729
2730 // Join the worktree as client B.
2731 let project_b = Project::remote(
2732 project_id,
2733 client_b.clone(),
2734 client_b.user_store.clone(),
2735 lang_registry.clone(),
2736 fs.clone(),
2737 &mut cx_b.to_async(),
2738 )
2739 .await
2740 .unwrap();
2741
2742 let buffer_b = cx_b
2743 .background()
2744 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2745 .await
2746 .unwrap();
2747
2748 let fake_language_server = fake_language_servers.next().await.unwrap();
2749 fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2750 Ok(Some(vec![
2751 lsp::TextEdit {
2752 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2753 new_text: "h".to_string(),
2754 },
2755 lsp::TextEdit {
2756 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2757 new_text: "y".to_string(),
2758 },
2759 ]))
2760 });
2761
2762 project_b
2763 .update(cx_b, |project, cx| {
2764 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2765 })
2766 .await
2767 .unwrap();
2768 assert_eq!(
2769 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2770 "let honey = two"
2771 );
2772 }
2773
2774 #[gpui::test(iterations = 10)]
2775 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2776 cx_a.foreground().forbid_parking();
2777 let lang_registry = Arc::new(LanguageRegistry::test());
2778 let fs = FakeFs::new(cx_a.background());
2779 fs.insert_tree(
2780 "/root-1",
2781 json!({
2782 ".zed.toml": r#"collaborators = ["user_b"]"#,
2783 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2784 }),
2785 )
2786 .await;
2787 fs.insert_tree(
2788 "/root-2",
2789 json!({
2790 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2791 }),
2792 )
2793 .await;
2794
2795 // Set up a fake language server.
2796 let mut language = Language::new(
2797 LanguageConfig {
2798 name: "Rust".into(),
2799 path_suffixes: vec!["rs".to_string()],
2800 ..Default::default()
2801 },
2802 Some(tree_sitter_rust::language()),
2803 );
2804 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2805 lang_registry.add(Arc::new(language));
2806
2807 // Connect to a server as 2 clients.
2808 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2809 let client_a = server.create_client(cx_a, "user_a").await;
2810 let client_b = server.create_client(cx_b, "user_b").await;
2811
2812 // Share a project as client A
2813 let project_a = cx_a.update(|cx| {
2814 Project::local(
2815 client_a.clone(),
2816 client_a.user_store.clone(),
2817 lang_registry.clone(),
2818 fs.clone(),
2819 cx,
2820 )
2821 });
2822 let (worktree_a, _) = project_a
2823 .update(cx_a, |p, cx| {
2824 p.find_or_create_local_worktree("/root-1", true, cx)
2825 })
2826 .await
2827 .unwrap();
2828 worktree_a
2829 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2830 .await;
2831 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2832 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2833 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2834
2835 // Join the worktree as client B.
2836 let project_b = Project::remote(
2837 project_id,
2838 client_b.clone(),
2839 client_b.user_store.clone(),
2840 lang_registry.clone(),
2841 fs.clone(),
2842 &mut cx_b.to_async(),
2843 )
2844 .await
2845 .unwrap();
2846
2847 // Open the file on client B.
2848 let buffer_b = cx_b
2849 .background()
2850 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2851 .await
2852 .unwrap();
2853
2854 // Request the definition of a symbol as the guest.
2855 let fake_language_server = fake_language_servers.next().await.unwrap();
2856 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2857 |_, _| async move {
2858 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2859 lsp::Location::new(
2860 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2861 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2862 ),
2863 )))
2864 },
2865 );
2866
2867 let definitions_1 = project_b
2868 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2869 .await
2870 .unwrap();
2871 cx_b.read(|cx| {
2872 assert_eq!(definitions_1.len(), 1);
2873 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2874 let target_buffer = definitions_1[0].buffer.read(cx);
2875 assert_eq!(
2876 target_buffer.text(),
2877 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2878 );
2879 assert_eq!(
2880 definitions_1[0].range.to_point(target_buffer),
2881 Point::new(0, 6)..Point::new(0, 9)
2882 );
2883 });
2884
2885 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2886 // the previous call to `definition`.
2887 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2888 |_, _| async move {
2889 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2890 lsp::Location::new(
2891 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2892 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2893 ),
2894 )))
2895 },
2896 );
2897
2898 let definitions_2 = project_b
2899 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2900 .await
2901 .unwrap();
2902 cx_b.read(|cx| {
2903 assert_eq!(definitions_2.len(), 1);
2904 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2905 let target_buffer = definitions_2[0].buffer.read(cx);
2906 assert_eq!(
2907 target_buffer.text(),
2908 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2909 );
2910 assert_eq!(
2911 definitions_2[0].range.to_point(target_buffer),
2912 Point::new(1, 6)..Point::new(1, 11)
2913 );
2914 });
2915 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2916 }
2917
2918 #[gpui::test(iterations = 10)]
2919 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2920 cx_a.foreground().forbid_parking();
2921 let lang_registry = Arc::new(LanguageRegistry::test());
2922 let fs = FakeFs::new(cx_a.background());
2923 fs.insert_tree(
2924 "/root-1",
2925 json!({
2926 ".zed.toml": r#"collaborators = ["user_b"]"#,
2927 "one.rs": "const ONE: usize = 1;",
2928 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2929 }),
2930 )
2931 .await;
2932 fs.insert_tree(
2933 "/root-2",
2934 json!({
2935 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2936 }),
2937 )
2938 .await;
2939
2940 // Set up a fake language server.
2941 let mut language = Language::new(
2942 LanguageConfig {
2943 name: "Rust".into(),
2944 path_suffixes: vec!["rs".to_string()],
2945 ..Default::default()
2946 },
2947 Some(tree_sitter_rust::language()),
2948 );
2949 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2950 lang_registry.add(Arc::new(language));
2951
2952 // Connect to a server as 2 clients.
2953 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2954 let client_a = server.create_client(cx_a, "user_a").await;
2955 let client_b = server.create_client(cx_b, "user_b").await;
2956
2957 // Share a project as client A
2958 let project_a = cx_a.update(|cx| {
2959 Project::local(
2960 client_a.clone(),
2961 client_a.user_store.clone(),
2962 lang_registry.clone(),
2963 fs.clone(),
2964 cx,
2965 )
2966 });
2967 let (worktree_a, _) = project_a
2968 .update(cx_a, |p, cx| {
2969 p.find_or_create_local_worktree("/root-1", true, cx)
2970 })
2971 .await
2972 .unwrap();
2973 worktree_a
2974 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975 .await;
2976 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2979
2980 // Join the worktree as client B.
2981 let project_b = Project::remote(
2982 project_id,
2983 client_b.clone(),
2984 client_b.user_store.clone(),
2985 lang_registry.clone(),
2986 fs.clone(),
2987 &mut cx_b.to_async(),
2988 )
2989 .await
2990 .unwrap();
2991
2992 // Open the file on client B.
2993 let buffer_b = cx_b
2994 .background()
2995 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2996 .await
2997 .unwrap();
2998
2999 // Request references to a symbol as the guest.
3000 let fake_language_server = fake_language_servers.next().await.unwrap();
3001 fake_language_server.handle_request::<lsp::request::References, _, _>(
3002 |params, _| async move {
3003 assert_eq!(
3004 params.text_document_position.text_document.uri.as_str(),
3005 "file:///root-1/one.rs"
3006 );
3007 Ok(Some(vec![
3008 lsp::Location {
3009 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3010 range: lsp::Range::new(
3011 lsp::Position::new(0, 24),
3012 lsp::Position::new(0, 27),
3013 ),
3014 },
3015 lsp::Location {
3016 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3017 range: lsp::Range::new(
3018 lsp::Position::new(0, 35),
3019 lsp::Position::new(0, 38),
3020 ),
3021 },
3022 lsp::Location {
3023 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3024 range: lsp::Range::new(
3025 lsp::Position::new(0, 37),
3026 lsp::Position::new(0, 40),
3027 ),
3028 },
3029 ]))
3030 },
3031 );
3032
3033 let references = project_b
3034 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3035 .await
3036 .unwrap();
3037 cx_b.read(|cx| {
3038 assert_eq!(references.len(), 3);
3039 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3040
3041 let two_buffer = references[0].buffer.read(cx);
3042 let three_buffer = references[2].buffer.read(cx);
3043 assert_eq!(
3044 two_buffer.file().unwrap().path().as_ref(),
3045 Path::new("two.rs")
3046 );
3047 assert_eq!(references[1].buffer, references[0].buffer);
3048 assert_eq!(
3049 three_buffer.file().unwrap().full_path(cx),
3050 Path::new("three.rs")
3051 );
3052
3053 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3054 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3055 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3056 });
3057 }
3058
3059 #[gpui::test(iterations = 10)]
3060 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3061 cx_a.foreground().forbid_parking();
3062 let lang_registry = Arc::new(LanguageRegistry::test());
3063 let fs = FakeFs::new(cx_a.background());
3064 fs.insert_tree(
3065 "/root-1",
3066 json!({
3067 ".zed.toml": r#"collaborators = ["user_b"]"#,
3068 "a": "hello world",
3069 "b": "goodnight moon",
3070 "c": "a world of goo",
3071 "d": "world champion of clown world",
3072 }),
3073 )
3074 .await;
3075 fs.insert_tree(
3076 "/root-2",
3077 json!({
3078 "e": "disney world is fun",
3079 }),
3080 )
3081 .await;
3082
3083 // Connect to a server as 2 clients.
3084 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085 let client_a = server.create_client(cx_a, "user_a").await;
3086 let client_b = server.create_client(cx_b, "user_b").await;
3087
3088 // Share a project as client A
3089 let project_a = cx_a.update(|cx| {
3090 Project::local(
3091 client_a.clone(),
3092 client_a.user_store.clone(),
3093 lang_registry.clone(),
3094 fs.clone(),
3095 cx,
3096 )
3097 });
3098 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3099
3100 let (worktree_1, _) = project_a
3101 .update(cx_a, |p, cx| {
3102 p.find_or_create_local_worktree("/root-1", true, cx)
3103 })
3104 .await
3105 .unwrap();
3106 worktree_1
3107 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3108 .await;
3109 let (worktree_2, _) = project_a
3110 .update(cx_a, |p, cx| {
3111 p.find_or_create_local_worktree("/root-2", true, cx)
3112 })
3113 .await
3114 .unwrap();
3115 worktree_2
3116 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3117 .await;
3118
3119 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3120
3121 // Join the worktree as client B.
3122 let project_b = Project::remote(
3123 project_id,
3124 client_b.clone(),
3125 client_b.user_store.clone(),
3126 lang_registry.clone(),
3127 fs.clone(),
3128 &mut cx_b.to_async(),
3129 )
3130 .await
3131 .unwrap();
3132
3133 let results = project_b
3134 .update(cx_b, |project, cx| {
3135 project.search(SearchQuery::text("world", false, false), cx)
3136 })
3137 .await
3138 .unwrap();
3139
3140 let mut ranges_by_path = results
3141 .into_iter()
3142 .map(|(buffer, ranges)| {
3143 buffer.read_with(cx_b, |buffer, cx| {
3144 let path = buffer.file().unwrap().full_path(cx);
3145 let offset_ranges = ranges
3146 .into_iter()
3147 .map(|range| range.to_offset(buffer))
3148 .collect::<Vec<_>>();
3149 (path, offset_ranges)
3150 })
3151 })
3152 .collect::<Vec<_>>();
3153 ranges_by_path.sort_by_key(|(path, _)| path.clone());
3154
3155 assert_eq!(
3156 ranges_by_path,
3157 &[
3158 (PathBuf::from("root-1/a"), vec![6..11]),
3159 (PathBuf::from("root-1/c"), vec![2..7]),
3160 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3161 (PathBuf::from("root-2/e"), vec![7..12]),
3162 ]
3163 );
3164 }
3165
3166 #[gpui::test(iterations = 10)]
3167 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3168 cx_a.foreground().forbid_parking();
3169 let lang_registry = Arc::new(LanguageRegistry::test());
3170 let fs = FakeFs::new(cx_a.background());
3171 fs.insert_tree(
3172 "/root-1",
3173 json!({
3174 ".zed.toml": r#"collaborators = ["user_b"]"#,
3175 "main.rs": "fn double(number: i32) -> i32 { number + number }",
3176 }),
3177 )
3178 .await;
3179
3180 // Set up a fake language server.
3181 let mut language = Language::new(
3182 LanguageConfig {
3183 name: "Rust".into(),
3184 path_suffixes: vec!["rs".to_string()],
3185 ..Default::default()
3186 },
3187 Some(tree_sitter_rust::language()),
3188 );
3189 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3190 lang_registry.add(Arc::new(language));
3191
3192 // Connect to a server as 2 clients.
3193 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3194 let client_a = server.create_client(cx_a, "user_a").await;
3195 let client_b = server.create_client(cx_b, "user_b").await;
3196
3197 // Share a project as client A
3198 let project_a = cx_a.update(|cx| {
3199 Project::local(
3200 client_a.clone(),
3201 client_a.user_store.clone(),
3202 lang_registry.clone(),
3203 fs.clone(),
3204 cx,
3205 )
3206 });
3207 let (worktree_a, _) = project_a
3208 .update(cx_a, |p, cx| {
3209 p.find_or_create_local_worktree("/root-1", true, cx)
3210 })
3211 .await
3212 .unwrap();
3213 worktree_a
3214 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3215 .await;
3216 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3217 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3218 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3219
3220 // Join the worktree as client B.
3221 let project_b = Project::remote(
3222 project_id,
3223 client_b.clone(),
3224 client_b.user_store.clone(),
3225 lang_registry.clone(),
3226 fs.clone(),
3227 &mut cx_b.to_async(),
3228 )
3229 .await
3230 .unwrap();
3231
3232 // Open the file on client B.
3233 let buffer_b = cx_b
3234 .background()
3235 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3236 .await
3237 .unwrap();
3238
3239 // Request document highlights as the guest.
3240 let fake_language_server = fake_language_servers.next().await.unwrap();
3241 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3242 |params, _| async move {
3243 assert_eq!(
3244 params
3245 .text_document_position_params
3246 .text_document
3247 .uri
3248 .as_str(),
3249 "file:///root-1/main.rs"
3250 );
3251 assert_eq!(
3252 params.text_document_position_params.position,
3253 lsp::Position::new(0, 34)
3254 );
3255 Ok(Some(vec![
3256 lsp::DocumentHighlight {
3257 kind: Some(lsp::DocumentHighlightKind::WRITE),
3258 range: lsp::Range::new(
3259 lsp::Position::new(0, 10),
3260 lsp::Position::new(0, 16),
3261 ),
3262 },
3263 lsp::DocumentHighlight {
3264 kind: Some(lsp::DocumentHighlightKind::READ),
3265 range: lsp::Range::new(
3266 lsp::Position::new(0, 32),
3267 lsp::Position::new(0, 38),
3268 ),
3269 },
3270 lsp::DocumentHighlight {
3271 kind: Some(lsp::DocumentHighlightKind::READ),
3272 range: lsp::Range::new(
3273 lsp::Position::new(0, 41),
3274 lsp::Position::new(0, 47),
3275 ),
3276 },
3277 ]))
3278 },
3279 );
3280
3281 let highlights = project_b
3282 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3283 .await
3284 .unwrap();
3285 buffer_b.read_with(cx_b, |buffer, _| {
3286 let snapshot = buffer.snapshot();
3287
3288 let highlights = highlights
3289 .into_iter()
3290 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3291 .collect::<Vec<_>>();
3292 assert_eq!(
3293 highlights,
3294 &[
3295 (lsp::DocumentHighlightKind::WRITE, 10..16),
3296 (lsp::DocumentHighlightKind::READ, 32..38),
3297 (lsp::DocumentHighlightKind::READ, 41..47)
3298 ]
3299 )
3300 });
3301 }
3302
3303 #[gpui::test(iterations = 10)]
3304 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3305 cx_a.foreground().forbid_parking();
3306 let lang_registry = Arc::new(LanguageRegistry::test());
3307 let fs = FakeFs::new(cx_a.background());
3308 fs.insert_tree(
3309 "/code",
3310 json!({
3311 "crate-1": {
3312 ".zed.toml": r#"collaborators = ["user_b"]"#,
3313 "one.rs": "const ONE: usize = 1;",
3314 },
3315 "crate-2": {
3316 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3317 },
3318 "private": {
3319 "passwords.txt": "the-password",
3320 }
3321 }),
3322 )
3323 .await;
3324
3325 // Set up a fake language server.
3326 let mut language = Language::new(
3327 LanguageConfig {
3328 name: "Rust".into(),
3329 path_suffixes: vec!["rs".to_string()],
3330 ..Default::default()
3331 },
3332 Some(tree_sitter_rust::language()),
3333 );
3334 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3335 lang_registry.add(Arc::new(language));
3336
3337 // Connect to a server as 2 clients.
3338 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3339 let client_a = server.create_client(cx_a, "user_a").await;
3340 let client_b = server.create_client(cx_b, "user_b").await;
3341
3342 // Share a project as client A
3343 let project_a = cx_a.update(|cx| {
3344 Project::local(
3345 client_a.clone(),
3346 client_a.user_store.clone(),
3347 lang_registry.clone(),
3348 fs.clone(),
3349 cx,
3350 )
3351 });
3352 let (worktree_a, _) = project_a
3353 .update(cx_a, |p, cx| {
3354 p.find_or_create_local_worktree("/code/crate-1", true, cx)
3355 })
3356 .await
3357 .unwrap();
3358 worktree_a
3359 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3360 .await;
3361 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3362 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3363 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3364
3365 // Join the worktree as client B.
3366 let project_b = Project::remote(
3367 project_id,
3368 client_b.clone(),
3369 client_b.user_store.clone(),
3370 lang_registry.clone(),
3371 fs.clone(),
3372 &mut cx_b.to_async(),
3373 )
3374 .await
3375 .unwrap();
3376
3377 // Cause the language server to start.
3378 let _buffer = cx_b
3379 .background()
3380 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3381 .await
3382 .unwrap();
3383
3384 let fake_language_server = fake_language_servers.next().await.unwrap();
3385 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3386 |_, _| async move {
3387 #[allow(deprecated)]
3388 Ok(Some(vec![lsp::SymbolInformation {
3389 name: "TWO".into(),
3390 location: lsp::Location {
3391 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3392 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3393 },
3394 kind: lsp::SymbolKind::CONSTANT,
3395 tags: None,
3396 container_name: None,
3397 deprecated: None,
3398 }]))
3399 },
3400 );
3401
3402 // Request the definition of a symbol as the guest.
3403 let symbols = project_b
3404 .update(cx_b, |p, cx| p.symbols("two", cx))
3405 .await
3406 .unwrap();
3407 assert_eq!(symbols.len(), 1);
3408 assert_eq!(symbols[0].name, "TWO");
3409
3410 // Open one of the returned symbols.
3411 let buffer_b_2 = project_b
3412 .update(cx_b, |project, cx| {
3413 project.open_buffer_for_symbol(&symbols[0], cx)
3414 })
3415 .await
3416 .unwrap();
3417 buffer_b_2.read_with(cx_b, |buffer, _| {
3418 assert_eq!(
3419 buffer.file().unwrap().path().as_ref(),
3420 Path::new("../crate-2/two.rs")
3421 );
3422 });
3423
3424 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3425 let mut fake_symbol = symbols[0].clone();
3426 fake_symbol.path = Path::new("/code/secrets").into();
3427 let error = project_b
3428 .update(cx_b, |project, cx| {
3429 project.open_buffer_for_symbol(&fake_symbol, cx)
3430 })
3431 .await
3432 .unwrap_err();
3433 assert!(error.to_string().contains("invalid symbol signature"));
3434 }
3435
3436 #[gpui::test(iterations = 10)]
3437 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3438 cx_a: &mut TestAppContext,
3439 cx_b: &mut TestAppContext,
3440 mut rng: StdRng,
3441 ) {
3442 cx_a.foreground().forbid_parking();
3443 let lang_registry = Arc::new(LanguageRegistry::test());
3444 let fs = FakeFs::new(cx_a.background());
3445 fs.insert_tree(
3446 "/root",
3447 json!({
3448 ".zed.toml": r#"collaborators = ["user_b"]"#,
3449 "a.rs": "const ONE: usize = b::TWO;",
3450 "b.rs": "const TWO: usize = 2",
3451 }),
3452 )
3453 .await;
3454
3455 // Set up a fake language server.
3456 let mut language = Language::new(
3457 LanguageConfig {
3458 name: "Rust".into(),
3459 path_suffixes: vec!["rs".to_string()],
3460 ..Default::default()
3461 },
3462 Some(tree_sitter_rust::language()),
3463 );
3464 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3465 lang_registry.add(Arc::new(language));
3466
3467 // Connect to a server as 2 clients.
3468 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3469 let client_a = server.create_client(cx_a, "user_a").await;
3470 let client_b = server.create_client(cx_b, "user_b").await;
3471
3472 // Share a project as client A
3473 let project_a = cx_a.update(|cx| {
3474 Project::local(
3475 client_a.clone(),
3476 client_a.user_store.clone(),
3477 lang_registry.clone(),
3478 fs.clone(),
3479 cx,
3480 )
3481 });
3482
3483 let (worktree_a, _) = project_a
3484 .update(cx_a, |p, cx| {
3485 p.find_or_create_local_worktree("/root", true, cx)
3486 })
3487 .await
3488 .unwrap();
3489 worktree_a
3490 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3491 .await;
3492 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3493 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3494 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3495
3496 // Join the worktree as client B.
3497 let project_b = Project::remote(
3498 project_id,
3499 client_b.clone(),
3500 client_b.user_store.clone(),
3501 lang_registry.clone(),
3502 fs.clone(),
3503 &mut cx_b.to_async(),
3504 )
3505 .await
3506 .unwrap();
3507
3508 let buffer_b1 = cx_b
3509 .background()
3510 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3511 .await
3512 .unwrap();
3513
3514 let fake_language_server = fake_language_servers.next().await.unwrap();
3515 fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3516 |_, _| async move {
3517 Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3518 lsp::Location::new(
3519 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3520 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3521 ),
3522 )))
3523 },
3524 );
3525
3526 let definitions;
3527 let buffer_b2;
3528 if rng.gen() {
3529 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3530 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3531 } else {
3532 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3533 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3534 }
3535
3536 let buffer_b2 = buffer_b2.await.unwrap();
3537 let definitions = definitions.await.unwrap();
3538 assert_eq!(definitions.len(), 1);
3539 assert_eq!(definitions[0].buffer, buffer_b2);
3540 }
3541
3542 #[gpui::test(iterations = 10)]
3543 async fn test_collaborating_with_code_actions(
3544 cx_a: &mut TestAppContext,
3545 cx_b: &mut TestAppContext,
3546 ) {
3547 cx_a.foreground().forbid_parking();
3548 let lang_registry = Arc::new(LanguageRegistry::test());
3549 let fs = FakeFs::new(cx_a.background());
3550 cx_b.update(|cx| editor::init(cx));
3551
3552 // Set up a fake language server.
3553 let mut language = Language::new(
3554 LanguageConfig {
3555 name: "Rust".into(),
3556 path_suffixes: vec!["rs".to_string()],
3557 ..Default::default()
3558 },
3559 Some(tree_sitter_rust::language()),
3560 );
3561 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3562 lang_registry.add(Arc::new(language));
3563
3564 // Connect to a server as 2 clients.
3565 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3566 let client_a = server.create_client(cx_a, "user_a").await;
3567 let client_b = server.create_client(cx_b, "user_b").await;
3568
3569 // Share a project as client A
3570 fs.insert_tree(
3571 "/a",
3572 json!({
3573 ".zed.toml": r#"collaborators = ["user_b"]"#,
3574 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3575 "other.rs": "pub fn foo() -> usize { 4 }",
3576 }),
3577 )
3578 .await;
3579 let project_a = cx_a.update(|cx| {
3580 Project::local(
3581 client_a.clone(),
3582 client_a.user_store.clone(),
3583 lang_registry.clone(),
3584 fs.clone(),
3585 cx,
3586 )
3587 });
3588 let (worktree_a, _) = project_a
3589 .update(cx_a, |p, cx| {
3590 p.find_or_create_local_worktree("/a", true, cx)
3591 })
3592 .await
3593 .unwrap();
3594 worktree_a
3595 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596 .await;
3597 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601 // Join the worktree as client B.
3602 let project_b = Project::remote(
3603 project_id,
3604 client_b.clone(),
3605 client_b.user_store.clone(),
3606 lang_registry.clone(),
3607 fs.clone(),
3608 &mut cx_b.to_async(),
3609 )
3610 .await
3611 .unwrap();
3612 let mut params = cx_b.update(WorkspaceParams::test);
3613 params.languages = lang_registry.clone();
3614 params.client = client_b.client.clone();
3615 params.user_store = client_b.user_store.clone();
3616 params.project = project_b;
3617
3618 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3619 let editor_b = workspace_b
3620 .update(cx_b, |workspace, cx| {
3621 workspace.open_path((worktree_id, "main.rs"), cx)
3622 })
3623 .await
3624 .unwrap()
3625 .downcast::<Editor>()
3626 .unwrap();
3627
3628 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3629 fake_language_server
3630 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3631 assert_eq!(
3632 params.text_document.uri,
3633 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3634 );
3635 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3636 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3637 Ok(None)
3638 })
3639 .next()
3640 .await;
3641
3642 // Move cursor to a location that contains code actions.
3643 editor_b.update(cx_b, |editor, cx| {
3644 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3645 cx.focus(&editor_b);
3646 });
3647
3648 fake_language_server
3649 .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3650 assert_eq!(
3651 params.text_document.uri,
3652 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3653 );
3654 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3655 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3656
3657 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3658 lsp::CodeAction {
3659 title: "Inline into all callers".to_string(),
3660 edit: Some(lsp::WorkspaceEdit {
3661 changes: Some(
3662 [
3663 (
3664 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3665 vec![lsp::TextEdit::new(
3666 lsp::Range::new(
3667 lsp::Position::new(1, 22),
3668 lsp::Position::new(1, 34),
3669 ),
3670 "4".to_string(),
3671 )],
3672 ),
3673 (
3674 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3675 vec![lsp::TextEdit::new(
3676 lsp::Range::new(
3677 lsp::Position::new(0, 0),
3678 lsp::Position::new(0, 27),
3679 ),
3680 "".to_string(),
3681 )],
3682 ),
3683 ]
3684 .into_iter()
3685 .collect(),
3686 ),
3687 ..Default::default()
3688 }),
3689 data: Some(json!({
3690 "codeActionParams": {
3691 "range": {
3692 "start": {"line": 1, "column": 31},
3693 "end": {"line": 1, "column": 31},
3694 }
3695 }
3696 })),
3697 ..Default::default()
3698 },
3699 )]))
3700 })
3701 .next()
3702 .await;
3703
3704 // Toggle code actions and wait for them to display.
3705 editor_b.update(cx_b, |editor, cx| {
3706 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3707 });
3708 editor_b
3709 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3710 .await;
3711
3712 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3713
3714 // Confirming the code action will trigger a resolve request.
3715 let confirm_action = workspace_b
3716 .update(cx_b, |workspace, cx| {
3717 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3718 })
3719 .unwrap();
3720 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3721 |_, _| async move {
3722 Ok(lsp::CodeAction {
3723 title: "Inline into all callers".to_string(),
3724 edit: Some(lsp::WorkspaceEdit {
3725 changes: Some(
3726 [
3727 (
3728 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3729 vec![lsp::TextEdit::new(
3730 lsp::Range::new(
3731 lsp::Position::new(1, 22),
3732 lsp::Position::new(1, 34),
3733 ),
3734 "4".to_string(),
3735 )],
3736 ),
3737 (
3738 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3739 vec![lsp::TextEdit::new(
3740 lsp::Range::new(
3741 lsp::Position::new(0, 0),
3742 lsp::Position::new(0, 27),
3743 ),
3744 "".to_string(),
3745 )],
3746 ),
3747 ]
3748 .into_iter()
3749 .collect(),
3750 ),
3751 ..Default::default()
3752 }),
3753 ..Default::default()
3754 })
3755 },
3756 );
3757
3758 // After the action is confirmed, an editor containing both modified files is opened.
3759 confirm_action.await.unwrap();
3760 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3761 workspace
3762 .active_item(cx)
3763 .unwrap()
3764 .downcast::<Editor>()
3765 .unwrap()
3766 });
3767 code_action_editor.update(cx_b, |editor, cx| {
3768 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3769 editor.undo(&Undo, cx);
3770 assert_eq!(
3771 editor.text(cx),
3772 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3773 );
3774 editor.redo(&Redo, cx);
3775 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3776 });
3777 }
3778
3779 #[gpui::test(iterations = 10)]
3780 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3781 cx_a.foreground().forbid_parking();
3782 let lang_registry = Arc::new(LanguageRegistry::test());
3783 let fs = FakeFs::new(cx_a.background());
3784 cx_b.update(|cx| editor::init(cx));
3785
3786 // Set up a fake language server.
3787 let mut language = Language::new(
3788 LanguageConfig {
3789 name: "Rust".into(),
3790 path_suffixes: vec!["rs".to_string()],
3791 ..Default::default()
3792 },
3793 Some(tree_sitter_rust::language()),
3794 );
3795 let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3796 lang_registry.add(Arc::new(language));
3797
3798 // Connect to a server as 2 clients.
3799 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3800 let client_a = server.create_client(cx_a, "user_a").await;
3801 let client_b = server.create_client(cx_b, "user_b").await;
3802
3803 // Share a project as client A
3804 fs.insert_tree(
3805 "/dir",
3806 json!({
3807 ".zed.toml": r#"collaborators = ["user_b"]"#,
3808 "one.rs": "const ONE: usize = 1;",
3809 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3810 }),
3811 )
3812 .await;
3813 let project_a = cx_a.update(|cx| {
3814 Project::local(
3815 client_a.clone(),
3816 client_a.user_store.clone(),
3817 lang_registry.clone(),
3818 fs.clone(),
3819 cx,
3820 )
3821 });
3822 let (worktree_a, _) = project_a
3823 .update(cx_a, |p, cx| {
3824 p.find_or_create_local_worktree("/dir", true, cx)
3825 })
3826 .await
3827 .unwrap();
3828 worktree_a
3829 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3830 .await;
3831 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3832 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3833 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3834
3835 // Join the worktree as client B.
3836 let project_b = Project::remote(
3837 project_id,
3838 client_b.clone(),
3839 client_b.user_store.clone(),
3840 lang_registry.clone(),
3841 fs.clone(),
3842 &mut cx_b.to_async(),
3843 )
3844 .await
3845 .unwrap();
3846 let mut params = cx_b.update(WorkspaceParams::test);
3847 params.languages = lang_registry.clone();
3848 params.client = client_b.client.clone();
3849 params.user_store = client_b.user_store.clone();
3850 params.project = project_b;
3851
3852 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3853 let editor_b = workspace_b
3854 .update(cx_b, |workspace, cx| {
3855 workspace.open_path((worktree_id, "one.rs"), cx)
3856 })
3857 .await
3858 .unwrap()
3859 .downcast::<Editor>()
3860 .unwrap();
3861 let fake_language_server = fake_language_servers.next().await.unwrap();
3862
3863 // Move cursor to a location that can be renamed.
3864 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3865 editor.select_ranges([7..7], None, cx);
3866 editor.rename(&Rename, cx).unwrap()
3867 });
3868
3869 fake_language_server
3870 .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3871 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3872 assert_eq!(params.position, lsp::Position::new(0, 7));
3873 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3874 lsp::Position::new(0, 6),
3875 lsp::Position::new(0, 9),
3876 ))))
3877 })
3878 .next()
3879 .await
3880 .unwrap();
3881 prepare_rename.await.unwrap();
3882 editor_b.update(cx_b, |editor, cx| {
3883 let rename = editor.pending_rename().unwrap();
3884 let buffer = editor.buffer().read(cx).snapshot(cx);
3885 assert_eq!(
3886 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3887 6..9
3888 );
3889 rename.editor.update(cx, |rename_editor, cx| {
3890 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3891 rename_buffer.edit([0..3], "THREE", cx);
3892 });
3893 });
3894 });
3895
3896 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3897 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3898 });
3899 fake_language_server
3900 .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3901 assert_eq!(
3902 params.text_document_position.text_document.uri.as_str(),
3903 "file:///dir/one.rs"
3904 );
3905 assert_eq!(
3906 params.text_document_position.position,
3907 lsp::Position::new(0, 6)
3908 );
3909 assert_eq!(params.new_name, "THREE");
3910 Ok(Some(lsp::WorkspaceEdit {
3911 changes: Some(
3912 [
3913 (
3914 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3915 vec![lsp::TextEdit::new(
3916 lsp::Range::new(
3917 lsp::Position::new(0, 6),
3918 lsp::Position::new(0, 9),
3919 ),
3920 "THREE".to_string(),
3921 )],
3922 ),
3923 (
3924 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3925 vec![
3926 lsp::TextEdit::new(
3927 lsp::Range::new(
3928 lsp::Position::new(0, 24),
3929 lsp::Position::new(0, 27),
3930 ),
3931 "THREE".to_string(),
3932 ),
3933 lsp::TextEdit::new(
3934 lsp::Range::new(
3935 lsp::Position::new(0, 35),
3936 lsp::Position::new(0, 38),
3937 ),
3938 "THREE".to_string(),
3939 ),
3940 ],
3941 ),
3942 ]
3943 .into_iter()
3944 .collect(),
3945 ),
3946 ..Default::default()
3947 }))
3948 })
3949 .next()
3950 .await
3951 .unwrap();
3952 confirm_rename.await.unwrap();
3953
3954 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3955 workspace
3956 .active_item(cx)
3957 .unwrap()
3958 .downcast::<Editor>()
3959 .unwrap()
3960 });
3961 rename_editor.update(cx_b, |editor, cx| {
3962 assert_eq!(
3963 editor.text(cx),
3964 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3965 );
3966 editor.undo(&Undo, cx);
3967 assert_eq!(
3968 editor.text(cx),
3969 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3970 );
3971 editor.redo(&Redo, cx);
3972 assert_eq!(
3973 editor.text(cx),
3974 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3975 );
3976 });
3977
3978 // Ensure temporary rename edits cannot be undone/redone.
3979 editor_b.update(cx_b, |editor, cx| {
3980 editor.undo(&Undo, cx);
3981 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3982 editor.undo(&Undo, cx);
3983 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3984 editor.redo(&Redo, cx);
3985 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3986 })
3987 }
3988
3989 #[gpui::test(iterations = 10)]
3990 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3991 cx_a.foreground().forbid_parking();
3992
3993 // Connect to a server as 2 clients.
3994 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3995 let client_a = server.create_client(cx_a, "user_a").await;
3996 let client_b = server.create_client(cx_b, "user_b").await;
3997
3998 // Create an org that includes these 2 users.
3999 let db = &server.app_state.db;
4000 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4001 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4002 .await
4003 .unwrap();
4004 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4005 .await
4006 .unwrap();
4007
4008 // Create a channel that includes all the users.
4009 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4010 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4011 .await
4012 .unwrap();
4013 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4014 .await
4015 .unwrap();
4016 db.create_channel_message(
4017 channel_id,
4018 client_b.current_user_id(&cx_b),
4019 "hello A, it's B.",
4020 OffsetDateTime::now_utc(),
4021 1,
4022 )
4023 .await
4024 .unwrap();
4025
4026 let channels_a = cx_a
4027 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4028 channels_a
4029 .condition(cx_a, |list, _| list.available_channels().is_some())
4030 .await;
4031 channels_a.read_with(cx_a, |list, _| {
4032 assert_eq!(
4033 list.available_channels().unwrap(),
4034 &[ChannelDetails {
4035 id: channel_id.to_proto(),
4036 name: "test-channel".to_string()
4037 }]
4038 )
4039 });
4040 let channel_a = channels_a.update(cx_a, |this, cx| {
4041 this.get_channel(channel_id.to_proto(), cx).unwrap()
4042 });
4043 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4044 channel_a
4045 .condition(&cx_a, |channel, _| {
4046 channel_messages(channel)
4047 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4048 })
4049 .await;
4050
4051 let channels_b = cx_b
4052 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4053 channels_b
4054 .condition(cx_b, |list, _| list.available_channels().is_some())
4055 .await;
4056 channels_b.read_with(cx_b, |list, _| {
4057 assert_eq!(
4058 list.available_channels().unwrap(),
4059 &[ChannelDetails {
4060 id: channel_id.to_proto(),
4061 name: "test-channel".to_string()
4062 }]
4063 )
4064 });
4065
4066 let channel_b = channels_b.update(cx_b, |this, cx| {
4067 this.get_channel(channel_id.to_proto(), cx).unwrap()
4068 });
4069 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4070 channel_b
4071 .condition(&cx_b, |channel, _| {
4072 channel_messages(channel)
4073 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4074 })
4075 .await;
4076
4077 channel_a
4078 .update(cx_a, |channel, cx| {
4079 channel
4080 .send_message("oh, hi B.".to_string(), cx)
4081 .unwrap()
4082 .detach();
4083 let task = channel.send_message("sup".to_string(), cx).unwrap();
4084 assert_eq!(
4085 channel_messages(channel),
4086 &[
4087 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4088 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4089 ("user_a".to_string(), "sup".to_string(), true)
4090 ]
4091 );
4092 task
4093 })
4094 .await
4095 .unwrap();
4096
4097 channel_b
4098 .condition(&cx_b, |channel, _| {
4099 channel_messages(channel)
4100 == [
4101 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4102 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4103 ("user_a".to_string(), "sup".to_string(), false),
4104 ]
4105 })
4106 .await;
4107
4108 assert_eq!(
4109 server
4110 .state()
4111 .await
4112 .channel(channel_id)
4113 .unwrap()
4114 .connection_ids
4115 .len(),
4116 2
4117 );
4118 cx_b.update(|_| drop(channel_b));
4119 server
4120 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4121 .await;
4122
4123 cx_a.update(|_| drop(channel_a));
4124 server
4125 .condition(|state| state.channel(channel_id).is_none())
4126 .await;
4127 }
4128
4129 #[gpui::test(iterations = 10)]
4130 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4131 cx_a.foreground().forbid_parking();
4132
4133 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4134 let client_a = server.create_client(cx_a, "user_a").await;
4135
4136 let db = &server.app_state.db;
4137 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4138 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4139 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4140 .await
4141 .unwrap();
4142 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4143 .await
4144 .unwrap();
4145
4146 let channels_a = cx_a
4147 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4148 channels_a
4149 .condition(cx_a, |list, _| list.available_channels().is_some())
4150 .await;
4151 let channel_a = channels_a.update(cx_a, |this, cx| {
4152 this.get_channel(channel_id.to_proto(), cx).unwrap()
4153 });
4154
4155 // Messages aren't allowed to be too long.
4156 channel_a
4157 .update(cx_a, |channel, cx| {
4158 let long_body = "this is long.\n".repeat(1024);
4159 channel.send_message(long_body, cx).unwrap()
4160 })
4161 .await
4162 .unwrap_err();
4163
4164 // Messages aren't allowed to be blank.
4165 channel_a.update(cx_a, |channel, cx| {
4166 channel.send_message(String::new(), cx).unwrap_err()
4167 });
4168
4169 // Leading and trailing whitespace are trimmed.
4170 channel_a
4171 .update(cx_a, |channel, cx| {
4172 channel
4173 .send_message("\n surrounded by whitespace \n".to_string(), cx)
4174 .unwrap()
4175 })
4176 .await
4177 .unwrap();
4178 assert_eq!(
4179 db.get_channel_messages(channel_id, 10, None)
4180 .await
4181 .unwrap()
4182 .iter()
4183 .map(|m| &m.body)
4184 .collect::<Vec<_>>(),
4185 &["surrounded by whitespace"]
4186 );
4187 }
4188
4189 #[gpui::test(iterations = 10)]
4190 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4191 cx_a.foreground().forbid_parking();
4192
4193 // Connect to a server as 2 clients.
4194 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4195 let client_a = server.create_client(cx_a, "user_a").await;
4196 let client_b = server.create_client(cx_b, "user_b").await;
4197 let mut status_b = client_b.status();
4198
4199 // Create an org that includes these 2 users.
4200 let db = &server.app_state.db;
4201 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4202 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4203 .await
4204 .unwrap();
4205 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4206 .await
4207 .unwrap();
4208
4209 // Create a channel that includes all the users.
4210 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4211 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4212 .await
4213 .unwrap();
4214 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4215 .await
4216 .unwrap();
4217 db.create_channel_message(
4218 channel_id,
4219 client_b.current_user_id(&cx_b),
4220 "hello A, it's B.",
4221 OffsetDateTime::now_utc(),
4222 2,
4223 )
4224 .await
4225 .unwrap();
4226
4227 let channels_a = cx_a
4228 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4229 channels_a
4230 .condition(cx_a, |list, _| list.available_channels().is_some())
4231 .await;
4232
4233 channels_a.read_with(cx_a, |list, _| {
4234 assert_eq!(
4235 list.available_channels().unwrap(),
4236 &[ChannelDetails {
4237 id: channel_id.to_proto(),
4238 name: "test-channel".to_string()
4239 }]
4240 )
4241 });
4242 let channel_a = channels_a.update(cx_a, |this, cx| {
4243 this.get_channel(channel_id.to_proto(), cx).unwrap()
4244 });
4245 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4246 channel_a
4247 .condition(&cx_a, |channel, _| {
4248 channel_messages(channel)
4249 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4250 })
4251 .await;
4252
4253 let channels_b = cx_b
4254 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4255 channels_b
4256 .condition(cx_b, |list, _| list.available_channels().is_some())
4257 .await;
4258 channels_b.read_with(cx_b, |list, _| {
4259 assert_eq!(
4260 list.available_channels().unwrap(),
4261 &[ChannelDetails {
4262 id: channel_id.to_proto(),
4263 name: "test-channel".to_string()
4264 }]
4265 )
4266 });
4267
4268 let channel_b = channels_b.update(cx_b, |this, cx| {
4269 this.get_channel(channel_id.to_proto(), cx).unwrap()
4270 });
4271 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4272 channel_b
4273 .condition(&cx_b, |channel, _| {
4274 channel_messages(channel)
4275 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4276 })
4277 .await;
4278
4279 // Disconnect client B, ensuring we can still access its cached channel data.
4280 server.forbid_connections();
4281 server.disconnect_client(client_b.current_user_id(&cx_b));
4282 cx_b.foreground().advance_clock(Duration::from_secs(3));
4283 while !matches!(
4284 status_b.next().await,
4285 Some(client::Status::ReconnectionError { .. })
4286 ) {}
4287
4288 channels_b.read_with(cx_b, |channels, _| {
4289 assert_eq!(
4290 channels.available_channels().unwrap(),
4291 [ChannelDetails {
4292 id: channel_id.to_proto(),
4293 name: "test-channel".to_string()
4294 }]
4295 )
4296 });
4297 channel_b.read_with(cx_b, |channel, _| {
4298 assert_eq!(
4299 channel_messages(channel),
4300 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4301 )
4302 });
4303
4304 // Send a message from client B while it is disconnected.
4305 channel_b
4306 .update(cx_b, |channel, cx| {
4307 let task = channel
4308 .send_message("can you see this?".to_string(), cx)
4309 .unwrap();
4310 assert_eq!(
4311 channel_messages(channel),
4312 &[
4313 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4314 ("user_b".to_string(), "can you see this?".to_string(), true)
4315 ]
4316 );
4317 task
4318 })
4319 .await
4320 .unwrap_err();
4321
4322 // Send a message from client A while B is disconnected.
4323 channel_a
4324 .update(cx_a, |channel, cx| {
4325 channel
4326 .send_message("oh, hi B.".to_string(), cx)
4327 .unwrap()
4328 .detach();
4329 let task = channel.send_message("sup".to_string(), cx).unwrap();
4330 assert_eq!(
4331 channel_messages(channel),
4332 &[
4333 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4334 ("user_a".to_string(), "oh, hi B.".to_string(), true),
4335 ("user_a".to_string(), "sup".to_string(), true)
4336 ]
4337 );
4338 task
4339 })
4340 .await
4341 .unwrap();
4342
4343 // Give client B a chance to reconnect.
4344 server.allow_connections();
4345 cx_b.foreground().advance_clock(Duration::from_secs(10));
4346
4347 // Verify that B sees the new messages upon reconnection, as well as the message client B
4348 // sent while offline.
4349 channel_b
4350 .condition(&cx_b, |channel, _| {
4351 channel_messages(channel)
4352 == [
4353 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4354 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4355 ("user_a".to_string(), "sup".to_string(), false),
4356 ("user_b".to_string(), "can you see this?".to_string(), false),
4357 ]
4358 })
4359 .await;
4360
4361 // Ensure client A and B can communicate normally after reconnection.
4362 channel_a
4363 .update(cx_a, |channel, cx| {
4364 channel.send_message("you online?".to_string(), cx).unwrap()
4365 })
4366 .await
4367 .unwrap();
4368 channel_b
4369 .condition(&cx_b, |channel, _| {
4370 channel_messages(channel)
4371 == [
4372 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4373 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4374 ("user_a".to_string(), "sup".to_string(), false),
4375 ("user_b".to_string(), "can you see this?".to_string(), false),
4376 ("user_a".to_string(), "you online?".to_string(), false),
4377 ]
4378 })
4379 .await;
4380
4381 channel_b
4382 .update(cx_b, |channel, cx| {
4383 channel.send_message("yep".to_string(), cx).unwrap()
4384 })
4385 .await
4386 .unwrap();
4387 channel_a
4388 .condition(&cx_a, |channel, _| {
4389 channel_messages(channel)
4390 == [
4391 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4392 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4393 ("user_a".to_string(), "sup".to_string(), false),
4394 ("user_b".to_string(), "can you see this?".to_string(), false),
4395 ("user_a".to_string(), "you online?".to_string(), false),
4396 ("user_b".to_string(), "yep".to_string(), false),
4397 ]
4398 })
4399 .await;
4400 }
4401
4402 #[gpui::test(iterations = 10)]
4403 async fn test_contacts(
4404 cx_a: &mut TestAppContext,
4405 cx_b: &mut TestAppContext,
4406 cx_c: &mut TestAppContext,
4407 ) {
4408 cx_a.foreground().forbid_parking();
4409 let lang_registry = Arc::new(LanguageRegistry::test());
4410 let fs = FakeFs::new(cx_a.background());
4411
4412 // Connect to a server as 3 clients.
4413 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4414 let client_a = server.create_client(cx_a, "user_a").await;
4415 let client_b = server.create_client(cx_b, "user_b").await;
4416 let client_c = server.create_client(cx_c, "user_c").await;
4417
4418 // Share a worktree as client A.
4419 fs.insert_tree(
4420 "/a",
4421 json!({
4422 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4423 }),
4424 )
4425 .await;
4426
4427 let project_a = cx_a.update(|cx| {
4428 Project::local(
4429 client_a.clone(),
4430 client_a.user_store.clone(),
4431 lang_registry.clone(),
4432 fs.clone(),
4433 cx,
4434 )
4435 });
4436 let (worktree_a, _) = project_a
4437 .update(cx_a, |p, cx| {
4438 p.find_or_create_local_worktree("/a", true, cx)
4439 })
4440 .await
4441 .unwrap();
4442 worktree_a
4443 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4444 .await;
4445
4446 client_a
4447 .user_store
4448 .condition(&cx_a, |user_store, _| {
4449 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4450 })
4451 .await;
4452 client_b
4453 .user_store
4454 .condition(&cx_b, |user_store, _| {
4455 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4456 })
4457 .await;
4458 client_c
4459 .user_store
4460 .condition(&cx_c, |user_store, _| {
4461 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4462 })
4463 .await;
4464
4465 let project_id = project_a
4466 .update(cx_a, |project, _| project.next_remote_id())
4467 .await;
4468 project_a
4469 .update(cx_a, |project, cx| project.share(cx))
4470 .await
4471 .unwrap();
4472
4473 let _project_b = Project::remote(
4474 project_id,
4475 client_b.clone(),
4476 client_b.user_store.clone(),
4477 lang_registry.clone(),
4478 fs.clone(),
4479 &mut cx_b.to_async(),
4480 )
4481 .await
4482 .unwrap();
4483
4484 client_a
4485 .user_store
4486 .condition(&cx_a, |user_store, _| {
4487 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4488 })
4489 .await;
4490 client_b
4491 .user_store
4492 .condition(&cx_b, |user_store, _| {
4493 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4494 })
4495 .await;
4496 client_c
4497 .user_store
4498 .condition(&cx_c, |user_store, _| {
4499 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4500 })
4501 .await;
4502
4503 project_a
4504 .condition(&cx_a, |project, _| {
4505 project.collaborators().contains_key(&client_b.peer_id)
4506 })
4507 .await;
4508
4509 cx_a.update(move |_| drop(project_a));
4510 client_a
4511 .user_store
4512 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4513 .await;
4514 client_b
4515 .user_store
4516 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4517 .await;
4518 client_c
4519 .user_store
4520 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4521 .await;
4522
4523 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4524 user_store
4525 .contacts()
4526 .iter()
4527 .map(|contact| {
4528 let worktrees = contact
4529 .projects
4530 .iter()
4531 .map(|p| {
4532 (
4533 p.worktree_root_names[0].as_str(),
4534 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4535 )
4536 })
4537 .collect();
4538 (contact.user.github_login.as_str(), worktrees)
4539 })
4540 .collect()
4541 }
4542 }
4543
4544 #[gpui::test(iterations = 10)]
4545 async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4546 cx_a.foreground().forbid_parking();
4547 let fs = FakeFs::new(cx_a.background());
4548
4549 // 2 clients connect to a server.
4550 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4551 let mut client_a = server.create_client(cx_a, "user_a").await;
4552 let mut client_b = server.create_client(cx_b, "user_b").await;
4553 cx_a.update(editor::init);
4554 cx_b.update(editor::init);
4555
4556 // Client A shares a project.
4557 fs.insert_tree(
4558 "/a",
4559 json!({
4560 ".zed.toml": r#"collaborators = ["user_b"]"#,
4561 "1.txt": "one",
4562 "2.txt": "two",
4563 "3.txt": "three",
4564 }),
4565 )
4566 .await;
4567 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4568 project_a
4569 .update(cx_a, |project, cx| project.share(cx))
4570 .await
4571 .unwrap();
4572
4573 // Client B joins the project.
4574 let project_b = client_b
4575 .build_remote_project(
4576 project_a
4577 .read_with(cx_a, |project, _| project.remote_id())
4578 .unwrap(),
4579 cx_b,
4580 )
4581 .await;
4582
4583 // Client A opens some editors.
4584 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4585 let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4586 let editor_a1 = workspace_a
4587 .update(cx_a, |workspace, cx| {
4588 workspace.open_path((worktree_id, "1.txt"), cx)
4589 })
4590 .await
4591 .unwrap()
4592 .downcast::<Editor>()
4593 .unwrap();
4594 let editor_a2 = workspace_a
4595 .update(cx_a, |workspace, cx| {
4596 workspace.open_path((worktree_id, "2.txt"), cx)
4597 })
4598 .await
4599 .unwrap()
4600 .downcast::<Editor>()
4601 .unwrap();
4602
4603 // Client B opens an editor.
4604 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4605 let editor_b1 = workspace_b
4606 .update(cx_b, |workspace, cx| {
4607 workspace.open_path((worktree_id, "1.txt"), cx)
4608 })
4609 .await
4610 .unwrap()
4611 .downcast::<Editor>()
4612 .unwrap();
4613
4614 let client_a_id = project_b.read_with(cx_b, |project, _| {
4615 project.collaborators().values().next().unwrap().peer_id
4616 });
4617 let client_b_id = project_a.read_with(cx_a, |project, _| {
4618 project.collaborators().values().next().unwrap().peer_id
4619 });
4620
4621 // When client B starts following client A, all visible view states are replicated to client B.
4622 editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4623 editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4624 workspace_b
4625 .update(cx_b, |workspace, cx| {
4626 workspace
4627 .toggle_follow(&ToggleFollow(client_a_id), cx)
4628 .unwrap()
4629 })
4630 .await
4631 .unwrap();
4632 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4633 workspace
4634 .active_item(cx)
4635 .unwrap()
4636 .downcast::<Editor>()
4637 .unwrap()
4638 });
4639 assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4640 assert_eq!(
4641 editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4642 Some((worktree_id, "2.txt").into())
4643 );
4644 assert_eq!(
4645 editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4646 vec![2..3]
4647 );
4648 assert_eq!(
4649 editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4650 vec![0..1]
4651 );
4652
4653 // When client A activates a different editor, client B does so as well.
4654 workspace_a.update(cx_a, |workspace, cx| {
4655 workspace.activate_item(&editor_a1, cx)
4656 });
4657 workspace_b
4658 .condition(cx_b, |workspace, cx| {
4659 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4660 })
4661 .await;
4662
4663 // When client A navigates back and forth, client B does so as well.
4664 workspace_a
4665 .update(cx_a, |workspace, cx| {
4666 workspace::Pane::go_back(workspace, None, cx)
4667 })
4668 .await;
4669 workspace_b
4670 .condition(cx_b, |workspace, cx| {
4671 workspace.active_item(cx).unwrap().id() == editor_b2.id()
4672 })
4673 .await;
4674
4675 workspace_a
4676 .update(cx_a, |workspace, cx| {
4677 workspace::Pane::go_forward(workspace, None, cx)
4678 })
4679 .await;
4680 workspace_b
4681 .condition(cx_b, |workspace, cx| {
4682 workspace.active_item(cx).unwrap().id() == editor_b1.id()
4683 })
4684 .await;
4685
4686 // Changes to client A's editor are reflected on client B.
4687 editor_a1.update(cx_a, |editor, cx| {
4688 editor.select_ranges([1..1, 2..2], None, cx);
4689 });
4690 editor_b1
4691 .condition(cx_b, |editor, cx| {
4692 editor.selected_ranges(cx) == vec![1..1, 2..2]
4693 })
4694 .await;
4695
4696 editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4697 editor_b1
4698 .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4699 .await;
4700
4701 editor_a1.update(cx_a, |editor, cx| {
4702 editor.select_ranges([3..3], None, cx);
4703 editor.set_scroll_position(vec2f(0., 100.), cx);
4704 });
4705 editor_b1
4706 .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4707 .await;
4708
4709 // After unfollowing, client B stops receiving updates from client A.
4710 workspace_b.update(cx_b, |workspace, cx| {
4711 workspace.unfollow(&workspace.active_pane().clone(), cx)
4712 });
4713 workspace_a.update(cx_a, |workspace, cx| {
4714 workspace.activate_item(&editor_a2, cx)
4715 });
4716 cx_a.foreground().run_until_parked();
4717 assert_eq!(
4718 workspace_b.read_with(cx_b, |workspace, cx| workspace
4719 .active_item(cx)
4720 .unwrap()
4721 .id()),
4722 editor_b1.id()
4723 );
4724
4725 // Client A starts following client B.
4726 workspace_a
4727 .update(cx_a, |workspace, cx| {
4728 workspace
4729 .toggle_follow(&ToggleFollow(client_b_id), cx)
4730 .unwrap()
4731 })
4732 .await
4733 .unwrap();
4734 assert_eq!(
4735 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4736 Some(client_b_id)
4737 );
4738 assert_eq!(
4739 workspace_a.read_with(cx_a, |workspace, cx| workspace
4740 .active_item(cx)
4741 .unwrap()
4742 .id()),
4743 editor_a1.id()
4744 );
4745
4746 // Following interrupts when client B disconnects.
4747 client_b.disconnect(&cx_b.to_async()).unwrap();
4748 cx_a.foreground().run_until_parked();
4749 assert_eq!(
4750 workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4751 None
4752 );
4753 }
4754
4755 #[gpui::test(iterations = 10)]
4756 async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4757 cx_a.foreground().forbid_parking();
4758 let fs = FakeFs::new(cx_a.background());
4759
4760 // 2 clients connect to a server.
4761 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4762 let mut client_a = server.create_client(cx_a, "user_a").await;
4763 let mut client_b = server.create_client(cx_b, "user_b").await;
4764 cx_a.update(editor::init);
4765 cx_b.update(editor::init);
4766
4767 // Client A shares a project.
4768 fs.insert_tree(
4769 "/a",
4770 json!({
4771 ".zed.toml": r#"collaborators = ["user_b"]"#,
4772 "1.txt": "one",
4773 "2.txt": "two",
4774 "3.txt": "three",
4775 "4.txt": "four",
4776 }),
4777 )
4778 .await;
4779 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4780 project_a
4781 .update(cx_a, |project, cx| project.share(cx))
4782 .await
4783 .unwrap();
4784
4785 // Client B joins the project.
4786 let project_b = client_b
4787 .build_remote_project(
4788 project_a
4789 .read_with(cx_a, |project, _| project.remote_id())
4790 .unwrap(),
4791 cx_b,
4792 )
4793 .await;
4794
4795 // Client A opens some editors.
4796 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4797 let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4798 let _editor_a1 = workspace_a
4799 .update(cx_a, |workspace, cx| {
4800 workspace.open_path((worktree_id, "1.txt"), cx)
4801 })
4802 .await
4803 .unwrap()
4804 .downcast::<Editor>()
4805 .unwrap();
4806
4807 // Client B opens an editor.
4808 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4809 let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4810 let _editor_b1 = workspace_b
4811 .update(cx_b, |workspace, cx| {
4812 workspace.open_path((worktree_id, "2.txt"), cx)
4813 })
4814 .await
4815 .unwrap()
4816 .downcast::<Editor>()
4817 .unwrap();
4818
4819 // Clients A and B follow each other in split panes
4820 workspace_a
4821 .update(cx_a, |workspace, cx| {
4822 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4823 assert_ne!(*workspace.active_pane(), pane_a1);
4824 let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4825 workspace
4826 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4827 .unwrap()
4828 })
4829 .await
4830 .unwrap();
4831 workspace_b
4832 .update(cx_b, |workspace, cx| {
4833 workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4834 assert_ne!(*workspace.active_pane(), pane_b1);
4835 let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4836 workspace
4837 .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4838 .unwrap()
4839 })
4840 .await
4841 .unwrap();
4842
4843 workspace_a
4844 .update(cx_a, |workspace, cx| {
4845 workspace.activate_next_pane(cx);
4846 assert_eq!(*workspace.active_pane(), pane_a1);
4847 workspace.open_path((worktree_id, "3.txt"), cx)
4848 })
4849 .await
4850 .unwrap();
4851 workspace_b
4852 .update(cx_b, |workspace, cx| {
4853 workspace.activate_next_pane(cx);
4854 assert_eq!(*workspace.active_pane(), pane_b1);
4855 workspace.open_path((worktree_id, "4.txt"), cx)
4856 })
4857 .await
4858 .unwrap();
4859 cx_a.foreground().run_until_parked();
4860
4861 // Ensure leader updates don't change the active pane of followers
4862 workspace_a.read_with(cx_a, |workspace, _| {
4863 assert_eq!(*workspace.active_pane(), pane_a1);
4864 });
4865 workspace_b.read_with(cx_b, |workspace, _| {
4866 assert_eq!(*workspace.active_pane(), pane_b1);
4867 });
4868
4869 // Ensure peers following each other doesn't cause an infinite loop.
4870 assert_eq!(
4871 workspace_a.read_with(cx_a, |workspace, cx| workspace
4872 .active_item(cx)
4873 .unwrap()
4874 .project_path(cx)),
4875 Some((worktree_id, "3.txt").into())
4876 );
4877 workspace_a.update(cx_a, |workspace, cx| {
4878 assert_eq!(
4879 workspace.active_item(cx).unwrap().project_path(cx),
4880 Some((worktree_id, "3.txt").into())
4881 );
4882 workspace.activate_next_pane(cx);
4883 assert_eq!(
4884 workspace.active_item(cx).unwrap().project_path(cx),
4885 Some((worktree_id, "4.txt").into())
4886 );
4887 });
4888 workspace_b.update(cx_b, |workspace, cx| {
4889 assert_eq!(
4890 workspace.active_item(cx).unwrap().project_path(cx),
4891 Some((worktree_id, "4.txt").into())
4892 );
4893 workspace.activate_next_pane(cx);
4894 assert_eq!(
4895 workspace.active_item(cx).unwrap().project_path(cx),
4896 Some((worktree_id, "3.txt").into())
4897 );
4898 });
4899 }
4900
4901 #[gpui::test(iterations = 10)]
4902 async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4903 cx_a.foreground().forbid_parking();
4904 let fs = FakeFs::new(cx_a.background());
4905
4906 // 2 clients connect to a server.
4907 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4908 let mut client_a = server.create_client(cx_a, "user_a").await;
4909 let mut client_b = server.create_client(cx_b, "user_b").await;
4910 cx_a.update(editor::init);
4911 cx_b.update(editor::init);
4912
4913 // Client A shares a project.
4914 fs.insert_tree(
4915 "/a",
4916 json!({
4917 ".zed.toml": r#"collaborators = ["user_b"]"#,
4918 "1.txt": "one",
4919 "2.txt": "two",
4920 "3.txt": "three",
4921 }),
4922 )
4923 .await;
4924 let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4925 project_a
4926 .update(cx_a, |project, cx| project.share(cx))
4927 .await
4928 .unwrap();
4929
4930 // Client B joins the project.
4931 let project_b = client_b
4932 .build_remote_project(
4933 project_a
4934 .read_with(cx_a, |project, _| project.remote_id())
4935 .unwrap(),
4936 cx_b,
4937 )
4938 .await;
4939
4940 // Client A opens some editors.
4941 let workspace_a = client_a.build_workspace(&project_a, cx_a);
4942 let _editor_a1 = workspace_a
4943 .update(cx_a, |workspace, cx| {
4944 workspace.open_path((worktree_id, "1.txt"), cx)
4945 })
4946 .await
4947 .unwrap()
4948 .downcast::<Editor>()
4949 .unwrap();
4950
4951 // Client B starts following client A.
4952 let workspace_b = client_b.build_workspace(&project_b, cx_b);
4953 let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4954 let leader_id = project_b.read_with(cx_b, |project, _| {
4955 project.collaborators().values().next().unwrap().peer_id
4956 });
4957 workspace_b
4958 .update(cx_b, |workspace, cx| {
4959 workspace
4960 .toggle_follow(&ToggleFollow(leader_id), cx)
4961 .unwrap()
4962 })
4963 .await
4964 .unwrap();
4965 assert_eq!(
4966 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4967 Some(leader_id)
4968 );
4969 let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4970 workspace
4971 .active_item(cx)
4972 .unwrap()
4973 .downcast::<Editor>()
4974 .unwrap()
4975 });
4976
4977 // When client B moves, it automatically stops following client A.
4978 editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
4979 assert_eq!(
4980 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4981 None
4982 );
4983
4984 workspace_b
4985 .update(cx_b, |workspace, cx| {
4986 workspace
4987 .toggle_follow(&ToggleFollow(leader_id), cx)
4988 .unwrap()
4989 })
4990 .await
4991 .unwrap();
4992 assert_eq!(
4993 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4994 Some(leader_id)
4995 );
4996
4997 // When client B edits, it automatically stops following client A.
4998 editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
4999 assert_eq!(
5000 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5001 None
5002 );
5003
5004 workspace_b
5005 .update(cx_b, |workspace, cx| {
5006 workspace
5007 .toggle_follow(&ToggleFollow(leader_id), cx)
5008 .unwrap()
5009 })
5010 .await
5011 .unwrap();
5012 assert_eq!(
5013 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5014 Some(leader_id)
5015 );
5016
5017 // When client B scrolls, it automatically stops following client A.
5018 editor_b2.update(cx_b, |editor, cx| {
5019 editor.set_scroll_position(vec2f(0., 3.), cx)
5020 });
5021 assert_eq!(
5022 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5023 None
5024 );
5025
5026 workspace_b
5027 .update(cx_b, |workspace, cx| {
5028 workspace
5029 .toggle_follow(&ToggleFollow(leader_id), cx)
5030 .unwrap()
5031 })
5032 .await
5033 .unwrap();
5034 assert_eq!(
5035 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5036 Some(leader_id)
5037 );
5038
5039 // When client B activates a different pane, it continues following client A in the original pane.
5040 workspace_b.update(cx_b, |workspace, cx| {
5041 workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5042 });
5043 assert_eq!(
5044 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5045 Some(leader_id)
5046 );
5047
5048 workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5049 assert_eq!(
5050 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5051 Some(leader_id)
5052 );
5053
5054 // When client B activates a different item in the original pane, it automatically stops following client A.
5055 workspace_b
5056 .update(cx_b, |workspace, cx| {
5057 workspace.open_path((worktree_id, "2.txt"), cx)
5058 })
5059 .await
5060 .unwrap();
5061 assert_eq!(
5062 workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5063 None
5064 );
5065 }
5066
5067 #[gpui::test(iterations = 100)]
5068 async fn test_random_collaboration(
5069 cx: &mut TestAppContext,
5070 deterministic: Arc<Deterministic>,
5071 rng: StdRng,
5072 ) {
5073 cx.foreground().forbid_parking();
5074 let max_peers = env::var("MAX_PEERS")
5075 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5076 .unwrap_or(5);
5077 assert!(max_peers <= 5);
5078
5079 let max_operations = env::var("OPERATIONS")
5080 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5081 .unwrap_or(10);
5082
5083 let rng = Arc::new(Mutex::new(rng));
5084
5085 let guest_lang_registry = Arc::new(LanguageRegistry::test());
5086 let host_language_registry = Arc::new(LanguageRegistry::test());
5087
5088 let fs = FakeFs::new(cx.background());
5089 fs.insert_tree(
5090 "/_collab",
5091 json!({
5092 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5093 }),
5094 )
5095 .await;
5096
5097 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5098 let mut clients = Vec::new();
5099 let mut user_ids = Vec::new();
5100 let mut op_start_signals = Vec::new();
5101 let files = Arc::new(Mutex::new(Vec::new()));
5102
5103 let mut next_entity_id = 100000;
5104 let mut host_cx = TestAppContext::new(
5105 cx.foreground_platform(),
5106 cx.platform(),
5107 deterministic.build_foreground(next_entity_id),
5108 deterministic.build_background(),
5109 cx.font_cache(),
5110 cx.leak_detector(),
5111 next_entity_id,
5112 );
5113 let host = server.create_client(&mut host_cx, "host").await;
5114 let host_project = host_cx.update(|cx| {
5115 Project::local(
5116 host.client.clone(),
5117 host.user_store.clone(),
5118 host_language_registry.clone(),
5119 fs.clone(),
5120 cx,
5121 )
5122 });
5123 let host_project_id = host_project
5124 .update(&mut host_cx, |p, _| p.next_remote_id())
5125 .await;
5126
5127 let (collab_worktree, _) = host_project
5128 .update(&mut host_cx, |project, cx| {
5129 project.find_or_create_local_worktree("/_collab", true, cx)
5130 })
5131 .await
5132 .unwrap();
5133 collab_worktree
5134 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5135 .await;
5136 host_project
5137 .update(&mut host_cx, |project, cx| project.share(cx))
5138 .await
5139 .unwrap();
5140
5141 // Set up fake language servers.
5142 let mut language = Language::new(
5143 LanguageConfig {
5144 name: "Rust".into(),
5145 path_suffixes: vec!["rs".to_string()],
5146 ..Default::default()
5147 },
5148 None,
5149 );
5150 let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5151 name: "the-fake-language-server",
5152 capabilities: lsp::LanguageServer::full_capabilities(),
5153 initializer: Some(Box::new({
5154 let rng = rng.clone();
5155 let files = files.clone();
5156 let project = host_project.downgrade();
5157 move |fake_server: &mut FakeLanguageServer| {
5158 fake_server.handle_request::<lsp::request::Completion, _, _>(
5159 |_, _| async move {
5160 Ok(Some(lsp::CompletionResponse::Array(vec![
5161 lsp::CompletionItem {
5162 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5163 range: lsp::Range::new(
5164 lsp::Position::new(0, 0),
5165 lsp::Position::new(0, 0),
5166 ),
5167 new_text: "the-new-text".to_string(),
5168 })),
5169 ..Default::default()
5170 },
5171 ])))
5172 },
5173 );
5174
5175 fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5176 |_, _| async move {
5177 Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5178 lsp::CodeAction {
5179 title: "the-code-action".to_string(),
5180 ..Default::default()
5181 },
5182 )]))
5183 },
5184 );
5185
5186 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5187 |params, _| async move {
5188 Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5189 params.position,
5190 params.position,
5191 ))))
5192 },
5193 );
5194
5195 fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5196 let files = files.clone();
5197 let rng = rng.clone();
5198 move |_, _| {
5199 let files = files.clone();
5200 let rng = rng.clone();
5201 async move {
5202 let files = files.lock();
5203 let mut rng = rng.lock();
5204 let count = rng.gen_range::<usize, _>(1..3);
5205 let files = (0..count)
5206 .map(|_| files.choose(&mut *rng).unwrap())
5207 .collect::<Vec<_>>();
5208 log::info!("LSP: Returning definitions in files {:?}", &files);
5209 Ok(Some(lsp::GotoDefinitionResponse::Array(
5210 files
5211 .into_iter()
5212 .map(|file| lsp::Location {
5213 uri: lsp::Url::from_file_path(file).unwrap(),
5214 range: Default::default(),
5215 })
5216 .collect(),
5217 )))
5218 }
5219 }
5220 });
5221
5222 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5223 let rng = rng.clone();
5224 let project = project.clone();
5225 move |params, mut cx| {
5226 let highlights = if let Some(project) = project.upgrade(&cx) {
5227 project.update(&mut cx, |project, cx| {
5228 let path = params
5229 .text_document_position_params
5230 .text_document
5231 .uri
5232 .to_file_path()
5233 .unwrap();
5234 let (worktree, relative_path) =
5235 project.find_local_worktree(&path, cx)?;
5236 let project_path =
5237 ProjectPath::from((worktree.read(cx).id(), relative_path));
5238 let buffer =
5239 project.get_open_buffer(&project_path, cx)?.read(cx);
5240
5241 let mut highlights = Vec::new();
5242 let highlight_count = rng.lock().gen_range(1..=5);
5243 let mut prev_end = 0;
5244 for _ in 0..highlight_count {
5245 let range =
5246 buffer.random_byte_range(prev_end, &mut *rng.lock());
5247
5248 highlights.push(lsp::DocumentHighlight {
5249 range: range_to_lsp(range.to_point_utf16(buffer)),
5250 kind: Some(lsp::DocumentHighlightKind::READ),
5251 });
5252 prev_end = range.end;
5253 }
5254 Some(highlights)
5255 })
5256 } else {
5257 None
5258 };
5259 async move { Ok(highlights) }
5260 }
5261 });
5262 }
5263 })),
5264 ..Default::default()
5265 });
5266 host_language_registry.add(Arc::new(language));
5267
5268 let op_start_signal = futures::channel::mpsc::unbounded();
5269 user_ids.push(host.current_user_id(&host_cx));
5270 op_start_signals.push(op_start_signal.0);
5271 clients.push(host_cx.foreground().spawn(host.simulate_host(
5272 host_project,
5273 files,
5274 op_start_signal.1,
5275 rng.clone(),
5276 host_cx,
5277 )));
5278
5279 let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5280 rng.lock().gen_range(0..max_operations)
5281 } else {
5282 max_operations
5283 };
5284 let mut available_guests = vec![
5285 "guest-1".to_string(),
5286 "guest-2".to_string(),
5287 "guest-3".to_string(),
5288 "guest-4".to_string(),
5289 ];
5290 let mut operations = 0;
5291 while operations < max_operations {
5292 if operations == disconnect_host_at {
5293 server.disconnect_client(user_ids[0]);
5294 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5295 drop(op_start_signals);
5296 let mut clients = futures::future::join_all(clients).await;
5297 cx.foreground().run_until_parked();
5298
5299 let (host, mut host_cx, host_err) = clients.remove(0);
5300 if let Some(host_err) = host_err {
5301 log::error!("host error - {}", host_err);
5302 }
5303 host.project
5304 .as_ref()
5305 .unwrap()
5306 .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5307 for (guest, mut guest_cx, guest_err) in clients {
5308 if let Some(guest_err) = guest_err {
5309 log::error!("{} error - {}", guest.username, guest_err);
5310 }
5311 let contacts = server
5312 .store
5313 .read()
5314 .await
5315 .contacts_for_user(guest.current_user_id(&guest_cx));
5316 assert!(!contacts
5317 .iter()
5318 .flat_map(|contact| &contact.projects)
5319 .any(|project| project.id == host_project_id));
5320 guest
5321 .project
5322 .as_ref()
5323 .unwrap()
5324 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5325 guest_cx.update(|_| drop(guest));
5326 }
5327 host_cx.update(|_| drop(host));
5328
5329 return;
5330 }
5331
5332 let distribution = rng.lock().gen_range(0..100);
5333 match distribution {
5334 0..=19 if !available_guests.is_empty() => {
5335 let guest_ix = rng.lock().gen_range(0..available_guests.len());
5336 let guest_username = available_guests.remove(guest_ix);
5337 log::info!("Adding new connection for {}", guest_username);
5338 next_entity_id += 100000;
5339 let mut guest_cx = TestAppContext::new(
5340 cx.foreground_platform(),
5341 cx.platform(),
5342 deterministic.build_foreground(next_entity_id),
5343 deterministic.build_background(),
5344 cx.font_cache(),
5345 cx.leak_detector(),
5346 next_entity_id,
5347 );
5348 let guest = server.create_client(&mut guest_cx, &guest_username).await;
5349 let guest_project = Project::remote(
5350 host_project_id,
5351 guest.client.clone(),
5352 guest.user_store.clone(),
5353 guest_lang_registry.clone(),
5354 FakeFs::new(cx.background()),
5355 &mut guest_cx.to_async(),
5356 )
5357 .await
5358 .unwrap();
5359 let op_start_signal = futures::channel::mpsc::unbounded();
5360 user_ids.push(guest.current_user_id(&guest_cx));
5361 op_start_signals.push(op_start_signal.0);
5362 clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5363 guest_username.clone(),
5364 guest_project,
5365 op_start_signal.1,
5366 rng.clone(),
5367 guest_cx,
5368 )));
5369
5370 log::info!("Added connection for {}", guest_username);
5371 operations += 1;
5372 }
5373 20..=30 if clients.len() > 1 => {
5374 log::info!("Removing guest");
5375 let guest_ix = rng.lock().gen_range(1..clients.len());
5376 let removed_guest_id = user_ids.remove(guest_ix);
5377 let guest = clients.remove(guest_ix);
5378 op_start_signals.remove(guest_ix);
5379 server.disconnect_client(removed_guest_id);
5380 cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5381 let (guest, mut guest_cx, guest_err) = guest.await;
5382 if let Some(guest_err) = guest_err {
5383 log::error!("{} error - {}", guest.username, guest_err);
5384 }
5385 guest
5386 .project
5387 .as_ref()
5388 .unwrap()
5389 .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5390 for user_id in &user_ids {
5391 for contact in server.store.read().await.contacts_for_user(*user_id) {
5392 assert_ne!(
5393 contact.user_id, removed_guest_id.0 as u64,
5394 "removed guest is still a contact of another peer"
5395 );
5396 for project in contact.projects {
5397 for project_guest_id in project.guests {
5398 assert_ne!(
5399 project_guest_id, removed_guest_id.0 as u64,
5400 "removed guest appears as still participating on a project"
5401 );
5402 }
5403 }
5404 }
5405 }
5406
5407 log::info!("{} removed", guest.username);
5408 available_guests.push(guest.username.clone());
5409 guest_cx.update(|_| drop(guest));
5410
5411 operations += 1;
5412 }
5413 _ => {
5414 while operations < max_operations && rng.lock().gen_bool(0.7) {
5415 op_start_signals
5416 .choose(&mut *rng.lock())
5417 .unwrap()
5418 .unbounded_send(())
5419 .unwrap();
5420 operations += 1;
5421 }
5422
5423 if rng.lock().gen_bool(0.8) {
5424 cx.foreground().run_until_parked();
5425 }
5426 }
5427 }
5428 }
5429
5430 drop(op_start_signals);
5431 let mut clients = futures::future::join_all(clients).await;
5432 cx.foreground().run_until_parked();
5433
5434 let (host_client, mut host_cx, host_err) = clients.remove(0);
5435 if let Some(host_err) = host_err {
5436 panic!("host error - {}", host_err);
5437 }
5438 let host_project = host_client.project.as_ref().unwrap();
5439 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5440 project
5441 .worktrees(cx)
5442 .map(|worktree| {
5443 let snapshot = worktree.read(cx).snapshot();
5444 (snapshot.id(), snapshot)
5445 })
5446 .collect::<BTreeMap<_, _>>()
5447 });
5448
5449 host_client
5450 .project
5451 .as_ref()
5452 .unwrap()
5453 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5454
5455 for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5456 if let Some(guest_err) = guest_err {
5457 panic!("{} error - {}", guest_client.username, guest_err);
5458 }
5459 let worktree_snapshots =
5460 guest_client
5461 .project
5462 .as_ref()
5463 .unwrap()
5464 .read_with(&guest_cx, |project, cx| {
5465 project
5466 .worktrees(cx)
5467 .map(|worktree| {
5468 let worktree = worktree.read(cx);
5469 (worktree.id(), worktree.snapshot())
5470 })
5471 .collect::<BTreeMap<_, _>>()
5472 });
5473
5474 assert_eq!(
5475 worktree_snapshots.keys().collect::<Vec<_>>(),
5476 host_worktree_snapshots.keys().collect::<Vec<_>>(),
5477 "{} has different worktrees than the host",
5478 guest_client.username
5479 );
5480 for (id, host_snapshot) in &host_worktree_snapshots {
5481 let guest_snapshot = &worktree_snapshots[id];
5482 assert_eq!(
5483 guest_snapshot.root_name(),
5484 host_snapshot.root_name(),
5485 "{} has different root name than the host for worktree {}",
5486 guest_client.username,
5487 id
5488 );
5489 assert_eq!(
5490 guest_snapshot.entries(false).collect::<Vec<_>>(),
5491 host_snapshot.entries(false).collect::<Vec<_>>(),
5492 "{} has different snapshot than the host for worktree {}",
5493 guest_client.username,
5494 id
5495 );
5496 }
5497
5498 guest_client
5499 .project
5500 .as_ref()
5501 .unwrap()
5502 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5503
5504 for guest_buffer in &guest_client.buffers {
5505 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5506 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5507 project.buffer_for_id(buffer_id, cx).expect(&format!(
5508 "host does not have buffer for guest:{}, peer:{}, id:{}",
5509 guest_client.username, guest_client.peer_id, buffer_id
5510 ))
5511 });
5512 let path = host_buffer
5513 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5514
5515 assert_eq!(
5516 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5517 0,
5518 "{}, buffer {}, path {:?} has deferred operations",
5519 guest_client.username,
5520 buffer_id,
5521 path,
5522 );
5523 assert_eq!(
5524 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5525 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5526 "{}, buffer {}, path {:?}, differs from the host's buffer",
5527 guest_client.username,
5528 buffer_id,
5529 path
5530 );
5531 }
5532
5533 guest_cx.update(|_| drop(guest_client));
5534 }
5535
5536 host_cx.update(|_| drop(host_client));
5537 }
5538
5539 struct TestServer {
5540 peer: Arc<Peer>,
5541 app_state: Arc<AppState>,
5542 server: Arc<Server>,
5543 foreground: Rc<executor::Foreground>,
5544 notifications: mpsc::UnboundedReceiver<()>,
5545 connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5546 forbid_connections: Arc<AtomicBool>,
5547 _test_db: TestDb,
5548 }
5549
5550 impl TestServer {
5551 async fn start(
5552 foreground: Rc<executor::Foreground>,
5553 background: Arc<executor::Background>,
5554 ) -> Self {
5555 let test_db = TestDb::fake(background);
5556 let app_state = Self::build_app_state(&test_db).await;
5557 let peer = Peer::new();
5558 let notifications = mpsc::unbounded();
5559 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
5560 Self {
5561 peer,
5562 app_state,
5563 server,
5564 foreground,
5565 notifications: notifications.1,
5566 connection_killers: Default::default(),
5567 forbid_connections: Default::default(),
5568 _test_db: test_db,
5569 }
5570 }
5571
5572 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5573 cx.update(|cx| {
5574 let settings = Settings::test(cx);
5575 cx.set_global(settings);
5576 });
5577
5578 let http = FakeHttpClient::with_404_response();
5579 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5580 let client_name = name.to_string();
5581 let mut client = Client::new(http.clone());
5582 let server = self.server.clone();
5583 let connection_killers = self.connection_killers.clone();
5584 let forbid_connections = self.forbid_connections.clone();
5585 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5586
5587 Arc::get_mut(&mut client)
5588 .unwrap()
5589 .override_authenticate(move |cx| {
5590 cx.spawn(|_| async move {
5591 let access_token = "the-token".to_string();
5592 Ok(Credentials {
5593 user_id: user_id.0 as u64,
5594 access_token,
5595 })
5596 })
5597 })
5598 .override_establish_connection(move |credentials, cx| {
5599 assert_eq!(credentials.user_id, user_id.0 as u64);
5600 assert_eq!(credentials.access_token, "the-token");
5601
5602 let server = server.clone();
5603 let connection_killers = connection_killers.clone();
5604 let forbid_connections = forbid_connections.clone();
5605 let client_name = client_name.clone();
5606 let connection_id_tx = connection_id_tx.clone();
5607 cx.spawn(move |cx| async move {
5608 if forbid_connections.load(SeqCst) {
5609 Err(EstablishConnectionError::other(anyhow!(
5610 "server is forbidding connections"
5611 )))
5612 } else {
5613 let (client_conn, server_conn, killed) =
5614 Connection::in_memory(cx.background());
5615 connection_killers.lock().insert(user_id, killed);
5616 cx.background()
5617 .spawn(server.handle_connection(
5618 server_conn,
5619 client_name,
5620 user_id,
5621 Some(connection_id_tx),
5622 cx.background(),
5623 ))
5624 .detach();
5625 Ok(client_conn)
5626 }
5627 })
5628 });
5629
5630 client
5631 .authenticate_and_connect(false, &cx.to_async())
5632 .await
5633 .unwrap();
5634
5635 Channel::init(&client);
5636 Project::init(&client);
5637 cx.update(|cx| {
5638 workspace::init(&client, cx);
5639 });
5640
5641 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5642 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5643
5644 let client = TestClient {
5645 client,
5646 peer_id,
5647 username: name.to_string(),
5648 user_store,
5649 language_registry: Arc::new(LanguageRegistry::test()),
5650 project: Default::default(),
5651 buffers: Default::default(),
5652 };
5653 client.wait_for_current_user(cx).await;
5654 client
5655 }
5656
5657 fn disconnect_client(&self, user_id: UserId) {
5658 self.connection_killers
5659 .lock()
5660 .remove(&user_id)
5661 .unwrap()
5662 .store(true, SeqCst);
5663 }
5664
5665 fn forbid_connections(&self) {
5666 self.forbid_connections.store(true, SeqCst);
5667 }
5668
5669 fn allow_connections(&self) {
5670 self.forbid_connections.store(false, SeqCst);
5671 }
5672
5673 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5674 let mut config = Config::default();
5675 config.session_secret = "a".repeat(32);
5676 config.database_url = test_db.url.clone();
5677 let github_client = github::AppClient::test();
5678 Arc::new(AppState {
5679 db: test_db.db().clone(),
5680 handlebars: Default::default(),
5681 auth_client: auth::build_client("", ""),
5682 repo_client: github::RepoClient::test(&github_client),
5683 github_client,
5684 config,
5685 })
5686 }
5687
5688 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5689 self.server.store.read().await
5690 }
5691
5692 async fn condition<F>(&mut self, mut predicate: F)
5693 where
5694 F: FnMut(&Store) -> bool,
5695 {
5696 async_std::future::timeout(Duration::from_millis(500), async {
5697 while !(predicate)(&*self.server.store.read().await) {
5698 self.foreground.start_waiting();
5699 self.notifications.next().await;
5700 self.foreground.finish_waiting();
5701 }
5702 })
5703 .await
5704 .expect("condition timed out");
5705 }
5706 }
5707
5708 impl Deref for TestServer {
5709 type Target = Server;
5710
5711 fn deref(&self) -> &Self::Target {
5712 &self.server
5713 }
5714 }
5715
5716 impl Drop for TestServer {
5717 fn drop(&mut self) {
5718 self.peer.reset();
5719 }
5720 }
5721
5722 struct TestClient {
5723 client: Arc<Client>,
5724 username: String,
5725 pub peer_id: PeerId,
5726 pub user_store: ModelHandle<UserStore>,
5727 language_registry: Arc<LanguageRegistry>,
5728 project: Option<ModelHandle<Project>>,
5729 buffers: HashSet<ModelHandle<language::Buffer>>,
5730 }
5731
5732 impl Deref for TestClient {
5733 type Target = Arc<Client>;
5734
5735 fn deref(&self) -> &Self::Target {
5736 &self.client
5737 }
5738 }
5739
5740 impl TestClient {
5741 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5742 UserId::from_proto(
5743 self.user_store
5744 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5745 )
5746 }
5747
5748 async fn wait_for_current_user(&self, cx: &TestAppContext) {
5749 let mut authed_user = self
5750 .user_store
5751 .read_with(cx, |user_store, _| user_store.watch_current_user());
5752 while authed_user.next().await.unwrap().is_none() {}
5753 }
5754
5755 async fn build_local_project(
5756 &mut self,
5757 fs: Arc<FakeFs>,
5758 root_path: impl AsRef<Path>,
5759 cx: &mut TestAppContext,
5760 ) -> (ModelHandle<Project>, WorktreeId) {
5761 let project = cx.update(|cx| {
5762 Project::local(
5763 self.client.clone(),
5764 self.user_store.clone(),
5765 self.language_registry.clone(),
5766 fs,
5767 cx,
5768 )
5769 });
5770 self.project = Some(project.clone());
5771 let (worktree, _) = project
5772 .update(cx, |p, cx| {
5773 p.find_or_create_local_worktree(root_path, true, cx)
5774 })
5775 .await
5776 .unwrap();
5777 worktree
5778 .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5779 .await;
5780 project
5781 .update(cx, |project, _| project.next_remote_id())
5782 .await;
5783 (project, worktree.read_with(cx, |tree, _| tree.id()))
5784 }
5785
5786 async fn build_remote_project(
5787 &mut self,
5788 project_id: u64,
5789 cx: &mut TestAppContext,
5790 ) -> ModelHandle<Project> {
5791 let project = Project::remote(
5792 project_id,
5793 self.client.clone(),
5794 self.user_store.clone(),
5795 self.language_registry.clone(),
5796 FakeFs::new(cx.background()),
5797 &mut cx.to_async(),
5798 )
5799 .await
5800 .unwrap();
5801 self.project = Some(project.clone());
5802 project
5803 }
5804
5805 fn build_workspace(
5806 &self,
5807 project: &ModelHandle<Project>,
5808 cx: &mut TestAppContext,
5809 ) -> ViewHandle<Workspace> {
5810 let (window_id, _) = cx.add_window(|_| EmptyView);
5811 cx.add_view(window_id, |cx| {
5812 let fs = project.read(cx).fs().clone();
5813 Workspace::new(
5814 &WorkspaceParams {
5815 fs,
5816 project: project.clone(),
5817 user_store: self.user_store.clone(),
5818 languages: self.language_registry.clone(),
5819 channel_list: cx.add_model(|cx| {
5820 ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5821 }),
5822 client: self.client.clone(),
5823 },
5824 cx,
5825 )
5826 })
5827 }
5828
5829 async fn simulate_host(
5830 mut self,
5831 project: ModelHandle<Project>,
5832 files: Arc<Mutex<Vec<PathBuf>>>,
5833 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5834 rng: Arc<Mutex<StdRng>>,
5835 mut cx: TestAppContext,
5836 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5837 async fn simulate_host_internal(
5838 client: &mut TestClient,
5839 project: ModelHandle<Project>,
5840 files: Arc<Mutex<Vec<PathBuf>>>,
5841 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5842 rng: Arc<Mutex<StdRng>>,
5843 cx: &mut TestAppContext,
5844 ) -> anyhow::Result<()> {
5845 let fs = project.read_with(cx, |project, _| project.fs().clone());
5846
5847 while op_start_signal.next().await.is_some() {
5848 let distribution = rng.lock().gen_range::<usize, _>(0..100);
5849 match distribution {
5850 0..=20 if !files.lock().is_empty() => {
5851 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5852 let mut path = path.as_path();
5853 while let Some(parent_path) = path.parent() {
5854 path = parent_path;
5855 if rng.lock().gen() {
5856 break;
5857 }
5858 }
5859
5860 log::info!("Host: find/create local worktree {:?}", path);
5861 let find_or_create_worktree = project.update(cx, |project, cx| {
5862 project.find_or_create_local_worktree(path, true, cx)
5863 });
5864 if rng.lock().gen() {
5865 cx.background().spawn(find_or_create_worktree).detach();
5866 } else {
5867 find_or_create_worktree.await?;
5868 }
5869 }
5870 10..=80 if !files.lock().is_empty() => {
5871 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5872 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5873 let (worktree, path) = project
5874 .update(cx, |project, cx| {
5875 project.find_or_create_local_worktree(
5876 file.clone(),
5877 true,
5878 cx,
5879 )
5880 })
5881 .await?;
5882 let project_path =
5883 worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5884 log::info!(
5885 "Host: opening path {:?}, worktree {}, relative_path {:?}",
5886 file,
5887 project_path.0,
5888 project_path.1
5889 );
5890 let buffer = project
5891 .update(cx, |project, cx| project.open_buffer(project_path, cx))
5892 .await
5893 .unwrap();
5894 client.buffers.insert(buffer.clone());
5895 buffer
5896 } else {
5897 client
5898 .buffers
5899 .iter()
5900 .choose(&mut *rng.lock())
5901 .unwrap()
5902 .clone()
5903 };
5904
5905 if rng.lock().gen_bool(0.1) {
5906 cx.update(|cx| {
5907 log::info!(
5908 "Host: dropping buffer {:?}",
5909 buffer.read(cx).file().unwrap().full_path(cx)
5910 );
5911 client.buffers.remove(&buffer);
5912 drop(buffer);
5913 });
5914 } else {
5915 buffer.update(cx, |buffer, cx| {
5916 log::info!(
5917 "Host: updating buffer {:?} ({})",
5918 buffer.file().unwrap().full_path(cx),
5919 buffer.remote_id()
5920 );
5921
5922 if rng.lock().gen_bool(0.7) {
5923 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
5924 } else {
5925 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
5926 }
5927 });
5928 }
5929 }
5930 _ => loop {
5931 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
5932 let mut path = PathBuf::new();
5933 path.push("/");
5934 for _ in 0..path_component_count {
5935 let letter = rng.lock().gen_range(b'a'..=b'z');
5936 path.push(std::str::from_utf8(&[letter]).unwrap());
5937 }
5938 path.set_extension("rs");
5939 let parent_path = path.parent().unwrap();
5940
5941 log::info!("Host: creating file {:?}", path,);
5942
5943 if fs.create_dir(&parent_path).await.is_ok()
5944 && fs.create_file(&path, Default::default()).await.is_ok()
5945 {
5946 files.lock().push(path);
5947 break;
5948 } else {
5949 log::info!("Host: cannot create file");
5950 }
5951 },
5952 }
5953
5954 cx.background().simulate_random_delay().await;
5955 }
5956
5957 Ok(())
5958 }
5959
5960 let result = simulate_host_internal(
5961 &mut self,
5962 project.clone(),
5963 files,
5964 op_start_signal,
5965 rng,
5966 &mut cx,
5967 )
5968 .await;
5969 log::info!("Host done");
5970 self.project = Some(project);
5971 (self, cx, result.err())
5972 }
5973
5974 pub async fn simulate_guest(
5975 mut self,
5976 guest_username: String,
5977 project: ModelHandle<Project>,
5978 op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5979 rng: Arc<Mutex<StdRng>>,
5980 mut cx: TestAppContext,
5981 ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5982 async fn simulate_guest_internal(
5983 client: &mut TestClient,
5984 guest_username: &str,
5985 project: ModelHandle<Project>,
5986 mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5987 rng: Arc<Mutex<StdRng>>,
5988 cx: &mut TestAppContext,
5989 ) -> anyhow::Result<()> {
5990 while op_start_signal.next().await.is_some() {
5991 let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5992 let worktree = if let Some(worktree) =
5993 project.read_with(cx, |project, cx| {
5994 project
5995 .worktrees(&cx)
5996 .filter(|worktree| {
5997 let worktree = worktree.read(cx);
5998 worktree.is_visible()
5999 && worktree.entries(false).any(|e| e.is_file())
6000 })
6001 .choose(&mut *rng.lock())
6002 }) {
6003 worktree
6004 } else {
6005 cx.background().simulate_random_delay().await;
6006 continue;
6007 };
6008
6009 let (worktree_root_name, project_path) =
6010 worktree.read_with(cx, |worktree, _| {
6011 let entry = worktree
6012 .entries(false)
6013 .filter(|e| e.is_file())
6014 .choose(&mut *rng.lock())
6015 .unwrap();
6016 (
6017 worktree.root_name().to_string(),
6018 (worktree.id(), entry.path.clone()),
6019 )
6020 });
6021 log::info!(
6022 "{}: opening path {:?} in worktree {} ({})",
6023 guest_username,
6024 project_path.1,
6025 project_path.0,
6026 worktree_root_name,
6027 );
6028 let buffer = project
6029 .update(cx, |project, cx| {
6030 project.open_buffer(project_path.clone(), cx)
6031 })
6032 .await?;
6033 log::info!(
6034 "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6035 guest_username,
6036 project_path.1,
6037 project_path.0,
6038 worktree_root_name,
6039 buffer.read_with(cx, |buffer, _| buffer.remote_id())
6040 );
6041 client.buffers.insert(buffer.clone());
6042 buffer
6043 } else {
6044 client
6045 .buffers
6046 .iter()
6047 .choose(&mut *rng.lock())
6048 .unwrap()
6049 .clone()
6050 };
6051
6052 let choice = rng.lock().gen_range(0..100);
6053 match choice {
6054 0..=9 => {
6055 cx.update(|cx| {
6056 log::info!(
6057 "{}: dropping buffer {:?}",
6058 guest_username,
6059 buffer.read(cx).file().unwrap().full_path(cx)
6060 );
6061 client.buffers.remove(&buffer);
6062 drop(buffer);
6063 });
6064 }
6065 10..=19 => {
6066 let completions = project.update(cx, |project, cx| {
6067 log::info!(
6068 "{}: requesting completions for buffer {} ({:?})",
6069 guest_username,
6070 buffer.read(cx).remote_id(),
6071 buffer.read(cx).file().unwrap().full_path(cx)
6072 );
6073 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6074 project.completions(&buffer, offset, cx)
6075 });
6076 let completions = cx.background().spawn(async move {
6077 completions
6078 .await
6079 .map_err(|err| anyhow!("completions request failed: {:?}", err))
6080 });
6081 if rng.lock().gen_bool(0.3) {
6082 log::info!("{}: detaching completions request", guest_username);
6083 cx.update(|cx| completions.detach_and_log_err(cx));
6084 } else {
6085 completions.await?;
6086 }
6087 }
6088 20..=29 => {
6089 let code_actions = project.update(cx, |project, cx| {
6090 log::info!(
6091 "{}: requesting code actions for buffer {} ({:?})",
6092 guest_username,
6093 buffer.read(cx).remote_id(),
6094 buffer.read(cx).file().unwrap().full_path(cx)
6095 );
6096 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6097 project.code_actions(&buffer, range, cx)
6098 });
6099 let code_actions = cx.background().spawn(async move {
6100 code_actions.await.map_err(|err| {
6101 anyhow!("code actions request failed: {:?}", err)
6102 })
6103 });
6104 if rng.lock().gen_bool(0.3) {
6105 log::info!("{}: detaching code actions request", guest_username);
6106 cx.update(|cx| code_actions.detach_and_log_err(cx));
6107 } else {
6108 code_actions.await?;
6109 }
6110 }
6111 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6112 let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6113 log::info!(
6114 "{}: saving buffer {} ({:?})",
6115 guest_username,
6116 buffer.remote_id(),
6117 buffer.file().unwrap().full_path(cx)
6118 );
6119 (buffer.version(), buffer.save(cx))
6120 });
6121 let save = cx.background().spawn(async move {
6122 let (saved_version, _) = save
6123 .await
6124 .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6125 assert!(saved_version.observed_all(&requested_version));
6126 Ok::<_, anyhow::Error>(())
6127 });
6128 if rng.lock().gen_bool(0.3) {
6129 log::info!("{}: detaching save request", guest_username);
6130 cx.update(|cx| save.detach_and_log_err(cx));
6131 } else {
6132 save.await?;
6133 }
6134 }
6135 40..=44 => {
6136 let prepare_rename = project.update(cx, |project, cx| {
6137 log::info!(
6138 "{}: preparing rename for buffer {} ({:?})",
6139 guest_username,
6140 buffer.read(cx).remote_id(),
6141 buffer.read(cx).file().unwrap().full_path(cx)
6142 );
6143 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6144 project.prepare_rename(buffer, offset, cx)
6145 });
6146 let prepare_rename = cx.background().spawn(async move {
6147 prepare_rename.await.map_err(|err| {
6148 anyhow!("prepare rename request failed: {:?}", err)
6149 })
6150 });
6151 if rng.lock().gen_bool(0.3) {
6152 log::info!("{}: detaching prepare rename request", guest_username);
6153 cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6154 } else {
6155 prepare_rename.await?;
6156 }
6157 }
6158 45..=49 => {
6159 let definitions = project.update(cx, |project, cx| {
6160 log::info!(
6161 "{}: requesting definitions for buffer {} ({:?})",
6162 guest_username,
6163 buffer.read(cx).remote_id(),
6164 buffer.read(cx).file().unwrap().full_path(cx)
6165 );
6166 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6167 project.definition(&buffer, offset, cx)
6168 });
6169 let definitions = cx.background().spawn(async move {
6170 definitions
6171 .await
6172 .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6173 });
6174 if rng.lock().gen_bool(0.3) {
6175 log::info!("{}: detaching definitions request", guest_username);
6176 cx.update(|cx| definitions.detach_and_log_err(cx));
6177 } else {
6178 client
6179 .buffers
6180 .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6181 }
6182 }
6183 50..=54 => {
6184 let highlights = project.update(cx, |project, cx| {
6185 log::info!(
6186 "{}: requesting highlights for buffer {} ({:?})",
6187 guest_username,
6188 buffer.read(cx).remote_id(),
6189 buffer.read(cx).file().unwrap().full_path(cx)
6190 );
6191 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6192 project.document_highlights(&buffer, offset, cx)
6193 });
6194 let highlights = cx.background().spawn(async move {
6195 highlights
6196 .await
6197 .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6198 });
6199 if rng.lock().gen_bool(0.3) {
6200 log::info!("{}: detaching highlights request", guest_username);
6201 cx.update(|cx| highlights.detach_and_log_err(cx));
6202 } else {
6203 highlights.await?;
6204 }
6205 }
6206 55..=59 => {
6207 let search = project.update(cx, |project, cx| {
6208 let query = rng.lock().gen_range('a'..='z');
6209 log::info!("{}: project-wide search {:?}", guest_username, query);
6210 project.search(SearchQuery::text(query, false, false), cx)
6211 });
6212 let search = cx.background().spawn(async move {
6213 search
6214 .await
6215 .map_err(|err| anyhow!("search request failed: {:?}", err))
6216 });
6217 if rng.lock().gen_bool(0.3) {
6218 log::info!("{}: detaching search request", guest_username);
6219 cx.update(|cx| search.detach_and_log_err(cx));
6220 } else {
6221 client.buffers.extend(search.await?.into_keys());
6222 }
6223 }
6224 _ => {
6225 buffer.update(cx, |buffer, cx| {
6226 log::info!(
6227 "{}: updating buffer {} ({:?})",
6228 guest_username,
6229 buffer.remote_id(),
6230 buffer.file().unwrap().full_path(cx)
6231 );
6232 if rng.lock().gen_bool(0.7) {
6233 buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6234 } else {
6235 buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6236 }
6237 });
6238 }
6239 }
6240 cx.background().simulate_random_delay().await;
6241 }
6242 Ok(())
6243 }
6244
6245 let result = simulate_guest_internal(
6246 &mut self,
6247 &guest_username,
6248 project.clone(),
6249 op_start_signal,
6250 rng,
6251 &mut cx,
6252 )
6253 .await;
6254 log::info!("{}: done", guest_username);
6255
6256 self.project = Some(project);
6257 (self, cx, result.err())
6258 }
6259 }
6260
6261 impl Drop for TestClient {
6262 fn drop(&mut self) {
6263 self.client.tear_down();
6264 }
6265 }
6266
6267 impl Executor for Arc<gpui::executor::Background> {
6268 type Timer = gpui::executor::Timer;
6269
6270 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6271 self.spawn(future).detach();
6272 }
6273
6274 fn timer(&self, duration: Duration) -> Self::Timer {
6275 self.as_ref().timer(duration)
6276 }
6277 }
6278
6279 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6280 channel
6281 .messages()
6282 .cursor::<()>()
6283 .map(|m| {
6284 (
6285 m.sender.github_login.clone(),
6286 m.body.clone(),
6287 m.is_pending(),
6288 )
6289 })
6290 .collect()
6291 }
6292
6293 struct EmptyView;
6294
6295 impl gpui::Entity for EmptyView {
6296 type Event = ();
6297 }
6298
6299 impl gpui::View for EmptyView {
6300 fn ui_name() -> &'static str {
6301 "empty view"
6302 }
6303
6304 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6305 gpui::Element::boxed(gpui::elements::Empty)
6306 }
6307 }
6308}