1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
35type MessageHandler = Box<
36 dyn Send
37 + Sync
38 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
39>;
40
/// RPC server state shared (behind an `Arc`) by all connection handlers.
pub struct Server {
    /// Peer used to send, forward and respond to RPC messages.
    peer: Arc<Peer>,
    /// In-memory store, read through `state()` and written through `state_mut()`.
    store: RwLock<Store>,
    /// Application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When set, `handle_connection` sends `()` on this channel after each
    /// incoming message has finished being handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
48
/// Abstraction over the runtime used by `Server::handle_connection` to spawn
/// background work and to create timers.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::timer`].
    type Timer: Send + Future;

    /// Runs `future` to completion without handing back a way to await it.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;

    /// Returns a timer future for the given `duration`.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
54
/// [`Executor`] passed to `handle_connection` by `add_routes`; its trait
/// implementation spawns via `async_std::task` and builds `async_io::Timer`s.
#[derive(Clone)]
pub struct RealExecutor;
57
/// Number of channel messages requested from the database per page; a page
/// shorter than this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`str::len`) of a trimmed chat message body.
const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::disk_based_diagnostics_updating)
88 .add_message_handler(Server::disk_based_diagnostics_updated)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_request_handler(Server::update_buffer)
106 .add_message_handler(Server::update_buffer_file)
107 .add_message_handler(Server::buffer_reloaded)
108 .add_message_handler(Server::buffer_saved)
109 .add_request_handler(Server::save_buffer)
110 .add_request_handler(Server::get_channels)
111 .add_request_handler(Server::get_users)
112 .add_request_handler(Server::join_channel)
113 .add_message_handler(Server::leave_channel)
114 .add_request_handler(Server::send_channel_message)
115 .add_request_handler(Server::get_channel_messages);
116
117 Arc::new(server)
118 }
119
120 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
121 where
122 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
123 Fut: 'static + Send + Future<Output = tide::Result<()>>,
124 M: EnvelopedMessage,
125 {
126 let prev_handler = self.handlers.insert(
127 TypeId::of::<M>(),
128 Box::new(move |server, envelope| {
129 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
130 (handler)(server, *envelope).boxed()
131 }),
132 );
133 if prev_handler.is_some() {
134 panic!("registered a handler for the same message twice");
135 }
136 self
137 }
138
139 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
140 where
141 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
142 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
143 M: RequestMessage,
144 {
145 self.add_message_handler(move |server, envelope| {
146 let receipt = envelope.receipt();
147 let response = (handler)(server.clone(), envelope);
148 async move {
149 match response.await {
150 Ok(response) => {
151 server.peer.respond(receipt, response)?;
152 Ok(())
153 }
154 Err(error) => {
155 server.peer.respond_with_error(
156 receipt,
157 proto::Error {
158 message: error.to_string(),
159 },
160 )?;
161 Err(error)
162 }
163 }
164 }
165 })
166 }
167
/// Returns a future that registers `connection` with the peer and services it
/// until the connection's I/O future finishes or its incoming stream ends.
///
/// * `addr` is only used in log messages.
/// * `user_id` is recorded in the store for the new connection.
/// * `send_connection_id`, when provided, receives the id assigned by the peer;
///   a failure to send it is ignored.
/// * `executor` supplies the timers handed to the peer and runs handlers of
///   messages flagged as background.
///
/// When the loop ends, the connection is signed out; a sign-out error is only
/// logged.
pub fn handle_connection<E: Executor>(
    self: &Arc<Self>,
    connection: Connection,
    addr: String,
    user_id: UserId,
    mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
    executor: E,
) -> impl Future<Output = ()> {
    let mut this = self.clone();
    async move {
        // Register the connection with the peer, giving it a timer factory
        // backed by a clone of the executor.
        let (connection_id, handle_io, mut incoming_rx) = this
            .peer
            .add_connection(connection, {
                let executor = executor.clone();
                move |duration| {
                    let timer = executor.timer(duration);
                    async move {
                        timer.await;
                    }
                }
            })
            .await;

        if let Some(send_connection_id) = send_connection_id.as_mut() {
            let _ = send_connection_id.send(connection_id).await;
        }

        this.state_mut().add_connection(connection_id, user_id);
        // Failing to push the contacts update is not fatal for the connection.
        if let Err(err) = this.update_contacts_for_users(&[user_id]) {
            log::error!("error updating contacts for {:?}: {}", user_id, err);
        }

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.next().fuse();
            futures::pin_mut!(next_message);
            // `select_biased!` polls its branches in declaration order, so the
            // I/O future is checked before the next incoming message.
            futures::select_biased! {
                result = handle_io => {
                    if let Err(err) = result {
                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                    }
                    break;
                }
                message = next_message => {
                    if let Some(message) = message {
                        let start_time = Instant::now();
                        let type_name = message.payload_type_name();
                        log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                        if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                            let notifications = this.notifications.clone();
                            // Read the flag before `message` is moved into the handler.
                            let is_background = message.is_background();
                            let handle_message = (handler)(this.clone(), message);
                            let handle_message = async move {
                                if let Err(err) = handle_message.await {
                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                } else {
                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                }
                                if let Some(mut notifications) = notifications {
                                    let _ = notifications.send(()).await;
                                }
                            };
                            // Background messages run detached; all others are
                            // awaited here, so the next message of this
                            // connection is not read until they complete.
                            if is_background {
                                executor.spawn_detached(handle_message);
                            } else {
                                handle_message.await;
                            }
                        } else {
                            log::warn!("unhandled message: {}", type_name);
                        }
                    } else {
                        // The incoming stream ended.
                        log::info!("rpc connection closed {:?}", addr);
                        break;
                    }
                }
            }
        }

        if let Err(err) = this.sign_out(connection_id).await {
            log::error!("error signing out connection {:?} - {:?}", addr, err);
        }
    }
}
252
253 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
254 self.peer.disconnect(connection_id);
255 let removed_connection = self.state_mut().remove_connection(connection_id)?;
256
257 for (project_id, project) in removed_connection.hosted_projects {
258 if let Some(share) = project.share {
259 broadcast(
260 connection_id,
261 share.guests.keys().copied().collect(),
262 |conn_id| {
263 self.peer
264 .send(conn_id, proto::UnshareProject { project_id })
265 },
266 )?;
267 }
268 }
269
270 for (project_id, peer_ids) in removed_connection.guest_project_ids {
271 broadcast(connection_id, peer_ids, |conn_id| {
272 self.peer.send(
273 conn_id,
274 proto::RemoveProjectCollaborator {
275 project_id,
276 peer_id: connection_id.0,
277 },
278 )
279 })?;
280 }
281
282 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
283 Ok(())
284 }
285
286 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
287 Ok(proto::Ack {})
288 }
289
290 async fn register_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::RegisterProject>,
293 ) -> tide::Result<proto::RegisterProjectResponse> {
294 let project_id = {
295 let mut state = self.state_mut();
296 let user_id = state.user_id_for_connection(request.sender_id)?;
297 state.register_project(request.sender_id, user_id)
298 };
299 Ok(proto::RegisterProjectResponse { project_id })
300 }
301
302 async fn unregister_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnregisterProject>,
305 ) -> tide::Result<()> {
306 let project = self
307 .state_mut()
308 .unregister_project(request.payload.project_id, request.sender_id)?;
309 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
310 Ok(())
311 }
312
313 async fn share_project(
314 mut self: Arc<Server>,
315 request: TypedEnvelope<proto::ShareProject>,
316 ) -> tide::Result<proto::Ack> {
317 self.state_mut()
318 .share_project(request.payload.project_id, request.sender_id);
319 Ok(proto::Ack {})
320 }
321
322 async fn unshare_project(
323 mut self: Arc<Server>,
324 request: TypedEnvelope<proto::UnshareProject>,
325 ) -> tide::Result<()> {
326 let project_id = request.payload.project_id;
327 let project = self
328 .state_mut()
329 .unshare_project(project_id, request.sender_id)?;
330
331 broadcast(request.sender_id, project.connection_ids, |conn_id| {
332 self.peer
333 .send(conn_id, proto::UnshareProject { project_id })
334 })?;
335 self.update_contacts_for_users(&project.authorized_user_ids)?;
336 Ok(())
337 }
338
339 async fn join_project(
340 mut self: Arc<Server>,
341 request: TypedEnvelope<proto::JoinProject>,
342 ) -> tide::Result<proto::JoinProjectResponse> {
343 let project_id = request.payload.project_id;
344
345 let user_id = self.state().user_id_for_connection(request.sender_id)?;
346 let (response, connection_ids, contact_user_ids) = self
347 .state_mut()
348 .join_project(request.sender_id, user_id, project_id)
349 .and_then(|joined| {
350 let share = joined.project.share()?;
351 let peer_count = share.guests.len();
352 let mut collaborators = Vec::with_capacity(peer_count);
353 collaborators.push(proto::Collaborator {
354 peer_id: joined.project.host_connection_id.0,
355 replica_id: 0,
356 user_id: joined.project.host_user_id.to_proto(),
357 });
358 let worktrees = share
359 .worktrees
360 .iter()
361 .filter_map(|(id, shared_worktree)| {
362 let worktree = joined.project.worktrees.get(&id)?;
363 Some(proto::Worktree {
364 id: *id,
365 root_name: worktree.root_name.clone(),
366 entries: shared_worktree.entries.values().cloned().collect(),
367 diagnostic_summaries: shared_worktree
368 .diagnostic_summaries
369 .values()
370 .cloned()
371 .collect(),
372 visible: worktree.visible,
373 })
374 })
375 .collect();
376 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
377 if *peer_conn_id != request.sender_id {
378 collaborators.push(proto::Collaborator {
379 peer_id: peer_conn_id.0,
380 replica_id: *peer_replica_id as u32,
381 user_id: peer_user_id.to_proto(),
382 });
383 }
384 }
385 let response = proto::JoinProjectResponse {
386 worktrees,
387 replica_id: joined.replica_id as u32,
388 collaborators,
389 };
390 let connection_ids = joined.project.connection_ids();
391 let contact_user_ids = joined.project.authorized_user_ids();
392 Ok((response, connection_ids, contact_user_ids))
393 })?;
394
395 broadcast(request.sender_id, connection_ids, |conn_id| {
396 self.peer.send(
397 conn_id,
398 proto::AddProjectCollaborator {
399 project_id,
400 collaborator: Some(proto::Collaborator {
401 peer_id: request.sender_id.0,
402 replica_id: response.replica_id,
403 user_id: user_id.to_proto(),
404 }),
405 },
406 )
407 })?;
408 self.update_contacts_for_users(&contact_user_ids)?;
409 Ok(response)
410 }
411
412 async fn leave_project(
413 mut self: Arc<Server>,
414 request: TypedEnvelope<proto::LeaveProject>,
415 ) -> tide::Result<()> {
416 let sender_id = request.sender_id;
417 let project_id = request.payload.project_id;
418 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
419
420 broadcast(sender_id, worktree.connection_ids, |conn_id| {
421 self.peer.send(
422 conn_id,
423 proto::RemoveProjectCollaborator {
424 project_id,
425 peer_id: sender_id.0,
426 },
427 )
428 })?;
429 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
430
431 Ok(())
432 }
433
434 async fn register_worktree(
435 mut self: Arc<Server>,
436 request: TypedEnvelope<proto::RegisterWorktree>,
437 ) -> tide::Result<proto::Ack> {
438 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
439
440 let mut contact_user_ids = HashSet::default();
441 contact_user_ids.insert(host_user_id);
442 for github_login in &request.payload.authorized_logins {
443 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
444 contact_user_ids.insert(contact_user_id);
445 }
446
447 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
448 let guest_connection_ids;
449 {
450 let mut state = self.state_mut();
451 guest_connection_ids = state
452 .read_project(request.payload.project_id, request.sender_id)?
453 .guest_connection_ids();
454 state.register_worktree(
455 request.payload.project_id,
456 request.payload.worktree_id,
457 request.sender_id,
458 Worktree {
459 authorized_user_ids: contact_user_ids.clone(),
460 root_name: request.payload.root_name.clone(),
461 visible: request.payload.visible,
462 },
463 )?;
464 }
465 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
466 self.peer
467 .forward_send(request.sender_id, connection_id, request.payload.clone())
468 })?;
469 self.update_contacts_for_users(&contact_user_ids)?;
470 Ok(proto::Ack {})
471 }
472
473 async fn unregister_worktree(
474 mut self: Arc<Server>,
475 request: TypedEnvelope<proto::UnregisterWorktree>,
476 ) -> tide::Result<()> {
477 let project_id = request.payload.project_id;
478 let worktree_id = request.payload.worktree_id;
479 let (worktree, guest_connection_ids) =
480 self.state_mut()
481 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
482 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
483 self.peer.send(
484 conn_id,
485 proto::UnregisterWorktree {
486 project_id,
487 worktree_id,
488 },
489 )
490 })?;
491 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
492 Ok(())
493 }
494
495 async fn update_worktree(
496 mut self: Arc<Server>,
497 request: TypedEnvelope<proto::UpdateWorktree>,
498 ) -> tide::Result<proto::Ack> {
499 let connection_ids = self.state_mut().update_worktree(
500 request.sender_id,
501 request.payload.project_id,
502 request.payload.worktree_id,
503 &request.payload.removed_entries,
504 &request.payload.updated_entries,
505 )?;
506
507 broadcast(request.sender_id, connection_ids, |connection_id| {
508 self.peer
509 .forward_send(request.sender_id, connection_id, request.payload.clone())
510 })?;
511
512 Ok(proto::Ack {})
513 }
514
515 async fn update_diagnostic_summary(
516 mut self: Arc<Server>,
517 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
518 ) -> tide::Result<()> {
519 let summary = request
520 .payload
521 .summary
522 .clone()
523 .ok_or_else(|| anyhow!("invalid summary"))?;
524 let receiver_ids = self.state_mut().update_diagnostic_summary(
525 request.payload.project_id,
526 request.payload.worktree_id,
527 request.sender_id,
528 summary,
529 )?;
530
531 broadcast(request.sender_id, receiver_ids, |connection_id| {
532 self.peer
533 .forward_send(request.sender_id, connection_id, request.payload.clone())
534 })?;
535 Ok(())
536 }
537
538 async fn disk_based_diagnostics_updating(
539 self: Arc<Server>,
540 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
541 ) -> tide::Result<()> {
542 let receiver_ids = self
543 .state()
544 .project_connection_ids(request.payload.project_id, request.sender_id)?;
545 broadcast(request.sender_id, receiver_ids, |connection_id| {
546 self.peer
547 .forward_send(request.sender_id, connection_id, request.payload.clone())
548 })?;
549 Ok(())
550 }
551
552 async fn disk_based_diagnostics_updated(
553 self: Arc<Server>,
554 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
555 ) -> tide::Result<()> {
556 let receiver_ids = self
557 .state()
558 .project_connection_ids(request.payload.project_id, request.sender_id)?;
559 broadcast(request.sender_id, receiver_ids, |connection_id| {
560 self.peer
561 .forward_send(request.sender_id, connection_id, request.payload.clone())
562 })?;
563 Ok(())
564 }
565
566 async fn forward_project_request<T>(
567 self: Arc<Server>,
568 request: TypedEnvelope<T>,
569 ) -> tide::Result<T::Response>
570 where
571 T: EntityMessage + RequestMessage,
572 {
573 let host_connection_id = self
574 .state()
575 .read_project(request.payload.remote_entity_id(), request.sender_id)?
576 .host_connection_id;
577 Ok(self
578 .peer
579 .forward_request(request.sender_id, host_connection_id, request.payload)
580 .await?)
581 }
582
583 async fn save_buffer(
584 self: Arc<Server>,
585 request: TypedEnvelope<proto::SaveBuffer>,
586 ) -> tide::Result<proto::BufferSaved> {
587 let host;
588 let mut guests;
589 {
590 let state = self.state();
591 let project = state.read_project(request.payload.project_id, request.sender_id)?;
592 host = project.host_connection_id;
593 guests = project.guest_connection_ids()
594 }
595
596 let response = self
597 .peer
598 .forward_request(request.sender_id, host, request.payload.clone())
599 .await?;
600
601 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
602 broadcast(host, guests, |conn_id| {
603 self.peer.forward_send(host, conn_id, response.clone())
604 })?;
605
606 Ok(response)
607 }
608
609 async fn update_buffer(
610 self: Arc<Server>,
611 request: TypedEnvelope<proto::UpdateBuffer>,
612 ) -> tide::Result<proto::Ack> {
613 let receiver_ids = self
614 .state()
615 .project_connection_ids(request.payload.project_id, request.sender_id)?;
616 broadcast(request.sender_id, receiver_ids, |connection_id| {
617 self.peer
618 .forward_send(request.sender_id, connection_id, request.payload.clone())
619 })?;
620 Ok(proto::Ack {})
621 }
622
623 async fn update_buffer_file(
624 self: Arc<Server>,
625 request: TypedEnvelope<proto::UpdateBufferFile>,
626 ) -> tide::Result<()> {
627 let receiver_ids = self
628 .state()
629 .project_connection_ids(request.payload.project_id, request.sender_id)?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 })?;
634 Ok(())
635 }
636
637 async fn buffer_reloaded(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::BufferReloaded>,
640 ) -> tide::Result<()> {
641 let receiver_ids = self
642 .state()
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 })?;
648 Ok(())
649 }
650
651 async fn buffer_saved(
652 self: Arc<Server>,
653 request: TypedEnvelope<proto::BufferSaved>,
654 ) -> tide::Result<()> {
655 let receiver_ids = self
656 .state()
657 .project_connection_ids(request.payload.project_id, request.sender_id)?;
658 broadcast(request.sender_id, receiver_ids, |connection_id| {
659 self.peer
660 .forward_send(request.sender_id, connection_id, request.payload.clone())
661 })?;
662 Ok(())
663 }
664
665 async fn get_channels(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::GetChannels>,
668 ) -> tide::Result<proto::GetChannelsResponse> {
669 let user_id = self.state().user_id_for_connection(request.sender_id)?;
670 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
671 Ok(proto::GetChannelsResponse {
672 channels: channels
673 .into_iter()
674 .map(|chan| proto::Channel {
675 id: chan.id.to_proto(),
676 name: chan.name,
677 })
678 .collect(),
679 })
680 }
681
682 async fn get_users(
683 self: Arc<Server>,
684 request: TypedEnvelope<proto::GetUsers>,
685 ) -> tide::Result<proto::GetUsersResponse> {
686 let user_ids = request
687 .payload
688 .user_ids
689 .into_iter()
690 .map(UserId::from_proto)
691 .collect();
692 let users = self
693 .app_state
694 .db
695 .get_users_by_ids(user_ids)
696 .await?
697 .into_iter()
698 .map(|user| proto::User {
699 id: user.id.to_proto(),
700 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
701 github_login: user.github_login,
702 })
703 .collect();
704 Ok(proto::GetUsersResponse { users })
705 }
706
707 fn update_contacts_for_users<'a>(
708 self: &Arc<Server>,
709 user_ids: impl IntoIterator<Item = &'a UserId>,
710 ) -> anyhow::Result<()> {
711 let mut result = Ok(());
712 let state = self.state();
713 for user_id in user_ids {
714 let contacts = state.contacts_for_user(*user_id);
715 for connection_id in state.connection_ids_for_user(*user_id) {
716 if let Err(error) = self.peer.send(
717 connection_id,
718 proto::UpdateContacts {
719 contacts: contacts.clone(),
720 },
721 ) {
722 result = Err(error);
723 }
724 }
725 }
726 result
727 }
728
729 async fn join_channel(
730 mut self: Arc<Self>,
731 request: TypedEnvelope<proto::JoinChannel>,
732 ) -> tide::Result<proto::JoinChannelResponse> {
733 let user_id = self.state().user_id_for_connection(request.sender_id)?;
734 let channel_id = ChannelId::from_proto(request.payload.channel_id);
735 if !self
736 .app_state
737 .db
738 .can_user_access_channel(user_id, channel_id)
739 .await?
740 {
741 Err(anyhow!("access denied"))?;
742 }
743
744 self.state_mut().join_channel(request.sender_id, channel_id);
745 let messages = self
746 .app_state
747 .db
748 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
749 .await?
750 .into_iter()
751 .map(|msg| proto::ChannelMessage {
752 id: msg.id.to_proto(),
753 body: msg.body,
754 timestamp: msg.sent_at.unix_timestamp() as u64,
755 sender_id: msg.sender_id.to_proto(),
756 nonce: Some(msg.nonce.as_u128().into()),
757 })
758 .collect::<Vec<_>>();
759 Ok(proto::JoinChannelResponse {
760 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
761 messages,
762 })
763 }
764
765 async fn leave_channel(
766 mut self: Arc<Self>,
767 request: TypedEnvelope<proto::LeaveChannel>,
768 ) -> tide::Result<()> {
769 let user_id = self.state().user_id_for_connection(request.sender_id)?;
770 let channel_id = ChannelId::from_proto(request.payload.channel_id);
771 if !self
772 .app_state
773 .db
774 .can_user_access_channel(user_id, channel_id)
775 .await?
776 {
777 Err(anyhow!("access denied"))?;
778 }
779
780 self.state_mut()
781 .leave_channel(request.sender_id, channel_id);
782
783 Ok(())
784 }
785
786 async fn send_channel_message(
787 self: Arc<Self>,
788 request: TypedEnvelope<proto::SendChannelMessage>,
789 ) -> tide::Result<proto::SendChannelMessageResponse> {
790 let channel_id = ChannelId::from_proto(request.payload.channel_id);
791 let user_id;
792 let connection_ids;
793 {
794 let state = self.state();
795 user_id = state.user_id_for_connection(request.sender_id)?;
796 connection_ids = state.channel_connection_ids(channel_id)?;
797 }
798
799 // Validate the message body.
800 let body = request.payload.body.trim().to_string();
801 if body.len() > MAX_MESSAGE_LEN {
802 return Err(anyhow!("message is too long"))?;
803 }
804 if body.is_empty() {
805 return Err(anyhow!("message can't be blank"))?;
806 }
807
808 let timestamp = OffsetDateTime::now_utc();
809 let nonce = request
810 .payload
811 .nonce
812 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
813
814 let message_id = self
815 .app_state
816 .db
817 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
818 .await?
819 .to_proto();
820 let message = proto::ChannelMessage {
821 sender_id: user_id.to_proto(),
822 id: message_id,
823 body,
824 timestamp: timestamp.unix_timestamp() as u64,
825 nonce: Some(nonce),
826 };
827 broadcast(request.sender_id, connection_ids, |conn_id| {
828 self.peer.send(
829 conn_id,
830 proto::ChannelMessageSent {
831 channel_id: channel_id.to_proto(),
832 message: Some(message.clone()),
833 },
834 )
835 })?;
836 Ok(proto::SendChannelMessageResponse {
837 message: Some(message),
838 })
839 }
840
841 async fn get_channel_messages(
842 self: Arc<Self>,
843 request: TypedEnvelope<proto::GetChannelMessages>,
844 ) -> tide::Result<proto::GetChannelMessagesResponse> {
845 let user_id = self.state().user_id_for_connection(request.sender_id)?;
846 let channel_id = ChannelId::from_proto(request.payload.channel_id);
847 if !self
848 .app_state
849 .db
850 .can_user_access_channel(user_id, channel_id)
851 .await?
852 {
853 Err(anyhow!("access denied"))?;
854 }
855
856 let messages = self
857 .app_state
858 .db
859 .get_channel_messages(
860 channel_id,
861 MESSAGE_COUNT_PER_PAGE,
862 Some(MessageId::from_proto(request.payload.before_message_id)),
863 )
864 .await?
865 .into_iter()
866 .map(|msg| proto::ChannelMessage {
867 id: msg.id.to_proto(),
868 body: msg.body,
869 timestamp: msg.sent_at.unix_timestamp() as u64,
870 sender_id: msg.sender_id.to_proto(),
871 nonce: Some(msg.nonce.as_u128().into()),
872 })
873 .collect::<Vec<_>>();
874
875 Ok(proto::GetChannelMessagesResponse {
876 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
877 messages,
878 })
879 }
880
881 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
882 self.store.read()
883 }
884
885 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
886 self.store.write()
887 }
888}
889
890impl Executor for RealExecutor {
891 type Timer = Timer;
892
893 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
894 task::spawn(future);
895 }
896
897 fn timer(&self, duration: Duration) -> Self::Timer {
898 Timer::after(duration)
899 }
900}
901
902fn broadcast<F>(
903 sender_id: ConnectionId,
904 receiver_ids: Vec<ConnectionId>,
905 mut f: F,
906) -> anyhow::Result<()>
907where
908 F: FnMut(ConnectionId) -> anyhow::Result<()>,
909{
910 let mut result = Ok(());
911 for receiver_id in receiver_ids {
912 if receiver_id != sender_id {
913 if let Err(error) = f(receiver_id) {
914 if result.is_ok() {
915 result = Err(error);
916 }
917 }
918 }
919 }
920 result
921}
922
923pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
924 let server = Server::new(app.state().clone(), rpc.clone(), None);
925 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
926 let server = server.clone();
927 async move {
928 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
929
930 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
931 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
932 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
933 let client_protocol_version: Option<u32> = request
934 .header("X-Zed-Protocol-Version")
935 .and_then(|v| v.as_str().parse().ok());
936
937 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
938 return Ok(Response::new(StatusCode::UpgradeRequired));
939 }
940
941 let header = match request.header("Sec-Websocket-Key") {
942 Some(h) => h.as_str(),
943 None => return Err(anyhow!("expected sec-websocket-key"))?,
944 };
945
946 let user_id = process_auth_header(&request).await?;
947
948 let mut response = Response::new(StatusCode::SwitchingProtocols);
949 response.insert_header(UPGRADE, "websocket");
950 response.insert_header(CONNECTION, "Upgrade");
951 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
952 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
953 response.insert_header("Sec-Websocket-Version", "13");
954
955 let http_res: &mut tide::http::Response = response.as_mut();
956 let upgrade_receiver = http_res.recv_upgrade().await;
957 let addr = request.remote().unwrap_or("unknown").to_string();
958 task::spawn(async move {
959 if let Some(stream) = upgrade_receiver.await {
960 server
961 .handle_connection(
962 Connection::new(
963 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
964 ),
965 addr,
966 user_id,
967 None,
968 RealExecutor,
969 )
970 .await;
971 }
972 });
973
974 Ok(response)
975 }
976 });
977}
978
979fn header_contains_ignore_case<T>(
980 request: &tide::Request<T>,
981 header_name: HeaderName,
982 value: &str,
983) -> bool {
984 request
985 .header(header_name)
986 .map(|h| {
987 h.as_str()
988 .split(',')
989 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
990 })
991 .unwrap_or(false)
992}
993
994#[cfg(test)]
995mod tests {
996 use super::*;
997 use crate::{
998 auth,
999 db::{tests::TestDb, UserId},
1000 github, AppState, Config,
1001 };
1002 use ::rpc::Peer;
1003 use client::{
1004 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1005 EstablishConnectionError, UserStore,
1006 };
1007 use collections::BTreeMap;
1008 use editor::{
1009 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1010 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1011 };
1012 use gpui::{executor, ModelHandle, TestAppContext};
1013 use language::{
1014 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1015 LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1016 };
1017 use lsp;
1018 use parking_lot::Mutex;
1019 use postage::{barrier, watch};
1020 use project::{
1021 fs::{FakeFs, Fs as _},
1022 search::SearchQuery,
1023 worktree::WorktreeHandle,
1024 DiagnosticSummary, Project, ProjectPath,
1025 };
1026 use rand::prelude::*;
1027 use rpc::PeerId;
1028 use serde_json::json;
1029 use sqlx::types::time::OffsetDateTime;
1030 use std::{
1031 cell::Cell,
1032 env,
1033 ops::Deref,
1034 path::{Path, PathBuf},
1035 rc::Rc,
1036 sync::{
1037 atomic::{AtomicBool, Ordering::SeqCst},
1038 Arc,
1039 },
1040 time::Duration,
1041 };
1042 use workspace::{Settings, Workspace, WorkspaceParams};
1043
/// Initializes `env_logger` for the test binary, but only when the `RUST_LOG`
/// environment variable can be read.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
1051
1052 #[gpui::test(iterations = 10)]
1053 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1054 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1055 let lang_registry = Arc::new(LanguageRegistry::test());
1056 let fs = FakeFs::new(cx_a.background());
1057 cx_a.foreground().forbid_parking();
1058
1059 // Connect to a server as 2 clients.
1060 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1061 let client_a = server.create_client(cx_a, "user_a").await;
1062 let client_b = server.create_client(cx_b, "user_b").await;
1063
1064 // Share a project as client A
1065 fs.insert_tree(
1066 "/a",
1067 json!({
1068 ".zed.toml": r#"collaborators = ["user_b"]"#,
1069 "a.txt": "a-contents",
1070 "b.txt": "b-contents",
1071 }),
1072 )
1073 .await;
1074 let project_a = cx_a.update(|cx| {
1075 Project::local(
1076 client_a.clone(),
1077 client_a.user_store.clone(),
1078 lang_registry.clone(),
1079 fs.clone(),
1080 cx,
1081 )
1082 });
1083 let (worktree_a, _) = project_a
1084 .update(cx_a, |p, cx| {
1085 p.find_or_create_local_worktree("/a", true, cx)
1086 })
1087 .await
1088 .unwrap();
1089 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1090 worktree_a
1091 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1092 .await;
1093 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1094 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1095
1096 // Join that project as client B
1097 let project_b = Project::remote(
1098 project_id,
1099 client_b.clone(),
1100 client_b.user_store.clone(),
1101 lang_registry.clone(),
1102 fs.clone(),
1103 &mut cx_b.to_async(),
1104 )
1105 .await
1106 .unwrap();
1107
1108 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1109 assert_eq!(
1110 project
1111 .collaborators()
1112 .get(&client_a.peer_id)
1113 .unwrap()
1114 .user
1115 .github_login,
1116 "user_a"
1117 );
1118 project.replica_id()
1119 });
1120 project_a
1121 .condition(&cx_a, |tree, _| {
1122 tree.collaborators()
1123 .get(&client_b.peer_id)
1124 .map_or(false, |collaborator| {
1125 collaborator.replica_id == replica_id_b
1126 && collaborator.user.github_login == "user_b"
1127 })
1128 })
1129 .await;
1130
1131 // Open the same file as client B and client A.
1132 let buffer_b = project_b
1133 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1134 .await
1135 .unwrap();
1136 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1137 buffer_b.read_with(cx_b, |buf, cx| {
1138 assert_eq!(buf.read(cx).text(), "b-contents")
1139 });
1140 project_a.read_with(cx_a, |project, cx| {
1141 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1142 });
1143 let buffer_a = project_a
1144 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1145 .await
1146 .unwrap();
1147
1148 let editor_b = cx_b.add_view(window_b, |cx| {
1149 Editor::for_buffer(
1150 buffer_b,
1151 None,
1152 watch::channel_with(Settings::test(cx)).1,
1153 cx,
1154 )
1155 });
1156
1157 // TODO
1158 // // Create a selection set as client B and see that selection set as client A.
1159 // buffer_a
1160 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1161 // .await;
1162
1163 // Edit the buffer as client B and see that edit as client A.
1164 editor_b.update(cx_b, |editor, cx| {
1165 editor.handle_input(&Input("ok, ".into()), cx)
1166 });
1167 buffer_a
1168 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1169 .await;
1170
1171 // TODO
1172 // // Remove the selection set as client B, see those selections disappear as client A.
1173 cx_b.update(move |_| drop(editor_b));
1174 // buffer_a
1175 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1176 // .await;
1177
1178 // Dropping the client B's project removes client B from client A's collaborators.
1179 cx_b.update(move |_| drop(project_b));
1180 project_a
1181 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1182 .await;
1183 }
1184
1185 #[gpui::test(iterations = 10)]
1186 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1187 let lang_registry = Arc::new(LanguageRegistry::test());
1188 let fs = FakeFs::new(cx_a.background());
1189 cx_a.foreground().forbid_parking();
1190
1191 // Connect to a server as 2 clients.
1192 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1193 let client_a = server.create_client(cx_a, "user_a").await;
1194 let client_b = server.create_client(cx_b, "user_b").await;
1195
1196 // Share a project as client A
1197 fs.insert_tree(
1198 "/a",
1199 json!({
1200 ".zed.toml": r#"collaborators = ["user_b"]"#,
1201 "a.txt": "a-contents",
1202 "b.txt": "b-contents",
1203 }),
1204 )
1205 .await;
1206 let project_a = cx_a.update(|cx| {
1207 Project::local(
1208 client_a.clone(),
1209 client_a.user_store.clone(),
1210 lang_registry.clone(),
1211 fs.clone(),
1212 cx,
1213 )
1214 });
1215 let (worktree_a, _) = project_a
1216 .update(cx_a, |p, cx| {
1217 p.find_or_create_local_worktree("/a", true, cx)
1218 })
1219 .await
1220 .unwrap();
1221 worktree_a
1222 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1223 .await;
1224 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1225 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1227 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1228
1229 // Join that project as client B
1230 let project_b = Project::remote(
1231 project_id,
1232 client_b.clone(),
1233 client_b.user_store.clone(),
1234 lang_registry.clone(),
1235 fs.clone(),
1236 &mut cx_b.to_async(),
1237 )
1238 .await
1239 .unwrap();
1240 project_b
1241 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1242 .await
1243 .unwrap();
1244
1245 // Unshare the project as client A
1246 project_a
1247 .update(cx_a, |project, cx| project.unshare(cx))
1248 .await
1249 .unwrap();
1250 project_b
1251 .condition(cx_b, |project, _| project.is_read_only())
1252 .await;
1253 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1254 cx_b.update(|_| {
1255 drop(project_b);
1256 });
1257
1258 // Share the project again and ensure guests can still join.
1259 project_a
1260 .update(cx_a, |project, cx| project.share(cx))
1261 .await
1262 .unwrap();
1263 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1264
1265 let project_b2 = Project::remote(
1266 project_id,
1267 client_b.clone(),
1268 client_b.user_store.clone(),
1269 lang_registry.clone(),
1270 fs.clone(),
1271 &mut cx_b.to_async(),
1272 )
1273 .await
1274 .unwrap();
1275 project_b2
1276 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1277 .await
1278 .unwrap();
1279 }
1280
1281 #[gpui::test(iterations = 10)]
1282 async fn test_propagate_saves_and_fs_changes(
1283 cx_a: &mut TestAppContext,
1284 cx_b: &mut TestAppContext,
1285 cx_c: &mut TestAppContext,
1286 ) {
1287 let lang_registry = Arc::new(LanguageRegistry::test());
1288 let fs = FakeFs::new(cx_a.background());
1289 cx_a.foreground().forbid_parking();
1290
1291 // Connect to a server as 3 clients.
1292 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293 let client_a = server.create_client(cx_a, "user_a").await;
1294 let client_b = server.create_client(cx_b, "user_b").await;
1295 let client_c = server.create_client(cx_c, "user_c").await;
1296
1297 // Share a worktree as client A.
1298 fs.insert_tree(
1299 "/a",
1300 json!({
1301 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1302 "file1": "",
1303 "file2": ""
1304 }),
1305 )
1306 .await;
1307 let project_a = cx_a.update(|cx| {
1308 Project::local(
1309 client_a.clone(),
1310 client_a.user_store.clone(),
1311 lang_registry.clone(),
1312 fs.clone(),
1313 cx,
1314 )
1315 });
1316 let (worktree_a, _) = project_a
1317 .update(cx_a, |p, cx| {
1318 p.find_or_create_local_worktree("/a", true, cx)
1319 })
1320 .await
1321 .unwrap();
1322 worktree_a
1323 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324 .await;
1325 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1326 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1327 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1328
1329 // Join that worktree as clients B and C.
1330 let project_b = Project::remote(
1331 project_id,
1332 client_b.clone(),
1333 client_b.user_store.clone(),
1334 lang_registry.clone(),
1335 fs.clone(),
1336 &mut cx_b.to_async(),
1337 )
1338 .await
1339 .unwrap();
1340 let project_c = Project::remote(
1341 project_id,
1342 client_c.clone(),
1343 client_c.user_store.clone(),
1344 lang_registry.clone(),
1345 fs.clone(),
1346 &mut cx_c.to_async(),
1347 )
1348 .await
1349 .unwrap();
1350 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1351 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1352
1353 // Open and edit a buffer as both guests B and C.
1354 let buffer_b = project_b
1355 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1356 .await
1357 .unwrap();
1358 let buffer_c = project_c
1359 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1360 .await
1361 .unwrap();
1362 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1363 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1364
1365 // Open and edit that buffer as the host.
1366 let buffer_a = project_a
1367 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1368 .await
1369 .unwrap();
1370
1371 buffer_a
1372 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1373 .await;
1374 buffer_a.update(cx_a, |buf, cx| {
1375 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1376 });
1377
1378 // Wait for edits to propagate
1379 buffer_a
1380 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381 .await;
1382 buffer_b
1383 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384 .await;
1385 buffer_c
1386 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1387 .await;
1388
1389 // Edit the buffer as the host and concurrently save as guest B.
1390 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1391 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1392 save_b.await.unwrap();
1393 assert_eq!(
1394 fs.load("/a/file1".as_ref()).await.unwrap(),
1395 "hi-a, i-am-c, i-am-b, i-am-a"
1396 );
1397 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1398 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1399 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1400
1401 worktree_a.flush_fs_events(cx_a).await;
1402
1403 // Make changes on host's file system, see those changes on guest worktrees.
1404 fs.rename(
1405 "/a/file1".as_ref(),
1406 "/a/file1-renamed".as_ref(),
1407 Default::default(),
1408 )
1409 .await
1410 .unwrap();
1411
1412 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413 .await
1414 .unwrap();
1415 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417 worktree_a
1418 .condition(&cx_a, |tree, _| {
1419 tree.paths()
1420 .map(|p| p.to_string_lossy())
1421 .collect::<Vec<_>>()
1422 == [".zed.toml", "file1-renamed", "file3", "file4"]
1423 })
1424 .await;
1425 worktree_b
1426 .condition(&cx_b, |tree, _| {
1427 tree.paths()
1428 .map(|p| p.to_string_lossy())
1429 .collect::<Vec<_>>()
1430 == [".zed.toml", "file1-renamed", "file3", "file4"]
1431 })
1432 .await;
1433 worktree_c
1434 .condition(&cx_c, |tree, _| {
1435 tree.paths()
1436 .map(|p| p.to_string_lossy())
1437 .collect::<Vec<_>>()
1438 == [".zed.toml", "file1-renamed", "file3", "file4"]
1439 })
1440 .await;
1441
1442 // Ensure buffer files are updated as well.
1443 buffer_a
1444 .condition(&cx_a, |buf, _| {
1445 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446 })
1447 .await;
1448 buffer_b
1449 .condition(&cx_b, |buf, _| {
1450 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451 })
1452 .await;
1453 buffer_c
1454 .condition(&cx_c, |buf, _| {
1455 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456 })
1457 .await;
1458 }
1459
1460 #[gpui::test(iterations = 10)]
1461 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1462 cx_a.foreground().forbid_parking();
1463 let lang_registry = Arc::new(LanguageRegistry::test());
1464 let fs = FakeFs::new(cx_a.background());
1465
1466 // Connect to a server as 2 clients.
1467 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468 let client_a = server.create_client(cx_a, "user_a").await;
1469 let client_b = server.create_client(cx_b, "user_b").await;
1470
1471 // Share a project as client A
1472 fs.insert_tree(
1473 "/dir",
1474 json!({
1475 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476 "a.txt": "a-contents",
1477 }),
1478 )
1479 .await;
1480
1481 let project_a = cx_a.update(|cx| {
1482 Project::local(
1483 client_a.clone(),
1484 client_a.user_store.clone(),
1485 lang_registry.clone(),
1486 fs.clone(),
1487 cx,
1488 )
1489 });
1490 let (worktree_a, _) = project_a
1491 .update(cx_a, |p, cx| {
1492 p.find_or_create_local_worktree("/dir", true, cx)
1493 })
1494 .await
1495 .unwrap();
1496 worktree_a
1497 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498 .await;
1499 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1500 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1501 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1502
1503 // Join that project as client B
1504 let project_b = Project::remote(
1505 project_id,
1506 client_b.clone(),
1507 client_b.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 &mut cx_b.to_async(),
1511 )
1512 .await
1513 .unwrap();
1514
1515 // Open a buffer as client B
1516 let buffer_b = project_b
1517 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1518 .await
1519 .unwrap();
1520
1521 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1522 buffer_b.read_with(cx_b, |buf, _| {
1523 assert!(buf.is_dirty());
1524 assert!(!buf.has_conflict());
1525 });
1526
1527 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1528 buffer_b
1529 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1530 .await;
1531 buffer_b.read_with(cx_b, |buf, _| {
1532 assert!(!buf.has_conflict());
1533 });
1534
1535 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1536 buffer_b.read_with(cx_b, |buf, _| {
1537 assert!(buf.is_dirty());
1538 assert!(!buf.has_conflict());
1539 });
1540 }
1541
1542 #[gpui::test(iterations = 10)]
1543 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544 cx_a.foreground().forbid_parking();
1545 let lang_registry = Arc::new(LanguageRegistry::test());
1546 let fs = FakeFs::new(cx_a.background());
1547
1548 // Connect to a server as 2 clients.
1549 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550 let client_a = server.create_client(cx_a, "user_a").await;
1551 let client_b = server.create_client(cx_b, "user_b").await;
1552
1553 // Share a project as client A
1554 fs.insert_tree(
1555 "/dir",
1556 json!({
1557 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1558 "a.txt": "a-contents",
1559 }),
1560 )
1561 .await;
1562
1563 let project_a = cx_a.update(|cx| {
1564 Project::local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 lang_registry.clone(),
1568 fs.clone(),
1569 cx,
1570 )
1571 });
1572 let (worktree_a, _) = project_a
1573 .update(cx_a, |p, cx| {
1574 p.find_or_create_local_worktree("/dir", true, cx)
1575 })
1576 .await
1577 .unwrap();
1578 worktree_a
1579 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580 .await;
1581 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584
1585 // Join that project as client B
1586 let project_b = Project::remote(
1587 project_id,
1588 client_b.clone(),
1589 client_b.user_store.clone(),
1590 lang_registry.clone(),
1591 fs.clone(),
1592 &mut cx_b.to_async(),
1593 )
1594 .await
1595 .unwrap();
1596 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1597
1598 // Open a buffer as client B
1599 let buffer_b = project_b
1600 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1601 .await
1602 .unwrap();
1603 buffer_b.read_with(cx_b, |buf, _| {
1604 assert!(!buf.is_dirty());
1605 assert!(!buf.has_conflict());
1606 });
1607
1608 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1609 .await
1610 .unwrap();
1611 buffer_b
1612 .condition(&cx_b, |buf, _| {
1613 buf.text() == "new contents" && !buf.is_dirty()
1614 })
1615 .await;
1616 buffer_b.read_with(cx_b, |buf, _| {
1617 assert!(!buf.has_conflict());
1618 });
1619 }
1620
1621 #[gpui::test(iterations = 10)]
1622 async fn test_editing_while_guest_opens_buffer(
1623 cx_a: &mut TestAppContext,
1624 cx_b: &mut TestAppContext,
1625 ) {
1626 cx_a.foreground().forbid_parking();
1627 let lang_registry = Arc::new(LanguageRegistry::test());
1628 let fs = FakeFs::new(cx_a.background());
1629
1630 // Connect to a server as 2 clients.
1631 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632 let client_a = server.create_client(cx_a, "user_a").await;
1633 let client_b = server.create_client(cx_b, "user_b").await;
1634
1635 // Share a project as client A
1636 fs.insert_tree(
1637 "/dir",
1638 json!({
1639 ".zed.toml": r#"collaborators = ["user_b"]"#,
1640 "a.txt": "a-contents",
1641 }),
1642 )
1643 .await;
1644 let project_a = cx_a.update(|cx| {
1645 Project::local(
1646 client_a.clone(),
1647 client_a.user_store.clone(),
1648 lang_registry.clone(),
1649 fs.clone(),
1650 cx,
1651 )
1652 });
1653 let (worktree_a, _) = project_a
1654 .update(cx_a, |p, cx| {
1655 p.find_or_create_local_worktree("/dir", true, cx)
1656 })
1657 .await
1658 .unwrap();
1659 worktree_a
1660 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1661 .await;
1662 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1663 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1664 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1665
1666 // Join that project as client B
1667 let project_b = Project::remote(
1668 project_id,
1669 client_b.clone(),
1670 client_b.user_store.clone(),
1671 lang_registry.clone(),
1672 fs.clone(),
1673 &mut cx_b.to_async(),
1674 )
1675 .await
1676 .unwrap();
1677
1678 // Open a buffer as client A
1679 let buffer_a = project_a
1680 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1681 .await
1682 .unwrap();
1683
1684 // Start opening the same buffer as client B
1685 let buffer_b = cx_b
1686 .background()
1687 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1688
1689 // Edit the buffer as client A while client B is still opening it.
1690 cx_b.background().simulate_random_delay().await;
1691 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1692 cx_b.background().simulate_random_delay().await;
1693 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1694
1695 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1696 let buffer_b = buffer_b.await.unwrap();
1697 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1698 }
1699
1700 #[gpui::test(iterations = 10)]
1701 async fn test_leaving_worktree_while_opening_buffer(
1702 cx_a: &mut TestAppContext,
1703 cx_b: &mut TestAppContext,
1704 ) {
1705 cx_a.foreground().forbid_parking();
1706 let lang_registry = Arc::new(LanguageRegistry::test());
1707 let fs = FakeFs::new(cx_a.background());
1708
1709 // Connect to a server as 2 clients.
1710 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711 let client_a = server.create_client(cx_a, "user_a").await;
1712 let client_b = server.create_client(cx_b, "user_b").await;
1713
1714 // Share a project as client A
1715 fs.insert_tree(
1716 "/dir",
1717 json!({
1718 ".zed.toml": r#"collaborators = ["user_b"]"#,
1719 "a.txt": "a-contents",
1720 }),
1721 )
1722 .await;
1723 let project_a = cx_a.update(|cx| {
1724 Project::local(
1725 client_a.clone(),
1726 client_a.user_store.clone(),
1727 lang_registry.clone(),
1728 fs.clone(),
1729 cx,
1730 )
1731 });
1732 let (worktree_a, _) = project_a
1733 .update(cx_a, |p, cx| {
1734 p.find_or_create_local_worktree("/dir", true, cx)
1735 })
1736 .await
1737 .unwrap();
1738 worktree_a
1739 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740 .await;
1741 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745 // Join that project as client B
1746 let project_b = Project::remote(
1747 project_id,
1748 client_b.clone(),
1749 client_b.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 &mut cx_b.to_async(),
1753 )
1754 .await
1755 .unwrap();
1756
1757 // See that a guest has joined as client A.
1758 project_a
1759 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760 .await;
1761
1762 // Begin opening a buffer as client B, but leave the project before the open completes.
1763 let buffer_b = cx_b
1764 .background()
1765 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766 cx_b.update(|_| drop(project_b));
1767 drop(buffer_b);
1768
1769 // See that the guest has left.
1770 project_a
1771 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772 .await;
1773 }
1774
1775 #[gpui::test(iterations = 10)]
1776 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777 cx_a.foreground().forbid_parking();
1778 let lang_registry = Arc::new(LanguageRegistry::test());
1779 let fs = FakeFs::new(cx_a.background());
1780
1781 // Connect to a server as 2 clients.
1782 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783 let client_a = server.create_client(cx_a, "user_a").await;
1784 let client_b = server.create_client(cx_b, "user_b").await;
1785
1786 // Share a project as client A
1787 fs.insert_tree(
1788 "/a",
1789 json!({
1790 ".zed.toml": r#"collaborators = ["user_b"]"#,
1791 "a.txt": "a-contents",
1792 "b.txt": "b-contents",
1793 }),
1794 )
1795 .await;
1796 let project_a = cx_a.update(|cx| {
1797 Project::local(
1798 client_a.clone(),
1799 client_a.user_store.clone(),
1800 lang_registry.clone(),
1801 fs.clone(),
1802 cx,
1803 )
1804 });
1805 let (worktree_a, _) = project_a
1806 .update(cx_a, |p, cx| {
1807 p.find_or_create_local_worktree("/a", true, cx)
1808 })
1809 .await
1810 .unwrap();
1811 worktree_a
1812 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813 .await;
1814 let project_id = project_a
1815 .update(cx_a, |project, _| project.next_remote_id())
1816 .await;
1817 project_a
1818 .update(cx_a, |project, cx| project.share(cx))
1819 .await
1820 .unwrap();
1821
1822 // Join that project as client B
1823 let _project_b = Project::remote(
1824 project_id,
1825 client_b.clone(),
1826 client_b.user_store.clone(),
1827 lang_registry.clone(),
1828 fs.clone(),
1829 &mut cx_b.to_async(),
1830 )
1831 .await
1832 .unwrap();
1833
1834 // Client A sees that a guest has joined.
1835 project_a
1836 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837 .await;
1838
1839 // Drop client B's connection and ensure client A observes client B leaving the project.
1840 client_b.disconnect(&cx_b.to_async()).unwrap();
1841 project_a
1842 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843 .await;
1844
1845 // Rejoin the project as client B
1846 let _project_b = Project::remote(
1847 project_id,
1848 client_b.clone(),
1849 client_b.user_store.clone(),
1850 lang_registry.clone(),
1851 fs.clone(),
1852 &mut cx_b.to_async(),
1853 )
1854 .await
1855 .unwrap();
1856
1857 // Client A sees that a guest has re-joined.
1858 project_a
1859 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860 .await;
1861
1862 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863 client_b.wait_for_current_user(cx_b).await;
1864 server.disconnect_client(client_b.current_user_id(cx_b));
1865 cx_a.foreground().advance_clock(Duration::from_secs(3));
1866 project_a
1867 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868 .await;
1869 }
1870
1871 #[gpui::test(iterations = 10)]
1872 async fn test_collaborating_with_diagnostics(
1873 cx_a: &mut TestAppContext,
1874 cx_b: &mut TestAppContext,
1875 ) {
1876 cx_a.foreground().forbid_parking();
1877 let mut lang_registry = Arc::new(LanguageRegistry::test());
1878 let fs = FakeFs::new(cx_a.background());
1879
1880 // Set up a fake language server.
1881 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882 Arc::get_mut(&mut lang_registry)
1883 .unwrap()
1884 .add(Arc::new(Language::new(
1885 LanguageConfig {
1886 name: "Rust".into(),
1887 path_suffixes: vec!["rs".to_string()],
1888 language_server: Some(language_server_config),
1889 ..Default::default()
1890 },
1891 Some(tree_sitter_rust::language()),
1892 )));
1893
1894 // Connect to a server as 2 clients.
1895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896 let client_a = server.create_client(cx_a, "user_a").await;
1897 let client_b = server.create_client(cx_b, "user_b").await;
1898
1899 // Share a project as client A
1900 fs.insert_tree(
1901 "/a",
1902 json!({
1903 ".zed.toml": r#"collaborators = ["user_b"]"#,
1904 "a.rs": "let one = two",
1905 "other.rs": "",
1906 }),
1907 )
1908 .await;
1909 let project_a = cx_a.update(|cx| {
1910 Project::local(
1911 client_a.clone(),
1912 client_a.user_store.clone(),
1913 lang_registry.clone(),
1914 fs.clone(),
1915 cx,
1916 )
1917 });
1918 let (worktree_a, _) = project_a
1919 .update(cx_a, |p, cx| {
1920 p.find_or_create_local_worktree("/a", true, cx)
1921 })
1922 .await
1923 .unwrap();
1924 worktree_a
1925 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926 .await;
1927 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931 // Cause the language server to start.
1932 let _ = cx_a
1933 .background()
1934 .spawn(project_a.update(cx_a, |project, cx| {
1935 project.open_buffer(
1936 ProjectPath {
1937 worktree_id,
1938 path: Path::new("other.rs").into(),
1939 },
1940 cx,
1941 )
1942 }))
1943 .await
1944 .unwrap();
1945
1946 // Simulate a language server reporting errors for a file.
1947 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948 fake_language_server
1949 .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950 .await;
1951 fake_language_server
1952 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1953 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954 version: None,
1955 diagnostics: vec![lsp::Diagnostic {
1956 severity: Some(lsp::DiagnosticSeverity::ERROR),
1957 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958 message: "message 1".to_string(),
1959 ..Default::default()
1960 }],
1961 })
1962 .await;
1963
1964 // Wait for server to see the diagnostics update.
1965 server
1966 .condition(|store| {
1967 let worktree = store
1968 .project(project_id)
1969 .unwrap()
1970 .share
1971 .as_ref()
1972 .unwrap()
1973 .worktrees
1974 .get(&worktree_id.to_proto())
1975 .unwrap();
1976
1977 !worktree.diagnostic_summaries.is_empty()
1978 })
1979 .await;
1980
1981 // Join the worktree as client B.
1982 let project_b = Project::remote(
1983 project_id,
1984 client_b.clone(),
1985 client_b.user_store.clone(),
1986 lang_registry.clone(),
1987 fs.clone(),
1988 &mut cx_b.to_async(),
1989 )
1990 .await
1991 .unwrap();
1992
1993 project_b.read_with(cx_b, |project, cx| {
1994 assert_eq!(
1995 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996 &[(
1997 ProjectPath {
1998 worktree_id,
1999 path: Arc::from(Path::new("a.rs")),
2000 },
2001 DiagnosticSummary {
2002 error_count: 1,
2003 warning_count: 0,
2004 ..Default::default()
2005 },
2006 )]
2007 )
2008 });
2009
2010 // Simulate a language server reporting more errors for a file.
2011 fake_language_server
2012 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2013 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014 version: None,
2015 diagnostics: vec![
2016 lsp::Diagnostic {
2017 severity: Some(lsp::DiagnosticSeverity::ERROR),
2018 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019 message: "message 1".to_string(),
2020 ..Default::default()
2021 },
2022 lsp::Diagnostic {
2023 severity: Some(lsp::DiagnosticSeverity::WARNING),
2024 range: lsp::Range::new(
2025 lsp::Position::new(0, 10),
2026 lsp::Position::new(0, 13),
2027 ),
2028 message: "message 2".to_string(),
2029 ..Default::default()
2030 },
2031 ],
2032 })
2033 .await;
2034
2035 // Client b gets the updated summaries
2036 project_b
2037 .condition(&cx_b, |project, cx| {
2038 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039 == &[(
2040 ProjectPath {
2041 worktree_id,
2042 path: Arc::from(Path::new("a.rs")),
2043 },
2044 DiagnosticSummary {
2045 error_count: 1,
2046 warning_count: 1,
2047 ..Default::default()
2048 },
2049 )]
2050 })
2051 .await;
2052
2053 // Open the file with the errors on client B. They should be present.
2054 let buffer_b = cx_b
2055 .background()
2056 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057 .await
2058 .unwrap();
2059
2060 buffer_b.read_with(cx_b, |buffer, _| {
2061 assert_eq!(
2062 buffer
2063 .snapshot()
2064 .diagnostics_in_range::<_, Point>(0..buffer.len())
2065 .map(|entry| entry)
2066 .collect::<Vec<_>>(),
2067 &[
2068 DiagnosticEntry {
2069 range: Point::new(0, 4)..Point::new(0, 7),
2070 diagnostic: Diagnostic {
2071 group_id: 0,
2072 message: "message 1".to_string(),
2073 severity: lsp::DiagnosticSeverity::ERROR,
2074 is_primary: true,
2075 ..Default::default()
2076 }
2077 },
2078 DiagnosticEntry {
2079 range: Point::new(0, 10)..Point::new(0, 13),
2080 diagnostic: Diagnostic {
2081 group_id: 1,
2082 severity: lsp::DiagnosticSeverity::WARNING,
2083 message: "message 2".to_string(),
2084 is_primary: true,
2085 ..Default::default()
2086 }
2087 }
2088 ]
2089 );
2090 });
2091 }
2092
2093 #[gpui::test(iterations = 10)]
2094 async fn test_collaborating_with_completion(
2095 cx_a: &mut TestAppContext,
2096 cx_b: &mut TestAppContext,
2097 ) {
2098 cx_a.foreground().forbid_parking();
2099 let mut lang_registry = Arc::new(LanguageRegistry::test());
2100 let fs = FakeFs::new(cx_a.background());
2101
2102 // Set up a fake language server.
2103 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105 completion_provider: Some(lsp::CompletionOptions {
2106 trigger_characters: Some(vec![".".to_string()]),
2107 ..Default::default()
2108 }),
2109 ..Default::default()
2110 });
2111 Arc::get_mut(&mut lang_registry)
2112 .unwrap()
2113 .add(Arc::new(Language::new(
2114 LanguageConfig {
2115 name: "Rust".into(),
2116 path_suffixes: vec!["rs".to_string()],
2117 language_server: Some(language_server_config),
2118 ..Default::default()
2119 },
2120 Some(tree_sitter_rust::language()),
2121 )));
2122
2123 // Connect to a server as 2 clients.
2124 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125 let client_a = server.create_client(cx_a, "user_a").await;
2126 let client_b = server.create_client(cx_b, "user_b").await;
2127
2128 // Share a project as client A
2129 fs.insert_tree(
2130 "/a",
2131 json!({
2132 ".zed.toml": r#"collaborators = ["user_b"]"#,
2133 "main.rs": "fn main() { a }",
2134 "other.rs": "",
2135 }),
2136 )
2137 .await;
2138 let project_a = cx_a.update(|cx| {
2139 Project::local(
2140 client_a.clone(),
2141 client_a.user_store.clone(),
2142 lang_registry.clone(),
2143 fs.clone(),
2144 cx,
2145 )
2146 });
2147 let (worktree_a, _) = project_a
2148 .update(cx_a, |p, cx| {
2149 p.find_or_create_local_worktree("/a", true, cx)
2150 })
2151 .await
2152 .unwrap();
2153 worktree_a
2154 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155 .await;
2156 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160 // Join the worktree as client B.
2161 let project_b = Project::remote(
2162 project_id,
2163 client_b.clone(),
2164 client_b.user_store.clone(),
2165 lang_registry.clone(),
2166 fs.clone(),
2167 &mut cx_b.to_async(),
2168 )
2169 .await
2170 .unwrap();
2171
2172 // Open a file in an editor as the guest.
2173 let buffer_b = project_b
2174 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175 .await
2176 .unwrap();
2177 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178 let editor_b = cx_b.add_view(window_b, |cx| {
2179 Editor::for_buffer(
2180 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181 Some(project_b.clone()),
2182 watch::channel_with(Settings::test(cx)).1,
2183 cx,
2184 )
2185 });
2186
2187 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2188 buffer_b
2189 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2190 .await;
2191
2192 // Type a completion trigger character as the guest.
2193 editor_b.update(cx_b, |editor, cx| {
2194 editor.select_ranges([13..13], None, cx);
2195 editor.handle_input(&Input(".".into()), cx);
2196 cx.focus(&editor_b);
2197 });
2198
2199 // Receive a completion request as the host's language server.
2200 // Return some completions from the host's language server.
2201 cx_a.foreground().start_waiting();
2202 fake_language_server
2203 .handle_request::<lsp::request::Completion, _>(|params, _| {
2204 assert_eq!(
2205 params.text_document_position.text_document.uri,
2206 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2207 );
2208 assert_eq!(
2209 params.text_document_position.position,
2210 lsp::Position::new(0, 14),
2211 );
2212
2213 Some(lsp::CompletionResponse::Array(vec![
2214 lsp::CompletionItem {
2215 label: "first_method(…)".into(),
2216 detail: Some("fn(&mut self, B) -> C".into()),
2217 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2218 new_text: "first_method($1)".to_string(),
2219 range: lsp::Range::new(
2220 lsp::Position::new(0, 14),
2221 lsp::Position::new(0, 14),
2222 ),
2223 })),
2224 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2225 ..Default::default()
2226 },
2227 lsp::CompletionItem {
2228 label: "second_method(…)".into(),
2229 detail: Some("fn(&mut self, C) -> D<E>".into()),
2230 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231 new_text: "second_method()".to_string(),
2232 range: lsp::Range::new(
2233 lsp::Position::new(0, 14),
2234 lsp::Position::new(0, 14),
2235 ),
2236 })),
2237 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2238 ..Default::default()
2239 },
2240 ]))
2241 })
2242 .next()
2243 .await
2244 .unwrap();
2245 cx_a.foreground().finish_waiting();
2246
2247 // Open the buffer on the host.
2248 let buffer_a = project_a
2249 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2250 .await
2251 .unwrap();
2252 buffer_a
2253 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2254 .await;
2255
2256 // Confirm a completion on the guest.
2257 editor_b
2258 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2259 .await;
2260 editor_b.update(cx_b, |editor, cx| {
2261 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2262 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2263 });
2264
2265 // Return a resolved completion from the host's language server.
2266 // The resolved completion has an additional text edit.
2267 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2268 |params, _| {
2269 assert_eq!(params.label, "first_method(…)");
2270 lsp::CompletionItem {
2271 label: "first_method(…)".into(),
2272 detail: Some("fn(&mut self, B) -> C".into()),
2273 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2274 new_text: "first_method($1)".to_string(),
2275 range: lsp::Range::new(
2276 lsp::Position::new(0, 14),
2277 lsp::Position::new(0, 14),
2278 ),
2279 })),
2280 additional_text_edits: Some(vec![lsp::TextEdit {
2281 new_text: "use d::SomeTrait;\n".to_string(),
2282 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2283 }]),
2284 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2285 ..Default::default()
2286 }
2287 },
2288 );
2289
2290 // The additional edit is applied.
2291 buffer_a
2292 .condition(&cx_a, |buffer, _| {
2293 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2294 })
2295 .await;
2296 buffer_b
2297 .condition(&cx_b, |buffer, _| {
2298 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2299 })
2300 .await;
2301 }
2302
2303 #[gpui::test(iterations = 10)]
2304 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2305 cx_a.foreground().forbid_parking();
2306 let mut lang_registry = Arc::new(LanguageRegistry::test());
2307 let fs = FakeFs::new(cx_a.background());
2308
2309 // Set up a fake language server.
2310 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2311 Arc::get_mut(&mut lang_registry)
2312 .unwrap()
2313 .add(Arc::new(Language::new(
2314 LanguageConfig {
2315 name: "Rust".into(),
2316 path_suffixes: vec!["rs".to_string()],
2317 language_server: Some(language_server_config),
2318 ..Default::default()
2319 },
2320 Some(tree_sitter_rust::language()),
2321 )));
2322
2323 // Connect to a server as 2 clients.
2324 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2325 let client_a = server.create_client(cx_a, "user_a").await;
2326 let client_b = server.create_client(cx_b, "user_b").await;
2327
2328 // Share a project as client A
2329 fs.insert_tree(
2330 "/a",
2331 json!({
2332 ".zed.toml": r#"collaborators = ["user_b"]"#,
2333 "a.rs": "let one = two",
2334 }),
2335 )
2336 .await;
2337 let project_a = cx_a.update(|cx| {
2338 Project::local(
2339 client_a.clone(),
2340 client_a.user_store.clone(),
2341 lang_registry.clone(),
2342 fs.clone(),
2343 cx,
2344 )
2345 });
2346 let (worktree_a, _) = project_a
2347 .update(cx_a, |p, cx| {
2348 p.find_or_create_local_worktree("/a", true, cx)
2349 })
2350 .await
2351 .unwrap();
2352 worktree_a
2353 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2354 .await;
2355 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2356 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2357 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2358
2359 // Join the worktree as client B.
2360 let project_b = Project::remote(
2361 project_id,
2362 client_b.clone(),
2363 client_b.user_store.clone(),
2364 lang_registry.clone(),
2365 fs.clone(),
2366 &mut cx_b.to_async(),
2367 )
2368 .await
2369 .unwrap();
2370
2371 let buffer_b = cx_b
2372 .background()
2373 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2374 .await
2375 .unwrap();
2376
2377 let format = project_b.update(cx_b, |project, cx| {
2378 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2379 });
2380
2381 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2382 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2383 Some(vec![
2384 lsp::TextEdit {
2385 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2386 new_text: "h".to_string(),
2387 },
2388 lsp::TextEdit {
2389 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2390 new_text: "y".to_string(),
2391 },
2392 ])
2393 });
2394
2395 format.await.unwrap();
2396 assert_eq!(
2397 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2398 "let honey = two"
2399 );
2400 }
2401
2402 #[gpui::test(iterations = 10)]
2403 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2404 cx_a.foreground().forbid_parking();
2405 let mut lang_registry = Arc::new(LanguageRegistry::test());
2406 let fs = FakeFs::new(cx_a.background());
2407 fs.insert_tree(
2408 "/root-1",
2409 json!({
2410 ".zed.toml": r#"collaborators = ["user_b"]"#,
2411 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412 }),
2413 )
2414 .await;
2415 fs.insert_tree(
2416 "/root-2",
2417 json!({
2418 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419 }),
2420 )
2421 .await;
2422
2423 // Set up a fake language server.
2424 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425 Arc::get_mut(&mut lang_registry)
2426 .unwrap()
2427 .add(Arc::new(Language::new(
2428 LanguageConfig {
2429 name: "Rust".into(),
2430 path_suffixes: vec!["rs".to_string()],
2431 language_server: Some(language_server_config),
2432 ..Default::default()
2433 },
2434 Some(tree_sitter_rust::language()),
2435 )));
2436
2437 // Connect to a server as 2 clients.
2438 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439 let client_a = server.create_client(cx_a, "user_a").await;
2440 let client_b = server.create_client(cx_b, "user_b").await;
2441
2442 // Share a project as client A
2443 let project_a = cx_a.update(|cx| {
2444 Project::local(
2445 client_a.clone(),
2446 client_a.user_store.clone(),
2447 lang_registry.clone(),
2448 fs.clone(),
2449 cx,
2450 )
2451 });
2452 let (worktree_a, _) = project_a
2453 .update(cx_a, |p, cx| {
2454 p.find_or_create_local_worktree("/root-1", true, cx)
2455 })
2456 .await
2457 .unwrap();
2458 worktree_a
2459 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460 .await;
2461 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2462 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2463 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2464
2465 // Join the worktree as client B.
2466 let project_b = Project::remote(
2467 project_id,
2468 client_b.clone(),
2469 client_b.user_store.clone(),
2470 lang_registry.clone(),
2471 fs.clone(),
2472 &mut cx_b.to_async(),
2473 )
2474 .await
2475 .unwrap();
2476
2477 // Open the file on client B.
2478 let buffer_b = cx_b
2479 .background()
2480 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2481 .await
2482 .unwrap();
2483
2484 // Request the definition of a symbol as the guest.
2485 let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2486
2487 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2488 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2489 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2490 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2491 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2492 )))
2493 });
2494
2495 let definitions_1 = definitions_1.await.unwrap();
2496 cx_b.read(|cx| {
2497 assert_eq!(definitions_1.len(), 1);
2498 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2499 let target_buffer = definitions_1[0].buffer.read(cx);
2500 assert_eq!(
2501 target_buffer.text(),
2502 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2503 );
2504 assert_eq!(
2505 definitions_1[0].range.to_point(target_buffer),
2506 Point::new(0, 6)..Point::new(0, 9)
2507 );
2508 });
2509
2510 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2511 // the previous call to `definition`.
2512 let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2513 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2514 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2515 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2516 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2517 )))
2518 });
2519
2520 let definitions_2 = definitions_2.await.unwrap();
2521 cx_b.read(|cx| {
2522 assert_eq!(definitions_2.len(), 1);
2523 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2524 let target_buffer = definitions_2[0].buffer.read(cx);
2525 assert_eq!(
2526 target_buffer.text(),
2527 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2528 );
2529 assert_eq!(
2530 definitions_2[0].range.to_point(target_buffer),
2531 Point::new(1, 6)..Point::new(1, 11)
2532 );
2533 });
2534 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2535 }
2536
2537 #[gpui::test(iterations = 10)]
2538 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2539 cx_a.foreground().forbid_parking();
2540 let mut lang_registry = Arc::new(LanguageRegistry::test());
2541 let fs = FakeFs::new(cx_a.background());
2542 fs.insert_tree(
2543 "/root-1",
2544 json!({
2545 ".zed.toml": r#"collaborators = ["user_b"]"#,
2546 "one.rs": "const ONE: usize = 1;",
2547 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2548 }),
2549 )
2550 .await;
2551 fs.insert_tree(
2552 "/root-2",
2553 json!({
2554 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2555 }),
2556 )
2557 .await;
2558
2559 // Set up a fake language server.
2560 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2561 Arc::get_mut(&mut lang_registry)
2562 .unwrap()
2563 .add(Arc::new(Language::new(
2564 LanguageConfig {
2565 name: "Rust".into(),
2566 path_suffixes: vec!["rs".to_string()],
2567 language_server: Some(language_server_config),
2568 ..Default::default()
2569 },
2570 Some(tree_sitter_rust::language()),
2571 )));
2572
2573 // Connect to a server as 2 clients.
2574 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2575 let client_a = server.create_client(cx_a, "user_a").await;
2576 let client_b = server.create_client(cx_b, "user_b").await;
2577
2578 // Share a project as client A
2579 let project_a = cx_a.update(|cx| {
2580 Project::local(
2581 client_a.clone(),
2582 client_a.user_store.clone(),
2583 lang_registry.clone(),
2584 fs.clone(),
2585 cx,
2586 )
2587 });
2588 let (worktree_a, _) = project_a
2589 .update(cx_a, |p, cx| {
2590 p.find_or_create_local_worktree("/root-1", true, cx)
2591 })
2592 .await
2593 .unwrap();
2594 worktree_a
2595 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2596 .await;
2597 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2598 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2599 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2600
2601 // Join the worktree as client B.
2602 let project_b = Project::remote(
2603 project_id,
2604 client_b.clone(),
2605 client_b.user_store.clone(),
2606 lang_registry.clone(),
2607 fs.clone(),
2608 &mut cx_b.to_async(),
2609 )
2610 .await
2611 .unwrap();
2612
2613 // Open the file on client B.
2614 let buffer_b = cx_b
2615 .background()
2616 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2617 .await
2618 .unwrap();
2619
2620 // Request references to a symbol as the guest.
2621 let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2622
2623 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2624 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2625 assert_eq!(
2626 params.text_document_position.text_document.uri.as_str(),
2627 "file:///root-1/one.rs"
2628 );
2629 Some(vec![
2630 lsp::Location {
2631 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2632 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2633 },
2634 lsp::Location {
2635 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2636 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2637 },
2638 lsp::Location {
2639 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2640 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2641 },
2642 ])
2643 });
2644
2645 let references = references.await.unwrap();
2646 cx_b.read(|cx| {
2647 assert_eq!(references.len(), 3);
2648 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2649
2650 let two_buffer = references[0].buffer.read(cx);
2651 let three_buffer = references[2].buffer.read(cx);
2652 assert_eq!(
2653 two_buffer.file().unwrap().path().as_ref(),
2654 Path::new("two.rs")
2655 );
2656 assert_eq!(references[1].buffer, references[0].buffer);
2657 assert_eq!(
2658 three_buffer.file().unwrap().full_path(cx),
2659 Path::new("three.rs")
2660 );
2661
2662 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2663 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2664 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2665 });
2666 }
2667
2668 #[gpui::test(iterations = 10)]
2669 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2670 cx_a.foreground().forbid_parking();
2671 let lang_registry = Arc::new(LanguageRegistry::test());
2672 let fs = FakeFs::new(cx_a.background());
2673 fs.insert_tree(
2674 "/root-1",
2675 json!({
2676 ".zed.toml": r#"collaborators = ["user_b"]"#,
2677 "a": "hello world",
2678 "b": "goodnight moon",
2679 "c": "a world of goo",
2680 "d": "world champion of clown world",
2681 }),
2682 )
2683 .await;
2684 fs.insert_tree(
2685 "/root-2",
2686 json!({
2687 "e": "disney world is fun",
2688 }),
2689 )
2690 .await;
2691
2692 // Connect to a server as 2 clients.
2693 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2694 let client_a = server.create_client(cx_a, "user_a").await;
2695 let client_b = server.create_client(cx_b, "user_b").await;
2696
2697 // Share a project as client A
2698 let project_a = cx_a.update(|cx| {
2699 Project::local(
2700 client_a.clone(),
2701 client_a.user_store.clone(),
2702 lang_registry.clone(),
2703 fs.clone(),
2704 cx,
2705 )
2706 });
2707 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2708
2709 let (worktree_1, _) = project_a
2710 .update(cx_a, |p, cx| {
2711 p.find_or_create_local_worktree("/root-1", true, cx)
2712 })
2713 .await
2714 .unwrap();
2715 worktree_1
2716 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2717 .await;
2718 let (worktree_2, _) = project_a
2719 .update(cx_a, |p, cx| {
2720 p.find_or_create_local_worktree("/root-2", true, cx)
2721 })
2722 .await
2723 .unwrap();
2724 worktree_2
2725 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2726 .await;
2727
2728 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2729
2730 // Join the worktree as client B.
2731 let project_b = Project::remote(
2732 project_id,
2733 client_b.clone(),
2734 client_b.user_store.clone(),
2735 lang_registry.clone(),
2736 fs.clone(),
2737 &mut cx_b.to_async(),
2738 )
2739 .await
2740 .unwrap();
2741
2742 let results = project_b
2743 .update(cx_b, |project, cx| {
2744 project.search(SearchQuery::text("world", false, false), cx)
2745 })
2746 .await
2747 .unwrap();
2748
2749 let mut ranges_by_path = results
2750 .into_iter()
2751 .map(|(buffer, ranges)| {
2752 buffer.read_with(cx_b, |buffer, cx| {
2753 let path = buffer.file().unwrap().full_path(cx);
2754 let offset_ranges = ranges
2755 .into_iter()
2756 .map(|range| range.to_offset(buffer))
2757 .collect::<Vec<_>>();
2758 (path, offset_ranges)
2759 })
2760 })
2761 .collect::<Vec<_>>();
2762 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2763
2764 assert_eq!(
2765 ranges_by_path,
2766 &[
2767 (PathBuf::from("root-1/a"), vec![6..11]),
2768 (PathBuf::from("root-1/c"), vec![2..7]),
2769 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2770 (PathBuf::from("root-2/e"), vec![7..12]),
2771 ]
2772 );
2773 }
2774
2775 #[gpui::test(iterations = 10)]
2776 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2777 cx_a.foreground().forbid_parking();
2778 let lang_registry = Arc::new(LanguageRegistry::test());
2779 let fs = FakeFs::new(cx_a.background());
2780 fs.insert_tree(
2781 "/root-1",
2782 json!({
2783 ".zed.toml": r#"collaborators = ["user_b"]"#,
2784 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2785 }),
2786 )
2787 .await;
2788
2789 // Set up a fake language server.
2790 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2791 lang_registry.add(Arc::new(Language::new(
2792 LanguageConfig {
2793 name: "Rust".into(),
2794 path_suffixes: vec!["rs".to_string()],
2795 language_server: Some(language_server_config),
2796 ..Default::default()
2797 },
2798 Some(tree_sitter_rust::language()),
2799 )));
2800
2801 // Connect to a server as 2 clients.
2802 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2803 let client_a = server.create_client(cx_a, "user_a").await;
2804 let client_b = server.create_client(cx_b, "user_b").await;
2805
2806 // Share a project as client A
2807 let project_a = cx_a.update(|cx| {
2808 Project::local(
2809 client_a.clone(),
2810 client_a.user_store.clone(),
2811 lang_registry.clone(),
2812 fs.clone(),
2813 cx,
2814 )
2815 });
2816 let (worktree_a, _) = project_a
2817 .update(cx_a, |p, cx| {
2818 p.find_or_create_local_worktree("/root-1", true, cx)
2819 })
2820 .await
2821 .unwrap();
2822 worktree_a
2823 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2824 .await;
2825 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2826 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2827 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2828
2829 // Join the worktree as client B.
2830 let project_b = Project::remote(
2831 project_id,
2832 client_b.clone(),
2833 client_b.user_store.clone(),
2834 lang_registry.clone(),
2835 fs.clone(),
2836 &mut cx_b.to_async(),
2837 )
2838 .await
2839 .unwrap();
2840
2841 // Open the file on client B.
2842 let buffer_b = cx_b
2843 .background()
2844 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2845 .await
2846 .unwrap();
2847
2848 // Request document highlights as the guest.
2849 let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2850
2851 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2852 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2853 |params, _| {
2854 assert_eq!(
2855 params
2856 .text_document_position_params
2857 .text_document
2858 .uri
2859 .as_str(),
2860 "file:///root-1/main.rs"
2861 );
2862 assert_eq!(
2863 params.text_document_position_params.position,
2864 lsp::Position::new(0, 34)
2865 );
2866 Some(vec![
2867 lsp::DocumentHighlight {
2868 kind: Some(lsp::DocumentHighlightKind::WRITE),
2869 range: lsp::Range::new(
2870 lsp::Position::new(0, 10),
2871 lsp::Position::new(0, 16),
2872 ),
2873 },
2874 lsp::DocumentHighlight {
2875 kind: Some(lsp::DocumentHighlightKind::READ),
2876 range: lsp::Range::new(
2877 lsp::Position::new(0, 32),
2878 lsp::Position::new(0, 38),
2879 ),
2880 },
2881 lsp::DocumentHighlight {
2882 kind: Some(lsp::DocumentHighlightKind::READ),
2883 range: lsp::Range::new(
2884 lsp::Position::new(0, 41),
2885 lsp::Position::new(0, 47),
2886 ),
2887 },
2888 ])
2889 },
2890 );
2891
2892 let highlights = highlights.await.unwrap();
2893 buffer_b.read_with(cx_b, |buffer, _| {
2894 let snapshot = buffer.snapshot();
2895
2896 let highlights = highlights
2897 .into_iter()
2898 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2899 .collect::<Vec<_>>();
2900 assert_eq!(
2901 highlights,
2902 &[
2903 (lsp::DocumentHighlightKind::WRITE, 10..16),
2904 (lsp::DocumentHighlightKind::READ, 32..38),
2905 (lsp::DocumentHighlightKind::READ, 41..47)
2906 ]
2907 )
2908 });
2909 }
2910
2911 #[gpui::test(iterations = 10)]
2912 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2913 cx_a.foreground().forbid_parking();
2914 let mut lang_registry = Arc::new(LanguageRegistry::test());
2915 let fs = FakeFs::new(cx_a.background());
2916 fs.insert_tree(
2917 "/code",
2918 json!({
2919 "crate-1": {
2920 ".zed.toml": r#"collaborators = ["user_b"]"#,
2921 "one.rs": "const ONE: usize = 1;",
2922 },
2923 "crate-2": {
2924 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2925 },
2926 "private": {
2927 "passwords.txt": "the-password",
2928 }
2929 }),
2930 )
2931 .await;
2932
2933 // Set up a fake language server.
2934 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2935 Arc::get_mut(&mut lang_registry)
2936 .unwrap()
2937 .add(Arc::new(Language::new(
2938 LanguageConfig {
2939 name: "Rust".into(),
2940 path_suffixes: vec!["rs".to_string()],
2941 language_server: Some(language_server_config),
2942 ..Default::default()
2943 },
2944 Some(tree_sitter_rust::language()),
2945 )));
2946
2947 // Connect to a server as 2 clients.
2948 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2949 let client_a = server.create_client(cx_a, "user_a").await;
2950 let client_b = server.create_client(cx_b, "user_b").await;
2951
2952 // Share a project as client A
2953 let project_a = cx_a.update(|cx| {
2954 Project::local(
2955 client_a.clone(),
2956 client_a.user_store.clone(),
2957 lang_registry.clone(),
2958 fs.clone(),
2959 cx,
2960 )
2961 });
2962 let (worktree_a, _) = project_a
2963 .update(cx_a, |p, cx| {
2964 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2965 })
2966 .await
2967 .unwrap();
2968 worktree_a
2969 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2970 .await;
2971 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2972 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2973 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2974
2975 // Join the worktree as client B.
2976 let project_b = Project::remote(
2977 project_id,
2978 client_b.clone(),
2979 client_b.user_store.clone(),
2980 lang_registry.clone(),
2981 fs.clone(),
2982 &mut cx_b.to_async(),
2983 )
2984 .await
2985 .unwrap();
2986
2987 // Cause the language server to start.
2988 let _buffer = cx_b
2989 .background()
2990 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2991 .await
2992 .unwrap();
2993
2994 // Request the definition of a symbol as the guest.
2995 let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2996 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2997 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2998 #[allow(deprecated)]
2999 Some(vec![lsp::SymbolInformation {
3000 name: "TWO".into(),
3001 location: lsp::Location {
3002 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3003 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3004 },
3005 kind: lsp::SymbolKind::CONSTANT,
3006 tags: None,
3007 container_name: None,
3008 deprecated: None,
3009 }])
3010 });
3011
3012 let symbols = symbols.await.unwrap();
3013 assert_eq!(symbols.len(), 1);
3014 assert_eq!(symbols[0].name, "TWO");
3015
3016 // Open one of the returned symbols.
3017 let buffer_b_2 = project_b
3018 .update(cx_b, |project, cx| {
3019 project.open_buffer_for_symbol(&symbols[0], cx)
3020 })
3021 .await
3022 .unwrap();
3023 buffer_b_2.read_with(cx_b, |buffer, _| {
3024 assert_eq!(
3025 buffer.file().unwrap().path().as_ref(),
3026 Path::new("../crate-2/two.rs")
3027 );
3028 });
3029
3030 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3031 let mut fake_symbol = symbols[0].clone();
3032 fake_symbol.path = Path::new("/code/secrets").into();
3033 let error = project_b
3034 .update(cx_b, |project, cx| {
3035 project.open_buffer_for_symbol(&fake_symbol, cx)
3036 })
3037 .await
3038 .unwrap_err();
3039 assert!(error.to_string().contains("invalid symbol signature"));
3040 }
3041
3042 #[gpui::test(iterations = 10)]
3043 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3044 cx_a: &mut TestAppContext,
3045 cx_b: &mut TestAppContext,
3046 mut rng: StdRng,
3047 ) {
3048 cx_a.foreground().forbid_parking();
3049 let mut lang_registry = Arc::new(LanguageRegistry::test());
3050 let fs = FakeFs::new(cx_a.background());
3051 fs.insert_tree(
3052 "/root",
3053 json!({
3054 ".zed.toml": r#"collaborators = ["user_b"]"#,
3055 "a.rs": "const ONE: usize = b::TWO;",
3056 "b.rs": "const TWO: usize = 2",
3057 }),
3058 )
3059 .await;
3060
3061 // Set up a fake language server.
3062 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3063
3064 Arc::get_mut(&mut lang_registry)
3065 .unwrap()
3066 .add(Arc::new(Language::new(
3067 LanguageConfig {
3068 name: "Rust".into(),
3069 path_suffixes: vec!["rs".to_string()],
3070 language_server: Some(language_server_config),
3071 ..Default::default()
3072 },
3073 Some(tree_sitter_rust::language()),
3074 )));
3075
3076 // Connect to a server as 2 clients.
3077 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3078 let client_a = server.create_client(cx_a, "user_a").await;
3079 let client_b = server.create_client(cx_b, "user_b").await;
3080
3081 // Share a project as client A
3082 let project_a = cx_a.update(|cx| {
3083 Project::local(
3084 client_a.clone(),
3085 client_a.user_store.clone(),
3086 lang_registry.clone(),
3087 fs.clone(),
3088 cx,
3089 )
3090 });
3091
3092 let (worktree_a, _) = project_a
3093 .update(cx_a, |p, cx| {
3094 p.find_or_create_local_worktree("/root", true, cx)
3095 })
3096 .await
3097 .unwrap();
3098 worktree_a
3099 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3100 .await;
3101 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3102 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3103 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3104
3105 // Join the worktree as client B.
3106 let project_b = Project::remote(
3107 project_id,
3108 client_b.clone(),
3109 client_b.user_store.clone(),
3110 lang_registry.clone(),
3111 fs.clone(),
3112 &mut cx_b.to_async(),
3113 )
3114 .await
3115 .unwrap();
3116
3117 let buffer_b1 = cx_b
3118 .background()
3119 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3120 .await
3121 .unwrap();
3122
3123 let definitions;
3124 let buffer_b2;
3125 if rng.gen() {
3126 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3127 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3128 } else {
3129 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3130 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3131 }
3132
3133 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3134 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3135 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3136 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3137 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3138 )))
3139 });
3140
3141 let buffer_b2 = buffer_b2.await.unwrap();
3142 let definitions = definitions.await.unwrap();
3143 assert_eq!(definitions.len(), 1);
3144 assert_eq!(definitions[0].buffer, buffer_b2);
3145 }
3146
3147 #[gpui::test(iterations = 10)]
3148 async fn test_collaborating_with_code_actions(
3149 cx_a: &mut TestAppContext,
3150 cx_b: &mut TestAppContext,
3151 ) {
3152 cx_a.foreground().forbid_parking();
3153 let mut lang_registry = Arc::new(LanguageRegistry::test());
3154 let fs = FakeFs::new(cx_a.background());
3155 let mut path_openers_b = Vec::new();
3156 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3157
3158 // Set up a fake language server.
3159 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3160 Arc::get_mut(&mut lang_registry)
3161 .unwrap()
3162 .add(Arc::new(Language::new(
3163 LanguageConfig {
3164 name: "Rust".into(),
3165 path_suffixes: vec!["rs".to_string()],
3166 language_server: Some(language_server_config),
3167 ..Default::default()
3168 },
3169 Some(tree_sitter_rust::language()),
3170 )));
3171
3172 // Connect to a server as 2 clients.
3173 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3174 let client_a = server.create_client(cx_a, "user_a").await;
3175 let client_b = server.create_client(cx_b, "user_b").await;
3176
3177 // Share a project as client A
3178 fs.insert_tree(
3179 "/a",
3180 json!({
3181 ".zed.toml": r#"collaborators = ["user_b"]"#,
3182 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3183 "other.rs": "pub fn foo() -> usize { 4 }",
3184 }),
3185 )
3186 .await;
3187 let project_a = cx_a.update(|cx| {
3188 Project::local(
3189 client_a.clone(),
3190 client_a.user_store.clone(),
3191 lang_registry.clone(),
3192 fs.clone(),
3193 cx,
3194 )
3195 });
3196 let (worktree_a, _) = project_a
3197 .update(cx_a, |p, cx| {
3198 p.find_or_create_local_worktree("/a", true, cx)
3199 })
3200 .await
3201 .unwrap();
3202 worktree_a
3203 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3204 .await;
3205 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3206 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3207 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3208
3209 // Join the worktree as client B.
3210 let project_b = Project::remote(
3211 project_id,
3212 client_b.clone(),
3213 client_b.user_store.clone(),
3214 lang_registry.clone(),
3215 fs.clone(),
3216 &mut cx_b.to_async(),
3217 )
3218 .await
3219 .unwrap();
3220 let mut params = cx_b.update(WorkspaceParams::test);
3221 params.languages = lang_registry.clone();
3222 params.client = client_b.client.clone();
3223 params.user_store = client_b.user_store.clone();
3224 params.project = project_b;
3225 params.path_openers = path_openers_b.into();
3226
3227 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3228 let editor_b = workspace_b
3229 .update(cx_b, |workspace, cx| {
3230 workspace.open_path((worktree_id, "main.rs").into(), cx)
3231 })
3232 .await
3233 .unwrap()
3234 .downcast::<Editor>()
3235 .unwrap();
3236
3237 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3238 fake_language_server
3239 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3240 assert_eq!(
3241 params.text_document.uri,
3242 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3243 );
3244 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3245 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3246 None
3247 })
3248 .next()
3249 .await;
3250
3251 // Move cursor to a location that contains code actions.
3252 editor_b.update(cx_b, |editor, cx| {
3253 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3254 cx.focus(&editor_b);
3255 });
3256
3257 fake_language_server
3258 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3259 assert_eq!(
3260 params.text_document.uri,
3261 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3262 );
3263 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3264 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3265
3266 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3267 lsp::CodeAction {
3268 title: "Inline into all callers".to_string(),
3269 edit: Some(lsp::WorkspaceEdit {
3270 changes: Some(
3271 [
3272 (
3273 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3274 vec![lsp::TextEdit::new(
3275 lsp::Range::new(
3276 lsp::Position::new(1, 22),
3277 lsp::Position::new(1, 34),
3278 ),
3279 "4".to_string(),
3280 )],
3281 ),
3282 (
3283 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3284 vec![lsp::TextEdit::new(
3285 lsp::Range::new(
3286 lsp::Position::new(0, 0),
3287 lsp::Position::new(0, 27),
3288 ),
3289 "".to_string(),
3290 )],
3291 ),
3292 ]
3293 .into_iter()
3294 .collect(),
3295 ),
3296 ..Default::default()
3297 }),
3298 data: Some(json!({
3299 "codeActionParams": {
3300 "range": {
3301 "start": {"line": 1, "column": 31},
3302 "end": {"line": 1, "column": 31},
3303 }
3304 }
3305 })),
3306 ..Default::default()
3307 },
3308 )])
3309 })
3310 .next()
3311 .await;
3312
3313 // Toggle code actions and wait for them to display.
3314 editor_b.update(cx_b, |editor, cx| {
3315 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3316 });
3317 editor_b
3318 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3319 .await;
3320
3321 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3322
3323 // Confirming the code action will trigger a resolve request.
3324 let confirm_action = workspace_b
3325 .update(cx_b, |workspace, cx| {
3326 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3327 })
3328 .unwrap();
3329 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3330 lsp::CodeAction {
3331 title: "Inline into all callers".to_string(),
3332 edit: Some(lsp::WorkspaceEdit {
3333 changes: Some(
3334 [
3335 (
3336 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3337 vec![lsp::TextEdit::new(
3338 lsp::Range::new(
3339 lsp::Position::new(1, 22),
3340 lsp::Position::new(1, 34),
3341 ),
3342 "4".to_string(),
3343 )],
3344 ),
3345 (
3346 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3347 vec![lsp::TextEdit::new(
3348 lsp::Range::new(
3349 lsp::Position::new(0, 0),
3350 lsp::Position::new(0, 27),
3351 ),
3352 "".to_string(),
3353 )],
3354 ),
3355 ]
3356 .into_iter()
3357 .collect(),
3358 ),
3359 ..Default::default()
3360 }),
3361 ..Default::default()
3362 }
3363 });
3364
3365 // After the action is confirmed, an editor containing both modified files is opened.
3366 confirm_action.await.unwrap();
3367 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3368 workspace
3369 .active_item(cx)
3370 .unwrap()
3371 .downcast::<Editor>()
3372 .unwrap()
3373 });
3374 code_action_editor.update(cx_b, |editor, cx| {
3375 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3376 editor.undo(&Undo, cx);
3377 assert_eq!(
3378 editor.text(cx),
3379 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3380 );
3381 editor.redo(&Redo, cx);
3382 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3383 });
3384 }
3385
3386 #[gpui::test(iterations = 10)]
3387 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3388 cx_a.foreground().forbid_parking();
3389 let mut lang_registry = Arc::new(LanguageRegistry::test());
3390 let fs = FakeFs::new(cx_a.background());
3391 let mut path_openers_b = Vec::new();
3392 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3393
3394 // Set up a fake language server.
3395 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3396 Arc::get_mut(&mut lang_registry)
3397 .unwrap()
3398 .add(Arc::new(Language::new(
3399 LanguageConfig {
3400 name: "Rust".into(),
3401 path_suffixes: vec!["rs".to_string()],
3402 language_server: Some(language_server_config),
3403 ..Default::default()
3404 },
3405 Some(tree_sitter_rust::language()),
3406 )));
3407
3408 // Connect to a server as 2 clients.
3409 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3410 let client_a = server.create_client(cx_a, "user_a").await;
3411 let client_b = server.create_client(cx_b, "user_b").await;
3412
3413 // Share a project as client A
3414 fs.insert_tree(
3415 "/dir",
3416 json!({
3417 ".zed.toml": r#"collaborators = ["user_b"]"#,
3418 "one.rs": "const ONE: usize = 1;",
3419 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3420 }),
3421 )
3422 .await;
3423 let project_a = cx_a.update(|cx| {
3424 Project::local(
3425 client_a.clone(),
3426 client_a.user_store.clone(),
3427 lang_registry.clone(),
3428 fs.clone(),
3429 cx,
3430 )
3431 });
3432 let (worktree_a, _) = project_a
3433 .update(cx_a, |p, cx| {
3434 p.find_or_create_local_worktree("/dir", true, cx)
3435 })
3436 .await
3437 .unwrap();
3438 worktree_a
3439 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3440 .await;
3441 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3442 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3443 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3444
3445 // Join the worktree as client B.
3446 let project_b = Project::remote(
3447 project_id,
3448 client_b.clone(),
3449 client_b.user_store.clone(),
3450 lang_registry.clone(),
3451 fs.clone(),
3452 &mut cx_b.to_async(),
3453 )
3454 .await
3455 .unwrap();
3456 let mut params = cx_b.update(WorkspaceParams::test);
3457 params.languages = lang_registry.clone();
3458 params.client = client_b.client.clone();
3459 params.user_store = client_b.user_store.clone();
3460 params.project = project_b;
3461 params.path_openers = path_openers_b.into();
3462
3463 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3464 let editor_b = workspace_b
3465 .update(cx_b, |workspace, cx| {
3466 workspace.open_path((worktree_id, "one.rs").into(), cx)
3467 })
3468 .await
3469 .unwrap()
3470 .downcast::<Editor>()
3471 .unwrap();
3472 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3473
3474 // Move cursor to a location that can be renamed.
3475 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3476 editor.select_ranges([7..7], None, cx);
3477 editor.rename(&Rename, cx).unwrap()
3478 });
3479
3480 fake_language_server
3481 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3482 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3483 assert_eq!(params.position, lsp::Position::new(0, 7));
3484 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3485 lsp::Position::new(0, 6),
3486 lsp::Position::new(0, 9),
3487 )))
3488 })
3489 .next()
3490 .await
3491 .unwrap();
3492 prepare_rename.await.unwrap();
3493 editor_b.update(cx_b, |editor, cx| {
3494 let rename = editor.pending_rename().unwrap();
3495 let buffer = editor.buffer().read(cx).snapshot(cx);
3496 assert_eq!(
3497 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3498 6..9
3499 );
3500 rename.editor.update(cx, |rename_editor, cx| {
3501 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3502 rename_buffer.edit([0..3], "THREE", cx);
3503 });
3504 });
3505 });
3506
3507 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3508 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3509 });
3510 fake_language_server
3511 .handle_request::<lsp::request::Rename, _>(|params, _| {
3512 assert_eq!(
3513 params.text_document_position.text_document.uri.as_str(),
3514 "file:///dir/one.rs"
3515 );
3516 assert_eq!(
3517 params.text_document_position.position,
3518 lsp::Position::new(0, 6)
3519 );
3520 assert_eq!(params.new_name, "THREE");
3521 Some(lsp::WorkspaceEdit {
3522 changes: Some(
3523 [
3524 (
3525 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3526 vec![lsp::TextEdit::new(
3527 lsp::Range::new(
3528 lsp::Position::new(0, 6),
3529 lsp::Position::new(0, 9),
3530 ),
3531 "THREE".to_string(),
3532 )],
3533 ),
3534 (
3535 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3536 vec![
3537 lsp::TextEdit::new(
3538 lsp::Range::new(
3539 lsp::Position::new(0, 24),
3540 lsp::Position::new(0, 27),
3541 ),
3542 "THREE".to_string(),
3543 ),
3544 lsp::TextEdit::new(
3545 lsp::Range::new(
3546 lsp::Position::new(0, 35),
3547 lsp::Position::new(0, 38),
3548 ),
3549 "THREE".to_string(),
3550 ),
3551 ],
3552 ),
3553 ]
3554 .into_iter()
3555 .collect(),
3556 ),
3557 ..Default::default()
3558 })
3559 })
3560 .next()
3561 .await
3562 .unwrap();
3563 confirm_rename.await.unwrap();
3564
3565 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3566 workspace
3567 .active_item(cx)
3568 .unwrap()
3569 .downcast::<Editor>()
3570 .unwrap()
3571 });
3572 rename_editor.update(cx_b, |editor, cx| {
3573 assert_eq!(
3574 editor.text(cx),
3575 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3576 );
3577 editor.undo(&Undo, cx);
3578 assert_eq!(
3579 editor.text(cx),
3580 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3581 );
3582 editor.redo(&Redo, cx);
3583 assert_eq!(
3584 editor.text(cx),
3585 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3586 );
3587 });
3588
3589 // Ensure temporary rename edits cannot be undone/redone.
3590 editor_b.update(cx_b, |editor, cx| {
3591 editor.undo(&Undo, cx);
3592 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3593 editor.undo(&Undo, cx);
3594 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3595 editor.redo(&Redo, cx);
3596 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3597 })
3598 }
3599
3600 #[gpui::test(iterations = 10)]
3601 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3602 cx_a.foreground().forbid_parking();
3603
3604 // Connect to a server as 2 clients.
3605 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3606 let client_a = server.create_client(cx_a, "user_a").await;
3607 let client_b = server.create_client(cx_b, "user_b").await;
3608
3609 // Create an org that includes these 2 users.
3610 let db = &server.app_state.db;
3611 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3612 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3613 .await
3614 .unwrap();
3615 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3616 .await
3617 .unwrap();
3618
3619 // Create a channel that includes all the users.
3620 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3621 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3622 .await
3623 .unwrap();
3624 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3625 .await
3626 .unwrap();
3627 db.create_channel_message(
3628 channel_id,
3629 client_b.current_user_id(&cx_b),
3630 "hello A, it's B.",
3631 OffsetDateTime::now_utc(),
3632 1,
3633 )
3634 .await
3635 .unwrap();
3636
3637 let channels_a = cx_a
3638 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3639 channels_a
3640 .condition(cx_a, |list, _| list.available_channels().is_some())
3641 .await;
3642 channels_a.read_with(cx_a, |list, _| {
3643 assert_eq!(
3644 list.available_channels().unwrap(),
3645 &[ChannelDetails {
3646 id: channel_id.to_proto(),
3647 name: "test-channel".to_string()
3648 }]
3649 )
3650 });
3651 let channel_a = channels_a.update(cx_a, |this, cx| {
3652 this.get_channel(channel_id.to_proto(), cx).unwrap()
3653 });
3654 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3655 channel_a
3656 .condition(&cx_a, |channel, _| {
3657 channel_messages(channel)
3658 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3659 })
3660 .await;
3661
3662 let channels_b = cx_b
3663 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3664 channels_b
3665 .condition(cx_b, |list, _| list.available_channels().is_some())
3666 .await;
3667 channels_b.read_with(cx_b, |list, _| {
3668 assert_eq!(
3669 list.available_channels().unwrap(),
3670 &[ChannelDetails {
3671 id: channel_id.to_proto(),
3672 name: "test-channel".to_string()
3673 }]
3674 )
3675 });
3676
3677 let channel_b = channels_b.update(cx_b, |this, cx| {
3678 this.get_channel(channel_id.to_proto(), cx).unwrap()
3679 });
3680 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3681 channel_b
3682 .condition(&cx_b, |channel, _| {
3683 channel_messages(channel)
3684 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3685 })
3686 .await;
3687
3688 channel_a
3689 .update(cx_a, |channel, cx| {
3690 channel
3691 .send_message("oh, hi B.".to_string(), cx)
3692 .unwrap()
3693 .detach();
3694 let task = channel.send_message("sup".to_string(), cx).unwrap();
3695 assert_eq!(
3696 channel_messages(channel),
3697 &[
3698 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3699 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3700 ("user_a".to_string(), "sup".to_string(), true)
3701 ]
3702 );
3703 task
3704 })
3705 .await
3706 .unwrap();
3707
3708 channel_b
3709 .condition(&cx_b, |channel, _| {
3710 channel_messages(channel)
3711 == [
3712 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3713 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3714 ("user_a".to_string(), "sup".to_string(), false),
3715 ]
3716 })
3717 .await;
3718
3719 assert_eq!(
3720 server
3721 .state()
3722 .await
3723 .channel(channel_id)
3724 .unwrap()
3725 .connection_ids
3726 .len(),
3727 2
3728 );
3729 cx_b.update(|_| drop(channel_b));
3730 server
3731 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3732 .await;
3733
3734 cx_a.update(|_| drop(channel_a));
3735 server
3736 .condition(|state| state.channel(channel_id).is_none())
3737 .await;
3738 }
3739
3740 #[gpui::test(iterations = 10)]
3741 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3742 cx_a.foreground().forbid_parking();
3743
3744 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3745 let client_a = server.create_client(cx_a, "user_a").await;
3746
3747 let db = &server.app_state.db;
3748 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3749 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3750 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3751 .await
3752 .unwrap();
3753 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3754 .await
3755 .unwrap();
3756
3757 let channels_a = cx_a
3758 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3759 channels_a
3760 .condition(cx_a, |list, _| list.available_channels().is_some())
3761 .await;
3762 let channel_a = channels_a.update(cx_a, |this, cx| {
3763 this.get_channel(channel_id.to_proto(), cx).unwrap()
3764 });
3765
3766 // Messages aren't allowed to be too long.
3767 channel_a
3768 .update(cx_a, |channel, cx| {
3769 let long_body = "this is long.\n".repeat(1024);
3770 channel.send_message(long_body, cx).unwrap()
3771 })
3772 .await
3773 .unwrap_err();
3774
3775 // Messages aren't allowed to be blank.
3776 channel_a.update(cx_a, |channel, cx| {
3777 channel.send_message(String::new(), cx).unwrap_err()
3778 });
3779
3780 // Leading and trailing whitespace are trimmed.
3781 channel_a
3782 .update(cx_a, |channel, cx| {
3783 channel
3784 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3785 .unwrap()
3786 })
3787 .await
3788 .unwrap();
3789 assert_eq!(
3790 db.get_channel_messages(channel_id, 10, None)
3791 .await
3792 .unwrap()
3793 .iter()
3794 .map(|m| &m.body)
3795 .collect::<Vec<_>>(),
3796 &["surrounded by whitespace"]
3797 );
3798 }
3799
3800 #[gpui::test(iterations = 10)]
3801 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3802 cx_a.foreground().forbid_parking();
3803
3804 // Connect to a server as 2 clients.
3805 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3806 let client_a = server.create_client(cx_a, "user_a").await;
3807 let client_b = server.create_client(cx_b, "user_b").await;
3808 let mut status_b = client_b.status();
3809
3810 // Create an org that includes these 2 users.
3811 let db = &server.app_state.db;
3812 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3813 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3814 .await
3815 .unwrap();
3816 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3817 .await
3818 .unwrap();
3819
3820 // Create a channel that includes all the users.
3821 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3822 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3823 .await
3824 .unwrap();
3825 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3826 .await
3827 .unwrap();
3828 db.create_channel_message(
3829 channel_id,
3830 client_b.current_user_id(&cx_b),
3831 "hello A, it's B.",
3832 OffsetDateTime::now_utc(),
3833 2,
3834 )
3835 .await
3836 .unwrap();
3837
3838 let channels_a = cx_a
3839 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3840 channels_a
3841 .condition(cx_a, |list, _| list.available_channels().is_some())
3842 .await;
3843
3844 channels_a.read_with(cx_a, |list, _| {
3845 assert_eq!(
3846 list.available_channels().unwrap(),
3847 &[ChannelDetails {
3848 id: channel_id.to_proto(),
3849 name: "test-channel".to_string()
3850 }]
3851 )
3852 });
3853 let channel_a = channels_a.update(cx_a, |this, cx| {
3854 this.get_channel(channel_id.to_proto(), cx).unwrap()
3855 });
3856 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3857 channel_a
3858 .condition(&cx_a, |channel, _| {
3859 channel_messages(channel)
3860 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3861 })
3862 .await;
3863
3864 let channels_b = cx_b
3865 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3866 channels_b
3867 .condition(cx_b, |list, _| list.available_channels().is_some())
3868 .await;
3869 channels_b.read_with(cx_b, |list, _| {
3870 assert_eq!(
3871 list.available_channels().unwrap(),
3872 &[ChannelDetails {
3873 id: channel_id.to_proto(),
3874 name: "test-channel".to_string()
3875 }]
3876 )
3877 });
3878
3879 let channel_b = channels_b.update(cx_b, |this, cx| {
3880 this.get_channel(channel_id.to_proto(), cx).unwrap()
3881 });
3882 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3883 channel_b
3884 .condition(&cx_b, |channel, _| {
3885 channel_messages(channel)
3886 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3887 })
3888 .await;
3889
3890 // Disconnect client B, ensuring we can still access its cached channel data.
3891 server.forbid_connections();
3892 server.disconnect_client(client_b.current_user_id(&cx_b));
3893 cx_b.foreground().advance_clock(Duration::from_secs(3));
3894 while !matches!(
3895 status_b.next().await,
3896 Some(client::Status::ReconnectionError { .. })
3897 ) {}
3898
3899 channels_b.read_with(cx_b, |channels, _| {
3900 assert_eq!(
3901 channels.available_channels().unwrap(),
3902 [ChannelDetails {
3903 id: channel_id.to_proto(),
3904 name: "test-channel".to_string()
3905 }]
3906 )
3907 });
3908 channel_b.read_with(cx_b, |channel, _| {
3909 assert_eq!(
3910 channel_messages(channel),
3911 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3912 )
3913 });
3914
3915 // Send a message from client B while it is disconnected.
3916 channel_b
3917 .update(cx_b, |channel, cx| {
3918 let task = channel
3919 .send_message("can you see this?".to_string(), cx)
3920 .unwrap();
3921 assert_eq!(
3922 channel_messages(channel),
3923 &[
3924 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3925 ("user_b".to_string(), "can you see this?".to_string(), true)
3926 ]
3927 );
3928 task
3929 })
3930 .await
3931 .unwrap_err();
3932
3933 // Send a message from client A while B is disconnected.
3934 channel_a
3935 .update(cx_a, |channel, cx| {
3936 channel
3937 .send_message("oh, hi B.".to_string(), cx)
3938 .unwrap()
3939 .detach();
3940 let task = channel.send_message("sup".to_string(), cx).unwrap();
3941 assert_eq!(
3942 channel_messages(channel),
3943 &[
3944 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3945 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3946 ("user_a".to_string(), "sup".to_string(), true)
3947 ]
3948 );
3949 task
3950 })
3951 .await
3952 .unwrap();
3953
3954 // Give client B a chance to reconnect.
3955 server.allow_connections();
3956 cx_b.foreground().advance_clock(Duration::from_secs(10));
3957
3958 // Verify that B sees the new messages upon reconnection, as well as the message client B
3959 // sent while offline.
3960 channel_b
3961 .condition(&cx_b, |channel, _| {
3962 channel_messages(channel)
3963 == [
3964 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3965 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3966 ("user_a".to_string(), "sup".to_string(), false),
3967 ("user_b".to_string(), "can you see this?".to_string(), false),
3968 ]
3969 })
3970 .await;
3971
3972 // Ensure client A and B can communicate normally after reconnection.
3973 channel_a
3974 .update(cx_a, |channel, cx| {
3975 channel.send_message("you online?".to_string(), cx).unwrap()
3976 })
3977 .await
3978 .unwrap();
3979 channel_b
3980 .condition(&cx_b, |channel, _| {
3981 channel_messages(channel)
3982 == [
3983 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3984 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3985 ("user_a".to_string(), "sup".to_string(), false),
3986 ("user_b".to_string(), "can you see this?".to_string(), false),
3987 ("user_a".to_string(), "you online?".to_string(), false),
3988 ]
3989 })
3990 .await;
3991
3992 channel_b
3993 .update(cx_b, |channel, cx| {
3994 channel.send_message("yep".to_string(), cx).unwrap()
3995 })
3996 .await
3997 .unwrap();
3998 channel_a
3999 .condition(&cx_a, |channel, _| {
4000 channel_messages(channel)
4001 == [
4002 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4003 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4004 ("user_a".to_string(), "sup".to_string(), false),
4005 ("user_b".to_string(), "can you see this?".to_string(), false),
4006 ("user_a".to_string(), "you online?".to_string(), false),
4007 ("user_b".to_string(), "yep".to_string(), false),
4008 ]
4009 })
4010 .await;
4011 }
4012
4013 #[gpui::test(iterations = 10)]
4014 async fn test_contacts(
4015 cx_a: &mut TestAppContext,
4016 cx_b: &mut TestAppContext,
4017 cx_c: &mut TestAppContext,
4018 ) {
4019 cx_a.foreground().forbid_parking();
4020 let lang_registry = Arc::new(LanguageRegistry::test());
4021 let fs = FakeFs::new(cx_a.background());
4022
4023 // Connect to a server as 3 clients.
4024 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4025 let client_a = server.create_client(cx_a, "user_a").await;
4026 let client_b = server.create_client(cx_b, "user_b").await;
4027 let client_c = server.create_client(cx_c, "user_c").await;
4028
4029 // Share a worktree as client A.
4030 fs.insert_tree(
4031 "/a",
4032 json!({
4033 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4034 }),
4035 )
4036 .await;
4037
4038 let project_a = cx_a.update(|cx| {
4039 Project::local(
4040 client_a.clone(),
4041 client_a.user_store.clone(),
4042 lang_registry.clone(),
4043 fs.clone(),
4044 cx,
4045 )
4046 });
4047 let (worktree_a, _) = project_a
4048 .update(cx_a, |p, cx| {
4049 p.find_or_create_local_worktree("/a", true, cx)
4050 })
4051 .await
4052 .unwrap();
4053 worktree_a
4054 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4055 .await;
4056
4057 client_a
4058 .user_store
4059 .condition(&cx_a, |user_store, _| {
4060 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4061 })
4062 .await;
4063 client_b
4064 .user_store
4065 .condition(&cx_b, |user_store, _| {
4066 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4067 })
4068 .await;
4069 client_c
4070 .user_store
4071 .condition(&cx_c, |user_store, _| {
4072 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4073 })
4074 .await;
4075
4076 let project_id = project_a
4077 .update(cx_a, |project, _| project.next_remote_id())
4078 .await;
4079 project_a
4080 .update(cx_a, |project, cx| project.share(cx))
4081 .await
4082 .unwrap();
4083
4084 let _project_b = Project::remote(
4085 project_id,
4086 client_b.clone(),
4087 client_b.user_store.clone(),
4088 lang_registry.clone(),
4089 fs.clone(),
4090 &mut cx_b.to_async(),
4091 )
4092 .await
4093 .unwrap();
4094
4095 client_a
4096 .user_store
4097 .condition(&cx_a, |user_store, _| {
4098 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4099 })
4100 .await;
4101 client_b
4102 .user_store
4103 .condition(&cx_b, |user_store, _| {
4104 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4105 })
4106 .await;
4107 client_c
4108 .user_store
4109 .condition(&cx_c, |user_store, _| {
4110 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4111 })
4112 .await;
4113
4114 project_a
4115 .condition(&cx_a, |project, _| {
4116 project.collaborators().contains_key(&client_b.peer_id)
4117 })
4118 .await;
4119
4120 cx_a.update(move |_| drop(project_a));
4121 client_a
4122 .user_store
4123 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4124 .await;
4125 client_b
4126 .user_store
4127 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4128 .await;
4129 client_c
4130 .user_store
4131 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4132 .await;
4133
4134 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4135 user_store
4136 .contacts()
4137 .iter()
4138 .map(|contact| {
4139 let worktrees = contact
4140 .projects
4141 .iter()
4142 .map(|p| {
4143 (
4144 p.worktree_root_names[0].as_str(),
4145 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4146 )
4147 })
4148 .collect();
4149 (contact.user.github_login.as_str(), worktrees)
4150 })
4151 .collect()
4152 }
4153 }
4154
4155 #[gpui::test(iterations = 100)]
4156 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4157 cx.foreground().forbid_parking();
4158 let max_peers = env::var("MAX_PEERS")
4159 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4160 .unwrap_or(5);
4161 let max_operations = env::var("OPERATIONS")
4162 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4163 .unwrap_or(10);
4164
4165 let rng = Arc::new(Mutex::new(rng));
4166
4167 let guest_lang_registry = Arc::new(LanguageRegistry::test());
4168 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4169
4170 let fs = FakeFs::new(cx.background());
4171 fs.insert_tree(
4172 "/_collab",
4173 json!({
4174 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4175 }),
4176 )
4177 .await;
4178
4179 let operations = Rc::new(Cell::new(0));
4180 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4181 let mut clients = Vec::new();
4182
4183 let mut next_entity_id = 100000;
4184 let mut host_cx = TestAppContext::new(
4185 cx.foreground_platform(),
4186 cx.platform(),
4187 cx.foreground(),
4188 cx.background(),
4189 cx.font_cache(),
4190 cx.leak_detector(),
4191 next_entity_id,
4192 );
4193 let host = server.create_client(&mut host_cx, "host").await;
4194 let host_project = host_cx.update(|cx| {
4195 Project::local(
4196 host.client.clone(),
4197 host.user_store.clone(),
4198 Arc::new(LanguageRegistry::test()),
4199 fs.clone(),
4200 cx,
4201 )
4202 });
4203 let host_project_id = host_project
4204 .update(&mut host_cx, |p, _| p.next_remote_id())
4205 .await;
4206
4207 let (collab_worktree, _) = host_project
4208 .update(&mut host_cx, |project, cx| {
4209 project.find_or_create_local_worktree("/_collab", true, cx)
4210 })
4211 .await
4212 .unwrap();
4213 collab_worktree
4214 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4215 .await;
4216 host_project
4217 .update(&mut host_cx, |project, cx| project.share(cx))
4218 .await
4219 .unwrap();
4220
4221 clients.push(cx.foreground().spawn(host.simulate_host(
4222 host_project,
4223 language_server_config,
4224 operations.clone(),
4225 max_operations,
4226 rng.clone(),
4227 host_cx,
4228 )));
4229
4230 while operations.get() < max_operations {
4231 cx.background().simulate_random_delay().await;
4232 if clients.len() >= max_peers {
4233 break;
4234 } else if rng.lock().gen_bool(0.05) {
4235 operations.set(operations.get() + 1);
4236
4237 let guest_id = clients.len();
4238 log::info!("Adding guest {}", guest_id);
4239 next_entity_id += 100000;
4240 let mut guest_cx = TestAppContext::new(
4241 cx.foreground_platform(),
4242 cx.platform(),
4243 cx.foreground(),
4244 cx.background(),
4245 cx.font_cache(),
4246 cx.leak_detector(),
4247 next_entity_id,
4248 );
4249 let guest = server
4250 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4251 .await;
4252 let guest_project = Project::remote(
4253 host_project_id,
4254 guest.client.clone(),
4255 guest.user_store.clone(),
4256 guest_lang_registry.clone(),
4257 FakeFs::new(cx.background()),
4258 &mut guest_cx.to_async(),
4259 )
4260 .await
4261 .unwrap();
4262 clients.push(cx.foreground().spawn(guest.simulate_guest(
4263 guest_id,
4264 guest_project,
4265 operations.clone(),
4266 max_operations,
4267 rng.clone(),
4268 guest_cx,
4269 )));
4270
4271 log::info!("Guest {} added", guest_id);
4272 }
4273 }
4274
4275 let mut clients = futures::future::join_all(clients).await;
4276 cx.foreground().run_until_parked();
4277
4278 let (host_client, mut host_cx) = clients.remove(0);
4279 let host_project = host_client.project.as_ref().unwrap();
4280 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4281 project
4282 .worktrees(cx)
4283 .map(|worktree| {
4284 let snapshot = worktree.read(cx).snapshot();
4285 (snapshot.id(), snapshot)
4286 })
4287 .collect::<BTreeMap<_, _>>()
4288 });
4289
4290 host_client
4291 .project
4292 .as_ref()
4293 .unwrap()
4294 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4295
4296 for (guest_client, mut guest_cx) in clients.into_iter() {
4297 let guest_id = guest_client.client.id();
4298 let worktree_snapshots =
4299 guest_client
4300 .project
4301 .as_ref()
4302 .unwrap()
4303 .read_with(&guest_cx, |project, cx| {
4304 project
4305 .worktrees(cx)
4306 .map(|worktree| {
4307 let worktree = worktree.read(cx);
4308 (worktree.id(), worktree.snapshot())
4309 })
4310 .collect::<BTreeMap<_, _>>()
4311 });
4312
4313 assert_eq!(
4314 worktree_snapshots.keys().collect::<Vec<_>>(),
4315 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4316 "guest {} has different worktrees than the host",
4317 guest_id
4318 );
4319 for (id, host_snapshot) in &host_worktree_snapshots {
4320 let guest_snapshot = &worktree_snapshots[id];
4321 assert_eq!(
4322 guest_snapshot.root_name(),
4323 host_snapshot.root_name(),
4324 "guest {} has different root name than the host for worktree {}",
4325 guest_id,
4326 id
4327 );
4328 assert_eq!(
4329 guest_snapshot.entries(false).collect::<Vec<_>>(),
4330 host_snapshot.entries(false).collect::<Vec<_>>(),
4331 "guest {} has different snapshot than the host for worktree {}",
4332 guest_id,
4333 id
4334 );
4335 }
4336
4337 guest_client
4338 .project
4339 .as_ref()
4340 .unwrap()
4341 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4342
4343 for guest_buffer in &guest_client.buffers {
4344 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4345 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4346 project.buffer_for_id(buffer_id, cx).expect(&format!(
4347 "host does not have buffer for guest:{}, peer:{}, id:{}",
4348 guest_id, guest_client.peer_id, buffer_id
4349 ))
4350 });
4351 let path = host_buffer
4352 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4353
4354 assert_eq!(
4355 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4356 0,
4357 "guest {}, buffer {}, path {:?} has deferred operations",
4358 guest_id,
4359 buffer_id,
4360 path,
4361 );
4362 assert_eq!(
4363 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4364 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4365 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4366 guest_id,
4367 buffer_id,
4368 path
4369 );
4370 }
4371
4372 guest_cx.update(|_| drop(guest_client));
4373 }
4374
4375 host_cx.update(|_| drop(host_client));
4376 }
4377
/// In-process server used by the tests in this module, together with the handles the
/// tests need to create clients, sever connections and wait on server state.
struct TestServer {
    /// RPC peer handed to `Server::new`; reset when the `TestServer` is dropped.
    peer: Arc<Peer>,
    /// Application state produced by `build_app_state`; `create_client` creates users
    /// through its `db`.
    app_state: Arc<AppState>,
    /// The server under test.
    server: Arc<Server>,
    /// Foreground executor; `condition` brackets each wait with
    /// `start_waiting` / `finish_waiting` on it.
    foreground: Rc<executor::Foreground>,
    /// Receiving half of the channel whose sender is passed to `Server::new`;
    /// `condition` re-evaluates its predicate whenever a value arrives.
    notifications: mpsc::UnboundedReceiver<()>,
    /// Kill handle returned by `Connection::in_memory`, keyed by user;
    /// `disconnect_client` removes the entry for a user.
    connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
    /// While `true`, connection attempts made by clients from `create_client` fail.
    forbid_connections: Arc<AtomicBool>,
    /// Fake database backing `app_state`; never read after construction
    /// (presumably held only to keep it alive for the server's lifetime).
    _test_db: TestDb,
}
4388
4389 impl TestServer {
4390 async fn start(
4391 foreground: Rc<executor::Foreground>,
4392 background: Arc<executor::Background>,
4393 ) -> Self {
4394 let test_db = TestDb::fake(background);
4395 let app_state = Self::build_app_state(&test_db).await;
4396 let peer = Peer::new();
4397 let notifications = mpsc::unbounded();
4398 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4399 Self {
4400 peer,
4401 app_state,
4402 server,
4403 foreground,
4404 notifications: notifications.1,
4405 connection_killers: Default::default(),
4406 forbid_connections: Default::default(),
4407 _test_db: test_db,
4408 }
4409 }
4410
4411 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4412 let http = FakeHttpClient::with_404_response();
4413 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4414 let client_name = name.to_string();
4415 let mut client = Client::new(http.clone());
4416 let server = self.server.clone();
4417 let connection_killers = self.connection_killers.clone();
4418 let forbid_connections = self.forbid_connections.clone();
4419 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4420
4421 Arc::get_mut(&mut client)
4422 .unwrap()
4423 .override_authenticate(move |cx| {
4424 cx.spawn(|_| async move {
4425 let access_token = "the-token".to_string();
4426 Ok(Credentials {
4427 user_id: user_id.0 as u64,
4428 access_token,
4429 })
4430 })
4431 })
4432 .override_establish_connection(move |credentials, cx| {
4433 assert_eq!(credentials.user_id, user_id.0 as u64);
4434 assert_eq!(credentials.access_token, "the-token");
4435
4436 let server = server.clone();
4437 let connection_killers = connection_killers.clone();
4438 let forbid_connections = forbid_connections.clone();
4439 let client_name = client_name.clone();
4440 let connection_id_tx = connection_id_tx.clone();
4441 cx.spawn(move |cx| async move {
4442 if forbid_connections.load(SeqCst) {
4443 Err(EstablishConnectionError::other(anyhow!(
4444 "server is forbidding connections"
4445 )))
4446 } else {
4447 let (client_conn, server_conn, kill_conn) =
4448 Connection::in_memory(cx.background());
4449 connection_killers.lock().insert(user_id, kill_conn);
4450 cx.background()
4451 .spawn(server.handle_connection(
4452 server_conn,
4453 client_name,
4454 user_id,
4455 Some(connection_id_tx),
4456 cx.background(),
4457 ))
4458 .detach();
4459 Ok(client_conn)
4460 }
4461 })
4462 });
4463
4464 client
4465 .authenticate_and_connect(&cx.to_async())
4466 .await
4467 .unwrap();
4468
4469 Channel::init(&client);
4470 Project::init(&client);
4471
4472 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4473 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4474
4475 let client = TestClient {
4476 client,
4477 peer_id,
4478 user_store,
4479 project: Default::default(),
4480 buffers: Default::default(),
4481 };
4482 client.wait_for_current_user(cx).await;
4483 client
4484 }
4485
4486 fn disconnect_client(&self, user_id: UserId) {
4487 self.connection_killers.lock().remove(&user_id);
4488 }
4489
4490 fn forbid_connections(&self) {
4491 self.forbid_connections.store(true, SeqCst);
4492 }
4493
4494 fn allow_connections(&self) {
4495 self.forbid_connections.store(false, SeqCst);
4496 }
4497
4498 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4499 let mut config = Config::default();
4500 config.session_secret = "a".repeat(32);
4501 config.database_url = test_db.url.clone();
4502 let github_client = github::AppClient::test();
4503 Arc::new(AppState {
4504 db: test_db.db().clone(),
4505 handlebars: Default::default(),
4506 auth_client: auth::build_client("", ""),
4507 repo_client: github::RepoClient::test(&github_client),
4508 github_client,
4509 config,
4510 })
4511 }
4512
4513 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4514 self.server.store.read()
4515 }
4516
4517 async fn condition<F>(&mut self, mut predicate: F)
4518 where
4519 F: FnMut(&Store) -> bool,
4520 {
4521 async_std::future::timeout(Duration::from_millis(500), async {
4522 while !(predicate)(&*self.server.store.read()) {
4523 self.foreground.start_waiting();
4524 self.notifications.next().await;
4525 self.foreground.finish_waiting();
4526 }
4527 })
4528 .await
4529 .expect("condition timed out");
4530 }
4531 }
4532
4533 impl Drop for TestServer {
4534 fn drop(&mut self) {
4535 self.peer.reset();
4536 }
4537 }
4538
/// A connected client plus the per-client state tracked by the randomized simulations.
struct TestClient {
    /// The underlying client; also reachable through `Deref`.
    client: Arc<Client>,
    /// Peer id built from the connection id the server reported in
    /// `TestServer::create_client`.
    pub peer_id: PeerId,
    /// User store created for this client.
    pub user_store: ModelHandle<UserStore>,
    /// `None` after creation; `simulate_host` / `simulate_guest` store the project here
    /// just before they return.
    project: Option<ModelHandle<Project>>,
    /// Buffer handles the client currently holds during a simulation.
    buffers: HashSet<ModelHandle<language::Buffer>>,
}
4546
4547 impl Deref for TestClient {
4548 type Target = Arc<Client>;
4549
4550 fn deref(&self) -> &Self::Target {
4551 &self.client
4552 }
4553 }
4554
4555 impl TestClient {
4556 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4557 UserId::from_proto(
4558 self.user_store
4559 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4560 )
4561 }
4562
4563 async fn wait_for_current_user(&self, cx: &TestAppContext) {
4564 let mut authed_user = self
4565 .user_store
4566 .read_with(cx, |user_store, _| user_store.watch_current_user());
4567 while authed_user.next().await.unwrap().is_none() {}
4568 }
4569
4570 fn simulate_host(
4571 mut self,
4572 project: ModelHandle<Project>,
4573 mut language_server_config: LanguageServerConfig,
4574 operations: Rc<Cell<usize>>,
4575 max_operations: usize,
4576 rng: Arc<Mutex<StdRng>>,
4577 mut cx: TestAppContext,
4578 ) -> impl Future<Output = (Self, TestAppContext)> {
4579 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4580
4581 // Set up a fake language server.
4582 language_server_config.set_fake_initializer({
4583 let rng = rng.clone();
4584 let files = files.clone();
4585 let project = project.downgrade();
4586 move |fake_server| {
4587 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4588 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4589 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4590 range: lsp::Range::new(
4591 lsp::Position::new(0, 0),
4592 lsp::Position::new(0, 0),
4593 ),
4594 new_text: "the-new-text".to_string(),
4595 })),
4596 ..Default::default()
4597 }]))
4598 });
4599
4600 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4601 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4602 lsp::CodeAction {
4603 title: "the-code-action".to_string(),
4604 ..Default::default()
4605 },
4606 )])
4607 });
4608
4609 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4610 |params, _| {
4611 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4612 params.position,
4613 params.position,
4614 )))
4615 },
4616 );
4617
4618 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4619 let files = files.clone();
4620 let rng = rng.clone();
4621 move |_, _| {
4622 let files = files.lock();
4623 let mut rng = rng.lock();
4624 let count = rng.gen_range::<usize, _>(1..3);
4625 let files = (0..count)
4626 .map(|_| files.choose(&mut *rng).unwrap())
4627 .collect::<Vec<_>>();
4628 log::info!("LSP: Returning definitions in files {:?}", &files);
4629 Some(lsp::GotoDefinitionResponse::Array(
4630 files
4631 .into_iter()
4632 .map(|file| lsp::Location {
4633 uri: lsp::Url::from_file_path(file).unwrap(),
4634 range: Default::default(),
4635 })
4636 .collect(),
4637 ))
4638 }
4639 });
4640
4641 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4642 let rng = rng.clone();
4643 let project = project.clone();
4644 move |params, mut cx| {
4645 if let Some(project) = project.upgrade(&cx) {
4646 project.update(&mut cx, |project, cx| {
4647 let path = params
4648 .text_document_position_params
4649 .text_document
4650 .uri
4651 .to_file_path()
4652 .unwrap();
4653 let (worktree, relative_path) =
4654 project.find_local_worktree(&path, cx)?;
4655 let project_path =
4656 ProjectPath::from((worktree.read(cx).id(), relative_path));
4657 let buffer =
4658 project.get_open_buffer(&project_path, cx)?.read(cx);
4659
4660 let mut highlights = Vec::new();
4661 let highlight_count = rng.lock().gen_range(1..=5);
4662 let mut prev_end = 0;
4663 for _ in 0..highlight_count {
4664 let range =
4665 buffer.random_byte_range(prev_end, &mut *rng.lock());
4666 let start = buffer
4667 .offset_to_point_utf16(range.start)
4668 .to_lsp_position();
4669 let end = buffer
4670 .offset_to_point_utf16(range.end)
4671 .to_lsp_position();
4672 highlights.push(lsp::DocumentHighlight {
4673 range: lsp::Range::new(start, end),
4674 kind: Some(lsp::DocumentHighlightKind::READ),
4675 });
4676 prev_end = range.end;
4677 }
4678 Some(highlights)
4679 })
4680 } else {
4681 None
4682 }
4683 }
4684 });
4685 }
4686 });
4687
4688 project.update(&mut cx, |project, _| {
4689 project.languages().add(Arc::new(Language::new(
4690 LanguageConfig {
4691 name: "Rust".into(),
4692 path_suffixes: vec!["rs".to_string()],
4693 language_server: Some(language_server_config),
4694 ..Default::default()
4695 },
4696 None,
4697 )));
4698 });
4699
4700 async move {
4701 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4702 while operations.get() < max_operations {
4703 operations.set(operations.get() + 1);
4704
4705 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4706 match distribution {
4707 0..=20 if !files.lock().is_empty() => {
4708 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4709 let mut path = path.as_path();
4710 while let Some(parent_path) = path.parent() {
4711 path = parent_path;
4712 if rng.lock().gen() {
4713 break;
4714 }
4715 }
4716
4717 log::info!("Host: find/create local worktree {:?}", path);
4718 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4719 project.find_or_create_local_worktree(path, true, cx)
4720 });
4721 let find_or_create_worktree = async move {
4722 find_or_create_worktree.await.unwrap();
4723 };
4724 if rng.lock().gen() {
4725 cx.background().spawn(find_or_create_worktree).detach();
4726 } else {
4727 find_or_create_worktree.await;
4728 }
4729 }
4730 10..=80 if !files.lock().is_empty() => {
4731 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4732 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4733 let (worktree, path) = project
4734 .update(&mut cx, |project, cx| {
4735 project.find_or_create_local_worktree(
4736 file.clone(),
4737 true,
4738 cx,
4739 )
4740 })
4741 .await
4742 .unwrap();
4743 let project_path =
4744 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4745 log::info!(
4746 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4747 file,
4748 project_path.0,
4749 project_path.1
4750 );
4751 let buffer = project
4752 .update(&mut cx, |project, cx| {
4753 project.open_buffer(project_path, cx)
4754 })
4755 .await
4756 .unwrap();
4757 self.buffers.insert(buffer.clone());
4758 buffer
4759 } else {
4760 self.buffers
4761 .iter()
4762 .choose(&mut *rng.lock())
4763 .unwrap()
4764 .clone()
4765 };
4766
4767 if rng.lock().gen_bool(0.1) {
4768 cx.update(|cx| {
4769 log::info!(
4770 "Host: dropping buffer {:?}",
4771 buffer.read(cx).file().unwrap().full_path(cx)
4772 );
4773 self.buffers.remove(&buffer);
4774 drop(buffer);
4775 });
4776 } else {
4777 buffer.update(&mut cx, |buffer, cx| {
4778 log::info!(
4779 "Host: updating buffer {:?} ({})",
4780 buffer.file().unwrap().full_path(cx),
4781 buffer.remote_id()
4782 );
4783 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4784 });
4785 }
4786 }
4787 _ => loop {
4788 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4789 let mut path = PathBuf::new();
4790 path.push("/");
4791 for _ in 0..path_component_count {
4792 let letter = rng.lock().gen_range(b'a'..=b'z');
4793 path.push(std::str::from_utf8(&[letter]).unwrap());
4794 }
4795 path.set_extension("rs");
4796 let parent_path = path.parent().unwrap();
4797
4798 log::info!("Host: creating file {:?}", path,);
4799
4800 if fs.create_dir(&parent_path).await.is_ok()
4801 && fs.create_file(&path, Default::default()).await.is_ok()
4802 {
4803 files.lock().push(path);
4804 break;
4805 } else {
4806 log::info!("Host: cannot create file");
4807 }
4808 },
4809 }
4810
4811 cx.background().simulate_random_delay().await;
4812 }
4813
4814 log::info!("Host done");
4815
4816 self.project = Some(project);
4817 (self, cx)
4818 }
4819 }
4820
4821 pub async fn simulate_guest(
4822 mut self,
4823 guest_id: usize,
4824 project: ModelHandle<Project>,
4825 operations: Rc<Cell<usize>>,
4826 max_operations: usize,
4827 rng: Arc<Mutex<StdRng>>,
4828 mut cx: TestAppContext,
4829 ) -> (Self, TestAppContext) {
4830 while operations.get() < max_operations {
4831 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4832 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4833 project
4834 .worktrees(&cx)
4835 .filter(|worktree| {
4836 let worktree = worktree.read(cx);
4837 worktree.is_visible()
4838 && worktree.entries(false).any(|e| e.is_file())
4839 })
4840 .choose(&mut *rng.lock())
4841 }) {
4842 worktree
4843 } else {
4844 cx.background().simulate_random_delay().await;
4845 continue;
4846 };
4847
4848 operations.set(operations.get() + 1);
4849 let (worktree_root_name, project_path) =
4850 worktree.read_with(&cx, |worktree, _| {
4851 let entry = worktree
4852 .entries(false)
4853 .filter(|e| e.is_file())
4854 .choose(&mut *rng.lock())
4855 .unwrap();
4856 (
4857 worktree.root_name().to_string(),
4858 (worktree.id(), entry.path.clone()),
4859 )
4860 });
4861 log::info!(
4862 "Guest {}: opening path {:?} in worktree {} ({})",
4863 guest_id,
4864 project_path.1,
4865 project_path.0,
4866 worktree_root_name,
4867 );
4868 let buffer = project
4869 .update(&mut cx, |project, cx| {
4870 project.open_buffer(project_path.clone(), cx)
4871 })
4872 .await
4873 .unwrap();
4874 log::info!(
4875 "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4876 guest_id,
4877 project_path.1,
4878 project_path.0,
4879 worktree_root_name,
4880 buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4881 );
4882 self.buffers.insert(buffer.clone());
4883 buffer
4884 } else {
4885 operations.set(operations.get() + 1);
4886
4887 self.buffers
4888 .iter()
4889 .choose(&mut *rng.lock())
4890 .unwrap()
4891 .clone()
4892 };
4893
4894 let choice = rng.lock().gen_range(0..100);
4895 match choice {
4896 0..=9 => {
4897 cx.update(|cx| {
4898 log::info!(
4899 "Guest {}: dropping buffer {:?}",
4900 guest_id,
4901 buffer.read(cx).file().unwrap().full_path(cx)
4902 );
4903 self.buffers.remove(&buffer);
4904 drop(buffer);
4905 });
4906 }
4907 10..=19 => {
4908 let completions = project.update(&mut cx, |project, cx| {
4909 log::info!(
4910 "Guest {}: requesting completions for buffer {} ({:?})",
4911 guest_id,
4912 buffer.read(cx).remote_id(),
4913 buffer.read(cx).file().unwrap().full_path(cx)
4914 );
4915 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4916 project.completions(&buffer, offset, cx)
4917 });
4918 let completions = cx.background().spawn(async move {
4919 completions.await.expect("completions request failed");
4920 });
4921 if rng.lock().gen_bool(0.3) {
4922 log::info!("Guest {}: detaching completions request", guest_id);
4923 completions.detach();
4924 } else {
4925 completions.await;
4926 }
4927 }
4928 20..=29 => {
4929 let code_actions = project.update(&mut cx, |project, cx| {
4930 log::info!(
4931 "Guest {}: requesting code actions for buffer {} ({:?})",
4932 guest_id,
4933 buffer.read(cx).remote_id(),
4934 buffer.read(cx).file().unwrap().full_path(cx)
4935 );
4936 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4937 project.code_actions(&buffer, range, cx)
4938 });
4939 let code_actions = cx.background().spawn(async move {
4940 code_actions.await.expect("code actions request failed");
4941 });
4942 if rng.lock().gen_bool(0.3) {
4943 log::info!("Guest {}: detaching code actions request", guest_id);
4944 code_actions.detach();
4945 } else {
4946 code_actions.await;
4947 }
4948 }
4949 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4950 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4951 log::info!(
4952 "Guest {}: saving buffer {} ({:?})",
4953 guest_id,
4954 buffer.remote_id(),
4955 buffer.file().unwrap().full_path(cx)
4956 );
4957 (buffer.version(), buffer.save(cx))
4958 });
4959 let save = cx.background().spawn(async move {
4960 let (saved_version, _) = save.await.expect("save request failed");
4961 assert!(saved_version.observed_all(&requested_version));
4962 });
4963 if rng.lock().gen_bool(0.3) {
4964 log::info!("Guest {}: detaching save request", guest_id);
4965 save.detach();
4966 } else {
4967 save.await;
4968 }
4969 }
4970 40..=44 => {
4971 let prepare_rename = project.update(&mut cx, |project, cx| {
4972 log::info!(
4973 "Guest {}: preparing rename for buffer {} ({:?})",
4974 guest_id,
4975 buffer.read(cx).remote_id(),
4976 buffer.read(cx).file().unwrap().full_path(cx)
4977 );
4978 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4979 project.prepare_rename(buffer, offset, cx)
4980 });
4981 let prepare_rename = cx.background().spawn(async move {
4982 prepare_rename.await.expect("prepare rename request failed");
4983 });
4984 if rng.lock().gen_bool(0.3) {
4985 log::info!("Guest {}: detaching prepare rename request", guest_id);
4986 prepare_rename.detach();
4987 } else {
4988 prepare_rename.await;
4989 }
4990 }
4991 45..=49 => {
4992 let definitions = project.update(&mut cx, |project, cx| {
4993 log::info!(
4994 "Guest {}: requesting definitions for buffer {} ({:?})",
4995 guest_id,
4996 buffer.read(cx).remote_id(),
4997 buffer.read(cx).file().unwrap().full_path(cx)
4998 );
4999 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5000 project.definition(&buffer, offset, cx)
5001 });
5002 let definitions = cx.background().spawn(async move {
5003 definitions.await.expect("definitions request failed")
5004 });
5005 if rng.lock().gen_bool(0.3) {
5006 log::info!("Guest {}: detaching definitions request", guest_id);
5007 definitions.detach();
5008 } else {
5009 self.buffers
5010 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5011 }
5012 }
5013 50..=54 => {
5014 let highlights = project.update(&mut cx, |project, cx| {
5015 log::info!(
5016 "Guest {}: requesting highlights for buffer {} ({:?})",
5017 guest_id,
5018 buffer.read(cx).remote_id(),
5019 buffer.read(cx).file().unwrap().full_path(cx)
5020 );
5021 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5022 project.document_highlights(&buffer, offset, cx)
5023 });
5024 let highlights = cx.background().spawn(async move {
5025 highlights.await.expect("highlights request failed");
5026 });
5027 if rng.lock().gen_bool(0.3) {
5028 log::info!("Guest {}: detaching highlights request", guest_id);
5029 highlights.detach();
5030 } else {
5031 highlights.await;
5032 }
5033 }
5034 55..=59 => {
5035 let search = project.update(&mut cx, |project, cx| {
5036 let query = rng.lock().gen_range('a'..='z');
5037 log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5038 project.search(SearchQuery::text(query, false, false), cx)
5039 });
5040 let search = cx
5041 .background()
5042 .spawn(async move { search.await.expect("search request failed") });
5043 if rng.lock().gen_bool(0.3) {
5044 log::info!("Guest {}: detaching search request", guest_id);
5045 search.detach();
5046 } else {
5047 self.buffers.extend(search.await.into_keys());
5048 }
5049 }
5050 _ => {
5051 buffer.update(&mut cx, |buffer, cx| {
5052 log::info!(
5053 "Guest {}: updating buffer {} ({:?})",
5054 guest_id,
5055 buffer.remote_id(),
5056 buffer.file().unwrap().full_path(cx)
5057 );
5058 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5059 });
5060 }
5061 }
5062 cx.background().simulate_random_delay().await;
5063 }
5064
5065 log::info!("Guest {} done", guest_id);
5066
5067 self.project = Some(project);
5068 (self, cx)
5069 }
5070 }
5071
5072 impl Drop for TestClient {
5073 fn drop(&mut self) {
5074 self.client.tear_down();
5075 }
5076 }
5077
5078 impl Executor for Arc<gpui::executor::Background> {
5079 type Timer = gpui::executor::Timer;
5080
5081 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5082 self.spawn(future).detach();
5083 }
5084
5085 fn timer(&self, duration: Duration) -> Self::Timer {
5086 self.as_ref().timer(duration)
5087 }
5088 }
5089
5090 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5091 channel
5092 .messages()
5093 .cursor::<()>()
5094 .map(|m| {
5095 (
5096 m.sender.github_login.clone(),
5097 m.body.clone(),
5098 m.is_pending(),
5099 )
5100 })
5101 .collect()
5102 }
5103
/// A view with no state that renders an empty element.
struct EmptyView;
5105
/// `EmptyView` carries no event payload: its event type is the unit type.
impl gpui::Entity for EmptyView {
    type Event = ();
}
5109
5110 impl gpui::View for EmptyView {
5111 fn ui_name() -> &'static str {
5112 "empty view"
5113 }
5114
5115 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5116 gpui::Element::boxed(gpui::elements::Empty)
5117 }
5118 }
5119}