1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use rpc::{
15 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
16 Connection, ConnectionId, Peer, TypedEnvelope,
17};
18use sha1::{Digest as _, Sha1};
19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
20use store::{Store, Worktree};
21use surf::StatusCode;
22use tide::log;
23use tide::{
24 http::headers::{HeaderName, CONNECTION, UPGRADE},
25 Request, Response,
26};
27use time::OffsetDateTime;
28
/// Type-erased RPC handler as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed, dynamically typed envelope and
/// returns a boxed future that resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
34
/// RPC server state shared (behind an `Arc`) by every connection task.
pub struct Server {
    /// Peer used to send, forward and respond to RPC messages.
    peer: Arc<Peer>,
    /// In-memory connection/project/channel state, guarded by a read-write lock.
    store: RwLock<Store>,
    /// Application state; gives the handlers access to the database.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message payload.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When set, a `()` is sent on this channel after each incoming message
    /// has been handled (see `handle_connection`).
    notifications: Option<mpsc::UnboundedSender<()>>,
}
42
/// Abstraction over spawning futures; `handle_connection` uses it to run
/// handlers for background messages without awaiting them inline.
pub trait Executor {
    /// Spawns `future` without returning any handle to the caller.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
46
/// `Executor` implementation that spawns futures with `async_std::task::spawn`.
pub struct RealExecutor;
48
/// Number of channel messages requested from the database per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`str::len`) of a trimmed chat message body.
const MAX_MESSAGE_LEN: usize = 1024;
51
52impl Server {
53 pub fn new(
54 app_state: Arc<AppState>,
55 peer: Arc<Peer>,
56 notifications: Option<mpsc::UnboundedSender<()>>,
57 ) -> Arc<Self> {
58 let mut server = Self {
59 peer,
60 app_state,
61 store: Default::default(),
62 handlers: Default::default(),
63 notifications,
64 };
65
66 server
67 .add_request_handler(Server::ping)
68 .add_request_handler(Server::register_project)
69 .add_message_handler(Server::unregister_project)
70 .add_request_handler(Server::share_project)
71 .add_message_handler(Server::unshare_project)
72 .add_request_handler(Server::join_project)
73 .add_message_handler(Server::leave_project)
74 .add_request_handler(Server::register_worktree)
75 .add_message_handler(Server::unregister_worktree)
76 .add_request_handler(Server::update_worktree)
77 .add_message_handler(Server::update_diagnostic_summary)
78 .add_message_handler(Server::disk_based_diagnostics_updating)
79 .add_message_handler(Server::disk_based_diagnostics_updated)
80 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
81 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
82 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
83 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
84 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
85 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
86 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
87 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
88 .add_request_handler(
89 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
90 )
91 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
92 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
93 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
94 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
95 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
96 .add_message_handler(Server::close_buffer)
97 .add_request_handler(Server::update_buffer)
98 .add_message_handler(Server::update_buffer_file)
99 .add_message_handler(Server::buffer_reloaded)
100 .add_message_handler(Server::buffer_saved)
101 .add_request_handler(Server::save_buffer)
102 .add_request_handler(Server::get_channels)
103 .add_request_handler(Server::get_users)
104 .add_request_handler(Server::join_channel)
105 .add_message_handler(Server::leave_channel)
106 .add_request_handler(Server::send_channel_message)
107 .add_request_handler(Server::get_channel_messages);
108
109 Arc::new(server)
110 }
111
112 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
113 where
114 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
115 Fut: 'static + Send + Future<Output = tide::Result<()>>,
116 M: EnvelopedMessage,
117 {
118 let prev_handler = self.handlers.insert(
119 TypeId::of::<M>(),
120 Box::new(move |server, envelope| {
121 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
122 (handler)(server, *envelope).boxed()
123 }),
124 );
125 if prev_handler.is_some() {
126 panic!("registered a handler for the same message twice");
127 }
128 self
129 }
130
131 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
132 where
133 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
134 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
135 M: RequestMessage,
136 {
137 self.add_message_handler(move |server, envelope| {
138 let receipt = envelope.receipt();
139 let response = (handler)(server.clone(), envelope);
140 async move {
141 match response.await {
142 Ok(response) => {
143 server.peer.respond(receipt, response)?;
144 Ok(())
145 }
146 Err(error) => {
147 server.peer.respond_with_error(
148 receipt,
149 proto::Error {
150 message: error.to_string(),
151 },
152 )?;
153 Err(error)
154 }
155 }
156 }
157 })
158 }
159
160 pub fn handle_connection<E: Executor>(
161 self: &Arc<Self>,
162 connection: Connection,
163 addr: String,
164 user_id: UserId,
165 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
166 executor: E,
167 ) -> impl Future<Output = ()> {
168 let mut this = self.clone();
169 async move {
170 let (connection_id, handle_io, mut incoming_rx) =
171 this.peer.add_connection(connection).await;
172
173 if let Some(send_connection_id) = send_connection_id.as_mut() {
174 let _ = send_connection_id.send(connection_id).await;
175 }
176
177 this.state_mut().add_connection(connection_id, user_id);
178 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
179 log::error!("error updating contacts for {:?}: {}", user_id, err);
180 }
181
182 let handle_io = handle_io.fuse();
183 futures::pin_mut!(handle_io);
184 loop {
185 let next_message = incoming_rx.next().fuse();
186 futures::pin_mut!(next_message);
187 futures::select_biased! {
188 result = handle_io => {
189 if let Err(err) = result {
190 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
191 }
192 break;
193 }
194 message = next_message => {
195 if let Some(message) = message {
196 let start_time = Instant::now();
197 let type_name = message.payload_type_name();
198 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
199 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
200 let notifications = this.notifications.clone();
201 let is_background = message.is_background();
202 let handle_message = (handler)(this.clone(), message);
203 let handle_message = async move {
204 if let Err(err) = handle_message.await {
205 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
206 } else {
207 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
208 }
209 if let Some(mut notifications) = notifications {
210 let _ = notifications.send(()).await;
211 }
212 };
213 if is_background {
214 executor.spawn_detached(handle_message);
215 } else {
216 handle_message.await;
217 }
218 } else {
219 log::warn!("unhandled message: {}", type_name);
220 }
221 } else {
222 log::info!("rpc connection closed {:?}", addr);
223 break;
224 }
225 }
226 }
227 }
228
229 if let Err(err) = this.sign_out(connection_id).await {
230 log::error!("error signing out connection {:?} - {:?}", addr, err);
231 }
232 }
233 }
234
235 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
236 self.peer.disconnect(connection_id);
237 let removed_connection = self.state_mut().remove_connection(connection_id)?;
238
239 for (project_id, project) in removed_connection.hosted_projects {
240 if let Some(share) = project.share {
241 broadcast(
242 connection_id,
243 share.guests.keys().copied().collect(),
244 |conn_id| {
245 self.peer
246 .send(conn_id, proto::UnshareProject { project_id })
247 },
248 )?;
249 }
250 }
251
252 for (project_id, peer_ids) in removed_connection.guest_project_ids {
253 broadcast(connection_id, peer_ids, |conn_id| {
254 self.peer.send(
255 conn_id,
256 proto::RemoveProjectCollaborator {
257 project_id,
258 peer_id: connection_id.0,
259 },
260 )
261 })?;
262 }
263
264 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
265 Ok(())
266 }
267
    /// Answers a `Ping` request with an empty `Ack`.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
271
272 async fn register_project(
273 mut self: Arc<Server>,
274 request: TypedEnvelope<proto::RegisterProject>,
275 ) -> tide::Result<proto::RegisterProjectResponse> {
276 let project_id = {
277 let mut state = self.state_mut();
278 let user_id = state.user_id_for_connection(request.sender_id)?;
279 state.register_project(request.sender_id, user_id)
280 };
281 Ok(proto::RegisterProjectResponse { project_id })
282 }
283
284 async fn unregister_project(
285 mut self: Arc<Server>,
286 request: TypedEnvelope<proto::UnregisterProject>,
287 ) -> tide::Result<()> {
288 let project = self
289 .state_mut()
290 .unregister_project(request.payload.project_id, request.sender_id)?;
291 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
292 Ok(())
293 }
294
295 async fn share_project(
296 mut self: Arc<Server>,
297 request: TypedEnvelope<proto::ShareProject>,
298 ) -> tide::Result<proto::Ack> {
299 self.state_mut()
300 .share_project(request.payload.project_id, request.sender_id);
301 Ok(proto::Ack {})
302 }
303
304 async fn unshare_project(
305 mut self: Arc<Server>,
306 request: TypedEnvelope<proto::UnshareProject>,
307 ) -> tide::Result<()> {
308 let project_id = request.payload.project_id;
309 let project = self
310 .state_mut()
311 .unshare_project(project_id, request.sender_id)?;
312
313 broadcast(request.sender_id, project.connection_ids, |conn_id| {
314 self.peer
315 .send(conn_id, proto::UnshareProject { project_id })
316 })?;
317 self.update_contacts_for_users(&project.authorized_user_ids)?;
318 Ok(())
319 }
320
321 async fn join_project(
322 mut self: Arc<Server>,
323 request: TypedEnvelope<proto::JoinProject>,
324 ) -> tide::Result<proto::JoinProjectResponse> {
325 let project_id = request.payload.project_id;
326
327 let user_id = self.state().user_id_for_connection(request.sender_id)?;
328 let (response, connection_ids, contact_user_ids) = self
329 .state_mut()
330 .join_project(request.sender_id, user_id, project_id)
331 .and_then(|joined| {
332 let share = joined.project.share()?;
333 let peer_count = share.guests.len();
334 let mut collaborators = Vec::with_capacity(peer_count);
335 collaborators.push(proto::Collaborator {
336 peer_id: joined.project.host_connection_id.0,
337 replica_id: 0,
338 user_id: joined.project.host_user_id.to_proto(),
339 });
340 let worktrees = share
341 .worktrees
342 .iter()
343 .filter_map(|(id, shared_worktree)| {
344 let worktree = joined.project.worktrees.get(&id)?;
345 Some(proto::Worktree {
346 id: *id,
347 root_name: worktree.root_name.clone(),
348 entries: shared_worktree.entries.values().cloned().collect(),
349 diagnostic_summaries: shared_worktree
350 .diagnostic_summaries
351 .values()
352 .cloned()
353 .collect(),
354 weak: worktree.weak,
355 })
356 })
357 .collect();
358 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
359 if *peer_conn_id != request.sender_id {
360 collaborators.push(proto::Collaborator {
361 peer_id: peer_conn_id.0,
362 replica_id: *peer_replica_id as u32,
363 user_id: peer_user_id.to_proto(),
364 });
365 }
366 }
367 let response = proto::JoinProjectResponse {
368 worktrees,
369 replica_id: joined.replica_id as u32,
370 collaborators,
371 };
372 let connection_ids = joined.project.connection_ids();
373 let contact_user_ids = joined.project.authorized_user_ids();
374 Ok((response, connection_ids, contact_user_ids))
375 })?;
376
377 broadcast(request.sender_id, connection_ids, |conn_id| {
378 self.peer.send(
379 conn_id,
380 proto::AddProjectCollaborator {
381 project_id,
382 collaborator: Some(proto::Collaborator {
383 peer_id: request.sender_id.0,
384 replica_id: response.replica_id,
385 user_id: user_id.to_proto(),
386 }),
387 },
388 )
389 })?;
390 self.update_contacts_for_users(&contact_user_ids)?;
391 Ok(response)
392 }
393
394 async fn leave_project(
395 mut self: Arc<Server>,
396 request: TypedEnvelope<proto::LeaveProject>,
397 ) -> tide::Result<()> {
398 let sender_id = request.sender_id;
399 let project_id = request.payload.project_id;
400 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
401
402 broadcast(sender_id, worktree.connection_ids, |conn_id| {
403 self.peer.send(
404 conn_id,
405 proto::RemoveProjectCollaborator {
406 project_id,
407 peer_id: sender_id.0,
408 },
409 )
410 })?;
411 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
412
413 Ok(())
414 }
415
416 async fn register_worktree(
417 mut self: Arc<Server>,
418 request: TypedEnvelope<proto::RegisterWorktree>,
419 ) -> tide::Result<proto::Ack> {
420 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
421
422 let mut contact_user_ids = HashSet::default();
423 contact_user_ids.insert(host_user_id);
424 for github_login in &request.payload.authorized_logins {
425 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
426 contact_user_ids.insert(contact_user_id);
427 }
428
429 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
430 let guest_connection_ids;
431 {
432 let mut state = self.state_mut();
433 guest_connection_ids = state
434 .read_project(request.payload.project_id, request.sender_id)?
435 .guest_connection_ids();
436 state.register_worktree(
437 request.payload.project_id,
438 request.payload.worktree_id,
439 request.sender_id,
440 Worktree {
441 authorized_user_ids: contact_user_ids.clone(),
442 root_name: request.payload.root_name.clone(),
443 weak: request.payload.weak,
444 },
445 )?;
446 }
447 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
448 self.peer
449 .forward_send(request.sender_id, connection_id, request.payload.clone())
450 })?;
451 self.update_contacts_for_users(&contact_user_ids)?;
452 Ok(proto::Ack {})
453 }
454
455 async fn unregister_worktree(
456 mut self: Arc<Server>,
457 request: TypedEnvelope<proto::UnregisterWorktree>,
458 ) -> tide::Result<()> {
459 let project_id = request.payload.project_id;
460 let worktree_id = request.payload.worktree_id;
461 let (worktree, guest_connection_ids) =
462 self.state_mut()
463 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
464 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
465 self.peer.send(
466 conn_id,
467 proto::UnregisterWorktree {
468 project_id,
469 worktree_id,
470 },
471 )
472 })?;
473 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
474 Ok(())
475 }
476
477 async fn update_worktree(
478 mut self: Arc<Server>,
479 request: TypedEnvelope<proto::UpdateWorktree>,
480 ) -> tide::Result<proto::Ack> {
481 let connection_ids = self.state_mut().update_worktree(
482 request.sender_id,
483 request.payload.project_id,
484 request.payload.worktree_id,
485 &request.payload.removed_entries,
486 &request.payload.updated_entries,
487 )?;
488
489 broadcast(request.sender_id, connection_ids, |connection_id| {
490 self.peer
491 .forward_send(request.sender_id, connection_id, request.payload.clone())
492 })?;
493
494 Ok(proto::Ack {})
495 }
496
497 async fn update_diagnostic_summary(
498 mut self: Arc<Server>,
499 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
500 ) -> tide::Result<()> {
501 let summary = request
502 .payload
503 .summary
504 .clone()
505 .ok_or_else(|| anyhow!("invalid summary"))?;
506 let receiver_ids = self.state_mut().update_diagnostic_summary(
507 request.payload.project_id,
508 request.payload.worktree_id,
509 request.sender_id,
510 summary,
511 )?;
512
513 broadcast(request.sender_id, receiver_ids, |connection_id| {
514 self.peer
515 .forward_send(request.sender_id, connection_id, request.payload.clone())
516 })?;
517 Ok(())
518 }
519
520 async fn disk_based_diagnostics_updating(
521 self: Arc<Server>,
522 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
523 ) -> tide::Result<()> {
524 let receiver_ids = self
525 .state()
526 .project_connection_ids(request.payload.project_id, request.sender_id)?;
527 broadcast(request.sender_id, receiver_ids, |connection_id| {
528 self.peer
529 .forward_send(request.sender_id, connection_id, request.payload.clone())
530 })?;
531 Ok(())
532 }
533
534 async fn disk_based_diagnostics_updated(
535 self: Arc<Server>,
536 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
537 ) -> tide::Result<()> {
538 let receiver_ids = self
539 .state()
540 .project_connection_ids(request.payload.project_id, request.sender_id)?;
541 broadcast(request.sender_id, receiver_ids, |connection_id| {
542 self.peer
543 .forward_send(request.sender_id, connection_id, request.payload.clone())
544 })?;
545 Ok(())
546 }
547
548 async fn forward_project_request<T>(
549 self: Arc<Server>,
550 request: TypedEnvelope<T>,
551 ) -> tide::Result<T::Response>
552 where
553 T: EntityMessage + RequestMessage,
554 {
555 let host_connection_id = self
556 .state()
557 .read_project(request.payload.remote_entity_id(), request.sender_id)?
558 .host_connection_id;
559 Ok(self
560 .peer
561 .forward_request(request.sender_id, host_connection_id, request.payload)
562 .await?)
563 }
564
565 async fn close_buffer(
566 self: Arc<Server>,
567 request: TypedEnvelope<proto::CloseBuffer>,
568 ) -> tide::Result<()> {
569 let host_connection_id = self
570 .state()
571 .read_project(request.payload.project_id, request.sender_id)?
572 .host_connection_id;
573 self.peer
574 .forward_send(request.sender_id, host_connection_id, request.payload)?;
575 Ok(())
576 }
577
578 async fn save_buffer(
579 self: Arc<Server>,
580 request: TypedEnvelope<proto::SaveBuffer>,
581 ) -> tide::Result<proto::BufferSaved> {
582 let host;
583 let mut guests;
584 {
585 let state = self.state();
586 let project = state.read_project(request.payload.project_id, request.sender_id)?;
587 host = project.host_connection_id;
588 guests = project.guest_connection_ids()
589 }
590
591 let response = self
592 .peer
593 .forward_request(request.sender_id, host, request.payload.clone())
594 .await?;
595
596 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
597 broadcast(host, guests, |conn_id| {
598 self.peer.forward_send(host, conn_id, response.clone())
599 })?;
600
601 Ok(response)
602 }
603
604 async fn update_buffer(
605 self: Arc<Server>,
606 request: TypedEnvelope<proto::UpdateBuffer>,
607 ) -> tide::Result<proto::Ack> {
608 let receiver_ids = self
609 .state()
610 .project_connection_ids(request.payload.project_id, request.sender_id)?;
611 broadcast(request.sender_id, receiver_ids, |connection_id| {
612 self.peer
613 .forward_send(request.sender_id, connection_id, request.payload.clone())
614 })?;
615 Ok(proto::Ack {})
616 }
617
618 async fn update_buffer_file(
619 self: Arc<Server>,
620 request: TypedEnvelope<proto::UpdateBufferFile>,
621 ) -> tide::Result<()> {
622 let receiver_ids = self
623 .state()
624 .project_connection_ids(request.payload.project_id, request.sender_id)?;
625 broadcast(request.sender_id, receiver_ids, |connection_id| {
626 self.peer
627 .forward_send(request.sender_id, connection_id, request.payload.clone())
628 })?;
629 Ok(())
630 }
631
632 async fn buffer_reloaded(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::BufferReloaded>,
635 ) -> tide::Result<()> {
636 let receiver_ids = self
637 .state()
638 .project_connection_ids(request.payload.project_id, request.sender_id)?;
639 broadcast(request.sender_id, receiver_ids, |connection_id| {
640 self.peer
641 .forward_send(request.sender_id, connection_id, request.payload.clone())
642 })?;
643 Ok(())
644 }
645
646 async fn buffer_saved(
647 self: Arc<Server>,
648 request: TypedEnvelope<proto::BufferSaved>,
649 ) -> tide::Result<()> {
650 let receiver_ids = self
651 .state()
652 .project_connection_ids(request.payload.project_id, request.sender_id)?;
653 broadcast(request.sender_id, receiver_ids, |connection_id| {
654 self.peer
655 .forward_send(request.sender_id, connection_id, request.payload.clone())
656 })?;
657 Ok(())
658 }
659
660 async fn get_channels(
661 self: Arc<Server>,
662 request: TypedEnvelope<proto::GetChannels>,
663 ) -> tide::Result<proto::GetChannelsResponse> {
664 let user_id = self.state().user_id_for_connection(request.sender_id)?;
665 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
666 Ok(proto::GetChannelsResponse {
667 channels: channels
668 .into_iter()
669 .map(|chan| proto::Channel {
670 id: chan.id.to_proto(),
671 name: chan.name,
672 })
673 .collect(),
674 })
675 }
676
677 async fn get_users(
678 self: Arc<Server>,
679 request: TypedEnvelope<proto::GetUsers>,
680 ) -> tide::Result<proto::GetUsersResponse> {
681 let user_ids = request
682 .payload
683 .user_ids
684 .into_iter()
685 .map(UserId::from_proto)
686 .collect();
687 let users = self
688 .app_state
689 .db
690 .get_users_by_ids(user_ids)
691 .await?
692 .into_iter()
693 .map(|user| proto::User {
694 id: user.id.to_proto(),
695 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
696 github_login: user.github_login,
697 })
698 .collect();
699 Ok(proto::GetUsersResponse { users })
700 }
701
702 fn update_contacts_for_users<'a>(
703 self: &Arc<Server>,
704 user_ids: impl IntoIterator<Item = &'a UserId>,
705 ) -> anyhow::Result<()> {
706 let mut result = Ok(());
707 let state = self.state();
708 for user_id in user_ids {
709 let contacts = state.contacts_for_user(*user_id);
710 for connection_id in state.connection_ids_for_user(*user_id) {
711 if let Err(error) = self.peer.send(
712 connection_id,
713 proto::UpdateContacts {
714 contacts: contacts.clone(),
715 },
716 ) {
717 result = Err(error);
718 }
719 }
720 }
721 result
722 }
723
724 async fn join_channel(
725 mut self: Arc<Self>,
726 request: TypedEnvelope<proto::JoinChannel>,
727 ) -> tide::Result<proto::JoinChannelResponse> {
728 let user_id = self.state().user_id_for_connection(request.sender_id)?;
729 let channel_id = ChannelId::from_proto(request.payload.channel_id);
730 if !self
731 .app_state
732 .db
733 .can_user_access_channel(user_id, channel_id)
734 .await?
735 {
736 Err(anyhow!("access denied"))?;
737 }
738
739 self.state_mut().join_channel(request.sender_id, channel_id);
740 let messages = self
741 .app_state
742 .db
743 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
744 .await?
745 .into_iter()
746 .map(|msg| proto::ChannelMessage {
747 id: msg.id.to_proto(),
748 body: msg.body,
749 timestamp: msg.sent_at.unix_timestamp() as u64,
750 sender_id: msg.sender_id.to_proto(),
751 nonce: Some(msg.nonce.as_u128().into()),
752 })
753 .collect::<Vec<_>>();
754 Ok(proto::JoinChannelResponse {
755 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
756 messages,
757 })
758 }
759
760 async fn leave_channel(
761 mut self: Arc<Self>,
762 request: TypedEnvelope<proto::LeaveChannel>,
763 ) -> tide::Result<()> {
764 let user_id = self.state().user_id_for_connection(request.sender_id)?;
765 let channel_id = ChannelId::from_proto(request.payload.channel_id);
766 if !self
767 .app_state
768 .db
769 .can_user_access_channel(user_id, channel_id)
770 .await?
771 {
772 Err(anyhow!("access denied"))?;
773 }
774
775 self.state_mut()
776 .leave_channel(request.sender_id, channel_id);
777
778 Ok(())
779 }
780
781 async fn send_channel_message(
782 self: Arc<Self>,
783 request: TypedEnvelope<proto::SendChannelMessage>,
784 ) -> tide::Result<proto::SendChannelMessageResponse> {
785 let channel_id = ChannelId::from_proto(request.payload.channel_id);
786 let user_id;
787 let connection_ids;
788 {
789 let state = self.state();
790 user_id = state.user_id_for_connection(request.sender_id)?;
791 connection_ids = state.channel_connection_ids(channel_id)?;
792 }
793
794 // Validate the message body.
795 let body = request.payload.body.trim().to_string();
796 if body.len() > MAX_MESSAGE_LEN {
797 return Err(anyhow!("message is too long"))?;
798 }
799 if body.is_empty() {
800 return Err(anyhow!("message can't be blank"))?;
801 }
802
803 let timestamp = OffsetDateTime::now_utc();
804 let nonce = request
805 .payload
806 .nonce
807 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
808
809 let message_id = self
810 .app_state
811 .db
812 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
813 .await?
814 .to_proto();
815 let message = proto::ChannelMessage {
816 sender_id: user_id.to_proto(),
817 id: message_id,
818 body,
819 timestamp: timestamp.unix_timestamp() as u64,
820 nonce: Some(nonce),
821 };
822 broadcast(request.sender_id, connection_ids, |conn_id| {
823 self.peer.send(
824 conn_id,
825 proto::ChannelMessageSent {
826 channel_id: channel_id.to_proto(),
827 message: Some(message.clone()),
828 },
829 )
830 })?;
831 Ok(proto::SendChannelMessageResponse {
832 message: Some(message),
833 })
834 }
835
836 async fn get_channel_messages(
837 self: Arc<Self>,
838 request: TypedEnvelope<proto::GetChannelMessages>,
839 ) -> tide::Result<proto::GetChannelMessagesResponse> {
840 let user_id = self.state().user_id_for_connection(request.sender_id)?;
841 let channel_id = ChannelId::from_proto(request.payload.channel_id);
842 if !self
843 .app_state
844 .db
845 .can_user_access_channel(user_id, channel_id)
846 .await?
847 {
848 Err(anyhow!("access denied"))?;
849 }
850
851 let messages = self
852 .app_state
853 .db
854 .get_channel_messages(
855 channel_id,
856 MESSAGE_COUNT_PER_PAGE,
857 Some(MessageId::from_proto(request.payload.before_message_id)),
858 )
859 .await?
860 .into_iter()
861 .map(|msg| proto::ChannelMessage {
862 id: msg.id.to_proto(),
863 body: msg.body,
864 timestamp: msg.sent_at.unix_timestamp() as u64,
865 sender_id: msg.sender_id.to_proto(),
866 nonce: Some(msg.nonce.as_u128().into()),
867 })
868 .collect::<Vec<_>>();
869
870 Ok(proto::GetChannelMessagesResponse {
871 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
872 messages,
873 })
874 }
875
    /// Acquires a read lock on the store.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
