1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_io::Timer;
10use async_std::task;
11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
12use collections::{HashMap, HashSet};
13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{
21 any::TypeId,
22 future::Future,
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use store::{Store, Worktree};
27use surf::StatusCode;
28use tide::log;
29use tide::{
30 http::headers::{HeaderName, CONNECTION, UPGRADE},
31 Request, Response,
32};
33use time::OffsetDateTime;
34
35type MessageHandler = Box<
36 dyn Send
37 + Sync
38 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
39>;
40
41pub struct Server {
42 peer: Arc<Peer>,
43 store: RwLock<Store>,
44 app_state: Arc<AppState>,
45 handlers: HashMap<TypeId, MessageHandler>,
46 notifications: Option<mpsc::UnboundedSender<()>>,
47}
48
49pub trait Executor: Send + Clone {
50 type Timer: Send + Future;
51 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
52 fn timer(&self, duration: Duration) -> Self::Timer;
53}
54
55#[derive(Clone)]
56pub struct RealExecutor;
57
58const MESSAGE_COUNT_PER_PAGE: usize = 100;
59const MAX_MESSAGE_LEN: usize = 1024;
60
61impl Server {
62 pub fn new(
63 app_state: Arc<AppState>,
64 peer: Arc<Peer>,
65 notifications: Option<mpsc::UnboundedSender<()>>,
66 ) -> Arc<Self> {
67 let mut server = Self {
68 peer,
69 app_state,
70 store: Default::default(),
71 handlers: Default::default(),
72 notifications,
73 };
74
75 server
76 .add_request_handler(Server::ping)
77 .add_request_handler(Server::register_project)
78 .add_message_handler(Server::unregister_project)
79 .add_request_handler(Server::share_project)
80 .add_message_handler(Server::unshare_project)
81 .add_request_handler(Server::join_project)
82 .add_message_handler(Server::leave_project)
83 .add_request_handler(Server::register_worktree)
84 .add_message_handler(Server::unregister_worktree)
85 .add_request_handler(Server::update_worktree)
86 .add_message_handler(Server::update_diagnostic_summary)
87 .add_message_handler(Server::disk_based_diagnostics_updating)
88 .add_message_handler(Server::disk_based_diagnostics_updated)
89 .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
90 .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
91 .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
92 .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
93 .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
94 .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
95 .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
96 .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
97 .add_request_handler(
98 Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
99 )
100 .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
101 .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
102 .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
103 .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
104 .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
105 .add_request_handler(Server::update_buffer)
106 .add_message_handler(Server::update_buffer_file)
107 .add_message_handler(Server::buffer_reloaded)
108 .add_message_handler(Server::buffer_saved)
109 .add_request_handler(Server::save_buffer)
110 .add_request_handler(Server::get_channels)
111 .add_request_handler(Server::get_users)
112 .add_request_handler(Server::join_channel)
113 .add_message_handler(Server::leave_channel)
114 .add_request_handler(Server::send_channel_message)
115 .add_request_handler(Server::get_channel_messages);
116
117 Arc::new(server)
118 }
119
120 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
121 where
122 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
123 Fut: 'static + Send + Future<Output = tide::Result<()>>,
124 M: EnvelopedMessage,
125 {
126 let prev_handler = self.handlers.insert(
127 TypeId::of::<M>(),
128 Box::new(move |server, envelope| {
129 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
130 (handler)(server, *envelope).boxed()
131 }),
132 );
133 if prev_handler.is_some() {
134 panic!("registered a handler for the same message twice");
135 }
136 self
137 }
138
139 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
140 where
141 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
142 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
143 M: RequestMessage,
144 {
145 self.add_message_handler(move |server, envelope| {
146 let receipt = envelope.receipt();
147 let response = (handler)(server.clone(), envelope);
148 async move {
149 match response.await {
150 Ok(response) => {
151 server.peer.respond(receipt, response)?;
152 Ok(())
153 }
154 Err(error) => {
155 server.peer.respond_with_error(
156 receipt,
157 proto::Error {
158 message: error.to_string(),
159 },
160 )?;
161 Err(error)
162 }
163 }
164 }
165 })
166 }
167
168 pub fn handle_connection<E: Executor>(
169 self: &Arc<Self>,
170 connection: Connection,
171 addr: String,
172 user_id: UserId,
173 mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
174 executor: E,
175 ) -> impl Future<Output = ()> {
176 let mut this = self.clone();
177 async move {
178 let (connection_id, handle_io, mut incoming_rx) = this
179 .peer
180 .add_connection(connection, {
181 let executor = executor.clone();
182 move |duration| {
183 let timer = executor.timer(duration);
184 async move {
185 timer.await;
186 }
187 }
188 })
189 .await;
190
191 if let Some(send_connection_id) = send_connection_id.as_mut() {
192 let _ = send_connection_id.send(connection_id).await;
193 }
194
195 this.state_mut().add_connection(connection_id, user_id);
196 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
197 log::error!("error updating contacts for {:?}: {}", user_id, err);
198 }
199
200 let handle_io = handle_io.fuse();
201 futures::pin_mut!(handle_io);
202 loop {
203 let next_message = incoming_rx.next().fuse();
204 futures::pin_mut!(next_message);
205 futures::select_biased! {
206 result = handle_io => {
207 if let Err(err) = result {
208 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
209 }
210 break;
211 }
212 message = next_message => {
213 if let Some(message) = message {
214 let start_time = Instant::now();
215 let type_name = message.payload_type_name();
216 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
217 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
218 let notifications = this.notifications.clone();
219 let is_background = message.is_background();
220 let handle_message = (handler)(this.clone(), message);
221 let handle_message = async move {
222 if let Err(err) = handle_message.await {
223 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
224 } else {
225 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
226 }
227 if let Some(mut notifications) = notifications {
228 let _ = notifications.send(()).await;
229 }
230 };
231 if is_background {
232 executor.spawn_detached(handle_message);
233 } else {
234 handle_message.await;
235 }
236 } else {
237 log::warn!("unhandled message: {}", type_name);
238 }
239 } else {
240 log::info!("rpc connection closed {:?}", addr);
241 break;
242 }
243 }
244 }
245 }
246
247 if let Err(err) = this.sign_out(connection_id).await {
248 log::error!("error signing out connection {:?} - {:?}", addr, err);
249 }
250 }
251 }
252
253 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
254 self.peer.disconnect(connection_id);
255 let removed_connection = self.state_mut().remove_connection(connection_id)?;
256
257 for (project_id, project) in removed_connection.hosted_projects {
258 if let Some(share) = project.share {
259 broadcast(
260 connection_id,
261 share.guests.keys().copied().collect(),
262 |conn_id| {
263 self.peer
264 .send(conn_id, proto::UnshareProject { project_id })
265 },
266 )?;
267 }
268 }
269
270 for (project_id, peer_ids) in removed_connection.guest_project_ids {
271 broadcast(connection_id, peer_ids, |conn_id| {
272 self.peer.send(
273 conn_id,
274 proto::RemoveProjectCollaborator {
275 project_id,
276 peer_id: connection_id.0,
277 },
278 )
279 })?;
280 }
281
282 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
283 Ok(())
284 }
285
286 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
287 Ok(proto::Ack {})
288 }
289
290 async fn register_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::RegisterProject>,
293 ) -> tide::Result<proto::RegisterProjectResponse> {
294 let project_id = {
295 let mut state = self.state_mut();
296 let user_id = state.user_id_for_connection(request.sender_id)?;
297 state.register_project(request.sender_id, user_id)
298 };
299 Ok(proto::RegisterProjectResponse { project_id })
300 }
301
302 async fn unregister_project(
303 mut self: Arc<Server>,
304 request: TypedEnvelope<proto::UnregisterProject>,
305 ) -> tide::Result<()> {
306 let project = self
307 .state_mut()
308 .unregister_project(request.payload.project_id, request.sender_id)?;
309 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
310 Ok(())
311 }
312
313 async fn share_project(
314 mut self: Arc<Server>,
315 request: TypedEnvelope<proto::ShareProject>,
316 ) -> tide::Result<proto::Ack> {
317 self.state_mut()
318 .share_project(request.payload.project_id, request.sender_id);
319 Ok(proto::Ack {})
320 }
321
322 async fn unshare_project(
323 mut self: Arc<Server>,
324 request: TypedEnvelope<proto::UnshareProject>,
325 ) -> tide::Result<()> {
326 let project_id = request.payload.project_id;
327 let project = self
328 .state_mut()
329 .unshare_project(project_id, request.sender_id)?;
330
331 broadcast(request.sender_id, project.connection_ids, |conn_id| {
332 self.peer
333 .send(conn_id, proto::UnshareProject { project_id })
334 })?;
335 self.update_contacts_for_users(&project.authorized_user_ids)?;
336 Ok(())
337 }
338
339 async fn join_project(
340 mut self: Arc<Server>,
341 request: TypedEnvelope<proto::JoinProject>,
342 ) -> tide::Result<proto::JoinProjectResponse> {
343 let project_id = request.payload.project_id;
344
345 let user_id = self.state().user_id_for_connection(request.sender_id)?;
346 let (response, connection_ids, contact_user_ids) = self
347 .state_mut()
348 .join_project(request.sender_id, user_id, project_id)
349 .and_then(|joined| {
350 let share = joined.project.share()?;
351 let peer_count = share.guests.len();
352 let mut collaborators = Vec::with_capacity(peer_count);
353 collaborators.push(proto::Collaborator {
354 peer_id: joined.project.host_connection_id.0,
355 replica_id: 0,
356 user_id: joined.project.host_user_id.to_proto(),
357 });
358 let worktrees = share
359 .worktrees
360 .iter()
361 .filter_map(|(id, shared_worktree)| {
362 let worktree = joined.project.worktrees.get(&id)?;
363 Some(proto::Worktree {
364 id: *id,
365 root_name: worktree.root_name.clone(),
366 entries: shared_worktree.entries.values().cloned().collect(),
367 diagnostic_summaries: shared_worktree
368 .diagnostic_summaries
369 .values()
370 .cloned()
371 .collect(),
372 visible: worktree.visible,
373 })
374 })
375 .collect();
376 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
377 if *peer_conn_id != request.sender_id {
378 collaborators.push(proto::Collaborator {
379 peer_id: peer_conn_id.0,
380 replica_id: *peer_replica_id as u32,
381 user_id: peer_user_id.to_proto(),
382 });
383 }
384 }
385 let response = proto::JoinProjectResponse {
386 worktrees,
387 replica_id: joined.replica_id as u32,
388 collaborators,
389 };
390 let connection_ids = joined.project.connection_ids();
391 let contact_user_ids = joined.project.authorized_user_ids();
392 Ok((response, connection_ids, contact_user_ids))
393 })?;
394
395 broadcast(request.sender_id, connection_ids, |conn_id| {
396 self.peer.send(
397 conn_id,
398 proto::AddProjectCollaborator {
399 project_id,
400 collaborator: Some(proto::Collaborator {
401 peer_id: request.sender_id.0,
402 replica_id: response.replica_id,
403 user_id: user_id.to_proto(),
404 }),
405 },
406 )
407 })?;
408 self.update_contacts_for_users(&contact_user_ids)?;
409 Ok(response)
410 }
411
412 async fn leave_project(
413 mut self: Arc<Server>,
414 request: TypedEnvelope<proto::LeaveProject>,
415 ) -> tide::Result<()> {
416 let sender_id = request.sender_id;
417 let project_id = request.payload.project_id;
418 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
419
420 broadcast(sender_id, worktree.connection_ids, |conn_id| {
421 self.peer.send(
422 conn_id,
423 proto::RemoveProjectCollaborator {
424 project_id,
425 peer_id: sender_id.0,
426 },
427 )
428 })?;
429 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
430
431 Ok(())
432 }
433
434 async fn register_worktree(
435 mut self: Arc<Server>,
436 request: TypedEnvelope<proto::RegisterWorktree>,
437 ) -> tide::Result<proto::Ack> {
438 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
439
440 let mut contact_user_ids = HashSet::default();
441 contact_user_ids.insert(host_user_id);
442 for github_login in &request.payload.authorized_logins {
443 let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
444 contact_user_ids.insert(contact_user_id);
445 }
446
447 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
448 let guest_connection_ids;
449 {
450 let mut state = self.state_mut();
451 guest_connection_ids = state
452 .read_project(request.payload.project_id, request.sender_id)?
453 .guest_connection_ids();
454 state.register_worktree(
455 request.payload.project_id,
456 request.payload.worktree_id,
457 request.sender_id,
458 Worktree {
459 authorized_user_ids: contact_user_ids.clone(),
460 root_name: request.payload.root_name.clone(),
461 visible: request.payload.visible,
462 },
463 )?;
464 }
465 broadcast(request.sender_id, guest_connection_ids, |connection_id| {
466 self.peer
467 .forward_send(request.sender_id, connection_id, request.payload.clone())
468 })?;
469 self.update_contacts_for_users(&contact_user_ids)?;
470 Ok(proto::Ack {})
471 }
472
473 async fn unregister_worktree(
474 mut self: Arc<Server>,
475 request: TypedEnvelope<proto::UnregisterWorktree>,
476 ) -> tide::Result<()> {
477 let project_id = request.payload.project_id;
478 let worktree_id = request.payload.worktree_id;
479 let (worktree, guest_connection_ids) =
480 self.state_mut()
481 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
482 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
483 self.peer.send(
484 conn_id,
485 proto::UnregisterWorktree {
486 project_id,
487 worktree_id,
488 },
489 )
490 })?;
491 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
492 Ok(())
493 }
494
495 async fn update_worktree(
496 mut self: Arc<Server>,
497 request: TypedEnvelope<proto::UpdateWorktree>,
498 ) -> tide::Result<proto::Ack> {
499 let connection_ids = self.state_mut().update_worktree(
500 request.sender_id,
501 request.payload.project_id,
502 request.payload.worktree_id,
503 &request.payload.removed_entries,
504 &request.payload.updated_entries,
505 )?;
506
507 broadcast(request.sender_id, connection_ids, |connection_id| {
508 self.peer
509 .forward_send(request.sender_id, connection_id, request.payload.clone())
510 })?;
511
512 Ok(proto::Ack {})
513 }
514
515 async fn update_diagnostic_summary(
516 mut self: Arc<Server>,
517 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
518 ) -> tide::Result<()> {
519 let summary = request
520 .payload
521 .summary
522 .clone()
523 .ok_or_else(|| anyhow!("invalid summary"))?;
524 let receiver_ids = self.state_mut().update_diagnostic_summary(
525 request.payload.project_id,
526 request.payload.worktree_id,
527 request.sender_id,
528 summary,
529 )?;
530
531 broadcast(request.sender_id, receiver_ids, |connection_id| {
532 self.peer
533 .forward_send(request.sender_id, connection_id, request.payload.clone())
534 })?;
535 Ok(())
536 }
537
538 async fn disk_based_diagnostics_updating(
539 self: Arc<Server>,
540 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
541 ) -> tide::Result<()> {
542 let receiver_ids = self
543 .state()
544 .project_connection_ids(request.payload.project_id, request.sender_id)?;
545 broadcast(request.sender_id, receiver_ids, |connection_id| {
546 self.peer
547 .forward_send(request.sender_id, connection_id, request.payload.clone())
548 })?;
549 Ok(())
550 }
551
552 async fn disk_based_diagnostics_updated(
553 self: Arc<Server>,
554 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
555 ) -> tide::Result<()> {
556 let receiver_ids = self
557 .state()
558 .project_connection_ids(request.payload.project_id, request.sender_id)?;
559 broadcast(request.sender_id, receiver_ids, |connection_id| {
560 self.peer
561 .forward_send(request.sender_id, connection_id, request.payload.clone())
562 })?;
563 Ok(())
564 }
565
566 async fn forward_project_request<T>(
567 self: Arc<Server>,
568 request: TypedEnvelope<T>,
569 ) -> tide::Result<T::Response>
570 where
571 T: EntityMessage + RequestMessage,
572 {
573 let host_connection_id = self
574 .state()
575 .read_project(request.payload.remote_entity_id(), request.sender_id)?
576 .host_connection_id;
577 Ok(self
578 .peer
579 .forward_request(request.sender_id, host_connection_id, request.payload)
580 .await?)
581 }
582
583 async fn save_buffer(
584 self: Arc<Server>,
585 request: TypedEnvelope<proto::SaveBuffer>,
586 ) -> tide::Result<proto::BufferSaved> {
587 let host;
588 let mut guests;
589 {
590 let state = self.state();
591 let project = state.read_project(request.payload.project_id, request.sender_id)?;
592 host = project.host_connection_id;
593 guests = project.guest_connection_ids()
594 }
595
596 let response = self
597 .peer
598 .forward_request(request.sender_id, host, request.payload.clone())
599 .await?;
600
601 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
602 broadcast(host, guests, |conn_id| {
603 self.peer.forward_send(host, conn_id, response.clone())
604 })?;
605
606 Ok(response)
607 }
608
609 async fn update_buffer(
610 self: Arc<Server>,
611 request: TypedEnvelope<proto::UpdateBuffer>,
612 ) -> tide::Result<proto::Ack> {
613 let receiver_ids = self
614 .state()
615 .project_connection_ids(request.payload.project_id, request.sender_id)?;
616 broadcast(request.sender_id, receiver_ids, |connection_id| {
617 self.peer
618 .forward_send(request.sender_id, connection_id, request.payload.clone())
619 })?;
620 Ok(proto::Ack {})
621 }
622
623 async fn update_buffer_file(
624 self: Arc<Server>,
625 request: TypedEnvelope<proto::UpdateBufferFile>,
626 ) -> tide::Result<()> {
627 let receiver_ids = self
628 .state()
629 .project_connection_ids(request.payload.project_id, request.sender_id)?;
630 broadcast(request.sender_id, receiver_ids, |connection_id| {
631 self.peer
632 .forward_send(request.sender_id, connection_id, request.payload.clone())
633 })?;
634 Ok(())
635 }
636
637 async fn buffer_reloaded(
638 self: Arc<Server>,
639 request: TypedEnvelope<proto::BufferReloaded>,
640 ) -> tide::Result<()> {
641 let receiver_ids = self
642 .state()
643 .project_connection_ids(request.payload.project_id, request.sender_id)?;
644 broadcast(request.sender_id, receiver_ids, |connection_id| {
645 self.peer
646 .forward_send(request.sender_id, connection_id, request.payload.clone())
647 })?;
648 Ok(())
649 }
650
651 async fn buffer_saved(
652 self: Arc<Server>,
653 request: TypedEnvelope<proto::BufferSaved>,
654 ) -> tide::Result<()> {
655 let receiver_ids = self
656 .state()
657 .project_connection_ids(request.payload.project_id, request.sender_id)?;
658 broadcast(request.sender_id, receiver_ids, |connection_id| {
659 self.peer
660 .forward_send(request.sender_id, connection_id, request.payload.clone())
661 })?;
662 Ok(())
663 }
664
665 async fn get_channels(
666 self: Arc<Server>,
667 request: TypedEnvelope<proto::GetChannels>,
668 ) -> tide::Result<proto::GetChannelsResponse> {
669 let user_id = self.state().user_id_for_connection(request.sender_id)?;
670 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
671 Ok(proto::GetChannelsResponse {
672 channels: channels
673 .into_iter()
674 .map(|chan| proto::Channel {
675 id: chan.id.to_proto(),
676 name: chan.name,
677 })
678 .collect(),
679 })
680 }
681
682 async fn get_users(
683 self: Arc<Server>,
684 request: TypedEnvelope<proto::GetUsers>,
685 ) -> tide::Result<proto::GetUsersResponse> {
686 let user_ids = request
687 .payload
688 .user_ids
689 .into_iter()
690 .map(UserId::from_proto)
691 .collect();
692 let users = self
693 .app_state
694 .db
695 .get_users_by_ids(user_ids)
696 .await?
697 .into_iter()
698 .map(|user| proto::User {
699 id: user.id.to_proto(),
700 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
701 github_login: user.github_login,
702 })
703 .collect();
704 Ok(proto::GetUsersResponse { users })
705 }
706
707 fn update_contacts_for_users<'a>(
708 self: &Arc<Server>,
709 user_ids: impl IntoIterator<Item = &'a UserId>,
710 ) -> anyhow::Result<()> {
711 let mut result = Ok(());
712 let state = self.state();
713 for user_id in user_ids {
714 let contacts = state.contacts_for_user(*user_id);
715 for connection_id in state.connection_ids_for_user(*user_id) {
716 if let Err(error) = self.peer.send(
717 connection_id,
718 proto::UpdateContacts {
719 contacts: contacts.clone(),
720 },
721 ) {
722 result = Err(error);
723 }
724 }
725 }
726 result
727 }
728
729 async fn join_channel(
730 mut self: Arc<Self>,
731 request: TypedEnvelope<proto::JoinChannel>,
732 ) -> tide::Result<proto::JoinChannelResponse> {
733 let user_id = self.state().user_id_for_connection(request.sender_id)?;
734 let channel_id = ChannelId::from_proto(request.payload.channel_id);
735 if !self
736 .app_state
737 .db
738 .can_user_access_channel(user_id, channel_id)
739 .await?
740 {
741 Err(anyhow!("access denied"))?;
742 }
743
744 self.state_mut().join_channel(request.sender_id, channel_id);
745 let messages = self
746 .app_state
747 .db
748 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
749 .await?
750 .into_iter()
751 .map(|msg| proto::ChannelMessage {
752 id: msg.id.to_proto(),
753 body: msg.body,
754 timestamp: msg.sent_at.unix_timestamp() as u64,
755 sender_id: msg.sender_id.to_proto(),
756 nonce: Some(msg.nonce.as_u128().into()),
757 })
758 .collect::<Vec<_>>();
759 Ok(proto::JoinChannelResponse {
760 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
761 messages,
762 })
763 }
764
765 async fn leave_channel(
766 mut self: Arc<Self>,
767 request: TypedEnvelope<proto::LeaveChannel>,
768 ) -> tide::Result<()> {
769 let user_id = self.state().user_id_for_connection(request.sender_id)?;
770 let channel_id = ChannelId::from_proto(request.payload.channel_id);
771 if !self
772 .app_state
773 .db
774 .can_user_access_channel(user_id, channel_id)
775 .await?
776 {
777 Err(anyhow!("access denied"))?;
778 }
779
780 self.state_mut()
781 .leave_channel(request.sender_id, channel_id);
782
783 Ok(())
784 }
785
786 async fn send_channel_message(
787 self: Arc<Self>,
788 request: TypedEnvelope<proto::SendChannelMessage>,
789 ) -> tide::Result<proto::SendChannelMessageResponse> {
790 let channel_id = ChannelId::from_proto(request.payload.channel_id);
791 let user_id;
792 let connection_ids;
793 {
794 let state = self.state();
795 user_id = state.user_id_for_connection(request.sender_id)?;
796 connection_ids = state.channel_connection_ids(channel_id)?;
797 }
798
799 // Validate the message body.
800 let body = request.payload.body.trim().to_string();
801 if body.len() > MAX_MESSAGE_LEN {
802 return Err(anyhow!("message is too long"))?;
803 }
804 if body.is_empty() {
805 return Err(anyhow!("message can't be blank"))?;
806 }
807
808 let timestamp = OffsetDateTime::now_utc();
809 let nonce = request
810 .payload
811 .nonce
812 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
813
814 let message_id = self
815 .app_state
816 .db
817 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
818 .await?
819 .to_proto();
820 let message = proto::ChannelMessage {
821 sender_id: user_id.to_proto(),
822 id: message_id,
823 body,
824 timestamp: timestamp.unix_timestamp() as u64,
825 nonce: Some(nonce),
826 };
827 broadcast(request.sender_id, connection_ids, |conn_id| {
828 self.peer.send(
829 conn_id,
830 proto::ChannelMessageSent {
831 channel_id: channel_id.to_proto(),
832 message: Some(message.clone()),
833 },
834 )
835 })?;
836 Ok(proto::SendChannelMessageResponse {
837 message: Some(message),
838 })
839 }
840
841 async fn get_channel_messages(
842 self: Arc<Self>,
843 request: TypedEnvelope<proto::GetChannelMessages>,
844 ) -> tide::Result<proto::GetChannelMessagesResponse> {
845 let user_id = self.state().user_id_for_connection(request.sender_id)?;
846 let channel_id = ChannelId::from_proto(request.payload.channel_id);
847 if !self
848 .app_state
849 .db
850 .can_user_access_channel(user_id, channel_id)
851 .await?
852 {
853 Err(anyhow!("access denied"))?;
854 }
855
856 let messages = self
857 .app_state
858 .db
859 .get_channel_messages(
860 channel_id,
861 MESSAGE_COUNT_PER_PAGE,
862 Some(MessageId::from_proto(request.payload.before_message_id)),
863 )
864 .await?
865 .into_iter()
866 .map(|msg| proto::ChannelMessage {
867 id: msg.id.to_proto(),
868 body: msg.body,
869 timestamp: msg.sent_at.unix_timestamp() as u64,
870 sender_id: msg.sender_id.to_proto(),
871 nonce: Some(msg.nonce.as_u128().into()),
872 })
873 .collect::<Vec<_>>();
874
875 Ok(proto::GetChannelMessagesResponse {
876 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
877 messages,
878 })
879 }
880
881 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
882 self.store.read()
883 }
884
885 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
886 self.store.write()
887 }
888}
889
890impl Executor for RealExecutor {
891 type Timer = Timer;
892
893 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
894 task::spawn(future);
895 }
896
897 fn timer(&self, duration: Duration) -> Self::Timer {
898 Timer::after(duration)
899 }
900}
901
902fn broadcast<F>(
903 sender_id: ConnectionId,
904 receiver_ids: Vec<ConnectionId>,
905 mut f: F,
906) -> anyhow::Result<()>
907where
908 F: FnMut(ConnectionId) -> anyhow::Result<()>,
909{
910 let mut result = Ok(());
911 for receiver_id in receiver_ids {
912 if receiver_id != sender_id {
913 if let Err(error) = f(receiver_id) {
914 if result.is_ok() {
915 result = Err(error);
916 }
917 }
918 }
919 }
920 result
921}
922
923pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
924 let server = Server::new(app.state().clone(), rpc.clone(), None);
925 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
926 let server = server.clone();
927 async move {
928 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
929
930 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
931 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
932 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
933 let client_protocol_version: Option<u32> = request
934 .header("X-Zed-Protocol-Version")
935 .and_then(|v| v.as_str().parse().ok());
936
937 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
938 return Ok(Response::new(StatusCode::UpgradeRequired));
939 }
940
941 let header = match request.header("Sec-Websocket-Key") {
942 Some(h) => h.as_str(),
943 None => return Err(anyhow!("expected sec-websocket-key"))?,
944 };
945
946 let user_id = process_auth_header(&request).await?;
947
948 let mut response = Response::new(StatusCode::SwitchingProtocols);
949 response.insert_header(UPGRADE, "websocket");
950 response.insert_header(CONNECTION, "Upgrade");
951 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
952 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
953 response.insert_header("Sec-Websocket-Version", "13");
954
955 let http_res: &mut tide::http::Response = response.as_mut();
956 let upgrade_receiver = http_res.recv_upgrade().await;
957 let addr = request.remote().unwrap_or("unknown").to_string();
958 task::spawn(async move {
959 if let Some(stream) = upgrade_receiver.await {
960 server
961 .handle_connection(
962 Connection::new(
963 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
964 ),
965 addr,
966 user_id,
967 None,
968 RealExecutor,
969 )
970 .await;
971 }
972 });
973
974 Ok(response)
975 }
976 });
977}
978
979fn header_contains_ignore_case<T>(
980 request: &tide::Request<T>,
981 header_name: HeaderName,
982 value: &str,
983) -> bool {
984 request
985 .header(header_name)
986 .map(|h| {
987 h.as_str()
988 .split(',')
989 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
990 })
991 .unwrap_or(false)
992}
993
994#[cfg(test)]
995mod tests {
996 use super::*;
997 use crate::{
998 auth,
999 db::{tests::TestDb, UserId},
1000 github, AppState, Config,
1001 };
1002 use ::rpc::Peer;
1003 use client::{
1004 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1005 EstablishConnectionError, UserStore,
1006 };
1007 use collections::BTreeMap;
1008 use editor::{
1009 self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1010 Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1011 };
1012 use gpui::{executor, ModelHandle, TestAppContext};
1013 use language::{
1014 tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1015 LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1016 };
1017 use lsp;
1018 use parking_lot::Mutex;
1019 use postage::{barrier, watch};
1020 use project::{
1021 fs::{FakeFs, Fs as _},
1022 search::SearchQuery,
1023 worktree::WorktreeHandle,
1024 DiagnosticSummary, Project, ProjectPath,
1025 };
1026 use rand::prelude::*;
1027 use rpc::PeerId;
1028 use serde_json::json;
1029 use sqlx::types::time::OffsetDateTime;
1030 use std::{
1031 cell::Cell,
1032 env,
1033 ops::Deref,
1034 path::{Path, PathBuf},
1035 rc::Rc,
1036 sync::{
1037 atomic::{AtomicBool, Ordering::SeqCst},
1038 Arc,
1039 },
1040 time::Duration,
1041 };
1042 use workspace::{Settings, Workspace, WorkspaceParams};
1043
1044 #[cfg(test)]
1045 #[ctor::ctor]
1046 fn init_logger() {
1047 if std::env::var("RUST_LOG").is_ok() {
1048 env_logger::init();
1049 }
1050 }
1051
1052 #[gpui::test(iterations = 10)]
1053 async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1054 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1055 let lang_registry = Arc::new(LanguageRegistry::test());
1056 let fs = FakeFs::new(cx_a.background());
1057 cx_a.foreground().forbid_parking();
1058
1059 // Connect to a server as 2 clients.
1060 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1061 let client_a = server.create_client(cx_a, "user_a").await;
1062 let client_b = server.create_client(cx_b, "user_b").await;
1063
1064 // Share a project as client A
1065 fs.insert_tree(
1066 "/a",
1067 json!({
1068 ".zed.toml": r#"collaborators = ["user_b"]"#,
1069 "a.txt": "a-contents",
1070 "b.txt": "b-contents",
1071 }),
1072 )
1073 .await;
1074 let project_a = cx_a.update(|cx| {
1075 Project::local(
1076 client_a.clone(),
1077 client_a.user_store.clone(),
1078 lang_registry.clone(),
1079 fs.clone(),
1080 cx,
1081 )
1082 });
1083 let (worktree_a, _) = project_a
1084 .update(cx_a, |p, cx| {
1085 p.find_or_create_local_worktree("/a", true, cx)
1086 })
1087 .await
1088 .unwrap();
1089 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1090 worktree_a
1091 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1092 .await;
1093 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1094 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1095
1096 // Join that project as client B
1097 let project_b = Project::remote(
1098 project_id,
1099 client_b.clone(),
1100 client_b.user_store.clone(),
1101 lang_registry.clone(),
1102 fs.clone(),
1103 &mut cx_b.to_async(),
1104 )
1105 .await
1106 .unwrap();
1107
1108 let replica_id_b = project_b.read_with(cx_b, |project, _| {
1109 assert_eq!(
1110 project
1111 .collaborators()
1112 .get(&client_a.peer_id)
1113 .unwrap()
1114 .user
1115 .github_login,
1116 "user_a"
1117 );
1118 project.replica_id()
1119 });
1120 project_a
1121 .condition(&cx_a, |tree, _| {
1122 tree.collaborators()
1123 .get(&client_b.peer_id)
1124 .map_or(false, |collaborator| {
1125 collaborator.replica_id == replica_id_b
1126 && collaborator.user.github_login == "user_b"
1127 })
1128 })
1129 .await;
1130
1131 // Open the same file as client B and client A.
1132 let buffer_b = project_b
1133 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1134 .await
1135 .unwrap();
1136 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1137 buffer_b.read_with(cx_b, |buf, cx| {
1138 assert_eq!(buf.read(cx).text(), "b-contents")
1139 });
1140 project_a.read_with(cx_a, |project, cx| {
1141 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1142 });
1143 let buffer_a = project_a
1144 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1145 .await
1146 .unwrap();
1147
1148 let editor_b = cx_b.add_view(window_b, |cx| {
1149 Editor::for_buffer(
1150 buffer_b,
1151 None,
1152 watch::channel_with(Settings::test(cx)).1,
1153 cx,
1154 )
1155 });
1156
1157 // TODO
1158 // // Create a selection set as client B and see that selection set as client A.
1159 // buffer_a
1160 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1161 // .await;
1162
1163 // Edit the buffer as client B and see that edit as client A.
1164 editor_b.update(cx_b, |editor, cx| {
1165 editor.handle_input(&Input("ok, ".into()), cx)
1166 });
1167 buffer_a
1168 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1169 .await;
1170
1171 // TODO
1172 // // Remove the selection set as client B, see those selections disappear as client A.
1173 cx_b.update(move |_| drop(editor_b));
1174 // buffer_a
1175 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1176 // .await;
1177
1178 // Dropping the client B's project removes client B from client A's collaborators.
1179 cx_b.update(move |_| drop(project_b));
1180 project_a
1181 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1182 .await;
1183 }
1184
1185 #[gpui::test(iterations = 10)]
1186 async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1187 let lang_registry = Arc::new(LanguageRegistry::test());
1188 let fs = FakeFs::new(cx_a.background());
1189 cx_a.foreground().forbid_parking();
1190
1191 // Connect to a server as 2 clients.
1192 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1193 let client_a = server.create_client(cx_a, "user_a").await;
1194 let client_b = server.create_client(cx_b, "user_b").await;
1195
1196 // Share a project as client A
1197 fs.insert_tree(
1198 "/a",
1199 json!({
1200 ".zed.toml": r#"collaborators = ["user_b"]"#,
1201 "a.txt": "a-contents",
1202 "b.txt": "b-contents",
1203 }),
1204 )
1205 .await;
1206 let project_a = cx_a.update(|cx| {
1207 Project::local(
1208 client_a.clone(),
1209 client_a.user_store.clone(),
1210 lang_registry.clone(),
1211 fs.clone(),
1212 cx,
1213 )
1214 });
1215 let (worktree_a, _) = project_a
1216 .update(cx_a, |p, cx| {
1217 p.find_or_create_local_worktree("/a", true, cx)
1218 })
1219 .await
1220 .unwrap();
1221 worktree_a
1222 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1223 .await;
1224 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1225 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1226 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1227 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1228
1229 // Join that project as client B
1230 let project_b = Project::remote(
1231 project_id,
1232 client_b.clone(),
1233 client_b.user_store.clone(),
1234 lang_registry.clone(),
1235 fs.clone(),
1236 &mut cx_b.to_async(),
1237 )
1238 .await
1239 .unwrap();
1240 project_b
1241 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1242 .await
1243 .unwrap();
1244
1245 // Unshare the project as client A
1246 project_a
1247 .update(cx_a, |project, cx| project.unshare(cx))
1248 .await
1249 .unwrap();
1250 project_b
1251 .condition(cx_b, |project, _| project.is_read_only())
1252 .await;
1253 assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1254 cx_b.update(|_| {
1255 drop(project_b);
1256 });
1257
1258 // Share the project again and ensure guests can still join.
1259 project_a
1260 .update(cx_a, |project, cx| project.share(cx))
1261 .await
1262 .unwrap();
1263 assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1264
1265 let project_b2 = Project::remote(
1266 project_id,
1267 client_b.clone(),
1268 client_b.user_store.clone(),
1269 lang_registry.clone(),
1270 fs.clone(),
1271 &mut cx_b.to_async(),
1272 )
1273 .await
1274 .unwrap();
1275 project_b2
1276 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1277 .await
1278 .unwrap();
1279 }
1280
1281 #[gpui::test(iterations = 10)]
1282 async fn test_propagate_saves_and_fs_changes(
1283 cx_a: &mut TestAppContext,
1284 cx_b: &mut TestAppContext,
1285 cx_c: &mut TestAppContext,
1286 ) {
1287 let lang_registry = Arc::new(LanguageRegistry::test());
1288 let fs = FakeFs::new(cx_a.background());
1289 cx_a.foreground().forbid_parking();
1290
1291 // Connect to a server as 3 clients.
1292 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293 let client_a = server.create_client(cx_a, "user_a").await;
1294 let client_b = server.create_client(cx_b, "user_b").await;
1295 let client_c = server.create_client(cx_c, "user_c").await;
1296
1297 // Share a worktree as client A.
1298 fs.insert_tree(
1299 "/a",
1300 json!({
1301 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1302 "file1": "",
1303 "file2": ""
1304 }),
1305 )
1306 .await;
1307 let project_a = cx_a.update(|cx| {
1308 Project::local(
1309 client_a.clone(),
1310 client_a.user_store.clone(),
1311 lang_registry.clone(),
1312 fs.clone(),
1313 cx,
1314 )
1315 });
1316 let (worktree_a, _) = project_a
1317 .update(cx_a, |p, cx| {
1318 p.find_or_create_local_worktree("/a", true, cx)
1319 })
1320 .await
1321 .unwrap();
1322 worktree_a
1323 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324 .await;
1325 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1326 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1327 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1328
1329 // Join that worktree as clients B and C.
1330 let project_b = Project::remote(
1331 project_id,
1332 client_b.clone(),
1333 client_b.user_store.clone(),
1334 lang_registry.clone(),
1335 fs.clone(),
1336 &mut cx_b.to_async(),
1337 )
1338 .await
1339 .unwrap();
1340 let project_c = Project::remote(
1341 project_id,
1342 client_c.clone(),
1343 client_c.user_store.clone(),
1344 lang_registry.clone(),
1345 fs.clone(),
1346 &mut cx_c.to_async(),
1347 )
1348 .await
1349 .unwrap();
1350 let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1351 let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1352
1353 // Open and edit a buffer as both guests B and C.
1354 let buffer_b = project_b
1355 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1356 .await
1357 .unwrap();
1358 let buffer_c = project_c
1359 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1360 .await
1361 .unwrap();
1362 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1363 buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1364
1365 // Open and edit that buffer as the host.
1366 let buffer_a = project_a
1367 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1368 .await
1369 .unwrap();
1370
1371 buffer_a
1372 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1373 .await;
1374 buffer_a.update(cx_a, |buf, cx| {
1375 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1376 });
1377
1378 // Wait for edits to propagate
1379 buffer_a
1380 .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381 .await;
1382 buffer_b
1383 .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384 .await;
1385 buffer_c
1386 .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1387 .await;
1388
1389 // Edit the buffer as the host and concurrently save as guest B.
1390 let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1391 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1392 save_b.await.unwrap();
1393 assert_eq!(
1394 fs.load("/a/file1".as_ref()).await.unwrap(),
1395 "hi-a, i-am-c, i-am-b, i-am-a"
1396 );
1397 buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1398 buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1399 buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1400
1401 worktree_a.flush_fs_events(cx_a).await;
1402
1403 // Make changes on host's file system, see those changes on guest worktrees.
1404 fs.rename(
1405 "/a/file1".as_ref(),
1406 "/a/file1-renamed".as_ref(),
1407 Default::default(),
1408 )
1409 .await
1410 .unwrap();
1411
1412 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413 .await
1414 .unwrap();
1415 fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417 worktree_a
1418 .condition(&cx_a, |tree, _| {
1419 tree.paths()
1420 .map(|p| p.to_string_lossy())
1421 .collect::<Vec<_>>()
1422 == [".zed.toml", "file1-renamed", "file3", "file4"]
1423 })
1424 .await;
1425 worktree_b
1426 .condition(&cx_b, |tree, _| {
1427 tree.paths()
1428 .map(|p| p.to_string_lossy())
1429 .collect::<Vec<_>>()
1430 == [".zed.toml", "file1-renamed", "file3", "file4"]
1431 })
1432 .await;
1433 worktree_c
1434 .condition(&cx_c, |tree, _| {
1435 tree.paths()
1436 .map(|p| p.to_string_lossy())
1437 .collect::<Vec<_>>()
1438 == [".zed.toml", "file1-renamed", "file3", "file4"]
1439 })
1440 .await;
1441
1442 // Ensure buffer files are updated as well.
1443 buffer_a
1444 .condition(&cx_a, |buf, _| {
1445 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446 })
1447 .await;
1448 buffer_b
1449 .condition(&cx_b, |buf, _| {
1450 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451 })
1452 .await;
1453 buffer_c
1454 .condition(&cx_c, |buf, _| {
1455 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456 })
1457 .await;
1458 }
1459
1460 #[gpui::test(iterations = 10)]
1461 async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1462 cx_a.foreground().forbid_parking();
1463 let lang_registry = Arc::new(LanguageRegistry::test());
1464 let fs = FakeFs::new(cx_a.background());
1465
1466 // Connect to a server as 2 clients.
1467 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468 let client_a = server.create_client(cx_a, "user_a").await;
1469 let client_b = server.create_client(cx_b, "user_b").await;
1470
1471 // Share a project as client A
1472 fs.insert_tree(
1473 "/dir",
1474 json!({
1475 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476 "a.txt": "a-contents",
1477 }),
1478 )
1479 .await;
1480
1481 let project_a = cx_a.update(|cx| {
1482 Project::local(
1483 client_a.clone(),
1484 client_a.user_store.clone(),
1485 lang_registry.clone(),
1486 fs.clone(),
1487 cx,
1488 )
1489 });
1490 let (worktree_a, _) = project_a
1491 .update(cx_a, |p, cx| {
1492 p.find_or_create_local_worktree("/dir", true, cx)
1493 })
1494 .await
1495 .unwrap();
1496 worktree_a
1497 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498 .await;
1499 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1500 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1501 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1502
1503 // Join that project as client B
1504 let project_b = Project::remote(
1505 project_id,
1506 client_b.clone(),
1507 client_b.user_store.clone(),
1508 lang_registry.clone(),
1509 fs.clone(),
1510 &mut cx_b.to_async(),
1511 )
1512 .await
1513 .unwrap();
1514
1515 // Open a buffer as client B
1516 let buffer_b = project_b
1517 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1518 .await
1519 .unwrap();
1520
1521 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1522 buffer_b.read_with(cx_b, |buf, _| {
1523 assert!(buf.is_dirty());
1524 assert!(!buf.has_conflict());
1525 });
1526
1527 buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1528 buffer_b
1529 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1530 .await;
1531 buffer_b.read_with(cx_b, |buf, _| {
1532 assert!(!buf.has_conflict());
1533 });
1534
1535 buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1536 buffer_b.read_with(cx_b, |buf, _| {
1537 assert!(buf.is_dirty());
1538 assert!(!buf.has_conflict());
1539 });
1540 }
1541
1542 #[gpui::test(iterations = 10)]
1543 async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544 cx_a.foreground().forbid_parking();
1545 let lang_registry = Arc::new(LanguageRegistry::test());
1546 let fs = FakeFs::new(cx_a.background());
1547
1548 // Connect to a server as 2 clients.
1549 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550 let client_a = server.create_client(cx_a, "user_a").await;
1551 let client_b = server.create_client(cx_b, "user_b").await;
1552
1553 // Share a project as client A
1554 fs.insert_tree(
1555 "/dir",
1556 json!({
1557 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1558 "a.txt": "a-contents",
1559 }),
1560 )
1561 .await;
1562
1563 let project_a = cx_a.update(|cx| {
1564 Project::local(
1565 client_a.clone(),
1566 client_a.user_store.clone(),
1567 lang_registry.clone(),
1568 fs.clone(),
1569 cx,
1570 )
1571 });
1572 let (worktree_a, _) = project_a
1573 .update(cx_a, |p, cx| {
1574 p.find_or_create_local_worktree("/dir", true, cx)
1575 })
1576 .await
1577 .unwrap();
1578 worktree_a
1579 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580 .await;
1581 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584
1585 // Join that project as client B
1586 let project_b = Project::remote(
1587 project_id,
1588 client_b.clone(),
1589 client_b.user_store.clone(),
1590 lang_registry.clone(),
1591 fs.clone(),
1592 &mut cx_b.to_async(),
1593 )
1594 .await
1595 .unwrap();
1596 let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1597
1598 // Open a buffer as client B
1599 let buffer_b = project_b
1600 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1601 .await
1602 .unwrap();
1603 buffer_b.read_with(cx_b, |buf, _| {
1604 assert!(!buf.is_dirty());
1605 assert!(!buf.has_conflict());
1606 });
1607
1608 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1609 .await
1610 .unwrap();
1611 buffer_b
1612 .condition(&cx_b, |buf, _| {
1613 buf.text() == "new contents" && !buf.is_dirty()
1614 })
1615 .await;
1616 buffer_b.read_with(cx_b, |buf, _| {
1617 assert!(!buf.has_conflict());
1618 });
1619 }
1620
1621 #[gpui::test(iterations = 10)]
1622 async fn test_editing_while_guest_opens_buffer(
1623 cx_a: &mut TestAppContext,
1624 cx_b: &mut TestAppContext,
1625 ) {
1626 cx_a.foreground().forbid_parking();
1627 let lang_registry = Arc::new(LanguageRegistry::test());
1628 let fs = FakeFs::new(cx_a.background());
1629
1630 // Connect to a server as 2 clients.
1631 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632 let client_a = server.create_client(cx_a, "user_a").await;
1633 let client_b = server.create_client(cx_b, "user_b").await;
1634
1635 // Share a project as client A
1636 fs.insert_tree(
1637 "/dir",
1638 json!({
1639 ".zed.toml": r#"collaborators = ["user_b"]"#,
1640 "a.txt": "a-contents",
1641 }),
1642 )
1643 .await;
1644 let project_a = cx_a.update(|cx| {
1645 Project::local(
1646 client_a.clone(),
1647 client_a.user_store.clone(),
1648 lang_registry.clone(),
1649 fs.clone(),
1650 cx,
1651 )
1652 });
1653 let (worktree_a, _) = project_a
1654 .update(cx_a, |p, cx| {
1655 p.find_or_create_local_worktree("/dir", true, cx)
1656 })
1657 .await
1658 .unwrap();
1659 worktree_a
1660 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1661 .await;
1662 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1663 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1664 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1665
1666 // Join that project as client B
1667 let project_b = Project::remote(
1668 project_id,
1669 client_b.clone(),
1670 client_b.user_store.clone(),
1671 lang_registry.clone(),
1672 fs.clone(),
1673 &mut cx_b.to_async(),
1674 )
1675 .await
1676 .unwrap();
1677
1678 // Open a buffer as client A
1679 let buffer_a = project_a
1680 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1681 .await
1682 .unwrap();
1683
1684 // Start opening the same buffer as client B
1685 let buffer_b = cx_b
1686 .background()
1687 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1688
1689 // Edit the buffer as client A while client B is still opening it.
1690 cx_b.background().simulate_random_delay().await;
1691 buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1692 cx_b.background().simulate_random_delay().await;
1693 buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1694
1695 let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1696 let buffer_b = buffer_b.await.unwrap();
1697 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1698 }
1699
1700 #[gpui::test(iterations = 10)]
1701 async fn test_leaving_worktree_while_opening_buffer(
1702 cx_a: &mut TestAppContext,
1703 cx_b: &mut TestAppContext,
1704 ) {
1705 cx_a.foreground().forbid_parking();
1706 let lang_registry = Arc::new(LanguageRegistry::test());
1707 let fs = FakeFs::new(cx_a.background());
1708
1709 // Connect to a server as 2 clients.
1710 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711 let client_a = server.create_client(cx_a, "user_a").await;
1712 let client_b = server.create_client(cx_b, "user_b").await;
1713
1714 // Share a project as client A
1715 fs.insert_tree(
1716 "/dir",
1717 json!({
1718 ".zed.toml": r#"collaborators = ["user_b"]"#,
1719 "a.txt": "a-contents",
1720 }),
1721 )
1722 .await;
1723 let project_a = cx_a.update(|cx| {
1724 Project::local(
1725 client_a.clone(),
1726 client_a.user_store.clone(),
1727 lang_registry.clone(),
1728 fs.clone(),
1729 cx,
1730 )
1731 });
1732 let (worktree_a, _) = project_a
1733 .update(cx_a, |p, cx| {
1734 p.find_or_create_local_worktree("/dir", true, cx)
1735 })
1736 .await
1737 .unwrap();
1738 worktree_a
1739 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740 .await;
1741 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745 // Join that project as client B
1746 let project_b = Project::remote(
1747 project_id,
1748 client_b.clone(),
1749 client_b.user_store.clone(),
1750 lang_registry.clone(),
1751 fs.clone(),
1752 &mut cx_b.to_async(),
1753 )
1754 .await
1755 .unwrap();
1756
1757 // See that a guest has joined as client A.
1758 project_a
1759 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760 .await;
1761
1762 // Begin opening a buffer as client B, but leave the project before the open completes.
1763 let buffer_b = cx_b
1764 .background()
1765 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766 cx_b.update(|_| drop(project_b));
1767 drop(buffer_b);
1768
1769 // See that the guest has left.
1770 project_a
1771 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772 .await;
1773 }
1774
1775 #[gpui::test(iterations = 10)]
1776 async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777 cx_a.foreground().forbid_parking();
1778 let lang_registry = Arc::new(LanguageRegistry::test());
1779 let fs = FakeFs::new(cx_a.background());
1780
1781 // Connect to a server as 2 clients.
1782 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783 let client_a = server.create_client(cx_a, "user_a").await;
1784 let client_b = server.create_client(cx_b, "user_b").await;
1785
1786 // Share a project as client A
1787 fs.insert_tree(
1788 "/a",
1789 json!({
1790 ".zed.toml": r#"collaborators = ["user_b"]"#,
1791 "a.txt": "a-contents",
1792 "b.txt": "b-contents",
1793 }),
1794 )
1795 .await;
1796 let project_a = cx_a.update(|cx| {
1797 Project::local(
1798 client_a.clone(),
1799 client_a.user_store.clone(),
1800 lang_registry.clone(),
1801 fs.clone(),
1802 cx,
1803 )
1804 });
1805 let (worktree_a, _) = project_a
1806 .update(cx_a, |p, cx| {
1807 p.find_or_create_local_worktree("/a", true, cx)
1808 })
1809 .await
1810 .unwrap();
1811 worktree_a
1812 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813 .await;
1814 let project_id = project_a
1815 .update(cx_a, |project, _| project.next_remote_id())
1816 .await;
1817 project_a
1818 .update(cx_a, |project, cx| project.share(cx))
1819 .await
1820 .unwrap();
1821
1822 // Join that project as client B
1823 let _project_b = Project::remote(
1824 project_id,
1825 client_b.clone(),
1826 client_b.user_store.clone(),
1827 lang_registry.clone(),
1828 fs.clone(),
1829 &mut cx_b.to_async(),
1830 )
1831 .await
1832 .unwrap();
1833
1834 // Client A sees that a guest has joined.
1835 project_a
1836 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837 .await;
1838
1839 // Drop client B's connection and ensure client A observes client B leaving the project.
1840 client_b.disconnect(&cx_b.to_async()).unwrap();
1841 project_a
1842 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843 .await;
1844
1845 // Rejoin the project as client B
1846 let _project_b = Project::remote(
1847 project_id,
1848 client_b.clone(),
1849 client_b.user_store.clone(),
1850 lang_registry.clone(),
1851 fs.clone(),
1852 &mut cx_b.to_async(),
1853 )
1854 .await
1855 .unwrap();
1856
1857 // Client A sees that a guest has re-joined.
1858 project_a
1859 .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860 .await;
1861
1862 // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863 client_b.wait_for_current_user(cx_b).await;
1864 server.disconnect_client(client_b.current_user_id(cx_b));
1865 cx_a.foreground().advance_clock(Duration::from_secs(3));
1866 project_a
1867 .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868 .await;
1869 }
1870
1871 #[gpui::test(iterations = 10)]
1872 async fn test_collaborating_with_diagnostics(
1873 cx_a: &mut TestAppContext,
1874 cx_b: &mut TestAppContext,
1875 ) {
1876 cx_a.foreground().forbid_parking();
1877 let mut lang_registry = Arc::new(LanguageRegistry::test());
1878 let fs = FakeFs::new(cx_a.background());
1879
1880 // Set up a fake language server.
1881 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882 Arc::get_mut(&mut lang_registry)
1883 .unwrap()
1884 .add(Arc::new(Language::new(
1885 LanguageConfig {
1886 name: "Rust".into(),
1887 path_suffixes: vec!["rs".to_string()],
1888 language_server: Some(language_server_config),
1889 ..Default::default()
1890 },
1891 Some(tree_sitter_rust::language()),
1892 )));
1893
1894 // Connect to a server as 2 clients.
1895 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896 let client_a = server.create_client(cx_a, "user_a").await;
1897 let client_b = server.create_client(cx_b, "user_b").await;
1898
1899 // Share a project as client A
1900 fs.insert_tree(
1901 "/a",
1902 json!({
1903 ".zed.toml": r#"collaborators = ["user_b"]"#,
1904 "a.rs": "let one = two",
1905 "other.rs": "",
1906 }),
1907 )
1908 .await;
1909 let project_a = cx_a.update(|cx| {
1910 Project::local(
1911 client_a.clone(),
1912 client_a.user_store.clone(),
1913 lang_registry.clone(),
1914 fs.clone(),
1915 cx,
1916 )
1917 });
1918 let (worktree_a, _) = project_a
1919 .update(cx_a, |p, cx| {
1920 p.find_or_create_local_worktree("/a", true, cx)
1921 })
1922 .await
1923 .unwrap();
1924 worktree_a
1925 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926 .await;
1927 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931 // Cause the language server to start.
1932 let _ = cx_a
1933 .background()
1934 .spawn(project_a.update(cx_a, |project, cx| {
1935 project.open_buffer(
1936 ProjectPath {
1937 worktree_id,
1938 path: Path::new("other.rs").into(),
1939 },
1940 cx,
1941 )
1942 }))
1943 .await
1944 .unwrap();
1945
1946 // Simulate a language server reporting errors for a file.
1947 let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948 fake_language_server
1949 .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950 .await;
1951 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1952 lsp::PublishDiagnosticsParams {
1953 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954 version: None,
1955 diagnostics: vec![lsp::Diagnostic {
1956 severity: Some(lsp::DiagnosticSeverity::ERROR),
1957 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958 message: "message 1".to_string(),
1959 ..Default::default()
1960 }],
1961 },
1962 );
1963
1964 // Wait for server to see the diagnostics update.
1965 server
1966 .condition(|store| {
1967 let worktree = store
1968 .project(project_id)
1969 .unwrap()
1970 .share
1971 .as_ref()
1972 .unwrap()
1973 .worktrees
1974 .get(&worktree_id.to_proto())
1975 .unwrap();
1976
1977 !worktree.diagnostic_summaries.is_empty()
1978 })
1979 .await;
1980
1981 // Join the worktree as client B.
1982 let project_b = Project::remote(
1983 project_id,
1984 client_b.clone(),
1985 client_b.user_store.clone(),
1986 lang_registry.clone(),
1987 fs.clone(),
1988 &mut cx_b.to_async(),
1989 )
1990 .await
1991 .unwrap();
1992
1993 project_b.read_with(cx_b, |project, cx| {
1994 assert_eq!(
1995 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996 &[(
1997 ProjectPath {
1998 worktree_id,
1999 path: Arc::from(Path::new("a.rs")),
2000 },
2001 DiagnosticSummary {
2002 error_count: 1,
2003 warning_count: 0,
2004 ..Default::default()
2005 },
2006 )]
2007 )
2008 });
2009
2010 // Simulate a language server reporting more errors for a file.
2011 fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2012 lsp::PublishDiagnosticsParams {
2013 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014 version: None,
2015 diagnostics: vec![
2016 lsp::Diagnostic {
2017 severity: Some(lsp::DiagnosticSeverity::ERROR),
2018 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019 message: "message 1".to_string(),
2020 ..Default::default()
2021 },
2022 lsp::Diagnostic {
2023 severity: Some(lsp::DiagnosticSeverity::WARNING),
2024 range: lsp::Range::new(
2025 lsp::Position::new(0, 10),
2026 lsp::Position::new(0, 13),
2027 ),
2028 message: "message 2".to_string(),
2029 ..Default::default()
2030 },
2031 ],
2032 },
2033 );
2034
2035 // Client b gets the updated summaries
2036 project_b
2037 .condition(&cx_b, |project, cx| {
2038 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039 == &[(
2040 ProjectPath {
2041 worktree_id,
2042 path: Arc::from(Path::new("a.rs")),
2043 },
2044 DiagnosticSummary {
2045 error_count: 1,
2046 warning_count: 1,
2047 ..Default::default()
2048 },
2049 )]
2050 })
2051 .await;
2052
2053 // Open the file with the errors on client B. They should be present.
2054 let buffer_b = cx_b
2055 .background()
2056 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057 .await
2058 .unwrap();
2059
2060 buffer_b.read_with(cx_b, |buffer, _| {
2061 assert_eq!(
2062 buffer
2063 .snapshot()
2064 .diagnostics_in_range::<_, Point>(0..buffer.len())
2065 .map(|entry| entry)
2066 .collect::<Vec<_>>(),
2067 &[
2068 DiagnosticEntry {
2069 range: Point::new(0, 4)..Point::new(0, 7),
2070 diagnostic: Diagnostic {
2071 group_id: 0,
2072 message: "message 1".to_string(),
2073 severity: lsp::DiagnosticSeverity::ERROR,
2074 is_primary: true,
2075 ..Default::default()
2076 }
2077 },
2078 DiagnosticEntry {
2079 range: Point::new(0, 10)..Point::new(0, 13),
2080 diagnostic: Diagnostic {
2081 group_id: 1,
2082 severity: lsp::DiagnosticSeverity::WARNING,
2083 message: "message 2".to_string(),
2084 is_primary: true,
2085 ..Default::default()
2086 }
2087 }
2088 ]
2089 );
2090 });
2091 }
2092
2093 #[gpui::test(iterations = 10)]
2094 async fn test_collaborating_with_completion(
2095 cx_a: &mut TestAppContext,
2096 cx_b: &mut TestAppContext,
2097 ) {
2098 cx_a.foreground().forbid_parking();
2099 let mut lang_registry = Arc::new(LanguageRegistry::test());
2100 let fs = FakeFs::new(cx_a.background());
2101
2102 // Set up a fake language server.
2103 let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104 language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105 completion_provider: Some(lsp::CompletionOptions {
2106 trigger_characters: Some(vec![".".to_string()]),
2107 ..Default::default()
2108 }),
2109 ..Default::default()
2110 });
2111 Arc::get_mut(&mut lang_registry)
2112 .unwrap()
2113 .add(Arc::new(Language::new(
2114 LanguageConfig {
2115 name: "Rust".into(),
2116 path_suffixes: vec!["rs".to_string()],
2117 language_server: Some(language_server_config),
2118 ..Default::default()
2119 },
2120 Some(tree_sitter_rust::language()),
2121 )));
2122
2123 // Connect to a server as 2 clients.
2124 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125 let client_a = server.create_client(cx_a, "user_a").await;
2126 let client_b = server.create_client(cx_b, "user_b").await;
2127
2128 // Share a project as client A
2129 fs.insert_tree(
2130 "/a",
2131 json!({
2132 ".zed.toml": r#"collaborators = ["user_b"]"#,
2133 "main.rs": "fn main() { a }",
2134 "other.rs": "",
2135 }),
2136 )
2137 .await;
2138 let project_a = cx_a.update(|cx| {
2139 Project::local(
2140 client_a.clone(),
2141 client_a.user_store.clone(),
2142 lang_registry.clone(),
2143 fs.clone(),
2144 cx,
2145 )
2146 });
2147 let (worktree_a, _) = project_a
2148 .update(cx_a, |p, cx| {
2149 p.find_or_create_local_worktree("/a", true, cx)
2150 })
2151 .await
2152 .unwrap();
2153 worktree_a
2154 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155 .await;
2156 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160 // Join the worktree as client B.
2161 let project_b = Project::remote(
2162 project_id,
2163 client_b.clone(),
2164 client_b.user_store.clone(),
2165 lang_registry.clone(),
2166 fs.clone(),
2167 &mut cx_b.to_async(),
2168 )
2169 .await
2170 .unwrap();
2171
2172 // Open a file in an editor as the guest.
2173 let buffer_b = project_b
2174 .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175 .await
2176 .unwrap();
2177 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178 let editor_b = cx_b.add_view(window_b, |cx| {
2179 Editor::for_buffer(
2180 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181 Some(project_b.clone()),
2182 watch::channel_with(Settings::test(cx)).1,
2183 cx,
2184 )
2185 });
2186
2187 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2188 buffer_b
2189 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2190 .await;
2191
2192 // Type a completion trigger character as the guest.
2193 editor_b.update(cx_b, |editor, cx| {
2194 editor.select_ranges([13..13], None, cx);
2195 editor.handle_input(&Input(".".into()), cx);
2196 cx.focus(&editor_b);
2197 });
2198
2199 // Receive a completion request as the host's language server.
2200 // Return some completions from the host's language server.
2201 cx_a.foreground().start_waiting();
2202 fake_language_server
2203 .handle_request::<lsp::request::Completion, _>(|params, _| {
2204 assert_eq!(
2205 params.text_document_position.text_document.uri,
2206 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2207 );
2208 assert_eq!(
2209 params.text_document_position.position,
2210 lsp::Position::new(0, 14),
2211 );
2212
2213 Some(lsp::CompletionResponse::Array(vec![
2214 lsp::CompletionItem {
2215 label: "first_method(…)".into(),
2216 detail: Some("fn(&mut self, B) -> C".into()),
2217 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2218 new_text: "first_method($1)".to_string(),
2219 range: lsp::Range::new(
2220 lsp::Position::new(0, 14),
2221 lsp::Position::new(0, 14),
2222 ),
2223 })),
2224 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2225 ..Default::default()
2226 },
2227 lsp::CompletionItem {
2228 label: "second_method(…)".into(),
2229 detail: Some("fn(&mut self, C) -> D<E>".into()),
2230 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231 new_text: "second_method()".to_string(),
2232 range: lsp::Range::new(
2233 lsp::Position::new(0, 14),
2234 lsp::Position::new(0, 14),
2235 ),
2236 })),
2237 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2238 ..Default::default()
2239 },
2240 ]))
2241 })
2242 .next()
2243 .await
2244 .unwrap();
2245 cx_a.foreground().finish_waiting();
2246
2247 // Open the buffer on the host.
2248 let buffer_a = project_a
2249 .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2250 .await
2251 .unwrap();
2252 buffer_a
2253 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2254 .await;
2255
2256 // Confirm a completion on the guest.
2257 editor_b
2258 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2259 .await;
2260 editor_b.update(cx_b, |editor, cx| {
2261 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2262 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2263 });
2264
2265 // Return a resolved completion from the host's language server.
2266 // The resolved completion has an additional text edit.
2267 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2268 |params, _| {
2269 assert_eq!(params.label, "first_method(…)");
2270 lsp::CompletionItem {
2271 label: "first_method(…)".into(),
2272 detail: Some("fn(&mut self, B) -> C".into()),
2273 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2274 new_text: "first_method($1)".to_string(),
2275 range: lsp::Range::new(
2276 lsp::Position::new(0, 14),
2277 lsp::Position::new(0, 14),
2278 ),
2279 })),
2280 additional_text_edits: Some(vec![lsp::TextEdit {
2281 new_text: "use d::SomeTrait;\n".to_string(),
2282 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2283 }]),
2284 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2285 ..Default::default()
2286 }
2287 },
2288 );
2289
2290 // The additional edit is applied.
2291 buffer_a
2292 .condition(&cx_a, |buffer, _| {
2293 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2294 })
2295 .await;
2296 buffer_b
2297 .condition(&cx_b, |buffer, _| {
2298 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2299 })
2300 .await;
2301 }
2302
2303 #[gpui::test(iterations = 10)]
2304 async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2305 cx_a.foreground().forbid_parking();
2306 let mut lang_registry = Arc::new(LanguageRegistry::test());
2307 let fs = FakeFs::new(cx_a.background());
2308
2309 // Set up a fake language server.
2310 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2311 Arc::get_mut(&mut lang_registry)
2312 .unwrap()
2313 .add(Arc::new(Language::new(
2314 LanguageConfig {
2315 name: "Rust".into(),
2316 path_suffixes: vec!["rs".to_string()],
2317 language_server: Some(language_server_config),
2318 ..Default::default()
2319 },
2320 Some(tree_sitter_rust::language()),
2321 )));
2322
2323 // Connect to a server as 2 clients.
2324 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2325 let client_a = server.create_client(cx_a, "user_a").await;
2326 let client_b = server.create_client(cx_b, "user_b").await;
2327
2328 // Share a project as client A
2329 fs.insert_tree(
2330 "/a",
2331 json!({
2332 ".zed.toml": r#"collaborators = ["user_b"]"#,
2333 "a.rs": "let one = two",
2334 }),
2335 )
2336 .await;
2337 let project_a = cx_a.update(|cx| {
2338 Project::local(
2339 client_a.clone(),
2340 client_a.user_store.clone(),
2341 lang_registry.clone(),
2342 fs.clone(),
2343 cx,
2344 )
2345 });
2346 let (worktree_a, _) = project_a
2347 .update(cx_a, |p, cx| {
2348 p.find_or_create_local_worktree("/a", true, cx)
2349 })
2350 .await
2351 .unwrap();
2352 worktree_a
2353 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2354 .await;
2355 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2356 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2357 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2358
2359 // Join the worktree as client B.
2360 let project_b = Project::remote(
2361 project_id,
2362 client_b.clone(),
2363 client_b.user_store.clone(),
2364 lang_registry.clone(),
2365 fs.clone(),
2366 &mut cx_b.to_async(),
2367 )
2368 .await
2369 .unwrap();
2370
2371 let buffer_b = cx_b
2372 .background()
2373 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2374 .await
2375 .unwrap();
2376
2377 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2378 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2379 Some(vec![
2380 lsp::TextEdit {
2381 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2382 new_text: "h".to_string(),
2383 },
2384 lsp::TextEdit {
2385 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2386 new_text: "y".to_string(),
2387 },
2388 ])
2389 });
2390
2391 project_b
2392 .update(cx_b, |project, cx| {
2393 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2394 })
2395 .await
2396 .unwrap();
2397 assert_eq!(
2398 buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2399 "let honey = two"
2400 );
2401 }
2402
2403 #[gpui::test(iterations = 10)]
2404 async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2405 cx_a.foreground().forbid_parking();
2406 let mut lang_registry = Arc::new(LanguageRegistry::test());
2407 let fs = FakeFs::new(cx_a.background());
2408 fs.insert_tree(
2409 "/root-1",
2410 json!({
2411 ".zed.toml": r#"collaborators = ["user_b"]"#,
2412 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2413 }),
2414 )
2415 .await;
2416 fs.insert_tree(
2417 "/root-2",
2418 json!({
2419 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2420 }),
2421 )
2422 .await;
2423
2424 // Set up a fake language server.
2425 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2426 Arc::get_mut(&mut lang_registry)
2427 .unwrap()
2428 .add(Arc::new(Language::new(
2429 LanguageConfig {
2430 name: "Rust".into(),
2431 path_suffixes: vec!["rs".to_string()],
2432 language_server: Some(language_server_config),
2433 ..Default::default()
2434 },
2435 Some(tree_sitter_rust::language()),
2436 )));
2437
2438 // Connect to a server as 2 clients.
2439 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2440 let client_a = server.create_client(cx_a, "user_a").await;
2441 let client_b = server.create_client(cx_b, "user_b").await;
2442
2443 // Share a project as client A
2444 let project_a = cx_a.update(|cx| {
2445 Project::local(
2446 client_a.clone(),
2447 client_a.user_store.clone(),
2448 lang_registry.clone(),
2449 fs.clone(),
2450 cx,
2451 )
2452 });
2453 let (worktree_a, _) = project_a
2454 .update(cx_a, |p, cx| {
2455 p.find_or_create_local_worktree("/root-1", true, cx)
2456 })
2457 .await
2458 .unwrap();
2459 worktree_a
2460 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2461 .await;
2462 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2463 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2464 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2465
2466 // Join the worktree as client B.
2467 let project_b = Project::remote(
2468 project_id,
2469 client_b.clone(),
2470 client_b.user_store.clone(),
2471 lang_registry.clone(),
2472 fs.clone(),
2473 &mut cx_b.to_async(),
2474 )
2475 .await
2476 .unwrap();
2477
2478 // Open the file on client B.
2479 let buffer_b = cx_b
2480 .background()
2481 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2482 .await
2483 .unwrap();
2484
2485 // Request the definition of a symbol as the guest.
2486 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2487 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2488 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2489 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2490 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2491 )))
2492 });
2493
2494 let definitions_1 = project_b
2495 .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2496 .await
2497 .unwrap();
2498 cx_b.read(|cx| {
2499 assert_eq!(definitions_1.len(), 1);
2500 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2501 let target_buffer = definitions_1[0].buffer.read(cx);
2502 assert_eq!(
2503 target_buffer.text(),
2504 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2505 );
2506 assert_eq!(
2507 definitions_1[0].range.to_point(target_buffer),
2508 Point::new(0, 6)..Point::new(0, 9)
2509 );
2510 });
2511
2512 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2513 // the previous call to `definition`.
2514 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2515 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2516 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2517 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2518 )))
2519 });
2520
2521 let definitions_2 = project_b
2522 .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2523 .await
2524 .unwrap();
2525 cx_b.read(|cx| {
2526 assert_eq!(definitions_2.len(), 1);
2527 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2528 let target_buffer = definitions_2[0].buffer.read(cx);
2529 assert_eq!(
2530 target_buffer.text(),
2531 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2532 );
2533 assert_eq!(
2534 definitions_2[0].range.to_point(target_buffer),
2535 Point::new(1, 6)..Point::new(1, 11)
2536 );
2537 });
2538 assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2539 }
2540
2541 #[gpui::test(iterations = 10)]
2542 async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2543 cx_a.foreground().forbid_parking();
2544 let mut lang_registry = Arc::new(LanguageRegistry::test());
2545 let fs = FakeFs::new(cx_a.background());
2546 fs.insert_tree(
2547 "/root-1",
2548 json!({
2549 ".zed.toml": r#"collaborators = ["user_b"]"#,
2550 "one.rs": "const ONE: usize = 1;",
2551 "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2552 }),
2553 )
2554 .await;
2555 fs.insert_tree(
2556 "/root-2",
2557 json!({
2558 "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2559 }),
2560 )
2561 .await;
2562
2563 // Set up a fake language server.
2564 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2565 Arc::get_mut(&mut lang_registry)
2566 .unwrap()
2567 .add(Arc::new(Language::new(
2568 LanguageConfig {
2569 name: "Rust".into(),
2570 path_suffixes: vec!["rs".to_string()],
2571 language_server: Some(language_server_config),
2572 ..Default::default()
2573 },
2574 Some(tree_sitter_rust::language()),
2575 )));
2576
2577 // Connect to a server as 2 clients.
2578 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2579 let client_a = server.create_client(cx_a, "user_a").await;
2580 let client_b = server.create_client(cx_b, "user_b").await;
2581
2582 // Share a project as client A
2583 let project_a = cx_a.update(|cx| {
2584 Project::local(
2585 client_a.clone(),
2586 client_a.user_store.clone(),
2587 lang_registry.clone(),
2588 fs.clone(),
2589 cx,
2590 )
2591 });
2592 let (worktree_a, _) = project_a
2593 .update(cx_a, |p, cx| {
2594 p.find_or_create_local_worktree("/root-1", true, cx)
2595 })
2596 .await
2597 .unwrap();
2598 worktree_a
2599 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2600 .await;
2601 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2602 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2603 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2604
2605 // Join the worktree as client B.
2606 let project_b = Project::remote(
2607 project_id,
2608 client_b.clone(),
2609 client_b.user_store.clone(),
2610 lang_registry.clone(),
2611 fs.clone(),
2612 &mut cx_b.to_async(),
2613 )
2614 .await
2615 .unwrap();
2616
2617 // Open the file on client B.
2618 let buffer_b = cx_b
2619 .background()
2620 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2621 .await
2622 .unwrap();
2623
2624 // Request references to a symbol as the guest.
2625 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2626 fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2627 assert_eq!(
2628 params.text_document_position.text_document.uri.as_str(),
2629 "file:///root-1/one.rs"
2630 );
2631 Some(vec![
2632 lsp::Location {
2633 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2634 range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2635 },
2636 lsp::Location {
2637 uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2638 range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2639 },
2640 lsp::Location {
2641 uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2642 range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2643 },
2644 ])
2645 });
2646
2647 let references = project_b
2648 .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2649 .await
2650 .unwrap();
2651 cx_b.read(|cx| {
2652 assert_eq!(references.len(), 3);
2653 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2654
2655 let two_buffer = references[0].buffer.read(cx);
2656 let three_buffer = references[2].buffer.read(cx);
2657 assert_eq!(
2658 two_buffer.file().unwrap().path().as_ref(),
2659 Path::new("two.rs")
2660 );
2661 assert_eq!(references[1].buffer, references[0].buffer);
2662 assert_eq!(
2663 three_buffer.file().unwrap().full_path(cx),
2664 Path::new("three.rs")
2665 );
2666
2667 assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2668 assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2669 assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2670 });
2671 }
2672
2673 #[gpui::test(iterations = 10)]
2674 async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2675 cx_a.foreground().forbid_parking();
2676 let lang_registry = Arc::new(LanguageRegistry::test());
2677 let fs = FakeFs::new(cx_a.background());
2678 fs.insert_tree(
2679 "/root-1",
2680 json!({
2681 ".zed.toml": r#"collaborators = ["user_b"]"#,
2682 "a": "hello world",
2683 "b": "goodnight moon",
2684 "c": "a world of goo",
2685 "d": "world champion of clown world",
2686 }),
2687 )
2688 .await;
2689 fs.insert_tree(
2690 "/root-2",
2691 json!({
2692 "e": "disney world is fun",
2693 }),
2694 )
2695 .await;
2696
2697 // Connect to a server as 2 clients.
2698 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2699 let client_a = server.create_client(cx_a, "user_a").await;
2700 let client_b = server.create_client(cx_b, "user_b").await;
2701
2702 // Share a project as client A
2703 let project_a = cx_a.update(|cx| {
2704 Project::local(
2705 client_a.clone(),
2706 client_a.user_store.clone(),
2707 lang_registry.clone(),
2708 fs.clone(),
2709 cx,
2710 )
2711 });
2712 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2713
2714 let (worktree_1, _) = project_a
2715 .update(cx_a, |p, cx| {
2716 p.find_or_create_local_worktree("/root-1", true, cx)
2717 })
2718 .await
2719 .unwrap();
2720 worktree_1
2721 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2722 .await;
2723 let (worktree_2, _) = project_a
2724 .update(cx_a, |p, cx| {
2725 p.find_or_create_local_worktree("/root-2", true, cx)
2726 })
2727 .await
2728 .unwrap();
2729 worktree_2
2730 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2731 .await;
2732
2733 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2734
2735 // Join the worktree as client B.
2736 let project_b = Project::remote(
2737 project_id,
2738 client_b.clone(),
2739 client_b.user_store.clone(),
2740 lang_registry.clone(),
2741 fs.clone(),
2742 &mut cx_b.to_async(),
2743 )
2744 .await
2745 .unwrap();
2746
2747 let results = project_b
2748 .update(cx_b, |project, cx| {
2749 project.search(SearchQuery::text("world", false, false), cx)
2750 })
2751 .await
2752 .unwrap();
2753
2754 let mut ranges_by_path = results
2755 .into_iter()
2756 .map(|(buffer, ranges)| {
2757 buffer.read_with(cx_b, |buffer, cx| {
2758 let path = buffer.file().unwrap().full_path(cx);
2759 let offset_ranges = ranges
2760 .into_iter()
2761 .map(|range| range.to_offset(buffer))
2762 .collect::<Vec<_>>();
2763 (path, offset_ranges)
2764 })
2765 })
2766 .collect::<Vec<_>>();
2767 ranges_by_path.sort_by_key(|(path, _)| path.clone());
2768
2769 assert_eq!(
2770 ranges_by_path,
2771 &[
2772 (PathBuf::from("root-1/a"), vec![6..11]),
2773 (PathBuf::from("root-1/c"), vec![2..7]),
2774 (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2775 (PathBuf::from("root-2/e"), vec![7..12]),
2776 ]
2777 );
2778 }
2779
2780 #[gpui::test(iterations = 10)]
2781 async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2782 cx_a.foreground().forbid_parking();
2783 let lang_registry = Arc::new(LanguageRegistry::test());
2784 let fs = FakeFs::new(cx_a.background());
2785 fs.insert_tree(
2786 "/root-1",
2787 json!({
2788 ".zed.toml": r#"collaborators = ["user_b"]"#,
2789 "main.rs": "fn double(number: i32) -> i32 { number + number }",
2790 }),
2791 )
2792 .await;
2793
2794 // Set up a fake language server.
2795 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2796 lang_registry.add(Arc::new(Language::new(
2797 LanguageConfig {
2798 name: "Rust".into(),
2799 path_suffixes: vec!["rs".to_string()],
2800 language_server: Some(language_server_config),
2801 ..Default::default()
2802 },
2803 Some(tree_sitter_rust::language()),
2804 )));
2805
2806 // Connect to a server as 2 clients.
2807 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2808 let client_a = server.create_client(cx_a, "user_a").await;
2809 let client_b = server.create_client(cx_b, "user_b").await;
2810
2811 // Share a project as client A
2812 let project_a = cx_a.update(|cx| {
2813 Project::local(
2814 client_a.clone(),
2815 client_a.user_store.clone(),
2816 lang_registry.clone(),
2817 fs.clone(),
2818 cx,
2819 )
2820 });
2821 let (worktree_a, _) = project_a
2822 .update(cx_a, |p, cx| {
2823 p.find_or_create_local_worktree("/root-1", true, cx)
2824 })
2825 .await
2826 .unwrap();
2827 worktree_a
2828 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2829 .await;
2830 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2831 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2832 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2833
2834 // Join the worktree as client B.
2835 let project_b = Project::remote(
2836 project_id,
2837 client_b.clone(),
2838 client_b.user_store.clone(),
2839 lang_registry.clone(),
2840 fs.clone(),
2841 &mut cx_b.to_async(),
2842 )
2843 .await
2844 .unwrap();
2845
2846 // Open the file on client B.
2847 let buffer_b = cx_b
2848 .background()
2849 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2850 .await
2851 .unwrap();
2852
2853 // Request document highlights as the guest.
2854 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2855 fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2856 |params, _| {
2857 assert_eq!(
2858 params
2859 .text_document_position_params
2860 .text_document
2861 .uri
2862 .as_str(),
2863 "file:///root-1/main.rs"
2864 );
2865 assert_eq!(
2866 params.text_document_position_params.position,
2867 lsp::Position::new(0, 34)
2868 );
2869 Some(vec![
2870 lsp::DocumentHighlight {
2871 kind: Some(lsp::DocumentHighlightKind::WRITE),
2872 range: lsp::Range::new(
2873 lsp::Position::new(0, 10),
2874 lsp::Position::new(0, 16),
2875 ),
2876 },
2877 lsp::DocumentHighlight {
2878 kind: Some(lsp::DocumentHighlightKind::READ),
2879 range: lsp::Range::new(
2880 lsp::Position::new(0, 32),
2881 lsp::Position::new(0, 38),
2882 ),
2883 },
2884 lsp::DocumentHighlight {
2885 kind: Some(lsp::DocumentHighlightKind::READ),
2886 range: lsp::Range::new(
2887 lsp::Position::new(0, 41),
2888 lsp::Position::new(0, 47),
2889 ),
2890 },
2891 ])
2892 },
2893 );
2894
2895 let highlights = project_b
2896 .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2897 .await
2898 .unwrap();
2899 buffer_b.read_with(cx_b, |buffer, _| {
2900 let snapshot = buffer.snapshot();
2901
2902 let highlights = highlights
2903 .into_iter()
2904 .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2905 .collect::<Vec<_>>();
2906 assert_eq!(
2907 highlights,
2908 &[
2909 (lsp::DocumentHighlightKind::WRITE, 10..16),
2910 (lsp::DocumentHighlightKind::READ, 32..38),
2911 (lsp::DocumentHighlightKind::READ, 41..47)
2912 ]
2913 )
2914 });
2915 }
2916
2917 #[gpui::test(iterations = 10)]
2918 async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2919 cx_a.foreground().forbid_parking();
2920 let mut lang_registry = Arc::new(LanguageRegistry::test());
2921 let fs = FakeFs::new(cx_a.background());
2922 fs.insert_tree(
2923 "/code",
2924 json!({
2925 "crate-1": {
2926 ".zed.toml": r#"collaborators = ["user_b"]"#,
2927 "one.rs": "const ONE: usize = 1;",
2928 },
2929 "crate-2": {
2930 "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2931 },
2932 "private": {
2933 "passwords.txt": "the-password",
2934 }
2935 }),
2936 )
2937 .await;
2938
2939 // Set up a fake language server.
2940 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2941 Arc::get_mut(&mut lang_registry)
2942 .unwrap()
2943 .add(Arc::new(Language::new(
2944 LanguageConfig {
2945 name: "Rust".into(),
2946 path_suffixes: vec!["rs".to_string()],
2947 language_server: Some(language_server_config),
2948 ..Default::default()
2949 },
2950 Some(tree_sitter_rust::language()),
2951 )));
2952
2953 // Connect to a server as 2 clients.
2954 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2955 let client_a = server.create_client(cx_a, "user_a").await;
2956 let client_b = server.create_client(cx_b, "user_b").await;
2957
2958 // Share a project as client A
2959 let project_a = cx_a.update(|cx| {
2960 Project::local(
2961 client_a.clone(),
2962 client_a.user_store.clone(),
2963 lang_registry.clone(),
2964 fs.clone(),
2965 cx,
2966 )
2967 });
2968 let (worktree_a, _) = project_a
2969 .update(cx_a, |p, cx| {
2970 p.find_or_create_local_worktree("/code/crate-1", true, cx)
2971 })
2972 .await
2973 .unwrap();
2974 worktree_a
2975 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2976 .await;
2977 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2978 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2979 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2980
2981 // Join the worktree as client B.
2982 let project_b = Project::remote(
2983 project_id,
2984 client_b.clone(),
2985 client_b.user_store.clone(),
2986 lang_registry.clone(),
2987 fs.clone(),
2988 &mut cx_b.to_async(),
2989 )
2990 .await
2991 .unwrap();
2992
2993 // Cause the language server to start.
2994 let _buffer = cx_b
2995 .background()
2996 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2997 .await
2998 .unwrap();
2999
3000 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3001 fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3002 #[allow(deprecated)]
3003 Some(vec![lsp::SymbolInformation {
3004 name: "TWO".into(),
3005 location: lsp::Location {
3006 uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3007 range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3008 },
3009 kind: lsp::SymbolKind::CONSTANT,
3010 tags: None,
3011 container_name: None,
3012 deprecated: None,
3013 }])
3014 });
3015
3016 // Request the definition of a symbol as the guest.
3017 let symbols = project_b
3018 .update(cx_b, |p, cx| p.symbols("two", cx))
3019 .await
3020 .unwrap();
3021 assert_eq!(symbols.len(), 1);
3022 assert_eq!(symbols[0].name, "TWO");
3023
3024 // Open one of the returned symbols.
3025 let buffer_b_2 = project_b
3026 .update(cx_b, |project, cx| {
3027 project.open_buffer_for_symbol(&symbols[0], cx)
3028 })
3029 .await
3030 .unwrap();
3031 buffer_b_2.read_with(cx_b, |buffer, _| {
3032 assert_eq!(
3033 buffer.file().unwrap().path().as_ref(),
3034 Path::new("../crate-2/two.rs")
3035 );
3036 });
3037
3038 // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3039 let mut fake_symbol = symbols[0].clone();
3040 fake_symbol.path = Path::new("/code/secrets").into();
3041 let error = project_b
3042 .update(cx_b, |project, cx| {
3043 project.open_buffer_for_symbol(&fake_symbol, cx)
3044 })
3045 .await
3046 .unwrap_err();
3047 assert!(error.to_string().contains("invalid symbol signature"));
3048 }
3049
3050 #[gpui::test(iterations = 10)]
3051 async fn test_open_buffer_while_getting_definition_pointing_to_it(
3052 cx_a: &mut TestAppContext,
3053 cx_b: &mut TestAppContext,
3054 mut rng: StdRng,
3055 ) {
3056 cx_a.foreground().forbid_parking();
3057 let mut lang_registry = Arc::new(LanguageRegistry::test());
3058 let fs = FakeFs::new(cx_a.background());
3059 fs.insert_tree(
3060 "/root",
3061 json!({
3062 ".zed.toml": r#"collaborators = ["user_b"]"#,
3063 "a.rs": "const ONE: usize = b::TWO;",
3064 "b.rs": "const TWO: usize = 2",
3065 }),
3066 )
3067 .await;
3068
3069 // Set up a fake language server.
3070 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3071
3072 Arc::get_mut(&mut lang_registry)
3073 .unwrap()
3074 .add(Arc::new(Language::new(
3075 LanguageConfig {
3076 name: "Rust".into(),
3077 path_suffixes: vec!["rs".to_string()],
3078 language_server: Some(language_server_config),
3079 ..Default::default()
3080 },
3081 Some(tree_sitter_rust::language()),
3082 )));
3083
3084 // Connect to a server as 2 clients.
3085 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3086 let client_a = server.create_client(cx_a, "user_a").await;
3087 let client_b = server.create_client(cx_b, "user_b").await;
3088
3089 // Share a project as client A
3090 let project_a = cx_a.update(|cx| {
3091 Project::local(
3092 client_a.clone(),
3093 client_a.user_store.clone(),
3094 lang_registry.clone(),
3095 fs.clone(),
3096 cx,
3097 )
3098 });
3099
3100 let (worktree_a, _) = project_a
3101 .update(cx_a, |p, cx| {
3102 p.find_or_create_local_worktree("/root", true, cx)
3103 })
3104 .await
3105 .unwrap();
3106 worktree_a
3107 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3108 .await;
3109 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3110 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3111 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3112
3113 // Join the worktree as client B.
3114 let project_b = Project::remote(
3115 project_id,
3116 client_b.clone(),
3117 client_b.user_store.clone(),
3118 lang_registry.clone(),
3119 fs.clone(),
3120 &mut cx_b.to_async(),
3121 )
3122 .await
3123 .unwrap();
3124
3125 let buffer_b1 = cx_b
3126 .background()
3127 .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3128 .await
3129 .unwrap();
3130
3131 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3132 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3133 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3134 lsp::Url::from_file_path("/root/b.rs").unwrap(),
3135 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3136 )))
3137 });
3138
3139 let definitions;
3140 let buffer_b2;
3141 if rng.gen() {
3142 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3143 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3144 } else {
3145 buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3146 definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3147 }
3148
3149 let buffer_b2 = buffer_b2.await.unwrap();
3150 let definitions = definitions.await.unwrap();
3151 assert_eq!(definitions.len(), 1);
3152 assert_eq!(definitions[0].buffer, buffer_b2);
3153 }
3154
3155 #[gpui::test(iterations = 10)]
3156 async fn test_collaborating_with_code_actions(
3157 cx_a: &mut TestAppContext,
3158 cx_b: &mut TestAppContext,
3159 ) {
3160 cx_a.foreground().forbid_parking();
3161 let mut lang_registry = Arc::new(LanguageRegistry::test());
3162 let fs = FakeFs::new(cx_a.background());
3163 let mut path_openers_b = Vec::new();
3164 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3165
3166 // Set up a fake language server.
3167 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3168 Arc::get_mut(&mut lang_registry)
3169 .unwrap()
3170 .add(Arc::new(Language::new(
3171 LanguageConfig {
3172 name: "Rust".into(),
3173 path_suffixes: vec!["rs".to_string()],
3174 language_server: Some(language_server_config),
3175 ..Default::default()
3176 },
3177 Some(tree_sitter_rust::language()),
3178 )));
3179
3180 // Connect to a server as 2 clients.
3181 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3182 let client_a = server.create_client(cx_a, "user_a").await;
3183 let client_b = server.create_client(cx_b, "user_b").await;
3184
3185 // Share a project as client A
3186 fs.insert_tree(
3187 "/a",
3188 json!({
3189 ".zed.toml": r#"collaborators = ["user_b"]"#,
3190 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3191 "other.rs": "pub fn foo() -> usize { 4 }",
3192 }),
3193 )
3194 .await;
3195 let project_a = cx_a.update(|cx| {
3196 Project::local(
3197 client_a.clone(),
3198 client_a.user_store.clone(),
3199 lang_registry.clone(),
3200 fs.clone(),
3201 cx,
3202 )
3203 });
3204 let (worktree_a, _) = project_a
3205 .update(cx_a, |p, cx| {
3206 p.find_or_create_local_worktree("/a", true, cx)
3207 })
3208 .await
3209 .unwrap();
3210 worktree_a
3211 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3212 .await;
3213 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3214 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3215 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3216
3217 // Join the worktree as client B.
3218 let project_b = Project::remote(
3219 project_id,
3220 client_b.clone(),
3221 client_b.user_store.clone(),
3222 lang_registry.clone(),
3223 fs.clone(),
3224 &mut cx_b.to_async(),
3225 )
3226 .await
3227 .unwrap();
3228 let mut params = cx_b.update(WorkspaceParams::test);
3229 params.languages = lang_registry.clone();
3230 params.client = client_b.client.clone();
3231 params.user_store = client_b.user_store.clone();
3232 params.project = project_b;
3233 params.path_openers = path_openers_b.into();
3234
3235 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3236 let editor_b = workspace_b
3237 .update(cx_b, |workspace, cx| {
3238 workspace.open_path((worktree_id, "main.rs").into(), cx)
3239 })
3240 .await
3241 .unwrap()
3242 .downcast::<Editor>()
3243 .unwrap();
3244
3245 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3246 fake_language_server
3247 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3248 assert_eq!(
3249 params.text_document.uri,
3250 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3251 );
3252 assert_eq!(params.range.start, lsp::Position::new(0, 0));
3253 assert_eq!(params.range.end, lsp::Position::new(0, 0));
3254 None
3255 })
3256 .next()
3257 .await;
3258
3259 // Move cursor to a location that contains code actions.
3260 editor_b.update(cx_b, |editor, cx| {
3261 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3262 cx.focus(&editor_b);
3263 });
3264
3265 fake_language_server
3266 .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3267 assert_eq!(
3268 params.text_document.uri,
3269 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3270 );
3271 assert_eq!(params.range.start, lsp::Position::new(1, 31));
3272 assert_eq!(params.range.end, lsp::Position::new(1, 31));
3273
3274 Some(vec![lsp::CodeActionOrCommand::CodeAction(
3275 lsp::CodeAction {
3276 title: "Inline into all callers".to_string(),
3277 edit: Some(lsp::WorkspaceEdit {
3278 changes: Some(
3279 [
3280 (
3281 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3282 vec![lsp::TextEdit::new(
3283 lsp::Range::new(
3284 lsp::Position::new(1, 22),
3285 lsp::Position::new(1, 34),
3286 ),
3287 "4".to_string(),
3288 )],
3289 ),
3290 (
3291 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3292 vec![lsp::TextEdit::new(
3293 lsp::Range::new(
3294 lsp::Position::new(0, 0),
3295 lsp::Position::new(0, 27),
3296 ),
3297 "".to_string(),
3298 )],
3299 ),
3300 ]
3301 .into_iter()
3302 .collect(),
3303 ),
3304 ..Default::default()
3305 }),
3306 data: Some(json!({
3307 "codeActionParams": {
3308 "range": {
3309 "start": {"line": 1, "column": 31},
3310 "end": {"line": 1, "column": 31},
3311 }
3312 }
3313 })),
3314 ..Default::default()
3315 },
3316 )])
3317 })
3318 .next()
3319 .await;
3320
3321 // Toggle code actions and wait for them to display.
3322 editor_b.update(cx_b, |editor, cx| {
3323 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3324 });
3325 editor_b
3326 .condition(&cx_b, |editor, _| editor.context_menu_visible())
3327 .await;
3328
3329 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3330
3331 // Confirming the code action will trigger a resolve request.
3332 let confirm_action = workspace_b
3333 .update(cx_b, |workspace, cx| {
3334 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3335 })
3336 .unwrap();
3337 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3338 lsp::CodeAction {
3339 title: "Inline into all callers".to_string(),
3340 edit: Some(lsp::WorkspaceEdit {
3341 changes: Some(
3342 [
3343 (
3344 lsp::Url::from_file_path("/a/main.rs").unwrap(),
3345 vec![lsp::TextEdit::new(
3346 lsp::Range::new(
3347 lsp::Position::new(1, 22),
3348 lsp::Position::new(1, 34),
3349 ),
3350 "4".to_string(),
3351 )],
3352 ),
3353 (
3354 lsp::Url::from_file_path("/a/other.rs").unwrap(),
3355 vec![lsp::TextEdit::new(
3356 lsp::Range::new(
3357 lsp::Position::new(0, 0),
3358 lsp::Position::new(0, 27),
3359 ),
3360 "".to_string(),
3361 )],
3362 ),
3363 ]
3364 .into_iter()
3365 .collect(),
3366 ),
3367 ..Default::default()
3368 }),
3369 ..Default::default()
3370 }
3371 });
3372
3373 // After the action is confirmed, an editor containing both modified files is opened.
3374 confirm_action.await.unwrap();
3375 let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3376 workspace
3377 .active_item(cx)
3378 .unwrap()
3379 .downcast::<Editor>()
3380 .unwrap()
3381 });
3382 code_action_editor.update(cx_b, |editor, cx| {
3383 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3384 editor.undo(&Undo, cx);
3385 assert_eq!(
3386 editor.text(cx),
3387 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3388 );
3389 editor.redo(&Redo, cx);
3390 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3391 });
3392 }
3393
3394 #[gpui::test(iterations = 10)]
3395 async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3396 cx_a.foreground().forbid_parking();
3397 let mut lang_registry = Arc::new(LanguageRegistry::test());
3398 let fs = FakeFs::new(cx_a.background());
3399 let mut path_openers_b = Vec::new();
3400 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3401
3402 // Set up a fake language server.
3403 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3404 Arc::get_mut(&mut lang_registry)
3405 .unwrap()
3406 .add(Arc::new(Language::new(
3407 LanguageConfig {
3408 name: "Rust".into(),
3409 path_suffixes: vec!["rs".to_string()],
3410 language_server: Some(language_server_config),
3411 ..Default::default()
3412 },
3413 Some(tree_sitter_rust::language()),
3414 )));
3415
3416 // Connect to a server as 2 clients.
3417 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3418 let client_a = server.create_client(cx_a, "user_a").await;
3419 let client_b = server.create_client(cx_b, "user_b").await;
3420
3421 // Share a project as client A
3422 fs.insert_tree(
3423 "/dir",
3424 json!({
3425 ".zed.toml": r#"collaborators = ["user_b"]"#,
3426 "one.rs": "const ONE: usize = 1;",
3427 "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3428 }),
3429 )
3430 .await;
3431 let project_a = cx_a.update(|cx| {
3432 Project::local(
3433 client_a.clone(),
3434 client_a.user_store.clone(),
3435 lang_registry.clone(),
3436 fs.clone(),
3437 cx,
3438 )
3439 });
3440 let (worktree_a, _) = project_a
3441 .update(cx_a, |p, cx| {
3442 p.find_or_create_local_worktree("/dir", true, cx)
3443 })
3444 .await
3445 .unwrap();
3446 worktree_a
3447 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3448 .await;
3449 let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3450 let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3451 project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3452
3453 // Join the worktree as client B.
3454 let project_b = Project::remote(
3455 project_id,
3456 client_b.clone(),
3457 client_b.user_store.clone(),
3458 lang_registry.clone(),
3459 fs.clone(),
3460 &mut cx_b.to_async(),
3461 )
3462 .await
3463 .unwrap();
3464 let mut params = cx_b.update(WorkspaceParams::test);
3465 params.languages = lang_registry.clone();
3466 params.client = client_b.client.clone();
3467 params.user_store = client_b.user_store.clone();
3468 params.project = project_b;
3469 params.path_openers = path_openers_b.into();
3470
3471 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
3472 let editor_b = workspace_b
3473 .update(cx_b, |workspace, cx| {
3474 workspace.open_path((worktree_id, "one.rs").into(), cx)
3475 })
3476 .await
3477 .unwrap()
3478 .downcast::<Editor>()
3479 .unwrap();
3480 let mut fake_language_server = fake_language_servers.next().await.unwrap();
3481
3482 // Move cursor to a location that can be renamed.
3483 let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3484 editor.select_ranges([7..7], None, cx);
3485 editor.rename(&Rename, cx).unwrap()
3486 });
3487
3488 fake_language_server
3489 .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3490 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3491 assert_eq!(params.position, lsp::Position::new(0, 7));
3492 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3493 lsp::Position::new(0, 6),
3494 lsp::Position::new(0, 9),
3495 )))
3496 })
3497 .next()
3498 .await
3499 .unwrap();
3500 prepare_rename.await.unwrap();
3501 editor_b.update(cx_b, |editor, cx| {
3502 let rename = editor.pending_rename().unwrap();
3503 let buffer = editor.buffer().read(cx).snapshot(cx);
3504 assert_eq!(
3505 rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3506 6..9
3507 );
3508 rename.editor.update(cx, |rename_editor, cx| {
3509 rename_editor.buffer().update(cx, |rename_buffer, cx| {
3510 rename_buffer.edit([0..3], "THREE", cx);
3511 });
3512 });
3513 });
3514
3515 let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3516 Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3517 });
3518 fake_language_server
3519 .handle_request::<lsp::request::Rename, _>(|params, _| {
3520 assert_eq!(
3521 params.text_document_position.text_document.uri.as_str(),
3522 "file:///dir/one.rs"
3523 );
3524 assert_eq!(
3525 params.text_document_position.position,
3526 lsp::Position::new(0, 6)
3527 );
3528 assert_eq!(params.new_name, "THREE");
3529 Some(lsp::WorkspaceEdit {
3530 changes: Some(
3531 [
3532 (
3533 lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3534 vec![lsp::TextEdit::new(
3535 lsp::Range::new(
3536 lsp::Position::new(0, 6),
3537 lsp::Position::new(0, 9),
3538 ),
3539 "THREE".to_string(),
3540 )],
3541 ),
3542 (
3543 lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3544 vec![
3545 lsp::TextEdit::new(
3546 lsp::Range::new(
3547 lsp::Position::new(0, 24),
3548 lsp::Position::new(0, 27),
3549 ),
3550 "THREE".to_string(),
3551 ),
3552 lsp::TextEdit::new(
3553 lsp::Range::new(
3554 lsp::Position::new(0, 35),
3555 lsp::Position::new(0, 38),
3556 ),
3557 "THREE".to_string(),
3558 ),
3559 ],
3560 ),
3561 ]
3562 .into_iter()
3563 .collect(),
3564 ),
3565 ..Default::default()
3566 })
3567 })
3568 .next()
3569 .await
3570 .unwrap();
3571 confirm_rename.await.unwrap();
3572
3573 let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3574 workspace
3575 .active_item(cx)
3576 .unwrap()
3577 .downcast::<Editor>()
3578 .unwrap()
3579 });
3580 rename_editor.update(cx_b, |editor, cx| {
3581 assert_eq!(
3582 editor.text(cx),
3583 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3584 );
3585 editor.undo(&Undo, cx);
3586 assert_eq!(
3587 editor.text(cx),
3588 "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3589 );
3590 editor.redo(&Redo, cx);
3591 assert_eq!(
3592 editor.text(cx),
3593 "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3594 );
3595 });
3596
3597 // Ensure temporary rename edits cannot be undone/redone.
3598 editor_b.update(cx_b, |editor, cx| {
3599 editor.undo(&Undo, cx);
3600 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3601 editor.undo(&Undo, cx);
3602 assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3603 editor.redo(&Redo, cx);
3604 assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3605 })
3606 }
3607
3608 #[gpui::test(iterations = 10)]
3609 async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3610 cx_a.foreground().forbid_parking();
3611
3612 // Connect to a server as 2 clients.
3613 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3614 let client_a = server.create_client(cx_a, "user_a").await;
3615 let client_b = server.create_client(cx_b, "user_b").await;
3616
3617 // Create an org that includes these 2 users.
3618 let db = &server.app_state.db;
3619 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3620 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3621 .await
3622 .unwrap();
3623 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3624 .await
3625 .unwrap();
3626
3627 // Create a channel that includes all the users.
3628 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3629 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3630 .await
3631 .unwrap();
3632 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3633 .await
3634 .unwrap();
3635 db.create_channel_message(
3636 channel_id,
3637 client_b.current_user_id(&cx_b),
3638 "hello A, it's B.",
3639 OffsetDateTime::now_utc(),
3640 1,
3641 )
3642 .await
3643 .unwrap();
3644
3645 let channels_a = cx_a
3646 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3647 channels_a
3648 .condition(cx_a, |list, _| list.available_channels().is_some())
3649 .await;
3650 channels_a.read_with(cx_a, |list, _| {
3651 assert_eq!(
3652 list.available_channels().unwrap(),
3653 &[ChannelDetails {
3654 id: channel_id.to_proto(),
3655 name: "test-channel".to_string()
3656 }]
3657 )
3658 });
3659 let channel_a = channels_a.update(cx_a, |this, cx| {
3660 this.get_channel(channel_id.to_proto(), cx).unwrap()
3661 });
3662 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3663 channel_a
3664 .condition(&cx_a, |channel, _| {
3665 channel_messages(channel)
3666 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3667 })
3668 .await;
3669
3670 let channels_b = cx_b
3671 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3672 channels_b
3673 .condition(cx_b, |list, _| list.available_channels().is_some())
3674 .await;
3675 channels_b.read_with(cx_b, |list, _| {
3676 assert_eq!(
3677 list.available_channels().unwrap(),
3678 &[ChannelDetails {
3679 id: channel_id.to_proto(),
3680 name: "test-channel".to_string()
3681 }]
3682 )
3683 });
3684
3685 let channel_b = channels_b.update(cx_b, |this, cx| {
3686 this.get_channel(channel_id.to_proto(), cx).unwrap()
3687 });
3688 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3689 channel_b
3690 .condition(&cx_b, |channel, _| {
3691 channel_messages(channel)
3692 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3693 })
3694 .await;
3695
3696 channel_a
3697 .update(cx_a, |channel, cx| {
3698 channel
3699 .send_message("oh, hi B.".to_string(), cx)
3700 .unwrap()
3701 .detach();
3702 let task = channel.send_message("sup".to_string(), cx).unwrap();
3703 assert_eq!(
3704 channel_messages(channel),
3705 &[
3706 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3707 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3708 ("user_a".to_string(), "sup".to_string(), true)
3709 ]
3710 );
3711 task
3712 })
3713 .await
3714 .unwrap();
3715
3716 channel_b
3717 .condition(&cx_b, |channel, _| {
3718 channel_messages(channel)
3719 == [
3720 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3721 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3722 ("user_a".to_string(), "sup".to_string(), false),
3723 ]
3724 })
3725 .await;
3726
3727 assert_eq!(
3728 server
3729 .state()
3730 .await
3731 .channel(channel_id)
3732 .unwrap()
3733 .connection_ids
3734 .len(),
3735 2
3736 );
3737 cx_b.update(|_| drop(channel_b));
3738 server
3739 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3740 .await;
3741
3742 cx_a.update(|_| drop(channel_a));
3743 server
3744 .condition(|state| state.channel(channel_id).is_none())
3745 .await;
3746 }
3747
3748 #[gpui::test(iterations = 10)]
3749 async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3750 cx_a.foreground().forbid_parking();
3751
3752 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3753 let client_a = server.create_client(cx_a, "user_a").await;
3754
3755 let db = &server.app_state.db;
3756 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3757 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3758 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3759 .await
3760 .unwrap();
3761 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3762 .await
3763 .unwrap();
3764
3765 let channels_a = cx_a
3766 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3767 channels_a
3768 .condition(cx_a, |list, _| list.available_channels().is_some())
3769 .await;
3770 let channel_a = channels_a.update(cx_a, |this, cx| {
3771 this.get_channel(channel_id.to_proto(), cx).unwrap()
3772 });
3773
3774 // Messages aren't allowed to be too long.
3775 channel_a
3776 .update(cx_a, |channel, cx| {
3777 let long_body = "this is long.\n".repeat(1024);
3778 channel.send_message(long_body, cx).unwrap()
3779 })
3780 .await
3781 .unwrap_err();
3782
3783 // Messages aren't allowed to be blank.
3784 channel_a.update(cx_a, |channel, cx| {
3785 channel.send_message(String::new(), cx).unwrap_err()
3786 });
3787
3788 // Leading and trailing whitespace are trimmed.
3789 channel_a
3790 .update(cx_a, |channel, cx| {
3791 channel
3792 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3793 .unwrap()
3794 })
3795 .await
3796 .unwrap();
3797 assert_eq!(
3798 db.get_channel_messages(channel_id, 10, None)
3799 .await
3800 .unwrap()
3801 .iter()
3802 .map(|m| &m.body)
3803 .collect::<Vec<_>>(),
3804 &["surrounded by whitespace"]
3805 );
3806 }
3807
3808 #[gpui::test(iterations = 10)]
3809 async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3810 cx_a.foreground().forbid_parking();
3811
3812 // Connect to a server as 2 clients.
3813 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3814 let client_a = server.create_client(cx_a, "user_a").await;
3815 let client_b = server.create_client(cx_b, "user_b").await;
3816 let mut status_b = client_b.status();
3817
3818 // Create an org that includes these 2 users.
3819 let db = &server.app_state.db;
3820 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3821 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3822 .await
3823 .unwrap();
3824 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3825 .await
3826 .unwrap();
3827
3828 // Create a channel that includes all the users.
3829 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3830 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3831 .await
3832 .unwrap();
3833 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3834 .await
3835 .unwrap();
3836 db.create_channel_message(
3837 channel_id,
3838 client_b.current_user_id(&cx_b),
3839 "hello A, it's B.",
3840 OffsetDateTime::now_utc(),
3841 2,
3842 )
3843 .await
3844 .unwrap();
3845
3846 let channels_a = cx_a
3847 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3848 channels_a
3849 .condition(cx_a, |list, _| list.available_channels().is_some())
3850 .await;
3851
3852 channels_a.read_with(cx_a, |list, _| {
3853 assert_eq!(
3854 list.available_channels().unwrap(),
3855 &[ChannelDetails {
3856 id: channel_id.to_proto(),
3857 name: "test-channel".to_string()
3858 }]
3859 )
3860 });
3861 let channel_a = channels_a.update(cx_a, |this, cx| {
3862 this.get_channel(channel_id.to_proto(), cx).unwrap()
3863 });
3864 channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3865 channel_a
3866 .condition(&cx_a, |channel, _| {
3867 channel_messages(channel)
3868 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3869 })
3870 .await;
3871
3872 let channels_b = cx_b
3873 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3874 channels_b
3875 .condition(cx_b, |list, _| list.available_channels().is_some())
3876 .await;
3877 channels_b.read_with(cx_b, |list, _| {
3878 assert_eq!(
3879 list.available_channels().unwrap(),
3880 &[ChannelDetails {
3881 id: channel_id.to_proto(),
3882 name: "test-channel".to_string()
3883 }]
3884 )
3885 });
3886
3887 let channel_b = channels_b.update(cx_b, |this, cx| {
3888 this.get_channel(channel_id.to_proto(), cx).unwrap()
3889 });
3890 channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3891 channel_b
3892 .condition(&cx_b, |channel, _| {
3893 channel_messages(channel)
3894 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3895 })
3896 .await;
3897
3898 // Disconnect client B, ensuring we can still access its cached channel data.
3899 server.forbid_connections();
3900 server.disconnect_client(client_b.current_user_id(&cx_b));
3901 cx_b.foreground().advance_clock(Duration::from_secs(3));
3902 while !matches!(
3903 status_b.next().await,
3904 Some(client::Status::ReconnectionError { .. })
3905 ) {}
3906
3907 channels_b.read_with(cx_b, |channels, _| {
3908 assert_eq!(
3909 channels.available_channels().unwrap(),
3910 [ChannelDetails {
3911 id: channel_id.to_proto(),
3912 name: "test-channel".to_string()
3913 }]
3914 )
3915 });
3916 channel_b.read_with(cx_b, |channel, _| {
3917 assert_eq!(
3918 channel_messages(channel),
3919 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3920 )
3921 });
3922
3923 // Send a message from client B while it is disconnected.
3924 channel_b
3925 .update(cx_b, |channel, cx| {
3926 let task = channel
3927 .send_message("can you see this?".to_string(), cx)
3928 .unwrap();
3929 assert_eq!(
3930 channel_messages(channel),
3931 &[
3932 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3933 ("user_b".to_string(), "can you see this?".to_string(), true)
3934 ]
3935 );
3936 task
3937 })
3938 .await
3939 .unwrap_err();
3940
3941 // Send a message from client A while B is disconnected.
3942 channel_a
3943 .update(cx_a, |channel, cx| {
3944 channel
3945 .send_message("oh, hi B.".to_string(), cx)
3946 .unwrap()
3947 .detach();
3948 let task = channel.send_message("sup".to_string(), cx).unwrap();
3949 assert_eq!(
3950 channel_messages(channel),
3951 &[
3952 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3953 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3954 ("user_a".to_string(), "sup".to_string(), true)
3955 ]
3956 );
3957 task
3958 })
3959 .await
3960 .unwrap();
3961
3962 // Give client B a chance to reconnect.
3963 server.allow_connections();
3964 cx_b.foreground().advance_clock(Duration::from_secs(10));
3965
3966 // Verify that B sees the new messages upon reconnection, as well as the message client B
3967 // sent while offline.
3968 channel_b
3969 .condition(&cx_b, |channel, _| {
3970 channel_messages(channel)
3971 == [
3972 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3973 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3974 ("user_a".to_string(), "sup".to_string(), false),
3975 ("user_b".to_string(), "can you see this?".to_string(), false),
3976 ]
3977 })
3978 .await;
3979
3980 // Ensure client A and B can communicate normally after reconnection.
3981 channel_a
3982 .update(cx_a, |channel, cx| {
3983 channel.send_message("you online?".to_string(), cx).unwrap()
3984 })
3985 .await
3986 .unwrap();
3987 channel_b
3988 .condition(&cx_b, |channel, _| {
3989 channel_messages(channel)
3990 == [
3991 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3992 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3993 ("user_a".to_string(), "sup".to_string(), false),
3994 ("user_b".to_string(), "can you see this?".to_string(), false),
3995 ("user_a".to_string(), "you online?".to_string(), false),
3996 ]
3997 })
3998 .await;
3999
4000 channel_b
4001 .update(cx_b, |channel, cx| {
4002 channel.send_message("yep".to_string(), cx).unwrap()
4003 })
4004 .await
4005 .unwrap();
4006 channel_a
4007 .condition(&cx_a, |channel, _| {
4008 channel_messages(channel)
4009 == [
4010 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4011 ("user_a".to_string(), "oh, hi B.".to_string(), false),
4012 ("user_a".to_string(), "sup".to_string(), false),
4013 ("user_b".to_string(), "can you see this?".to_string(), false),
4014 ("user_a".to_string(), "you online?".to_string(), false),
4015 ("user_b".to_string(), "yep".to_string(), false),
4016 ]
4017 })
4018 .await;
4019 }
4020
4021 #[gpui::test(iterations = 10)]
4022 async fn test_contacts(
4023 cx_a: &mut TestAppContext,
4024 cx_b: &mut TestAppContext,
4025 cx_c: &mut TestAppContext,
4026 ) {
4027 cx_a.foreground().forbid_parking();
4028 let lang_registry = Arc::new(LanguageRegistry::test());
4029 let fs = FakeFs::new(cx_a.background());
4030
4031 // Connect to a server as 3 clients.
4032 let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4033 let client_a = server.create_client(cx_a, "user_a").await;
4034 let client_b = server.create_client(cx_b, "user_b").await;
4035 let client_c = server.create_client(cx_c, "user_c").await;
4036
4037 // Share a worktree as client A.
4038 fs.insert_tree(
4039 "/a",
4040 json!({
4041 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4042 }),
4043 )
4044 .await;
4045
4046 let project_a = cx_a.update(|cx| {
4047 Project::local(
4048 client_a.clone(),
4049 client_a.user_store.clone(),
4050 lang_registry.clone(),
4051 fs.clone(),
4052 cx,
4053 )
4054 });
4055 let (worktree_a, _) = project_a
4056 .update(cx_a, |p, cx| {
4057 p.find_or_create_local_worktree("/a", true, cx)
4058 })
4059 .await
4060 .unwrap();
4061 worktree_a
4062 .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4063 .await;
4064
4065 client_a
4066 .user_store
4067 .condition(&cx_a, |user_store, _| {
4068 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4069 })
4070 .await;
4071 client_b
4072 .user_store
4073 .condition(&cx_b, |user_store, _| {
4074 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4075 })
4076 .await;
4077 client_c
4078 .user_store
4079 .condition(&cx_c, |user_store, _| {
4080 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4081 })
4082 .await;
4083
4084 let project_id = project_a
4085 .update(cx_a, |project, _| project.next_remote_id())
4086 .await;
4087 project_a
4088 .update(cx_a, |project, cx| project.share(cx))
4089 .await
4090 .unwrap();
4091
4092 let _project_b = Project::remote(
4093 project_id,
4094 client_b.clone(),
4095 client_b.user_store.clone(),
4096 lang_registry.clone(),
4097 fs.clone(),
4098 &mut cx_b.to_async(),
4099 )
4100 .await
4101 .unwrap();
4102
4103 client_a
4104 .user_store
4105 .condition(&cx_a, |user_store, _| {
4106 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4107 })
4108 .await;
4109 client_b
4110 .user_store
4111 .condition(&cx_b, |user_store, _| {
4112 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4113 })
4114 .await;
4115 client_c
4116 .user_store
4117 .condition(&cx_c, |user_store, _| {
4118 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4119 })
4120 .await;
4121
4122 project_a
4123 .condition(&cx_a, |project, _| {
4124 project.collaborators().contains_key(&client_b.peer_id)
4125 })
4126 .await;
4127
4128 cx_a.update(move |_| drop(project_a));
4129 client_a
4130 .user_store
4131 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4132 .await;
4133 client_b
4134 .user_store
4135 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4136 .await;
4137 client_c
4138 .user_store
4139 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4140 .await;
4141
4142 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4143 user_store
4144 .contacts()
4145 .iter()
4146 .map(|contact| {
4147 let worktrees = contact
4148 .projects
4149 .iter()
4150 .map(|p| {
4151 (
4152 p.worktree_root_names[0].as_str(),
4153 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4154 )
4155 })
4156 .collect();
4157 (contact.user.github_login.as_str(), worktrees)
4158 })
4159 .collect()
4160 }
4161 }
4162
4163 #[gpui::test(iterations = 100)]
4164 async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4165 cx.foreground().forbid_parking();
4166 let max_peers = env::var("MAX_PEERS")
4167 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4168 .unwrap_or(5);
4169 let max_operations = env::var("OPERATIONS")
4170 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4171 .unwrap_or(10);
4172
4173 let rng = Arc::new(Mutex::new(rng));
4174
4175 let guest_lang_registry = Arc::new(LanguageRegistry::test());
4176 let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4177
4178 let fs = FakeFs::new(cx.background());
4179 fs.insert_tree(
4180 "/_collab",
4181 json!({
4182 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4183 }),
4184 )
4185 .await;
4186
4187 let operations = Rc::new(Cell::new(0));
4188 let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4189 let mut clients = Vec::new();
4190
4191 let mut next_entity_id = 100000;
4192 let mut host_cx = TestAppContext::new(
4193 cx.foreground_platform(),
4194 cx.platform(),
4195 cx.foreground(),
4196 cx.background(),
4197 cx.font_cache(),
4198 cx.leak_detector(),
4199 next_entity_id,
4200 );
4201 let host = server.create_client(&mut host_cx, "host").await;
4202 let host_project = host_cx.update(|cx| {
4203 Project::local(
4204 host.client.clone(),
4205 host.user_store.clone(),
4206 Arc::new(LanguageRegistry::test()),
4207 fs.clone(),
4208 cx,
4209 )
4210 });
4211 let host_project_id = host_project
4212 .update(&mut host_cx, |p, _| p.next_remote_id())
4213 .await;
4214
4215 let (collab_worktree, _) = host_project
4216 .update(&mut host_cx, |project, cx| {
4217 project.find_or_create_local_worktree("/_collab", true, cx)
4218 })
4219 .await
4220 .unwrap();
4221 collab_worktree
4222 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4223 .await;
4224 host_project
4225 .update(&mut host_cx, |project, cx| project.share(cx))
4226 .await
4227 .unwrap();
4228
4229 clients.push(cx.foreground().spawn(host.simulate_host(
4230 host_project,
4231 language_server_config,
4232 operations.clone(),
4233 max_operations,
4234 rng.clone(),
4235 host_cx,
4236 )));
4237
4238 while operations.get() < max_operations {
4239 cx.background().simulate_random_delay().await;
4240 if clients.len() >= max_peers {
4241 break;
4242 } else if rng.lock().gen_bool(0.05) {
4243 operations.set(operations.get() + 1);
4244
4245 let guest_id = clients.len();
4246 log::info!("Adding guest {}", guest_id);
4247 next_entity_id += 100000;
4248 let mut guest_cx = TestAppContext::new(
4249 cx.foreground_platform(),
4250 cx.platform(),
4251 cx.foreground(),
4252 cx.background(),
4253 cx.font_cache(),
4254 cx.leak_detector(),
4255 next_entity_id,
4256 );
4257 let guest = server
4258 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4259 .await;
4260 let guest_project = Project::remote(
4261 host_project_id,
4262 guest.client.clone(),
4263 guest.user_store.clone(),
4264 guest_lang_registry.clone(),
4265 FakeFs::new(cx.background()),
4266 &mut guest_cx.to_async(),
4267 )
4268 .await
4269 .unwrap();
4270 clients.push(cx.foreground().spawn(guest.simulate_guest(
4271 guest_id,
4272 guest_project,
4273 operations.clone(),
4274 max_operations,
4275 rng.clone(),
4276 guest_cx,
4277 )));
4278
4279 log::info!("Guest {} added", guest_id);
4280 }
4281 }
4282
4283 let mut clients = futures::future::join_all(clients).await;
4284 cx.foreground().run_until_parked();
4285
4286 let (host_client, mut host_cx) = clients.remove(0);
4287 let host_project = host_client.project.as_ref().unwrap();
4288 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4289 project
4290 .worktrees(cx)
4291 .map(|worktree| {
4292 let snapshot = worktree.read(cx).snapshot();
4293 (snapshot.id(), snapshot)
4294 })
4295 .collect::<BTreeMap<_, _>>()
4296 });
4297
4298 host_client
4299 .project
4300 .as_ref()
4301 .unwrap()
4302 .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4303
4304 for (guest_client, mut guest_cx) in clients.into_iter() {
4305 let guest_id = guest_client.client.id();
4306 let worktree_snapshots =
4307 guest_client
4308 .project
4309 .as_ref()
4310 .unwrap()
4311 .read_with(&guest_cx, |project, cx| {
4312 project
4313 .worktrees(cx)
4314 .map(|worktree| {
4315 let worktree = worktree.read(cx);
4316 (worktree.id(), worktree.snapshot())
4317 })
4318 .collect::<BTreeMap<_, _>>()
4319 });
4320
4321 assert_eq!(
4322 worktree_snapshots.keys().collect::<Vec<_>>(),
4323 host_worktree_snapshots.keys().collect::<Vec<_>>(),
4324 "guest {} has different worktrees than the host",
4325 guest_id
4326 );
4327 for (id, host_snapshot) in &host_worktree_snapshots {
4328 let guest_snapshot = &worktree_snapshots[id];
4329 assert_eq!(
4330 guest_snapshot.root_name(),
4331 host_snapshot.root_name(),
4332 "guest {} has different root name than the host for worktree {}",
4333 guest_id,
4334 id
4335 );
4336 assert_eq!(
4337 guest_snapshot.entries(false).collect::<Vec<_>>(),
4338 host_snapshot.entries(false).collect::<Vec<_>>(),
4339 "guest {} has different snapshot than the host for worktree {}",
4340 guest_id,
4341 id
4342 );
4343 }
4344
4345 guest_client
4346 .project
4347 .as_ref()
4348 .unwrap()
4349 .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4350
4351 for guest_buffer in &guest_client.buffers {
4352 let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4353 let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4354 project.buffer_for_id(buffer_id, cx).expect(&format!(
4355 "host does not have buffer for guest:{}, peer:{}, id:{}",
4356 guest_id, guest_client.peer_id, buffer_id
4357 ))
4358 });
4359 let path = host_buffer
4360 .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4361
4362 assert_eq!(
4363 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4364 0,
4365 "guest {}, buffer {}, path {:?} has deferred operations",
4366 guest_id,
4367 buffer_id,
4368 path,
4369 );
4370 assert_eq!(
4371 guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4372 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4373 "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4374 guest_id,
4375 buffer_id,
4376 path
4377 );
4378 }
4379
4380 guest_cx.update(|_| drop(guest_client));
4381 }
4382
4383 host_cx.update(|_| drop(host_client));
4384 }
4385
4386 struct TestServer {
4387 peer: Arc<Peer>,
4388 app_state: Arc<AppState>,
4389 server: Arc<Server>,
4390 foreground: Rc<executor::Foreground>,
4391 notifications: mpsc::UnboundedReceiver<()>,
4392 connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4393 forbid_connections: Arc<AtomicBool>,
4394 _test_db: TestDb,
4395 }
4396
4397 impl TestServer {
4398 async fn start(
4399 foreground: Rc<executor::Foreground>,
4400 background: Arc<executor::Background>,
4401 ) -> Self {
4402 let test_db = TestDb::fake(background);
4403 let app_state = Self::build_app_state(&test_db).await;
4404 let peer = Peer::new();
4405 let notifications = mpsc::unbounded();
4406 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4407 Self {
4408 peer,
4409 app_state,
4410 server,
4411 foreground,
4412 notifications: notifications.1,
4413 connection_killers: Default::default(),
4414 forbid_connections: Default::default(),
4415 _test_db: test_db,
4416 }
4417 }
4418
4419 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4420 let http = FakeHttpClient::with_404_response();
4421 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4422 let client_name = name.to_string();
4423 let mut client = Client::new(http.clone());
4424 let server = self.server.clone();
4425 let connection_killers = self.connection_killers.clone();
4426 let forbid_connections = self.forbid_connections.clone();
4427 let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4428
4429 Arc::get_mut(&mut client)
4430 .unwrap()
4431 .override_authenticate(move |cx| {
4432 cx.spawn(|_| async move {
4433 let access_token = "the-token".to_string();
4434 Ok(Credentials {
4435 user_id: user_id.0 as u64,
4436 access_token,
4437 })
4438 })
4439 })
4440 .override_establish_connection(move |credentials, cx| {
4441 assert_eq!(credentials.user_id, user_id.0 as u64);
4442 assert_eq!(credentials.access_token, "the-token");
4443
4444 let server = server.clone();
4445 let connection_killers = connection_killers.clone();
4446 let forbid_connections = forbid_connections.clone();
4447 let client_name = client_name.clone();
4448 let connection_id_tx = connection_id_tx.clone();
4449 cx.spawn(move |cx| async move {
4450 if forbid_connections.load(SeqCst) {
4451 Err(EstablishConnectionError::other(anyhow!(
4452 "server is forbidding connections"
4453 )))
4454 } else {
4455 let (client_conn, server_conn, kill_conn) =
4456 Connection::in_memory(cx.background());
4457 connection_killers.lock().insert(user_id, kill_conn);
4458 cx.background()
4459 .spawn(server.handle_connection(
4460 server_conn,
4461 client_name,
4462 user_id,
4463 Some(connection_id_tx),
4464 cx.background(),
4465 ))
4466 .detach();
4467 Ok(client_conn)
4468 }
4469 })
4470 });
4471
4472 client
4473 .authenticate_and_connect(&cx.to_async())
4474 .await
4475 .unwrap();
4476
4477 Channel::init(&client);
4478 Project::init(&client);
4479
4480 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4481 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4482
4483 let client = TestClient {
4484 client,
4485 peer_id,
4486 user_store,
4487 project: Default::default(),
4488 buffers: Default::default(),
4489 };
4490 client.wait_for_current_user(cx).await;
4491 client
4492 }
4493
4494 fn disconnect_client(&self, user_id: UserId) {
4495 self.connection_killers.lock().remove(&user_id);
4496 }
4497
4498 fn forbid_connections(&self) {
4499 self.forbid_connections.store(true, SeqCst);
4500 }
4501
4502 fn allow_connections(&self) {
4503 self.forbid_connections.store(false, SeqCst);
4504 }
4505
4506 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4507 let mut config = Config::default();
4508 config.session_secret = "a".repeat(32);
4509 config.database_url = test_db.url.clone();
4510 let github_client = github::AppClient::test();
4511 Arc::new(AppState {
4512 db: test_db.db().clone(),
4513 handlebars: Default::default(),
4514 auth_client: auth::build_client("", ""),
4515 repo_client: github::RepoClient::test(&github_client),
4516 github_client,
4517 config,
4518 })
4519 }
4520
4521 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4522 self.server.store.read()
4523 }
4524
4525 async fn condition<F>(&mut self, mut predicate: F)
4526 where
4527 F: FnMut(&Store) -> bool,
4528 {
4529 async_std::future::timeout(Duration::from_millis(500), async {
4530 while !(predicate)(&*self.server.store.read()) {
4531 self.foreground.start_waiting();
4532 self.notifications.next().await;
4533 self.foreground.finish_waiting();
4534 }
4535 })
4536 .await
4537 .expect("condition timed out");
4538 }
4539 }
4540
4541 impl Drop for TestServer {
4542 fn drop(&mut self) {
4543 self.peer.reset();
4544 }
4545 }
4546
4547 struct TestClient {
4548 client: Arc<Client>,
4549 pub peer_id: PeerId,
4550 pub user_store: ModelHandle<UserStore>,
4551 project: Option<ModelHandle<Project>>,
4552 buffers: HashSet<ModelHandle<language::Buffer>>,
4553 }
4554
4555 impl Deref for TestClient {
4556 type Target = Arc<Client>;
4557
4558 fn deref(&self) -> &Self::Target {
4559 &self.client
4560 }
4561 }
4562
4563 impl TestClient {
4564 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4565 UserId::from_proto(
4566 self.user_store
4567 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4568 )
4569 }
4570
4571 async fn wait_for_current_user(&self, cx: &TestAppContext) {
4572 let mut authed_user = self
4573 .user_store
4574 .read_with(cx, |user_store, _| user_store.watch_current_user());
4575 while authed_user.next().await.unwrap().is_none() {}
4576 }
4577
4578 fn simulate_host(
4579 mut self,
4580 project: ModelHandle<Project>,
4581 mut language_server_config: LanguageServerConfig,
4582 operations: Rc<Cell<usize>>,
4583 max_operations: usize,
4584 rng: Arc<Mutex<StdRng>>,
4585 mut cx: TestAppContext,
4586 ) -> impl Future<Output = (Self, TestAppContext)> {
4587 let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4588
4589 // Set up a fake language server.
4590 language_server_config.set_fake_initializer({
4591 let rng = rng.clone();
4592 let files = files.clone();
4593 let project = project.downgrade();
4594 move |fake_server| {
4595 fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4596 Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4597 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4598 range: lsp::Range::new(
4599 lsp::Position::new(0, 0),
4600 lsp::Position::new(0, 0),
4601 ),
4602 new_text: "the-new-text".to_string(),
4603 })),
4604 ..Default::default()
4605 }]))
4606 });
4607
4608 fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4609 Some(vec![lsp::CodeActionOrCommand::CodeAction(
4610 lsp::CodeAction {
4611 title: "the-code-action".to_string(),
4612 ..Default::default()
4613 },
4614 )])
4615 });
4616
4617 fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4618 |params, _| {
4619 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4620 params.position,
4621 params.position,
4622 )))
4623 },
4624 );
4625
4626 fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4627 let files = files.clone();
4628 let rng = rng.clone();
4629 move |_, _| {
4630 let files = files.lock();
4631 let mut rng = rng.lock();
4632 let count = rng.gen_range::<usize, _>(1..3);
4633 let files = (0..count)
4634 .map(|_| files.choose(&mut *rng).unwrap())
4635 .collect::<Vec<_>>();
4636 log::info!("LSP: Returning definitions in files {:?}", &files);
4637 Some(lsp::GotoDefinitionResponse::Array(
4638 files
4639 .into_iter()
4640 .map(|file| lsp::Location {
4641 uri: lsp::Url::from_file_path(file).unwrap(),
4642 range: Default::default(),
4643 })
4644 .collect(),
4645 ))
4646 }
4647 });
4648
4649 fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4650 let rng = rng.clone();
4651 let project = project.clone();
4652 move |params, mut cx| {
4653 if let Some(project) = project.upgrade(&cx) {
4654 project.update(&mut cx, |project, cx| {
4655 let path = params
4656 .text_document_position_params
4657 .text_document
4658 .uri
4659 .to_file_path()
4660 .unwrap();
4661 let (worktree, relative_path) =
4662 project.find_local_worktree(&path, cx)?;
4663 let project_path =
4664 ProjectPath::from((worktree.read(cx).id(), relative_path));
4665 let buffer =
4666 project.get_open_buffer(&project_path, cx)?.read(cx);
4667
4668 let mut highlights = Vec::new();
4669 let highlight_count = rng.lock().gen_range(1..=5);
4670 let mut prev_end = 0;
4671 for _ in 0..highlight_count {
4672 let range =
4673 buffer.random_byte_range(prev_end, &mut *rng.lock());
4674 let start = buffer
4675 .offset_to_point_utf16(range.start)
4676 .to_lsp_position();
4677 let end = buffer
4678 .offset_to_point_utf16(range.end)
4679 .to_lsp_position();
4680 highlights.push(lsp::DocumentHighlight {
4681 range: lsp::Range::new(start, end),
4682 kind: Some(lsp::DocumentHighlightKind::READ),
4683 });
4684 prev_end = range.end;
4685 }
4686 Some(highlights)
4687 })
4688 } else {
4689 None
4690 }
4691 }
4692 });
4693 }
4694 });
4695
4696 project.update(&mut cx, |project, _| {
4697 project.languages().add(Arc::new(Language::new(
4698 LanguageConfig {
4699 name: "Rust".into(),
4700 path_suffixes: vec!["rs".to_string()],
4701 language_server: Some(language_server_config),
4702 ..Default::default()
4703 },
4704 None,
4705 )));
4706 });
4707
4708 async move {
4709 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4710 while operations.get() < max_operations {
4711 operations.set(operations.get() + 1);
4712
4713 let distribution = rng.lock().gen_range::<usize, _>(0..100);
4714 match distribution {
4715 0..=20 if !files.lock().is_empty() => {
4716 let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4717 let mut path = path.as_path();
4718 while let Some(parent_path) = path.parent() {
4719 path = parent_path;
4720 if rng.lock().gen() {
4721 break;
4722 }
4723 }
4724
4725 log::info!("Host: find/create local worktree {:?}", path);
4726 let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4727 project.find_or_create_local_worktree(path, true, cx)
4728 });
4729 let find_or_create_worktree = async move {
4730 find_or_create_worktree.await.unwrap();
4731 };
4732 if rng.lock().gen() {
4733 cx.background().spawn(find_or_create_worktree).detach();
4734 } else {
4735 find_or_create_worktree.await;
4736 }
4737 }
4738 10..=80 if !files.lock().is_empty() => {
4739 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4740 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4741 let (worktree, path) = project
4742 .update(&mut cx, |project, cx| {
4743 project.find_or_create_local_worktree(
4744 file.clone(),
4745 true,
4746 cx,
4747 )
4748 })
4749 .await
4750 .unwrap();
4751 let project_path =
4752 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4753 log::info!(
4754 "Host: opening path {:?}, worktree {}, relative_path {:?}",
4755 file,
4756 project_path.0,
4757 project_path.1
4758 );
4759 let buffer = project
4760 .update(&mut cx, |project, cx| {
4761 project.open_buffer(project_path, cx)
4762 })
4763 .await
4764 .unwrap();
4765 self.buffers.insert(buffer.clone());
4766 buffer
4767 } else {
4768 self.buffers
4769 .iter()
4770 .choose(&mut *rng.lock())
4771 .unwrap()
4772 .clone()
4773 };
4774
4775 if rng.lock().gen_bool(0.1) {
4776 cx.update(|cx| {
4777 log::info!(
4778 "Host: dropping buffer {:?}",
4779 buffer.read(cx).file().unwrap().full_path(cx)
4780 );
4781 self.buffers.remove(&buffer);
4782 drop(buffer);
4783 });
4784 } else {
4785 buffer.update(&mut cx, |buffer, cx| {
4786 log::info!(
4787 "Host: updating buffer {:?} ({})",
4788 buffer.file().unwrap().full_path(cx),
4789 buffer.remote_id()
4790 );
4791 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4792 });
4793 }
4794 }
4795 _ => loop {
4796 let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4797 let mut path = PathBuf::new();
4798 path.push("/");
4799 for _ in 0..path_component_count {
4800 let letter = rng.lock().gen_range(b'a'..=b'z');
4801 path.push(std::str::from_utf8(&[letter]).unwrap());
4802 }
4803 path.set_extension("rs");
4804 let parent_path = path.parent().unwrap();
4805
4806 log::info!("Host: creating file {:?}", path,);
4807
4808 if fs.create_dir(&parent_path).await.is_ok()
4809 && fs.create_file(&path, Default::default()).await.is_ok()
4810 {
4811 files.lock().push(path);
4812 break;
4813 } else {
4814 log::info!("Host: cannot create file");
4815 }
4816 },
4817 }
4818
4819 cx.background().simulate_random_delay().await;
4820 }
4821
4822 log::info!("Host done");
4823
4824 self.project = Some(project);
4825 (self, cx)
4826 }
4827 }
4828
4829 pub async fn simulate_guest(
4830 mut self,
4831 guest_id: usize,
4832 project: ModelHandle<Project>,
4833 operations: Rc<Cell<usize>>,
4834 max_operations: usize,
4835 rng: Arc<Mutex<StdRng>>,
4836 mut cx: TestAppContext,
4837 ) -> (Self, TestAppContext) {
4838 while operations.get() < max_operations {
4839 let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4840 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4841 project
4842 .worktrees(&cx)
4843 .filter(|worktree| {
4844 let worktree = worktree.read(cx);
4845 worktree.is_visible()
4846 && worktree.entries(false).any(|e| e.is_file())
4847 })
4848 .choose(&mut *rng.lock())
4849 }) {
4850 worktree
4851 } else {
4852 cx.background().simulate_random_delay().await;
4853 continue;
4854 };
4855
4856 operations.set(operations.get() + 1);
4857 let (worktree_root_name, project_path) =
4858 worktree.read_with(&cx, |worktree, _| {
4859 let entry = worktree
4860 .entries(false)
4861 .filter(|e| e.is_file())
4862 .choose(&mut *rng.lock())
4863 .unwrap();
4864 (
4865 worktree.root_name().to_string(),
4866 (worktree.id(), entry.path.clone()),
4867 )
4868 });
4869 log::info!(
4870 "Guest {}: opening path {:?} in worktree {} ({})",
4871 guest_id,
4872 project_path.1,
4873 project_path.0,
4874 worktree_root_name,
4875 );
4876 let buffer = project
4877 .update(&mut cx, |project, cx| {
4878 project.open_buffer(project_path.clone(), cx)
4879 })
4880 .await
4881 .unwrap();
4882 log::info!(
4883 "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4884 guest_id,
4885 project_path.1,
4886 project_path.0,
4887 worktree_root_name,
4888 buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4889 );
4890 self.buffers.insert(buffer.clone());
4891 buffer
4892 } else {
4893 operations.set(operations.get() + 1);
4894
4895 self.buffers
4896 .iter()
4897 .choose(&mut *rng.lock())
4898 .unwrap()
4899 .clone()
4900 };
4901
4902 let choice = rng.lock().gen_range(0..100);
4903 match choice {
4904 0..=9 => {
4905 cx.update(|cx| {
4906 log::info!(
4907 "Guest {}: dropping buffer {:?}",
4908 guest_id,
4909 buffer.read(cx).file().unwrap().full_path(cx)
4910 );
4911 self.buffers.remove(&buffer);
4912 drop(buffer);
4913 });
4914 }
4915 10..=19 => {
4916 let completions = project.update(&mut cx, |project, cx| {
4917 log::info!(
4918 "Guest {}: requesting completions for buffer {} ({:?})",
4919 guest_id,
4920 buffer.read(cx).remote_id(),
4921 buffer.read(cx).file().unwrap().full_path(cx)
4922 );
4923 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4924 project.completions(&buffer, offset, cx)
4925 });
4926 let completions = cx.background().spawn(async move {
4927 completions.await.expect("completions request failed");
4928 });
4929 if rng.lock().gen_bool(0.3) {
4930 log::info!("Guest {}: detaching completions request", guest_id);
4931 completions.detach();
4932 } else {
4933 completions.await;
4934 }
4935 }
4936 20..=29 => {
4937 let code_actions = project.update(&mut cx, |project, cx| {
4938 log::info!(
4939 "Guest {}: requesting code actions for buffer {} ({:?})",
4940 guest_id,
4941 buffer.read(cx).remote_id(),
4942 buffer.read(cx).file().unwrap().full_path(cx)
4943 );
4944 let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4945 project.code_actions(&buffer, range, cx)
4946 });
4947 let code_actions = cx.background().spawn(async move {
4948 code_actions.await.expect("code actions request failed");
4949 });
4950 if rng.lock().gen_bool(0.3) {
4951 log::info!("Guest {}: detaching code actions request", guest_id);
4952 code_actions.detach();
4953 } else {
4954 code_actions.await;
4955 }
4956 }
4957 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4958 let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4959 log::info!(
4960 "Guest {}: saving buffer {} ({:?})",
4961 guest_id,
4962 buffer.remote_id(),
4963 buffer.file().unwrap().full_path(cx)
4964 );
4965 (buffer.version(), buffer.save(cx))
4966 });
4967 let save = cx.background().spawn(async move {
4968 let (saved_version, _) = save.await.expect("save request failed");
4969 assert!(saved_version.observed_all(&requested_version));
4970 });
4971 if rng.lock().gen_bool(0.3) {
4972 log::info!("Guest {}: detaching save request", guest_id);
4973 save.detach();
4974 } else {
4975 save.await;
4976 }
4977 }
4978 40..=44 => {
4979 let prepare_rename = project.update(&mut cx, |project, cx| {
4980 log::info!(
4981 "Guest {}: preparing rename for buffer {} ({:?})",
4982 guest_id,
4983 buffer.read(cx).remote_id(),
4984 buffer.read(cx).file().unwrap().full_path(cx)
4985 );
4986 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4987 project.prepare_rename(buffer, offset, cx)
4988 });
4989 let prepare_rename = cx.background().spawn(async move {
4990 prepare_rename.await.expect("prepare rename request failed");
4991 });
4992 if rng.lock().gen_bool(0.3) {
4993 log::info!("Guest {}: detaching prepare rename request", guest_id);
4994 prepare_rename.detach();
4995 } else {
4996 prepare_rename.await;
4997 }
4998 }
4999 45..=49 => {
5000 let definitions = project.update(&mut cx, |project, cx| {
5001 log::info!(
5002 "Guest {}: requesting definitions for buffer {} ({:?})",
5003 guest_id,
5004 buffer.read(cx).remote_id(),
5005 buffer.read(cx).file().unwrap().full_path(cx)
5006 );
5007 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5008 project.definition(&buffer, offset, cx)
5009 });
5010 let definitions = cx.background().spawn(async move {
5011 definitions.await.expect("definitions request failed")
5012 });
5013 if rng.lock().gen_bool(0.3) {
5014 log::info!("Guest {}: detaching definitions request", guest_id);
5015 definitions.detach();
5016 } else {
5017 self.buffers
5018 .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5019 }
5020 }
5021 50..=54 => {
5022 let highlights = project.update(&mut cx, |project, cx| {
5023 log::info!(
5024 "Guest {}: requesting highlights for buffer {} ({:?})",
5025 guest_id,
5026 buffer.read(cx).remote_id(),
5027 buffer.read(cx).file().unwrap().full_path(cx)
5028 );
5029 let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5030 project.document_highlights(&buffer, offset, cx)
5031 });
5032 let highlights = cx.background().spawn(async move {
5033 highlights.await.expect("highlights request failed");
5034 });
5035 if rng.lock().gen_bool(0.3) {
5036 log::info!("Guest {}: detaching highlights request", guest_id);
5037 highlights.detach();
5038 } else {
5039 highlights.await;
5040 }
5041 }
5042 55..=59 => {
5043 let search = project.update(&mut cx, |project, cx| {
5044 let query = rng.lock().gen_range('a'..='z');
5045 log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5046 project.search(SearchQuery::text(query, false, false), cx)
5047 });
5048 let search = cx
5049 .background()
5050 .spawn(async move { search.await.expect("search request failed") });
5051 if rng.lock().gen_bool(0.3) {
5052 log::info!("Guest {}: detaching search request", guest_id);
5053 search.detach();
5054 } else {
5055 self.buffers.extend(search.await.into_keys());
5056 }
5057 }
5058 _ => {
5059 buffer.update(&mut cx, |buffer, cx| {
5060 log::info!(
5061 "Guest {}: updating buffer {} ({:?})",
5062 guest_id,
5063 buffer.remote_id(),
5064 buffer.file().unwrap().full_path(cx)
5065 );
5066 buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5067 });
5068 }
5069 }
5070 cx.background().simulate_random_delay().await;
5071 }
5072
5073 log::info!("Guest {} done", guest_id);
5074
5075 self.project = Some(project);
5076 (self, cx)
5077 }
5078 }
5079
5080 impl Drop for TestClient {
5081 fn drop(&mut self) {
5082 self.client.tear_down();
5083 }
5084 }
5085
5086 impl Executor for Arc<gpui::executor::Background> {
5087 type Timer = gpui::executor::Timer;
5088
5089 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5090 self.spawn(future).detach();
5091 }
5092
5093 fn timer(&self, duration: Duration) -> Self::Timer {
5094 self.as_ref().timer(duration)
5095 }
5096 }
5097
5098 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5099 channel
5100 .messages()
5101 .cursor::<()>()
5102 .map(|m| {
5103 (
5104 m.sender.github_login.clone(),
5105 m.body.clone(),
5106 m.is_pending(),
5107 )
5108 })
5109 .collect()
5110 }
5111
5112 struct EmptyView;
5113
5114 impl gpui::Entity for EmptyView {
5115 type Event = ();
5116 }
5117
5118 impl gpui::View for EmptyView {
5119 fn ui_name() -> &'static str {
5120 "empty view"
5121 }
5122
5123 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5124 gpui::Element::boxed(gpui::elements::Empty)
5125 }
5126 }
5127}