879
    /// Acquires a write lock on the store.
    ///
    /// Takes `&mut Arc<Self>` although the lock itself only needs shared
    /// access; NOTE(review): presumably this is meant to make mutation
    /// explicit at call sites — confirm.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
883}
884
impl Executor for RealExecutor {
    /// Spawns `future` with `task::spawn` and discards the returned handle.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        task::spawn(future);
    }
}
890
891fn broadcast<F>(
892 sender_id: ConnectionId,
893 receiver_ids: Vec<ConnectionId>,
894 mut f: F,
895) -> anyhow::Result<()>
896where
897 F: FnMut(ConnectionId) -> anyhow::Result<()>,
898{
899 let mut result = Ok(());
900 for receiver_id in receiver_ids {
901 if receiver_id != sender_id {
902 if let Err(error) = f(receiver_id) {
903 if result.is_ok() {
904 result = Err(error);
905 }
906 }
907 }
908 }
909 result
910}
911
912pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
913 let server = Server::new(app.state().clone(), rpc.clone(), None);
914 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
915 let server = server.clone();
916 async move {
917 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
918
919 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
920 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
921 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
922 let client_protocol_version: Option<u32> = request
923 .header("X-Zed-Protocol-Version")
924 .and_then(|v| v.as_str().parse().ok());
925
926 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
927 return Ok(Response::new(StatusCode::UpgradeRequired));
928 }
929
930 let header = match request.header("Sec-Websocket-Key") {
931 Some(h) => h.as_str(),
932 None => return Err(anyhow!("expected sec-websocket-key"))?,
933 };
934
935 let user_id = process_auth_header(&request).await?;
936
937 let mut response = Response::new(StatusCode::SwitchingProtocols);
938 response.insert_header(UPGRADE, "websocket");
939 response.insert_header(CONNECTION, "Upgrade");
940 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
941 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
942 response.insert_header("Sec-Websocket-Version", "13");
943
944 let http_res: &mut tide::http::Response = response.as_mut();
945 let upgrade_receiver = http_res.recv_upgrade().await;
946 let addr = request.remote().unwrap_or("unknown").to_string();
947 task::spawn(async move {
948 if let Some(stream) = upgrade_receiver.await {
949 server
950 .handle_connection(
951 Connection::new(
952 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
953 ),
954 addr,
955 user_id,
956 None,
957 RealExecutor,
958 )
959 .await;
960 }
961 });
962
963 Ok(response)
964 }
965 });
966}
967
968fn header_contains_ignore_case<T>(
969 request: &tide::Request<T>,
970 header_name: HeaderName,
971 value: &str,
972) -> bool {
973 request
974 .header(header_name)
975 .map(|h| {
976 h.as_str()
977 .split(',')
978 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
979 })
980 .unwrap_or(false)
981}
982
983#[cfg(test)]
984mod tests {
985 use super::*;
986 use crate::{
987 auth,
988 db::{tests::TestDb, UserId},
989 github, AppState, Config,
990 };
991 use ::rpc::Peer;
992 use collections::BTreeMap;
993 use gpui::{executor, ModelHandle, TestAppContext};
994 use parking_lot::Mutex;
995 use postage::{sink::Sink, watch};
996 use rand::prelude::*;
997 use rpc::PeerId;
998 use serde_json::json;
999 use sqlx::types::time::OffsetDateTime;
1000 use std::{
1001 cell::Cell,
1002 env,
1003 ops::Deref,
1004 path::{Path, PathBuf},
1005 rc::Rc,
1006 sync::{
1007 atomic::{AtomicBool, Ordering::SeqCst},
1008 Arc,
1009 },
1010 time::Duration,
1011 };
1012 use zed::{
1013 client::{
1014 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015 EstablishConnectionError, UserStore,
1016 },
1017 editor::{
1018 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1019 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1020 },
1021 fs::{FakeFs, Fs as _},
1022 language::{
1023 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1024 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1025 },
1026 lsp,
1027 project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1028 workspace::{Settings, Workspace, WorkspaceParams},
1029 };
1030
    /// Initializes `env_logger` when the `RUST_LOG` environment variable is
    /// set; registered through the `ctor` attribute.
    #[cfg(test)]
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }
1038
1039 #[gpui::test(iterations = 10)]
1040 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1041 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1042 let lang_registry = Arc::new(LanguageRegistry::new());
1043 let fs = FakeFs::new(cx_a.background());
1044 cx_a.foreground().forbid_parking();
1045
1046 // Connect to a server as 2 clients.
1047 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1048 let client_a = server.create_client(cx_a, "user_a").await;
1049 let client_b = server.create_client(cx_b, "user_b").await;
1050
1051 // Share a project as client A
1052 fs.insert_tree(
1053 "/a",
1054 json!({
1055 ".zed.toml": r#"collaborators = ["user_b"]"#,
1056 "a.txt": "a-contents",
1057 "b.txt": "b-contents",
1058 }),
1059 )
1060 .await;
1061 let project_a = cx_a.update(|cx| {
1062 Project::local(
1063 client_a.clone(),
1064 client_a.user_store.clone(),
1065 lang_registry.clone(),
1066 fs.clone(),
1067 cx,
1068 )
1069 });
1070 let (worktree_a, _) = project_a
1071 .update(cx_a, |p, cx| {
1072 p.find_or_create_local_worktree("/a", false, cx)
1073 })
1074 .await
1075 .unwrap();
1076 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1077 worktree_a
1078 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1079 .await;
1080 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1081 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1082
1083 // Join that project as client B
1084 let project_b = Project::remote(
1085 project_id,
1086 client_b.clone(),
1087 client_b.user_store.clone(),
1088 lang_registry.clone(),
1089 fs.clone(),
1090 &mut cx_b.to_async(),
1091 )
1092 .await
1093 .unwrap();
1094
1095 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1096 assert_eq!(
1097 project
1098 .collaborators()
1099 .get(&client_a.peer_id)
1100 .unwrap()
1101 .user
1102 .github_login,
1103 "user_a"
1104 );
1105 project.replica_id()
1106 });
1107 project_a
1108 .condition(&cx_a, |tree, _| {
1109 tree.collaborators()
1110 .get(&client_b.peer_id)
1111 .map_or(false, |collaborator| {
1112 collaborator.replica_id == replica_id_b
1113 && collaborator.user.github_login == "user_b"
1114 })
1115 })
1116 .await;
1117
1118 // Open the same file as client B and client A.
1119 let buffer_b = project_b
1120 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1121 .await
1122 .unwrap();
1123 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1124 buffer_b.read_with(cx_b, |buf, cx| {
1125 assert_eq!(buf.read(cx).text(), "b-contents")
1126 });
1127 project_a.read_with(cx_a, |project, cx| {
1128 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1129 });
1130 let buffer_a = project_a
1131 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1132 .await
1133 .unwrap();
1134
1135 let editor_b = cx_b.add_view(window_b, |cx| {
1136 Editor::for_buffer(
1137 buffer_b,
1138 None,
1139 watch::channel_with(Settings::test(cx)).1,
1140 cx,
1141 )
1142 });
1143
1144 // TODO
1145 // // Create a selection set as client B and see that selection set as client A.
1146 // buffer_a
1147 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1148 // .await;
1149
1150 // Edit the buffer as client B and see that edit as client A.
1151 editor_b.update(cx_b, |editor, cx| {
1152 editor.handle_input(&Input("ok, ".into()), cx)
1153 });
1154 buffer_a
1155 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1156 .await;
1157
1158 // TODO
1159 // // Remove the selection set as client B, see those selections disappear as client A.
1160 cx_b.update(move |_| drop(editor_b));
1161 // buffer_a
1162 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1163 // .await;
1164
1165 // Dropping the client B's project removes client B from client A's collaborators.
1166 cx_b.update(move |_| drop(project_b));
1167 project_a
1168 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1169 .await;
1170 }
1171
1172 #[gpui::test(iterations = 10)]
1173 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1174 let lang_registry = Arc::new(LanguageRegistry::new());
1175 let fs = FakeFs::new(cx_a.background());
1176 cx_a.foreground().forbid_parking();
1177
1178 // Connect to a server as 2 clients.
1179 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1180 let client_a = server.create_client(cx_a, "user_a").await;
1181 let client_b = server.create_client(cx_b, "user_b").await;
1182
1183 // Share a project as client A
1184 fs.insert_tree(
1185 "/a",
1186 json!({
1187 ".zed.toml": r#"collaborators = ["user_b"]"#,
1188 "a.txt": "a-contents",
1189 "b.txt": "b-contents",
1190 }),
1191 )
1192 .await;
1193 let project_a = cx_a.update(|cx| {
1194 Project::local(
1195 client_a.clone(),
1196 client_a.user_store.clone(),
1197 lang_registry.clone(),
1198 fs.clone(),
1199 cx,
1200 )
1201 });
1202 let (worktree_a, _) = project_a
1203 .update(cx_a, |p, cx| {
1204 p.find_or_create_local_worktree("/a", false, cx)
1205 })
1206 .await
1207 .unwrap();
1208 worktree_a
1209 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1210 .await;
1211 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1212 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1213 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1214 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1215
1216 // Join that project as client B
1217 let project_b = Project::remote(
1218 project_id,
1219 client_b.clone(),
1220 client_b.user_store.clone(),
1221 lang_registry.clone(),
1222 fs.clone(),
1223 &mut cx_b.to_async(),
1224 )
1225 .await
1226 .unwrap();
1227 project_b
1228 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1229 .await
1230 .unwrap();
1231
1232 // Unshare the project as client A
1233 project_a
1234 .update(cx_a, |project, cx| project.unshare(cx))
1235 .await
1236 .unwrap();
1237 project_b
1238 .condition(cx_b, |project, _| project.is_read_only())
1239 .await;
1240 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1241 cx_b.update(|_| {
1242 drop(project_b);
1243 });
1244
1245 // Share the project again and ensure guests can still join.
1246 project_a
1247 .update(cx_a, |project, cx| project.share(cx))
1248 .await
1249 .unwrap();
1250 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1251
1252 let project_b2 = Project::remote(
1253 project_id,
1254 client_b.clone(),
1255 client_b.user_store.clone(),
1256 lang_registry.clone(),
1257 fs.clone(),
1258 &mut cx_b.to_async(),
1259 )
1260 .await
1261 .unwrap();
1262 project_b2
1263 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1264 .await
1265 .unwrap();
1266 }
1267
1268 #[gpui::test(iterations = 10)]
1269 async fn test_propagate_saves_and_fs_changes(
1270 cx_a: &mut TestAppContext,
1271 cx_b: &mut TestAppContext,
1272 cx_c: &mut TestAppContext,
1273 ) {
1274 let lang_registry = Arc::new(LanguageRegistry::new());
1275 let fs = FakeFs::new(cx_a.background());
1276 cx_a.foreground().forbid_parking();
1277
1278 // Connect to a server as 3 clients.
1279 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1280 let client_a = server.create_client(cx_a, "user_a").await;
1281 let client_b = server.create_client(cx_b, "user_b").await;
1282 let client_c = server.create_client(cx_c, "user_c").await;
1283
1284 // Share a worktree as client A.
1285 fs.insert_tree(
1286 "/a",
1287 json!({
1288 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1289 "file1": "",
1290 "file2": ""
1291 }),
1292 )
1293 .await;
1294 let project_a = cx_a.update(|cx| {
1295 Project::local(
1296 client_a.clone(),
1297 client_a.user_store.clone(),
1298 lang_registry.clone(),
1299 fs.clone(),
1300 cx,
1301 )
1302 });
1303 let (worktree_a, _) = project_a
1304 .update(cx_a, |p, cx| {
1305 p.find_or_create_local_worktree("/a", false, cx)
1306 })
1307 .await
1308 .unwrap();
1309 worktree_a
1310 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1311 .await;
1312 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1313 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1314 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1315
1316 // Join that worktree as clients B and C.
1317 let project_b = Project::remote(
1318 project_id,
1319 client_b.clone(),
1320 client_b.user_store.clone(),
1321 lang_registry.clone(),
1322 fs.clone(),
1323 &mut cx_b.to_async(),
1324 )
1325 .await
1326 .unwrap();
1327 let project_c = Project::remote(
1328 project_id,
1329 client_c.clone(),
1330 client_c.user_store.clone(),
1331 lang_registry.clone(),
1332 fs.clone(),
1333 &mut cx_c.to_async(),
1334 )
1335 .await
1336 .unwrap();
1337 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1338 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1339
1340 // Open and edit a buffer as both guests B and C.
1341 let buffer_b = project_b
1342 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1343 .await
1344 .unwrap();
1345 let buffer_c = project_c
1346 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1347 .await
1348 .unwrap();
1349 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1350 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1351
1352 // Open and edit that buffer as the host.
1353 let buffer_a = project_a
1354 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1355 .await
1356 .unwrap();
1357
1358 buffer_a
1359 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1360 .await;
1361 buffer_a.update(cx_a, |buf, cx| {
1362 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1363 });
1364
1365 // Wait for edits to propagate
1366 buffer_a
1367 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1368 .await;
1369 buffer_b
1370 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1371 .await;
1372 buffer_c
1373 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1374 .await;
1375
1376 // Edit the buffer as the host and concurrently save as guest B.
1377 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1378 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1379 save_b.await.unwrap();
1380 assert_eq!(
1381 fs.load("/a/file1".as_ref()).await.unwrap(),
1382 "hi-a, i-am-c, i-am-b, i-am-a"
1383 );
1384 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1385 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1386 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1387
1388 // Make changes on host's file system, see those changes on guest worktrees.
1389 fs.rename(
1390 "/a/file1".as_ref(),
1391 "/a/file1-renamed".as_ref(),
1392 Default::default(),
1393 )
1394 .await
1395 .unwrap();
1396
1397 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1398 .await
1399 .unwrap();
1400 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1401
1402 worktree_a
1403 .condition(&cx_a, |tree, _| {
1404 tree.paths()
1405 .map(|p| p.to_string_lossy())
1406 .collect::<Vec<_>>()
1407 == [".zed.toml", "file1-renamed", "file3", "file4"]
1408 })
1409 .await;
1410 worktree_b
1411 .condition(&cx_b, |tree, _| {
1412 tree.paths()
1413 .map(|p| p.to_string_lossy())
1414 .collect::<Vec<_>>()
1415 == [".zed.toml", "file1-renamed", "file3", "file4"]
1416 })
1417 .await;
1418 worktree_c
1419 .condition(&cx_c, |tree, _| {
1420 tree.paths()
1421 .map(|p| p.to_string_lossy())
1422 .collect::<Vec<_>>()
1423 == [".zed.toml", "file1-renamed", "file3", "file4"]
1424 })
1425 .await;
1426
1427 // Ensure buffer files are updated as well.
1428 buffer_a
1429 .condition(&cx_a, |buf, _| {
1430 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1431 })
1432 .await;
1433 buffer_b
1434 .condition(&cx_b, |buf, _| {
1435 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1436 })
1437 .await;
1438 buffer_c
1439 .condition(&cx_c, |buf, _| {
1440 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1441 })
1442 .await;
1443 }
1444
1445 #[gpui::test(iterations = 10)]
1446 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1447 cx_a.foreground().forbid_parking();
1448 let lang_registry = Arc::new(LanguageRegistry::new());
1449 let fs = FakeFs::new(cx_a.background());
1450
1451 // Connect to a server as 2 clients.
1452 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1453 let client_a = server.create_client(cx_a, "user_a").await;
1454 let client_b = server.create_client(cx_b, "user_b").await;
1455
1456 // Share a project as client A
1457 fs.insert_tree(
1458 "/dir",
1459 json!({
1460 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1461 "a.txt": "a-contents",
1462 }),
1463 )
1464 .await;
1465
1466 let project_a = cx_a.update(|cx| {
1467 Project::local(
1468 client_a.clone(),
1469 client_a.user_store.clone(),
1470 lang_registry.clone(),
1471 fs.clone(),
1472 cx,
1473 )
1474 });
1475 let (worktree_a, _) = project_a
1476 .update(cx_a, |p, cx| {
1477 p.find_or_create_local_worktree("/dir", false, cx)
1478 })
1479 .await
1480 .unwrap();
1481 worktree_a
1482 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1483 .await;
1484 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1485 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1486 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1487
1488 // Join that project as client B
1489 let project_b = Project::remote(
1490 project_id,
1491 client_b.clone(),
1492 client_b.user_store.clone(),
1493 lang_registry.clone(),
1494 fs.clone(),
1495 &mut cx_b.to_async(),
1496 )
1497 .await
1498 .unwrap();
1499
1500 // Open a buffer as client B
1501 let buffer_b = project_b
1502 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1503 .await
1504 .unwrap();
1505
1506 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1507 buffer_b.read_with(cx_b, |buf, _| {
1508 assert!(buf.is_dirty());
1509 assert!(!buf.has_conflict());
1510 });
1511
1512 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1513 buffer_b
1514 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1515 .await;
1516 buffer_b.read_with(cx_b, |buf, _| {
1517 assert!(!buf.has_conflict());
1518 });
1519
1520 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1521 buffer_b.read_with(cx_b, |buf, _| {
1522 assert!(buf.is_dirty());
1523 assert!(!buf.has_conflict());
1524 });
1525 }
1526
1527 #[gpui::test(iterations = 10)]
1528 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1529 cx_a.foreground().forbid_parking();
1530 let lang_registry = Arc::new(LanguageRegistry::new());
1531 let fs = FakeFs::new(cx_a.background());
1532
1533 // Connect to a server as 2 clients.
1534 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1535 let client_a = server.create_client(cx_a, "user_a").await;
1536 let client_b = server.create_client(cx_b, "user_b").await;
1537
1538 // Share a project as client A
1539 fs.insert_tree(
1540 "/dir",
1541 json!({
1542 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1543 "a.txt": "a-contents",
1544 }),
1545 )
1546 .await;
1547
1548 let project_a = cx_a.update(|cx| {
1549 Project::local(
1550 client_a.clone(),
1551 client_a.user_store.clone(),
1552 lang_registry.clone(),
1553 fs.clone(),
1554 cx,
1555 )
1556 });
1557 let (worktree_a, _) = project_a
1558 .update(cx_a, |p, cx| {
1559 p.find_or_create_local_worktree("/dir", false, cx)
1560 })
1561 .await
1562 .unwrap();
1563 worktree_a
1564 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1565 .await;
1566 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1567 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1568 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1569
1570 // Join that project as client B
1571 let project_b = Project::remote(
1572 project_id,
1573 client_b.clone(),
1574 client_b.user_store.clone(),
1575 lang_registry.clone(),
1576 fs.clone(),
1577 &mut cx_b.to_async(),
1578 )
1579 .await
1580 .unwrap();
1581 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1582
1583 // Open a buffer as client B
1584 let buffer_b = project_b
1585 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1586 .await
1587 .unwrap();
1588 buffer_b.read_with(cx_b, |buf, _| {
1589 assert!(!buf.is_dirty());
1590 assert!(!buf.has_conflict());
1591 });
1592
1593 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1594 .await
1595 .unwrap();
1596 buffer_b
1597 .condition(&cx_b, |buf, _| {
1598 buf.text() == "new contents" && !buf.is_dirty()
1599 })
1600 .await;
1601 buffer_b.read_with(cx_b, |buf, _| {
1602 assert!(!buf.has_conflict());
1603 });
1604 }
1605
1606 #[gpui::test(iterations = 10)]
1607 async fn test_editing_while_guest_opens_buffer(
1608 cx_a: &mut TestAppContext,
1609 cx_b: &mut TestAppContext,
1610 ) {
1611 cx_a.foreground().forbid_parking();
1612 let lang_registry = Arc::new(LanguageRegistry::new());
1613 let fs = FakeFs::new(cx_a.background());
1614
1615 // Connect to a server as 2 clients.
1616 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1617 let client_a = server.create_client(cx_a, "user_a").await;
1618 let client_b = server.create_client(cx_b, "user_b").await;
1619
1620 // Share a project as client A
1621 fs.insert_tree(
1622 "/dir",
1623 json!({
1624 ".zed.toml": r#"collaborators = ["user_b"]"#,
1625 "a.txt": "a-contents",
1626 }),
1627 )
1628 .await;
1629 let project_a = cx_a.update(|cx| {
1630 Project::local(
1631 client_a.clone(),
1632 client_a.user_store.clone(),
1633 lang_registry.clone(),
1634 fs.clone(),
1635 cx,
1636 )
1637 });
1638 let (worktree_a, _) = project_a
1639 .update(cx_a, |p, cx| {
1640 p.find_or_create_local_worktree("/dir", false, cx)
1641 })
1642 .await
1643 .unwrap();
1644 worktree_a
1645 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1646 .await;
1647 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1648 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1649 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1650
1651 // Join that project as client B
1652 let project_b = Project::remote(
1653 project_id,
1654 client_b.clone(),
1655 client_b.user_store.clone(),
1656 lang_registry.clone(),
1657 fs.clone(),
1658 &mut cx_b.to_async(),
1659 )
1660 .await
1661 .unwrap();
1662
1663 // Open a buffer as client A
1664 let buffer_a = project_a
1665 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1666 .await
1667 .unwrap();
1668
1669 // Start opening the same buffer as client B
1670 let buffer_b = cx_b
1671 .background()
1672 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1673
1674 // Edit the buffer as client A while client B is still opening it.
1675 cx_b.background().simulate_random_delay().await;
1676 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1677 cx_b.background().simulate_random_delay().await;
1678 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1679
1680 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1681 let buffer_b = buffer_b.await.unwrap();
1682 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1683 }
1684
1685 #[gpui::test(iterations = 10)]
1686 async fn test_leaving_worktree_while_opening_buffer(
1687 cx_a: &mut TestAppContext,
1688 cx_b: &mut TestAppContext,
1689 ) {
1690 cx_a.foreground().forbid_parking();
1691 let lang_registry = Arc::new(LanguageRegistry::new());
1692 let fs = FakeFs::new(cx_a.background());
1693
1694 // Connect to a server as 2 clients.
1695 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1696 let client_a = server.create_client(cx_a, "user_a").await;
1697 let client_b = server.create_client(cx_b, "user_b").await;
1698
1699 // Share a project as client A
1700 fs.insert_tree(
1701 "/dir",
1702 json!({
1703 ".zed.toml": r#"collaborators = ["user_b"]"#,
1704 "a.txt": "a-contents",
1705 }),
1706 )
1707 .await;
1708 let project_a = cx_a.update(|cx| {
1709 Project::local(
1710 client_a.clone(),
1711 client_a.user_store.clone(),
1712 lang_registry.clone(),
1713 fs.clone(),
1714 cx,
1715 )
1716 });
1717 let (worktree_a, _) = project_a
1718 .update(cx_a, |p, cx| {
1719 p.find_or_create_local_worktree("/dir", false, cx)
1720 })
1721 .await
1722 .unwrap();
1723 worktree_a
1724 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1725 .await;
1726 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1727 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1728 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1729
1730 // Join that project as client B
1731 let project_b = Project::remote(
1732 project_id,
1733 client_b.clone(),
1734 client_b.user_store.clone(),
1735 lang_registry.clone(),
1736 fs.clone(),
1737 &mut cx_b.to_async(),
1738 )
1739 .await
1740 .unwrap();
1741
1742 // See that a guest has joined as client A.
1743 project_a
1744 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1745 .await;
1746
1747 // Begin opening a buffer as client B, but leave the project before the open completes.
1748 let buffer_b = cx_b
1749 .background()
1750 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1751 cx_b.update(|_| drop(project_b));
1752 drop(buffer_b);
1753
1754 // See that the guest has left.
1755 project_a
1756 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1757 .await;
1758 }
1759
1760 #[gpui::test(iterations = 10)]
1761 async fn test_peer_disconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1762 cx_a.foreground().forbid_parking();
1763 let lang_registry = Arc::new(LanguageRegistry::new());
1764 let fs = FakeFs::new(cx_a.background());
1765
1766 // Connect to a server as 2 clients.
1767 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1768 let client_a = server.create_client(cx_a, "user_a").await;
1769 let client_b = server.create_client(cx_b, "user_b").await;
1770
1771 // Share a project as client A
1772 fs.insert_tree(
1773 "/a",
1774 json!({
1775 ".zed.toml": r#"collaborators = ["user_b"]"#,
1776 "a.txt": "a-contents",
1777 "b.txt": "b-contents",
1778 }),
1779 )
1780 .await;
1781 let project_a = cx_a.update(|cx| {
1782 Project::local(
1783 client_a.clone(),
1784 client_a.user_store.clone(),
1785 lang_registry.clone(),
1786 fs.clone(),
1787 cx,
1788 )
1789 });
1790 let (worktree_a, _) = project_a
1791 .update(cx_a, |p, cx| {
1792 p.find_or_create_local_worktree("/a", false, cx)
1793 })
1794 .await
1795 .unwrap();
1796 worktree_a
1797 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1798 .await;
1799 let project_id = project_a
1800 .update(cx_a, |project, _| project.next_remote_id())
1801 .await;
1802 project_a
1803 .update(cx_a, |project, cx| project.share(cx))
1804 .await
1805 .unwrap();
1806
1807 // Join that project as client B
1808 let _project_b = Project::remote(
1809 project_id,
1810 client_b.clone(),
1811 client_b.user_store.clone(),
1812 lang_registry.clone(),
1813 fs.clone(),
1814 &mut cx_b.to_async(),
1815 )
1816 .await
1817 .unwrap();
1818
1819 // See that a guest has joined as client A.
1820 project_a
1821 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1822 .await;
1823
1824 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1825 client_b.disconnect(&cx_b.to_async()).unwrap();
1826 project_a
1827 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1828 .await;
1829 }
1830
1831 #[gpui::test(iterations = 10)]
1832 async fn test_collaborating_with_diagnostics(
1833 cx_a: &mut TestAppContext,
1834 cx_b: &mut TestAppContext,
1835 ) {
1836 cx_a.foreground().forbid_parking();
1837 let mut lang_registry = Arc::new(LanguageRegistry::new());
1838 let fs = FakeFs::new(cx_a.background());
1839
1840 // Set up a fake language server.
1841 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1842 Arc::get_mut(&mut lang_registry)
1843 .unwrap()
1844 .add(Arc::new(Language::new(
1845 LanguageConfig {
1846 name: "Rust".into(),
1847 path_suffixes: vec!["rs".to_string()],
1848 language_server: Some(language_server_config),
1849 ..Default::default()
1850 },
1851 Some(tree_sitter_rust::language()),
1852 )));
1853
1854 // Connect to a server as 2 clients.
1855 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1856 let client_a = server.create_client(cx_a, "user_a").await;
1857 let client_b = server.create_client(cx_b, "user_b").await;
1858
1859 // Share a project as client A
1860 fs.insert_tree(
1861 "/a",
1862 json!({
1863 ".zed.toml": r#"collaborators = ["user_b"]"#,
1864 "a.rs": "let one = two",
1865 "other.rs": "",
1866 }),
1867 )
1868 .await;
1869 let project_a = cx_a.update(|cx| {
1870 Project::local(
1871 client_a.clone(),
1872 client_a.user_store.clone(),
1873 lang_registry.clone(),
1874 fs.clone(),
1875 cx,
1876 )
1877 });
1878 let (worktree_a, _) = project_a
1879 .update(cx_a, |p, cx| {
1880 p.find_or_create_local_worktree("/a", false, cx)
1881 })
1882 .await
1883 .unwrap();
1884 worktree_a
1885 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1886 .await;
1887 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1888 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1889 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1890
1891 // Cause the language server to start.
1892 let _ = cx_a
1893 .background()
1894 .spawn(project_a.update(cx_a, |project, cx| {
1895 project.open_buffer(
1896 ProjectPath {
1897 worktree_id,
1898 path: Path::new("other.rs").into(),
1899 },
1900 cx,
1901 )
1902 }))
1903 .await
1904 .unwrap();
1905
1906 // Simulate a language server reporting errors for a file.
1907 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1908 fake_language_server
1909 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1910 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1911 version: None,
1912 diagnostics: vec![lsp::Diagnostic {
1913 severity: Some(lsp::DiagnosticSeverity::ERROR),
1914 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1915 message: "message 1".to_string(),
1916 ..Default::default()
1917 }],
1918 })
1919 .await;
1920
1921 // Wait for server to see the diagnostics update.
1922 server
1923 .condition(|store| {
1924 let worktree = store
1925 .project(project_id)
1926 .unwrap()
1927 .share
1928 .as_ref()
1929 .unwrap()
1930 .worktrees
1931 .get(&worktree_id.to_proto())
1932 .unwrap();
1933
1934 !worktree.diagnostic_summaries.is_empty()
1935 })
1936 .await;
1937
1938 // Join the worktree as client B.
1939 let project_b = Project::remote(
1940 project_id,
1941 client_b.clone(),
1942 client_b.user_store.clone(),
1943 lang_registry.clone(),
1944 fs.clone(),
1945 &mut cx_b.to_async(),
1946 )
1947 .await
1948 .unwrap();
1949
1950 project_b.read_with(cx_b, |project, cx| {
1951 assert_eq!(
1952 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1953 &[(
1954 ProjectPath {
1955 worktree_id,
1956 path: Arc::from(Path::new("a.rs")),
1957 },
1958 DiagnosticSummary {
1959 error_count: 1,
1960 warning_count: 0,
1961 ..Default::default()
1962 },
1963 )]
1964 )
1965 });
1966
1967 // Simulate a language server reporting more errors for a file.
1968 fake_language_server
1969 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1970 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1971 version: None,
1972 diagnostics: vec![
1973 lsp::Diagnostic {
1974 severity: Some(lsp::DiagnosticSeverity::ERROR),
1975 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1976 message: "message 1".to_string(),
1977 ..Default::default()
1978 },
1979 lsp::Diagnostic {
1980 severity: Some(lsp::DiagnosticSeverity::WARNING),
1981 range: lsp::Range::new(
1982 lsp::Position::new(0, 10),
1983 lsp::Position::new(0, 13),
1984 ),
1985 message: "message 2".to_string(),
1986 ..Default::default()
1987 },
1988 ],
1989 })
1990 .await;
1991
1992 // Client b gets the updated summaries
1993 project_b
1994 .condition(&cx_b, |project, cx| {
1995 project.diagnostic_summaries(cx).collect::<Vec<_>>()
1996 == &[(
1997 ProjectPath {
1998 worktree_id,
1999 path: Arc::from(Path::new("a.rs")),
2000 },
2001 DiagnosticSummary {
2002 error_count: 1,
2003 warning_count: 1,
2004 ..Default::default()
2005 },
2006 )]
2007 })
2008 .await;
2009
2010 // Open the file with the errors on client B. They should be present.
2011 let buffer_b = cx_b
2012 .background()
2013 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2014 .await
2015 .unwrap();
2016
2017 buffer_b.read_with(cx_b, |buffer, _| {
2018 assert_eq!(
2019 buffer
2020 .snapshot()
2021 .diagnostics_in_range::<_, Point>(0..buffer.len())
2022 .map(|entry| entry)
2023 .collect::<Vec<_>>(),
2024 &[
2025 DiagnosticEntry {
2026 range: Point::new(0, 4)..Point::new(0, 7),
2027 diagnostic: Diagnostic {
2028 group_id: 0,
2029 message: "message 1".to_string(),
2030 severity: lsp::DiagnosticSeverity::ERROR,
2031 is_primary: true,
2032 ..Default::default()
2033 }
2034 },
2035 DiagnosticEntry {
2036 range: Point::new(0, 10)..Point::new(0, 13),
2037 diagnostic: Diagnostic {
2038 group_id: 1,
2039 severity: lsp::DiagnosticSeverity::WARNING,
2040 message: "message 2".to_string(),
2041 is_primary: true,
2042 ..Default::default()
2043 }
2044 }
2045 ]
2046 );
2047 });
2048 }
2049
2050 #[gpui::test(iterations = 10)]
2051 async fn test_collaborating_with_completion(
2052 cx_a: &mut TestAppContext,
2053 cx_b: &mut TestAppContext,
2054 ) {
2055 cx_a.foreground().forbid_parking();
2056 let mut lang_registry = Arc::new(LanguageRegistry::new());
2057 let fs = FakeFs::new(cx_a.background());
2058
2059 // Set up a fake language server.
2060 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2061 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2062 completion_provider: Some(lsp::CompletionOptions {
2063 trigger_characters: Some(vec![".".to_string()]),
2064 ..Default::default()
2065 }),
2066 ..Default::default()
2067 });
2068 Arc::get_mut(&mut lang_registry)
2069 .unwrap()
2070 .add(Arc::new(Language::new(
2071 LanguageConfig {
2072 name: "Rust".into(),
2073 path_suffixes: vec!["rs".to_string()],
2074 language_server: Some(language_server_config),
2075 ..Default::default()
2076 },
2077 Some(tree_sitter_rust::language()),
2078 )));
2079
2080 // Connect to a server as 2 clients.
2081 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2082 let client_a = server.create_client(cx_a, "user_a").await;
2083 let client_b = server.create_client(cx_b, "user_b").await;
2084
2085 // Share a project as client A
2086 fs.insert_tree(
2087 "/a",
2088 json!({
2089 ".zed.toml": r#"collaborators = ["user_b"]"#,
2090 "main.rs": "fn main() { a }",
2091 "other.rs": "",
2092 }),
2093 )
2094 .await;
2095 let project_a = cx_a.update(|cx| {
2096 Project::local(
2097 client_a.clone(),
2098 client_a.user_store.clone(),
2099 lang_registry.clone(),
2100 fs.clone(),
2101 cx,
2102 )
2103 });
2104 let (worktree_a, _) = project_a
2105 .update(cx_a, |p, cx| {
2106 p.find_or_create_local_worktree("/a", false, cx)
2107 })
2108 .await
2109 .unwrap();
2110 worktree_a
2111 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2112 .await;
2113 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2114 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2115 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2116
2117 // Join the worktree as client B.
2118 let project_b = Project::remote(
2119 project_id,
2120 client_b.clone(),
2121 client_b.user_store.clone(),
2122 lang_registry.clone(),
2123 fs.clone(),
2124 &mut cx_b.to_async(),
2125 )
2126 .await
2127 .unwrap();
2128
2129 // Open a file in an editor as the guest.
2130 let buffer_b = project_b
2131 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2132 .await
2133 .unwrap();
2134 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2135 let editor_b = cx_b.add_view(window_b, |cx| {
2136 Editor::for_buffer(
2137 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2138 Some(project_b.clone()),
2139 watch::channel_with(Settings::test(cx)).1,
2140 cx,
2141 )
2142 });
2143
2144 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2145 buffer_b
2146 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2147 .await;
2148
2149 // Type a completion trigger character as the guest.
2150 editor_b.update(cx_b, |editor, cx| {
2151 editor.select_ranges([13..13], None, cx);
2152 editor.handle_input(&Input(".".into()), cx);
2153 cx.focus(&editor_b);
2154 });
2155
2156 // Receive a completion request as the host's language server.
2157 // Return some completions from the host's language server.
2158 cx_a.foreground().start_waiting();
2159 fake_language_server
2160 .handle_request::<lsp::request::Completion, _>(|params, _| {
2161 assert_eq!(
2162 params.text_document_position.text_document.uri,
2163 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2164 );
2165 assert_eq!(
2166 params.text_document_position.position,
2167 lsp::Position::new(0, 14),
2168 );
2169
2170 Some(lsp::CompletionResponse::Array(vec![
2171 lsp::CompletionItem {
2172 label: "first_method(…)".into(),
2173 detail: Some("fn(&mut self, B) -> C".into()),
2174 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2175 new_text: "first_method($1)".to_string(),
2176 range: lsp::Range::new(
2177 lsp::Position::new(0, 14),
2178 lsp::Position::new(0, 14),
2179 ),
2180 })),
2181 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2182 ..Default::default()
2183 },
2184 lsp::CompletionItem {
2185 label: "second_method(…)".into(),
2186 detail: Some("fn(&mut self, C) -> D<E>".into()),
2187 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2188 new_text: "second_method()".to_string(),
2189 range: lsp::Range::new(
2190 lsp::Position::new(0, 14),
2191 lsp::Position::new(0, 14),
2192 ),
2193 })),
2194 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2195 ..Default::default()
2196 },
2197 ]))
2198 })
2199 .next()
2200 .await
2201 .unwrap();
2202 cx_a.foreground().finish_waiting();
2203
2204 // Open the buffer on the host.
2205 let buffer_a = project_a
2206 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2207 .await
2208 .unwrap();
2209 buffer_a
2210 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2211 .await;
2212
2213 // Confirm a completion on the guest.
2214 editor_b
2215 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2216 .await;
2217 editor_b.update(cx_b, |editor, cx| {
2218 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2219 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2220 });
2221
2222 // Return a resolved completion from the host's language server.
2223 // The resolved completion has an additional text edit.
2224 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2225 |params, _| {
2226 assert_eq!(params.label, "first_method(…)");
2227 lsp::CompletionItem {
2228 label: "first_method(…)".into(),
2229 detail: Some("fn(&mut self, B) -> C".into()),
2230 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231 new_text: "first_method($1)".to_string(),
2232 range: lsp::Range::new(
2233 lsp::Position::new(0, 14),
2234 lsp::Position::new(0, 14),
2235 ),
2236 })),
2237 additional_text_edits: Some(vec![lsp::TextEdit {
2238 new_text: "use d::SomeTrait;\n".to_string(),
2239 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2240 }]),
2241 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2242 ..Default::default()
2243 }
2244 },
2245 );
2246
2247 // The additional edit is applied.
2248 buffer_a
2249 .condition(&cx_a, |buffer, _| {
2250 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2251 })
2252 .await;
2253 buffer_b
2254 .condition(&cx_b, |buffer, _| {
2255 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2256 })
2257 .await;
2258 }
2259
2260 #[gpui::test(iterations = 10)]
2261 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2262 cx_a.foreground().forbid_parking();
2263 let mut lang_registry = Arc::new(LanguageRegistry::new());
2264 let fs = FakeFs::new(cx_a.background());
2265
2266 // Set up a fake language server.
2267 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2268 Arc::get_mut(&mut lang_registry)
2269 .unwrap()
2270 .add(Arc::new(Language::new(
2271 LanguageConfig {
2272 name: "Rust".into(),
2273 path_suffixes: vec!["rs".to_string()],
2274 language_server: Some(language_server_config),
2275 ..Default::default()
2276 },
2277 Some(tree_sitter_rust::language()),
2278 )));
2279
2280 // Connect to a server as 2 clients.
2281 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2282 let client_a = server.create_client(cx_a, "user_a").await;
2283 let client_b = server.create_client(cx_b, "user_b").await;
2284
2285 // Share a project as client A
2286 fs.insert_tree(
2287 "/a",
2288 json!({
2289 ".zed.toml": r#"collaborators = ["user_b"]"#,
2290 "a.rs": "let one = two",
2291 }),
2292 )
2293 .await;
2294 let project_a = cx_a.update(|cx| {
2295 Project::local(
2296 client_a.clone(),
2297 client_a.user_store.clone(),
2298 lang_registry.clone(),
2299 fs.clone(),
2300 cx,
2301 )
2302 });
2303 let (worktree_a, _) = project_a
2304 .update(cx_a, |p, cx| {
2305 p.find_or_create_local_worktree("/a", false, cx)
2306 })
2307 .await
2308 .unwrap();
2309 worktree_a
2310 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2311 .await;
2312 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2313 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2314 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2315
2316 // Join the worktree as client B.
2317 let project_b = Project::remote(
2318 project_id,
2319 client_b.clone(),
2320 client_b.user_store.clone(),
2321 lang_registry.clone(),
2322 fs.clone(),
2323 &mut cx_b.to_async(),
2324 )
2325 .await
2326 .unwrap();
2327
2328 let buffer_b = cx_b
2329 .background()
2330 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2331 .await
2332 .unwrap();
2333
2334 let format = project_b.update(cx_b, |project, cx| {
2335 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2336 });
2337
2338 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2339 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2340 Some(vec![
2341 lsp::TextEdit {
2342 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2343 new_text: "h".to_string(),
2344 },
2345 lsp::TextEdit {
2346 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2347 new_text: "y".to_string(),
2348 },
2349 ])
2350 });
2351
2352 format.await.unwrap();
2353 assert_eq!(
2354 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2355 "let honey = two"
2356 );
2357 }
2358
2359 #[gpui::test(iterations = 10)]
2360 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2361 cx_a.foreground().forbid_parking();
2362 let mut lang_registry = Arc::new(LanguageRegistry::new());
2363 let fs = FakeFs::new(cx_a.background());
2364 fs.insert_tree(
2365 "/root-1",
2366 json!({
2367 ".zed.toml": r#"collaborators = ["user_b"]"#,
2368 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2369 }),
2370 )
2371 .await;
2372 fs.insert_tree(
2373 "/root-2",
2374 json!({
2375 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2376 }),
2377 )
2378 .await;
2379
2380 // Set up a fake language server.
2381 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2382 Arc::get_mut(&mut lang_registry)
2383 .unwrap()
2384 .add(Arc::new(Language::new(
2385 LanguageConfig {
2386 name: "Rust".into(),
2387 path_suffixes: vec!["rs".to_string()],
2388 language_server: Some(language_server_config),
2389 ..Default::default()
2390 },
2391 Some(tree_sitter_rust::language()),
2392 )));
2393
2394 // Connect to a server as 2 clients.
2395 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2396 let client_a = server.create_client(cx_a, "user_a").await;
2397 let client_b = server.create_client(cx_b, "user_b").await;
2398
2399 // Share a project as client A
2400 let project_a = cx_a.update(|cx| {
2401 Project::local(
2402 client_a.clone(),
2403 client_a.user_store.clone(),
2404 lang_registry.clone(),
2405 fs.clone(),
2406 cx,
2407 )
2408 });
2409 let (worktree_a, _) = project_a
2410 .update(cx_a, |p, cx| {
2411 p.find_or_create_local_worktree("/root-1", false, cx)
2412 })
2413 .await
2414 .unwrap();
2415 worktree_a
2416 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2417 .await;
2418 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2419 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2420 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2421
2422 // Join the worktree as client B.
2423 let project_b = Project::remote(
2424 project_id,
2425 client_b.clone(),
2426 client_b.user_store.clone(),
2427 lang_registry.clone(),
2428 fs.clone(),
2429 &mut cx_b.to_async(),
2430 )
2431 .await
2432 .unwrap();
2433
2434 // Open the file on client B.
2435 let buffer_b = cx_b
2436 .background()
2437 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2438 .await
2439 .unwrap();
2440
2441 // Request the definition of a symbol as the guest.
2442 let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2443
2444 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2445 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2446 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2447 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2448 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2449 )))
2450 });
2451
2452 let definitions_1 = definitions_1.await.unwrap();
2453 cx_b.read(|cx| {
2454 assert_eq!(definitions_1.len(), 1);
2455 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2456 let target_buffer = definitions_1[0].buffer.read(cx);
2457 assert_eq!(
2458 target_buffer.text(),
2459 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2460 );
2461 assert_eq!(
2462 definitions_1[0].range.to_point(target_buffer),
2463 Point::new(0, 6)..Point::new(0, 9)
2464 );
2465 });
2466
2467 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2468 // the previous call to `definition`.
2469 let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2470 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2471 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2472 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2473 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2474 )))
2475 });
2476
2477 let definitions_2 = definitions_2.await.unwrap();
2478 cx_b.read(|cx| {
2479 assert_eq!(definitions_2.len(), 1);
2480 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2481 let target_buffer = definitions_2[0].buffer.read(cx);
2482 assert_eq!(
2483 target_buffer.text(),
2484 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2485 );
2486 assert_eq!(
2487 definitions_2[0].range.to_point(target_buffer),
2488 Point::new(1, 6)..Point::new(1, 11)
2489 );
2490 });
2491 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2492 }
2493
2494 #[gpui::test(iterations = 10)]
2495 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2496 cx_a.foreground().forbid_parking();
2497 let mut lang_registry = Arc::new(LanguageRegistry::new());
2498 let fs = FakeFs::new(cx_a.background());
2499 fs.insert_tree(
2500 "/root-1",
2501 json!({
2502 ".zed.toml": r#"collaborators = ["user_b"]"#,
2503 "one.rs": "const ONE: usize = 1;",
2504 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2505 }),
2506 )
2507 .await;
2508 fs.insert_tree(
2509 "/root-2",
2510 json!({
2511 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2512 }),
2513 )
2514 .await;
2515
2516 // Set up a fake language server.
2517 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2518 Arc::get_mut(&mut lang_registry)
2519 .unwrap()
2520 .add(Arc::new(Language::new(
2521 LanguageConfig {
2522 name: "Rust".into(),
2523 path_suffixes: vec!["rs".to_string()],
2524 language_server: Some(language_server_config),
2525 ..Default::default()
2526 },
2527 Some(tree_sitter_rust::language()),
2528 )));
2529
2530 // Connect to a server as 2 clients.
2531 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2532 let client_a = server.create_client(cx_a, "user_a").await;
2533 let client_b = server.create_client(cx_b, "user_b").await;
2534
2535 // Share a project as client A
2536 let project_a = cx_a.update(|cx| {
2537 Project::local(
2538 client_a.clone(),
2539 client_a.user_store.clone(),
2540 lang_registry.clone(),
2541 fs.clone(),
2542 cx,
2543 )
2544 });
2545 let (worktree_a, _) = project_a
2546 .update(cx_a, |p, cx| {
2547 p.find_or_create_local_worktree("/root-1", false, cx)
2548 })
2549 .await
2550 .unwrap();
2551 worktree_a
2552 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2553 .await;
2554 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2555 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2556 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2557
2558 // Join the worktree as client B.
2559 let project_b = Project::remote(
2560 project_id,
2561 client_b.clone(),
2562 client_b.user_store.clone(),
2563 lang_registry.clone(),
2564 fs.clone(),
2565 &mut cx_b.to_async(),
2566 )
2567 .await
2568 .unwrap();
2569
2570 // Open the file on client B.
2571 let buffer_b = cx_b
2572 .background()
2573 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2574 .await
2575 .unwrap();
2576
2577 // Request references to a symbol as the guest.
2578 let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2579
2580 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2581 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2582 assert_eq!(
2583 params.text_document_position.text_document.uri.as_str(),
2584 "file:///root-1/one.rs"
2585 );
2586 Some(vec![
2587 lsp::Location {
2588 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2589 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2590 },
2591 lsp::Location {
2592 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2593 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2594 },
2595 lsp::Location {
2596 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2597 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2598 },
2599 ])
2600 });
2601
2602 let references = references.await.unwrap();
2603 cx_b.read(|cx| {
2604 assert_eq!(references.len(), 3);
2605 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2606
2607 let two_buffer = references[0].buffer.read(cx);
2608 let three_buffer = references[2].buffer.read(cx);
2609 assert_eq!(
2610 two_buffer.file().unwrap().path().as_ref(),
2611 Path::new("two.rs")
2612 );
2613 assert_eq!(references[1].buffer, references[0].buffer);
2614 assert_eq!(
2615 three_buffer.file().unwrap().full_path(cx),
2616 Path::new("three.rs")
2617 );
2618
2619 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2620 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2621 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2622 });
2623 }
2624
2625 #[gpui::test(iterations = 10)]
2626 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2627 cx_a.foreground().forbid_parking();
2628 let lang_registry = Arc::new(LanguageRegistry::new());
2629 let fs = FakeFs::new(cx_a.background());
2630 fs.insert_tree(
2631 "/root-1",
2632 json!({
2633 ".zed.toml": r#"collaborators = ["user_b"]"#,
2634 "a": "hello world",
2635 "b": "goodnight moon",
2636 "c": "a world of goo",
2637 "d": "world champion of clown world",
2638 }),
2639 )
2640 .await;
2641 fs.insert_tree(
2642 "/root-2",
2643 json!({
2644 "e": "disney world is fun",
2645 }),
2646 )
2647 .await;
2648
2649 // Connect to a server as 2 clients.
2650 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2651 let client_a = server.create_client(cx_a, "user_a").await;
2652 let client_b = server.create_client(cx_b, "user_b").await;
2653
2654 // Share a project as client A
2655 let project_a = cx_a.update(|cx| {
2656 Project::local(
2657 client_a.clone(),
2658 client_a.user_store.clone(),
2659 lang_registry.clone(),
2660 fs.clone(),
2661 cx,
2662 )
2663 });
2664 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2665
2666 let (worktree_1, _) = project_a
2667 .update(cx_a, |p, cx| {
2668 p.find_or_create_local_worktree("/root-1", false, cx)
2669 })
2670 .await
2671 .unwrap();
2672 worktree_1
2673 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2674 .await;
2675 let (worktree_2, _) = project_a
2676 .update(cx_a, |p, cx| {
2677 p.find_or_create_local_worktree("/root-2", false, cx)
2678 })
2679 .await
2680 .unwrap();
2681 worktree_2
2682 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2683 .await;
2684
2685 eprintln!("sharing");
2686
2687 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2688
2689 // Join the worktree as client B.
2690 let project_b = Project::remote(
2691 project_id,
2692 client_b.clone(),
2693 client_b.user_store.clone(),
2694 lang_registry.clone(),
2695 fs.clone(),
2696 &mut cx_b.to_async(),
2697 )
2698 .await
2699 .unwrap();
2700
2701 let results = project_b
2702 .update(cx_b, |project, cx| {
2703 project.search(SearchQuery::text("world", false, false), cx)
2704 })
2705 .await
2706 .unwrap();
2707
2708 let mut ranges_by_path = results
2709 .into_iter()
2710 .map(|(buffer, ranges)| {
2711 buffer.read_with(cx_b, |buffer, cx| {
2712 let path = buffer.file().unwrap().full_path(cx);
2713 let offset_ranges = ranges
2714 .into_iter()
2715 .map(|range| range.to_offset(buffer))
2716 .collect::<Vec<_>>();
2717 (path, offset_ranges)
2718 })
2719 })
2720 .collect::<Vec<_>>();
2721 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2722
2723 assert_eq!(
2724 ranges_by_path,
2725 &[
2726 (PathBuf::from("root-1/a"), vec![6..11]),
2727 (PathBuf::from("root-1/c"), vec![2..7]),
2728 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2729 (PathBuf::from("root-2/e"), vec![7..12]),
2730 ]
2731 );
2732 }
2733
2734 #[gpui::test(iterations = 10)]
2735 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2736 cx_a.foreground().forbid_parking();
2737 let lang_registry = Arc::new(LanguageRegistry::new());
2738 let fs = FakeFs::new(cx_a.background());
2739 fs.insert_tree(
2740 "/root-1",
2741 json!({
2742 ".zed.toml": r#"collaborators = ["user_b"]"#,
2743 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2744 }),
2745 )
2746 .await;
2747
2748 // Set up a fake language server.
2749 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2750 lang_registry.add(Arc::new(Language::new(
2751 LanguageConfig {
2752 name: "Rust".into(),
2753 path_suffixes: vec!["rs".to_string()],
2754 language_server: Some(language_server_config),
2755 ..Default::default()
2756 },
2757 Some(tree_sitter_rust::language()),
2758 )));
2759
2760 // Connect to a server as 2 clients.
2761 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2762 let client_a = server.create_client(cx_a, "user_a").await;
2763 let client_b = server.create_client(cx_b, "user_b").await;
2764
2765 // Share a project as client A
2766 let project_a = cx_a.update(|cx| {
2767 Project::local(
2768 client_a.clone(),
2769 client_a.user_store.clone(),
2770 lang_registry.clone(),
2771 fs.clone(),
2772 cx,
2773 )
2774 });
2775 let (worktree_a, _) = project_a
2776 .update(cx_a, |p, cx| {
2777 p.find_or_create_local_worktree("/root-1", false, cx)
2778 })
2779 .await
2780 .unwrap();
2781 worktree_a
2782 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2783 .await;
2784 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2785 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2786 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2787
2788 // Join the worktree as client B.
2789 let project_b = Project::remote(
2790 project_id,
2791 client_b.clone(),
2792 client_b.user_store.clone(),
2793 lang_registry.clone(),
2794 fs.clone(),
2795 &mut cx_b.to_async(),
2796 )
2797 .await
2798 .unwrap();
2799
2800 // Open the file on client B.
2801 let buffer_b = cx_b
2802 .background()
2803 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2804 .await
2805 .unwrap();
2806
2807 // Request document highlights as the guest.
2808 let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2809
2810 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2811 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2812 |params, _| {
2813 assert_eq!(
2814 params
2815 .text_document_position_params
2816 .text_document
2817 .uri
2818 .as_str(),
2819 "file:///root-1/main.rs"
2820 );
2821 assert_eq!(
2822 params.text_document_position_params.position,
2823 lsp::Position::new(0, 34)
2824 );
2825 Some(vec![
2826 lsp::DocumentHighlight {
2827 kind: Some(lsp::DocumentHighlightKind::WRITE),
2828 range: lsp::Range::new(
2829 lsp::Position::new(0, 10),
2830 lsp::Position::new(0, 16),
2831 ),
2832 },
2833 lsp::DocumentHighlight {
2834 kind: Some(lsp::DocumentHighlightKind::READ),
2835 range: lsp::Range::new(
2836 lsp::Position::new(0, 32),
2837 lsp::Position::new(0, 38),
2838 ),
2839 },
2840 lsp::DocumentHighlight {
2841 kind: Some(lsp::DocumentHighlightKind::READ),
2842 range: lsp::Range::new(
2843 lsp::Position::new(0, 41),
2844 lsp::Position::new(0, 47),
2845 ),
2846 },
2847 ])
2848 },
2849 );
2850
2851 let highlights = highlights.await.unwrap();
2852 buffer_b.read_with(cx_b, |buffer, _| {
2853 let snapshot = buffer.snapshot();
2854
2855 let highlights = highlights
2856 .into_iter()
2857 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2858 .collect::<Vec<_>>();
2859 assert_eq!(
2860 highlights,
2861 &[
2862 (lsp::DocumentHighlightKind::WRITE, 10..16),
2863 (lsp::DocumentHighlightKind::READ, 32..38),
2864 (lsp::DocumentHighlightKind::READ, 41..47)
2865 ]
2866 )
2867 });
2868 }
2869
2870 #[gpui::test(iterations = 10)]
2871 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2872 cx_a.foreground().forbid_parking();
2873 let mut lang_registry = Arc::new(LanguageRegistry::new());
2874 let fs = FakeFs::new(cx_a.background());
2875 fs.insert_tree(
2876 "/code",
2877 json!({
2878 "crate-1": {
2879 ".zed.toml": r#"collaborators = ["user_b"]"#,
2880 "one.rs": "const ONE: usize = 1;",
2881 },
2882 "crate-2": {
2883 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2884 },
2885 "private": {
2886 "passwords.txt": "the-password",
2887 }
2888 }),
2889 )
2890 .await;
2891
2892 // Set up a fake language server.
2893 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2894 Arc::get_mut(&mut lang_registry)
2895 .unwrap()
2896 .add(Arc::new(Language::new(
2897 LanguageConfig {
2898 name: "Rust".into(),
2899 path_suffixes: vec!["rs".to_string()],
2900 language_server: Some(language_server_config),
2901 ..Default::default()
2902 },
2903 Some(tree_sitter_rust::language()),
2904 )));
2905
2906 // Connect to a server as 2 clients.
2907 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2908 let client_a = server.create_client(cx_a, "user_a").await;
2909 let client_b = server.create_client(cx_b, "user_b").await;
2910
2911 // Share a project as client A
2912 let project_a = cx_a.update(|cx| {
2913 Project::local(
2914 client_a.clone(),
2915 client_a.user_store.clone(),
2916 lang_registry.clone(),
2917 fs.clone(),
2918 cx,
2919 )
2920 });
2921 let (worktree_a, _) = project_a
2922 .update(cx_a, |p, cx| {
2923 p.find_or_create_local_worktree("/code/crate-1", false, cx)
2924 })
2925 .await
2926 .unwrap();
2927 worktree_a
2928 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2929 .await;
2930 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2931 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2932 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2933
2934 // Join the worktree as client B.
2935 let project_b = Project::remote(
2936 project_id,
2937 client_b.clone(),
2938 client_b.user_store.clone(),
2939 lang_registry.clone(),
2940 fs.clone(),
2941 &mut cx_b.to_async(),
2942 )
2943 .await
2944 .unwrap();
2945
2946 // Cause the language server to start.
2947 let _buffer = cx_b
2948 .background()
2949 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2950 .await
2951 .unwrap();
2952
2953 // Request the definition of a symbol as the guest.
2954 let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2955 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2956 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2957 #[allow(deprecated)]
2958 Some(vec![lsp::SymbolInformation {
2959 name: "TWO".into(),
2960 location: lsp::Location {
2961 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2962 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2963 },
2964 kind: lsp::SymbolKind::CONSTANT,
2965 tags: None,
2966 container_name: None,
2967 deprecated: None,
2968 }])
2969 });
2970
2971 let symbols = symbols.await.unwrap();
2972 assert_eq!(symbols.len(), 1);
2973 assert_eq!(symbols[0].name, "TWO");
2974
2975 // Open one of the returned symbols.
2976 let buffer_b_2 = project_b
2977 .update(cx_b, |project, cx| {
2978 project.open_buffer_for_symbol(&symbols[0], cx)
2979 })
2980 .await
2981 .unwrap();
2982 buffer_b_2.read_with(cx_b, |buffer, _| {
2983 assert_eq!(
2984 buffer.file().unwrap().path().as_ref(),
2985 Path::new("../crate-2/two.rs")
2986 );
2987 });
2988
2989 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2990 let mut fake_symbol = symbols[0].clone();
2991 fake_symbol.path = Path::new("/code/secrets").into();
2992 let error = project_b
2993 .update(cx_b, |project, cx| {
2994 project.open_buffer_for_symbol(&fake_symbol, cx)
2995 })
2996 .await
2997 .unwrap_err();
2998 assert!(error.to_string().contains("invalid symbol signature"));
2999 }
3000
3001 #[gpui::test(iterations = 10)]
3002 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3003 cx_a: &mut TestAppContext,
3004 cx_b: &mut TestAppContext,
3005 mut rng: StdRng,
3006 ) {
3007 cx_a.foreground().forbid_parking();
3008 let mut lang_registry = Arc::new(LanguageRegistry::new());
3009 let fs = FakeFs::new(cx_a.background());
3010 fs.insert_tree(
3011 "/root",
3012 json!({
3013 ".zed.toml": r#"collaborators = ["user_b"]"#,
3014 "a.rs": "const ONE: usize = b::TWO;",
3015 "b.rs": "const TWO: usize = 2",
3016 }),
3017 )
3018 .await;
3019
3020 // Set up a fake language server.
3021 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3022
3023 Arc::get_mut(&mut lang_registry)
3024 .unwrap()
3025 .add(Arc::new(Language::new(
3026 LanguageConfig {
3027 name: "Rust".into(),
3028 path_suffixes: vec!["rs".to_string()],
3029 language_server: Some(language_server_config),
3030 ..Default::default()
3031 },
3032 Some(tree_sitter_rust::language()),
3033 )));
3034
3035 // Connect to a server as 2 clients.
3036 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3037 let client_a = server.create_client(cx_a, "user_a").await;
3038 let client_b = server.create_client(cx_b, "user_b").await;
3039
3040 // Share a project as client A
3041 let project_a = cx_a.update(|cx| {
3042 Project::local(
3043 client_a.clone(),
3044 client_a.user_store.clone(),
3045 lang_registry.clone(),
3046 fs.clone(),
3047 cx,
3048 )
3049 });
3050
3051 let (worktree_a, _) = project_a
3052 .update(cx_a, |p, cx| {
3053 p.find_or_create_local_worktree("/root", false, cx)
3054 })
3055 .await
3056 .unwrap();
3057 worktree_a
3058 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3059 .await;
3060 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3061 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3062 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3063
3064 // Join the worktree as client B.
3065 let project_b = Project::remote(
3066 project_id,
3067 client_b.clone(),
3068 client_b.user_store.clone(),
3069 lang_registry.clone(),
3070 fs.clone(),
3071 &mut cx_b.to_async(),
3072 )
3073 .await
3074 .unwrap();
3075
3076 let buffer_b1 = cx_b
3077 .background()
3078 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3079 .await
3080 .unwrap();
3081
3082 let definitions;
3083 let buffer_b2;
3084 if rng.gen() {
3085 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3086 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3087 } else {
3088 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3089 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3090 }
3091
3092 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3093 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3094 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3095 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3096 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3097 )))
3098 });
3099
3100 let buffer_b2 = buffer_b2.await.unwrap();
3101 let definitions = definitions.await.unwrap();
3102 assert_eq!(definitions.len(), 1);
3103 assert_eq!(definitions[0].buffer, buffer_b2);
3104 }
3105
3106 #[gpui::test(iterations = 10)]
3107 async fn test_collaborating_with_code_actions(
3108 cx_a: &mut TestAppContext,
3109 cx_b: &mut TestAppContext,
3110 ) {
3111 cx_a.foreground().forbid_parking();
3112 let mut lang_registry = Arc::new(LanguageRegistry::new());
3113 let fs = FakeFs::new(cx_a.background());
3114 let mut path_openers_b = Vec::new();
3115 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3116
3117 // Set up a fake language server.
3118 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3119 Arc::get_mut(&mut lang_registry)
3120 .unwrap()
3121 .add(Arc::new(Language::new(
3122 LanguageConfig {
3123 name: "Rust".into(),
3124 path_suffixes: vec!["rs".to_string()],
3125 language_server: Some(language_server_config),
3126 ..Default::default()
3127 },
3128 Some(tree_sitter_rust::language()),
3129 )));
3130
3131 // Connect to a server as 2 clients.
3132 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3133 let client_a = server.create_client(cx_a, "user_a").await;
3134 let client_b = server.create_client(cx_b, "user_b").await;
3135
3136 // Share a project as client A
3137 fs.insert_tree(
3138 "/a",
3139 json!({
3140 ".zed.toml": r#"collaborators = ["user_b"]"#,
3141 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3142 "other.rs": "pub fn foo() -> usize { 4 }",
3143 }),
3144 )
3145 .await;
3146 let project_a = cx_a.update(|cx| {
3147 Project::local(
3148 client_a.clone(),
3149 client_a.user_store.clone(),
3150 lang_registry.clone(),
3151 fs.clone(),
3152 cx,
3153 )
3154 });
3155 let (worktree_a, _) = project_a
3156 .update(cx_a, |p, cx| {
3157 p.find_or_create_local_worktree("/a", false, cx)
3158 })
3159 .await
3160 .unwrap();
3161 worktree_a
3162 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3163 .await;
3164 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3165 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3166 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3167
3168 // Join the worktree as client B.
3169 let project_b = Project::remote(
3170 project_id,
3171 client_b.clone(),
3172 client_b.user_store.clone(),
3173 lang_registry.clone(),
3174 fs.clone(),
3175 &mut cx_b.to_async(),
3176 )
3177 .await
3178 .unwrap();
3179 let mut params = cx_b.update(WorkspaceParams::test);
3180 params.languages = lang_registry.clone();
3181 params.client = client_b.client.clone();
3182 params.user_store = client_b.user_store.clone();
3183 params.project = project_b;
3184 params.path_openers = path_openers_b.into();
3185
3186 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3187 let editor_b = workspace_b
3188 .update(cx_b, |workspace, cx| {
3189 workspace.open_path((worktree_id, "main.rs").into(), cx)
3190 })
3191 .await
3192 .unwrap()
3193 .downcast::<Editor>()
3194 .unwrap();
3195
3196 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3197 fake_language_server
3198 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3199 assert_eq!(
3200 params.text_document.uri,
3201 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3202 );
3203 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3204 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3205 None
3206 })
3207 .next()
3208 .await;
3209
3210 // Move cursor to a location that contains code actions.
3211 editor_b.update(cx_b, |editor, cx| {
3212 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3213 cx.focus(&editor_b);
3214 });
3215
3216 fake_language_server
3217 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3218 assert_eq!(
3219 params.text_document.uri,
3220 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3221 );
3222 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3223 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3224
3225 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3226 lsp::CodeAction {
3227 title: "Inline into all callers".to_string(),
3228 edit: Some(lsp::WorkspaceEdit {
3229 changes: Some(
3230 [
3231 (
3232 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3233 vec![lsp::TextEdit::new(
3234 lsp::Range::new(
3235 lsp::Position::new(1, 22),
3236 lsp::Position::new(1, 34),
3237 ),
3238 "4".to_string(),
3239 )],
3240 ),
3241 (
3242 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3243 vec![lsp::TextEdit::new(
3244 lsp::Range::new(
3245 lsp::Position::new(0, 0),
3246 lsp::Position::new(0, 27),
3247 ),
3248 "".to_string(),
3249 )],
3250 ),
3251 ]
3252 .into_iter()
3253 .collect(),
3254 ),
3255 ..Default::default()
3256 }),
3257 data: Some(json!({
3258 "codeActionParams": {
3259 "range": {
3260 "start": {"line": 1, "column": 31},
3261 "end": {"line": 1, "column": 31},
3262 }
3263 }
3264 })),
3265 ..Default::default()
3266 },
3267 )])
3268 })
3269 .next()
3270 .await;
3271
3272 // Toggle code actions and wait for them to display.
3273 editor_b.update(cx_b, |editor, cx| {
3274 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3275 });
3276 editor_b
3277 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3278 .await;
3279
3280 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3281
3282 // Confirming the code action will trigger a resolve request.
3283 let confirm_action = workspace_b
3284 .update(cx_b, |workspace, cx| {
3285 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3286 })
3287 .unwrap();
3288 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3289 lsp::CodeAction {
3290 title: "Inline into all callers".to_string(),
3291 edit: Some(lsp::WorkspaceEdit {
3292 changes: Some(
3293 [
3294 (
3295 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3296 vec![lsp::TextEdit::new(
3297 lsp::Range::new(
3298 lsp::Position::new(1, 22),
3299 lsp::Position::new(1, 34),
3300 ),
3301 "4".to_string(),
3302 )],
3303 ),
3304 (
3305 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3306 vec![lsp::TextEdit::new(
3307 lsp::Range::new(
3308 lsp::Position::new(0, 0),
3309 lsp::Position::new(0, 27),
3310 ),
3311 "".to_string(),
3312 )],
3313 ),
3314 ]
3315 .into_iter()
3316 .collect(),
3317 ),
3318 ..Default::default()
3319 }),
3320 ..Default::default()
3321 }
3322 });
3323
3324 // After the action is confirmed, an editor containing both modified files is opened.
3325 confirm_action.await.unwrap();
3326 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3327 workspace
3328 .active_item(cx)
3329 .unwrap()
3330 .downcast::<Editor>()
3331 .unwrap()
3332 });
3333 code_action_editor.update(cx_b, |editor, cx| {
3334 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3335 editor.undo(&Undo, cx);
3336 assert_eq!(
3337 editor.text(cx),
3338 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3339 );
3340 editor.redo(&Redo, cx);
3341 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3342 });
3343 }
3344
3345 #[gpui::test(iterations = 10)]
3346 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3347 cx_a.foreground().forbid_parking();
3348 let mut lang_registry = Arc::new(LanguageRegistry::new());
3349 let fs = FakeFs::new(cx_a.background());
3350 let mut path_openers_b = Vec::new();
3351 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3352
3353 // Set up a fake language server.
3354 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3355 Arc::get_mut(&mut lang_registry)
3356 .unwrap()
3357 .add(Arc::new(Language::new(
3358 LanguageConfig {
3359 name: "Rust".into(),
3360 path_suffixes: vec!["rs".to_string()],
3361 language_server: Some(language_server_config),
3362 ..Default::default()
3363 },
3364 Some(tree_sitter_rust::language()),
3365 )));
3366
3367 // Connect to a server as 2 clients.
3368 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3369 let client_a = server.create_client(cx_a, "user_a").await;
3370 let client_b = server.create_client(cx_b, "user_b").await;
3371
3372 // Share a project as client A
3373 fs.insert_tree(
3374 "/dir",
3375 json!({
3376 ".zed.toml": r#"collaborators = ["user_b"]"#,
3377 "one.rs": "const ONE: usize = 1;",
3378 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3379 }),
3380 )
3381 .await;
3382 let project_a = cx_a.update(|cx| {
3383 Project::local(
3384 client_a.clone(),
3385 client_a.user_store.clone(),
3386 lang_registry.clone(),
3387 fs.clone(),
3388 cx,
3389 )
3390 });
3391 let (worktree_a, _) = project_a
3392 .update(cx_a, |p, cx| {
3393 p.find_or_create_local_worktree("/dir", false, cx)
3394 })
3395 .await
3396 .unwrap();
3397 worktree_a
3398 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3399 .await;
3400 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3401 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3402 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3403
3404 // Join the worktree as client B.
3405 let project_b = Project::remote(
3406 project_id,
3407 client_b.clone(),
3408 client_b.user_store.clone(),
3409 lang_registry.clone(),
3410 fs.clone(),
3411 &mut cx_b.to_async(),
3412 )
3413 .await
3414 .unwrap();
3415 let mut params = cx_b.update(WorkspaceParams::test);
3416 params.languages = lang_registry.clone();
3417 params.client = client_b.client.clone();
3418 params.user_store = client_b.user_store.clone();
3419 params.project = project_b;
3420 params.path_openers = path_openers_b.into();
3421
3422 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3423 let editor_b = workspace_b
3424 .update(cx_b, |workspace, cx| {
3425 workspace.open_path((worktree_id, "one.rs").into(), cx)
3426 })
3427 .await
3428 .unwrap()
3429 .downcast::<Editor>()
3430 .unwrap();
3431 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3432
3433 // Move cursor to a location that can be renamed.
3434 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3435 editor.select_ranges([7..7], None, cx);
3436 editor.rename(&Rename, cx).unwrap()
3437 });
3438
3439 fake_language_server
3440 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3441 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3442 assert_eq!(params.position, lsp::Position::new(0, 7));
3443 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3444 lsp::Position::new(0, 6),
3445 lsp::Position::new(0, 9),
3446 )))
3447 })
3448 .next()
3449 .await
3450 .unwrap();
3451 prepare_rename.await.unwrap();
3452 editor_b.update(cx_b, |editor, cx| {
3453 let rename = editor.pending_rename().unwrap();
3454 let buffer = editor.buffer().read(cx).snapshot(cx);
3455 assert_eq!(
3456 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3457 6..9
3458 );
3459 rename.editor.update(cx, |rename_editor, cx| {
3460 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3461 rename_buffer.edit([0..3], "THREE", cx);
3462 });
3463 });
3464 });
3465
3466 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3467 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3468 });
3469 fake_language_server
3470 .handle_request::<lsp::request::Rename, _>(|params, _| {
3471 assert_eq!(
3472 params.text_document_position.text_document.uri.as_str(),
3473 "file:///dir/one.rs"
3474 );
3475 assert_eq!(
3476 params.text_document_position.position,
3477 lsp::Position::new(0, 6)
3478 );
3479 assert_eq!(params.new_name, "THREE");
3480 Some(lsp::WorkspaceEdit {
3481 changes: Some(
3482 [
3483 (
3484 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3485 vec![lsp::TextEdit::new(
3486 lsp::Range::new(
3487 lsp::Position::new(0, 6),
3488 lsp::Position::new(0, 9),
3489 ),
3490 "THREE".to_string(),
3491 )],
3492 ),
3493 (
3494 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3495 vec![
3496 lsp::TextEdit::new(
3497 lsp::Range::new(
3498 lsp::Position::new(0, 24),
3499 lsp::Position::new(0, 27),
3500 ),
3501 "THREE".to_string(),
3502 ),
3503 lsp::TextEdit::new(
3504 lsp::Range::new(
3505 lsp::Position::new(0, 35),
3506 lsp::Position::new(0, 38),
3507 ),
3508 "THREE".to_string(),
3509 ),
3510 ],
3511 ),
3512 ]
3513 .into_iter()
3514 .collect(),
3515 ),
3516 ..Default::default()
3517 })
3518 })
3519 .next()
3520 .await
3521 .unwrap();
3522 confirm_rename.await.unwrap();
3523
3524 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3525 workspace
3526 .active_item(cx)
3527 .unwrap()
3528 .downcast::<Editor>()
3529 .unwrap()
3530 });
3531 rename_editor.update(cx_b, |editor, cx| {
3532 assert_eq!(
3533 editor.text(cx),
3534 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3535 );
3536 editor.undo(&Undo, cx);
3537 assert_eq!(
3538 editor.text(cx),
3539 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3540 );
3541 editor.redo(&Redo, cx);
3542 assert_eq!(
3543 editor.text(cx),
3544 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3545 );
3546 });
3547
3548 // Ensure temporary rename edits cannot be undone/redone.
3549 editor_b.update(cx_b, |editor, cx| {
3550 editor.undo(&Undo, cx);
3551 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3552 editor.undo(&Undo, cx);
3553 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3554 editor.redo(&Redo, cx);
3555 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3556 })
3557 }
3558
3559 #[gpui::test(iterations = 10)]
3560 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3561 cx_a.foreground().forbid_parking();
3562
3563 // Connect to a server as 2 clients.
3564 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3565 let client_a = server.create_client(cx_a, "user_a").await;
3566 let client_b = server.create_client(cx_b, "user_b").await;
3567
3568 // Create an org that includes these 2 users.
3569 let db = &server.app_state.db;
3570 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3571 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3572 .await
3573 .unwrap();
3574 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3575 .await
3576 .unwrap();
3577
3578 // Create a channel that includes all the users.
3579 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3580 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3581 .await
3582 .unwrap();
3583 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3584 .await
3585 .unwrap();
3586 db.create_channel_message(
3587 channel_id,
3588 client_b.current_user_id(&cx_b),
3589 "hello A, it's B.",
3590 OffsetDateTime::now_utc(),
3591 1,
3592 )
3593 .await
3594 .unwrap();
3595
3596 let channels_a = cx_a
3597 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3598 channels_a
3599 .condition(cx_a, |list, _| list.available_channels().is_some())
3600 .await;
3601 channels_a.read_with(cx_a, |list, _| {
3602 assert_eq!(
3603 list.available_channels().unwrap(),
3604 &[ChannelDetails {
3605 id: channel_id.to_proto(),
3606 name: "test-channel".to_string()
3607 }]
3608 )
3609 });
3610 let channel_a = channels_a.update(cx_a, |this, cx| {
3611 this.get_channel(channel_id.to_proto(), cx).unwrap()
3612 });
3613 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3614 channel_a
3615 .condition(&cx_a, |channel, _| {
3616 channel_messages(channel)
3617 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3618 })
3619 .await;
3620
3621 let channels_b = cx_b
3622 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3623 channels_b
3624 .condition(cx_b, |list, _| list.available_channels().is_some())
3625 .await;
3626 channels_b.read_with(cx_b, |list, _| {
3627 assert_eq!(
3628 list.available_channels().unwrap(),
3629 &[ChannelDetails {
3630 id: channel_id.to_proto(),
3631 name: "test-channel".to_string()
3632 }]
3633 )
3634 });
3635
3636 let channel_b = channels_b.update(cx_b, |this, cx| {
3637 this.get_channel(channel_id.to_proto(), cx).unwrap()
3638 });
3639 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3640 channel_b
3641 .condition(&cx_b, |channel, _| {
3642 channel_messages(channel)
3643 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3644 })
3645 .await;
3646
3647 channel_a
3648 .update(cx_a, |channel, cx| {
3649 channel
3650 .send_message("oh, hi B.".to_string(), cx)
3651 .unwrap()
3652 .detach();
3653 let task = channel.send_message("sup".to_string(), cx).unwrap();
3654 assert_eq!(
3655 channel_messages(channel),
3656 &[
3657 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3658 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3659 ("user_a".to_string(), "sup".to_string(), true)
3660 ]
3661 );
3662 task
3663 })
3664 .await
3665 .unwrap();
3666
3667 channel_b
3668 .condition(&cx_b, |channel, _| {
3669 channel_messages(channel)
3670 == [
3671 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3672 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3673 ("user_a".to_string(), "sup".to_string(), false),
3674 ]
3675 })
3676 .await;
3677
3678 assert_eq!(
3679 server
3680 .state()
3681 .await
3682 .channel(channel_id)
3683 .unwrap()
3684 .connection_ids
3685 .len(),
3686 2
3687 );
3688 cx_b.update(|_| drop(channel_b));
3689 server
3690 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3691 .await;
3692
3693 cx_a.update(|_| drop(channel_a));
3694 server
3695 .condition(|state| state.channel(channel_id).is_none())
3696 .await;
3697 }
3698
3699 #[gpui::test(iterations = 10)]
3700 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3701 cx_a.foreground().forbid_parking();
3702
3703 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3704 let client_a = server.create_client(cx_a, "user_a").await;
3705
3706 let db = &server.app_state.db;
3707 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3708 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3709 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3710 .await
3711 .unwrap();
3712 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3713 .await
3714 .unwrap();
3715
3716 let channels_a = cx_a
3717 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3718 channels_a
3719 .condition(cx_a, |list, _| list.available_channels().is_some())
3720 .await;
3721 let channel_a = channels_a.update(cx_a, |this, cx| {
3722 this.get_channel(channel_id.to_proto(), cx).unwrap()
3723 });
3724
3725 // Messages aren't allowed to be too long.
3726 channel_a
3727 .update(cx_a, |channel, cx| {
3728 let long_body = "this is long.\n".repeat(1024);
3729 channel.send_message(long_body, cx).unwrap()
3730 })
3731 .await
3732 .unwrap_err();
3733
3734 // Messages aren't allowed to be blank.
3735 channel_a.update(cx_a, |channel, cx| {
3736 channel.send_message(String::new(), cx).unwrap_err()
3737 });
3738
3739 // Leading and trailing whitespace are trimmed.
3740 channel_a
3741 .update(cx_a, |channel, cx| {
3742 channel
3743 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3744 .unwrap()
3745 })
3746 .await
3747 .unwrap();
3748 assert_eq!(
3749 db.get_channel_messages(channel_id, 10, None)
3750 .await
3751 .unwrap()
3752 .iter()
3753 .map(|m| &m.body)
3754 .collect::<Vec<_>>(),
3755 &["surrounded by whitespace"]
3756 );
3757 }
3758
3759 #[gpui::test(iterations = 10)]
3760 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3761 cx_a.foreground().forbid_parking();
3762
3763 // Connect to a server as 2 clients.
3764 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3765 let client_a = server.create_client(cx_a, "user_a").await;
3766 let client_b = server.create_client(cx_b, "user_b").await;
3767 let mut status_b = client_b.status();
3768
3769 // Create an org that includes these 2 users.
3770 let db = &server.app_state.db;
3771 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3772 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3773 .await
3774 .unwrap();
3775 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3776 .await
3777 .unwrap();
3778
3779 // Create a channel that includes all the users.
3780 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3781 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3782 .await
3783 .unwrap();
3784 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3785 .await
3786 .unwrap();
3787 db.create_channel_message(
3788 channel_id,
3789 client_b.current_user_id(&cx_b),
3790 "hello A, it's B.",
3791 OffsetDateTime::now_utc(),
3792 2,
3793 )
3794 .await
3795 .unwrap();
3796
3797 let channels_a = cx_a
3798 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3799 channels_a
3800 .condition(cx_a, |list, _| list.available_channels().is_some())
3801 .await;
3802
3803 channels_a.read_with(cx_a, |list, _| {
3804 assert_eq!(
3805 list.available_channels().unwrap(),
3806 &[ChannelDetails {
3807 id: channel_id.to_proto(),
3808 name: "test-channel".to_string()
3809 }]
3810 )
3811 });
3812 let channel_a = channels_a.update(cx_a, |this, cx| {
3813 this.get_channel(channel_id.to_proto(), cx).unwrap()
3814 });
3815 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3816 channel_a
3817 .condition(&cx_a, |channel, _| {
3818 channel_messages(channel)
3819 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3820 })
3821 .await;
3822
3823 let channels_b = cx_b
3824 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3825 channels_b
3826 .condition(cx_b, |list, _| list.available_channels().is_some())
3827 .await;
3828 channels_b.read_with(cx_b, |list, _| {
3829 assert_eq!(
3830 list.available_channels().unwrap(),
3831 &[ChannelDetails {
3832 id: channel_id.to_proto(),
3833 name: "test-channel".to_string()
3834 }]
3835 )
3836 });
3837
3838 let channel_b = channels_b.update(cx_b, |this, cx| {
3839 this.get_channel(channel_id.to_proto(), cx).unwrap()
3840 });
3841 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3842 channel_b
3843 .condition(&cx_b, |channel, _| {
3844 channel_messages(channel)
3845 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3846 })
3847 .await;
3848
3849 // Disconnect client B, ensuring we can still access its cached channel data.
3850 server.forbid_connections();
3851 server.disconnect_client(client_b.current_user_id(&cx_b));
3852 while !matches!(
3853 status_b.next().await,
3854 Some(client::Status::ReconnectionError { .. })
3855 ) {}
3856
3857 channels_b.read_with(cx_b, |channels, _| {
3858 assert_eq!(
3859 channels.available_channels().unwrap(),
3860 [ChannelDetails {
3861 id: channel_id.to_proto(),
3862 name: "test-channel".to_string()
3863 }]
3864 )
3865 });
3866 channel_b.read_with(cx_b, |channel, _| {
3867 assert_eq!(
3868 channel_messages(channel),
3869 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3870 )
3871 });
3872
3873 // Send a message from client B while it is disconnected.
3874 channel_b
3875 .update(cx_b, |channel, cx| {
3876 let task = channel
3877 .send_message("can you see this?".to_string(), cx)
3878 .unwrap();
3879 assert_eq!(
3880 channel_messages(channel),
3881 &[
3882 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3883 ("user_b".to_string(), "can you see this?".to_string(), true)
3884 ]
3885 );
3886 task
3887 })
3888 .await
3889 .unwrap_err();
3890
3891 // Send a message from client A while B is disconnected.
3892 channel_a
3893 .update(cx_a, |channel, cx| {
3894 channel
3895 .send_message("oh, hi B.".to_string(), cx)
3896 .unwrap()
3897 .detach();
3898 let task = channel.send_message("sup".to_string(), cx).unwrap();
3899 assert_eq!(
3900 channel_messages(channel),
3901 &[
3902 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3903 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3904 ("user_a".to_string(), "sup".to_string(), true)
3905 ]
3906 );
3907 task
3908 })
3909 .await
3910 .unwrap();
3911
3912 // Give client B a chance to reconnect.
3913 server.allow_connections();
3914 cx_b.foreground().advance_clock(Duration::from_secs(10));
3915
3916 // Verify that B sees the new messages upon reconnection, as well as the message client B
3917 // sent while offline.
3918 channel_b
3919 .condition(&cx_b, |channel, _| {
3920 channel_messages(channel)
3921 == [
3922 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3923 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3924 ("user_a".to_string(), "sup".to_string(), false),
3925 ("user_b".to_string(), "can you see this?".to_string(), false),
3926 ]
3927 })
3928 .await;
3929
3930 // Ensure client A and B can communicate normally after reconnection.
3931 channel_a
3932 .update(cx_a, |channel, cx| {
3933 channel.send_message("you online?".to_string(), cx).unwrap()
3934 })
3935 .await
3936 .unwrap();
3937 channel_b
3938 .condition(&cx_b, |channel, _| {
3939 channel_messages(channel)
3940 == [
3941 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3942 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3943 ("user_a".to_string(), "sup".to_string(), false),
3944 ("user_b".to_string(), "can you see this?".to_string(), false),
3945 ("user_a".to_string(), "you online?".to_string(), false),
3946 ]
3947 })
3948 .await;
3949
3950 channel_b
3951 .update(cx_b, |channel, cx| {
3952 channel.send_message("yep".to_string(), cx).unwrap()
3953 })
3954 .await
3955 .unwrap();
3956 channel_a
3957 .condition(&cx_a, |channel, _| {
3958 channel_messages(channel)
3959 == [
3960 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3961 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3962 ("user_a".to_string(), "sup".to_string(), false),
3963 ("user_b".to_string(), "can you see this?".to_string(), false),
3964 ("user_a".to_string(), "you online?".to_string(), false),
3965 ("user_b".to_string(), "yep".to_string(), false),
3966 ]
3967 })
3968 .await;
3969 }
3970
3971 #[gpui::test(iterations = 10)]
3972 async fn test_contacts(
3973 cx_a: &mut TestAppContext,
3974 cx_b: &mut TestAppContext,
3975 cx_c: &mut TestAppContext,
3976 ) {
3977 cx_a.foreground().forbid_parking();
3978 let lang_registry = Arc::new(LanguageRegistry::new());
3979 let fs = FakeFs::new(cx_a.background());
3980
3981 // Connect to a server as 3 clients.
3982 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3983 let client_a = server.create_client(cx_a, "user_a").await;
3984 let client_b = server.create_client(cx_b, "user_b").await;
3985 let client_c = server.create_client(cx_c, "user_c").await;
3986
3987 // Share a worktree as client A.
3988 fs.insert_tree(
3989 "/a",
3990 json!({
3991 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3992 }),
3993 )
3994 .await;
3995
3996 let project_a = cx_a.update(|cx| {
3997 Project::local(
3998 client_a.clone(),
3999 client_a.user_store.clone(),
4000 lang_registry.clone(),
4001 fs.clone(),
4002 cx,
4003 )
4004 });
4005 let (worktree_a, _) = project_a
4006 .update(cx_a, |p, cx| {
4007 p.find_or_create_local_worktree("/a", false, cx)
4008 })
4009 .await
4010 .unwrap();
4011 worktree_a
4012 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4013 .await;
4014
4015 client_a
4016 .user_store
4017 .condition(&cx_a, |user_store, _| {
4018 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4019 })
4020 .await;
4021 client_b
4022 .user_store
4023 .condition(&cx_b, |user_store, _| {
4024 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4025 })
4026 .await;
4027 client_c
4028 .user_store
4029 .condition(&cx_c, |user_store, _| {
4030 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4031 })
4032 .await;
4033
4034 let project_id = project_a
4035 .update(cx_a, |project, _| project.next_remote_id())
4036 .await;
4037 project_a
4038 .update(cx_a, |project, cx| project.share(cx))
4039 .await
4040 .unwrap();
4041
4042 let _project_b = Project::remote(
4043 project_id,
4044 client_b.clone(),
4045 client_b.user_store.clone(),
4046 lang_registry.clone(),
4047 fs.clone(),
4048 &mut cx_b.to_async(),
4049 )
4050 .await
4051 .unwrap();
4052
4053 client_a
4054 .user_store
4055 .condition(&cx_a, |user_store, _| {
4056 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4057 })
4058 .await;
4059 client_b
4060 .user_store
4061 .condition(&cx_b, |user_store, _| {
4062 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4063 })
4064 .await;
4065 client_c
4066 .user_store
4067 .condition(&cx_c, |user_store, _| {
4068 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4069 })
4070 .await;
4071
4072 project_a
4073 .condition(&cx_a, |project, _| {
4074 project.collaborators().contains_key(&client_b.peer_id)
4075 })
4076 .await;
4077
4078 cx_a.update(move |_| drop(project_a));
4079 client_a
4080 .user_store
4081 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4082 .await;
4083 client_b
4084 .user_store
4085 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4086 .await;
4087 client_c
4088 .user_store
4089 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4090 .await;
4091
4092 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4093 user_store
4094 .contacts()
4095 .iter()
4096 .map(|contact| {
4097 let worktrees = contact
4098 .projects
4099 .iter()
4100 .map(|p| {
4101 (
4102 p.worktree_root_names[0].as_str(),
4103 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4104 )
4105 })
4106 .collect();
4107 (contact.user.github_login.as_str(), worktrees)
4108 })
4109 .collect()
4110 }
4111 }
4112
4113 #[gpui::test(iterations = 100)]
4114 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4115 cx.foreground().forbid_parking();
4116 let max_peers = env::var("MAX_PEERS")
4117 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4118 .unwrap_or(5);
4119 let max_operations = env::var("OPERATIONS")
4120 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4121 .unwrap_or(10);
4122
4123 let rng = Arc::new(Mutex::new(rng));
4124
4125 let guest_lang_registry = Arc::new(LanguageRegistry::new());
4126 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4127
4128 let fs = FakeFs::new(cx.background());
4129 fs.insert_tree(
4130 "/_collab",
4131 json!({
4132 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4133 }),
4134 )
4135 .await;
4136
4137 let operations = Rc::new(Cell::new(0));
4138 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4139 let mut clients = Vec::new();
4140
4141 let mut next_entity_id = 100000;
4142 let mut host_cx = TestAppContext::new(
4143 cx.foreground_platform(),
4144 cx.platform(),
4145 cx.foreground(),
4146 cx.background(),
4147 cx.font_cache(),
4148 cx.leak_detector(),
4149 next_entity_id,
4150 );
4151 let host = server.create_client(&mut host_cx, "host").await;
4152 let host_project = host_cx.update(|cx| {
4153 Project::local(
4154 host.client.clone(),
4155 host.user_store.clone(),
4156 Arc::new(LanguageRegistry::new()),
4157 fs.clone(),
4158 cx,
4159 )
4160 });
4161 let host_project_id = host_project
4162 .update(&mut host_cx, |p, _| p.next_remote_id())
4163 .await;
4164
4165 let (collab_worktree, _) = host_project
4166 .update(&mut host_cx, |project, cx| {
4167 project.find_or_create_local_worktree("/_collab", false, cx)
4168 })
4169 .await
4170 .unwrap();
4171 collab_worktree
4172 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4173 .await;
4174 host_project
4175 .update(&mut host_cx, |project, cx| project.share(cx))
4176 .await
4177 .unwrap();
4178
4179 clients.push(cx.foreground().spawn(host.simulate_host(
4180 host_project,
4181 language_server_config,
4182 operations.clone(),
4183 max_operations,
4184 rng.clone(),
4185 host_cx,
4186 )));
4187
4188 while operations.get() < max_operations {
4189 cx.background().simulate_random_delay().await;
4190 if clients.len() >= max_peers {
4191 break;
4192 } else if rng.lock().gen_bool(0.05) {
4193 operations.set(operations.get() + 1);
4194
4195 let guest_id = clients.len();
4196 log::info!("Adding guest {}", guest_id);
4197 next_entity_id += 100000;
4198 let mut guest_cx = TestAppContext::new(
4199 cx.foreground_platform(),
4200 cx.platform(),
4201 cx.foreground(),
4202 cx.background(),
4203 cx.font_cache(),
4204 cx.leak_detector(),
4205 next_entity_id,
4206 );
4207 let guest = server
4208 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4209 .await;
4210 let guest_project = Project::remote(
4211 host_project_id,
4212 guest.client.clone(),
4213 guest.user_store.clone(),
4214 guest_lang_registry.clone(),
4215 FakeFs::new(cx.background()),
4216 &mut guest_cx.to_async(),
4217 )
4218 .await
4219 .unwrap();
4220 clients.push(cx.foreground().spawn(guest.simulate_guest(
4221 guest_id,
4222 guest_project,
4223 operations.clone(),
4224 max_operations,
4225 rng.clone(),
4226 guest_cx,
4227 )));
4228
4229 log::info!("Guest {} added", guest_id);
4230 }
4231 }
4232
4233 let mut clients = futures::future::join_all(clients).await;
4234 cx.foreground().run_until_parked();
4235
4236 let (host_client, host_cx) = clients.remove(0);
4237 let host_project = host_client.project.as_ref().unwrap();
4238 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4239 project
4240 .worktrees(cx)
4241 .map(|worktree| {
4242 let snapshot = worktree.read(cx).snapshot();
4243 (snapshot.id(), snapshot)
4244 })
4245 .collect::<BTreeMap<_, _>>()
4246 });
4247
4248 for (guest_client, guest_cx) in clients.iter() {
4249 let guest_id = guest_client.client.id();
4250 let worktree_snapshots =
4251 guest_client
4252 .project
4253 .as_ref()
4254 .unwrap()
4255 .read_with(guest_cx, |project, cx| {
4256 project
4257 .worktrees(cx)
4258 .map(|worktree| {
4259 let worktree = worktree.read(cx);
4260 (worktree.id(), worktree.snapshot())
4261 })
4262 .collect::<BTreeMap<_, _>>()
4263 });
4264
4265 assert_eq!(
4266 worktree_snapshots.keys().collect::<Vec<_>>(),
4267 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4268 "guest {} has different worktrees than the host",
4269 guest_id
4270 );
4271 for (id, host_snapshot) in &host_worktree_snapshots {
4272 let guest_snapshot = &worktree_snapshots[id];
4273 assert_eq!(
4274 guest_snapshot.root_name(),
4275 host_snapshot.root_name(),
4276 "guest {} has different root name than the host for worktree {}",
4277 guest_id,
4278 id
4279 );
4280 assert_eq!(
4281 guest_snapshot.entries(false).collect::<Vec<_>>(),
4282 host_snapshot.entries(false).collect::<Vec<_>>(),
4283 "guest {} has different snapshot than the host for worktree {}",
4284 guest_id,
4285 id
4286 );
4287 }
4288
4289 guest_client
4290 .project
4291 .as_ref()
4292 .unwrap()
4293 .read_with(guest_cx, |project, cx| {
4294 assert!(
4295 !project.has_deferred_operations(cx),
4296 "guest {} has deferred operations",
4297 guest_id,
4298 );
4299 });
4300
4301 for guest_buffer in &guest_client.buffers {
4302 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4303 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4304 project.buffer_for_id(buffer_id, cx).expect(&format!(
4305 "host does not have buffer for guest:{}, peer:{}, id:{}",
4306 guest_id, guest_client.peer_id, buffer_id
4307 ))
4308 });
4309 assert_eq!(
4310 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4311 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4312 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4313 guest_id,
4314 buffer_id,
4315 host_buffer
4316 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4317 );
4318 }
4319 }
4320 }
4321
/// In-process server harness used by the tests in this module.
struct TestServer {
    /// RPC peer passed to `Server::new`; `reset` is called on it when the harness is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state` from the test database.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets its waits with
    /// `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is handed to `Server::new`.
    notifications: mpsc::UnboundedReceiver<()>,
    /// One kill handle per connected user, registered in `create_client` and
    /// consumed by `disconnect_client`.
    connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
    /// While `true`, attempts to establish a connection fail with an error.
    forbid_connections: Arc<AtomicBool>,
    /// Never read after construction (hence the underscore); presumably kept so the
    /// test database outlives the server — TODO confirm.
    _test_db: TestDb,
}
4332
4333 impl TestServer {
4334 async fn start(
4335 foreground: Rc<executor::Foreground>,
4336 background: Arc<executor::Background>,
4337 ) -> Self {
4338 let test_db = TestDb::fake(background);
4339 let app_state = Self::build_app_state(&test_db).await;
4340 let peer = Peer::new();
4341 let notifications = mpsc::unbounded();
4342 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4343 Self {
4344 peer,
4345 app_state,
4346 server,
4347 foreground,
4348 notifications: notifications.1,
4349 connection_killers: Default::default(),
4350 forbid_connections: Default::default(),
4351 _test_db: test_db,
4352 }
4353 }
4354
/// Creates a user called `name` in the test database and returns a `TestClient`
/// that is authenticated and connected to this server.
///
/// Both authentication and connection establishment are overridden on the
/// client: authentication always yields a fixed token, and the connection is an
/// in-memory one wired directly to `Server::handle_connection`.
///
/// Panics if user creation, connecting, or receiving the connection id fails.
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
    let http = FakeHttpClient::with_404_response();
    let user_id = self.app_state.db.create_user(name, false).await.unwrap();
    let client_name = name.to_string();
    let mut client = Client::new(http.clone());
    let server = self.server.clone();
    let connection_killers = self.connection_killers.clone();
    let forbid_connections = self.forbid_connections.clone();
    // The sender is passed to `handle_connection`; the first value received on
    // the other end is used below to build the client's `PeerId`.
    let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

    // `Arc::get_mut` only succeeds while this is the sole reference to the
    // client, so the overrides must be installed before it is cloned.
    Arc::get_mut(&mut client)
        .unwrap()
        .override_authenticate(move |cx| {
            cx.spawn(|_| async move {
                let access_token = "the-token".to_string();
                Ok(Credentials {
                    user_id: user_id.0 as u64,
                    access_token,
                })
            })
        })
        .override_establish_connection(move |credentials, cx| {
            // The credentials must be the ones produced by the override above.
            assert_eq!(credentials.user_id, user_id.0 as u64);
            assert_eq!(credentials.access_token, "the-token");

            // Clone per invocation so each call gets its own handles to move
            // into the spawned future.
            let server = server.clone();
            let connection_killers = connection_killers.clone();
            let forbid_connections = forbid_connections.clone();
            let client_name = client_name.clone();
            let connection_id_tx = connection_id_tx.clone();
            cx.spawn(move |cx| async move {
                if forbid_connections.load(SeqCst) {
                    Err(EstablishConnectionError::other(anyhow!(
                        "server is forbidding connections"
                    )))
                } else {
                    let (client_conn, server_conn, kill_conn) =
                        Connection::in_memory(cx.background());
                    // Registering under the same user id replaces any earlier
                    // kill handle for that user.
                    connection_killers.lock().insert(user_id, kill_conn);
                    // Serve the server side of the connection on the background
                    // executor, detached from this future.
                    cx.background()
                        .spawn(server.handle_connection(
                            server_conn,
                            client_name,
                            user_id,
                            Some(connection_id_tx),
                            cx.background(),
                        ))
                        .detach();
                    Ok(client_conn)
                }
            })
        });

    client
        .authenticate_and_connect(&cx.to_async())
        .await
        .unwrap();

    Channel::init(&client);
    Project::init(&client);

    let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
    // Wait until the user store reports a current user.
    let mut authed_user =
        user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
    while authed_user.next().await.unwrap().is_none() {}

    TestClient {
        client,
        peer_id,
        user_store,
        project: Default::default(),
        buffers: Default::default(),
    }
}
4430
4431 fn disconnect_client(&self, user_id: UserId) {
4432 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4433 let _ = kill_conn.try_send(Some(()));
4434 }
4435 }
4436
/// Sets the flag that makes subsequent connection attempts from clients created
/// by `create_client` fail with "server is forbidding connections".
fn forbid_connections(&self) {
    self.forbid_connections.store(true, SeqCst);
}
4440
/// Clears the flag set by `forbid_connections`, so new connection attempts are
/// accepted again.
fn allow_connections(&self) {
    self.forbid_connections.store(false, SeqCst);
}
4444
4445 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4446 let mut config = Config::default();
4447 config.session_secret = "a".repeat(32);
4448 config.database_url = test_db.url.clone();
4449 let github_client = github::AppClient::test();
4450 Arc::new(AppState {
4451 db: test_db.db().clone(),
4452 handlebars: Default::default(),
4453 auth_client: auth::build_client("", ""),
4454 repo_client: github::RepoClient::test(&github_client),
4455 github_client,
4456 config,
4457 })
4458 }
4459
/// Returns a read guard over the server's `Store`.
///
/// Declared `async` although the body never awaits; the lock is taken
/// synchronously when the returned future is polled.
async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
    self.server.store.read()
}
4463
4464 async fn condition<F>(&mut self, mut predicate: F)
4465 where
4466 F: FnMut(&Store) -> bool,
4467 {
4468 async_std::future::timeout(Duration::from_millis(500), async {
4469 while !(predicate)(&*self.server.store.read()) {
4470 self.foreground.start_waiting();
4471 self.notifications.next().await;
4472 self.foreground.finish_waiting();
4473 }
4474 })
4475 .await
4476 .expect("condition timed out");
4477 }
4478 }
4479
/// Calls `reset` on the peer when the harness goes out of scope.
/// NOTE(review): presumably this tears down any remaining connections — confirm
/// against `Peer::reset`.
impl Drop for TestServer {
    fn drop(&mut self) {
        self.peer.reset();
    }
}
4485
/// A client connected to a `TestServer`, plus the state the randomized
/// simulations accumulate for it.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported for this client.
    pub peer_id: PeerId,
    /// User store created for this client in `TestServer::create_client`.
    pub user_store: ModelHandle<UserStore>,
    /// Starts empty; set to the simulated project when `simulate_host` or
    /// `simulate_guest` finishes.
    project: Option<ModelHandle<Project>>,
    /// Buffers the simulation currently keeps open for this client.
    buffers: HashSet<ModelHandle<zed::language::Buffer>>,
}
4493
/// Lets a `TestClient` be used wherever an `Arc<Client>` reference is expected
/// by dereferencing to the wrapped client.
impl Deref for TestClient {
    type Target = Arc<Client>;

    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
4501
4502 impl TestClient {
4503 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4504 UserId::from_proto(
4505 self.user_store
4506 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4507 )
4508 }
4509
/// Drives the host side of the randomized collaboration test.
///
/// Synchronously installs a fake language server (via `language_server_config`)
/// and registers a "Rust" language on `project`, then returns a future that
/// performs random host operations until the shared `operations` counter
/// reaches `max_operations`.
///
/// The future resolves to this client (with `project` stored in `self.project`)
/// together with the `TestAppContext` it was given.
fn simulate_host(
    mut self,
    project: ModelHandle<Project>,
    mut language_server_config: LanguageServerConfig,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> impl Future<Output = (Self, TestAppContext)> {
    // Paths of every file the host has created so far; shared with the fake
    // language server's handlers.
    let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();

    // Set up a fake language server.
    language_server_config.set_fake_initializer({
        let rng = rng.clone();
        let files = files.clone();
        let project = project.downgrade();
        move |fake_server| {
            // Completions: always a single zero-width edit at the start of the file.
            fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                        range: lsp::Range::new(
                            lsp::Position::new(0, 0),
                            lsp::Position::new(0, 0),
                        ),
                        new_text: "the-new-text".to_string(),
                    })),
                    ..Default::default()
                }]))
            });

            // Code actions: always a single action with a fixed title.
            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
                Some(vec![lsp::CodeActionOrCommand::CodeAction(
                    lsp::CodeAction {
                        title: "the-code-action".to_string(),
                        ..Default::default()
                    },
                )])
            });

            // Prepare rename: echo the requested position back as an empty range.
            fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
                |params, _| {
                    Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                        params.position,
                        params.position,
                    )))
                },
            );

            // Go to definition: one or two locations in randomly chosen known files.
            // Panics if no file has been created yet (`choose` returns `None`).
            fake_server.handle_request::<lsp::request::GotoDefinition, _>({
                let files = files.clone();
                let rng = rng.clone();
                move |_, _| {
                    let files = files.lock();
                    let mut rng = rng.lock();
                    let count = rng.gen_range::<usize, _>(1..3);
                    let files = (0..count)
                        .map(|_| files.choose(&mut *rng).unwrap())
                        .collect::<Vec<_>>();
                    log::info!("LSP: Returning definitions in files {:?}", &files);
                    Some(lsp::GotoDefinitionResponse::Array(
                        files
                            .into_iter()
                            .map(|file| lsp::Location {
                                uri: lsp::Url::from_file_path(file).unwrap(),
                                range: Default::default(),
                            })
                            .collect(),
                    ))
                }
            });

            // Document highlights: between one and five random ranges in the
            // requested buffer. Yields `None` if the project is gone, the path
            // is not in a local worktree, or the buffer is not open.
            fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
                let rng = rng.clone();
                let project = project.clone();
                move |params, mut cx| {
                    if let Some(project) = project.upgrade(&cx) {
                        project.update(&mut cx, |project, cx| {
                            let path = params
                                .text_document_position_params
                                .text_document
                                .uri
                                .to_file_path()
                                .unwrap();
                            let (worktree, relative_path) =
                                project.find_local_worktree(&path, cx)?;
                            let project_path =
                                ProjectPath::from((worktree.read(cx).id(), relative_path));
                            let buffer =
                                project.get_open_buffer(&project_path, cx)?.read(cx);

                            let mut highlights = Vec::new();
                            let highlight_count = rng.lock().gen_range(1..=5);
                            // Each range is requested starting from the end of
                            // the previous one.
                            let mut prev_end = 0;
                            for _ in 0..highlight_count {
                                let range =
                                    buffer.random_byte_range(prev_end, &mut *rng.lock());
                                let start = buffer
                                    .offset_to_point_utf16(range.start)
                                    .to_lsp_position();
                                let end = buffer
                                    .offset_to_point_utf16(range.end)
                                    .to_lsp_position();
                                highlights.push(lsp::DocumentHighlight {
                                    range: lsp::Range::new(start, end),
                                    kind: Some(lsp::DocumentHighlightKind::READ),
                                });
                                prev_end = range.end;
                            }
                            Some(highlights)
                        })
                    } else {
                        None
                    }
                }
            });
        }
    });

    // Register a language for `.rs` files that uses the fake server configured above.
    project.update(&mut cx, |project, _| {
        project.languages().add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            None,
        )));
    });

    async move {
        let fs = project.read_with(&cx, |project, _| project.fs().clone());
        while operations.get() < max_operations {
            // Every loop iteration counts as one operation.
            operations.set(operations.get() + 1);

            let distribution = rng.lock().gen_range::<usize, _>(0..100);
            match distribution {
                // Find or create a worktree rooted at a random ancestor of a
                // known file.
                0..=20 if !files.lock().is_empty() => {
                    let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                    let mut path = path.as_path();
                    // Move up at least one level, then stop at a coin flip or
                    // when there is no parent left.
                    while let Some(parent_path) = path.parent() {
                        path = parent_path;
                        if rng.lock().gen() {
                            break;
                        }
                    }

                    log::info!("Host: find/create local worktree {:?}", path);
                    project
                        .update(&mut cx, |project, cx| {
                            project.find_or_create_local_worktree(path, false, cx)
                        })
                        .await
                        .unwrap();
                }
                // Open (or reuse) a buffer, then either drop it or edit it.
                // NOTE(review): this range overlaps `0..=20` above; since arms
                // are tried in order and both share the same guard, values
                // 10..=20 are taken by the previous arm. Confirm the overlap is
                // intentional.
                10..=80 if !files.lock().is_empty() => {
                    let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                        let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                        let (worktree, path) = project
                            .update(&mut cx, |project, cx| {
                                project.find_or_create_local_worktree(
                                    file.clone(),
                                    false,
                                    cx,
                                )
                            })
                            .await
                            .unwrap();
                        let project_path =
                            worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
                        log::info!("Host: opening path {:?}, {:?}", file, project_path);
                        let buffer = project
                            .update(&mut cx, |project, cx| {
                                project.open_buffer(project_path, cx)
                            })
                            .await
                            .unwrap();
                        self.buffers.insert(buffer.clone());
                        buffer
                    } else {
                        self.buffers
                            .iter()
                            .choose(&mut *rng.lock())
                            .unwrap()
                            .clone()
                    };

                    // With probability 0.1 release the buffer; otherwise apply
                    // random edits to it.
                    if rng.lock().gen_bool(0.1) {
                        cx.update(|cx| {
                            log::info!(
                                "Host: dropping buffer {:?}",
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    } else {
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Host: updating buffer {:?} ({})",
                                buffer.file().unwrap().full_path(cx),
                                buffer.remote_id()
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                // Everything else (including the arms above when no file exists
                // yet): create a new `.rs` file at a random path of one to five
                // single-letter components, retrying until creation succeeds.
                _ => loop {
                    let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                    let mut path = PathBuf::new();
                    path.push("/");
                    for _ in 0..path_component_count {
                        let letter = rng.lock().gen_range(b'a'..=b'z');
                        path.push(std::str::from_utf8(&[letter]).unwrap());
                    }
                    path.set_extension("rs");
                    let parent_path = path.parent().unwrap();

                    log::info!("Host: creating file {:?}", path,);

                    if fs.create_dir(&parent_path).await.is_ok()
                        && fs.create_file(&path, Default::default()).await.is_ok()
                    {
                        files.lock().push(path);
                        break;
                    } else {
                        log::info!("Host: cannot create file");
                    }
                },
            }

            cx.background().simulate_random_delay().await;
        }

        log::info!("Host done");

        // Hand the project back through the client so the caller can inspect it.
        self.project = Some(project);
        (self, cx)
    }
}
4750
/// Drives one guest of the randomized collaboration test.
///
/// Until the shared `operations` counter reaches `max_operations`, the guest
/// picks a buffer (opening a random file from one of the project's worktrees or
/// reusing one it already holds) and performs one random action on it: drop,
/// completions, code actions, save, prepare rename, go to definition, document
/// highlights, project-wide search, or a random edit.
///
/// Most requests are detached with probability 0.3 instead of being awaited.
/// Any failed request panics.
///
/// Returns this client (with `project` stored in `self.project`) together with
/// the `TestAppContext` it was given.
pub async fn simulate_guest(
    mut self,
    guest_id: usize,
    project: ModelHandle<Project>,
    operations: Rc<Cell<usize>>,
    max_operations: usize,
    rng: Arc<Mutex<StdRng>>,
    mut cx: TestAppContext,
) -> (Self, TestAppContext) {
    while operations.get() < max_operations {
        let buffer = if self.buffers.is_empty() || rng.lock().gen() {
            // Pick a non-weak worktree that contains at least one file. If there
            // is none yet, wait a bit and retry without counting an operation.
            let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(&cx)
                    .filter(|worktree| {
                        let worktree = worktree.read(cx);
                        !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
                    })
                    .choose(&mut *rng.lock())
            }) {
                worktree
            } else {
                cx.background().simulate_random_delay().await;
                continue;
            };

            operations.set(operations.get() + 1);
            // Choose a random file entry in that worktree.
            let (worktree_root_name, project_path) =
                worktree.read_with(&cx, |worktree, _| {
                    let entry = worktree
                        .entries(false)
                        .filter(|e| e.is_file())
                        .choose(&mut *rng.lock())
                        .unwrap();
                    (
                        worktree.root_name().to_string(),
                        (worktree.id(), entry.path.clone()),
                    )
                });
            log::info!(
                "Guest {}: opening path in worktree {:?} {:?} {:?}",
                guest_id,
                project_path.0,
                worktree_root_name,
                project_path.1
            );
            let buffer = project
                .update(&mut cx, |project, cx| {
                    project.open_buffer(project_path.clone(), cx)
                })
                .await
                .unwrap();
            log::info!(
                "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
                guest_id,
                project_path.0,
                worktree_root_name,
                project_path.1,
                buffer.read_with(&cx, |buffer, _| buffer.remote_id())
            );
            self.buffers.insert(buffer.clone());
            buffer
        } else {
            operations.set(operations.get() + 1);

            // Reuse one of the buffers this guest already holds.
            self.buffers
                .iter()
                .choose(&mut *rng.lock())
                .unwrap()
                .clone()
        };

        let choice = rng.lock().gen_range(0..100);
        match choice {
            // Release the buffer.
            0..=9 => {
                cx.update(|cx| {
                    log::info!(
                        "Guest {}: dropping buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    self.buffers.remove(&buffer);
                    drop(buffer);
                });
            }
            // Request completions at a random offset.
            10..=19 => {
                let completions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting completions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.completions(&buffer, offset, cx)
                });
                let completions = cx.background().spawn(async move {
                    completions.await.expect("completions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching completions request", guest_id);
                    completions.detach();
                } else {
                    completions.await;
                }
            }
            // Request code actions for a random byte range.
            20..=29 => {
                let code_actions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting code actions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                    project.code_actions(&buffer, range, cx)
                });
                let code_actions = cx.background().spawn(async move {
                    code_actions.await.expect("code actions request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching code actions request", guest_id);
                    code_actions.detach();
                } else {
                    code_actions.await;
                }
            }
            // Save the buffer, but only when it is dirty; a clean buffer falls
            // through to the catch-all edit arm.
            30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: saving buffer {:?}",
                        guest_id,
                        buffer.file().unwrap().full_path(cx)
                    );
                    (buffer.version(), buffer.save(cx))
                });
                let save = cx.spawn(|cx| async move {
                    let (saved_version, _) = save.await.expect("save request failed");
                    // The saved version must cover the version at request time
                    // and must itself be covered by the buffer's current version.
                    buffer.read_with(&cx, |buffer, _| {
                        assert!(buffer.version().observed_all(&saved_version));
                        assert!(saved_version.observed_all(&requested_version));
                    });
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching save request", guest_id);
                    save.detach();
                } else {
                    save.await;
                }
            }
            // Prepare a rename at a random offset. Note that the buffer handle
            // is moved into the request here.
            40..=44 => {
                let prepare_rename = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: preparing rename for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.prepare_rename(buffer, offset, cx)
                });
                let prepare_rename = cx.background().spawn(async move {
                    prepare_rename.await.expect("prepare rename request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching prepare rename request", guest_id);
                    prepare_rename.detach();
                } else {
                    prepare_rename.await;
                }
            }
            // Go to definition at a random offset; when awaited, the buffers of
            // the returned locations are added to this guest's open buffers.
            45..=49 => {
                let definitions = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting definitions for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.definition(&buffer, offset, cx)
                });
                let definitions = cx.background().spawn(async move {
                    definitions.await.expect("definitions request failed")
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching definitions request", guest_id);
                    definitions.detach();
                } else {
                    self.buffers
                        .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                }
            }
            // Request document highlights at a random offset.
            50..=54 => {
                let highlights = project.update(&mut cx, |project, cx| {
                    log::info!(
                        "Guest {}: requesting highlights for buffer {:?}",
                        guest_id,
                        buffer.read(cx).file().unwrap().full_path(cx)
                    );
                    let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                    project.document_highlights(&buffer, offset, cx)
                });
                let highlights = cx.background().spawn(async move {
                    highlights.await.expect("highlights request failed");
                });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching highlights request", guest_id);
                    highlights.detach();
                } else {
                    highlights.await;
                }
            }
            // Project-wide search for a random lowercase letter; when awaited,
            // the buffers in the result are added to this guest's open buffers.
            55..=59 => {
                let search = project.update(&mut cx, |project, cx| {
                    let query = rng.lock().gen_range('a'..='z');
                    log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                    project.search(SearchQuery::text(query, false, false), cx)
                });
                let search = cx
                    .background()
                    .spawn(async move { search.await.expect("search request failed") });
                if rng.lock().gen_bool(0.3) {
                    log::info!("Guest {}: detaching search request", guest_id);
                    search.detach();
                } else {
                    self.buffers.extend(search.await.into_keys());
                }
            }
            // Default action: apply random edits to the buffer.
            _ => {
                buffer.update(&mut cx, |buffer, cx| {
                    log::info!(
                        "Guest {}: updating buffer {:?}",
                        guest_id,
                        buffer.file().unwrap().full_path(cx)
                    );
                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                });
            }
        }
        cx.background().simulate_random_delay().await;
    }

    log::info!("Guest {} done", guest_id);

    // Hand the project back through the client so the caller can inspect it.
    self.project = Some(project);
    (self, cx)
}
4995 }
4996
/// Calls `tear_down` on the wrapped client when the test client is dropped.
impl Drop for TestClient {
    fn drop(&mut self) {
        self.client.tear_down();
    }
}
5002
/// Lets the gpui background executor be used as the server's `Executor`.
impl Executor for Arc<gpui::executor::Background> {
    /// Spawns `future` on the background executor and detaches the resulting
    /// task, so nothing awaits its completion.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        self.spawn(future).detach();
    }
}
5008
5009 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5010 channel
5011 .messages()
5012 .cursor::<()>()
5013 .map(|m| {
5014 (
5015 m.sender.github_login.clone(),
5016 m.body.clone(),
5017 m.is_pending(),
5018 )
5019 })
5020 .collect()
5021 }
5022
/// A stateless view for tests; it renders an empty element.
struct EmptyView;
5024
/// `EmptyView` uses the unit type as its event type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
5028
/// View implementation that draws nothing.
impl gpui::View for EmptyView {
    /// Name under which this view is identified.
    fn ui_name() -> &'static str {
        "empty view"
    }

    /// Renders a boxed `Empty` element.
    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
        gpui::Element::boxed(gpui::elements::Empty)
    }
}
5038}